Skip to content
Advertisement

ConsumerRecords is always empty in Kafka, Java, but Future isDone method result true

I try to send message to exact Kafka topic and then receive message from it. I have 3 configuration classes for Consumer, Producer and Topic:

public class KafkaTopicConfiguration {

    @Autowired
    GlobalConfiguration globalConfiguration;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, globalConfiguration.kafka().bootstrapAddress());
        return new KafkaAdmin(configs);
    }
}
public class KafkaConsumerConfiguration {

    @Autowired
    GlobalConfiguration globalConfiguration;

    @Bean
    public Consumer<String, String> consumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                globalConfiguration.kafka().bootstrapAddress());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                globalConfiguration.kafka().groupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        return new KafkaConsumer<>(props);
    }
}
public class KafkaProducerConfiguration {

    @Autowired
    GlobalConfiguration globalConfiguration;

    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, globalConfiguration.kafka().bootstrapAddress());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new KafkaProducer<>(configProps);
    }
}

GlobalConfiguration class stores all my properties. For Kafka:

bootstrapAddress = "localhost:9092"
groupId = "KafkaExampleConsumer"

Then I send message in this way

    private void sendMessage(final String topic, final String message) {
        kafkaAdmin.createOrModifyTopics(new NewTopic(topic, 1, (short) 1));
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
        Future<RecordMetadata> sendResponse = kafkaProducer.send(producerRecord);
        kafkaProducer.flush();
        boolean isSent = sendResponse.isDone();
    } 

I check if message is sent with sendResponse.isDone() and it returns true. But then I try to receive message:

protected String receiveMessage(final String topic, final String message) {
        kafkaAdmin.createOrModifyTopics(new NewTopic(topic, 1, (short) 1));
        consumer.subscribe(Collections.singleton(topic));
        ConsumerRecords<String, String> consumerRecords =
                consumer.poll(Duration.ofMillis(10000));
        consumerRecords.isEmpty();
    }

And ConsumerRecords always empty. What problem it can be?

Advertisement

Answer

If you want to consume previously sent records, rather than poll records sent within 10 seconds of you starting the consumer, you need to add

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
            OffsetResetStrategy.EARLIEST.name().toLowerCase());
User contributions licensed under: CC BY-SA
5 People found this is helpful
Advertisement