How to verify that a Kafka topic is not empty, i.e. has at least one message?

I am writing a small Kafka metrics exporter. (Yes, there are plenty available, such as the Prometheus exporters, but I want a lightweight custom one, so please bear with me.)

As part of this, I would like to know as soon as the first message is received in a Kafka topic (or, more generally, whether the topic has any messages). I am using Spring Boot and Kafka.

The code below gives me the name of each topic and its number of partitions. How can I find out whether a topic has messages? Any lead is much appreciated!

@ReadOperation
public List<TopicManifest> kafkaTopic() throws ExecutionException, InterruptedException {
    ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
    listTopicsOptions.listInternal(true);

    ListTopicsResult listTopicsResult = adminClient.listTopics(listTopicsOptions);

    // Skip topics whose names start with an underscore (e.g. __consumer_offsets).
    Set<String> topics = listTopicsResult.names().get().stream()
            .filter(topic -> !topic.startsWith("_"))
            .collect(Collectors.toSet());
    System.out.println(topics);

    DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
    Map<String, KafkaFuture<TopicDescription>> topicNameValues = describeTopicsResult.topicNameValues();
    List<TopicManifest> topicManifests = topicNameValues.entrySet().stream().map(entry -> {
        try {
            TopicDescription topicDescription = entry.getValue().get();
            return TopicManifest.builder().name(entry.getKey())
                                          .noOfPartitions(topicDescription.partitions().size())
                                          .build();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }).collect(Collectors.toList());
    return topicManifests;
}

Answer

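Create a KafkaConsumer and call endOffsets (the consumer does not need to be subscribed to the topic(s)). An end offset greater than 0 means at least one message has been written to that partition. Because retention can delete old records, compare the result with beginningOffsets if you need to know whether messages are still present.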

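@Bean
ApplicationRunner runner1(ConsumerFactory<?, ?> cf) {
    return args -> {
        // The consumer is only used to query offsets; it never subscribes or polls.
        try (Consumer<?, ?> consumer = cf.createConsumer()) {
            // Prints a map of TopicPartition -> end offset for the listed partitions.
            System.out.println(consumer.endOffsets(List.of(new TopicPartition("ktest29", 0),
                    new TopicPartition("ktest29", 1),
                    new TopicPartition("ktest29", 2))));
        }
    };
}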
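
To turn the offsets into a yes/no answer, one option is a small helper like the sketch below. It is not part of Kafka or Spring; the class name TopicEmptiness and the method hasMessages are made up for illustration. It assumes you pass it the maps returned by consumer.beginningOffsets(partitions) and consumer.endOffsets(partitions), and it reports true as soon as any partition has an end offset greater than its beginning offset. The key type is generic, so the same code accepts Map<TopicPartition, Long>.

```java
import java.util.Map;

// Hypothetical helper, not part of Kafka or Spring.
final class TopicEmptiness {

    private TopicEmptiness() {
    }

    // Returns true if at least one partition has endOffset > beginningOffset.
    // A partition missing from beginningOffsets is assumed to start at offset 0.
    static <P> boolean hasMessages(Map<P, Long> beginningOffsets, Map<P, Long> endOffsets) {
        for (Map.Entry<P, Long> entry : endOffsets.entrySet()) {
            long begin = beginningOffsets.getOrDefault(entry.getKey(), 0L);
            if (entry.getValue() > begin) {
                return true;
            }
        }
        return false;
    }
}
```

With the consumer from the answer, a call might look like TopicEmptiness.hasMessages(consumer.beginningOffsets(partitions), consumer.endOffsets(partitions)), where partitions is the list of TopicPartition objects for the topic. Note that on topics written with transactions the offset range can also include control records, so treat the result as a close approximation in that case.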