Apache Camel Kafka is a collection of kafka posts and is regularly posted in another thread.

I have usecase:

I need to regularly read and collect messages from the Kafka topic and publish them in another topic. Localstorage is not an option. Here's how I plan to review this; any suggestions for improvement are welcome

Schedule the aggregation and publication of kafka messages, planning to use the completeInterval option for the Aggregator EIP. Here is the code.

@Autowired ObjectMapper objectMapper; JacksonDataFormat jacksonDataFormat; @PostConstruct public void initialize(){ //objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); jacksonDataFormat = new JacksonDataFormat(objectMapper,EventMessage.class); } 

and route:

 public void configure() throws Exception { from("kafka:localhost:9092?topic=item-events" + "&groupId=aggregator-group-id&autoCommitIntervalMs=25000&autoOffsetReset=earliest&consumersCount=1") .routeId("kafkapoller") .unmarshal(jacksonDataFormat) .aggregate(body().method("getItemId"), new EventAggregationStrategy()).completionInterval(20000) .marshal().json(JsonLibrary.Jackson) .to("kafka:localhost:9092?topic=item-events-aggregated&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer"); } 
+6
java apache-camel apache-kafka
source share
1 answer

It looks ok. What you need to remember:

  • What happens if / when the JVM dies halfway through the aggregation cycle? Don’t worry, then cool down, otherwise you can examine the PersistentAggregationRepository to store / replay messages, although you can replay messages that you lost from kafka (this would be the biggest operational problem).
  • Next, think about controlling runtime. A camel is a shocker who does not tell you very well what happens during work. Things like escape methods in your aggregator (i.e. a very greedy regex) will leave you with a little idea about the current state of aggregated exchanges, and JMX probably won't tell you too much about what's going on.
  • I would use the AggregateController so that you can force the completion of the exchange, so you can do things like turn off the camel and then call this to complete the light exchange
+2
source share

All Articles