Created a cluster with two brokers using same zookeeper and trying to produce message to a topic whose details are as below.
When the producer sets acks="all"
or -1,min.insync.replicas="2"
, it is supposed to receive acknowledgement from the brokers(leaders and replicas) but when one broker is shut manually while it is producing, it is making no difference to the kafka producer even when acks=”all” can someone explain the reason for this weird behavior?
brokers are on 9091,9092.
acks = -1 batch.size = 16384 bootstrap.servers = [localhost:9092] buffer.memory = 33554432 client.dns.lookup = use_all_dns_ips client.id = producer-1 compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 120000 enable.idempotence = false interceptor.classes = [] internal.auto.downgrade.txn.commit = false key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.2 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer
Below is the source code for the kafka producer
public static void main(String k[]) { Properties prop=new Properties(); prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); prop.setProperty(ProducerConfig.ACKS_CONFIG,"all"); prop.setProperty("min.insync.replicas", "2"); prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String,String> producer=new KafkaProducer<>(prop); ProducerRecord<String,String> rec=new ProducerRecord<String,String>("clust_topic","123"); while(true) { producer.send(rec, new Callback() { @Override public void onCompletion(RecordMetadata rm, Exception arg1) { System.out.println(arg1); if(arg1!=null) System.out.println(arg1); else System.out.println(rm.topic()+" "+rm.partition()+" "+rm.offset()+" "); } }); } }
Advertisement
Answer
ack=all
means that it requires ack from all in-sync replicas, not from all replicas (refer to documentation)