
Batch consumer with Camel Kafka

I am unable to read in batch with the kafka camel consumer, despite following an example posted here. Are there changes I need to make to my producer, or is the problem most likely with my consumer configuration?

The application in question utilizes the kafka camel component to ingest messages from a rest endpoint, validate them, and place them on a topic. I then have a separate service that consumes them from the topic and persists them in a time-series database.

The messages were being produced and consumed one at a time, but the database expects the messages to be consumed and committed in batch for optimal performance. Without touching the producer, I tried adjusting the consumer to match the example in the answer to this question:

How to transactionally poll Kafka from Camel?

I wasn’t sure how the messages would appear, so for now I’m just logging them:

    from(kafkaReadingConsumerEndpoint)
        .routeId("rawReadingsConsumer")
        .process(exchange -> {
            // simple approach to generating errors
            String body = exchange.getIn().getBody(String.class);
            if (body.startsWith("error")) {
                throw new RuntimeException("can't handle the message");
            }
            log.info("BODY:{}", body);
        })
        .process(kafkaOffsetManager);

But the messages still appear to be coming across one at a time with no batch read.

My consumer config is this:

  kafka:
    host: myhost
    port: myport
    consumer:
      seekTo: beginning
      maxPartitionFetchBytes: 55000
      maxPollRecords: 50
      consumerCount: 1
      autoOffsetReset: earliest
      autoCommitEnable: false
      allowManualCommit: true
      breakOnFirstError: true
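For reference, and assuming these properties are being bound onto a camel-kafka endpoint (the topic name and broker placeholders below are mine), the same options expressed directly as an endpoint URI would look roughly like this:

```
kafka:my-topic?brokers=myhost:myport&seekTo=beginning&maxPartitionFetchBytes=55000&maxPollRecords=50&consumerCount=1&autoOffsetReset=earliest&autoCommitEnable=false&allowManualCommit=true&breakOnFirstError=true
```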

Does my config need work, or are there changes I need to make to the producer to have this work correctly?

Answer

At the lowest layer, the KafkaConsumer#poll method returns a ConsumerRecords object, which you iterate over as individual ConsumerRecord instances; there’s no way around that.

I don’t have in-depth experience with Camel, but in order to get a “batch” of records you’ll need some intermediate collection to “queue” the data that you eventually send downstream to a “collection consumer” process, plus a “switch” processor that decides, for each incoming record, either “stop and process this batch now” or “keep filling this batch”.
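The “intermediate collection plus switch” idea can be sketched in plain Java, independent of Camel or the Kafka client. The class and method names here are illustrative, not from any library:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Buffers individual records and hands them downstream in batches.
// The size check in add() is the "switch": flush when the buffer is
// full, otherwise keep filling it.
class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> downstream;
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> downstream) {
        this.batchSize = batchSize;
        this.downstream = downstream;
    }

    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Also call this on a timer or at shutdown so a partial batch
    // is not stranded in the buffer.
    void flush() {
        if (!buffer.isEmpty()) {
            downstream.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}

public class Demo {
    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        BatchBuffer<String> buffer = new BatchBuffer<>(3, batches::add);
        for (int i = 1; i <= 7; i++) {
            buffer.add("msg-" + i);
        }
        buffer.flush(); // drain the trailing partial batch
        System.out.println(batches.size());        // 3
        System.out.println(batches.get(2).size()); // 1
    }
}
```

In a real consumer you would commit the offsets only after `downstream.accept` succeeds, so a failed batch is re-polled rather than lost.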

As far as databases go, that process is exactly what the Kafka Connect JDBC Sink does with its batch.size config.
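For illustration, a minimal JDBC Sink connector config using batch.size might look like the following (the connector name, topic, and connection URL are placeholders of mine):

```json
{
  "name": "readings-jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "raw-readings",
    "connection.url": "jdbc:postgresql://dbhost:5432/metrics",
    "batch.size": "500",
    "insert.mode": "insert"
  }
}
```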