I am studying how Flink handles skewed data and how I can use the low-level physical partitioning controls to get tuples processed evenly. I have created synthetic skewed data sources and I want to aggregate them over a window. Here is the complete code.
    streamTrainsStation01.union(streamTrainsStation02)
        .union(streamTicketsStation01).union(streamTicketsStation02)
        // map the keys
        .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
        .rebalance() // or .rescale() .shuffle()
        .keyBy(new StationPlatformKeySelector())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
        .apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
        .setParallelism(4)
        .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
        .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
        ;
In the Flink dashboard I could not see much difference among .shuffle(), .rescale(), and .rebalance(), even though the documentation says the rebalance() transformation is more suitable for data skew.
After that I tried to use .partitionCustom(partitioner, "someKey"). However, to my surprise, I could not use setParallelism(4) on the window operation. The documentation says:
Note: This operation is inherently non-parallel since all elements have to pass through the same operator instance.
I did not understand why. If I am allowed to use partitionCustom, why can't I set the parallelism after that? Here is the complete code.
    streamTrainsStation01.union(streamTrainsStation02)
        .union(streamTicketsStation01).union(streamTicketsStation02)
        // map the keys
        .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
        .partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector())
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
        .apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
        .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
        .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
        ;
Thanks, Felipe
Answer
I got an answer from the Flink user mailing list. Basically, calling keyBy() after rebalance() cancels whatever rebalance() was trying to do, because keyBy() repartitions the stream by the hash of the key. The first (ad hoc) solution that I found is to create a composite key that takes the skewed key into account.
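To make the effect concrete, here is a small plain-Java sketch (no Flink dependency) of why a hash-based partitioning step sends every record of a hot key to the same subtask, and how appending a salt spreads that key out. The class `KeySkewSketch`, its method names, and the modulo routing are all assumptions for illustration; Flink's real key-to-subtask assignment is more involved, so treat this only as a model of the idea.

```java
final class KeySkewSketch {

    // Simplified stand-in for hash partitioning: the target subtask
    // depends only on the key, never on where the record came from.
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Records per subtask when each record is routed by its plain key.
    static int[] loadWithPlainKey(String[] keys, int parallelism) {
        int[] load = new int[parallelism];
        for (String key : keys) {
            load[subtaskFor(key, parallelism)]++;
        }
        return load;
    }

    // Records per subtask when a round-robin salt in [0, salts)
    // is appended to the key before routing.
    static int[] loadWithSaltedKey(String[] keys, int parallelism, int salts) {
        int[] load = new int[parallelism];
        int next = 0;
        for (String key : keys) {
            String salted = key + "#" + next;
            next = (next + 1) % salts;
            load[subtaskFor(salted, parallelism)]++;
        }
        return load;
    }
}
```

With a stream made only of one hot key, `loadWithPlainKey` puts every record on a single subtask no matter how the records were distributed beforehand, while `loadWithSaltedKey` spreads them over several. How even the spread is depends on how the salted keys happen to hash, so it is not guaranteed to be perfectly balanced.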
    public class CompositeSkewedKeyStationPlatform implements Serializable {
        private static final long serialVersionUID = -5960601544505897824L;

        private Integer stationId;
        private Integer platformId;
        private Integer skewParameter;
    }
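The excerpt above shows only the fields. A class used as a key in keyBy() also needs value-based equals() and hashCode(), since records are grouped by key equality and routed by key hash. A minimal sketch of what the rest of such a class could look like follows; the name `CompositeSkewedKeySketch` and everything beyond the three fields are assumptions, not the original implementation.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical completion of a composite key: three fields plus
// value-based equality so equal keys hash to the same partition.
final class CompositeSkewedKeySketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Integer stationId;
    private final Integer platformId;
    private final Integer skewParameter;

    CompositeSkewedKeySketch(Integer stationId, Integer platformId, Integer skewParameter) {
        this.stationId = stationId;
        this.platformId = platformId;
        this.skewParameter = skewParameter;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof CompositeSkewedKeySketch)) {
            return false;
        }
        CompositeSkewedKeySketch that = (CompositeSkewedKeySketch) other;
        return Objects.equals(stationId, that.stationId)
                && Objects.equals(platformId, that.platformId)
                && Objects.equals(skewParameter, that.skewParameter);
    }

    @Override
    public int hashCode() {
        // The skew parameter takes part in the hash, which is what
        // lets one hot (station, platform) pair spread over subtasks.
        return Objects.hash(stationId, platformId, skewParameter);
    }
}
```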
I use it in the map function before calling keyBy().
    public class StationPlatformSkewedKeyMapper
            extends RichMapFunction<MqttSensor, Tuple2<CompositeSkewedKeyStationPlatform, MqttSensor>> {

        private SkewParameterGenerator skewParameterGenerator;

        public StationPlatformSkewedKeyMapper() {
            this.skewParameterGenerator = new SkewParameterGenerator(10);
        }

        @Override
        public Tuple2<CompositeSkewedKeyStationPlatform, MqttSensor> map(MqttSensor value) throws Exception {
            Integer platformId = value.getKey().f2;
            Integer stationId = value.getKey().f4;
            // Non-skewed keys keep a constant skew parameter of 0.
            Integer skewParameter = 0;

            // Only the known hot key (station 2, platform 3) is spread over several composite keys.
            if (stationId.equals(2) && platformId.equals(3)) {
                skewParameter = this.skewParameterGenerator.getNextItem();
            }
            CompositeSkewedKeyStationPlatform compositeKey =
                    new CompositeSkewedKeyStationPlatform(stationId, platformId, skewParameter);
            return Tuple2.of(compositeKey, value);
        }
    }
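The mapper relies on a SkewParameterGenerator whose code is not shown here. One simple way such a generator could work is to hand out values round-robin in the range [0, bound). The sketch below is a hypothetical stand-in under that assumption (the class name `RoundRobinSkewParameterSketch` and the round-robin strategy are mine); the original may well differ. It implements Serializable because it is held in a non-transient field of a function that Flink serializes when it ships the job.

```java
import java.io.Serializable;

// Hypothetical stand-in for the skew parameter generator:
// returns 0, 1, ..., bound - 1 and then starts over.
final class RoundRobinSkewParameterSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int bound;
    private int next;

    RoundRobinSkewParameterSketch(int bound) {
        if (bound <= 0) {
            throw new IllegalArgumentException("bound must be positive");
        }
        this.bound = bound;
    }

    // Returns the current value and advances to the next one, wrapping at bound.
    int getNextItem() {
        int current = next;
        next = (next + 1) % bound;
        return current;
    }
}
```

Keep in mind that with a salted key the window produces one partial result per (station, platform, skew parameter). If a single figure per station and platform is needed, those partial results still have to be combined in a later step.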
Here is my complete solution.