Skip to content
Advertisement

Spring Batch: KafkaItemReader reads from offset 0 if the JVM is not restarted

I’m trying to implement a Spring Batch job that is scheduled by a Spring scheduler.

The current befavior is as follows:

  • The job is launched by the scheduler using JarLauncher
  • The job consumes the kafka records and processed them
  • The next time the jobs ris launched by the scheduler, it consumes the kafka records already processed

The expected behavior is that the job needs to start processing from the last commited offset.

I have set an empty map to the partitionOffsets property as follows to make the reader read the offsets from kafka.

kafkaItemReader.setPartitionOffsets(new HashMap<>());

This only works when i restart the scheduler process. FYI, the offsets are correcty stored in the _consumer_offsets topic. My question is, i’m i mising some configuration that makes the KafkaItemReader read the offsets from kafka when the scheduler is not restarted ?

EDIT:
Each time the scheduler is calling jobLauncher.run(job, jobParameters);, so i guess it is running a new instance The item reader created is:

public ItemReader<byte[]> readFile() {
            KafkaItemReader<String, byte[]> kafkaItemReader = new KafkaItemReaderBuilder<String, byte[]>()
                    .partitions(partitionsList)
                    .consumerProperties(props)
                    .name("consumer name")
                    .saveState(true)
                    .topic(topicName)
                    .build();
            kafkaItemReader.setPartitionOffsets(new HashMap<>());
            return kafkaItemReader;
}

My kafkaConsumer configurations are:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONFIG_BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_CONFIG_GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KAFKA_CONFIG_KEY_DESERIALIZER_CLASS);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KAFKA_CONFIG_VALUE_DESERIALIZER_BYTES_CLASS);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, KAFKA_CONFIG_FETCH_BYTES);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, KAFKA_CONFIG_FETCH_BYTES);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KAFKA_CONFIG_AUTO_OFFSET_RESET_LATEST);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KAFKA_CONFIG_ENABLE_AUTO_COMMMIT);

Advertisement

Answer

This only works when i restart the scheduler process.

This is probably because your item reader is a singleton bean, having the same state for the entire lifetime of the application context. Things should work as you expect if you make your item reader step-scoped, so that a new instance is created for each scheduled run, and therefore the partition offset would be set to the one stored in Kafka.

Advertisement