I have a bean and a stream
```java
public class TokenBag {
    private String token;
    private int count;

    // Standard constructor and getters here
}
```

```java
Stream<String> src = Stream.of("a", "a", "a", "b", "b", "a", "a");
```
and want to apply some intermediate operation to the stream that returns another stream of TokenBag objects. In this example there must be three: ("a", 3), ("b", 2) and ("a", 2).
Please treat this as a very simplistic example. In reality the logic will be much more complicated than just counting equal values in a row: I am trying to design a simple parser that accepts a stream of tokens and returns a stream of objects.
Also please note that it must stay a stream (with no intermediate accumulation), and in this example it must really count equal values in a row (this differs from grouping).
I would appreciate your suggestions about the general approach to this task.
Answer
You'd need to convert your stream to a Spliterator and then adapt this spliterator to a custom one that partially reduces some elements according to your logic (in your example it would need to count equal elements until a different element appears). Then, you'd need to turn your spliterator back into a new stream.

Bear in mind that this can't be 100% lazy, as you'd need to eagerly consume some elements from the backing stream in order to create a new TokenBag element for the new stream.
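To see the wrap-adapt-re-stream shape in isolation, here is a minimal toy sketch. A trivial partial reduction (pairwise concatenation) stands in for the real counting logic; all names in it are illustrative, not part of the final solution:

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class PairSketch {

    // Toy partial reduction: eagerly pull up to two elements from the
    // source spliterator and emit their concatenation as one element.
    static List<String> pairwise(Stream<String> src) {
        Spliterator<String> source = src.spliterator();
        Spliterator<String> pairs =
                new Spliterators.AbstractSpliterator<String>(Long.MAX_VALUE, 0) {
                    @Override
                    public boolean tryAdvance(Consumer<? super String> action) {
                        StringBuilder sb = new StringBuilder();
                        if (!source.tryAdvance(sb::append)) {
                            return false; // source exhausted
                        }
                        source.tryAdvance(sb::append); // second element, if any
                        action.accept(sb.toString());
                        return true;
                    }
                };
        // Turn the adapted spliterator back into a (sequential) stream
        return StreamSupport.stream(pairs, false).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(pairwise(Stream.of("a", "b", "c", "d"))); // [ab, cd]
    }
}
```

Each call to `tryAdvance` on the outer spliterator eagerly pulls one or more elements from the source, which is exactly the "not 100% lazy" behavior described above.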
Here’s the code for the custom spliterator:
```java
public class CountingSpliterator
        extends Spliterators.AbstractSpliterator<TokenBag>
        implements Consumer<String> {

    private final Spliterator<String> source;
    private String currentToken;
    private String previousToken;
    private int tokenCount = 0;
    private boolean tokenHasChanged;

    public CountingSpliterator(Spliterator<String> source) {
        super(source.estimateSize(), source.characteristics());
        this.source = source;
    }

    @Override
    public boolean tryAdvance(Consumer<? super TokenBag> action) {
        // Consume source tokens (each one lands in accept() below)
        // until the token changes, then emit a TokenBag.
        while (source.tryAdvance(this)) {
            if (tokenHasChanged) {
                action.accept(new TokenBag(previousToken, tokenCount));
                tokenCount = 1; // the just-consumed token starts the next run
                return true;
            }
        }
        // Source exhausted: emit the last pending run, if any.
        if (tokenCount > 0) {
            action.accept(new TokenBag(currentToken, tokenCount));
            tokenCount = 0;
            return true;
        }
        return false;
    }

    @Override
    public void accept(String newToken) {
        if (currentToken != null) {
            previousToken = currentToken;
        }
        currentToken = newToken;
        if (previousToken != null && !previousToken.equals(currentToken)) {
            tokenHasChanged = true;
        } else {
            tokenCount++;
            tokenHasChanged = false;
        }
    }
}
```
So this spliterator extends Spliterators.AbstractSpliterator and also implements Consumer<String>. The code is quite complex, but the idea is that it adapts one or more tokens from the source spliterator into an instance of TokenBag.
For every token accepted from the source spliterator, the count for that token is incremented, until the token changes. At this point, a TokenBag instance is created with the token and the count and is immediately pushed to the Consumer<? super TokenBag> action parameter; the counter is then reset to 1. The logic in the accept method handles token changes, border cases, etc.
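For completeness, here is a TokenBag that fills in the question's "standard constructor and getters" and overrides toString(); the exact toString() format is an assumption chosen to match the output printed further down:

```java
public class TokenBag {
    private final String token;
    private final int count;

    public TokenBag(String token, int count) {
        this.token = token;
        this.count = count;
    }

    public String getToken() {
        return token;
    }

    public int getCount() {
        return count;
    }

    @Override
    public String toString() {
        return "TokenBag{token='" + token + "', count=" + count + "}";
    }
}
```

The fields are made final here since a TokenBag is a pure value emitted by the spliterator and never mutated afterwards.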
Here’s how you should use this spliterator:
```java
Stream<String> src = Stream.of("a", "a", "a", "b", "b", "a", "a");
Stream<TokenBag> stream = StreamSupport.stream(
        new CountingSpliterator(src.spliterator()),
        false); // false means sequential, we don't want parallel!
stream.forEach(System.out::println);
```
If you override toString() in TokenBag, the output is:
```
TokenBag{token='a', count=3}
TokenBag{token='b', count=2}
TokenBag{token='a', count=2}
```
A note on parallelism: I don't know how to parallelize this partial-reduce task; I don't even know whether it's possible at all. But even if it were, I doubt it would produce any measurable improvement.