I have a normal Spring Cloud Stream application that simply reads data from a Kafka topic and produces messages to another Kafka topic. Please find the configuration below:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.6</version>
    <relativePath/>
</parent>

<properties>
    <spring-cloud.version>2020.0.4</spring-cloud.version>
    <spring-boot-maven-plugin.version>2.3.0.RELEASE</spring-boot-maven-plugin.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-tomcat</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
And the following application.properties:
#Kafka Configurations
spring.kafka.bootstrap-servers=localhost:9092
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=latest
spring.cloud.function.definition=merchantCredentials;validatedProducts;validateImages;retryUnprocessedItems

#Input topics
#Merchants
spring.cloud.stream.bindings.merchantCredentials-in-0.destination=mis.merchantCtpCredentials
spring.cloud.stream.kafka.bindings.merchantCredentials-in-0.consumer.ack-mode=manual_immediate
spring.cloud.stream.bindings.merchantCredentials-in-0.contentType=application/json
spring.cloud.stream.bindings.merchantCredentials-in-0.consumer.header-mode=headers
spring.cloud.stream.bindings.merchantCredentials-in-0.consumer.partitioned=true
spring.cloud.stream.bindings.merchantCredentials-in-0.consumer.max-attempts=1
spring.cloud.stream.bindings.merchantCredentials-in-0.group=tuevGroup

#kfc.notifications.product
spring.cloud.stream.bindings.validatedProducts-in-0.destination=kfc.notifications.product
spring.cloud.stream.kafka.bindings.validatedProducts-in-0.consumer.ack-mode=manual_immediate
spring.cloud.stream.bindings.validatedProducts-in-0.contentType=application/json
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.header-mode=headers
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.concurrency=5
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.partitioned=true
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.max-attempts=1
spring.cloud.stream.bindings.validatedProducts-in-0.group=tuevGroup

#marketplace.products
spring.cloud.stream.bindings.validateImages-in-0.destination=marketplace.products
spring.cloud.stream.kafka.bindings.validateImages-in-0.consumer.ack-mode=manual_immediate
spring.cloud.stream.bindings.validateImages-in-0.contentType=application/json
spring.cloud.stream.bindings.validateImages-in-0.consumer.header-mode=headers
spring.cloud.stream.bindings.validateImages-in-0.consumer.partitioned=true
spring.cloud.stream.bindings.validateImages-in-0.consumer.max-attempts=1
spring.cloud.stream.bindings.validateImages-in-0.group=tuevGroup

#Output topics
#productValidated
spring.cloud.stream.bindings.validatedProducts-out-0.destination=marketplace.validated.products
spring.cloud.stream.bindings.validatedProducts-out-0.contentType=application/json
spring.cloud.stream.bindings.validatedProducts-out-0.producer.partition-count=10
spring.cloud.stream.bindings.validatedProducts-out-0.producer.header-mode=headers

spring.cloud.stream.bindings.retryUnprocessedItems-out-0.destination=marketplace.validated.products
spring.cloud.stream.bindings.retryUnprocessedItems-out-0.contentType=application/json
spring.cloud.stream.bindings.retryUnprocessedItems-out-0.producer.partition-count=10
spring.cloud.stream.bindings.retryUnprocessedItems-out-0.producer.header-mode=headers

spring.cloud.stream.poller.cron=0 0/10 * * * *
spring.cloud.stream.poller.initial-delay=10000
And below are the signatures of all defined Spring Cloud functions:
@Bean
public Consumer<Flux<Message<JsonNode>>> merchantCredentials() {

@Bean
public Function<Message<NotificationMessage>, Message<ProductValidatedEvent>> validatedProducts() {

@Bean
public Consumer<Message<ProductImportMessage>> validateImages() {

@PollableBean
@SchedulerLock(name = "retryProcess_scheduledTask", lockAtMostFor = "${retry.job.lock.atMost}", lockAtLeastFor = "${retry.job.lock.atLeast}")
public Supplier<Flux<Message<ProductValidatedEvent>>> retryUnprocessedItems() {
Everything works fine: the application starts and functions as it should. However, I see this exception in the logs multiple times, especially during the startup phase of the application:
org.springframework.boot.SpringApplication - Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: Type must be one of Supplier, Function or Consumer
I have double-checked all the configuration and still have no clue how to prevent this issue. Why is this exception happening? Can it be ignored?
UPDATE 1:
I have tracked the bug to this function in FunctionTypeUtils (part of Spring Cloud Function):
public static Type discoverFunctionTypeFromClass(Class<?> functionalClass) {
    Assert.isTrue(isFunctional(functionalClass), "Type must be one of Supplier, Function or Consumer");
This function gets called by the following function in FunctionConfiguration:
private String[] filterEligibleFunctionDefinitions() {
    ...
    for (int i = 0; i < functionNames.length && eligibleDefinition; i++) {
        String functionName = functionNames[i];
        if (this.applicationContext.containsBean(functionName)) {
When I added debug points to this function, as well as to the previous one, I got the following output:
functionName: merchantCredentials
functionalClass: com.rewedigital.services.tuev.marketplace.merchant.flow.MerchantFlowManger$$Lambda$1323/0x00000008008fc040

functionName: validatedProducts
functionalClass: com.rewedigital.services.tuev.marketplace.validator.listener.ProductChangedListener$$Lambda$1331/0x00000008008fa040

functionName: validateImages
functionalClass: com.rewedigital.services.tuev.marketplace.sieve.listener.ProductImagesListener$$Lambda$1324/0x00000008008fc440

functionName: retryUnprocessedItems
functionalClass: org.springframework.beans.factory.support.NullBean
This shows that retryUnprocessedItems is the culprit, though I am not sure why.
Answer
After some investigation, it turned out that the problem is mainly with the @SchedulerLock annotation.
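I observed that this issue happens while the ShedLock table holds a lock for the method. In that case the call to the bean method is skipped, so the bean ends up registered as a NullBean (as the debug output in the question shows), and the functionBindingRegistrar bean fails with the exception because a NullBean is not a Supplier, Function or Consumer.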
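To make the failure mode concrete, here is a minimal plain-Java sketch of the kind of type check that produces the error message. It is a simplified stand-in, not the actual Spring Cloud Function code: `NullPlaceholder`, `isFunctional` and `requireFunctional` are hypothetical names used only for illustration, with `NullPlaceholder` playing the role of the NullBean seen in the debug output.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java illustration; no Spring classes are involved.
final class FunctionalTypeCheckDemo {

    // Hypothetical stand-in for the placeholder that gets registered
    // when a bean method yields null instead of a real bean.
    static final class NullPlaceholder { }

    // Simplified version of the check behind the error message (an assumption,
    // not the library's code): the bean class must be a functional type.
    static boolean isFunctional(Class<?> type) {
        return Supplier.class.isAssignableFrom(type)
                || Function.class.isAssignableFrom(type)
                || Consumer.class.isAssignableFrom(type);
    }

    // Throws the same kind of exception as the one in the startup log.
    static void requireFunctional(Object bean) {
        if (!isFunctional(bean.getClass())) {
            throw new IllegalArgumentException("Type must be one of Supplier, Function or Consumer");
        }
    }

    public static void main(String[] args) {
        Supplier<String> realBean = () -> "item";
        requireFunctional(realBean); // a real Supplier passes the check

        try {
            requireFunctional(new NullPlaceholder()); // the placeholder does not
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The lambda-backed bean passes because its class implements Supplier, while the placeholder class implements none of the three interfaces, which matches the difference between the first three entries and the last entry in the debug output.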
Of course, this also means that the annotation is not usable here: what @PollableBean actually polls is not the annotated method itself but the Supplier lambda returned by it, so the lock never guards the polled code, practically rendering ShedLock useless.
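The scope problem can be sketched in plain Java, without Spring or ShedLock. In this illustration `withLock` is a hypothetical stand-in for a lock interceptor wrapped around the bean method, and the counters are there only to show what the lock check does and does not cover.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java illustration; no Spring or ShedLock classes are involved.
final class LockScopeDemo {

    // Counts how often the (hypothetical) lock check runs.
    static final AtomicInteger lockChecks = new AtomicInteger();
    // Counts how often the polled Supplier actually runs.
    static final AtomicInteger polls = new AtomicInteger();

    // Stand-in for the bean method: it only builds the Supplier and does no work itself.
    static Supplier<String> retryUnprocessedItems() {
        return () -> "batch-" + polls.incrementAndGet();
    }

    // Stand-in for a lock interceptor around the bean method:
    // it skips the call and returns null when the lock is held elsewhere.
    static <T> T withLock(boolean lockFree, Supplier<T> beanMethod) {
        lockChecks.incrementAndGet();
        return lockFree ? beanMethod.get() : null;
    }

    public static void main(String[] args) {
        // The lock is checked once, when the bean is created.
        Supplier<String> bean = withLock(true, LockScopeDemo::retryUnprocessedItems);

        // Every later poll calls the Supplier directly, bypassing the lock.
        for (int i = 0; i < 3; i++) {
            System.out.println(bean.get());
        }
        System.out.println("lock checks: " + lockChecks.get() + ", polls: " + polls.get());
    }
}
```

In this sketch the lock is consulted once while the Supplier runs three times, and calling `withLock(false, ...)` yields null instead of a Supplier, which mirrors both halves of the problem: the lock does not protect the polled work, and a held lock leaves no usable bean behind.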
Once I remove the annotation, all the exceptions are gone and the sun shines again, birds sing, etc etc..
The next question to answer would be how to use a pollable bean in a distributed manner, but that is out of the scope of this question.