It’s my first Kafka program.
From a kafka_2.13-3.1.0
instance, I created a Kafka topic poids_garmin_brut
and filled it with this csv
:
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic poids_garmin_brut kafka-console-producer.sh --broker-list localhost:9092 --topic poids_garmin_brut < "Poids(1).csv"
Durée,Poids,Variation,IMC,Masse grasse,Masse musculaire squelettique,Masse osseuse,Masse hydrique, " 14 Fév. 2022", 06:37,72.1 kg,0.3 kg,22.8,26.3 %,29.7 kg,3.5 kg,53.8 %, " 13 Fév. 2022", 06:48,72.4 kg,0.2 kg,22.9,25.4 %,29.8 kg,3.6 kg,54.4 %, " 12 Fév. 2022", 06:17,72.2 kg,0.0 kg,22.8,25.3 %,29.7 kg,3.6 kg,54.5 %, [...]
And at anytime now, before or after running the program I’ll show, its content can be displayed by a kafka-console-consumer
command:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic poids_garmin_brut --from-beginning Durée,Poids,Variation,IMC,Masse grasse,Masse musculaire squelettique,Masse osseuse,Masse hydrique, " 14 Fév. 2022", 06:37,72.1 kg,0.3 kg,22.8,26.3 %,29.7 kg,3.5 kg,53.8 %, " 13 Fév. 2022", 06:48,72.4 kg,0.2 kg,22.9,25.4 %,29.8 kg,3.6 kg,54.4 %, " 12 Fév. 2022", 06:17,72.2 kg,0.0 kg,22.8,25.3 %,29.7 kg,3.6 kg,54.5 %, " 11 Fév. 2022", 05:54,72.2 kg,0.1 kg,22.8,25.6 %,29.7 kg,3.5 kg,54.3 %, " 10 Fév. 2022", 06:14,72.3 kg,0.0 kg,22.8,25.9 %,29.7 kg,3.5 kg,54.1 %, " 9 Fév. 2022", 06:06,72.3 kg,0.5 kg,22.8,26.3 %,29.7 kg,3.5 kg,53.8 %, " 8 Fév. 2022", 07:14,71.8 kg,0.7 kg,22.7,26.3 %,29.6 kg,3.5 kg,53.8 %,
Here is the Java program, based on org.apache.kafka:kafka-streams:3.1.0
dependency, extracting this topic as a stream:
package extracteur.garmin; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.slf4j.*; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.util.Properties; @SpringBootApplication public class Kafka { /** Logger. */ private static final Logger LOGGER = LoggerFactory.getLogger(Kafka.class); public static void main(String[] args) { LOGGER.info("L'extracteur de données Garmin démarre..."); /* Les données du fichier CSV d'entrée sont sous cette forme : Durée,Poids,Variation,IMC,Masse grasse,Masse musculaire squelettique,Masse osseuse,Masse hydrique, " 14 Fév. 2022", 06:37,72.1 kg,0.3 kg,22.8,26.3 %,29.7 kg,3.5 kg,53.8 %, " 13 Fév. 2022", 06:48,72.4 kg,0.2 kg,22.9,25.4 %,29.8 kg,3.6 kg,54.4 %, */ // Création d'un flux sans clef et valeur : chaîne de caractères. StreamsBuilder builder = new StreamsBuilder(); KStream<Void,String> stream = builder.stream("poids_garmin_brut"); // C'est un foreach de Kafka, pas de lambda java. Il est lazy. stream.foreach((key, value) -> { LOGGER.info(value); }); KafkaStreams streams = new KafkaStreams(builder.build(), config()); streams.start(); // Fermer le flux Kafka quand la VM s'arrêtera, en faisant appeler streams.close(); Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } /** * Propriétés pour le démarrage. * @return propriétés de configuration. */ private static Properties config() { Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "dev1"); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Void().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return config; } }
But, while the logs don’t seem to report any error during execution, my program doesn’t enter the stream.forEach
, and therefore: displays no content from that topic.
(in this log I removed the dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088-
part of [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088-StreamThread-1]
you should read inside, for SO message length and lisibility. And org.apache.kafka
becames o.a.k.
).
/usr/lib/jvm/java-1.11.0-openjdk-amd64/bin/java -XX:TieredStopAtLevel=1 -noverify -Dspring.output.ansi.enabled=always -Dcom.sun.management.jmxremote -Dspring.jmx.enabled=true -Dspring.liveBeansView.mbeanDomain -Dspring.application.admin.enabled=true -javaagent:/opt/idea-IU-212.5284.40/lib/idea_rt.jar=41397:/opt/idea-IU-212.5284.40/bin -Dfile.encoding=UTF-8 -classpath /home/lebihan/dev/Java/garmin/target/classes:/home/lebihan/.m2/repository/org/slf4j/slf4j-api/1.7.33/slf4j-api-1.7.33.jar:/home/lebihan/.m2/repository/org/slf4j/log4j-over-slf4j/1.7.33/log4j-over-slf4j-1.7.33.jar:/home/lebihan/.m2/repository/ch/qos/logback/logback-classic/1.2.10/logback-classic-1.2.10.jar:/home/lebihan/.m2/repository/ch/qos/logback/logback-core/1.2.10/logback-core-1.2.10.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-web/2.6.3/spring-boot-starter-web-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter/2.6.3/spring-boot-starter-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot/2.6.3/spring-boot-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/2.6.3/spring-boot-autoconfigure-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-logging/2.6.3/spring-boot-starter-logging-2.6.3.jar:/home/lebihan/.m2/repository/org/apache/logging/log4j/log4j-to-slf4j/2.17.1/log4j-to-slf4j-2.17.1.jar:/home/lebihan/.m2/repository/org/apache/logging/log4j/log4j-api/2.17.1/log4j-api-2.17.1.jar:/home/lebihan/.m2/repository/org/slf4j/jul-to-slf4j/1.7.33/jul-to-slf4j-1.7.33.jar:/home/lebihan/.m2/repository/jakarta/annotation/jakarta.annotation-api/1.3.5/jakarta.annotation-api-1.3.5.jar:/home/lebihan/.m2/repository/org/yaml/snakeyaml/1.29/snakeyaml-1.29.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-json/2.6.3/spring-boot-starter-json-2.6.3.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jdk8/2.13.1/jackson-datatype-jdk8-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jsr310/2.13.1/jackson-datatype-jsr310-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/module/jackson-module-parameter-names/2.13.1/jackson-module-parameter-names-2.13.1.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-tomcat/2.6.3/spring-boot-starter-tomcat-2.6.3.jar:/home/lebihan/.m2/repository/org/apache/tomcat/embed/tomcat-embed-core/9.0.56/tomcat-embed-core-9.0.56.jar:/home/lebihan/.m2/repository/org/apache/tomcat/embed/tomcat-embed-el/9.0.56/tomcat-embed-el-9.0.56.jar:/home/lebihan/.m2/repository/org/apache/tomcat/embed/tomcat-embed-websocket/9.0.56/tomcat-embed-websocket-9.0.56.jar:/home/lebihan/.m2/repository/org/springframework/spring-web/5.3.15/spring-web-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-beans/5.3.15/spring-beans-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-webmvc/5.3.15/spring-webmvc-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-aop/5.3.15/spring-aop-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-context/5.3.15/spring-context-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-expression/5.3.15/spring-expression-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-core/5.3.15/spring-core-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-jcl/5.3.15/spring-jcl-5.3.15.jar:/home/lebihan/.m2/repository/org/apache/kafka/kafka-streams/3.1.0/kafka-streams-3.1.0.jar:/home/lebihan/.m2/repository/org/apache/kafka/kafka-clients/3.0.0/kafka-clients-3.0.0.jar:/home/lebihan/.m2/repository/com/github/luben/zstd-jni/1.5.0-2/zstd-jni-1.5.0-2.jar:/home/lebihan/.m2/repository/org/lz4/lz4-java/1.7.1/lz4-java-1.7.1.jar:/home/lebihan/.m2/repository/org/xerial/snappy/snappy-java/1.1.8.1/snappy-java-1.1.8.1.jar:/home/lebihan/.m2/repository/org/rocksdb/rocksdbjni/6.22.1.1/rocksdbjni-6.22.1.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.13.1/jackson-annotations-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.13.1/jackson-databind-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.13.1/jackson-core-2.13.1.jar extracteur.garmin.Kafka 07:57:49.720 [main] INFO extracteur.garmin.Kafka - L'extracteur de données Garmin démarre... 07:57:49.747 [main] INFO o.a.k.streams.StreamsConfig - StreamsConfig values: acceptable.recovery.lag = 10000 application.id = dev1 application.server = bootstrap.servers = [localhost:9092] buffered.records.per.partition = 1000 built.in.metrics.version = latest cache.max.bytes.buffering = 10485760 client.id = commit.interval.ms = 30000 connections.max.idle.ms = 540000 default.deserialization.exception.handler = class o.a.k.streams.errors.LogAndFailExceptionHandler default.key.serde = class o.a.k.common.serialization.Serdes$VoidSerde default.list.key.serde.inner = null default.list.key.serde.type = null default.list.value.serde.inner = null default.list.value.serde.type = null default.production.exception.handler = class o.a.k.streams.errors.DefaultProductionExceptionHandler default.timestamp.extractor = class o.a.k.streams.processor.FailOnInvalidTimestamp default.value.serde = class o.a.k.common.serialization.Serdes$StringSerde max.task.idle.ms = 0 max.warmup.replicas = 2 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 num.standby.replicas = 0 num.stream.threads = 1 poll.ms = 100 probing.rebalance.interval.ms = 600000 processing.guarantee = at_least_once receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 replication.factor = -1 request.timeout.ms = 40000 retries = 0 retry.backoff.ms = 100 rocksdb.config.setter = null security.protocol = PLAINTEXT send.buffer.bytes = 131072 state.cleanup.delay.ms = 600000 state.dir = /tmp/kafka-streams task.timeout.ms = 300000 topology.optimization = none upgrade.from = null window.size.ms = null windowed.inner.class.serde = null windowstore.changelog.additional.retention.ms = 86400000 07:57:49.760 [main] INFO o.a.k.clients.admin.AdminClientConfig - AdminClientConfig values: bootstrap.servers = [localhost:9092] client.dns.lookup = use_all_dns_ips client.id = admin connections.max.idle.ms = 300000 default.api.timeout.ms = 60000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS 07:57:49.790 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0 07:57:49.790 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962 07:57:49.790 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269788 07:57:49.793 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] Kafka Streams version: 3.1.0 07:57:49.793 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] Kafka Streams commit ID: 37edeed0777bacb3 07:57:49.800 [main] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Creating restore consumer client 07:57:49.802 [main] INFO o.a.k.clients.consumer.ConsumerConfig - ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = none bootstrap.servers = [localhost:9092] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = StreamThread-1-restore-consumer client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = null group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = false internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 1000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class o.a.k.clients.consumer.RangeAssignor, class o.a.k.clients.consumer.CooperativeStickyAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 45000 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer 07:57:49.816 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0 07:57:49.816 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962 07:57:49.816 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269816 07:57:49.818 [main] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Creating thread producer client 07:57:49.820 [main] INFO o.a.k.clients.producer.ProducerConfig - ProducerConfig values: acks = -1 batch.size = 16384 bootstrap.servers = [localhost:9092] buffer.memory = 33554432 client.dns.lookup = use_all_dns_ips client.id = StreamThread-1-producer compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 120000 enable.idempotence = true interceptor.classes = [] key.serializer = class o.a.k.common.serialization.ByteArraySerializer linger.ms = 100 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class o.a.k.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class o.a.k.common.serialization.ByteArraySerializer 07:57:49.828 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0 07:57:49.828 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962 07:57:49.828 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269828 07:57:49.830 [main] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Creating consumer client 07:57:49.831 [main] INFO o.a.k.clients.consumer.ConsumerConfig - ConsumerConfig values: allow.auto.create.topics = false auto.commit.interval.ms = 5000 auto.offset.reset = earliest bootstrap.servers = [localhost:9092] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = StreamThread-1-consumer client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = dev1 group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = false internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 1000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [o.a.k.streams.processor.internals.StreamsPartitionAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 45000 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer replication.factor = -1 windowstore.changelog.additional.retention.ms = 86400000 07:57:49.836 [main] INFO o.a.k.streams.processor.internals.assignment.AssignorConfiguration - stream-thread [StreamThread-1-consumer] Cooperative rebalancing protocol is enabled now 07:57:49.840 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0 07:57:49.840 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962 07:57:49.840 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269840 07:57:49.844 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] State transition from CREATED to REBALANCING 07:57:49.845 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Starting 07:57:49.845 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] State transition from CREATED to STARTING 07:57:49.845 [StreamThread-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Subscribed to topic(s): poids_garmin_brut 07:57:49.845 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] State transition from REBALANCING to PENDING_SHUTDOWN 07:57:49.846 [kafka-streams-close-thread] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Informed to shut down 07:57:49.846 [kafka-streams-close-thread] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN 07:57:49.919 [kafka-producer-network-thread | StreamThread-1-producer] INFO o.a.k.clients.Metadata - [Producer clientId=StreamThread-1-producer] Cluster ID: QKJGs4glRAy7besZxXNCrg 07:57:49.920 [StreamThread-1] INFO o.a.k.clients.Metadata - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Cluster ID: QKJGs4glRAy7besZxXNCrg 07:57:49.921 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Discovered group coordinator debian:9092 (id: 2147483647 rack: null) 07:57:49.922 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] (Re-)joining group 07:57:49.929 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Request joining group due to: need to re-join with the given member-id 07:57:49.929 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] (Re-)joining group 07:57:49.930 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Successfully joined group with generation Generation{generationId=3, memberId='StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c', protocol='stream'} 07:57:49.936 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] All members participating in this rebalance: d1c8ce47-6fbf-41b7-b8aa-e3d094703088: [StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c]. 07:57:49.938 [StreamThread-1] INFO o.a.k.streams.processor.internals.assignment.HighAvailabilityTaskAssignor - Decided on assignment: {d1c8ce47-6fbf-41b7-b8aa-e3d094703088=[activeTasks: ([0_0]) standbyTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) changelogOffsetTotalsByTask: ([]) taskLagTotals: ([]) capacity: 1 assigned: 1]} with no followup probing rebalance. 07:57:49.938 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] Assigned tasks [0_0] including stateful [] to clients as: d1c8ce47-6fbf-41b7-b8aa-e3d094703088=[activeTasks: ([0_0]) standbyTasks: ([])]. 07:57:49.939 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] Client d1c8ce47-6fbf-41b7-b8aa-e3d094703088 per-consumer assignment: prev owned active {} prev owned standby {StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c=[]} assigned active {StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c=[0_0]} revoking active {} assigned standby {} 07:57:49.939 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] Finished stable assignment of tasks, no followup rebalances required. 07:57:49.939 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Finished assignment for group at generation 3: {StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c=Assignment(partitions=[poids_garmin_brut-0], userDataSize=52)} 07:57:49.943 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Successfully synced group in generation Generation{generationId=3, memberId='StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c', protocol='stream'} 07:57:49.943 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Updating assignment with Assigned partitions: [poids_garmin_brut-0] Current owned partitions: [] Added partitions (assigned - owned): [poids_garmin_brut-0] Revoked partitions (owned - assigned): [] 07:57:49.943 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Notifying assignor about the new Assignment(partitions=[poids_garmin_brut-0], userDataSize=52) 07:57:49.944 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] No followup rebalance was requested, resetting the rebalance schedule. 07:57:49.944 [StreamThread-1] INFO o.a.k.streams.processor.internals.TaskManager - stream-thread [StreamThread-1] Handle new assignment with: New active tasks: [0_0] New standby tasks: [] Existing active tasks: [] Existing standby tasks: [] 07:57:49.950 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Adding newly assigned partitions: poids_garmin_brut-0 07:57:49.953 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Found no committed offset for partition poids_garmin_brut-0 07:57:49.954 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Shutting down [...] Process finished with exit code 0
What am I doing wrong?
I’m running my Kafka instance and its Java program locally, on the same PC.
I’ve experienced
3.1.0
and2.8.1
versions of Kafka, or removed any traces of Spring in the Java program without success.
I belive I’m facing a configuration problem.
Advertisement
Answer
I fear having shamefully found the cause of my trouble :
streams.start(); streams.close();
The streams.start()
starts the listening of the stream.
But the steams.close()
ends it immediately and close the program a second after!
And what I saw, commenting that streams.close()
statement, is that the first message from poids_garmin_brut
topic takes around 20 seconds to be detected and displayed, provided new messages are injected in the topic, then others new injected ones comes instantaneously.
I believed that the property
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
had for goal to ask Kafka to read a topic from its beginning, but it’s not the case.