I have a service which returns Flux<List<Integer>>, and I would like to convert it into Mono<List<Integer>> to be used inside transform(). Here is what I did by using flatMap and Mono.just():
private Mono<List<Integer>> filterAndMap(Mono<List<Integer>> listMono) {
    return listMono.filter(integers -> integers.size() == 3)
            .map(integers -> integers.stream()
                    .map(integer -> integer * 10)
                    .collect(Collectors.toList()));
}

void method1() {
    Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
            .flatMap(integers -> this.filterAndMap(Mono.just(integers)))
            .doOnNext(System.out::println)
            .blockLast();
}
But what I would like is to use transform instead:
void method2() {
    Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
            .convertFluxToMono()?
            .transform(this::filterAndMap)
            .doOnNext(System.out::println)
            .block();
}
Is there any operator or technique to make method2 work?
As a result, I expect onNext() to be called 2 times, each time with a List value.
I know the filterAndMap method could be simplified and used without any of this complication, but the actual method is huge (this one is just modified for clarity), and there are many operators in the chain that I want to reuse without duplication.
Update:
A little more context on what I want to achieve.
I have 2 services: one via HTTP, which returns Mono<List<Integer>>, and another via Redis, which returns Flux<List<Integer>>.
In both cases I have exactly the same functionality, a chain of 10-15 operators, and what I want is to avoid duplicating the code.
For example:
void f1() {
    Mono<List<Integer>> mono = getFromHttp();
    mono
            .map(integers -> integers.stream().collect(Collectors.groupingBy(Function.identity())))
            .filter(entry -> entry.size() > 5)
            //...
            //many other operators
            //...
            //.flatMap()
            //.switchIfEmpty()
            //.doOnNext()
            .retryWhen();
}

void f2() {
    Flux<List<Integer>> flux = getFromRedis();
    flux.
            //...
            //same functionality here
            //...
}
Maybe it is better not to concentrate on converting Flux to Mono, but instead to convert Mono to Flux, which I guess is much easier?
Update2:
I have changed my existing filterAndMap to work on Flux. Instead of converting Flux to Mono, I went with Mono to Flux, so now I can call Flux.transform(this::filterAndMap) and Mono.transform(mono -> this.filterAndMap(mono.flux())) accordingly:
private Flux<List<Integer>> filterAndMap(Flux<List<Integer>> listMono) {
    return listMono.filter(integers -> integers.size() == 3)
            .map(integers -> integers.stream()
                    .map(integer -> integer * 10)
                    .collect(Collectors.toList()));
}
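For reference, here is a minimal self-contained sketch of that arrangement, assuming reactor-core is on the classpath. The class name SharedTransformerSketch and the helper methods fromFlux and fromMono are hypothetical names used only for illustration; the transformer body is the same as filterAndMap above, made static so it can be called from main.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SharedTransformerSketch {

    // The shared chain, written once against Flux (same body as filterAndMap above).
    static Flux<List<Integer>> filterAndMap(Flux<List<Integer>> lists) {
        return lists.filter(integers -> integers.size() == 3)
                .map(integers -> integers.stream()
                        .map(integer -> integer * 10)
                        .collect(Collectors.toList()));
    }

    // Flux source: the transformer is applied directly.
    static List<List<Integer>> fromFlux() {
        return Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
                .transform(SharedTransformerSketch::filterAndMap)
                .collectList()
                .block();
    }

    // Mono source: widened to a Flux inside transform(); Mono.transform
    // turns the Publisher returned by the lambda back into a Mono.
    static List<Integer> fromMono(List<Integer> input) {
        return Mono.just(input)
                .transform(mono -> filterAndMap(mono.flux()))
                .block(); // null when the list was filtered out
    }

    public static void main(String[] args) {
        System.out.println(fromFlux());                         // [[10, 20, 30], [40, 50, 60]]
        System.out.println(fromMono(Arrays.asList(1, 2, 3)));   // [10, 20, 30]
        System.out.println(fromMono(Arrays.asList(7, 8, 9, 10))); // null (empty Mono)
    }
}
```

The blocking calls are only there to make the results easy to inspect; in a real chain the transformed Flux or Mono would simply be returned or subscribed to.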
Thanks @Michael Berry: even though I have changed the implementation, your solution fully covers my initial question/issue, so I accept it.
And thanks to @Simon Baslé for a good call: I have redesigned my transformer and went with the safer approach (Mono to Flux).
Answer
As per the comments, I'm taking the question here to be “How do I apply the same transformation to both a Mono and a Flux without duplicating the code?”
You could use a simple utility function like so:
static <T> Function<Mono<T>, Mono<T>> toMonoTransformer(
        Function<Flux<T>, Flux<T>> transformer) {
    return listMono -> transformer.apply(listMono.flux()).next();
}
This function can then be used on any transformer of any type. You can then use the transformation function as-is for a Flux:
void method2() {
    Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
            .transform(this::filterAndMap)
            .doOnNext(System.out::println)
            .blockLast();
}
…and apply the quick utility method for the Mono:
void method1() {
    Mono.just(Arrays.asList(1, 2, 3))
            .transform(toMonoTransformer(this::filterAndMap))
            .doOnNext(System.out::println)
            .block();
}
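One thing worth keeping in mind with toMonoTransformer is that next() emits only the first element of the transformed Flux. If a transformer can emit more than one element per input, the remaining elements are dropped on the Mono path. The sketch below illustrates this, assuming reactor-core is on the classpath; the expand transformer and the class and helper names are hypothetical and not part of the question.

```java
import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class NextCaveatSketch {

    // Same adapter as toMonoTransformer above.
    static <T> Function<Mono<T>, Mono<T>> toMonoTransformer(
            Function<Flux<T>, Flux<T>> transformer) {
        return mono -> transformer.apply(mono.flux()).next();
    }

    // Hypothetical transformer that emits two elements for each input element.
    static Flux<Integer> expand(Flux<Integer> source) {
        return source.flatMap(i -> Flux.just(i, i + 1));
    }

    // Flux path: both elements produced by expand are kept.
    static List<Integer> viaFlux(int value) {
        return Flux.just(value)
                .transform(NextCaveatSketch::expand)
                .collectList()
                .block();
    }

    // Mono path: next() keeps the first element and cancels the rest.
    static Integer viaMono(int value) {
        return Mono.just(value)
                .transform(toMonoTransformer(NextCaveatSketch::expand))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(viaFlux(1)); // [1, 2]
        System.out.println(viaMono(1)); // 1
    }
}
```

For transformers that emit at most one element per input, such as the filter and map chain in filterAndMap, the two paths give the same result for a single-element source.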