I need a Subscription object so that I can unsubscribe listeners. To get one, I want to pass a FlowableSubscriber to the function.
Code:
    FlowableSubscriber fs = new FlowableSubscriber() {
        @Override
        public void onSubscribe(@NonNull Subscription s) {
            System.out.println("Flowable onSubs");
        }

        @Override
        public void onNext(Object o) {
            System.out.println("Flowable onNext");
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("Flowable onErr");
        }

        @Override
        public void onComplete() {
            System.out.println("Flowable onComlet");
        }
    };
Logs are:
Running... Flowable onSubs
If I use lambdas it works, but then there is no onSubscribe callback.
How can I get a Subscription, and why haven't these methods been called?
Answer
Since Flowable supports backpressure, you have to control how many items you can consume by calling the request method on your Subscription so that they can be emitted by the Flowable:
    FlowableSubscriber fs = new FlowableSubscriber() {
        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            System.out.println("Flowable onSubs");
            subscription = s;
            subscription.request(Integer.MAX_VALUE);
        }

        @Override
        public void onNext(Object o) {
            System.out.println("Flowable onNext");
            // subscription.request(1); you can also request more items from onNext method - it is up to you
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("Flowable onErr");
        }

        @Override
        public void onComplete() {
            System.out.println("Flowable onComlet");
        }
    };
In the example, Integer.MAX_VALUE items are requested in onSubscribe; however, that might not be the best idea, because it effectively switches backpressure off. You should call Subscription::request from onSubscribe to request the initial items, then call it from onNext and decide how many items you can actually process.
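To make the request/cancel protocol concrete, here is a minimal sketch that runs on a plain JDK 9+ with no RxJava dependency. It uses the java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces that FlowableSubscriber extends, so it is an illustration of the protocol rather than RxJava code. RangePublisher, DemoSubscriber and run are hypothetical names made up for this example, and the publisher is a simplified synchronous stand-in that skips parts of the Reactive Streams rules (for example, validation of non-positive requests).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureSketch {

    /** Hypothetical publisher: emits 1..count, but never more than was requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    demand += n;
                    if (emitting) return; // called from inside onNext; the loop below picks it up
                    emitting = true;
                    while (!done && demand > 0 && next <= count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; } // the "unsubscribe" of this protocol
            });
        }
    }

    /** Hypothetical subscriber: optionally requests items one by one, cancels after cancelAfter items. */
    static final class DemoSubscriber implements Flow.Subscriber<Integer> {
        final List<String> events = new ArrayList<>();
        private final boolean requests;
        private final int cancelAfter;
        private Flow.Subscription subscription;
        private int received;

        DemoSubscriber(boolean requests, int cancelAfter) {
            this.requests = requests;
            this.cancelAfter = cancelAfter;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s; // keep it: this is the handle used to request and to cancel
            events.add("onSubscribe");
            if (requests) s.request(1); // initial demand; without it nothing is emitted
        }

        @Override
        public void onNext(Integer item) {
            events.add("onNext " + item);
            if (++received == cancelAfter) {
                subscription.cancel();   // stop receiving items
            } else {
                subscription.request(1); // ask for exactly one more
            }
        }

        @Override
        public void onError(Throwable t) { events.add("onError"); }

        @Override
        public void onComplete() { events.add("onComplete"); }
    }

    /** Subscribes a DemoSubscriber to a RangePublisher and returns the recorded callbacks. */
    static List<String> run(int count, boolean requests, int cancelAfter) {
        DemoSubscriber subscriber = new DemoSubscriber(requests, cancelAfter);
        new RangePublisher(count).subscribe(subscriber);
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println(run(5, false, 3)); // never requests
        System.out.println(run(5, true, 3));  // requests one by one, cancels after 3 items
        System.out.println(run(2, true, 10)); // consumes everything, then completes
    }
}
```

The first run records only onSubscribe, which matches the log in the question: a subscriber that never calls request signals zero demand, so the source has nothing it is allowed to emit. The second run shows that holding on to the Subscription is what makes unsubscribing possible, since cancel is called on that same object.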