My use case is that I have a zip file containing many CSV files. Each line in each file is then sent to a seda queue for processing. The problem I am facing is that I would like to know when every line has been processed by the seda queue, so that I can do some other work. I am not sure how to approach this. I am currently looking into polling to check when the seda queue is empty, but this can give false results if the lines are processed faster than they arrive.
The code
I have a class that unzips a zip file and reads each file as an InputStream. Each line of the file is then passed to the producer, which in turn sends it to the seda queue.
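import java.io.IOException;
import java.io.InputStream;
import javax.annotation.Resource;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.springframework.stereotype.Component;

@Component
public class CsvProcessor {

    // Template whose default endpoint is the seda queue (see the Camel configuration)
    @Resource(name = "csvLineProducer")
    ProducerTemplate producer;

    public void process(InputStream flatFileStream) throws IOException {
        if (flatFileStream == null) return;
        try {
            LineIterator it = IOUtils.lineIterator(flatFileStream, "UTF-8");
            while (it.hasNext()) {
                final String recordLine = it.nextLine();
                // Send each line as the body of a new exchange
                this.producer.send(new Processor() {
                    public void process(Exchange outExchange) {
                        outExchange.getIn().setBody(recordLine);
                    }
                });
            }
        } finally {
            IOUtils.closeQuietly(flatFileStream);
        }
    }
}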
Here is the Camel configuration.
<camelContext xmlns="http://camel.apache.org/schema/spring" trace="true">
    <template id="csvLineProducer" defaultEndpoint="seda:flatRecordStream"/>
    ...
    <route>
        <from uri="seda:flatRecordStream" />
        <bean ref="myProcessor" method="processLine" />
    </route>
    ...
</camelContext>
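One possible way around the false "queue is empty" result is to count lines instead of polling the queue size. The following is only a minimal sketch in plain Java, not a Camel API: `LineCompletionTracker` and its method names are hypothetical, and the single-thread executor in `main` merely stands in for the seda consumer. The counter starts at 1 so that it cannot reach zero until the producer has declared that it is finished, even if the consumer drains lines faster than they arrive.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper (not part of Camel): tracks lines that are still in flight. */
class LineCompletionTracker {
    // Starts at 1: the extra count is a token held while the producer is still sending.
    private final AtomicLong pending = new AtomicLong(1);
    private final CountDownLatch done = new CountDownLatch(1);

    /** Call just before a line is sent to the queue. */
    void lineQueued() {
        pending.incrementAndGet();
    }

    /** Call from the consumer once a line has been processed. */
    void lineProcessed() {
        release();
    }

    /** Call exactly once, after the last line has been queued. */
    void producerFinished() {
        release();
    }

    private void release() {
        // Only the final release (producer token or last line) opens the latch.
        if (pending.decrementAndGet() == 0) {
            done.countDown();
        }
    }

    boolean isDone() {
        return done.getCount() == 0;
    }

    /** Waits up to the timeout; returns true if every line was processed in time. */
    boolean awaitCompletion(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        LineCompletionTracker tracker = new LineCompletionTracker();
        // Assumption: a single worker thread plays the role of the seda consumer.
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        for (String line : new String[] {"a,1", "b,2", "c,3"}) {
            tracker.lineQueued();
            consumer.submit(() -> {
                // Real line processing would happen here.
                tracker.lineProcessed();
            });
        }
        tracker.producerFinished();
        boolean finished = tracker.awaitCompletion(5, TimeUnit.SECONDS);
        consumer.shutdown();
        System.out.println("all lines processed: " + finished);
    }
}
```

Applied to the code above, this would mean calling `lineQueued()` before `producer.send(...)` in `CsvProcessor`, `lineProcessed()` at the end of `processLine`, and `producerFinished()` after the read loop. A line that fails during processing would also need to call `lineProcessed()` (for example in a `finally` block), otherwise the latch never opens.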