I set 2 different message keys for 2 partitions. This is my application.yml for the producer application. To test parallelism, I created a consumer application that reads messages from pf-topic. This is the configuration from the consumer application. I created a function in the consumer application to consume messages. Now…
Tag: spring-cloud-stream
Spring cloud stream merge responses from two different functions
I’m trying to use Spring Cloud Stream to solve the following problem: I have a class that calls two separate functions (Function A and Function B), and both must run in parallel. When Function A finishes it must call Function C; the same happens when Function B finishes, but this will call Funct…
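The fan-out and merge described in this excerpt can be sketched in plain Java with `CompletableFuture`. This is only an illustration under stated assumptions, not the asker's code and not a Spring Cloud Stream binding: every name below (`functionA`, `functionB`, `afterA`, `afterB`, `process`) is hypothetical, and the follow-up step for Function B is a placeholder because the excerpt is cut off before naming it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-ins: none of these names come from the question.
class ParallelMergeSketch {

    static final Function<String, String> functionA = in -> in + "->A";
    static final Function<String, String> functionB = in -> in + "->B";
    // Follow-up steps that run once A (respectively B) has finished.
    static final Function<String, String> afterA = in -> in + "->C";
    static final Function<String, String> afterB = in -> in + "->afterB";

    static String process(String input) {
        // Each branch starts on the common pool, so A and B can run in parallel.
        CompletableFuture<String> branchA =
                CompletableFuture.supplyAsync(() -> functionA.apply(input)).thenApply(afterA);
        CompletableFuture<String> branchB =
                CompletableFuture.supplyAsync(() -> functionB.apply(input)).thenApply(afterB);
        // thenCombine runs the merge only after both branches have completed.
        return branchA.thenCombine(branchB, (a, b) -> a + " | " + b).join();
    }

    public static void main(String[] args) {
        System.out.println(process("msg"));
    }
}
```

The merge function always receives the result of the A branch first and the B branch second, whichever branch happens to finish earlier.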
An “Error creating bean with name ‘functionBindingRegistrar’” exception every time I start my application
I have a normal Spring Cloud Stream application that simply reads data from a Kafka topic and produces messages to another Kafka topic. Please find the configurations below: And the following application.properties And below is the signature of all defined Spring Cloud functions Everything works fine, and the a…
How to use org.springframework.messaging.converter.ProtobufMessageConverter
I was trying to use the Spring Framework built-in ProtobufMessageConverter to convert my Kafka messages, but I couldn’t find a way to get this converter to be used. The configuration of my binding is as follows: When I use my custom message converter, I can see it registered in SimpleFunctionRe…
How to add a field from body to a condition
I use the @StreamListener annotation to listen to the topic. How can I add a field from the body to the condition? An example of an entity that is sent to the topic: Answer As you can see, from version 3.0 you should avoid filtering based on the message payload. Notice these lines from the documentation: Th…
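One common alternative, assumed here and not taken from the truncated answer, is to check the payload field inside the handler itself instead of in a listener condition. The sketch below is plain Java so that it runs on its own. The `Event` class, its `type` and `body` fields, and the `onlyType` factory are hypothetical, because the entity from the question is not shown in the excerpt. In a Spring Cloud Stream 3.x application, a `Consumer` like this would typically be exposed as a bean.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PayloadFilterSketch {

    // Hypothetical payload type; the real entity is not visible in the excerpt.
    static final class Event {
        final String type;
        final String body;

        Event(String type, String body) {
            this.type = type;
            this.body = body;
        }
    }

    // Returns a handler that ignores every event whose type differs from wantedType
    // and collects the matching ones into the given sink.
    static Consumer<Event> onlyType(String wantedType, List<Event> sink) {
        return event -> {
            if (!wantedType.equals(event.type)) {
                return; // not meant for this handler, so skip it
            }
            sink.add(event);
        };
    }

    public static void main(String[] args) {
        List<Event> handled = new ArrayList<>();
        Consumer<Event> handler = onlyType("order", handled);
        handler.accept(new Event("order", "first"));
        handler.accept(new Event("invoice", "second"));
        System.out.println(handled.size()); // only the "order" event was kept
    }
}
```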
Starting and Stopping an InboundChannelAdapter manually
I’m trying to start an InboundChannelAdapter manually using a @Scheduled function. I think I’m setting up the message payload wrong but I’m not sure what to have it as. Here’s the code: I’m getting this error: The error occurs at the controlBusChannel.send() line and the transfor…
How to define a bean from a dependency class as @Primary in Spring?
I have a Kafka Consumer and I’m implementing it using the Spring Cloud Stream Source.class binding and InboundChannelAdapter. This Source.class defines 3 MessageChannel beans: output, nullChannel, and errorChannel. My code looks like this: I want to autowire in the output channel so that I can use it to…
Failed to create topics: org.apache.kafka.common.errors.UnsupportedVersionException
Please suggest what changes are required, as I am getting this error. Answer I see you are new here. You should always include version information and the full stack trace for questions like this. Upgrade your broker to >= 2.4 or set the binder replication factor property. See https://github.com/spring-cloud/sp…
What is the use of the property spring.cloud.stream.bindings.<bindingName>.consumer.partitioned?
What will happen if partitionCount (spring.cloud.stream.bindings.<bindingName>.producer.partitionCount) is greater than 1 and consumer.partitioned (spring.cloud.stream.bindings.<bindingName>.consumer.partitioned) is false (using Kafka)? Answer In the case of the Kafka binder, the property spring.cloud.stream.bindings.<bindingName>.consumer.partitioned i…
Spring Cloud Stream Kafka with Microservices and Docker-Compose Error
I wanted to see if I could connect Spring Cloud Stream Kafka with the help of docker-compose in Docker containers, but I’m stuck and haven’t found a solution yet, so please help me. I’m working from Spring Microservices In Action and haven’t found any help so far. Docker-compose with Kafka and…