Using this guide: https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/splitAfter.html I’m attempting to calculate the average of a number of attributes within a List. More specifically, using the code below I’m attempting to calculate the average of the values that an actor source emits within a specified time frame:
import akka.Done;
import akka.actor.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import lombok.Getter;
import lombok.Setter;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Optional;

public class TestTradeStrategy1 {

    class GroupedStats {
        @Getter
        @Setter
        private Double volume;

        @Getter
        @Setter
        private Double open;

        @Getter
        @Setter
        private long timestamp;
    }

    private final static ActorSystem system = ActorSystem.create(Behaviors.empty(), "as");
    private final static int BUFFER_SIZE = 100;

    private void runFlow() throws InterruptedException {

        final Source<Integer, ActorRef> source =
                Source.actorRef(
                        elem -> {
                            if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
                            else return Optional.empty();
                        },
                        elem -> Optional.empty(),
                        BUFFER_SIZE,
                        OverflowStrategy.dropHead());

        final ActorRef actorRef = source.throttle(20, Duration.ofSeconds(1))
                .map(elem -> new Pair<>(elem, Instant.now()))
                .sliding(2, 1)
                .splitAfter(slidingElements -> {
                    if (slidingElements.size() == 2) {
                        final Pair<Integer, Instant> current = slidingElements.get(0);
                        final Pair<Integer, Instant> next = slidingElements.get(1);
                        final LocalDateTime currentBucket = LocalDateTime.ofInstant(current.second(), ZoneOffset.UTC).withNano(0);
                        final LocalDateTime nextBucket = LocalDateTime.ofInstant(next.second(), ZoneOffset.UTC).withNano(0);
                        return !currentBucket.equals(nextBucket);
                    } else {
                        return false;
                    }
                })
                .map(slidingElements -> slidingElements.get(0).first())
                .reduce((acc, element) -> acc + element)
                .to(Sink.foreach(x -> {
                    System.out.println(x);
                }))
                .run(system);

        while (true) {
            actorRef.tell(1, ActorRef.noSender());
            Thread.sleep(100);
        }
    }

    public static void main(String args[]) throws Exception {
        TestTradeStrategy1 t = new TestTradeStrategy1();
        t.runFlow();
    }
}
The closest I’ve got is just summing the values using reduce:
.reduce((acc, element) -> acc + element)
How can I modify this so that the average of the values is calculated instead of the sum?
Answer
Instead of reducing a stream of plain numeric elements, first map each element to a new data structure that holds a) the numeric value and b) the number of elements contributing to that value. The count is 1 initially, so a stream of 10, 11, 12 becomes (10, 1), (11, 1), (12, 1).
When you then reduce the stream, you combine two elements by adding up both their numeric values and their element counts. This way you end up with not only the sum of all numeric values but also the number of elements that were added up, which is (33, 3) in the example. The average is then the sum divided by the count: 33 / 3 = 11.
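As a minimal sketch of the map-then-reduce idea, the following uses a plain java.util.stream pipeline rather than Akka so that it runs on its own. The names AverageSketch, SumAndCount, of, plus and average are hypothetical helpers introduced only for illustration; they are not part of Akka or of the code in the question.

```java
import java.util.List;
import java.util.Optional;

public class AverageSketch {

    /** Hypothetical helper: a running sum plus the number of elements behind it. */
    static final class SumAndCount {
        final long sum;
        final long count;

        SumAndCount(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        // Wrap a single element: its own value, with a count of 1.
        static SumAndCount of(int value) {
            return new SumAndCount(value, 1);
        }

        // Combine two partial results by adding both the sums and the counts.
        SumAndCount plus(SumAndCount other) {
            return new SumAndCount(sum + other.sum, count + other.count);
        }

        // The average is the accumulated sum divided by the accumulated count.
        double average() {
            return (double) sum / count;
        }
    }

    // Map every value to (value, 1), then reduce pairwise.
    // The result is empty when the input list is empty.
    static Optional<SumAndCount> sumAndCount(List<Integer> values) {
        return values.stream()
                .map(SumAndCount::of)
                .reduce(SumAndCount::plus);
    }

    public static void main(String[] args) {
        SumAndCount result = sumAndCount(List.of(10, 11, 12)).get();
        // prints "33 / 3 = 11.0"
        System.out.println(result.sum + " / " + result.count + " = " + result.average());
    }
}
```

Assuming the same helper class is available in the question's code, the single reduce step in the flow could presumably be replaced by .map(SumAndCount::of).reduce(SumAndCount::plus).map(SumAndCount::average), since Akka Streams offers map and reduce operators with the same shape. That substitution has not been run against Akka here.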