
How to reduce/iterate multiple Java streams of the same size at once?

I have multiple Java streams of the same length, and I want to perform some operation on the corresponding elements of each stream. For example, add the 1st elements of all streams, the 2nd elements of all streams, the 3rd elements of all streams, and so on.

How can we do this without first reducing each of the streams?

As a minimal reproducible example, I have the following test snippet:

@Test
void aggregateMultipleStreams() {
    Stream<Integer> s1 = Stream.of(1, 2);
    Stream<Integer> s2 = Stream.of(4, 5);
    Stream<Integer> s3 = Stream.of(7, 8);
    assertEquals(List.of(1 + 4 + 7, 2 + 5 + 8), aggregate(s1, s2, s3, 2));
}

I can write the aggregate method as follows, by reducing all the streams first.

private List<Integer> aggregate(Stream<Integer> s1, Stream<Integer> s2, Stream<Integer> s3, int streamSize) {
    final List<List<Integer>> reduced = Stream.of(s1, s2, s3)
            .map(s -> s.collect(Collectors.toList())).collect(Collectors.toList());
    return IntStream.range(0, streamSize).mapToObj(n -> IntStream.range(0, reduced.size())
            .map(v -> reduced.get(v).get(n)).sum()).collect(Collectors.toList());
}

But this could be a storage hassle if each stream contains many records: with three streams of N records each, we need 3N storage here.

Can we add the corresponding elements of different streams without reducing them first? Can we reduce multiple streams at once in Java?

After implementing @jb_dk’s solution below, the solution code snippet became:

private List<Integer> aggregate(Stream<Integer> s1, Stream<Integer> s2, Stream<Integer> s3, int streamSize) {
    final List<Iterator<Integer>> iterators = Stream.of(s1, s2, s3)
            .map(Stream::iterator).collect(Collectors.toList());
    return IntStream.range(0, streamSize).mapToObj(n -> IntStream.range(0, iterators.size())
            .map(v -> iterators.get(v).next()).sum()).collect(Collectors.toList());
}
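
The question asks for "some operation" on corresponding elements, with addition only as an example. Here is a minimal sketch of how the same iterator idea might generalize to any number of streams and any combining function, without needing streamSize. The class name LockstepReduce and the op parameter are hypothetical names chosen for illustration, and the code assumes every stream is as long as the first one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.Stream;

class LockstepReduce {
    /**
     * Combines the n-th elements of every stream with op.
     * Assumption: all streams have the same length as the first one.
     */
    static <T> List<T> aggregate(BinaryOperator<T> op, List<Stream<T>> streams) {
        // One iterator per stream; elements are pulled on demand,
        // so no input stream is collected into a list first.
        List<Iterator<T>> iterators = new ArrayList<>();
        for (Stream<T> s : streams) {
            iterators.add(s.iterator());
        }
        List<T> out = new ArrayList<>();
        if (iterators.isEmpty()) {
            return out;
        }
        while (iterators.get(0).hasNext()) {
            T acc = iterators.get(0).next();
            for (Iterator<T> it : iterators.subList(1, iterators.size())) {
                // Throws NoSuchElementException if this stream is shorter than the first.
                acc = op.apply(acc, it.next());
            }
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> sums = aggregate(Integer::sum,
                List.of(Stream.of(1, 2), Stream.of(4, 5), Stream.of(7, 8)));
        System.out.println(sums); // prints [12, 15]
    }
}
```

Only the output list grows with N here; the inputs are consumed one element at a time.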


Answer

Put the input streams in an array or list instead of 3 named variables. Then create an output List (or stream builder), and use an outer loop over the stream length with an inner loop that iterates over the input streams, reads one element from each, and adds it to the current output element.

Something like (code not tested, syntax errors may exist)

    ...
    {
        // Needs java.util.ArrayList, java.util.Iterator, java.util.List, java.util.stream.Stream
        List<Integer> results = new ArrayList<>(); // can be a Stream.Builder instead
        final List<Stream<Integer>> instrms = List.of(s1, s2, s3);
        // Get the iterator (not rewindable) for each stream
        final List<Iterator<Integer>> initers = new ArrayList<>();
        for (Stream<Integer> instrm : instrms) {
            initers.add(instrm.iterator());
        }
        // Actually loop over the stream elements, outputting one
        //    sum element for each element.  Assumes all input streams
        //    are same length as the first one.
        while (initers.get(0).hasNext()) {
            int res1 = 0;
            for (Iterator<Integer> initer : initers) {
                res1 += initer.next();
            }
            results.add(res1);
        }
        return results;  // results.build() if a stream builder
    }
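
If the result should not be stored in full either, the sums could be exposed as a lazy stream instead of a list. The following is only a sketch under the same equal-length assumption. LazyLockstepSum and sumLazily are hypothetical names, and wrapping an Iterator with Spliterators.spliteratorUnknownSize is one possible way to do it, not necessarily the only or best one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyLockstepSum {
    /** Returns a sequential stream whose n-th element is the sum of the n-th inputs. */
    static Stream<Integer> sumLazily(List<Stream<Integer>> streams) {
        List<Iterator<Integer>> iterators = new ArrayList<>();
        for (Stream<Integer> s : streams) {
            iterators.add(s.iterator());
        }
        Iterator<Integer> sums = new Iterator<>() {
            @Override
            public boolean hasNext() {
                // Assumption: the first stream decides the length.
                return !iterators.isEmpty() && iterators.get(0).hasNext();
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                int sum = 0;
                for (Iterator<Integer> it : iterators) {
                    sum += it.next(); // one element from each input per output element
                }
                return sum;
            }
        };
        // Wrap the iterator as an ordered, non-parallel stream.
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(sums, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        List<Integer> result = sumLazily(
                List.of(Stream.of(1, 2), Stream.of(4, 5), Stream.of(7, 8)))
                .collect(Collectors.toList());
        System.out.println(result); // prints [12, 15]
    }
}
```

Each sum is computed only when the downstream operation asks for it, so memory use no longer depends on N unless the caller collects the result.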
User contributions licensed under: CC BY-SA