I have a very simple Spring Boot project with a KTable, and I want to customize its configuration in application.yml, but the config does not seem to be applied. This is my application.yml:
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    streams:
      application-id: ${APPLICATION_ID:train-builder-processor}
      buffered-records-per-partition: 50
    consumer:
      auto-offset-reset: earliest
      max-poll-records: ${MAX_POLL_RECORDS:50}
      max-poll-interval-ms: ${KAFKA_CONSUMER_MAX_POLL_INTERVAL_MS:1000}
      properties:
        spring:
          json:
            trusted:
              packages:
                - com.example.kafkastream
However, when starting the application the log outputs the following:
2022-03-03 08:20:06.992 INFO 32989 --- [ main] s.r.s.m.t.TrainBuilderApplication : Starting TrainBuilderApplication using Java 16.0.2 on MAPFVFG90ZQQ05P with PID 32989 (/Users/xxx/dev/train-builder-processor/target/classes started by xxx in /Users/xxx/dev/train-builder-processor)
2022-03-03 08:20:06.995 DEBUG 32989 --- [ main] s.r.s.m.t.TrainBuilderApplication : Running with Spring Boot v2.6.3, Spring v5.3.15
2022-03-03 08:20:06.995 INFO 32989 --- [ main] s.r.s.m.t.TrainBuilderApplication : No active profile set, falling back to default profiles: default
2022-03-03 08:20:08.856 INFO 32989 --- [ main] org.apache.kafka.streams.StreamsConfig : StreamsConfig values:
acceptable.recovery.lag = 10000
application.id = test.train-builder-processor
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
(a bunch of other configs)
ConsumerConfig:
max.poll.interval.ms = 300000
max.poll.records = 1000
Below is the simple application class I’m using:
@EnableKafka
@EnableKafkaStreams
@SpringBootApplication
public class TrainBuilderApplication {

    @Autowired
    private TrainIdMapper trainIdMapper;

    @Autowired
    private TrainBuilder trainBuilder;

    public static void main(String[] args) {
        SpringApplication.run(TrainBuilderApplication.class, args);
    }

    @Bean
    public KTable<String, Train> trainTable(StreamsBuilder kStreamBuilder) {
        return kStreamBuilder
                .stream(Pattern.compile(sourceTopicsPattern), Consumed.with(Serdes.String(), myJsonSerde))
                .map(trainIdMapper)
                .filter((key, value) -> key != null)
                .groupByKey(Grouped.with(Serdes.String(), mySerde))
                .aggregate(() -> null, trainBuilder, trainStore);
    }
}
The values from my application.yml seem to be ignored. What could be the cause of this? What am I missing? Thanks in advance!
Answer
I figured it out with the help of the question "How do I properly externalize spring-boot kafka-streams configuration in a properties file?".
Apparently, the consumer and producer configs are completely separate from the streams config when using a KStream. To set specific properties for the consumer of the Kafka stream, one must use "additional properties" under spring.kafka.streams.properties, like so:
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    streams:
      application-id: ${APPLICATION_ID:train-builder-processor}
      cache-max-size-buffering: 1048576
      cleanup.on-shutdown: ${CLEANUP_ON_SHUTDOWN:false}
      properties:
        max:
          poll:
            records: 50
This was a bit unintuitive, but it works. I hope this helps someone in the future!
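For completeness, here is a sketch of how the remaining settings from the question could be moved under the streams section in the same way. I have not run this exact file, so treat it as an assumption-laden starting point: it assumes that everything under spring.kafka.streams.properties is passed through to Kafka Streams unchanged, and it uses the "consumer." prefix that Kafka Streams recognizes for settings meant only for its internal consumers. The unprefixed form shown above also worked for max.poll.records.

```yaml
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    streams:
      application-id: ${APPLICATION_ID:train-builder-processor}
      properties:
        # Kafka Streams' own setting, written with its native dotted key
        buffered.records.per.partition: 50
        # Settings scoped to the consumers that Kafka Streams creates internally
        consumer.auto.offset.reset: earliest
        consumer.max.poll.records: ${MAX_POLL_RECORDS:50}
        consumer.max.poll.interval.ms: ${KAFKA_CONSUMER_MAX_POLL_INTERVAL_MS:1000}
```

After restarting, the StreamsConfig and ConsumerConfig blocks in the startup log should show whether the values were picked up.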