I’m trying to write something with Reactor that I already know how to write with CompletableFuture. My Reactor version gives me a “Calling subscribe in non-blocking scope” warning.
My goal is to call turnOn() with a timeout, which should call turnOff() once the timeout expires. If turnOn() is called again, it should cancel the old timeout and start waiting for a new one.
How should I do this? I could use a hybrid approach with a CompletableFuture for the timeout, but Reactor’s API is just a bit easier.
This test works as expected:
    public class TimeoutTest {
        Service service;

        @BeforeEach
        public void setUp() {
            service = mock(Service.class);
        }

        CompletableFuture<Void> turnOffFuture = null;

        @DisplayName("Should timeout on turnOn with timeout")
        @Test
        public void timeoutCompletableFuture() throws InterruptedException {
            turnOn(Duration.ofMillis(100)).join();
            verify(service).turnOn();
            verify(service, never()).turnOff();
            Thread.sleep(1000);
            verify(service).turnOff();
        }

        private interface Service {
            void turnOn();
            void turnOff();
        }

        public void cancelTimeout() {
            if (turnOffFuture != null)
                turnOffFuture.cancel(false);
            turnOffFuture = null;
        }

        public CompletableFuture<Void> turnOn(Duration timeout) {
            CompletableFuture<Void> turnOnFuture = turnOn();
            cancelTimeout();
            turnOffFuture = turnOnFuture.thenRun(() -> delay(timeout))
                    .thenRun(this::turnOff);
            return turnOnFuture;
        }

        private void delay(Duration duration) {
            try {
                Thread.sleep(BigDecimal.valueOf(duration.getSeconds())
                        .scaleByPowerOfTen(3)
                        .add(BigDecimal.valueOf(duration.getNano(), 6))
                        .intValue());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        private CompletableFuture<Void> turnOn() {
            return CompletableFuture.runAsync(() -> service.turnOn());
        }

        private CompletableFuture<Void> turnOff() {
            return CompletableFuture.runAsync(() -> service.turnOff());
        }
    }
But my Reactor code does not:
    public class TimeoutMonoTest {
        Service service;

        @BeforeEach
        public void setUp() {
            service = mock(Service.class);
        }

        Disposable turnOffDisposable = null;

        @DisplayName("Should timeout on turnOn with timeout")
        @Test
        public void timeoutMono() throws InterruptedException {
            turnOn(Duration.ofMillis(100)).block(Duration.ofMillis(10));
            verify(service).turnOn();
            verify(service, never()).turnOff();
            Thread.sleep(1000);
            verify(service).turnOff();
        }

        private interface Service {
            void turnOn();
            void turnOff();
        }

        public void cancelTimeout() {
            if (turnOffDisposable != null)
                turnOffDisposable.dispose();
            turnOffDisposable = null;
        }

        public Mono<Void> turnOn(Duration timeout) {
            Mono<Void> turnOnFuture = turnOn();
            cancelTimeout();
            turnOffDisposable = turnOnFuture.delayElement(timeout)
                    .subscribe(it -> this.turnOff());
            return turnOnFuture;
        }

        private Mono<Void> turnOn() {
            service.turnOn();
            return Mono.just("not empty but mapped to void").then();
        }

        private Mono<Void> turnOff() {
            service.turnOff();
            return Mono.just("not empty but mapped to void").then();
        }
    }
Answer
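The problem lies in the mapping to Mono<Void> in the turnOn() and turnOff() methods. A Mono<Void> never emits a “next” signal, only a “success” (completion) signal. As a result, delayElement has no element to delay, and the consumer passed to subscribe, which only runs on a “next” signal, is never invoked.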
The fix is simply to change the turnOn method to:
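    public Mono<Void> turnOn(Duration timeout) {
        cancelTimeout();
        Mono<Void> turnOnMono = turnOn();
        // delayElement does not delay an empty Mono, so wait with Mono.delay instead.
        // Mono.defer postpones the call to turnOff() until the delay has elapsed;
        // a plain then(turnOff()) would invoke service.turnOff() right here.
        turnOffDisposable = turnOnMono.then(Mono.delay(timeout))
                .then(Mono.defer(this::turnOff))
                .subscribe();
        // Return the Mono created above; calling turnOn() again would invoke service.turnOn() twice.
        return turnOnMono;
    }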
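
For comparison, the cancel-and-reschedule behaviour the question asks for can also be sketched with only the JDK, using a ScheduledExecutorService instead of Reactor. This is a minimal sketch, not the asker's or Reactor's implementation: the names ResettableTimeout, onAction, offAction and pendingOff are hypothetical and introduced purely for illustration.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: runs an "off" action after a timeout unless turnOn is called again first. */
class ResettableTimeout implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Runnable onAction;
    private final Runnable offAction;
    private ScheduledFuture<?> pendingOff; // guarded by "this"

    ResettableTimeout(Runnable onAction, Runnable offAction) {
        this.onAction = onAction;
        this.offAction = offAction;
    }

    /** Runs the on action, cancels any pending off action, and schedules a new one. */
    synchronized void turnOn(Duration timeout) {
        onAction.run();
        if (pendingOff != null) {
            // false: do not interrupt an off action that has already started running
            pendingOff.cancel(false);
        }
        pendingOff = scheduler.schedule(offAction, timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}

class ResettableTimeoutDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger on = new AtomicInteger();
        AtomicInteger off = new AtomicInteger();
        try (ResettableTimeout timeout =
                     new ResettableTimeout(on::incrementAndGet, off::incrementAndGet)) {
            timeout.turnOn(Duration.ofMillis(200));
            timeout.turnOn(Duration.ofMillis(200)); // cancels the first pending turn-off
            Thread.sleep(600);
            System.out.println(on.get() + " on, " + off.get() + " off");
        }
    }
}
```

The design mirrors cancelTimeout() in the question: the handle of the pending turn-off is kept in a field, cancelled on every new turnOn call, and replaced by a freshly scheduled one.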