I have the following configuration for my Kafka Stream application
Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG,this.applicaionId); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,svrConfig.getBootstrapServers()); config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, svrConfig.getCacheMaxBytesBufferingConfig()); // Exactly once processing!! config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,SpecificAvroSerde.class); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,SpecificAvroSerde.class); config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
And I got the following error:
Exception in thread "main" io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value. at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243) at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78) at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100) at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32) at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48) at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58) at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
I have tried to replace the line
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
with
config.put("schema.registry.url","http://localhost:8081");
but with the same error
I have followed the instruction from this url when preparing my Stream application.
Any suggestion?
Advertisement
Answer
If you have keys and values in Avro format the following lines should do the trick for you,
config.put("key.converter.schema.registry.url", "http://localhost:8081"); config.put("value.converter.schema.registry.url", "http://localhost:8081");
If this doesn’t seem to work you can override Serdes
explicitly. For example, if you have Avro keys:
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", "http://localhost:8081"); final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde(); keyGenericAvroSerde.configure(serdeConfig, true); // true for record keys final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde(); valueGenericAvroSerde.configure(serdeConfig, false); // false for record values StreamsBuilder builder = new StreamsBuilder(); KStream<GenericRecord, GenericRecord> textLines = builder.stream(keyGenericAvroSerde, valueGenericAvroSerde, "my-avro-topic"); // Do whatever you like