I’m trying to write something using reactor which I know how to write using completable futures. I’m getting “Calling subscribe in non-blocking scope” warning in it.
My goal is to call turnOn()
with a timeout which should call turnOff()
after the timeout. If turnOn() is called again it should cancel the old timeout and wait for a new timeout.
How should I do this? I could do a hibrate and use CompletableFuture for the timeout but reactor’s api is just a bit easier.
this test works as expected:
JavaScript
x
public class TimeoutTest {
Service service;
@BeforeEach
public void setUp() {
service = mock(Service.class);
}
CompletableFuture<Void> turnOffFuture = null;
@DisplayName("Should timeout on turnOn with timeout")
@Test
public void timeoutCompletableFuture() throws InterruptedException {
turnOn(Duration.ofMillis(100)).join();
verify(service).turnOn();
verify(service,never()).turnOff();
Thread.sleep(1000);
verify(service).turnOff();
}
private interface Service{
void turnOn();
void turnOff();
}
public void cancelTimeout() {
if (turnOffFuture != null)
turnOffFuture.cancel(false);
turnOffFuture = null;
}
public CompletableFuture<Void> turnOn(Duration timeout) {
CompletableFuture<Void> turnOnFuture = turnOn();
cancelTimeout();
turnOffFuture = turnOnFuture.thenRun(() -> delay(timeout))
.thenRun(this::turnOff);
return turnOnFuture;
}
private void delay(Duration duration) {
try {
Thread.sleep(BigDecimal.valueOf(duration.getSeconds())
.scaleByPowerOfTen(3)
.add(BigDecimal.valueOf(duration.getNano(), 6))
.intValue());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private CompletableFuture<Void> turnOn() {
return CompletableFuture.runAsync(() -> service.turnOn());
}
private CompletableFuture<Void> turnOff() {
return CompletableFuture.runAsync(() -> service.turnOff());
}
}
but my reactor code does not.
JavaScript
public class TimeoutMonoTest {
Service service;
@BeforeEach
public void setUp() {
service = mock(Service.class);
}
Disposable turnOffDisposable = null;
@DisplayName("Should timeout on turnOn with timeout")
@Test
public void timeoutMono() throws InterruptedException {
turnOn(Duration.ofMillis(100)).block(Duration.ofMillis(10));
verify(service).turnOn();
verify(service, never()).turnOff();
Thread.sleep(1000);
verify(service).turnOff();
}
private interface Service {
void turnOn();
void turnOff();
}
public void cancelTimeout() {
if (turnOffDisposable != null)
turnOffDisposable.dispose();
turnOffDisposable = null;
}
public Mono<Void> turnOn(Duration timeout) {
Mono<Void> turnOnFuture = turnOn();
cancelTimeout();
turnOffDisposable = turnOnFuture.delayElement(timeout)
.subscribe(it -> this.turnOff());
return turnOnFuture;
}
private Mono<Void> turnOn() {
service.turnOn();
return Mono.just("not empty but mapped to void").then();
}
private Mono<Void> turnOff() {
service.turnOff();
return Mono.just("not empty but mapped to void").then();
}
}
Advertisement
Answer
The problem lies in the mapping to void mono’s in the turnOn()
and turnOff()
methods. They do not actually get a “next” signal, just a “success” signal.
The fix is simply to change the turnOn method to:
JavaScript
public Mono<Void> turnOn(Duration timeout) {
cancelTimeout();
Mono<Void> turnOnMono = turnOn();
turnOffDisposable = turnOnMono.delayElement(timeout)
.then(turnOff())
.subscribe();
return turnOn();
}