Skip to content
Advertisement

Webflux execute a sequence of actions in sequential order and get their result

I have this issue with Webflux which I don’t seem to have a valid solution for: I have an item, T, which I need to process using n Webflux actions. Each action is a function that takes T and returns Mono<T>.

I could execute the actions on the item using flatMapSequential, but the issue is that I want to terminate the sequential actions if prior actions has been failed. For example, say T = User and we want to support deletion of users. So deletion will require the following actions: “deleting from db” > “delete pictures” > “publish delete message on Kafka”.

I must follow these exact steps, and not publish a message to Kafka if the DB deletion has been failed. However, ATM each action that’s being executed is a standalone one, so my “publish to Kafka” action is being executed even when my “remove from db” action fails and throws.

I’d love to understand what am I missing…

My execute method:

    public Mono<T> execute(final T item) {
        if (actions.isEmpty()) {
            LOG.warn("No actions to execute on item {}", item);
            return Mono.just(item);
        }

        return Flux.fromIterable(actions)
                .as(this::doBeforeItemApply)
                .flatMapSequential(this::applyAction, 1)
                .onErrorStop()
                .contextWrite(ctx -> ctx.put(getItemClass(), item))
                .last()
                .then(Mono.just(item));
    }


    protected Mono<A> applyAction(final A action) {
        return Mono.deferContextual(ctx -> applyAction(ctx, action, ctx.get(getItemClass()))
                .as(this::doOnApplyError)
                .as(this::doAfterItemApply)
                .contextWrite(innerCtx -> innerCtx.put(getActionClass(), action))
                .then(Mono.just(action)));
    }

The actions are being injected using Spring Boot.

Advertisement

Answer

So seems like I got to something that works, posting it here in case it’ll help anybody in the future:

public interface PipelineAction<T> {
    /**
     * Invokes the action.
     * @param item the object to invoke the action on.
     * @param ctx the {@link ContextView} available to the action.
     * @return a Mono that completes when the log archive has been handled.
     */
    @NotNull
    Mono<T> process(@NotNull T item, @NotNull ContextView ctx);
}


@Slf4j
public abstract class AbstractActionsPipelineExecutor<T, A extends PipelineAction<T>>
        implements ActionsPipelineExecutor<T> {
    private static final int DEFAULT_RETRY_MAX_RETRIES = 5;
    private static final Duration DEFAULT_RETRY_MIN_BACKOFF_DURATION = Duration.ofSeconds(10);

    protected final List<A> actions;

    /**
     * Instantiates a simple pipeline of actions.
     * @param actions the actions to run.
     */
    public AbstractActionsPipelineExecutor(@NotNull final List<A> actions) {
        this.actions = actions;
    }

    /** {@inheritDoc} */
    @NotNull
    @Override
    public Mono<T> execute(@NotNull final T item) {
        if (actions.isEmpty()) {
            LOG.warn("No actions to execute on item {}", item);
            return Mono.just(item);
        }

        return Flux.deferContextual(contextView ->
                        Flux.fromIterable(actions)
                                .concatMap(action -> {
                                    onBeforeApply(item, action);
                                    var result = action.process(item, contextView)
                                            .doOnError(throwable -> onApplyError(item, action, throwable))
                                            .doOnSuccess(ignored -> onAfterApply(item, action));
                                    if (getMaxBackoffRetryAttempts() > 0) {
                                        result = result.retryWhen(Retry.backoff(
                                                getMaxBackoffRetryAttempts(), DEFAULT_RETRY_MIN_BACKOFF_DURATION));
                                    }
                                    return result;
                                })
                                .onErrorStop()
                )
                .contextWrite(ctx -> ctx.put(getItemClass(), item))
                .last()
                .then(Mono.just(item));
    }

    /**
     * Event handler used before an action is applied to the item.
     * @param item the item to apply the action to.
     * @param action the action to apply.
     */
    protected void onBeforeApply(final T item, final A action) { }

    /**
     * Event handler used after an action has been applied to the item.
     * @param item the item that was applied the action to.
     * @param action the action that was applied.
     */
    protected void onAfterApply(final T item, final A action) { }

    /**
     * Event handler used when an error occurs while applying an action to the item.
     * @param item the item that was applied the action to.
     * @param action the action that was applied.
     * @param e the error that occurred.
     */
    protected void onApplyError(final T item, final A action, final Throwable e) { }

    /**
     * Returns the maximum number of times to retry an action before giving up.
     * @return the maximum number of times to retry an action before giving up.
     */
    protected int getMaxBackoffRetryAttempts() {
        return DEFAULT_RETRY_MAX_RETRIES;
    }

    /**
     * Returns the class of the item that this pipeline executor is for.
     * @return the class of the item that this pipeline executor is for.
     */
    @NotNull
    protected Duration getMaxBackoffDuration() {
        return DEFAULT_RETRY_MIN_BACKOFF_DURATION;
    }

    /**
     * Gets the {@link Class} of the items that are processed by this pipeline.
     * @return the {@link Class} of the items that are processed by this pipeline.
     */
    protected abstract Class<T> getItemClass();

    /**
     * Gets the {@link Class} of the actions that are applied by this pipeline.
     * @return the {@link Class} of the actions that are applied by this pipeline.
     */
    protected abstract Class<A> getActionClass();

    /**
     * Performs an action when an error occurs while applying an action to the item.
     * @param mono the mono to apply the action to.
     * @return the mono after the action has been applied.
     */
    @NotNull
    private Mono<T> doOnApplyError(final Mono<T> mono) {
        return Mono.deferContextual(ctx -> mono.doOnError(e -> {
            var item = ctx.get(getItemClass());
            var action = ctx.get(getActionClass());
            onApplyError(item, action, e);
        }));
    }
}
User contributions licensed under: CC BY-SA
8 People found this is helpful
Advertisement