Apache Kafka - KafkaStream per topic / partition

I am writing a Kafka consumer for a high-throughput, high-volume distributed application. I have only one topic, but the rate of incoming messages is very high. Having multiple partitions serving multiple consumers would suit this use case, so the best way to consume would be to have multiple stream readers. According to the documentation and the available samples, the number of KafkaStreams handed out by the ConsumerConnector is based on the number of topics. How can I get more than one KafkaStream reader (one per partition) so that I can spawn one thread per stream? Or would reading from the same KafkaStream in multiple threads give me parallel reads from several partitions?

Any ideas are greatly appreciated.

java multithreading concurrency apache-kafka
3 answers

I would like to share what I found from the mailing list:

The number that you pass in the topic map determines how many streams the topic is divided into. In your case, if you pass 1, all 10 partitions will be fed into 1 stream. If you pass 2, each of the 2 streams will receive data from 5 partitions. If you pass 11, 10 of the streams will each receive data from 1 partition, and 1 stream will receive nothing.
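To make the arithmetic concrete, here is a small stdlib-only sketch of how 10 partitions could be split across 1, 2 or 11 streams. It assumes a contiguous, range-style split in which the earlier streams take any leftover partitions; the class and method names are hypothetical, and this is not the connector's own code, which may order or label partitions differently.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeAssignmentSketch {

    /**
     * Splits partitions 0..numPartitions-1 into numStreams contiguous ranges.
     * Earlier streams get one extra partition when the split is uneven;
     * streams beyond numPartitions get nothing.
     */
    static List<List<Integer>> assign(int numPartitions, int numStreams) {
        List<List<Integer>> result = new ArrayList<>();
        int perStream = numPartitions / numStreams; // base share for every stream
        int extra = numPartitions % numStreams;     // leftovers go to the first streams
        int next = 0;
        for (int s = 0; s < numStreams; s++) {
            int count = perStream + (s < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        for (int streams : new int[] {1, 2, 11}) {
            System.out.println(streams + " stream(s): " + assign(10, streams));
        }
    }
}
```

With 11 streams the base share is 0 and the remainder is 10, so the first 10 streams get one partition each and the last one gets an empty list, matching the mailing-list description.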

Generally, you need to iterate over each stream in its own thread, because a stream's iterator can block indefinitely when there are no new events.
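The blocking behaviour can be reproduced without Kafka. In this illustrative sketch, each `BlockingQueue` stands in for a `KafkaStream` (its `take()` blocks the way a stream iterator does), and the `END` marker is a hypothetical way to stop a reader. With one executor thread per stream, the busy stream is fully drained while the idle stream's reader stays blocked; a single thread reading the streams in turn could instead stall on the idle one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ThreadPerStreamSketch {

    // Hypothetical end-of-stream marker so the demo can shut down cleanly.
    static final String END = "__END__";

    // Drains one "stream"; take() blocks while the queue is empty,
    // much like iterating a KafkaStream blocks until a message arrives.
    static Runnable reader(BlockingQueue<String> stream, List<String> out) {
        return () -> {
            try {
                for (String msg = stream.take(); !msg.equals(END); msg = stream.take()) {
                    out.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    static List<String> demo() throws Exception {
        BlockingQueue<String> busy = new LinkedBlockingQueue<>(List.of("a", "b", "c", END));
        BlockingQueue<String> idle = new LinkedBlockingQueue<>(); // no traffic yet
        List<String> processed = Collections.synchronizedList(new ArrayList<>());

        // One thread per stream, so a blocked stream cannot starve the others.
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Future<?> busyReader = executor.submit(reader(busy, processed));
        Future<?> idleReader = executor.submit(reader(idle, processed));

        busyReader.get(5, TimeUnit.SECONDS); // completes while the idle reader waits
        System.out.println("idle reader still blocked: " + !idleReader.isDone());

        idle.put(END); // release the idle reader so the pool can terminate
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("processed: " + demo());
    }
}
```

Sizing the pool to the number of streams, as both answers do, is what guarantees every blocking reader gets its own thread.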

Example snippet:

```java
topicCount.put(msgTopic, new Integer(partitionCount));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = connector.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(msgTopic);

for (final KafkaStream stream : streams) {
    ReadTask task = new ReadTask(stream, msgTopic);
    task.addObserver(this.msgObserver);
    tasks.add(task);
    executor.submit(task);
}
```

Link: http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3CCA+sHyy_Z903dOmnjp7_yYR_aE2sRW-x7XpAnqkmWaP66GOqf6w@mail.gmail.com%3E


The recommended way to do this is to create a thread pool so that Java can handle the scheduling for you, and to consume each stream returned by the createMessageStreamsByFilter method in its own Runnable. For example:

```java
int NUMBER_OF_PARTITIONS = 6;
Properties consumerConfig = new Properties();
consumerConfig.put("zk.connect", "zookeeper.mydomain.com:2181");
consumerConfig.put("backoff.increment.ms", "100");
consumerConfig.put("autooffset.reset", "largest");
consumerConfig.put("groupid", "java-consumer-example");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig));

TopicFilter sourceTopicFilter = new Whitelist("mytopic|myothertopic");
List<KafkaStream<Message>> streams =
        consumer.createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_PARTITIONS);

ExecutorService executor = Executors.newFixedThreadPool(streams.size());
for (final KafkaStream<Message> stream : streams) {
    executor.submit(new Runnable() {
        public void run() {
            for (MessageAndMetadata<Message> msgAndMetadata : stream) {
                ByteBuffer buffer = msgAndMetadata.message().payload();
                byte[] bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
                // Do something with the bytes you just got off Kafka.
            }
        }
    });
}
```

In this example, I asked for 6 streams, mainly because I know that I have 3 partitions for each topic and I listed two topics in my whitelist. Once we have handles to the incoming streams, we can iterate over their contents, which are MessageAndMetadata objects. The metadata is simply the topic name and the offset. As you discovered, you can do all of this in one thread if you ask for 1 stream instead of 6 as in my example, but if you need parallel processing, a good approach is to run an executor with one thread for each returned stream.

```java
/**
 * @param source source KStream to sink into "output-topic"
 */
private static void pipe(KStream<String, String> source) {
    source.to(Serdes.String(), Serdes.String(), new StreamPartitioner<String, String>() {
        @Override
        public Integer partition(String key, String value, int numPartitions) {
            return 0; // always pick partition 0, regardless of key or value
        }
    }, "output-topic");
}
```

The code above writes every record to partition 0 (the first partition) of the topic "output-topic".
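The `partition` callback returns a partition index in the range `[0, numPartitions)`, so a constant `0` routes every record to the first partition. This sketch uses a hypothetical `SimplePartitioner` interface with the same shape as the callback above (it is not the Kafka interface) to contrast that with a hypothetical key-hash alternative; Kafka's own default producer partitioner hashes the serialized key with murmur2 rather than `hashCode()`.

```java
import java.util.Objects;

public class PartitionerSketch {

    // Hypothetical stand-in shaped like partition(key, value, numPartitions); not Kafka's API.
    interface SimplePartitioner<K, V> {
        Integer partition(K key, V value, int numPartitions);
    }

    // What the answer's partitioner does: every record goes to partition 0.
    static final SimplePartitioner<String, String> ALWAYS_ZERO = (k, v, n) -> 0;

    // Hypothetical alternative spreading records by key; floorMod keeps the
    // result in [0, numPartitions) even when hashCode() is negative.
    static final SimplePartitioner<String, String> BY_KEY_HASH =
            (k, v, n) -> Math.floorMod(Objects.hashCode(k), n);

    public static void main(String[] args) {
        int numPartitions = 4;
        for (String key : new String[] {"alpha", "beta", "gamma"}) {
            System.out.printf("%s -> always-zero: %d, by-hash: %d%n",
                    key,
                    ALWAYS_ZERO.partition(key, "v", numPartitions),
                    BY_KEY_HASH.partition(key, "v", numPartitions));
        }
    }
}
```

A constant partitioner keeps all output ordered in one partition but removes any parallelism for downstream consumers of "output-topic".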
