We use Akka Stream Kafka for producing and consuming messages, with a Strimzi Kafka cluster. Here are the versions, in case it matters:
com.typesafe.akka:akka-stream-kafka_2.13:2.0.7
com.typesafe.akka:akka-stream_2.13:2.6.14
org.apache.kafka:kafka-clients:2.4.1 (*)
org.scala-lang.modules:scala-collection-compat_2.13:2.1.6
org.scala-lang:scala-library:2.13.5
After a refactoring, the message consumer stopped working. We do have messages in the topic, but the consumer just waits endlessly.
Here is the log fragment:
[2021-04-14 21:20:43,869] [INFO] [org.apache.kafka.common.utils.AppInfoParser] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-8] - Kafka version: 2.4.1
[2021-04-14 21:20:43,869] [INFO] [org.apache.kafka.common.utils.AppInfoParser] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-8] - Kafka commitId: c57222ae8cd7866b
[2021-04-14 21:20:43,869] [INFO] [org.apache.kafka.common.utils.AppInfoParser] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-8] - Kafka startTimeMs: 1618424443866
[2021-04-14 21:20:43,879] [INFO] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-8] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Subscribed to topic(s): xyz-abc-import-dev-abc-input-topic
[2021-04-14 21:20:45,907] [INFO] [org.apache.kafka.clients.Metadata] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-19] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Cluster ID: L9OdIPABTGa7V9OPdViAaw
[2021-04-14 21:20:45,973] [INFO] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-21] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Discovered group coordinator kafka-staging-abc-cluster-kafka-0-kafka-abc.xyzabccluster-host:443 (id: 2147483647 rack: null)
[2021-04-14 21:20:46,245] [INFO] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-21] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] (Re-)joining group
[2021-04-14 21:20:47,554] [INFO] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-9] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] (Re-)joining group
[2021-04-14 21:20:50,780] [INFO] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-22] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Finished assignment for group at generation 5: {consumer-xyz-abc-import-1-995fd3d7-24b5-480d-90bc-b0967f0898f0=Assignment(partitions=[xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1])}
[2021-04-14 21:20:51,114] [INFO] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-16] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Successfully joined group with generation 5
[2021-04-14 21:20:51,125] [INFO] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-16] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Adding newly assigned partitions: xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1
[2021-04-14 21:20:51,334] [INFO] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-18] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Setting offset for partition xyz-abc-import-dev-abc-input-topic-0 to the committed offset FetchPosition{offset=38, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=kafka-staging-abc-cluster-kafka-0-kafka-abc.xyzabccluster-host:443 (id: 0 rack: null), epoch=0}}
[2021-04-14 21:20:51,336] [INFO] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-18] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Setting offset for partition xyz-abc-import-dev-abc-input-topic-1 to the committed offset FetchPosition{offset=51, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=kafka-staging-abc-cluster-kafka-1-kafka-abc.xyzabccluster-host:443 (id: 1 rack: null), epoch=0}}
Some more points:
- The schema registry is properly configured and works (otherwise the producer would not work).
- The topic (and group coordinator) is fine; I can consume messages via a plain consumer like this:
KafkaConsumer<String, MyMsg> consumer = new KafkaConsumer<String, MyMsg>(props);
consumer.subscribe(Collections.singletonList(inputTopic), rebalanceListener);
ConsumerRecords<String, MyMsg> records = consumer.poll(Duration.of(60L, ChronoUnit.SECONDS));
- I use the “earliest” reset config property, so it should see unconsumed messages.
- This is the state of the topic when I run: partition 1 has messages at offsets 51 and 52, and the log shows the consumer coordinator setting the offset to 51, so the messages are there (in fact, it should read the 2 messages I produced for this test).
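The reasoning in the last two points can be sketched in a few lines. In Kafka, auto.offset.reset is only consulted when the group has no usable committed offset; the log above shows committed offsets (38 and 51), so the consumer resumes from those. The helper below is hypothetical and not part of the Kafka client API, and the log start offset 0 and log end offset 53 for partition 1 are assumptions derived from the messages at 51 and 52.

```java
import java.util.OptionalLong;

// Hypothetical helper, not Kafka client API: it only models how the
// starting position for one partition is chosen.
final class StartOffsetSketch {

    static long resolveStart(OptionalLong committed, String resetPolicy,
                             long logStartOffset, long logEndOffset) {
        // A committed offset inside the log's range always wins;
        // auto.offset.reset is not consulted in that case.
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            if (offset >= logStartOffset && offset <= logEndOffset) {
                return offset;
            }
        }
        // No usable committed offset: fall back to the reset policy.
        switch (resetPolicy) {
            case "earliest":
                return logStartOffset;
            case "latest":
                return logEndOffset;
            default:
                throw new IllegalStateException(
                        "no usable offset and reset policy is " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // Partition 1 from the log: committed offset 51, messages at 51 and 52,
        // so the log end offset is assumed to be 53 (and the log start 0).
        long start = resolveStart(OptionalLong.of(51), "earliest", 0, 53);
        System.out.println("start=" + start + ", unread=" + (53 - start));
    }
}
```

Under these assumptions the consumer should start at 51 with 2 unread messages, which matches the expectation stated above.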
This is where the code gets stuck: I use a blocking call to get 2 messages (and cannot get even 1).
final Source<ProducerMessage.Results<String, Object, ConsumerMessage.PartitionOffset>, NotUsed> stream = ...
stream.take(SUBMISSION_SIZE).runWith(Sink.ignore(), mat).toCompletableFuture().get();
Not really sure how to debug it. 8-(
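One small debugging aid, as a sketch only: bound the blocking wait so the test fails with a message instead of hanging forever. The class and method names below are hypothetical; the only library call relied on is the JDK's CompletableFuture.get(long, TimeUnit).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical test helper: waits for a future, but gives up after a timeout.
final class BoundedWaitSketch {

    static <T> T awaitOrFail(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            // The stream produced nothing in time: fail loudly instead of hanging.
            throw new IllegalStateException("no result within " + timeout + " " + unit, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            // The stream itself failed: surface the underlying cause.
            throw new IllegalStateException("stream failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Stand-in for a stream that never emits anything.
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        try {
            awaitOrFail(neverCompletes, 200, TimeUnit.MILLISECONDS);
        } catch (IllegalStateException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

In the test above, the future returned by toCompletableFuture() would be passed to awaitOrFail in place of the bare get() call.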
[UPD 1]
Can it be related to transactions somehow? Because a plain Akka Stream Consumer can see the messages and consume them:
@Test
@Order(3)
void exploreTopic() throws IOException, ExecutionException, InterruptedException {
    Consumer.DrainingControl<java.util.List<ConsumerRecord<String, Object>>> controlCompletionStagePair =
        Consumer.plainSource(consumerSettings, Subscriptions.topics(inputTopic))
            .take(SUBMISSION_SIZE)
            .map(x -> {
                System.out.println(x);
                return x;
            })
            .toMat(Sink.seq(), Consumer::createDrainingControl)
            .run(mat);
    controlCompletionStagePair.streamCompletion().toCompletableFuture().get();
    System.out.println("xxx");
}
[UPD 2]
I turned on DEBUG-level logging and see that my partitions are being paused:
[2021-04-14 23:46:05,122] [DEBUG] [org.apache.kafka.clients.consumer.internals.Fetcher] [kafka-coordinator-heartbeat-thread | xyz-abc-import] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Fetch READ_COMMITTED at offset 40 for partition xyz-abc-import-dev-abc-input-topic-0 returned fetch data (error=NONE, highWaterMark=40, lastStableOffset = 40, logStartOffset = 40, preferredReadReplica = absent, abortedTransactions = [], recordsSizeInBytes=0)
[2021-04-14 23:46:05,725] [DEBUG] [org.apache.kafka.clients.consumer.internals.Fetcher] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-6] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Added READ_COMMITTED fetch request for partition xyz-abc-import-dev-abc-input-topic-0 at position FetchPosition{offset=40, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=kafka-staging-abc-cluster-kafka-0-kafka-abc.xyzabccluster-host:443 (id: 0 rack: null), epoch=0}} to node kafka-staging-abc-cluster-kafka-0-kafka-abc.xyzabccluster-host:443 (id: 0 rack: null)
[2021-04-14 23:46:05,725] [DEBUG] [org.apache.kafka.clients.consumer.internals.Fetcher] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-6] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Added READ_COMMITTED fetch request for partition xyz-abc-import-dev-abc-input-topic-1 at position FetchPosition{offset=61, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=kafka-staging-abc-cluster-kafka-1-kafka-abc.xyzabccluster-host:443 (id: 1 rack: null), epoch=0}} to node kafka-staging-abc-cluster-kafka-1-kafka-abc.xyzabccluster-host:443 (id: 1 rack: null)
[2021-04-14 23:46:05,725] [DEBUG] [org.apache.kafka.clients.FetchSessionHandler] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-6] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Built incremental fetch (sessionId=339828876, epoch=1) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
[2021-04-14 23:46:05,725] [DEBUG] [org.apache.kafka.clients.FetchSessionHandler] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-6] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Built incremental fetch (sessionId=23429972, epoch=1) for node 1. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
[2021-04-14 23:46:05,725] [DEBUG] [org.apache.kafka.clients.consumer.internals.Fetcher] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-6] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Sending READ_COMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(xyz-abc-import-dev-abc-input-topic-0)) to broker kafka-staging-abc-cluster-kafka-0-kafka-abc.xyzabccluster-host:443 (id: 0 rack: null)
[2021-04-14 23:46:05,726] [DEBUG] [org.apache.kafka.clients.consumer.internals.Fetcher] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-6] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Sending READ_COMMITTED IncrementalFetchRequest(toSend=(xyz-abc-import-dev-abc-input-topic-1), toForget=(), implied=()) to broker kafka-staging-abc-cluster-kafka-1-kafka-abc.xyzabccluster-host:443 (id: 1 rack: null)
[2021-04-14 23:46:05,796] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-7] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:05,866] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-10] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:05,935] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-11] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,009] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-13] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,075] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-15] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,149] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-18] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,215] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-20] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,288] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-21] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,355] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-8] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,355] [DEBUG] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [kafka-coordinator-heartbeat-thread | xyz-abc-import] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Sending Heartbeat request to coordinator kafka-staging-abc-cluster-kafka-0-kafka-abc.xyzabccluster-host:443 (id: 2147483647 rack: null)
[2021-04-14 23:46:06,427] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-9] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,459] [DEBUG] [org.apache.kafka.clients.FetchSessionHandler] [kafka-coordinator-heartbeat-thread | xyz-abc-import] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Node 1 sent an incremental fetch response for session 23429972 with 0 response partition(s), 1 implied partition(s)
[2021-04-14 23:46:06,459] [DEBUG] [org.apache.kafka.clients.FetchSessionHandler] [kafka-coordinator-heartbeat-thread | xyz-abc-import] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Node 0 sent an incremental fetch response for session 339828876 with 0 response partition(s), 1 implied partition(s)
[2021-04-14 23:46:06,500] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-12] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,557] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-14] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,559] [DEBUG] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-14] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Received successful Heartbeat response
[2021-04-14 23:46:06,625] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-16] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,697] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-17] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,765] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-19] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
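The READ_COMMITTED fetches in this log indicate that the consumer runs with isolation.level=read_committed, under which records belonging to a still-open transaction are not returned. For partition 0 the log reports highWaterMark=40 and lastStableOffset=40, which would suggest no open transaction is holding records back there. One way to test the transaction hypothesis further is a diagnostic run with read_uncommitted. The sketch below uses plain java.util.Properties with the standard Kafka config keys; the class and method names are hypothetical, and the property values other than the group id are assumptions about the setup.

```java
import java.util.Properties;

// Hypothetical helper for a diagnostic run: copies consumer properties and
// overrides only isolation.level, leaving the original settings untouched.
final class IsolationLevelSketch {

    static Properties withIsolationLevel(Properties base, String level) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.setProperty("isolation.level", level);
        return copy;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("group.id", "xyz-abc-import");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("isolation.level", "read_committed");

        // Diagnostic variant: also returns records from open or aborted transactions.
        Properties diagnostic = withIsolationLevel(props, "read_uncommitted");
        System.out.println(diagnostic.getProperty("isolation.level"));
    }
}
```

If messages show up only under read_uncommitted, the transactions on the producing side are the likely place to look; if nothing changes, the cause is probably elsewhere.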
[UPD 3] Does it have something to do with auto-created topics?
Answer
It’s hard to tell what the impact is, but the consumer started working properly once I added this property to the consumer configuration:
props.put(AvroDatumProvider.REGISTRY_USE_SPECIFIC_AVRO_READER_CONFIG_PARAM, String.valueOf(Boolean.TRUE));
Not sure what the real cause was, or why no error was thrown, but adding the property above helped, at least in my case.