I have the following use case:
I need to periodically read and aggregate messages from a Kafka topic and publish the result to another topic. Local storage is not an option. Here is how I plan to approach this; any suggestions for improvement are welcome.
The plan is to schedule the aggregation and publication of the Kafka messages using the completionInterval option of the Aggregator EIP. Here is the code:
    @Autowired
    ObjectMapper objectMapper;

    JacksonDataFormat jacksonDataFormat;

    @PostConstruct
    public void initialize(){
And the route:
    public void configure() throws Exception {
        from("kafka:localhost:9092?topic=item-events"
                + "&groupId=aggregator-group-id&autoCommitIntervalMs=25000&autoOffsetReset=earliest&consumersCount=1")
            .routeId("kafkapoller")
            .unmarshal(jacksonDataFormat)
            .aggregate(body().method("getItemId"), new EventAggregationStrategy()).completionInterval(20000)
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:localhost:9092?topic=item-events-aggregated&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
    }
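For readers who want to see what the route is meant to do, here is a rough plain-Java model of "correlate by getItemId, merge, flush on a timer". It is only a sketch and does not use Camel: `ItemEvent`, its `payload` field, and `ItemEventBatcher` are hypothetical names invented for illustration, and the real `EventAggregationStrategy` (not shown in the question) may merge events differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event type; the question only shows that the body exposes getItemId().
final class ItemEvent {
    private final String itemId;
    private final String payload; // assumed field, for illustration only

    ItemEvent(String itemId, String payload) {
        this.itemId = itemId;
        this.payload = payload;
    }

    String getItemId() { return itemId; }
    String getPayload() { return payload; }
}

// Plain-Java model of the aggregator: one pending group per correlation key.
final class ItemEventBatcher {
    private final Map<String, List<String>> pending = new LinkedHashMap<>();

    // Roughly what one aggregate(oldExchange, newExchange) call does:
    // the first event for a key starts a group, later ones are merged into it.
    void add(ItemEvent event) {
        pending.computeIfAbsent(event.getItemId(), k -> new ArrayList<>())
               .add(event.getPayload());
    }

    // Roughly what happens when the completion interval fires:
    // every group collected so far is released and the state is reset.
    Map<String, List<String>> flush() {
        Map<String, List<String>> completed = new LinkedHashMap<>(pending);
        pending.clear();
        return completed;
    }
}
```

In the real route, `add` would correspond to the strategy's `aggregate(Exchange oldExchange, Exchange newExchange)` method (where `oldExchange` is null for the first message of a group), and `flush` to the 20-second completion interval, with each completed group sent on as its own exchange. Note that, unless an aggregation repository is configured, Camel holds the pending groups in memory, so this state does not survive a restart.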
java apache-camel apache-kafka
so-random-dude