I am consuming messages from Apache Kafka using the Apache Camel Java DSL.
I write an object to Kafka by converting it to a byte[]. When I consume it, I receive a message whose body is a byte[], which I deserialize back into an object.
I check whether it is an instance of MyObject and then need to pass it to a bean using the Java DSL's .to(). My code is below:
public class KafkaRouter extends RouteBuilder {
    private MessageBean msgBean;

    @Override
    public void configure() throws Exception {
        from("{{kafka.cons.uri}}").process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                // TODO cast to specific class as returned after deserialization.
                Object obj = SerializationUtils.deserialize(exchange.getIn().getBody(byte[].class));
                if (obj != null && obj instanceof MessageBean) {
                    msgBean = (MessageBean) obj;
                } else {
                    throw new PTFException("Invalid Message read in Kafka Consumer");
                }
            }
        }).bean(PTFTransformerService.class, "callTransformerService(msgBean)");
    }
}
The issue is that I just want MyObject in the called method, and I do not want to use TypeConverters.
I also do not want to receive the Exchange or the raw body in the method. I will process my stream in process(), throw an exception if an invalid message is read, and not forward it to the bean.
My method on the other end will be:
private void callTransformerService(MessageBean msgObj){
// Got my object here ;-)
}
Answer
Add @Body before the function argument MessageBean msgObj:
import org.apache.camel.Body;
private void callTransformerService(@Body MessageBean msgObj){
}
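For @Body to hand over a MessageBean, the message body has to be a MessageBean by the time the bean is called. In the question's route the processor only stores the object in a Java field and the body stays a byte[]. The string passed to .bean() is parsed by Camel and cannot refer to that field. A minimal sketch of one way to wire this up, assuming the processor replaces the body with the deserialized object: the nested MessageBean, PTFException and PTFTransformerService classes are stand-ins for the question's own classes, the text field and the toMessageBean helper are hypothetical, and the method is made public here on the assumption that Camel's bean binding only considers public methods.

```java
import java.io.Serializable;

import org.apache.camel.Body;
import org.apache.camel.builder.RouteBuilder;
import org.apache.commons.lang3.SerializationUtils;

// Sketch only: the nested classes are minimal stand-ins for the question's
// classes, included so the example is self-contained.
class KafkaRouter extends RouteBuilder {

    public static class MessageBean implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String text; // hypothetical payload field

        public MessageBean(String text) {
            this.text = text;
        }
    }

    // Stand-in: unchecked here for brevity; the real PTFException may differ.
    public static class PTFException extends RuntimeException {
        public PTFException(String message) {
            super(message);
        }
    }

    public static class PTFTransformerService {
        // @Body asks Camel to pass the current message body as this argument.
        public void callTransformerService(@Body MessageBean msgObj) {
            // msgObj is the object the processor stored as the body.
        }
    }

    // Hypothetical helper: deserialize the payload and let only a MessageBean through.
    static MessageBean toMessageBean(byte[] payload) {
        Object obj = null;
        if (payload != null) {
            obj = SerializationUtils.deserialize(payload);
        }
        if (obj instanceof MessageBean) {
            return (MessageBean) obj;
        }
        throw new PTFException("Invalid Message read in Kafka Consumer");
    }

    @Override
    public void configure() {
        from("{{kafka.cons.uri}}")
            .process(exchange -> {
                byte[] payload = exchange.getIn().getBody(byte[].class);
                // Replace the byte[] body with the typed object; an invalid
                // message throws here and never reaches the bean.
                exchange.getIn().setBody(toMessageBean(payload));
            })
            // Method name only: the parameter is bound through @Body.
            .bean(PTFTransformerService.class, "callTransformerService");
    }
}
```

Keeping the object in the message body rather than in a field of the RouteBuilder also avoids sharing state between messages, since the route instance is reused for every exchange.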