Skip to content
Advertisement

How to consume infinite flux multiple times

This is what I’m trying to achieve:

When somebody requests http://localhost/runIt, I would like to return data from cache that would be refreshed every 6 seconds. Below, I have a flux (always same one that is stored in map) that is first time instantiated and starts emitting numbers 0,1,2,3,4… to infinity.

Is it possible to make this Spring MVC Controller method return "1,2" on first request, then on request after 7 seconds to return "3,4" etc. ?

Also, if lastRunIt is not updated for 60 seconds, I would need to terminate the flux.

This code below is something that I had in mind, but it is currently not working at all.

Map<Integer, Flux<String>> itos = new ConcurrentHashMap<>();
  Instant lastRunIt;

  @GetMapping("runIt")
  public Flux<String> runIt(){
    lastRunIt = Instant.now();
    return itos.computeIfAbsent(1, k ->
        Flux.interval(Duration.ZERO, Duration.ofSeconds(3))
          .doOnNext(x -> {
            //dispose if no request for 60 seconds
            if(lastRunIt.plusSeconds(60).isBefore(Instant.now())){
              //someDispisable.dispose(); //<--- HOW TO GET Disposable here?
            }
            System.out.println(x);
          })
          .cache(Duration.ofSeconds(6))
    );

  }

Advertisement

Answer

OK, I managed to do something that seems working. Would like to know if it can be used this way, or there some possibility here for resource over-use, or maybe more compact way to represent this?

  Map<Integer, Flux<String>> itos = new ConcurrentHashMap<>();
  Instant lastRunIt;

  Subscription subskrip;

  @GetMapping("runIt")
  public Flux<String> runIt(){
    lastRunIt = Instant.now();
    return itos.computeIfAbsent(1, k -> {
      return Flux.interval(Duration.ZERO, Duration.ofSeconds(3))
          .doOnSubscribe(sb -> {
            subskrip = sb; //save subscription first time this is called
          })
          .doOnNext(x -> {
            //dispose if no request for 10 seconds
            if(lastRunIt.plusSeconds(10).isBefore(Instant.now())){
                System.out.println("DISPOSINGGG");
                subskrip.cancel(); //cancel this flux
                itos.remove(1); //remove from map
            }
            System.out.println(x);
          })
          .cache(Duration.ofSeconds(9))
            .take(3) //on every REST request take only 3 items that are already in cache
          .map(x -> ">" + x);
      });
  }
Advertisement