Kafka is a fast, scalable, distributed, partitioned, and replicated commit log service. As such, it has no notion of priority at either the topic or the message level.
I had the same problem as you. The solution is very simple. Create separate topics in Kafka, let's say:
1) high_priority_queue
2) medium_priority_queue
3) low_priority_queue
Publish high priority messages to the high_priority_queue topic, medium priority messages to medium_priority_queue, and low priority messages to low_priority_queue.
Now you can create a Kafka consumer and open a stream for each of the topics.
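On the producer side, the routing can be as small as a lookup from priority to topic name. Here is a minimal sketch of that idea; the Priority type and the topicFor helper are hypothetical names made up for illustration, and only the three topic names come from the list above.

```scala
// Hypothetical helper for illustration; only the topic names come from the answer.
object PriorityRouting {
  sealed trait Priority
  case object High extends Priority
  case object Medium extends Priority
  case object Low extends Priority

  // Pick the topic a message should be published to, based on its priority.
  def topicFor(priority: Priority): String = priority match {
    case High   => "high_priority_queue"
    case Medium => "medium_priority_queue"
    case Low    => "low_priority_queue"
  }
}
```

The producer would then send each message to PriorityRouting.topicFor(priority) instead of to a single fixed topic.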
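// Scala code, using the old ZooKeeper-based high-level consumer (kafka.consumer package)
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}
val props = new Properties()
props.put("group.id", groupId)
props.put("zookeeper.connect", zookeeperConnect)
val config = new ConsumerConfig(props)
val connector = Consumer.create(config)
// Request one stream per topic
val topicWithStreamCount = Map(
  "high_priority_queue" -> 1,
  "medium_priority_queue" -> 1,
  "low_priority_queue" -> 1)
// Map from topic name to the list of streams for that topic
val streamsMap = connector.createMessageStreams(topicWithStreamCount)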
You get a stream for each topic. Read the high_priority_queue topic first; if it has no messages, fall back to the medium_priority_queue topic. If medium_priority_queue is also empty, read the low_priority_queue topic.
This trick works great for me. It may be useful for you too!
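The read order can be sketched roughly as below. This is only an illustration of the fallback logic, not code from my consumer: in-memory queues stand in for the three topic streams, and PriorityPoll and nextMessage are hypothetical names. With the real high-level consumer, a stream iterator blocks on an empty topic unless consumer.timeout.ms is set, so the "is it empty?" check would need a timeout rather than a simple nonEmpty test.

```scala
import scala.collection.mutable

// Hypothetical sketch: in-memory queues stand in for the Kafka topic streams.
object PriorityPoll {
  // Topic names from the answer, ordered from highest to lowest priority.
  val topicsByPriority: List[String] =
    List("high_priority_queue", "medium_priority_queue", "low_priority_queue")

  // Take one message from the highest-priority queue that is not empty.
  // Returns the topic name and the message, or None if every queue is empty.
  def nextMessage(queues: Map[String, mutable.Queue[String]]): Option[(String, String)] =
    topicsByPriority
      .find(topic => queues.get(topic).exists(_.nonEmpty))
      .map(topic => (topic, queues(topic).dequeue()))
}
```

Calling nextMessage in a loop drains high_priority_queue before touching medium_priority_queue, and medium_priority_queue before low_priority_queue. Note that a steady flow of high priority messages can starve the lower topics.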