I have an issue with WebFlux that I can't find a valid solution for: I have an item, T, which I need to process using n WebFlux actions. Each action is a function that takes T and returns Mono<T>.
I could execute the actions on the item using flatMapSequential, but I want the sequence to terminate as soon as a prior action has failed.
For example, say T = User and we want to support deletion of users. Deletion requires the following actions, in this order: “delete from DB” > “delete pictures” > “publish delete message on Kafka”.
I must follow these exact steps, and must not publish a message to Kafka if the DB deletion has failed. However, at the moment each action is executed as a standalone one, so my “publish to Kafka” action is executed even when my “delete from DB” action fails and throws.
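To make the requirement concrete, here is a minimal sketch of the behaviour I am after, assuming Reactor 3.x on the classpath. The class name, the three step functions and the "user-42" item are hypothetical stand-ins, not my real actions. It relies on concatMap subscribing to each inner Mono only after the previous one has completed, and on an error from an inner Mono terminating the whole sequence.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ShortCircuitSketch {

    // Runs the hypothetical steps in order and returns the names of the steps that actually ran.
    static List<String> run() {
        List<String> executed = new ArrayList<>();

        // Each step is lazy: its work happens at subscription time, inside Mono.defer.
        Function<String, Mono<String>> deleteFromDb = user -> Mono.defer(() -> {
            executed.add("deleteFromDb");
            return Mono.<String>error(new IllegalStateException("DB deletion failed"));
        });
        Function<String, Mono<String>> deletePictures = user -> Mono.defer(() -> {
            executed.add("deletePictures");
            return Mono.just(user);
        });
        Function<String, Mono<String>> publishToKafka = user -> Mono.defer(() -> {
            executed.add("publishToKafka");
            return Mono.just(user);
        });

        List<Function<String, Mono<String>>> steps =
                List.of(deleteFromDb, deletePictures, publishToKafka);

        Flux.fromIterable(steps)
                // One step at a time; the first error cancels the remaining steps.
                .concatMap(step -> step.apply("user-42"))
                .then()
                // Swallow the error here only so the sketch can return normally.
                .onErrorResume(e -> Mono.empty())
                .block();

        return executed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [deleteFromDb]
    }
}
```

Note that the short-circuit only helps if each action is lazy. If an action does its work when the method is called rather than when the returned Mono is subscribed to, the work happens regardless of what the pipeline does afterwards.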
I’d love to understand what I am missing…
My execute method:
    public Mono<T> execute(final T item) {
        if (actions.isEmpty()) {
            LOG.warn("No actions to execute on item {}", item);
            return Mono.just(item);
        }

        return Flux.fromIterable(actions)
                .as(this::doBeforeItemApply)
                .flatMapSequential(this::applyAction, 1)
                .onErrorStop()
                .contextWrite(ctx -> ctx.put(getItemClass(), item))
                .last()
                .then(Mono.just(item));
    }

    protected Mono<A> applyAction(final A action) {
        return Mono.deferContextual(ctx -> applyAction(ctx, action, ctx.get(getItemClass()))
                .as(this::doOnApplyError)
                .as(this::doAfterItemApply)
                .contextWrite(innerCtx -> innerCtx.put(getActionClass(), action))
                .then(Mono.just(action)));
    }
The actions are injected by Spring Boot.
Answer
It seems I got to something that works, so I'm posting it here in case it helps anybody in the future:
    import java.time.Duration;
    import java.util.List;

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.util.context.ContextView;
    import reactor.util.retry.Retry;

    // @NotNull and ActionsPipelineExecutor are not shown in this post;
    // import them from wherever your project defines them.

    public interface PipelineAction<T> {
        /**
         * Invokes the action.
         * @param item the object to invoke the action on.
         * @param ctx the {@link ContextView} available to the action.
         * @return a Mono that completes when the item has been handled.
         */
        @NotNull
        Mono<T> process(@NotNull T item, @NotNull ContextView ctx);
    }

    // Lombok names the logger field "log" by default; "LOG" assumes lombok.log.fieldName = LOG.
    @Slf4j
    public abstract class AbstractActionsPipelineExecutor<T, A extends PipelineAction<T>>
            implements ActionsPipelineExecutor<T> {

        private static final int DEFAULT_RETRY_MAX_RETRIES = 5;
        private static final Duration DEFAULT_RETRY_MIN_BACKOFF_DURATION = Duration.ofSeconds(10);

        protected final List<A> actions;

        /**
         * Instantiates a simple pipeline of actions.
         * @param actions the actions to run.
         */
        public AbstractActionsPipelineExecutor(@NotNull final List<A> actions) {
            this.actions = actions;
        }

        /** {@inheritDoc} */
        @NotNull
        @Override
        public Mono<T> execute(@NotNull final T item) {
            if (actions.isEmpty()) {
                LOG.warn("No actions to execute on item {}", item);
                return Mono.just(item);
            }

            return Flux.deferContextual(contextView ->
                            Flux.fromIterable(actions)
                                    // concatMap subscribes to one action at a time, in order;
                                    // an error from any action terminates the sequence.
                                    .concatMap(action -> {
                                        onBeforeApply(item, action);
                                        var result = action.process(item, contextView)
                                                .doOnError(throwable -> onApplyError(item, action, throwable))
                                                .doOnSuccess(ignored -> onAfterApply(item, action));
                                        if (getMaxBackoffRetryAttempts() > 0) {
                                            result = result.retryWhen(Retry.backoff(
                                                    getMaxBackoffRetryAttempts(),
                                                    DEFAULT_RETRY_MIN_BACKOFF_DURATION));
                                        }
                                        return result;
                                    })
                                    .onErrorStop()
                    )
                    .contextWrite(ctx -> ctx.put(getItemClass(), item))
                    .last()
                    .then(Mono.just(item));
        }

        /**
         * Event handler used before an action is applied to the item.
         * @param item the item to apply the action to.
         * @param action the action to apply.
         */
        protected void onBeforeApply(final T item, final A action) { }

        /**
         * Event handler used after an action has been applied to the item.
         * @param item the item the action was applied to.
         * @param action the action that was applied.
         */
        protected void onAfterApply(final T item, final A action) { }

        /**
         * Event handler used when an error occurs while applying an action to the item.
         * @param item the item the action was applied to.
         * @param action the action that was applied.
         * @param e the error that occurred.
         */
        protected void onApplyError(final T item, final A action, final Throwable e) { }

        /**
         * Returns the maximum number of times to retry an action before giving up.
         * @return the maximum number of times to retry an action before giving up.
         */
        protected int getMaxBackoffRetryAttempts() {
            return DEFAULT_RETRY_MAX_RETRIES;
        }

        /**
         * Returns the backoff duration for retries (by default, the minimum backoff).
         * @return the backoff duration for retries.
         */
        @NotNull
        protected Duration getMaxBackoffDuration() {
            return DEFAULT_RETRY_MIN_BACKOFF_DURATION;
        }

        /**
         * Gets the {@link Class} of the items that are processed by this pipeline.
         * @return the {@link Class} of the items that are processed by this pipeline.
         */
        protected abstract Class<T> getItemClass();

        /**
         * Gets the {@link Class} of the actions that are applied by this pipeline.
         * @return the {@link Class} of the actions that are applied by this pipeline.
         */
        protected abstract Class<A> getActionClass();

        /**
         * Performs an action when an error occurs while applying an action to the item.
         * @param mono the mono to apply the action to.
         * @return the mono after the action has been applied.
         */
        @NotNull
        private Mono<T> doOnApplyError(final Mono<T> mono) {
            return Mono.deferContextual(ctx -> mono.doOnError(e -> {
                var item = ctx.get(getItemClass());
                var action = ctx.get(getActionClass());
                onApplyError(item, action, e);
            }));
        }
    }
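One thing worth knowing about the retry part: retryWhen re-subscribes to everything upstream of it, so a doOnError hook placed before retryWhen (like onApplyError above) fires once per failed attempt, not once per action. Below is a minimal sketch of that, assuming Reactor 3.x; the class name, the flaky action and the 10 ms backoff are hypothetical and chosen only to keep the example fast.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class RetrySketch {

    // How many times the hypothetical action was subscribed to.
    static final AtomicInteger attempts = new AtomicInteger();
    // How many times the error hook fired.
    static final AtomicInteger errorHooks = new AtomicInteger();

    // Hypothetical flaky action: fails on the first two subscriptions, then succeeds.
    static Mono<String> flakyAction(String item) {
        return Mono.defer(() -> attempts.incrementAndGet() < 3
                ? Mono.<String>error(new IllegalStateException("transient failure"))
                : Mono.just(item));
    }

    static String run() {
        attempts.set(0);
        errorHooks.set(0);
        return flakyAction("user-42")
                // Upstream of retryWhen, so it runs again on every re-subscription.
                .doOnError(e -> errorHooks.incrementAndGet())
                // Up to 3 retries with exponential backoff starting at 10 ms.
                .retryWhen(Retry.backoff(3, Duration.ofMillis(10)))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run() + " after " + attempts.get() + " attempts, "
                + errorHooks.get() + " error hooks");
    }
}
```

In this sketch the action is attempted three times and the error hook fires twice. By contrast, onBeforeApply in the pipeline above is called inside the concatMap lambda, so it runs once per action regardless of how many retries follow. If the retries run out, Reactor signals an error for which Exceptions.isRetryExhausted returns true, with the last failure as its cause; that error then terminates the concatMap sequence like any other.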