I am writing a small Kafka metrics exporter. (Yes, plenty already exist, such as the Prometheus exporters, but I want a lightweight custom one, so please bear with me.)
As part of this, I would like to know as soon as the first message is received in a Kafka topic (or whether the topic already has messages). I am using Spring Boot and Kafka.
The code below gives me the name of each topic and its number of partitions. How can I find out whether a topic has messages? Any lead is much appreciated!
@ReadOperation
public List<TopicManifest> kafkaTopic() throws ExecutionException, InterruptedException {
    // List all topics, including internal ones, then drop names starting with "_"
    ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
    listTopicsOptions.listInternal(true);
    ListTopicsResult listTopicsResult = adminClient.listTopics(listTopicsOptions);
    Set<String> topics = listTopicsResult.names().get().stream()
            .filter(topic -> !topic.startsWith("_"))
            .collect(Collectors.toSet());
    System.out.println(topics);

    // Describe the remaining topics to get their partition counts
    DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
    Map<String, KafkaFuture<TopicDescription>> topicNameValues = describeTopicsResult.topicNameValues();
    List<TopicManifest> topicManifests = topicNameValues.entrySet().stream().map(entry -> {
        try {
            TopicDescription topicDescription = entry.getValue().get();
            return TopicManifest.builder().name(entry.getKey())
                    .noOfPartitions(topicDescription.partitions().size())
                    .build();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }).collect(Collectors.toList());
    return topicManifests;
}
Answer
Create a KafkaConsumer and call endOffsets (the consumer does not need to be subscribed to the topic(s)).
@Bean
ApplicationRunner runner1(ConsumerFactory<?, ?> cf) {
    return args -> {
        // The consumer is only used to query offsets; it is closed when the block exits
        try (Consumer<?, ?> consumer = cf.createConsumer()) {
            System.out.println(consumer.endOffsets(List.of(
                    new TopicPartition("ktest29", 0),
                    new TopicPartition("ktest29", 1),
                    new TopicPartition("ktest29", 2))));
        }
    };
}
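To turn those offsets into a yes/no answer, one option is to compare them with the result of the consumer's beginningOffsets call for the same partitions. The sketch below shows only that comparison, on plain maps, so it runs without a broker. The class TopicOffsets and its method names are hypothetical helpers, not part of Kafka or Spring, and the difference between end and beginning offsets is only an approximate record count (compaction and transaction markers can leave gaps).

```java
import java.util.Map;

// Hypothetical helper (not part of Kafka or Spring). It works on plain
// partition -> offset maps so the logic can be run without a broker.
final class TopicOffsets {

    private TopicOffsets() {
    }

    // Sum of (end - beginning) over all partitions in the end map.
    // Partitions missing from the beginning map are assumed to start at 0.
    static <P> long retainedOffsets(Map<P, Long> beginning, Map<P, Long> end) {
        long total = 0;
        for (Map.Entry<P, Long> e : end.entrySet()) {
            long start = beginning.getOrDefault(e.getKey(), 0L);
            total += Math.max(0L, e.getValue() - start);
        }
        return total;
    }

    // True if any partition has an end offset above 0, i.e. something was
    // written at some point, even if retention has since removed it.
    static <P> boolean everReceived(Map<P, Long> end) {
        return end.values().stream().anyMatch(offset -> offset > 0);
    }

    // True if at least one partition has an end offset above its beginning offset.
    static <P> boolean hasMessages(Map<P, Long> beginning, Map<P, Long> end) {
        return retainedOffsets(beginning, end) > 0;
    }

    public static void main(String[] args) {
        // Made-up offsets for three partitions, keyed by partition number.
        Map<Integer, Long> beginning = Map.of(0, 0L, 1, 5L, 2, 0L);
        Map<Integer, Long> end = Map.of(0, 0L, 1, 5L, 2, 3L);
        System.out.println(everReceived(end));
        System.out.println(hasMessages(beginning, end));
        System.out.println(retainedOffsets(beginning, end));
    }
}
```

In the runner above, the two maps would come from consumer.beginningOffsets(partitions) and consumer.endOffsets(partitions), both keyed by TopicPartition. Polling these on a schedule and watching for everReceived to flip to true is one way to notice the first message, at the cost of some delay between polls.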