Good day, colleagues. I am using Spring Kafka 2.2.5 and I have a listener:
@KafkaListener(topics = "${kafka.execution-task-topic}",
        containerFactory = "executionTaskObjectContainerFactory")
public void protocolEventsHandle(ExecutionTask executionTask,
                                 Acknowledgment acknowledgment,
                                 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                 @Header(KafkaHeaders.OFFSET) long offset) {
    ResponseEntity<String> stringResponseEntity = airflowRestRunner.startDag(executionTask);
    JSONObject body = new JSONObject(stringResponseEntity.getBody());
    String message = body.getString("message");
    String runId = messageParser.getRunId(message);
    ExecutionTaskMessageInfo messageInfo =
            new ExecutionTaskMessageInfo(offset, partition, false, acknowledgment);
    kafkaAcknowledgeObject.putMessageInfo(messageInfo, partition);
    this.executorService.submit(
            kafkaAlertProducer.produceMessageAfterTaskSuccess(runId, executionTask, messageInfo));
}
I do some operations and, if they succeed, I use the Acknowledgment interface to commit the offset.
I have a problem: while the calculations are running in the created thread, the listener reads the message at the same offset again. Because of this, when I try to acknowledge the offset, the application crashes.
What is the best practice for working with Kafka concurrently? I can receive up to 10 messages in parallel, and I need to commit their offsets only after the calculations finish.
UPDATE1
I store all my Kafka messages in a map: the key is the partition number and the value is a model class that holds a reference to the corresponding Acknowledgment:
@Data
@NoArgsConstructor
@AllArgsConstructor
public abstract class KafkaAcknowledgeObject<T extends Comparable> {

    protected ConcurrentHashMap<Integer, TreeSet<T>> hashMap = new ConcurrentHashMap<>();

    public abstract void doAck();

    public void putMessageInfo(T message, int partition) {
        if (hashMap.containsKey(partition)) {
            hashMap.get(partition).add(message);
        } else {
            TreeSet<T> messageInfos = new TreeSet<>();
            messageInfos.add(message);
            hashMap.put(partition, messageInfos);
        }
    }
}
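As an aside, the check-then-put in putMessageInfo is not atomic, so two listener threads hitting the same partition for the first time can race; TreeSet itself is also not thread-safe. A minimal thread-safe sketch (class and method names here are illustrative, not the original ones) using computeIfAbsent and a synchronized sorted set:

```java
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

class PartitionBuffer<T extends Comparable<T>> {

    // One sorted set of in-flight messages per partition.
    private final ConcurrentHashMap<Integer, SortedSet<T>> byPartition = new ConcurrentHashMap<>();

    // computeIfAbsent creates the per-partition set atomically, so two threads
    // registering the first message of a partition cannot lose each other's entry.
    public void put(T message, int partition) {
        byPartition
            .computeIfAbsent(partition, p -> Collections.synchronizedSortedSet(new TreeSet<>()))
            .add(message);
    }

    public SortedSet<T> get(int partition) {
        return byPartition.get(partition);
    }
}
```

The synchronized wrapper protects individual add/first/remove calls; iteration over the set would still need an explicit synchronized block.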
After the calculations I call doAck(), for example:
@Override
public void doAck() {
    for (TreeSet<ExecutionTaskMessageInfo> messageInfoTreeSet : super.hashMap.values()) {
        checkHandledOffsets(messageInfoTreeSet);
    }
}

private void checkHandledOffsets(TreeSet<ExecutionTaskMessageInfo> messageInfoTreeSet) {
    if (messageInfoTreeSet.isEmpty()) {
        // guard: without this, getFirstMessageInfo() throws NoSuchElementException
        // once the recursion has drained the set
        return;
    }
    ExecutionTaskMessageInfo first = getFirstMessageInfo(messageInfoTreeSet);
    if (first.isCompleted()) {
        first.getAcknowledgment().acknowledge();
        messageInfoTreeSet.remove(first);
        checkHandledOffsets(messageInfoTreeSet);
    }
}

private ExecutionTaskMessageInfo getFirstMessageInfo(TreeSet<ExecutionTaskMessageInfo> messageInfoTreeSet) {
    Iterator<ExecutionTaskMessageInfo> iterator = messageInfoTreeSet.iterator();
    return iterator.next();
}
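The recursion above can also be written as a plain loop that acknowledges from the head of the set only while the smallest offset is completed, so a gap (an uncompleted lower offset) blocks all later commits. A self-contained sketch with stub classes standing in for ExecutionTaskMessageInfo and Acknowledgment (names are illustrative, not from Spring Kafka):

```java
import java.util.TreeSet;

class OffsetAcker {

    // Minimal stand-in for ExecutionTaskMessageInfo: an offset, a completion
    // flag, and a flag recording whether acknowledge() would have been called.
    static class Info implements Comparable<Info> {
        final long offset;
        boolean completed;
        boolean acked;

        Info(long offset) { this.offset = offset; }

        @Override
        public int compareTo(Info other) { return Long.compare(offset, other.offset); }
    }

    // Ack and drop entries from the head while the lowest offset is completed.
    // An uncompleted entry stops the loop, so offsets commit strictly in order.
    static void ackCompletedPrefix(TreeSet<Info> infos) {
        while (!infos.isEmpty() && infos.first().completed) {
            Info first = infos.pollFirst();   // removes and returns the head
            first.acked = true;               // real code: first.getAcknowledgment().acknowledge()
        }
    }
}
```

The loop is equivalent to the recursive version but cannot overflow the stack and naturally handles the empty-set case.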
Answer
What you are doing should be fine; I just tested a similar arrangement and it works fine for me…
@SpringBootApplication
public class So56190029Application {

    public static void main(String[] args) {
        SpringApplication.run(So56190029Application.class, args);
    }

    private final ExecutorService exec = Executors.newSingleThreadExecutor();

    private final AtomicInteger count = new AtomicInteger();

    @KafkaListener(id = "so56190029", topics = "so56190029")
    public void listen(String in, Acknowledgment ack) {
        this.exec.execute(runner(in, ack));
    }

    private Runnable runner(String payload, Acknowledgment ack) {
        return () -> {
            System.out.println(payload);
            if (this.count.incrementAndGet() % 3 == 0) {
                System.out.println("acking");
                ack.acknowledge();
            }
        };
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<?, String> template) {
        return args -> IntStream.range(0, 6)
                .forEach(i -> template.send("so56190029", "foo" + i));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setCommitLogLevel(Level.INFO);
        return factory;
    }
}
and
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.max.poll.records=3
spring.kafka.listener.ack-mode=MANUAL
and
foo0
foo1
foo2
acking
foo3
foo4
foo5
acking
2019-05-17 14:46:28.790  INFO 62429 --- [o56190029-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {so56190029-0=OffsetAndMetadata{offset=36, leaderEpoch=null, metadata=''}}