Spring Integration: How to consume a QueueChannel on demand as a backpressure-aware reactive (Flux) pipeline



I have a Spring Integration QueueChannel that I want to consume in a backpressure-aware Flux pipeline:

  1. Prefetch n messages from the queue.
  2. Make async calls to an external system, e.g. fun remoteCall(message: Message<*>): Mono<Void>.
  3. Pull the next message(s) from the queue only after the external calls complete.
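
The three steps above amount to draining the queue with a bounded number of calls in flight. As a rough, JDK-only illustration of that behavior (not the Spring Integration or Reactor solution itself), the sketch below uses a Semaphore as a stand-in for reactive demand. `BoundedDrainer`, `drain`, and the `remoteCall` function parameter are hypothetical names, and `CompletableFuture<Void>` stands in for `Mono<Void>`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical helper for illustration; not part of Spring Integration or Reactor.
final class BoundedDrainer {

    /**
     * Drains the queue while keeping at most maxInFlight remote calls pending.
     * Returns the number of messages pulled from the queue.
     */
    static <T> int drain(BlockingQueue<T> queue, int maxInFlight,
                         Function<T, CompletableFuture<Void>> remoteCall) {
        Semaphore slots = new Semaphore(maxInFlight);
        int pulled = 0;
        while (true) {
            // Wait for a free slot BEFORE touching the queue, so nothing is pulled ahead of time.
            slots.acquireUninterruptibly();
            T message = queue.poll(); // non-blocking; null means the queue is currently empty
            if (message == null) {
                slots.release();
                break;
            }
            pulled++;
            try {
                // Free the slot when the call finishes, whether it succeeded or failed.
                remoteCall.apply(message).whenComplete((ok, error) -> slots.release());
            } catch (RuntimeException e) {
                slots.release(); // the call failed before it returned a future
            }
        }
        // Wait until every in-flight call has finished before returning.
        slots.acquireUninterruptibly(maxInFlight);
        slots.release(maxInFlight);
        return pulled;
    }
}
```

With maxInFlight = n, the first n messages are pulled immediately (the "prefetch"), and each further pull happens only after an earlier call completes. A failed call here simply frees its slot; real error recovery (retry, dead-lettering) would go in the completion callback.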

I don’t want to use a Poller with a scheduler that pulls messages from the queue ahead of time.

What is the best way to do this with the latest Spring Integration and the Java/Kotlin DSL, including error recovery?

Answer

See IntegrationReactiveUtils.messageChannelToFlux():

/**
 * Adapt a provided {@link MessageChannel} into a {@link Flux} source:
 * - a {@link org.springframework.integration.channel.FluxMessageChannel}
 * is returned as is because it is already a {@link Publisher};
 * - a {@link SubscribableChannel} is subscribed with a {@link MessageHandler}
 * for the {@link Sinks.Many#tryEmitNext(Object)} which is returned from this method;
 * - a {@link PollableChannel} is wrapped into a {@link MessageSource} lambda and reuses
 * {@link #messageSourceToFlux(MessageSource)}.
 * @param messageChannel the {@link MessageChannel} to adapt.
 * @param <T> the expected payload type.
 * @return a {@link Flux} which uses a provided {@link MessageChannel} as a source for events to publish.
 */
@SuppressWarnings("unchecked")
public static <T> Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel) {

Then you can pass the resulting Flux to IntegrationFlows.from(Publisher):

/**
 * Populate a {@link FluxMessageChannel} to the {@link IntegrationFlowBuilder} chain
 * and subscribe it to the provided {@link Publisher}.
 * @param publisher the {@link Publisher} to subscribe to.
 * @return new {@link IntegrationFlowBuilder}.
 */
public static IntegrationFlowBuilder from(Publisher<? extends Message<?>> publisher) {
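
Putting this together for the question's use case, a minimal sketch might look like the following. It assumes Spring Integration 5.3 or later and Reactor on the classpath. The class name, the `consume` method, the String payload type, and the body of `remoteCall` are placeholders, not part of any library. The concurrency argument of `Flux.flatMap` caps the number of in-flight calls, and because the Flux built for a PollableChannel should only receive from the channel on downstream demand, the queue is not drained ahead of time.

```java
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.messaging.Message;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class QueueChannelReactiveConsumer {

    // Placeholder for the remoteCall(message) from the question.
    static Mono<Void> remoteCall(Message<String> message) {
        return Mono.fromRunnable(() -> System.out.println("sent " + message.getPayload()));
    }

    // Subscribes to the queue; at most maxInFlight remote calls are pending at any time.
    static Disposable consume(QueueChannel queue, int maxInFlight) {
        Flux<Message<String>> messages = IntegrationReactiveUtils.messageChannelToFlux(queue);
        return messages
                .flatMap(message -> remoteCall(message)
                                // Assumed recovery policy: drop the failed message and keep consuming.
                                .onErrorResume(error -> Mono.empty()),
                        maxInFlight)
                .subscribe();
    }
}
```

Swallowing errors in `onErrorResume` is only one possible policy; `retryWhen` on the inner Mono or forwarding the failed message to an error channel are alternatives. Dispose the returned Disposable to stop consuming.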


Source: stackoverflow