Let's say we have:
- a list of URLs that is the source for our Multi
- as a first step, we fetch the HTML of each page with an HTTP client call
- then we look for a specific tag and grab its content
- then we store what we found in a database
So we have three steps here. Is there a way to run these steps in parallel? I mean that, after some time, it should be fetching HTML for one item while simultaneously processing the HTML and extracting tag content for another, and also simultaneously saving to the database the data from an item that has already been processed. (Hopefully it's obvious what I mean.) That way we get parallel processing. By default, as far as I can see, Mutiny does it serially.
Here is an example:
@Test
public void test3() {
    Multi<String> source = Multi.createFrom().items("a", "b", "c");
    source
        .onItem().transform(i -> trans(i, "-step1"))
        .onItem().transform(i -> trans(i, "-step2"))
        .onItem().transform(i -> trans(i, "-step3"))
        .subscribe().with(item -> System.out.println("Subscriber received " + item));
}

// Simulates a blocking step: sleeps 0 to 3 seconds, then appends the step name
private String trans(String s, String add) {
    int t = new Random().nextInt(4) * 1000;
    try {
        System.out.println("Sleeping for '" + s + "' miliseconds: " + t);
        Thread.sleep(t);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return s + add;
}
This produces the following console output:
Sleeping for 'a' miliseconds: 2000
Sleeping for 'a-step1' miliseconds: 3000
Sleeping for 'a-step1-step2' miliseconds: 3000
Subscriber received a-step1-step2-step3
Sleeping for 'b' miliseconds: 0
Sleeping for 'b-step1' miliseconds: 0
Sleeping for 'b-step1-step2' miliseconds: 0
Subscriber received b-step1-step2-step3
Sleeping for 'c' miliseconds: 1000
Sleeping for 'c-step1' miliseconds: 3000
Sleeping for 'c-step1-step2' miliseconds: 3000
Subscriber received c-step1-step2-step3
One can see that it's not running in parallel. What did I miss here?
Answer
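As @jponge mentioned, you can build one Uni per item, collect them in a List<Uni<String>>, and then combine them with Uni.combine().all().unis(listOfUnis), subscribing to the combined Uni instead of to a Multi:

List<Uni<String>> listOfUnis = new ArrayList<>();
for (String i : List.of("a", "b", "c")) {
    // One lazy Uni per item; its three steps run in order once it is subscribed to
    listOfUnis.add(Uni.createFrom().item(i)
        .onItem().transform(s -> trans(s, "-step1"))
        .onItem().transform(s -> trans(s, "-step2"))
        .onItem().transform(s -> trans(s, "-step3")));
}
// Subscribe once, to the combined Uni; do not subscribe to a Multi here
Uni.combine().all().unis(listOfUnis)
    .combinedWith(results -> results)
    .subscribe().with(results -> System.out.println("Subscriber received " + results));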
One more note: if you are going to make HTTP calls, it is better to add
.emitOn(someBlockingPoolExecutor)
since you don't want to block Netty threads while waiting for HTTP calls to complete.
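To make the idea concrete without depending on Mutiny, here is a rough sketch of the same shape using only the JDK's CompletableFuture, which is a swapped-in technique and not the Mutiny API. Each item gets its own three-step chain, all chains run on a dedicated thread pool, and the results are gathered at the end. The class name ParallelSteps, the method processAll, and the pool size of 4 are assumptions made for illustration, and the sleep is shortened so the example finishes quickly.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

public class ParallelSteps {

    // Stand-in for one blocking step (HTTP call, tag extraction, database write).
    static String trans(String s, String add) {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(4) * 100L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return s + add;
    }

    // Runs the three steps in order for each item, while different items overlap.
    static List<String> processAll(List<String> items) {
        // Hypothetical pool size; plays the role of someBlockingPoolExecutor.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Start every per-item chain before waiting on any of them.
            List<CompletableFuture<String>> futures = items.stream()
                .map(i -> CompletableFuture
                    .supplyAsync(() -> trans(i, "-step1"), pool)
                    .thenApplyAsync(s -> trans(s, "-step2"), pool)
                    .thenApplyAsync(s -> trans(s, "-step3"), pool))
                .collect(Collectors.toList());
            // join() waits for each chain; results keep the order of the input list.
            return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        processAll(List.of("a", "b", "c"))
            .forEach(item -> System.out.println("Subscriber received " + item));
    }
}
```

The steps of a single item still run one after another, but "b" can be in step 1 while "a" is already in step 2, which is the overlap the question asks for. In Mutiny the executor would be supplied through emitOn, as noted above, rather than passed to each stage.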