I try to send message to exact Kafka topic and then receive message from it. I have 3 configuration classes for Consumer, Producer and Topic:
public class KafkaTopicConfiguration { @Autowired GlobalConfiguration globalConfiguration; @Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, globalConfiguration.kafka().bootstrapAddress()); return new KafkaAdmin(configs); } }
public class KafkaConsumerConfiguration { @Autowired GlobalConfiguration globalConfiguration; @Bean public Consumer<String, String> consumer() { final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, globalConfiguration.kafka().bootstrapAddress()); props.put(ConsumerConfig.GROUP_ID_CONFIG, globalConfiguration.kafka().groupId()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); return new KafkaConsumer<>(props); } }
public class KafkaProducerConfiguration { @Autowired GlobalConfiguration globalConfiguration; @Bean public KafkaProducer<String, String> kafkaProducer() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, globalConfiguration.kafka().bootstrapAddress()); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new KafkaProducer<>(configProps); } }
GlobalConfiguration class stores all my properties. For Kafka:
bootstrapAddress = "localhost:9092" groupId = "KafkaExampleConsumer"
Then I send message in this way
private void sendMessage(final String topic, final String message) { kafkaAdmin.createOrModifyTopics(new NewTopic(topic, 1, (short) 1)); ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message); Future<RecordMetadata> sendResponse = kafkaProducer.send(producerRecord); kafkaProducer.flush(); boolean isSent = sendResponse.isDone(); }
I check if message is sent with sendResponse.isDone() and it returns true. But then I try to receive message:
protected String receiveMessage(final String topic, final String message) { kafkaAdmin.createOrModifyTopics(new NewTopic(topic, 1, (short) 1)); consumer.subscribe(Collections.singleton(topic)); ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(10000)); consumerRecords.isEmpty(); }
And ConsumerRecords always empty. What problem it can be?
Advertisement
Answer
If you want to consume previously sent records, rather than poll records sent within 10 seconds of you starting the consumer, you need to add
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase());