Spring Integration: How to consume a QueueChannel on demand as a backpressure-aware reactive (Flux) pipeline

I have a Spring Integration QueueChannel that I want to consume in a backpressure-aware Flux pipeline:

  1. Prefetch n messages from the queue.
  2. Make async calls to an external system, e.g. fun remoteCall(message: Message): Mono<Void>.
  3. Pull the next message(s) from the queue only after the external calls complete.

I don’t want to use a Poller with a scheduler, because it pulls messages from the queue ahead of time.

What is the best way to do this with the latest Spring Integration and the Java/Kotlin DSL, including error recovery?

Answer

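See IntegrationReactiveUtils.messageChannelToFlux(). For a pollable channel such as QueueChannel, it returns a Flux that calls receive() on the channel only when the downstream subscriber requests more messages, so no Poller is needed.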
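
The original snippet for this step did not survive on this page, so the following is only a minimal sketch of how messageChannelToFlux() might be combined with a bounded flatMap to get the behaviour asked for in the question. The class name, the remoteCall stub, and the concurrency value are hypothetical; it assumes spring-integration-core and reactor-core are on the classpath.

```java
import java.time.Duration;
import java.util.List;

import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class QueueChannelFluxSketch {

    // Hypothetical stand-in for the question's remoteCall(message): Mono<Void>.
    static Mono<Void> remoteCall(Message<?> message) {
        return Mono.delay(Duration.ofMillis(10)).then();
    }

    // Emits the payload of every message whose remote call succeeded.
    static Flux<Object> consume(QueueChannel channel, int concurrency) {
        Flux<Message<Object>> messages =
                IntegrationReactiveUtils.messageChannelToFlux(channel);
        // flatMap requests `concurrency` messages up front and asks for one
        // more each time an inner remote call finishes.
        return messages.flatMap(
                message -> remoteCall(message)
                        .thenReturn(message.getPayload())
                        // Per-message recovery (an assumption): drop the failed
                        // message instead of terminating the whole pipeline.
                        .onErrorResume(error -> Mono.empty()),
                concurrency);
    }

    public static void main(String[] args) {
        QueueChannel channel = new QueueChannel();
        for (int i = 0; i < 10; i++) {
            channel.send(new GenericMessage<>("m" + i));
        }
        // Handle three messages with at most two remote calls in flight.
        List<Object> handled = consume(channel, 2)
                .take(3)
                .collectList()
                .block(Duration.ofSeconds(5));
        System.out.println("handled: " + handled);
        System.out.println("still queued: " + channel.getQueueSize());
    }
}
```

The concurrency argument of flatMap plays the role of "prefetch n" here: messages that have not been requested yet stay in the queue. Replacing onErrorResume with a retry operator on the inner Mono would be another way to approach error recovery.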

Then you can start an integration flow from that Flux with IntegrationFlows.from(Publisher).
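
The original snippet for this step is also missing from this page. As a rough sketch only, a flow started from the Flux could be declared like this with the Java DSL in Spring Integration 5.x; the configuration class, the bean names, and the printing handler are hypothetical placeholders. As far as I know, in 6.x the same factory method is available as IntegrationFlow.from(Publisher).

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.messaging.Message;

import reactor.core.publisher.Flux;

@Configuration
@EnableIntegration
public class ReactiveFlowConfig {

    // Hypothetical queue that other parts of the application send to.
    @Bean
    public QueueChannel inputQueue() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow queueDrivenFlow(QueueChannel inputQueue) {
        // Adapt the pollable channel into a Flux; no Poller is configured.
        Flux<Message<Object>> messages =
                IntegrationReactiveUtils.messageChannelToFlux(inputQueue);
        return IntegrationFlows.from(messages)
                // Placeholder endpoint; a real flow would call the external system here.
                .handle(message -> System.out.println("Received: " + message.getPayload()))
                .get();
    }
}
```

How much demand the flow itself requests from the Flux depends on the framework internals, so when the number of in-flight remote calls has to be bounded exactly, subscribing to the Flux directly with a bounded flatMap may be the more predictable option.
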
User contributions licensed under: CC BY-SA