I set 2 different message keys for 2 partitions . This is my application.yml for producer application To test parallelism, I created a consumer application that reads messages from pf-topic. This is configuration from consumer application. . I created a function in consumer application to consume messages Now it is time for testing. To test parallelism, I run 2 instances
Tag: spring-cloud-stream
Spring cloud stream merge responses from two different functions
I’m trying to use spring cloud stream to solve the following problem: I have a class that calls two separated functions (Function A and B), both of those functions must work in parallel if the Function A finishes it must call the Function C, the same happens if Function B finish but this will call Function D, then I need
An “Error creating bean with name ‘functionBindingRegistrar'” exception every time I start my application
I have a normal spring cloud stream application that simple reads data from Kafka topic and produces messages to another Kafka topic, please find below the configurations: And the following application.proeprties And below is the signature of all defined spring cloud functions Everything works fine, and the application starts and functions as it should, however, in the logs I encounter
How to use org.springframework.messaging.converter.ProtobufMessageConverter
I was trying to use Spring framework built-in ProtobufMessageConverter to convert my kafka message but I couldn’t seem to find a way to have this converter being used. The configuration of my binding is as the following: When I use my custom message converter, I can see it registered in SimpleFunctionRegistry.messageConverter which is a list of customer converter and default
How to add a field from body to a condition
I use the @StreamListener annotation to listen to the topic. How can I add a field from body to the condition? An example of an entity that is sent to the topic: Answer As you can see from version 3.0, you should avoid using filtering based on the message payload. Notice these lines from the documentation: The preceding code is
Starting and Stopping an InboundChannelAdapter manually
I’m trying to start an InboundChannelAdapter manually using a @Scheduled function. I think I’m setting up the message payload wrong but I’m not sure what to have it as. Here’s the code: I’m getting this error: The error occurs at the controlBusChannel.send() line and the transformer in the error is the first one that runs after the source. How do
How to define a bean from a dependency class as @Primary in Spring?
I have a Kafka Consumer and I’m implementing it using the Spring Cloud Stream Source.class binding and InboundChannelAdapter. This Source.class defines 3 MessageChannel beans: output, nullChannel, and errorChannel. My code looks like this: I want to autowire in the output channel so that I can use it to start and stop my InboundChannelAdapter manually, but I’m getting this error when
Failed to create topics”,”exception”:”norg.apache.kafka.common.errors.UnsupportedVersionException
Please suggest what changes are required as i am getting this error. Answer I see you are new here. You should always include version information and full stack trace for questions like this. Upgrade your broker to >= 2.4 or set the binder replication factor property. See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/4161f875ede0446ab1d485730c51e6a2c5baa37a Change default replication factor to -1 Binder now uses a default value
what is the use of the property spring.cloud.stream.bindings..consumer.partitioned
What will happen If partitionCount (spring.cloud.stream.bindings..producer.partitionCount) is greater than 1 and consumer.partitioned (spring.cloud.stream.bindings..consumer.partitioned) is false (Using Kafka) Answer In the case of Kafa binder, the property spring.cloud.stream.bindings..consumer.partitioned is not relevant. You can skip setting this property on the consumer side. This property’s default value is false. Since Kafka has built-in partitioning support, the binder will simply delegate to Kafka broker
Spring Cloud Stream Kafka with Microservices and Docker-Compose Error
I wanted to see if I can connect Spring Cloud Stream Kafka with the help of docker-compose in Docker containers, but I’m stuck and I didn’t find a solution yet, please help me. I’m working from Spring Microservices In Action; I didn’t find any help by now. Docker-compose with Kafka and Zookeeper: Docker-Compose with my Spring services: App.properties for my