I have a kafka topic with 15 partitions [0-14] and I’m running flink with 5 parallelism. So ideally each parallel flink consumer should consume 3 partitions each. But even after multiple restarts, few of the kafka partitions are not subscribed by any flink workers.
org.apache.kafka.clients.consumer.KafkaConsumer assign Subscribed to partition(s): topic_name-13, topic_name-8, topic_name-9 org.apache.kafka.clients.consumer.KafkaConsumer assign Subscribed to partition(s): topic_name-11, topic_name-12, topic_name-13 org.apache.kafka.clients.consumer.KafkaConsumer assign Subscribed to partition(s): topic_name-14, topic_name-0, topic_name-10 org.apache.kafka.clients.consumer.KafkaConsumer assign Subscribed to partition(s): topic_name-5, topic_name-6, topic_name-10 org.apache.kafka.clients.consumer.KafkaConsumer assign Subscribed to partition(s): topic_name-2, topic_name-3, topic_name-7
From the above logs, it shows that partitions 10 and 13 have been subscribed by 2 consumers and partition 1 and 4 are not subscribed at all.
Note: If I start the job with 1 parallelism, the job works perfectly fine.
Flink Version: 1.3.3
Advertisement
Answer
This sounds like https://issues.apache.org/jira/browse/FLINK-7143.
Reading through the details in the Jira ticket and in the pull request (https://github.com/apache/flink/pull/4301), it sounds like if you are on Flink 1.3.x you can only benefit from this bug fix if you do a fresh restart. Restarting from a savepoint isn’t enough to benefit from the fix.