Given an infinite flux of objects, where each object has an ID, how can I use flux to create a buffered list of updates grouped by ID property (keeping the last emitted value)? Thanks
Example
Obj(ID=A, V=1) Obj(ID=A, V=2) Obj(ID=B, V=3) --- buffer -> I want to subscribe with a list of [Obj(ID=A, V=2), Obj(ID=B, V=3)] Obj(ID=A, V=1) Obj(ID=B, V=4) Obj(ID=B, V=6) Obj(ID=A, V=2) --- buffer -> I want to subscribe with a list of [Obj(ID=B, V=6), Obj(ID=A, V=2)] Obj(ID=B, V=1) --- buffer -> I want to subscribe with a list of [Obj(ID=B, V=1)]
Something like the following would be perfect but it seems to wait the end of the flux in my tests instead of buffering.
flux .buffer(Duration.ofMillis(2000)) .groupBy(Obj::getId) .flatMap(GroupedFlux::getLast) .collectToList() .subscribe(this::printList);
It works with buffer and custom logic for grouping
public static void main(String[] args) { flux.buffer(Duration.ofMillis(2000)).subscribe(this::groupList); } private void groupList(List<T> ts) { Collection<T> values = ts.stream() .collect(Collectors.toMap(T::getId, Function.identity(), (k, v) -> v)) .values(); System.out.println(values); }
Advertisement
Answer
I was able to achieve it with the reactive grouping
flux.window(Duration.ofMillis(2000)) .flatMap(window -> window.groupBy(Entry::getId) .flatMap(GroupedFlux::last) .collectList() ) .subscribe(this::printList);