Count the same items in a row in Java 8 Stream API

I have a bean and a stream:

public class TokenBag {
    private String token;
    private int count;
    // Standard constructor and getters here
}
Stream<String> src = Stream.of("a", "a", "a", "b", "b", "a", "a");

and want to apply some intermediate operation to the stream that returns another stream of TokenBag objects. In this example there must be three: (“a”, 3), (“b”, 2) and (“a”, 2).

Please think of this as a very simplistic example. In reality, the logic will be much more complicated than just counting the same values in a row. I am actually trying to design a simple parser that accepts a stream of tokens and returns a stream of objects.

Also, please note that it must stay a stream (with no intermediate accumulation), and that in this example it must count only equal values that are adjacent in the stream, which is different from grouping.
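To make the difference from grouping concrete, here is a minimal sketch (the class name GroupingVsRuns and the method groupedCounts are made up for illustration). Grouping merges all equal tokens regardless of position, so the two separate runs of "a" collapse into one entry:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class GroupingVsRuns {

    // Counts every occurrence of each token, ignoring where it appears in the stream.
    static Map<String, Long> groupedCounts(Stream<String> tokens) {
        return tokens.collect(Collectors.groupingBy(
                Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts =
                groupedCounts(Stream.of("a", "a", "a", "b", "b", "a", "a"));
        System.out.println(counts); // prints {a=5, b=2}
        // What is wanted instead: (a, 3), (b, 2), (a, 2)
    }
}
```

Grouping is also a terminal operation that accumulates the whole input, which is exactly what should be avoided here.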

I would appreciate your suggestions on a general approach to this task.

Answer

You’d need to convert your stream to a Spliterator and then adapt this spliterator to a custom one that partially-reduces some elements according to your logic (in your example it would need to count equal elements until a different element appears). Then, you’d need to turn your spliterator back to a new stream.

Bear in mind that this can’t be 100% lazy, as you’d need to eagerly consume some elements from the backing stream in order to create a new TokenBag element for the new stream.
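The look-ahead can be observed directly. The following is only a sketch under assumptions: LookAheadDemo, RunSpliterator and consumedForFirstRun are hypothetical names, and the spliterator is a compact stand-in that emits "token x count" strings instead of TokenBag objects so that the example is self-contained. It records which source tokens were read in order to produce just the first result:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LookAheadDemo {

    // Hypothetical compact stand-in for a run-counting spliterator.
    // It emits "token x count" strings so the sketch needs no TokenBag class.
    static final class RunSpliterator extends Spliterators.AbstractSpliterator<String> {
        private final Spliterator<String> source;
        private String pending;     // token already pulled from the source but not yet counted
        private boolean hasPending;

        RunSpliterator(Spliterator<String> source) {
            super(Long.MAX_VALUE, Spliterator.ORDERED); // number of runs is unknown up front
            this.source = source;
        }

        // Pulls one token from the source into the look-ahead buffer.
        private boolean pull() {
            return source.tryAdvance(t -> { pending = t; hasPending = true; });
        }

        @Override
        public boolean tryAdvance(Consumer<? super String> action) {
            if (!hasPending && !pull()) {
                return false; // source exhausted and nothing buffered
            }
            String token = pending;
            int count = 1;
            hasPending = false;
            // Keep pulling until a different token shows up or the source ends.
            while (pull()) {
                if (!Objects.equals(pending, token)) {
                    break; // the look-ahead token stays buffered for the next call
                }
                count++;
                hasPending = false;
            }
            action.accept(token + " x " + count);
            return true;
        }
    }

    // Pulls exactly one result and reports which source tokens were read to produce it.
    static List<String> consumedForFirstRun(String... tokens) {
        List<String> consumed = new ArrayList<>();
        Stream<String> src = Stream.of(tokens).peek(consumed::add);
        Iterator<String> it = StreamSupport
                .stream(new RunSpliterator(src.spliterator()), false)
                .iterator();
        it.next(); // forces a single tryAdvance on the run spliterator
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(consumedForFirstRun("a", "a", "a", "b", "b", "a", "a"));
        // prints [a, a, a, b]: the first "b" had to be read before (a, 3) could be emitted
    }
}
```

Only one token beyond the current run is read, so the rest of the source stays untouched until the next result is requested.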

Here’s the code for the custom spliterator:

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;

public class CountingSpliterator
        extends Spliterators.AbstractSpliterator<TokenBag>
        implements Consumer<String> {

    private final Spliterator<String> source;
    private String currentToken;
    private String previousToken;
    private int tokenCount = 0;
    private boolean tokenHasChanged;

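    public CountingSpliterator(Spliterator<String> source) {
        // The source's size is only an upper bound here, because runs are merged.
        // SIZED, SUBSIZED, SORTED and DISTINCT describe the source tokens,
        // not the resulting TokenBag elements, so they are masked out.
        super(source.estimateSize(),
                source.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED
                        | Spliterator.SORTED | Spliterator.DISTINCT));
        this.source = source;
    }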

    @Override
    public boolean tryAdvance(Consumer<? super TokenBag> action) {
        while (source.tryAdvance(this)) {
            if (tokenHasChanged) {
                action.accept(new TokenBag(previousToken, tokenCount));
                tokenCount = 1;
                return true;
            }
        }
        if (tokenCount > 0) {
            action.accept(new TokenBag(currentToken, tokenCount));
            tokenCount = 0;
            return true;
        }
        return false;
    }

    @Override
    public void accept(String newToken) {
        if (currentToken != null) {
            previousToken = currentToken;
        }
        currentToken = newToken;
        if (previousToken != null && !previousToken.equals(currentToken)) {
            tokenHasChanged = true;
        } else {
            tokenCount++;
            tokenHasChanged = false;
        }
    }
}

So this spliterator extends Spliterators.AbstractSpliterator and also implements Consumer. The code is quite complex, but the idea is that it adapts one or more tokens from the source spliterator into an instance of TokenBag.

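For every token accepted from the source spliterator, the count is incremented until the token changes. At that point, a TokenBag is created from the previous token and its count and is immediately pushed to the Consumer<? super TokenBag> action parameter. The counter is then reset to 1 rather than 0, because the first token of the next run has already been consumed. When the source is exhausted, the last run is emitted and the counter is set to 0, so that the following call returns false. The logic in the accept method tracks the current and previous tokens and detects the change.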

Here’s how you should use this spliterator:

Stream<String> src = Stream.of("a", "a", "a", "b", "b", "a", "a");

Stream<TokenBag> stream = StreamSupport.stream(
        new CountingSpliterator(src.spliterator()),
        false); // false means sequential, we don't want parallel!

stream.forEach(System.out::println);

If you override toString() in TokenBag, the output is:

TokenBag{token='a', count=3}
TokenBag{token='b', count=2}
TokenBag{token='a', count=2}
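For reference, a TokenBag along these lines would produce that output. This is a sketch: the question only shows the two fields and mentions a standard constructor and getters, so the final modifiers, the getter names and the exact toString() body are assumptions.

```java
class TokenBag {
    private final String token;
    private final int count;

    TokenBag(String token, int count) {
        this.token = token;
        this.count = count;
    }

    String getToken() {
        return token;
    }

    int getCount() {
        return count;
    }

    // Assumed format, chosen to match the output shown above.
    @Override
    public String toString() {
        return "TokenBag{token='" + token + "', count=" + count + "}";
    }
}
```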

A note on parallelism: I don’t know how to parallelize this partial-reduce task, and I don’t even know whether it is possible at all. Even if it were, I doubt it would produce any measurable improvement.

User contributions licensed under: CC BY-SA