
Can we get parallel processing of each step of a Multi pipeline?

Let's say we have:

  • a list of URLs that is the source for our Multi
  • as a first step, we fetch the HTML of each page with an HTTP client call
  • then we look for a specific tag and grab its content
  • then we store what we found in a database

That gives us three steps. Is there a way to run these steps in parallel? I mean that, after a while, the pipeline should be fetching the HTML for one item while simultaneously extracting the tag content from another and saving an already-processed item to the database (hopefully it's obvious what I mean). That way we get parallel processing. By default, as far as I can see, Mutiny runs the steps serially.

Here is an example:

  @Test
  public void test3() {
    Multi<String> source = Multi.createFrom().items("a", "b", "c");
    source
            .onItem().transform(i -> trans(i, "-step1"))
            .onItem().transform(i -> trans(i, "-step2"))
            .onItem().transform(i -> trans(i, "-step3"))
            .subscribe().with(item -> System.out.println("Subscriber received " + item));
  }


  private String trans(String s, String add) {
    int t = new Random().nextInt(4) * 1000;
    try {
      System.out.println("Sleeping for '" + s + "' miliseconds: " + t);
      Thread.sleep(t);
    } catch (InterruptedException e) {
      // restore the interrupt flag instead of swallowing the interruption
      Thread.currentThread().interrupt();
    }

    return s + add;
  }

This produces the following console output:

Sleeping for 'a' miliseconds: 2000
Sleeping for 'a-step1' miliseconds: 3000
Sleeping for 'a-step1-step2' miliseconds: 3000
Subscriber received a-step1-step2-step3
Sleeping for 'b' miliseconds: 0
Sleeping for 'b-step1' miliseconds: 0
Sleeping for 'b-step1-step2' miliseconds: 0
Subscriber received b-step1-step2-step3
Sleeping for 'c' miliseconds: 1000
Sleeping for 'c-step1' miliseconds: 3000
Sleeping for 'c-step1-step2' miliseconds: 3000
Subscriber received c-step1-step2-step3

One can see that it's not running in parallel. What did I miss here?


Answer

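As @jponge mentioned, you can collect your items in a List<Uni<String>> and then combine them into a single Uni that you subscribe to:

Uni.combine().all().unis(listOfUnis).combinedWith(results -> results).subscribe().with(results -> System.out.println("Subscriber received " + results));

Since trans() returns a String, each call has to be wrapped in a Uni before it can go into the list, for example:

List<Uni<String>> listOfUnis = new ArrayList<>();
    for (String i : List.of("a", "b", "c")) {
        // wrap the blocking three-step chain in a lazy Uni and run it on
        // Mutiny's default worker pool so that the items can overlap
        listOfUnis.add(Uni.createFrom()
                .item(() -> trans(trans(trans(i, "-step1"), "-step2"), "-step3"))
                .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()));
    }
// nothing runs until the combined Uni is subscribed to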

One more note: if you are going to make HTTP calls, add

.emitOn(someBlockingPoolExecutor)

ahead of the blocking stage, since you don't want to block Netty event-loop threads while waiting for the HTTP calls to complete.
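
To make the idea of overlapping items concrete without depending on any particular Mutiny version, here is a minimal sketch using only java.util.concurrent (CompletableFuture, not Mutiny). The class name StagedPipeline, the helper runAll and the fixed 100 ms sleep are assumptions made for illustration. The trans method is a simplified stand-in for the one in the question. Each item gets its own three-stage chain on a thread pool, so one item can be in step 3 while another is still in step 1.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class StagedPipeline {

    // Stand-in for one blocking stage (HTTP call, parsing, database write).
    static String trans(String s, String add) {
        try {
            Thread.sleep(100); // simulated blocking work (hypothetical duration)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return s + add;
    }

    // Starts every item's three-stage chain on the pool before waiting on any
    // of them, then joins the chains and returns the results in input order.
    static List<String> runAll(List<String> items, ExecutorService pool) {
        List<CompletableFuture<String>> chains = items.stream()
                .map(item -> CompletableFuture
                        .supplyAsync(() -> trans(item, "-step1"), pool)
                        .thenApplyAsync(s -> trans(s, "-step2"), pool)
                        .thenApplyAsync(s -> trans(s, "-step3"), pool))
                .collect(Collectors.toList());
        return chains.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            runAll(List.of("a", "b", "c"), pool)
                    .forEach(r -> System.out.println("Subscriber received " + r));
        } finally {
            pool.shutdown();
        }
    }
}
```

The steps of a single item still run in order, because step 2 needs the output of step 1. The parallelism comes from different items being in different steps at the same time. The pool size bounds how many blocking stages run at once.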

User contributions licensed under: CC BY-SA