SpringBoot, Kafka : java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.Producer.close(Ljava/time/Duration;)V

Tags: , , , ,



I’m using spring boot v2.2.4 and Apache Kafka in my project.

Below is my pom.xml file:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <version>1.4.200</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.github.docker-java</groupId>
            <artifactId>docker-java</artifactId>
            <version>3.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.3</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <!-- <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-annotations</artifactId>
            <version>3.5.6-Final</version>
        </dependency>
 -->
    </dependencies>

Below is the code which i have as part of kafka

@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String,ServingDetailsEntity> producerFactoryServingDetail(){
        Map<String,Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory(config);
    }

    @Bean
    public ProducerFactory<String,String> producerFactory(){
        Map<String,Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory(config);
    }

    @Bean
    public KafkaTemplate<String, ServingDetailsEntity> kafkaTemplateItem(){
        return new KafkaTemplate<String, ServingDetailsEntity>(producerFactoryServingDetail());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(){
        return new KafkaTemplate<String, String>(producerFactory());
    }
}

But when json message is sent to kafka queue, i’m getting below error

java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.Producer.close(Ljava/time/Duration;)V
    at org.springframework.kafka.core.KafkaTemplate.closeProducer(KafkaTemplate.java:382) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$4(KafkaTemplate.java:433) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:198) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:570) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:550) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:474) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:75) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:660) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:454) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:446) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) ~[kafka-clients-0.11.0.0.jar:na]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_201]

however json message is getting reached to queue, but i want to understand why i’m getting above error

looking forward for any help

Answer

You are using an old version of kafka-clients. Try using a more recent one:

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>


Source: stackoverflow