I am trying to work with the Kafka API in Java. I am using the following Maven dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>
I am having trouble connecting to a remote Kafka server. I changed the port property in Kafka's 'server.properties' file to 8080. I can start both ZooKeeper and the Kafka server, and I can also use the console producer and consumer applications that come with the Kafka download (Scala version 2.10).
I use the following client code to create a KafkaProducer for the remote server:
Properties propsProducer = new Properties();
propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("topic.metadata.refresh.interval.ms", "0");
KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer);
Once I have created the producer, I can run the following line and get back correct topic information, as long as strTopic is the name of an existing topic.
List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic);
When I try to send a message, I do the following:
ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[], byte[]>(strTopic, strMessage.getBytes());
RecordMetadata futureData = m_kafkaProducer.send(prMessage).get();
The call to send() blocks indefinitely, and when I manually terminate the process, I see an ERROR on the Kafka server about closing due to an error (IOException, Connection reset by peer).
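To keep the client from hanging forever while I debug this, the wait on the returned Future could be bounded. The following is only a minimal sketch: `BoundedSend` and `getWithTimeout` are hypothetical names I made up for illustration, not part of the Kafka API, and it relies only on the fact that send() returns a `java.util.concurrent.Future`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not part of Kafka): waits for a Future, but only up to a limit.
final class BoundedSend {

    private BoundedSend() {
    }

    // Returns the Future's result, or throws TimeoutException if it is not
    // available within the given time instead of blocking indefinitely.
    static <T> T getWithTimeout(Future<T> future, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return future.get(timeout, unit);
    }
}
```

With the producer above, this would be called as `BoundedSend.getWithTimeout(m_kafkaProducer.send(prMessage), 10, TimeUnit.SECONDS)`, where the 10-second limit is an arbitrary value. It does not fix the connection problem, it only turns the hang into an exception.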
Also, it's worth noting that the host.name, advertised.host.name and advertised.port properties are all still commented out in the server.properties file. Oh, and if I change the line:
propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
to
propsProducer.put("bootstrap.servers", "127.0.0.1:8080");
and run it on the same server where the kafka server is installed, it works, but I'm trying to work with it remotely.
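Since the local case works, one thing I can check from the remote machine is whether a plain TCP connection to the broker address succeeds at all. The following is a minimal sketch using only the JDK; `PortCheck` and `isReachable` are hypothetical names for illustration, and the timeout value is arbitrary.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper (not part of Kafka): tests raw TCP reachability.
final class PortCheck {

    private PortCheck() {
    }

    // Returns true if a TCP connection to host:port can be opened within
    // timeoutMs milliseconds, false on refusal, timeout, or unresolvable host.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

I would run this against the bootstrap address and also against the host and port reported by `leader().host()` and `leader().port()` on the PartitionInfo entries returned by partitionsFor, on the assumption that the address the broker reports for the partition leader may differ from the one I put in bootstrap.servers.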
I would appreciate any help, and if I can clarify anything, let me know.