RxJava3: why is FlowableSubscriber's onNext not called?

I need to get hold of a Subscription object so that I can unsubscribe listeners later. To do that, I want to pass a FlowableSubscriber to subscribe.

Code:

FlowableSubscriber fs = new FlowableSubscriber() {
        @Override
        public void onSubscribe(@NonNull Subscription s) {
            System.out.println("Flowable onSubs");
        }

        @Override
        public void onNext(Object o) {
            System.out.println("Flowable onNext");
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("Flowable onErr");
        }

        @Override
        public void onComplete() {
            System.out.println("Flowable onComplete");
        }
    };

Logs are:

Running...
Flowable onSubs

If I use lambdas it works, but then there is no onSubscribe callback.

How can I get a subscription, and why haven't the other methods been called?

Answer

Since Flowable supports backpressure, it emits only as many items as the subscriber has requested. You have to signal that demand yourself by calling the request method on the Subscription you receive in onSubscribe:

import io.reactivex.rxjava3.core.FlowableSubscriber;
import org.reactivestreams.Subscription;

FlowableSubscriber<Object> fs = new FlowableSubscriber<Object>() {

        // Kept in a field so it can be used after onSubscribe returns
        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            System.out.println("Flowable onSubs");
            subscription = s;
            // Nothing is emitted until demand is signalled
            subscription.request(Integer.MAX_VALUE);
        }

        @Override
        public void onNext(Object o) {
            System.out.println("Flowable onNext");
            // subscription.request(1); you can also request more items from onNext, it is up to you
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("Flowable onErr");
        }

        @Override
        public void onComplete() {
            System.out.println("Flowable onComplete");
        }
};

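The example requests Integer.MAX_VALUE items in onSubscribe, which might not be the best idea because it effectively switches backpressure off. (If you really want unbounded demand, Long.MAX_VALUE is the Reactive Streams convention for it.) Normally you call Subscription::request from onSubscribe to request an initial batch, and then call it again from onNext with as many items as you can actually process. The Subscription stored in the field is also what you call cancel() on when you want to unsubscribe.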
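To see the request protocol in isolation, here is a minimal sketch that deliberately does not use RxJava. It uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces that FlowableSubscriber builds on. RangePublisher, DemandDemo and run are hypothetical names made up for this illustration, and the toy publisher skips parts of the spec (for example, it silently ignores non-positive requests). It only shows that no request means no onNext, that requesting one item at a time delivers everything, and that cancel() stops the stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class DemandDemo {

    // Hypothetical toy publisher: emits 1..count, but never more than was requested.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done || n <= 0) return; // simplified: the spec asks for onError when n <= 0
                    demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
                    if (emitting) return;       // re-entrant call from onNext; the loop below picks it up
                    emitting = true;
                    while (!done && demand > 0 && next <= count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    // Subscribes to three items, requesting initialRequest up front and one more after each item.
    // Cancels once the item equal to cancelAfter arrives. Returns the callbacks in call order.
    static List<String> run(long initialRequest, int cancelAfter) {
        List<String> events = new ArrayList<>();
        new RangePublisher(3).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                events.add("onSubscribe");
                if (initialRequest > 0) s.request(initialRequest);
            }

            @Override
            public void onNext(Integer item) {
                events.add("onNext " + item);
                if (item == cancelAfter) subscription.cancel();
                else subscription.request(1);
            }

            @Override
            public void onError(Throwable t) { events.add("onError"); }

            @Override
            public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(0, -1)); // [onSubscribe]
        System.out.println(run(1, -1)); // [onSubscribe, onNext 1, onNext 2, onNext 3, onComplete]
        System.out.println(run(1, 2));  // [onSubscribe, onNext 1, onNext 2]
    }
}
```

The first call reproduces the situation from the question: onSubscribe fires, but without a request nothing else does. With RxJava the same idea applies to the Subscription passed to FlowableSubscriber.onSubscribe.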
