Skip to content

Object reuse – mutating same object – in Flink operators

I was reading the doc here, which gives a use case to reuse the object as given below:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        // Create an instance that we will reuse on every call
        private Tuple2<String, Long> result = new Tuple<>();
    
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            
            // Set fields on an existing object instead of creating a new one
            result.f0 = userName;
            // Auto-boxing!! A new Long value may be created
            result.f1 = changesCount;
            
            // Reuse the same Tuple2 object
            collector.collect(result);
        }
    }

So every time instead of creating a new Tuple, it seems to be able to use the same Tuple by using its mutable nature in order to decrease the pressure on GC. Would it be applicable in all operators, where we can mutate and pass the same object in the pipeline via collector.collect(...) call?

By using that idea, I wonder in what places I can make such an optimization without breaking the code or introducing sneaky bugs. Again as an example a KeySelector which returns a Tuple taken from this answer given below:

KeyedStream<Employee, Tuple2<String, String>> employeesKeyedByCountryndEmployer = 
  streamEmployee.keyBy(
    new KeySelector<Employee, Tuple2<String, String>>() {

      @Override
      public Tuple2<String, String> getKey(Employee value) throws Exception {
        return Tuple2.of(value.getCountry(), value.getEmployer());
      }
    }
  );

I wonder if that case, can I reuse the same Tuple by mutating it with different inputs as below. Of course in all cases I assume parallelism is more than 1, probably much higher in a real use case.

KeyedStream<Employee, Tuple2<String, String>> employeesKeyedByCountryndEmployer = 
  streamEmployee.keyBy(
    new KeySelector<Employee, Tuple2<String, String>>() {
      
      Tuple2<String, String> tuple = new Tuple2<>();

      @Override
      public Tuple2<String, String> getKey(Employee value) throws Exception {
        tuple.f0 = value.getCountry();
        tuple.f1 = value.value.getEmployer();
        return tuple;
      }
    }
  );

I do not know, if Flink copies objects between stages in the pipeline, so I wonder if it’s safe to do such an optimization. I read about enableObjectReuse() configuration in the docs, though I am not sure if I really understood it. Actually, it may be a bit Flink internals, though could not understand when Flink does what to manage data/object/records in the pipeline. May be I should make this clear first?

Thanks,

Answer

This is sort of reuse in a KeySelector is not safe. keyBy is not an operator, and the usual rules about object reuse in operator chains (which I covered here) do not apply.