Skip to content
Advertisement

How to buffer and group elements in Reactor Flux in Java

Given an infinite flux of objects, where each object has an ID, how can I use flux to create a buffered list of updates grouped by ID property (keeping the last emitted value)? Thanks

Example

    Obj(ID=A, V=1)
    Obj(ID=A, V=2)
    Obj(ID=B, V=3) 
    --- buffer -> I want to subscribe with a list of [Obj(ID=A, V=2), Obj(ID=B, V=3)]
    Obj(ID=A, V=1)
    Obj(ID=B, V=4)
    Obj(ID=B, V=6)
    Obj(ID=A, V=2)
    --- buffer -> I want to subscribe with a list of [Obj(ID=B, V=6), Obj(ID=A, V=2)]
    Obj(ID=B, V=1)
    --- buffer -> I want to subscribe with a list of [Obj(ID=B, V=1)]

Something like the following would be perfect but it seems to wait the end of the flux in my tests instead of buffering.

flux
    .buffer(Duration.ofMillis(2000))
    .groupBy(Obj::getId)
    .flatMap(GroupedFlux::getLast)
    .collectToList()
    .subscribe(this::printList);

It works with buffer and custom logic for grouping

    public static void main(String[] args) {
        flux.buffer(Duration.ofMillis(2000)).subscribe(this::groupList);
    }

    private void groupList(List<T> ts) {
        Collection<T> values = ts.stream()
                .collect(Collectors.toMap(T::getId, Function.identity(), (k, v) -> v))
                .values();
        System.out.println(values);
    }

Advertisement

Answer

I was able to achieve it with the reactive grouping

flux.window(Duration.ofMillis(2000))
    .flatMap(window -> window.groupBy(Entry::getId)
        .flatMap(GroupedFlux::last)
        .collectList()
    )
    .subscribe(this::printList);
Advertisement