I have a service which returns Flux<List<Integer>>, and I would like to convert it into Mono<List<Integer>> to be used inside transform().
Here is what I did, using flatMap and Mono.just():
private Mono<List<Integer>> filterAndMap(Mono<List<Integer>> listMono) {
    return listMono.filter(integers -> integers.size() == 3)
            .map(integers -> integers.stream()
                    .map(integer -> integer * 10)
                    .collect(Collectors.toList()));
}

void method1() {
    Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
            .flatMap(integers -> this.filterAndMap(Mono.just(integers)))
            .doOnNext(System.out::println)
            .blockLast();
}
But what I would like is to use transform instead:
void method2() {
    Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
            .convertFluxToMono()?
            .transform(this::filterAndMap)
            .doOnNext(System.out::println)
            .block();
}
Is there any operator or technique to make method2 work?
As a result, I expect onNext() to be called 2 times, each time with a List value.
I know the filterAndMap method could be simplified and used without any complication, but the actual method is huge (it was just modified here for clarity), and there are many operators in the chain that I want to reuse without duplicating them.
Update:
A little context on what I want to achieve.
I have 2 services: one via HTTP, which returns Mono<List<Integer>>, and another one via Redis, which returns Flux<List<Integer>>.
For both cases I have the exact same functionality, a chain of 10-15 operators, and what I want is to avoid duplicating that code.
For example:
void f1() {
    Mono<List<Integer>> mono = getFromHttp();
    mono
            .map(integers -> integers.stream().collect(Collectors.groupingBy(Function.identity())))
            .filter(entry -> entry.size() > 5)
            //...
            //many other operators
            //...
            //.flatMap()
            //.switchIfEmpty()
            //.doOnNext()
            .retryWhen();
}

void f2() {
    Flux<List<Integer>> flux = getFromRedis();
    flux.
            //...
            //same functionality here
            //...
}
Maybe it would be better not to focus on converting Flux to Mono, but instead to convert Mono to Flux, which I guess is much easier?
Update2:
I have changed my existing filterAndMap to work on Flux. Instead of converting Flux to Mono, I went with Mono to Flux, so now I can call Flux.transform(this::filterAndMap) and Mono.transform(mono -> this.filterAndMap(mono.flux())) accordingly:
private Flux<List<Integer>> filterAndMap(Flux<List<Integer>> listMono) {
    return listMono.filter(integers -> integers.size() == 3)
            .map(integers -> integers.stream()
                    .map(integer -> integer * 10)
                    .collect(Collectors.toList()));
}
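For anyone who wants to try the Mono-to-Flux approach end to end, here is a minimal runnable sketch. It assumes reactor-core is on the classpath; the class name SharedFluxPipeline, the helper names fromFlux and fromMono, and the sample data are made up for illustration and are not part of the real services. It relies on Mono.transform accepting a function that returns any Publisher, which is why the Flux returned by filterAndMap can be passed back directly.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SharedFluxPipeline {

    // The shared operator chain, written once against Flux.
    static Flux<List<Integer>> filterAndMap(Flux<List<Integer>> lists) {
        return lists.filter(integers -> integers.size() == 3)
                .map(integers -> integers.stream()
                        .map(integer -> integer * 10)
                        .collect(Collectors.toList()));
    }

    // Hypothetical stand-in for the Redis-backed Flux source.
    static Flux<List<Integer>> fromFlux() {
        return Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
                .transform(SharedFluxPipeline::filterAndMap);
    }

    // Hypothetical stand-in for the HTTP-backed Mono source:
    // widen the Mono to a Flux, then reuse the same chain.
    static Mono<List<Integer>> fromMono() {
        return Mono.just(Arrays.asList(1, 2, 3))
                .transform(mono -> filterAndMap(mono.flux()));
    }

    public static void main(String[] args) {
        fromFlux().doOnNext(System.out::println).blockLast();
        fromMono().doOnNext(System.out::println).block();
    }
}
```

With this sample data the Flux path should emit two lists (the four-element list is filtered out) and the Mono path one list.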
Thanks @Michael Berry. Even though I have changed the implementation, your solution fully covers my initial question, so I accept it.
And thanks to @Simon Baslé for a good call: I have redesigned my transformer and went with the safer approach (Mono to Flux).
Answer
As per the comments, I'm taking the question here to be: “How do I apply the same transformation to both a Mono and a Flux without duplicating the code?”
You could use a simple utility function like so:
static <T> Function<Mono<T>, Mono<T>> toMonoTransformer(
        Function<Flux<T>, Flux<T>> transformer) {
    return listMono -> transformer.apply(listMono.flux()).next();
}
This utility works for any element type T, as long as the transformer maps a Flux<T> to a Flux<T>. You can then use the transformation function as-is for a Flux:
void method2() {
    Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
            .transform(this::filterAndMap)
            .doOnNext(System.out::println)
            .blockLast();
}
…and apply the utility method for the Mono:
void method1() {
    Mono.just(Arrays.asList(1, 2, 3))
            .transform(toMonoTransformer(this::filterAndMap))
            .doOnNext(System.out::println)
            .block();
}
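One thing worth keeping in mind with the adapter: it ends with next(), so the resulting Mono only ever carries the first element the transformer emits, and it completes empty if the transformer filters everything out. The sketch below is just an illustration of that behaviour, assuming reactor-core is on the classpath; the class MonoAdapterDemo and the two toy transformers (expand and keepEven) are hypothetical and not part of the question's code.

```java
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoAdapterDemo {

    // Same adapter as above: widen the Mono to a Flux, apply, keep the first element.
    static <T> Function<Mono<T>, Mono<T>> toMonoTransformer(
            Function<Flux<T>, Flux<T>> transformer) {
        return mono -> transformer.apply(mono.flux()).next();
    }

    // Hypothetical transformer that emits two values per input value.
    static Flux<Integer> expand(Flux<Integer> source) {
        return source.concatMap(i -> Flux.just(i, i * 10));
    }

    // Hypothetical transformer that may drop the only value.
    static Flux<Integer> keepEven(Flux<Integer> source) {
        return source.filter(i -> i % 2 == 0);
    }

    // Only the first emitted value survives next(); the second one is dropped.
    static Integer firstOnly() {
        Function<Mono<Integer>, Mono<Integer>> adapted = toMonoTransformer(MonoAdapterDemo::expand);
        return Mono.just(7).transform(adapted).block();
    }

    // The value is filtered out, so the Mono completes empty and block() returns null.
    static Integer filteredOut() {
        Function<Mono<Integer>, Mono<Integer>> adapted = toMonoTransformer(MonoAdapterDemo::keepEven);
        return Mono.just(7).transform(adapted).block();
    }

    public static void main(String[] args) {
        System.out.println(firstOnly());
        System.out.println(filteredOut());
    }
}
```

If the shared chain can legitimately emit more than one element per input, silently keeping only the first may not be what you want, so it is worth deciding up front whether next() is the right way to narrow back to a Mono in your case.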