I have a bean and a stream:
public class TokenBag {
    private String token;
    private int count;
    // Standard constructor and getters here
}
Stream<String> src = Stream.of("a", "a", "a", "b", "b", "a", "a");
and want to apply some intermediate operation to the stream that returns another stream of TokenBag objects. In this example there must be three: (“a”, 3), (“b”, 2) and (“a”, 2).
Please treat it as a very simplistic example. In reality the logic will be much more complicated than just counting equal values in a row. What I am actually trying to design is a simple parser that accepts a stream of tokens and returns a stream of objects.
Also note that the result must stay a stream (with no intermediate accumulation), and that in this example it must really count equal values in a row, which is different from grouping.
I would appreciate suggestions about the general approach to this problem.
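To make the difference from grouping concrete, here is a small illustrative comparison (the class and method names are hypothetical). Collectors.groupingBy merges every occurrence of a token wherever it appears, while the requested behaviour merges only consecutive ones. The runs method is an eager loop over a List, included only as a reference for the expected result; it does not meet the streaming requirement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GroupingVsRuns {

    // Grouping: all occurrences of a token are merged, regardless of position.
    static Map<String, Long> grouped(Stream<String> tokens) {
        return tokens.collect(Collectors.groupingBy(t -> t, TreeMap::new, Collectors.counting()));
    }

    // Runs: only consecutive equal tokens are merged (eager reference version, not a stream).
    static List<String> runs(List<String> tokens) {
        List<String> result = new ArrayList<>();
        int i = 0;
        while (i < tokens.size()) {
            String token = tokens.get(i);
            int j = i;
            while (j < tokens.size() && tokens.get(j).equals(token)) {
                j++;
            }
            result.add(token + "=" + (j - i));
            i = j;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tokens = List.of("a", "a", "a", "b", "b", "a", "a");
        System.out.println(grouped(tokens.stream())); // {a=5, b=2}
        System.out.println(runs(tokens));             // [a=3, b=2, a=2]
    }
}
```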
Answer
You’d need to convert your stream to a Spliterator and then adapt this spliterator to a custom one that partially reduces some elements according to your logic (in your example, it would count equal elements until a different element appears). Then you’d turn your spliterator back into a new stream.
Bear in mind that this can’t be 100% lazy: to create a new TokenBag element for the new stream, you have to eagerly consume elements from the backing stream up to and including the first element of the next run (or until the source is exhausted).
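A minimal sketch of those three steps, assuming a stand-in TokenBag with a toString() and non-null tokens. RunSpliterator and trace() are illustrative names, and this variant uses a one-token lookahead field instead of the previous/current bookkeeping of the code below. The peek log shows the limited laziness described above: each bag is emitted only after the first token of the following run (or the end of the source) has been read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class RunLengthSketch {

    // Stand-in for the question's TokenBag (assumed constructor and toString()).
    static final class TokenBag {
        private final String token;
        private final int count;

        TokenBag(String token, int count) {
            this.token = token;
            this.count = count;
        }

        @Override
        public String toString() {
            return token + "=" + count;
        }
    }

    // Step 2: emits one TokenBag per run of equal (non-null) tokens, using a one-token lookahead.
    static final class RunSpliterator extends Spliterators.AbstractSpliterator<TokenBag> {
        private final Spliterator<String> source;
        private String pending;   // first token of the next run, already read from the source
        private String last;      // token most recently read by pull()

        RunSpliterator(Spliterator<String> source) {
            // The source size is only an upper bound on the number of runs, so SIZED is not reported.
            super(source.estimateSize(), Spliterator.ORDERED | Spliterator.NONNULL);
            this.source = source;
        }

        // Reads one token into 'last'; returns false when the source is exhausted.
        private boolean pull() {
            return source.tryAdvance(t -> last = t);
        }

        @Override
        public boolean tryAdvance(Consumer<? super TokenBag> action) {
            if (pending == null) {
                if (!pull()) {
                    return false;           // no tokens left and no open run
                }
                pending = last;
            }
            String token = pending;
            int count = 1;
            pending = null;
            while (pull()) {
                if (last.equals(token)) {
                    count++;
                } else {
                    pending = last;         // lookahead: this token starts the next run
                    break;
                }
            }
            action.accept(new TokenBag(token, count));
            return true;
        }
    }

    // Runs the example, logging when tokens are read and when bags are emitted.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Stream<String> src = Stream.of("a", "a", "a", "b", "b", "a", "a")
                .peek(t -> log.add("read " + t));
        Spliterator<String> tokens = src.spliterator();            // step 1: stream -> spliterator
        Spliterator<TokenBag> runs = new RunSpliterator(tokens);   // step 2: adapt
        StreamSupport.stream(runs, false)                          // step 3: back to a sequential stream
                .forEach(bag -> log.add("emit " + bag));
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```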
Here’s the code for the custom spliterator:
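import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;

public class CountingSpliterator
        extends Spliterators.AbstractSpliterator<TokenBag>
        implements Consumer<String> {

    private final Spliterator<String> source;
    private String currentToken;
    private String previousToken;
    private int tokenCount = 0;
    private boolean tokenHasChanged;

    public CountingSpliterator(Spliterator<String> source) {
        // The number of TokenBags is unknown up front, so the source size is only an
        // upper-bound estimate. SIZED/SUBSIZED must not be reported (otherwise count()
        // and toArray() may rely on the wrong size), and neither must SORTED, which
        // refers to the source's comparator.
        super(source.estimateSize(),
                source.characteristics()
                        & ~(Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.SORTED));
        this.source = source;
    }

    @Override
    public boolean tryAdvance(Consumer<? super TokenBag> action) {
        // Pull tokens until the token changes; the finished run is then emitted.
        while (source.tryAdvance(this)) {
            if (tokenHasChanged) {
                action.accept(new TokenBag(previousToken, tokenCount));
                tokenCount = 1; // the token that triggered the change starts the new run
                return true;
            }
        }
        // Source exhausted: flush the last run, if any.
        if (tokenCount > 0) {
            action.accept(new TokenBag(currentToken, tokenCount));
            tokenCount = 0;
            return true;
        }
        return false;
    }

    @Override
    public void accept(String newToken) {
        if (currentToken != null) {
            previousToken = currentToken;
        }
        currentToken = newToken;
        if (previousToken != null && !previousToken.equals(currentToken)) {
            tokenHasChanged = true;
        } else {
            tokenCount++;
            tokenHasChanged = false;
        }
    }
}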
So this spliterator extends Spliterators.AbstractSpliterator and also implements Consumer<String>. The code is somewhat intricate, but the idea is that it adapts one or more consecutive tokens from the source spliterator into a single instance of TokenBag.
For every token accepted from the source spliterator, the count is incremented until the token changes. At that point, a TokenBag instance is created with the previous token and its count, and is immediately pushed to the Consumer<? super TokenBag> action parameter. The counter is then reset to 1, since the token that triggered the change is the first element of the next run. The accept method tracks the previous and current tokens and detects token changes (including the very first token, which has no predecessor), while tryAdvance flushes the last run once the source is exhausted.
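Since the real use case is a parser with more complex logic, the same idea (consume elements until a boundary, then emit one object) can be factored into a reusable helper. The sketch below is a generalization under stated assumptions, not part of the answer’s code: partialReduce is a hypothetical name, the boundary is a BiPredicate comparing each element with the first element of the current run, and each run is reduced by an ordinary Collector, so only the current run’s accumulator plus one lookahead element are held at a time.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class PartialReduce {

    // Hypothetical helper: reduces each run of consecutive elements with a Collector.
    // sameRun.test(first, next) decides whether 'next' continues the run that began with 'first'.
    static <T, A, R> Stream<R> partialReduce(Stream<T> source,
                                             BiPredicate<? super T, ? super T> sameRun,
                                             Collector<? super T, A, R> perRun) {
        Spliterator<T> src = source.spliterator();
        Spliterator<R> runs = new Spliterators.AbstractSpliterator<R>(
                src.estimateSize(), src.characteristics() & Spliterator.ORDERED) {
            private T pending;            // lookahead element that starts the next run
            private boolean hasPending;   // explicit flag, so a null element is not mistaken for "none"
            private final Consumer<T> buffer = t -> {
                pending = t;
                hasPending = true;
            };

            @Override
            public boolean tryAdvance(Consumer<? super R> action) {
                if (!hasPending && !src.tryAdvance(buffer)) {
                    return false;                      // source exhausted, no open run
                }
                T first = pending;
                hasPending = false;
                A acc = perRun.supplier().get();
                BiConsumer<A, ? super T> add = perRun.accumulator();
                add.accept(acc, first);
                while (src.tryAdvance(buffer)) {
                    if (!sameRun.test(first, pending)) {
                        break;                         // pending stays buffered for the next run
                    }
                    add.accept(acc, pending);
                    hasPending = false;
                }
                action.accept(perRun.finisher().apply(acc));
                return true;
            }
        };
        return StreamSupport.stream(runs, false).onClose(source::close);
    }

    // Reproduces the question's example: one "token=count" string per run.
    static List<String> demo() {
        Collector<String, ?, String> toBag = Collectors.collectingAndThen(
                Collectors.toList(), run -> run.get(0) + "=" + run.size());
        return partialReduce(Stream.of("a", "a", "a", "b", "b", "a", "a"), String::equals, toBag)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a=3, b=2, a=2]
    }
}
```

A constant-state collector such as Collectors.counting() keeps the per-run memory fixed, whereas collectingAndThen(toList(), ...) holds the whole current run; a parser would pick whichever accumulator fits its grammar.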
Here’s how you should use this spliterator:
Stream<String> src = Stream.of("a", "a", "a", "b", "b", "a", "a");
Stream<TokenBag> stream = StreamSupport.stream(
        new CountingSpliterator(src.spliterator()),
        false); // false means sequential, we don't want parallel!
stream.forEach(System.out::println);
If you override toString() in TokenBag, the output is:
TokenBag{token='a', count=3}
TokenBag{token='b', count=2}
TokenBag{token='a', count=2}
A note on parallelism: I don’t know how to parallelize this partial-reduce task, or even whether it’s possible at all. But if it were, I doubt it would produce any measurable improvement.