Spring Webflux: how to compare results from two Mono streams and save based on filter

The situation is as follows: I have two MongoDB documents: User and Reproduction.

Based on a check if the User has a Reproduction entry (document), I want to add another entity in MongoDB.

Currently I am using Spring Webflux and Spring Reactive Mongo. Please see code below.

    @Autowired
    ParserRepository parserRepository;

    @Autowired
    ReproductionRepository reproductionRepository;

    @Autowired
    UserRepository userRepository;

    public void addParser(Parser parser, String keycloakUserId) {

        Mono<User> userMono = userRepository.findByKeycloakUserId(keycloakUserId);
        Mono<Reproduction> reproductionMono = reproductionRepository.findById(parser.getReproductionId());


        userMono.zipWith(reproductionMono)
                .filter(objects -> objects.getT2().getUserId().equals(objects.getT1().get_id()))
                .then(parserRepository.save(parser))
                .switchIfEmpty(Mono.error(new ParserDoesNotBelongToUserException("Unable to add, since this parser does not belong to you")));
        
    }

My question: how can I use the results of both Monos to verify that the Reproduction belongs to the User and, only if it does, save the Parser document? In other words, how do I combine the results of two Mono streams to decide whether to save another document, all in a non-blocking way?

The method above doesn't seem to work. What is the best way to handle this scenario with two separate Monos? Any best-practice tips are welcome.

Answer

Taken from the Mono#filter docs:

filter(Predicate<? super T> tester) If this Mono is valued, test the result and replay it if predicate returns true.

So if the predicate evaluates to true, the value passes through; if it evaluates to false, the value is dropped and the Mono completes empty.

The problem is that you call then right after the filter. From the Mono#then docs:

then(Mono other) Let this Mono complete then play another Mono.

The key word here is complete: whatever the previous step emitted is ignored, as long as it completes. A Mono that filter has emptied still completes, so it doesn't matter whether the predicate returned true or false; then runs parserRepository.save(parser) either way. And because save emits the saved document, the switchIfEmpty after it never fires.
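To make that behaviour concrete, here is a minimal sketch that mirrors the shape of the chain in the question. It assumes reactor-core is on the classpath; the `save` helper and the `AtomicBoolean` flag are hypothetical stand-ins for `parserRepository.save(parser)`, and `block()` is used only so the demo can print a result.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Mono;

public class ThenIgnoresFilter {

    // Hypothetical stand-in for parserRepository.save(parser): records that it ran.
    static Mono<String> save(AtomicBoolean saved) {
        return Mono.fromSupplier(() -> {
            saved.set(true);
            return "saved";
        });
    }

    // Same shape as the question's chain: filter, then(...), switchIfEmpty(...).
    static String withThen(boolean ownerMatches, AtomicBoolean saved) {
        return Mono.just(ownerMatches)
                .filter(matches -> matches)   // becomes empty when the check fails
                .then(save(saved))            // runs on completion, empty or not
                .switchIfEmpty(Mono.error(new IllegalStateException("not yours")))
                .block();                     // blocking only for the demo
    }

    public static void main(String[] args) {
        AtomicBoolean saved = new AtomicBoolean(false);
        String result = withThen(false, saved);
        System.out.println(result + ", saved=" + saved.get());
    }
}
```

Even though the predicate fails, this prints `saved, saved=true`: the save runs and no error is raised.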

I'm guessing you want something like:

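    // Return this Mono from addParser (declared as Mono<Parser> instead of void)
    // so the caller subscribes to it; nothing runs until something subscribes.
    return userMono.zipWith(reproductionMono).flatMap(objects -> {
        if (objects.getT2().getUserId().equals(objects.getT1().get_id())) {
            return parserRepository.save(parser);
        } else {
            return Mono.error(new ParserDoesNotBelongToUserException("Unable to add, since this parser does not belong to you"));
        }
    });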
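For a version that can be run outside a Spring context, the following sketch replaces the repositories with in-memory stand-ins. It assumes reactor-core is on the classpath. The `User`, `Reproduction`, and `Parser` classes, the `save` helper, and the `savedParsers` list are all simplified, hypothetical substitutes for the real documents and `parserRepository`, and the two Monos are passed in as parameters rather than looked up.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

public class AddParserSketch {

    // Minimal hypothetical stand-ins for the question's documents.
    static final class User {
        final String id;
        User(String id) { this.id = id; }
    }

    static final class Reproduction {
        final String userId;
        Reproduction(String userId) { this.userId = userId; }
    }

    static final class Parser {
        final String reproductionId;
        Parser(String reproductionId) { this.reproductionId = reproductionId; }
    }

    static class ParserDoesNotBelongToUserException extends RuntimeException {
        ParserDoesNotBelongToUserException(String message) { super(message); }
    }

    // In-memory replacement for parserRepository.save(parser).
    static final List<Parser> savedParsers = new ArrayList<>();

    static Mono<Parser> save(Parser parser) {
        return Mono.fromSupplier(() -> {
            savedParsers.add(parser);
            return parser;
        });
    }

    // Returns the Mono instead of void so the caller can subscribe to it.
    static Mono<Parser> addParser(Parser parser, Mono<User> userMono, Mono<Reproduction> reproductionMono) {
        return userMono.zipWith(reproductionMono)
                .flatMap(pair -> {
                    User user = pair.getT1();
                    Reproduction reproduction = pair.getT2();
                    if (reproduction.userId.equals(user.id)) {
                        return save(parser);   // only reached when the owner matches
                    }
                    return Mono.<Parser>error(new ParserDoesNotBelongToUserException(
                            "Unable to add, since this parser does not belong to you"));
                });
    }

    public static void main(String[] args) {
        Parser parser = new Parser("r1");
        addParser(parser, Mono.just(new User("u1")), Mono.just(new Reproduction("u1"))).block();
        System.out.println("saved after matching owner: " + savedParsers.size());
        try {
            addParser(parser, Mono.just(new User("u2")), Mono.just(new Reproduction("u1"))).block();
        } catch (ParserDoesNotBelongToUserException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

One thing to keep in mind: if either source Mono is empty (no such user or no such reproduction), zipWith produces an empty Mono and the flatMap body never runs, so neither the save nor the error happens. Adding a switchIfEmpty after the flatMap is one way to turn that case into an error as well.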