
Tag: spring-cloud-stream

Spring Cloud Stream send/consume messages to different partitions with KafkaHeaders.MESSAGE_KEY

I set two different message keys for two partitions. This is my application.yml for the producer application. To test parallelism, I created a consumer application that reads messages from pf-topic. This is the configuration from the consumer application. I created a function in the consumer application to consume messages. Now it is time for testing. To test parallelism, I run 2 instances
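The scenario above boils down to attaching a Kafka message key to each outgoing record so that records with the same key land on the same partition. A minimal sketch in the functional style, assuming a hypothetical supplier bean bound to pf-topic (the bean name, payload, and key values are illustrative, not taken from the question):

    import java.util.function.Supplier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;

    @Configuration
    public class ProducerConfig {

        // Hypothetical producer: the KafkaHeaders.MESSAGE_KEY header becomes the
        // Kafka record key, so every message with the same key hashes to the same
        // partition of the destination topic (pf-topic in the question).
        @Bean
        public Supplier<Message<String>> pfSupplier() {
            return () -> MessageBuilder.withPayload("payload")
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "key-1".getBytes())
                    .build();
        }
    }

On the consumer side, running two instances in the same consumer group (spring.cloud.stream.bindings.<binding>.group) lets the broker assign a distinct partition to each instance, which is what makes the parallelism test meaningful.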

An "Error creating bean with name 'functionBindingRegistrar'" exception every time I start my application

I have a normal Spring Cloud Stream application that simply reads data from a Kafka topic and produces messages to another Kafka topic. Please find below the configurations, the application.properties, and the signatures of all defined Spring Cloud functions. Everything works fine, and the application starts and functions as it should; however, in the logs I encounter
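For context, the function signatures such a read-transform-write application typically declares look like the sketch below; the bean name, topics, and transformation are placeholders, not the asker's actual code:

    import java.util.function.Function;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class StreamFunctions {

        // Hypothetical pass-through function; Spring Cloud Stream binds it to
        // Kafka via properties such as
        //   spring.cloud.function.definition=process
        //   spring.cloud.stream.bindings.process-in-0.destination=input-topic
        //   spring.cloud.stream.bindings.process-out-0.destination=output-topic
        @Bean
        public Function<String, String> process() {
            return payload -> payload.trim();
        }
    }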

How to use org.springframework.messaging.converter.ProtobufMessageConverter

I was trying to use the Spring Framework built-in ProtobufMessageConverter to convert my Kafka messages, but I couldn't seem to find a way to have this converter used. The configuration of my binding is as follows: When I use my custom message converter, I can see it registered in SimpleFunctionRegistry.messageConverter, which is a list of custom converters and the default
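One way to get a converter picked up, assuming Spring Cloud Stream 3.x, is to expose it as a bean; it is then added to the converter chain and selected when the binding's contentType matches the converter's supported MIME type. A hedged sketch (the content-type value mentioned in the comment is an assumption):

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.converter.MessageConverter;
    import org.springframework.messaging.converter.ProtobufMessageConverter;

    @Configuration
    public class ConverterConfig {

        // Declaring the converter as a bean registers it with the binder; pair it
        // with a matching binding content type, e.g.
        //   spring.cloud.stream.bindings.<binding>.content-type=application/x-protobuf
        @Bean
        public MessageConverter protobufMessageConverter() {
            return new ProtobufMessageConverter();
        }
    }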

How to add a field from body to a condition

I use the @StreamListener annotation to listen to the topic. How can I add a field from the body to the condition? An example of an entity that is sent to the topic: Answer As of version 3.0, you should avoid filtering based on the message payload. Notice these lines from the documentation: The preceding code is
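Because the condition SpEL expression is evaluated before the payload is converted, it can only reference headers reliably; a common workaround is to have the producer copy the relevant body field into a header and filter on that. A minimal sketch, assuming a hypothetical header named eventType:

    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;

    @EnableBinding(Sink.class)
    public class ConditionalListener {

        // The condition only sees headers, so the routing field is promoted
        // to a header ("eventType" here) by the producer instead of being
        // read from the payload.
        @StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='ORDER_CREATED'")
        public void handleOrderCreated(String payload) {
            System.out.println("order created: " + payload);
        }
    }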

How to define a bean from a dependency class as @Primary in Spring?

I have a Kafka consumer and I'm implementing it using the Spring Cloud Stream Source.class binding and InboundChannelAdapter. This Source.class defines 3 MessageChannel beans: output, nullChannel, and errorChannel. My code looks like this: I want to autowire the output channel so that I can use it to start and stop my InboundChannelAdapter manually, but I'm getting this error when
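Rather than trying to mark a bean from a dependency as @Primary, the usual way out of this ambiguity is to qualify the injection point by bean name; Source.OUTPUT is the constant for the "output" channel. A sketch under those assumptions (the holder class is hypothetical):

    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.stereotype.Component;

    @Component
    public class OutputChannelHolder {

        private final MessageChannel output;

        // @Qualifier disambiguates between the output, nullChannel and
        // errorChannel MessageChannel beans contributed by Source.class.
        public OutputChannelHolder(@Qualifier(Source.OUTPUT) MessageChannel output) {
            this.output = output;
        }

        public MessageChannel getOutput() {
            return output;
        }
    }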

"Failed to create topics","exception":"org.apache.kafka.common.errors.UnsupportedVersionException"

Please suggest what changes are required, as I am getting this error. Answer I see you are new here. You should always include version information and a full stack trace with questions like this. Upgrade your broker to >= 2.4 or set the binder replication factor property. See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/4161f875ede0446ab1d485730c51e6a2c5baa37a, "Change default replication factor to -1". The binder now uses a default value
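If upgrading the broker is not an option, explicitly setting the binder-level replication factor avoids the -1 default that pre-2.4 brokers cannot handle; for example, spring.cloud.stream.kafka.binder.replicationFactor=1 in application.properties, with a value appropriate for your cluster.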

What is the use of the property spring.cloud.stream.bindings..consumer.partitioned

What will happen if partitionCount (spring.cloud.stream.bindings..producer.partitionCount) is greater than 1 and consumer.partitioned (spring.cloud.stream.bindings..consumer.partitioned) is false (using Kafka)? Answer In the case of the Kafka binder, the property spring.cloud.stream.bindings..consumer.partitioned is not relevant, and you can skip setting it on the consumer side. Its default value is false. Since Kafka has built-in partitioning support, the binder will simply delegate to the Kafka broker
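In practice, for the Kafka binder the consumer.partitioned flag can be left at its default; partition-aware production is driven by producer-side settings such as spring.cloud.stream.bindings.<binding>.producer.partitionCount and partitionKeyExpression (or an explicit Kafka message key), while consumer instances simply receive whatever partitions the broker assigns to their consumer group.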
