The recommended way to do this is to use a thread pool so that Java handles the scheduling for you, and to consume each stream returned by the createMessageStreamsByFilter method in its own Runnable. For example:
    int NUMBER_OF_PARTITIONS = 6;
    Properties consumerConfig = new Properties();
    consumerConfig.put("zk.connect", "zookeeper.mydomain.com:2181");
    consumerConfig.put("backoff.increment.ms", "100");
    consumerConfig.put("autooffset.reset", "largest");
    consumerConfig.put("groupid", "java-consumer-example");
    // consumer is assumed to be declared elsewhere (e.g. as a field)
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig));

    // Subscribe to both topics and ask for one stream per partition
    TopicFilter sourceTopicFilter = new Whitelist("mytopic|myothertopic");
    List<KafkaStream<Message>> streams = consumer.createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_PARTITIONS);

    // One pool thread per returned stream
    ExecutorService executor = Executors.newFixedThreadPool(streams.size());
    for (final KafkaStream<Message> stream : streams) {
        executor.submit(new Runnable() {
            public void run() {
                for (MessageAndMetadata<Message> msgAndMetadata : stream) {
                    // Copy the message payload out of its ByteBuffer
                    ByteBuffer buffer = msgAndMetadata.message().payload();
                    byte[] bytes = new byte[buffer.remaining()];
                    buffer.get(bytes);
                    // ... handle the payload bytes here ...
                }
            }
        });
    }
In this example I asked for 6 streams, because I know that each topic has 3 partitions and my whitelist names two topics. Once we have handles on the incoming streams, we can iterate over their contents, which are MessageAndMetadata objects; the metadata is just the topic name and the offset. As you discovered, you can do this in a single thread by asking for 1 stream instead of the 6 in my example, but if you need parallel processing, a good approach is to run an executor with one thread per returned stream.
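If you want to try the one-thread-per-stream pattern without a broker, here is a minimal sketch that uses only the JDK. It is not Kafka code: plain in-memory lists of ByteBuffer stand in for the Kafka streams, and the names StreamPoolSketch and consumeAll are made up for illustration. The payload copy is the same as in the loop above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class StreamPoolSketch {
    // Hypothetical helper: drains each stand-in "stream" on its own pool thread
    // and returns every payload decoded as UTF-8. Order across streams is not
    // guaranteed, because the streams are consumed in parallel.
    static List<String> consumeAll(List<? extends Iterable<ByteBuffer>> streams) {
        final ConcurrentLinkedQueue<String> seen = new ConcurrentLinkedQueue<>();
        // One thread per stream; newFixedThreadPool rejects a size of 0.
        ExecutorService executor =
                Executors.newFixedThreadPool(Math.max(1, streams.size()));
        for (final Iterable<ByteBuffer> stream : streams) {
            executor.submit(() -> {
                for (ByteBuffer buffer : stream) {
                    // Copy the remaining bytes out of the buffer.
                    byte[] bytes = new byte[buffer.remaining()];
                    buffer.get(bytes);
                    seen.add(new String(bytes, StandardCharsets.UTF_8));
                }
            });
        }
        // Stop accepting work and wait for the submitted tasks to finish.
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }
}
```

The in-memory lists run out, so the workers finish on their own. A real Kafka stream iterator blocks while it waits for new messages, so with the real consumer you would normally shut down the connector first and the executor afterwards.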
feldoh