How can I create messages with the Kafka 8.2 API in Java?

I am trying to work with kafka API in java. I am using the following maven dependency:

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.8.2.0</version> </dependency> 

I am having trouble connecting to a remote kafka server. I changed the port attribute of kafka file 'server.properties' to port 8080. I cannot start both zookeeper and kafka server. I can also use console production and consumer applications that come with kafka download. (Scala version 2.10)

I use the following client code to create a remote KafkaProducer

 Properties propsProducer = new Properties(); propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080"); propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); propsProducer.put("topic.metadata.refresh.interval.ms", "0"); KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer); 

Once I created the producer, I can run the next line and get the correct topic information provided by strTopic - this is the existing name of the topic.

 List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic); 

When I try to send a message, I do the following:

 ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(strTopic, strMessage.getBytes()); RecordMetadata futureData = m_kafkaProducer.send(prMessage).get(); 

The send () call blocks an indefinite time and when I manually terminate the process, I see that ERROR Closing failed due to an error on the kafka server (IOException, Connection Reset by Peer error).

In addition, it does not cost anything that the properties of host.name, advertised.host.name and advertized.port are still commented out in the server.properties file. Oh and if I change the line:

 propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080"); 

to

 propsProducer.put("bootstrap.servers", "127.0.0.1:8080"); 

and run it on the same server where the kafka server is installed, it works, but I'm trying to work with it remotely.

Appreciate any help, and if I can clarify, let me know.

+5
source share
1 answer

After a long search, I decided to implement the example given here: Kafka manufacturer example . I shortened the code and did not implement the separator class. I updated my pom with the indicated dependency and I still had the same problem. Ultimately, I made some configuration changes and it worked.

The last part of the puzzle defined the Kafka server in / etc / hosts of both the server and client machines. I added the following to both files.

 172.xx.xx.xxx serverHost1 

Again, x are just masks. Then, I set advertized.host.name in the server.properties serverHost1 file. NOTE. I got this IP address after running ifconfig on the server machine.

I changed the line

 propsProducer.put("metadata.broker.list", "172.xx.xx.xxx:8080"); 

to

 propsProducer.put("metadata.broker.list", "serverHost1:8080"); 

The Kafka API did not like that I defined IP as a string. Instead, he looked at the IP address from the etc / hosts file, although the documentation says:

"The host name that the broker will advertise to producers and consumers. If it is not installed, it will use the value for" host.name "if configured. Otherwise, it will use the value returned from java.net.InetAddress.getCanonicalHostName () . "

Which will only return IP, in string form, I previously used if it is not defined in etc / hosts of the client computer, otherwise it returns the name paired with IP (serverHost1 in my case). Also, I never set the value of host.name.

+3
source

Source: https://habr.com/ru/post/1216481/


All Articles