Before the library update, my Spring Boot/Kafka application used the class org.telegram.telegrambots.api.objects.Update to post messages to the Kafka topic. Now I use org.telegram.telegrambots.meta.api.objects.Update. As you can see, the two classes are in different packages.
After restarting the application, I ran into the following issue:
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition telegram.fenix.bot.update-0 at offset 4223. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.telegram.telegrambots.api.objects.Update' is not in the trusted packages: [java.util, java.lang, org.telegram.telegrambots.meta.api.objects]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:221) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:967) ~[kafka-clients-1.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:93) ~[kafka-clients-1.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1144) ~[kafka-clients-1.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993) ~[kafka-clients-1.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527) ~[kafka-clients-1.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488) ~[kafka-clients-1.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155) ~[kafka-clients-1.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) ~[kafka-clients-1.1.0.jar!/:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_171]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
This is my config:
@EnableAsync
@Configuration
public class ApplicationConfig {

    @Bean
    public StringJsonMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }
}

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);
        return props;
    }

    @Bean
    public ProducerFactory<String, Update> updateProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Update> updateKafkaTemplate() {
        return new KafkaTemplate<>(updateProducerFactory());
    }
}

@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.max.poll.interval.ms}")
    private String kafkaConsumerMaxPollIntervalMs;

    @Value("${kafka.consumer.max.poll.records}")
    private String kafkaConsumerMaxPollRecords;

    @Value("${kafka.topic.telegram.fenix.bot.update.consumer.concurrency}")
    private Integer updateConsumerConcurrency;

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(),
                new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(consumerFactory(kafkaProperties));
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Update> updateConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(),
                new StringDeserializer(), new JsonDeserializer<>(Update.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Update> updateKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, Update> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(updateConsumerFactory(kafkaProperties));
        factory.setConcurrency(updateConsumerConcurrency);
        return factory;
    }
}
application.properties
spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
How can I solve this issue and let Kafka deserialize the old messages into the new class?
UPDATED
This is my listener:
@Component
public class UpdateConsumer {

    @KafkaListener(topics = "${kafka.topic.update}", containerFactory = "updateKafkaListenerContainerFactory")
    public void onUpdateReceived(ConsumerRecord<String, Update> consumerRecord, Acknowledgment ack) {
        // do some logic here
        ack.acknowledge();
    }
}
Answer
See the documentation.
Starting with version 2.1, type information can be conveyed in record Headers, allowing the handling of multiple types. In addition, the serializer/deserializer can be configured using Kafka properties.
JsonSerializer.ADD_TYPE_INFO_HEADERS (default true); set to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).
JsonDeserializer.KEY_DEFAULT_TYPE; fallback type for deserialization of keys if no header information is present.
JsonDeserializer.VALUE_DEFAULT_TYPE; fallback type for deserialization of values if no header information is present.
JsonDeserializer.TRUSTED_PACKAGES (default java.util, java.lang); comma-delimited list of package patterns allowed for deserialization; * means deserialize all.
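As a rough illustration of how the consumer-side properties listed above could be put together, here is a minimal sketch that only builds the property map and uses no Spring classes. The string keys are what I understand to be the literal values of JsonDeserializer.VALUE_DEFAULT_TYPE and JsonDeserializer.TRUSTED_PACKAGES; the class name JsonConsumerProps and the localhost:9092 address are made up for the example. Note that the default type is only a fallback, so records that already carry a type header for the old class would still hit the trusted-packages check. One caveat I'm fairly, but not entirely, sure of: these keys are read when Kafka itself creates the deserializer from the properties, so a JsonDeserializer instance passed to the consumer factory constructor would need to be configured in code instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: assembles consumer properties for a JSON value deserializer.
public class JsonConsumerProps {

    // Assumed literal values of the spring-kafka constants named in the documentation.
    static final String VALUE_DEFAULT_TYPE = "spring.json.value.default.type";
    static final String TRUSTED_PACKAGES = "spring.json.trusted.packages";

    public static Map<String, Object> build(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Fallback target type, used only when a record has no type header.
        props.put(VALUE_DEFAULT_TYPE, "org.telegram.telegrambots.meta.api.objects.Update");
        // Comma-delimited list of packages allowed for deserialization.
        props.put(TRUSTED_PACKAGES, "org.telegram.telegrambots.meta.api.objects");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting properties for inspection.
        build("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting map could then be handed to a DefaultKafkaConsumerFactory in place of explicitly constructed deserializer instances.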
By default, the serializer will add type information to the headers.
Similarly, you can disable the JsonSerializer default behavior of sending type information in headers:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
Or you can add type mapping to the inbound message converter to map the source type to the destination type.
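To make the type-mapping idea concrete, here is a library-free sketch of what such a mapping does conceptually: the type id found in the record header (the old class name) is looked up in a table and resolved to the new class. TypeIdMapping and NewUpdate are hypothetical stand-ins invented for this example, not spring-kafka API. In spring-kafka the real hook is the type mapper that appears in the stack trace (DefaultJackson2JavaTypeMapper); I believe it accepts an id-to-class map, but treat that as an assumption and check the Javadoc for your version.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual sketch only: resolves a type id from a record header to a target class.
public class TypeIdMapping {

    // Stand-in for the new org.telegram.telegrambots.meta.api.objects.Update class.
    public static class NewUpdate { }

    private final Map<String, Class<?>> idToClass = new HashMap<>();

    // Register a mapping from a type id (here, an old class name) to the new class.
    public void map(String typeId, Class<?> target) {
        idToClass.put(typeId, target);
    }

    // Look up the target class for a type id; fail loudly if nothing is mapped.
    public Class<?> resolve(String typeId) {
        Class<?> target = idToClass.get(typeId);
        if (target == null) {
            throw new IllegalArgumentException("No mapping for type id: " + typeId);
        }
        return target;
    }

    public static void main(String[] args) {
        TypeIdMapping mapping = new TypeIdMapping();
        mapping.map("org.telegram.telegrambots.api.objects.Update", NewUpdate.class);
        System.out.println(
                mapping.resolve("org.telegram.telegrambots.api.objects.Update").getName());
    }
}
```

With a mapping like this in place, old records whose header names the old package would be deserialized into the new class without the old package having to be trusted or even present on the classpath.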
EDIT
Having said that, what version are you using?