Skip to content
Advertisement

Quarkus exception Loading KafkaConsumerRebalanceListener

Having this Listener KafkaConsumerRebalanceListener class

@ApplicationScoped
@Identifier("responses")
public class KafkaPartitionListener implements KafkaConsumerRebalanceListener {

    private static final Logger logger = Logger.getLogger(KafkaPartitionListener.class);

    public Option<Integer> partition;

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        logger.info("Topic partition collection received. Assigning partition");
        partition = List.ofAll(partitions)
            .headOption()
            .map(tp -> {
                logger.infov("Partition={0} assigned", tp.partition());
                return tp.partition();
            });
    }

}

When I add into my application.properties file

mp.messaging.incoming.responses.consumer-rebalance-listener.name=responses

And adding this dependency into another class

@Inject
KafkaPartitionListener partitionListener;

I’m having this exception when I run the service

java.lang.RuntimeException: io.quarkus.builder.BuildException: Build failure: Build failed due to errors
    [error]: Build step io.quarkus.arc.deployment.ArcProcessor#validate threw an exception: javax.enterprise.inject.spi.DeploymentException: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type com.KafkaPartitionListener and qualifiers [@Default]
    - java member: com.rest.Controller#partitionListener
    - declared on CLASS bean [types=[com.rest.Controller, java.lang.Object], qualifiers=[@Default, @Any], target=com.rest.Controller]
    The following beans match by type, but none have matching qualifiers:
        - Bean [class=com.KafkaPartitionListener, qualifiers=[@Identifier(value = "responses"), @Any]]
    at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:1100)
    at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:265)
    at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:129)
    at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:418)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at io.quarkus.deployment.ExtensionLoader$2.execute(ExtensionLoader.java:820)
    at io.quarkus.builder.BuildContext.run(BuildContext.java:277)
    at org.jboss.threads.ContextHandler$1.runWith(ContextHandler.java:18)
    at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2449)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1478)
    at java.base/java.lang.Thread.run(Thread.java:834)
    at org.jboss.threads.JBossThread.run(JBossThread.java:501)

Any idea what’s wrong?

Regards

Advertisement

Answer

You have to use same @Identifier(“responses”) near your injection point. It’s because @Identifier have @Qualifier over it.

User contributions licensed under: CC BY-SA
6 People found this is helpful
Advertisement