Reduce a Flux to a Mono using data from the Flux

I have a paginated API that returns data for the last 12 months. Its response looks like this:

@Getter
public class PagedTransfersDto {

  private List<Transfer> content;

  private Page page;

  @Getter
  public static class Transfer {
      private String id;
      private Long transferId;
      private Long transferRequestId;
      private String status;
      private BigDecimal accountReceivable;
      private BigDecimal accountPayable;
      private BigDecimal netReceivable;
      private BigDecimal netPayable;
      private String currency;
      private Long transferDate;
  }

  @Getter
  public static class Page {
      private Integer size;
      private Integer number;
      private Integer totalElements;
      private Integer totalPages;
  }
}

Now I have to collect all the data, calculate the sum of all the netReceivable values, and return it as a Mono<CompanyIncome>. That POJO looks like this:

@Getter
@Setter
public class CompanyIncome {
  private BigDecimal inferredIncome = BigDecimal.ZERO;
}

To do this I have written something like:

CompanyIncome initialIncome = new CompanyIncome();
return myService.getTransfers(0, 50, fromDate, toDate)
    .expand(pagedTransfersDto -> {
        if (pagedTransfersDto.getPage().getNumber().equals(pagedTransfersDto.getPage().getTotalPages())) {
            return Mono.empty();
        }
        return myService.getTransfers(pagedTransfersDto.getPage().getNumber() + 1, 50, fromDate, toDate);
    })
    .flatMap(pagedTransfersDto -> Flux.fromIterable(pagedTransfersDto.getContent()))
    .reduce(initialIncome, ((companyIncome, transfer) -> {
        companyIncome.setInferredIncome(companyIncome.getInferredIncome().add(transfer.getNetReceivable()));
        return companyIncome;
    }));

Now the catch is that the data may cover only 3 months, in which case I have to extrapolate it to 12 months by multiplying by 4.

My idea is to take the first and the last item of the transfers list and check whether the data covers a whole year, but I can't think of where to perform this operation.

After reduce, the transfers data is gone, and before that I can't find a way to get this information and still reduce the transfers Flux.

I am a little new to the reactive way of doing things and can't find a way to do this. Any help will be greatly appreciated. Thanks

Answer

For that purpose, the best solution is to store the necessary “metadata” in the reduced object. You already have a CompanyIncome object, so maybe that is a good place? Otherwise I’d introduce either a Tuple2 or some intermediate business object (e.g. CompanyIncomeAggregator) that holds both the aggregated income and the information you need to decide at the end whether further processing is necessary.

Then in a map step, you’d read that information, act on it and either return the computed income as is or modified according to your criterion.

Important note: Using variables external to the reactive chain is a code smell, as it introduces leaky shared state: if two subscriptions are made to the same Mono, they’ll both work on the same CompanyIncome object. You can remedy this here by using reduceWith, which takes a Supplier for the initial value, so each subscription gets a fresh instance: reduceWith(CompanyIncome::new, ...).
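
To make the idea more concrete, here is a minimal sketch of such an intermediate object in plain Java. Several things in it are assumptions rather than anything stated in the question: that transferDate is epoch milliseconds, that "only 3 months" can be approximated as a span of at most 92 days between the earliest and latest transfer, and the method names add and toInferredIncome, which are made up for illustration.

```java
import java.math.BigDecimal;

// Hypothetical aggregator: carries the running sum plus the date range seen so far.
class CompanyIncomeAggregator {
    // Assumption: transferDate is epoch millis and "3 months" is roughly 92 days.
    private static final long THREE_MONTHS_MILLIS = 92L * 24 * 60 * 60 * 1000;

    private BigDecimal total = BigDecimal.ZERO;
    private long earliest = Long.MAX_VALUE;
    private long latest = Long.MIN_VALUE;

    // Accumulator step; returns this so it can serve as the reducer's result.
    CompanyIncomeAggregator add(BigDecimal netReceivable, long transferDate) {
        total = total.add(netReceivable);
        earliest = Math.min(earliest, transferDate);
        latest = Math.max(latest, transferDate);
        return this;
    }

    // Final step: extrapolate to 12 months only if the data spans about 3 months.
    BigDecimal toInferredIncome() {
        if (earliest > latest) {
            return total; // no transfers were seen
        }
        boolean onlyThreeMonths = latest - earliest <= THREE_MONTHS_MILLIS;
        return onlyThreeMonths ? total.multiply(BigDecimal.valueOf(4)) : total;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        CompanyIncomeAggregator agg = new CompanyIncomeAggregator()
            .add(new BigDecimal("100"), 0L)
            .add(new BigDecimal("50"), 30 * day);
        System.out.println(agg.toInferredIncome());
    }
}
```

In the chain from the question, the reduce call could then become something like reduceWith(CompanyIncomeAggregator::new, (agg, t) -> agg.add(t.getNetReceivable(), t.getTransferDate())), followed by a map that calls toInferredIncome() and sets the result on a new CompanyIncome. Because the aggregator is created by the Supplier, each subscription gets its own instance.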

User contributions licensed under: CC BY-SA