I have a large file that I want to read and process. I don’t want to load it entirely into memory, so instead I read each line of the file separately and perform actions on it. This is the implementation I have arrived at so far:
    @Override
    public void configure() {
        from(fileSftpLocationUrl)
            .routeId("my-route")
            .onException(Exception.class).handled(true).bean(exchangeErrorHandler, "processError").end()
            .split(body().tokenize("\n")).streaming()
            .filter(/*condition for skip first and last line*/)
            .bean(/*my action*/)
            .to(String.format("activemq:%s", myQueue));
    }
Before starting to read the file, I skip the header and footer with .filter(/*condition for skip first and last line*/), and next I try to read the file line by line with .split(body().tokenize("\n")).streaming(). But something goes wrong, and I get the whole content of the file at once. I see the problem in .bean(/*my action*/), when it parses the data and performs actions on it.
I think my problem lies at the beginning, because the algorithm looks strange: first I describe the condition for the whole file (skip header and footer), then I ask Camel to process it line by line, and only then comes the action for a specific line.
My question is, how do I change this implementation so that the file is processed line by line?
Answer
I think I got it. By default, the split result is sent only to the FIRST endpoint that follows the split:
    from(...)
        .split(body().tokenize("\n")).streaming()
        .to("direct:processLine")
If you want to send it through a more complex routing, you have to mark where the split ends, e.g.:
    from(...)
        .split(body().tokenize("\n")).streaming()
            .filter(/*condition for skip first and last line*/)
            .bean(/*my action*/)
            .to(String.format("activemq:%s", myQueue))
        .end()
        .log("Split done");
If you omit the end(), the logic will be this one (see indentation):
    from(...)
        .split(body().tokenize("\n")).streaming()
            .filter(/*condition for skip first and last line*/)
        .end() // Implicit
        .bean(/*my action*/)
        .to(String.format("activemq:%s", myQueue))
So in your attempt, bean(...) was invoked with the original message, after the split had already been performed.
Think of it as a kind of “for-loop”:

    for (String line : lines)
        filter(line);
    bean.run(lines);   // runs once, on the original message
    sendto(...);

is not the same at all as:

    for (String line : lines) {
        filter(line);
        bean.run(line);   // runs once per line
        sendto(...);
    }
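The loop analogy can be made runnable in plain Java. This is only a sketch of the scoping idea and does not use Camel at all: `SplitScopeDemo`, `withoutBlock`, `withBlock`, and the "non-blank line" filter condition are hypothetical names and assumptions picked for illustration, not anything taken from the original route.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java analogy only; no Camel involved. All names here are hypothetical.
final class SplitScopeDemo {

    // Steps left outside the loop body: only the filter is repeated per line,
    // and the "bean" step then runs once with the whole, unsplit input.
    static List<String> withoutBlock(List<String> lines) {
        List<String> beanInputs = new ArrayList<>();
        for (String line : lines)
            filter(line);                          // the only statement inside the loop
        beanInputs.add(String.join("\n", lines));  // runs once, sees the original body
        return beanInputs;
    }

    // Steps placed inside the loop body: the "bean" step runs once per accepted line.
    static List<String> withBlock(List<String> lines) {
        List<String> beanInputs = new ArrayList<>();
        for (String line : lines) {
            if (filter(line)) {
                beanInputs.add(line);              // sees one line at a time
            }
        }
        return beanInputs;
    }

    // Stand-in for the elided filter condition (assumption: accept non-blank lines).
    static boolean filter(String line) {
        return !line.trim().isEmpty();
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("HEADER", "row1", "row2", "FOOTER");
        System.out.println("bean calls without block: " + withoutBlock(lines).size());
        System.out.println("bean calls with block:    " + withBlock(lines).size());
    }
}
```

With the four sample lines, `withoutBlock` records a single bean input containing the whole text, while `withBlock` records one input per line, which mirrors the difference between leaving the steps outside the split and closing the split explicitly with end().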