Does Kafka have a batch consumer?

The high-level consumer API seems to read messages one at a time.

This can be quite problematic for consumers that want to process these messages and push them to downstream systems such as Solr or Elasticsearch, which prefer to receive messages in bulk rather than one at a time.

It is not trivial to batch these messages up in memory either, because the offsets in Kafka would then also need to be synced only once the batch has been committed downstream; otherwise, a crashed Kafka consumer with uncommitted downstream messages (in Solr or ES) would already have had its offsets updated and hence lose messages.

Conversely, a consumer may consume messages more than once if it fails after sending messages downstream but before committing the offsets.
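To make that window concrete, here is a minimal sketch of the loop in question (my illustration, not code from the post; sendDownstream stands in for a hypothetical bulk write to Solr/ES):

  import java.util.Arrays;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "test");
  props.put("enable.auto.commit", "false");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  consumer.subscribe(Arrays.asList("foo"));
  while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      sendDownstream(records); // hypothetical bulk index into Solr/ES
      // A crash here, after the send but before the commit below, means the
      // offsets are never stored, so these records are delivered again on
      // restart (the "more than once" case).
      consumer.commitSync();
  }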

If Kafka does support consuming messages in batches, any pointers to code/documentation would be greatly appreciated.

Thanks!

1 answer

I am not aware of a batch consumer. But even if there were one, your main problem would remain: you want to commit the offset only after the data has been successfully transferred downstream. One way to achieve this is to turn off the consumer's auto-commit by setting enable.auto.commit = false (called auto.commit.enable in the old high-level consumer). The trade-off, of course, is that you then have to take care of when to commit your offsets.

Find full documentation on consumer properties here: https://kafka.apache.org/documentation.html#consumerconfigs
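As a side note on batching: that page also lists fetch-level properties that influence how much data a single poll returns, so you can make the broker hand over larger chunks. The values below are illustrative, not recommendations, and max.poll.records only exists in 0.10+ clients:

  Properties props = new Properties();
  props.put("enable.auto.commit", "false"); // commit manually after downstream success
  props.put("fetch.min.bytes", "65536");    // broker waits until ~64 KB are available...
  props.put("fetch.max.wait.ms", "500");    // ...or 500 ms have passed
  props.put("max.poll.records", "500");     // cap per-poll record count (0.10+ only)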

A good example of how to commit offsets manually, taken from the javadoc ( https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html ):

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "test");
  props.put("enable.auto.commit", "false");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  consumer.subscribe(Arrays.asList("foo", "bar"));

  final int minBatchSize = 200;
  List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
  while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
          buffer.add(record);
      }
      // Commit only after the whole batch has been handed off downstream;
      // insertIntoDb is the javadoc's placeholder for your bulk write.
      if (buffer.size() >= minBatchSize) {
          insertIntoDb(buffer);
          consumer.commitSync();
          buffer.clear();
      }
  }
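If you need finer-grained control, the same javadoc also shows committing offsets per partition once that partition's records have been handled. A condensed sketch of that pattern (processDownstream is a hypothetical bulk handler; note the + 1, since the committed offset must be the position of the next record to read):

  // Additionally needs: java.util.Collections, java.util.Map,
  // org.apache.kafka.common.TopicPartition,
  // org.apache.kafka.clients.consumer.OffsetAndMetadata
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (TopicPartition partition : records.partitions()) {
      List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
      processDownstream(partitionRecords); // hypothetical bulk write to Solr/ES
      long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
      // +1 because the committed offset is the next record to be consumed
      consumer.commitSync(Collections.singletonMap(
              partition, new OffsetAndMetadata(lastOffset + 1)));
  }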