I have this issue with Webflux which I don’t seem to have a valid solution for: I have an item, T
, which I need to process using n
Webflux actions. Each action is a function that takes T
and returns Mono<T>
.
I could execute the actions on the item using flatMapSequential
, but the issue is that I want to terminate the sequential actions if prior actions has been failed.
For example, say T = User
and we want to support deletion of users. So deletion will require the following actions: “deleting from db” > “delete pictures” > “publish delete message on Kafka”.
I must follow these exact steps, and not publish a message to Kafka if the DB deletion has been failed. However, ATM each action that’s being executed is a standalone one, so my “publish to Kafka” action is being executed even when my “remove from db” action fails and throws.
I’d love to understand what am I missing…
My execute method:
public Mono<T> execute(final T item) {
if (actions.isEmpty()) {
LOG.warn("No actions to execute on item {}", item);
return Mono.just(item);
}
return Flux.fromIterable(actions)
.as(this::doBeforeItemApply)
.flatMapSequential(this::applyAction, 1)
.onErrorStop()
.contextWrite(ctx -> ctx.put(getItemClass(), item))
.last()
.then(Mono.just(item));
}
protected Mono<A> applyAction(final A action) {
return Mono.deferContextual(ctx -> applyAction(ctx, action, ctx.get(getItemClass()))
.as(this::doOnApplyError)
.as(this::doAfterItemApply)
.contextWrite(innerCtx -> innerCtx.put(getActionClass(), action))
.then(Mono.just(action)));
}
The actions are being injected using Spring Boot.
Advertisement
Answer
So seems like I got to something that works, posting it here in case it’ll help anybody in the future:
public interface PipelineAction<T> {
/**
* Invokes the action.
* @param item the object to invoke the action on.
* @param ctx the {@link ContextView} available to the action.
* @return a Mono that completes when the log archive has been handled.
*/
@NotNull
Mono<T> process(@NotNull T item, @NotNull ContextView ctx);
}
@Slf4j
public abstract class AbstractActionsPipelineExecutor<T, A extends PipelineAction<T>>
implements ActionsPipelineExecutor<T> {
private static final int DEFAULT_RETRY_MAX_RETRIES = 5;
private static final Duration DEFAULT_RETRY_MIN_BACKOFF_DURATION = Duration.ofSeconds(10);
protected final List<A> actions;
/**
* Instantiates a simple pipeline of actions.
* @param actions the actions to run.
*/
public AbstractActionsPipelineExecutor(@NotNull final List<A> actions) {
this.actions = actions;
}
/** {@inheritDoc} */
@NotNull
@Override
public Mono<T> execute(@NotNull final T item) {
if (actions.isEmpty()) {
LOG.warn("No actions to execute on item {}", item);
return Mono.just(item);
}
return Flux.deferContextual(contextView ->
Flux.fromIterable(actions)
.concatMap(action -> {
onBeforeApply(item, action);
var result = action.process(item, contextView)
.doOnError(throwable -> onApplyError(item, action, throwable))
.doOnSuccess(ignored -> onAfterApply(item, action));
if (getMaxBackoffRetryAttempts() > 0) {
result = result.retryWhen(Retry.backoff(
getMaxBackoffRetryAttempts(), DEFAULT_RETRY_MIN_BACKOFF_DURATION));
}
return result;
})
.onErrorStop()
)
.contextWrite(ctx -> ctx.put(getItemClass(), item))
.last()
.then(Mono.just(item));
}
/**
* Event handler used before an action is applied to the item.
* @param item the item to apply the action to.
* @param action the action to apply.
*/
protected void onBeforeApply(final T item, final A action) { }
/**
* Event handler used after an action has been applied to the item.
* @param item the item that was applied the action to.
* @param action the action that was applied.
*/
protected void onAfterApply(final T item, final A action) { }
/**
* Event handler used when an error occurs while applying an action to the item.
* @param item the item that was applied the action to.
* @param action the action that was applied.
* @param e the error that occurred.
*/
protected void onApplyError(final T item, final A action, final Throwable e) { }
/**
* Returns the maximum number of times to retry an action before giving up.
* @return the maximum number of times to retry an action before giving up.
*/
protected int getMaxBackoffRetryAttempts() {
return DEFAULT_RETRY_MAX_RETRIES;
}
/**
* Returns the class of the item that this pipeline executor is for.
* @return the class of the item that this pipeline executor is for.
*/
@NotNull
protected Duration getMaxBackoffDuration() {
return DEFAULT_RETRY_MIN_BACKOFF_DURATION;
}
/**
* Gets the {@link Class} of the items that are processed by this pipeline.
* @return the {@link Class} of the items that are processed by this pipeline.
*/
protected abstract Class<T> getItemClass();
/**
* Gets the {@link Class} of the actions that are applied by this pipeline.
* @return the {@link Class} of the actions that are applied by this pipeline.
*/
protected abstract Class<A> getActionClass();
/**
* Performs an action when an error occurs while applying an action to the item.
* @param mono the mono to apply the action to.
* @return the mono after the action has been applied.
*/
@NotNull
private Mono<T> doOnApplyError(final Mono<T> mono) {
return Mono.deferContextual(ctx -> mono.doOnError(e -> {
var item = ctx.get(getItemClass());
var action = ctx.get(getActionClass());
onApplyError(item, action, e);
}));
}
}