Flink throws NullPointerException when adding a salt to the key for a window aggregation



I have a program that does two-phase aggregation to mitigate data skew in my job. I use a simple ThreadLocalRandom to append a random suffix to my original key, like this:

  private class KeyByTileWithSalt implements KeySelector<Type, String> {
    @Override
    public String getKey(Type value) {
      // Appends a random salt in [1, 8) to the original key on every call.
      return value.toString() + ThreadLocalRandom.current().nextLong(1, 8);
    }
  }

But Flink throws a NullPointerException when I add the salt to the key and run a window aggregation on some field.

I found a similar post on the Flink mailing list that explains why the exception can occur (an unstable hash value for the key), but I still cannot find where my program produces an unstable hash value. Any ideas?

Answer

Flink relies on the result of keyBy being deterministic across the cluster. This is necessary so that every node in the cluster has a consistent view regarding which node is responsible for processing each key. By having the key depend on ThreadLocalRandom you have violated this assumption.
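
To make the violated assumption concrete, here is a minimal sketch in plain Java (no Flink dependency, so it runs on its own). It assumes, as a simplification, that the key selector can be evaluated more than once for the same record, for example once when routing the record and again when accessing keyed state. The class and field names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;

class UnstableKeyDemo {
    // Mirrors the selector from the question: a fresh random suffix on every call.
    static final Function<String, String> RANDOM_KEY =
        v -> v + ThreadLocalRandom.current().nextLong(1, 8);

    // A pure function of the record: the same input always yields the same key.
    static final Function<String, String> STABLE_KEY = v -> v;

    // Evaluate a selector repeatedly on one record and collect the distinct keys.
    static Set<String> distinctKeys(Function<String, String> selector,
                                    String record, int calls) {
        Set<String> keys = new HashSet<>();
        for (int i = 0; i < calls; i++) {
            keys.add(selector.apply(record));
        }
        return keys;
    }

    public static void main(String[] args) {
        // The random selector yields several different keys for one record.
        System.out.println(distinctKeys(RANDOM_KEY, "tile-42", 1000).size());
        // The stable selector yields exactly one key.
        System.out.println(distinctKeys(STABLE_KEY, "tile-42", 1000).size());
    }
}
```

If two evaluations for the same record disagree, the node that receives the record may not be the one that owns the key computed on the second evaluation, which is consistent with the failure described in the question.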

What you can do instead is to add a field to each record that you populate with a random value during ingestion, and then use that field as the key.
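
A minimal sketch of that idea, again in plain Java so it runs without Flink. The names SaltedEvent, ingest, and key are hypothetical. In an actual job the ingestion step would typically be a map applied before keyBy, and the key function would be the body of a KeySelector.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical record type: the salt is stored on the record itself.
final class SaltedEvent {
    final String tile; // original (skewed) key
    final long salt;   // assigned once, at ingestion

    SaltedEvent(String tile, long salt) {
        this.tile = tile;
        this.salt = salt;
    }

    // Ingestion step: draw the random salt exactly once per record.
    static SaltedEvent ingest(String tile) {
        return new SaltedEvent(tile, ThreadLocalRandom.current().nextLong(1, 8));
    }

    // Key extraction: a pure function of the record's fields, so repeated
    // calls on the same record always return the same key.
    static String key(SaltedEvent e) {
        return e.tile + "#" + e.salt;
    }
}

class SaltFieldDemo {
    public static void main(String[] args) {
        SaltedEvent e = SaltedEvent.ingest("tile-42");
        // The key is stable no matter how often it is computed.
        System.out.println(SaltedEvent.key(e).equals(SaltedEvent.key(e))); // prints "true"
    }
}
```

The randomness still spreads a hot key across several sub-keys, but it is fixed before the record is keyed, so every evaluation of the key agrees.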



Source: stackoverflow