I’m using spring boot v2.2.4 and Apache Kafka in my project.
Below is my pom.xml
file:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <version>1.4.200</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>com.github.docker-java</groupId> <artifactId>docker-java</artifactId> <version>3.1.5</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.11.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.12.3</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <scope>test</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <!-- <dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-annotations</artifactId> <version>3.5.6-Final</version> </dependency> --> </dependencies>
Below is the code which i have as part of kafka
@Configuration public class KafkaConfig { @Bean public ProducerFactory<String,ServingDetailsEntity> producerFactoryServingDetail(){ Map<String,Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092"); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory(config); } @Bean public ProducerFactory<String,String> producerFactory(){ Map<String,Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092"); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory(config); } @Bean public KafkaTemplate<String, ServingDetailsEntity> kafkaTemplateItem(){ return new KafkaTemplate<String, ServingDetailsEntity>(producerFactoryServingDetail()); } @Bean public KafkaTemplate<String, String> kafkaTemplate(){ return new KafkaTemplate<String, String>(producerFactory()); } }
But when json message is sent to kafka queue, i’m getting below error
java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.Producer.close(Ljava/time/Duration;)V at org.springframework.kafka.core.KafkaTemplate.closeProducer(KafkaTemplate.java:382) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE] at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$4(KafkaTemplate.java:433) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE] at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:198) ~[kafka-clients-0.11.0.0.jar:na] at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185) ~[kafka-clients-0.11.0.0.jar:na] at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:570) ~[kafka-clients-0.11.0.0.jar:na] at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:550) ~[kafka-clients-0.11.0.0.jar:na] at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:474) ~[kafka-clients-0.11.0.0.jar:na] at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:75) ~[kafka-clients-0.11.0.0.jar:na] at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:660) ~[kafka-clients-0.11.0.0.jar:na] at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) ~[kafka-clients-0.11.0.0.jar:na] at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:454) ~[kafka-clients-0.11.0.0.jar:na] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:446) ~[kafka-clients-0.11.0.0.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224) ~[kafka-clients-0.11.0.0.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) ~[kafka-clients-0.11.0.0.jar:na] at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_201]
however json message is getting reached to queue, but i want to understand why i’m getting above error
looking forward for any help
Advertisement
Answer
You are using an old version of kafka-clients
. Try using a more recent one:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.0</version> </dependency>