I have made a custom collector that uses a MessageDigest to create a hash. In general MessageDigest does not work in parallel. The issue I'm seeing is in the combiner() method: it is not possible to combine two MessageDigest objects. When I return null it seems to work, but if I throw an UnsupportedOperationException it fails. What is the typical way to implement a collector that doesn't support parallel operations?
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

class ChecksumCollector implements Collector<String, MessageDigest, ByteBuffer> {

    private String algorithm;

    ChecksumCollector(final String algorithm) {
        this.algorithm = algorithm;
    }

    @Override
    public Supplier<MessageDigest> supplier() {
        return () -> {
            try {
                return MessageDigest.getInstance(algorithm);
            } catch (NoSuchAlgorithmException e) {
                throw new UnsupportedOperationException("Could not find MessageDigest for algorithm " + algorithm, e);
            }
        };
    }

    @Override
    public BiConsumer<MessageDigest, String> accumulator() {
        return (md, s) -> md.update(s.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public BinaryOperator<MessageDigest> combiner() {
        return null; // seems to work but hash may not be correct?
        //throw new UnsupportedOperationException(LineDuplicationHash.class.getSimpleName() + " does not support parallel streams");
    }

    @Override
    public Function<MessageDigest, ByteBuffer> finisher() {
        return md -> ByteBuffer.wrap(md.digest());
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Set.of();
    }
}
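For context, here is a minimal sketch of how such a collector might be invoked; the input strings, the "SHA-256" algorithm name, and the ChecksumDemo wrapper class are only illustrative:

import java.nio.ByteBuffer;
import java.util.stream.Stream;

class ChecksumDemo {
    public static void main(String[] args) {
        // Sequential collection: the accumulator feeds each string into the digest,
        // and the finisher wraps the resulting bytes in a ByteBuffer.
        ByteBuffer checksum = Stream.of("first line", "second line")
                .collect(new ChecksumCollector("SHA-256"));
        System.out.println(checksum.remaining() + " digest bytes");
    }
}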
Answer
A Collector's BinaryOperator returned by combiner() will only be applied for parallel streams; however, the combiner() method itself is invoked whenever Stream.collect() is called, in order to retrieve that combiner, in the JDK's implementation (see ReduceOps.makeRef(Collector)).
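To illustrate that distinction, here is a small sketch using Collector.of rather than the class above (the collector and its inputs are purely illustrative): the combiner is retrieved up front by collect(), but the returned function is only applied when the pipeline is parallel.

import java.util.stream.Collector;
import java.util.stream.Stream;

class CombinerDemo {
    public static void main(String[] args) {
        // The combiner lambda below always throws, yet a sequential collect()
        // completes normally because the lambda is retrieved but never applied.
        Collector<String, StringBuilder, String> concatenating = Collector.of(
                StringBuilder::new,
                StringBuilder::append,
                (a, b) -> { throw new UnsupportedOperationException("parallel not supported"); },
                StringBuilder::toString);

        System.out.println(Stream.of("a", "b", "c").collect(concatenating)); // prints "abc"

        // Making the stream parallel may split the work across threads, in which case
        // the combiner is applied and the exception above is thrown.
    }
}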
You thus have 2 options:
- either return null, which would cause a NullPointerException if your collector is used in a parallel Stream, at the time the combiner needs to be used;
- or return a BinaryOperator that actually throws the exception when called:
return (a, b) -> {
    throw new UnsupportedOperationException(
            LineDuplicationHash.class.getSimpleName() + " does not support parallel streams");
};
This second option is friendlier to an unsuspecting developer who later changes your pipeline to make it parallel.
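Applied to the collector from the question, the combiner() override could then look like the following sketch (ChecksumCollector is used in the message here in place of LineDuplicationHash, purely for the sake of the example):

@Override
public BinaryOperator<MessageDigest> combiner() {
    // Only parallel pipelines ever invoke this lambda, so sequential collection is unaffected.
    return (a, b) -> {
        throw new UnsupportedOperationException(
                ChecksumCollector.class.getSimpleName() + " does not support parallel streams");
    };
}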