How to convert Flux<List> into Mono<List>

I have a service that returns Flux<List<Integer>>, and I would like to convert it into Mono<List<Integer>> so that it can be used inside transform().

Here is what I did by using flatMap and Mono.just():

private Mono<List<Integer>> filterAndMap(Mono<List<Integer>> listMono) {
  return listMono.filter(integers -> integers.size() == 3)
      .map(integers -> integers.stream()
          .map(integer -> integer * 10)
          .collect(Collectors.toList()));
}

void method1() {
  Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
      .flatMap(integers -> this.filterAndMap(Mono.just(integers)))
      .doOnNext(System.out::println)
      .blockLast();
}

But what I would like is to use transform instead:

void method2() {
  Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
      .convertFluxToMono()?
      .transform(this::filterAndMap)
      .doOnNext(System.out::println)
      .block();
}

Is there any operator or technique to make method2 work? As a result, I expect onNext() to be called 2 times, each time with a List value.

I know the filterAndMap method could be simplified and used without any complication, but the actual method is huge (I modified it just for clarity) and there are many operators in the chain that I want to reuse without duplicating them.

Update:

A little bit of context on what I want to achieve. I have 2 services: one via HTTP, which returns a Mono<List<Integer>>, and another via Redis, which returns a Flux<List<Integer>>. In both cases I have exactly the same functionality, a chain of 10-15 operators, and I want to avoid duplicating that code.

For example:

  void f1() {
    Mono<List<Integer>> mono = getFromHttp();
    
    mono
        .map(integers -> integers.stream().collect(Collectors.groupingBy(Function.identity())))
        .filter(entry -> entry.size() > 5)
        //...
        //many other operators
        //...
        //.flatMap()
        //.switchIfEmpty()
        //.doOnNext()
        .retryWhen();
  }

  void f2() {
    Flux<List<Integer>> flux = getFromRedis();
    
    flux.
        //...
        //same functionality here
        //...
  }

Maybe it is better not to concentrate on converting Flux to Mono, but instead to convert Mono to Flux, which I guess is much easier?

Update2:

I have changed my existing filterAndMap to work on a Flux. Instead of converting Flux to Mono, I went with Mono to Flux.

Now I can call Flux.transform(this::filterAndMap) and Mono.transform(mono -> this.filterAndMap(mono.flux())) accordingly:

private Flux<List<Integer>> filterAndMap(Flux<List<Integer>> listFlux) {
  return listFlux.filter(integers -> integers.size() == 3)
      .map(integers -> integers.stream()
          .map(integer -> integer * 10)
          .collect(Collectors.toList()));
}
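
A minimal, self-contained sketch of the two call sites described above, assuming Reactor Core 3.x is on the classpath. The class name FluxShapedTransformerDemo and the methods fromRedis and fromHttp are hypothetical stand-ins for the real services, not part of the original code:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxShapedTransformerDemo {

  // Shared operator chain, written once against Flux.
  static Flux<List<Integer>> filterAndMap(Flux<List<Integer>> listFlux) {
    return listFlux.filter(integers -> integers.size() == 3)
        .map(integers -> integers.stream()
            .map(integer -> integer * 10)
            .collect(Collectors.toList()));
  }

  // Hypothetical stand-in for the Redis-backed source (a Flux).
  static List<List<Integer>> fromRedis() {
    return Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
        .transform(FluxShapedTransformerDemo::filterAndMap)
        .collectList()
        .block();
  }

  // Hypothetical stand-in for the HTTP-backed source (a Mono).
  // Mono.transform wraps the returned Flux back into a Mono.
  static List<Integer> fromHttp() {
    return Mono.just(Arrays.asList(1, 2, 3))
        .transform(mono -> filterAndMap(mono.flux()))
        .block();
  }

  public static void main(String[] args) {
    System.out.println(fromRedis()); // prints [[10, 20, 30], [40, 50, 60]]
    System.out.println(fromHttp());  // prints [10, 20, 30]
  }
}
```

The blocking calls are only there to make the sketch observable from main; in the real services the chain would stay reactive.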

Thanks @Michael Berry: even though I have changed the implementation, your solution fully covers my initial question, so I am accepting it.

And thanks to @Simon Baslé for a good call: I have redesigned my transformer and gone with the safer approach (Mono to Flux).

Answer

As per the comments, I am taking the question here to be “How do I apply the same transformation to both a Mono and a Flux without duplicating the code?”

You could use a simple utility function like so:

static <T> Function<Mono<T>, Mono<T>> toMonoTransformer(
    Function<Flux<T>, Flux<T>> transformer) {
  return listMono -> transformer.apply(listMono.flux()).next();
}

This function works with any Flux-to-Flux transformer, whatever its element type. The transformation function itself can then be used as-is for a Flux:

void method2() {
  Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
      .transform(this::filterAndMap)
      .doOnNext(System.out::println)
      .blockLast();
}

…and applying a quick utility method for the Mono:

void method1() {
  Mono.just(Arrays.asList(1, 2, 3))
      .transform(toMonoTransformer(this::filterAndMap))
      .doOnNext(System.out::println)
      .block();
}
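
For reference, the pieces above can be put together into one runnable sketch, assuming Reactor Core 3.x is on the classpath and the Flux-based filterAndMap from the question's second update. The class name MonoTransformerDemo and the methods matching and filteredOut are hypothetical, added only for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoTransformerDemo {

  // Adapter from the answer: run a Flux transformer on a Mono and keep the first result.
  static <T> Function<Mono<T>, Mono<T>> toMonoTransformer(
      Function<Flux<T>, Flux<T>> transformer) {
    return mono -> transformer.apply(mono.flux()).next();
  }

  // Shared operator chain, written once against Flux.
  static Flux<List<Integer>> filterAndMap(Flux<List<Integer>> listFlux) {
    return listFlux.filter(integers -> integers.size() == 3)
        .map(integers -> integers.stream()
            .map(integer -> integer * 10)
            .collect(Collectors.toList()));
  }

  // A list of size 3 passes the filter and is multiplied by 10.
  static List<Integer> matching() {
    Function<Mono<List<Integer>>, Mono<List<Integer>>> adapter =
        toMonoTransformer(MonoTransformerDemo::filterAndMap);
    return Mono.just(Arrays.asList(1, 2, 3)).transform(adapter).block();
  }

  // A list of size 4 is filtered out, so the Mono completes empty and block() returns null.
  static List<Integer> filteredOut() {
    Function<Mono<List<Integer>>, Mono<List<Integer>>> adapter =
        toMonoTransformer(MonoTransformerDemo::filterAndMap);
    return Mono.just(Arrays.asList(7, 8, 9, 10)).transform(adapter).block();
  }

  public static void main(String[] args) {
    System.out.println(matching());    // prints [10, 20, 30]
    System.out.println(filteredOut()); // prints null
  }
}
```

One design point to keep in mind: next() keeps only the first element the transformer emits and cancels the rest, so this adapter suits chains that produce at most one value per input, as filterAndMap does.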
User contributions licensed under: CC BY-SA