I have this very simple Beam pipeline that reads records from a Kafka topic and writes them to a Pulsar topic:
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    p.apply(
        KafkaIO.<Long, String>read()
            .withTopic("<topic>")
            .withBootstrapServers("<url>")
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .updateConsumerProperties(getConsumerProps())
            .withoutMetadata() // PCollection<KV<Long, String>>
     )
     .apply(Values.<String>create())
     .apply(ParDo.of(new PulsarSink()));

    p.run();
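PulsarSink is a custom DoFn; a minimal sketch of what such a sink might look like with the Pulsar Java client (the service URL, topic, and setup details here are assumptions, not the actual implementation):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.Schema;

    // Hypothetical sketch of a Pulsar-writing DoFn; the real PulsarSink may differ.
    static class PulsarSink extends DoFn<String, Void> {
        private transient PulsarClient client;
        private transient Producer<String> producer;

        @Setup
        public void setup() throws PulsarClientException {
            // "<pulsar-url>" and "<topic>" are placeholders, as elsewhere in this post.
            client = PulsarClient.builder().serviceUrl("pulsar://<pulsar-url>:6650").build();
            producer = client.newProducer(Schema.STRING).topic("<topic>").create();
        }

        @ProcessElement
        public void processElement(@Element String value) throws PulsarClientException {
            producer.send(value); // one synchronous send per element
        }

        @Teardown
        public void teardown() throws PulsarClientException {
            if (producer != null) producer.close();
            if (client != null) client.close();
        }
    }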
From my understanding, this should create exactly one Kafka consumer that pushes its values down the pipeline. For some reason, however, the pipeline seems to restart over and over again, creating multiple Kafka consumers and multiple Pulsar producers.
Here is an excerpt from the logs that show multiple Kafka Consumers being created:
    2019-06-07 16:08:30,010 INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [<url>:39701, <url>:39702, <url>:39703]
        check.crcs = true
        client.dns.lookup = default
        client.id =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = 292330999892453
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 524288
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = [hidden]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = SCRAM-SHA-256
        security.protocol = SASL_SSL
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    2019-06-07 16:08:30,097 INFO o.a.k.c.s.a.AbstractLogin - Successfully logged in.
    2019-06-07 16:08:30,204 INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
    2019-06-07 16:08:30,205 INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
    2019-06-07 16:08:30,684 INFO org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
    2019-06-07 16:08:30,693 INFO o.a.b.s.i.kafka.KafkaUnboundedSource - Partitions assigned to split 0 (total 1): 292330999892453.events.all.v1.json-0
    2019-06-07 16:08:30,693 INFO o.a.b.s.i.kafka.KafkaUnboundedSource - Partitions assigned to split 1 (total 1): 292330999892453.events.all.v1.json-1
    2019-06-07 16:08:30,693 INFO o.a.b.s.i.kafka.KafkaUnboundedSource - Partitions assigned to split 2 (total 1): 292330999892453.events.all.v1.json-2
    2019-06-07 16:08:30,720 INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
        [identical to the dump above, group.id = 292330999892453]
    2019-06-07 16:08:30,720 INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
        [identical to the dump above, group.id = 292330999892453]
    2019-06-07 16:08:30,720 INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
        [identical to the dump above, group.id = 292330999892453]
    2019-06-07 16:08:30,721 INFO o.a.k.c.s.a.AbstractLogin - Successfully logged in.
    2019-06-07 16:08:30,734 INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
    2019-06-07 16:08:30,734 INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
    2019-06-07 16:08:30,742 INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
    2019-06-07 16:08:30,742 INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
    2019-06-07 16:08:30,743 INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
    2019-06-07 16:08:30,743 INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
    2019-06-07 16:08:31,116 INFO org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
    2019-06-07 16:08:31,117 INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=292330999892453] Discovered group coordinator <url>:39703 (id: 2147483644 rack: null)
    2019-06-07 16:08:31,145 INFO org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
    2019-06-07 16:08:31,145 INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-3, groupId=292330999892453] Discovered group coordinator <url>:39703 (id: 2147483644 rack: null)
    2019-06-07 16:08:31,147 INFO org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
    2019-06-07 16:08:31,148 INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=292330999892453] Discovered group coordinator <url>:39703 (id: 2147483644 rack: null)
    2019-06-07 16:08:31,351 INFO o.a.b.s.i.kafka.KafkaUnboundedSource - Reader-0: reading from 292330999892453.events.all.v1.json-0 starting at offset 318186186
    2019-06-07 16:08:31,352 INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
        [identical to the dump above, group.id = Reader-0_offset_consumer_1189437256_292330999892453]
    2019-06-07 16:08:31,359 INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
    2019-06-07 16:08:31,359 INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
    2019-06-07 16:08:31,389 INFO o.a.b.s.i.kafka.KafkaUnboundedSource - Reader-1: reading from 292330999892453.events.all.v1.json-1 starting at offset 318738731
    2019-06-07 16:08:31,389 INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
        [identical to the dump above, group.id = Reader-1_offset_consumer_1231768376_292330999892453]
    2019-06-07 16:08:31,394 INFO o.a.b.s.i.kafka.KafkaUnboundedSource - Reader-2: reading from 292330999892453.events.all.v1.json-2 starting at offset 318129714
    2019-06-07 16:08:31,394 INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
        [identical to the dump above, group.id = Reader-2_offset_consumer_64443017_292330999892453]
    2019-06-07 16:08:31,395 INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
    2019-06-07 16:08:31,395 INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
    2019-06-07 16:08:31,397 INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
    2019-06-07 16:08:31,398 INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
    2019-06-07 16:08:31,613 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-3, groupId=292330999892453] Fetch offset 318129714 is out of range for partition 292330999892453.events.all.v1.json-2, resetting offset
    2019-06-07 16:08:31,613 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-2, groupId=292330999892453] Fetch offset 318186186 is out of range for partition 292330999892453.events.all.v1.json-0, resetting offset
    2019-06-07 16:08:31,641 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-3, groupId=292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-2 to offset 320367573.
    2019-06-07 16:08:31,641 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-2, groupId=292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-0 to offset 321301099.
    2019-06-07 16:08:31,648 INFO org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
    2019-06-07 16:08:31,649 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-4, groupId=292330999892453] Fetch offset 318738731 is out of range for partition 292330999892453.events.all.v1.json-1, resetting offset
    2019-06-07 16:08:31,667 INFO org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
    2019-06-07 16:08:31,672 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-4, groupId=292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-1 to offset 320867070.
    2019-06-07 16:08:31,714 INFO org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
    2019-06-07 16:08:31,860 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-5, groupId=Reader-0_offset_consumer_1189437256_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-0 to offset 336281187.
    2019-06-07 16:08:31,885 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-5, groupId=Reader-0_offset_consumer_1189437256_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-0 to offset 336281187.
    2019-06-07 16:08:31,905 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-6, groupId=Reader-1_offset_consumer_1231768376_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-1 to offset 336474159.
    2019-06-07 16:08:31,938 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-6, groupId=Reader-1_offset_consumer_1231768376_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-1 to offset 336474159.
    2019-06-07 16:08:31,957 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-7, groupId=Reader-2_offset_consumer_64443017_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-2 to offset 336295646.
    2019-06-07 16:08:31,981 INFO o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-7, groupId=Reader-2_offset_consumer_64443017_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-2 to offset 336295646.
    2019-06-07 16:08:32,142 INFO o.a.b.s.i.kafka.KafkaUnboundedSource - Reader-0: first record offset 321301099
Why are Kafka consumers being restarted over and over again? Is this expected behavior?
Answer
The primary purpose of the DirectRunner is local testing of pipelines. As such, it deliberately behaves in ways that are suboptimal performance-wise. One example: it serializes and deserializes data between operators even when that is not necessary, in order to validate that application code does not modify its input objects, which is forbidden in Beam. The many consumers you see are another example of this: the DirectRunner checkpoints the source very frequently, and each checkpoint includes steps that are not strictly necessary, such as closing and re-creating the Kafka consumer. In other words, your pipeline is not restarting; the runner's checkpointing is what creates consumers over and over.
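If the cost of those validation steps matters even in local runs, they can be relaxed via DirectOptions. A minimal sketch, assuming the beam-runners-direct-java module is on the classpath (note this does not change the frequent checkpointing described above):

    import org.apache.beam.runners.direct.DirectOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Relax the DirectRunner's correctness checks for a local run. They exist
    // to catch bugs (mutated inputs, non-encodable elements), so disable them
    // only once the pipeline is known to be well-behaved.
    DirectOptions options = PipelineOptionsFactory.create().as(DirectOptions.class);
    options.setEnforceImmutability(false); // skip the input-mutation validation
    options.setEnforceEncodability(false); // skip the extra encode/decode round-trips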
As such, the DirectRunner should only be used for tests or modest workloads where performance is not a concern. When performance is a concern, use a different runner, either a distributed deployment or the local version of such a runner, e.g. a local FlinkRunner.
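For illustration, a minimal sketch of switching the pipeline above to a local FlinkRunner (this assumes a beam-runners-flink artifact matching your Flink version is on the classpath):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Without an explicit flinkMaster, the runner starts an embedded local
    // Flink cluster, so no separate Flink installation is needed.
    Pipeline p = Pipeline.create(options);
    // ... apply the same KafkaIO -> Values -> PulsarSink transforms as before ...
    p.run().waitUntilFinish();

With Flink, checkpoints happen at the configured checkpointing interval, so the source should not be constantly re-created the way it is on the DirectRunner.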