I have Spring Integration QueueChannel that I want to consume in backpressure aware Flux pipeline.
- Prefetch
n
Messages from the queue. - Make async calls to external system like
fun remoteCall(message: Message): Mono<Void>
. - Pull next message(s) from the queue after external calls will complete.
I don’t want to use Poller with scheduler to pull messages from the queue ahead of time.
What is the best way to do it in latest Spring Integration + Java/Kotlin DSL, with error recovery etc?
Advertisement
Answer
See IntegrationReactiveUtils.messageChannelToFlux()
:
/** * Adapt a provided {@link MessageChannel} into a {@link Flux} source: * - a {@link org.springframework.integration.channel.FluxMessageChannel} * is returned as is because it is already a {@link Publisher}; * - a {@link SubscribableChannel} is subscribed with a {@link MessageHandler} * for the {@link Sinks.Many#tryEmitNext(Object)} which is returned from this method; * - a {@link PollableChannel} is wrapped into a {@link MessageSource} lambda and reuses * {@link #messageSourceToFlux(MessageSource)}. * @param messageChannel the {@link MessageChannel} to adapt. * @param <T> the expected payload type. * @return a {@link Flux} which uses a provided {@link MessageChannel} as a source for events to publish. */ @SuppressWarnings("unchecked") public static <T> Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel) {
Then you can use an IntegrationFlows.from(Publisher)
:
/** * Populate a {@link FluxMessageChannel} to the {@link IntegrationFlowBuilder} chain * and subscribe it to the provided {@link Publisher}. * @param publisher the {@link Publisher} to subscribe to. * @return new {@link IntegrationFlowBuilder}. */ public static IntegrationFlowBuilder from(Publisher<? extends Message<?>> publisher) {