
Apache Camel: Process file line by line

I have a large file that I want to read and process. I don't want to load it entirely into memory; instead, I want to read each line of the file separately and perform actions on it. This is the implementation I have come up with so far:

@Override
public void configure() {
    from(fileSftpLocationUrl)
        .routeId("my-route")
        .onException(Exception.class).handled(true).bean(exchangeErrorHandler, "processError").end()
        .split(body().tokenize("\n")).streaming()
        .filter(/*condition for skip first and last line*/)
        .bean(/*my action*/)
        .to(String.format("activemq:%s", myQueue));
}

I skip the header and footer with .filter(/*condition for skip first and last line*/), and I try to read the file line by line with .split(body().tokenize("\n")).streaming(). But something goes wrong, and I receive the entire content of the file at once. I see the problem in .bean(/*my action*/), when it parses the data and performs actions on it.

I think my problem lies at the beginning, because the algorithm looks strange: first I describe the condition for the whole file (skip header and footer), then I ask Camel to process it line by line, and only then do I specify the action for a single line.

My question is, how do I change this implementation so that the file is processed line by line?

Answer

I think I got it. By default, the result of the split is sent only to the FIRST endpoint that follows it:

from(...)
    .split(body().tokenize("\n")).streaming()
    .to("direct:processLine")

If you want to send it through more complex routing, you have to mark where the split ends, e.g.:

from(...)
       
     .split(body().tokenize("\n")).streaming()
        .filter(/*condition for skip first and last line*/)
        .bean(/*my action*/)
        .to(String.format("activemq:%s", myQueue))
        .end() // closes the filter block
     .end() // closes the split block
     .log("Split done");
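
For the filter condition itself, here is a rough plain-Java sketch (not Camel code) of what "process line by line, skipping the first and last line" amounts to. The class and method names are made up for illustration. The idea is to keep a one-line look-ahead, so the footer can be recognised without loading the whole file into memory.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LineByLineSketch {

    // Hypothetical helper: feeds every line except the first (header)
    // and the last (footer) to the given action, one line at a time.
    // Only the current line and one look-ahead line are held in memory.
    static void forEachBodyLine(BufferedReader reader, Consumer<String> action)
            throws IOException {
        if (reader.readLine() == null) {
            return; // empty input, nothing to do
        }
        String current = reader.readLine(); // first line after the header
        while (current != null) {
            String next = reader.readLine();
            if (next != null) {
                // 'current' is followed by another line, so it is not the footer
                action.accept(current);
            }
            current = next;
        }
    }

    public static void main(String[] args) throws IOException {
        String file = "HEADER\nline 1\nline 2\nFOOTER\n";
        List<String> sent = new ArrayList<>();
        forEachBodyLine(new BufferedReader(new StringReader(file)), sent::add);
        System.out.println(sent); // prints [line 1, line 2]
    }
}
```

Inside a Camel split you would not write this loop yourself. The CamelSplitIndex and CamelSplitComplete exchange properties may serve the same purpose in the filter predicate, but check the Splitter documentation for your Camel version.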

If you omit the end(), the logic will be this one (see indentation):

from(...)
       
     .split(body().tokenize("\n")).streaming()
        .filter(/*condition for skip first and last line*/)
     .end() // Implicit 
     .bean(/*my action*/)
     .to(String.format("activemq:%s", myQueue))         

So in your attempt, bean(...) was invoked with the original message, after the split had already been performed.

Think of it as a kind of for-loop:

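for (String line : lines)
  filter(line);      // only this statement belongs to the loop
bean.run(lines);     // runs once, after the loop, with the whole content
sendto(...);

is not the same at all as:

for (String line : lines) {
  filter(line);
  bean.run(line);    // runs once per line
  sendto(...);
}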
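
If you want to see the difference for yourself, here is a small runnable version of that analogy. It is only a model of the behaviour described above, not Camel internals, and the class and method names are invented for the example. Each method returns the list of arguments the "bean" was called with.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitScopeSketch {

    // Bean call placed inside the loop: one call per line.
    static List<String> beanInsideSplit(List<String> lines) {
        List<String> beanCalls = new ArrayList<>();
        for (String line : lines) {
            beanCalls.add(line);
        }
        return beanCalls;
    }

    // Bean call placed after the loop: a single call with the whole body.
    static List<String> beanAfterSplit(List<String> lines) {
        List<String> beanCalls = new ArrayList<>();
        int filtered = 0;
        for (String line : lines) {
            filtered++; // stands in for filter(line), the only per-line step
        }
        beanCalls.add(String.join("\n", lines));
        return beanCalls;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("line 1", "line 2");
        System.out.println(beanInsideSplit(lines).size()); // prints 2
        System.out.println(beanAfterSplit(lines).size());  // prints 1
    }
}
```

The second method is what "getting the whole file in the bean" looks like: the loop still runs per line, but the action sits outside of it.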
User contributions licensed under: CC BY-SA