I am trying to get a simple Kafka user to work using the Java API v0.9.0.1. The kafka server that I use is a docker container, as well as version 0.9.0.1. The following is the consumer code:
public class Consumer { public static void main(String[] args) throws IOException { KafkaConsumer<String, String> consumer; try (InputStream props = Resources.getResource("consumer.props").openStream()) { Properties properties = new Properties(); properties.load(props); consumer = new KafkaConsumer<>(properties); } consumer.subscribe(Arrays.asList("messages")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.println("Message received: " + record.value()); } }catch(WakeupException ex){ System.out.println("Exception caught " + ex.getMessage()); }finally{ consumer.close(); System.out.println("After closing KafkaConsumer"); } } }
However, when the consumer starts, it calls the polling method (100) above and never returns. Debugging, it looks like it got stuck by running the following method in org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient forever:
public void awaitMetadataUpdate() { int version = this.metadata.requestUpdate(); do { this.poll(9223372036854775807L); } while(this.metadata.version() == version); }
(both versions and this.metadata.version () always seem == 2). Also, although it does not cause errors, messages from my Java manufacturer have never been seen to get into the queue. I checked that with the kafka command line tools I can send and receive messages from the queue.
Does anyone know what is going on here?
source share