Error reading field 'topic_metadata' in Kafka

I am trying to connect to my AWS broker, which has auto.create.topics.enable = true set in its server.properties file. But when I try to connect to the broker using the Java client, I get the following error.

    1197 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
    org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 619631, only 37 bytes available
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
        at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
        at java.lang.Thread.run(Unknown Source)

Below is the code of my client.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Class name added for completeness; the original snippet showed only main().
    public class ProducerTest {
        public static void main(String[] argv) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "http://XX.XX.XX.XX:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 0);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("block.on.buffer.full", true);

            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            try {
                for (int i = 0; i < 10; i++) {
                    producer.send(new ProducerRecord<String, String>("topicjava", Integer.toString(i), Integer.toString(i)));
                    System.out.println("Tried sending:" + i);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.close();
        }
    }

Can someone help me resolve this?

+6
4 answers

I ran into a similar problem. The issue here is a version mismatch between the Kafka client declared in the pom file and the Kafka server. I was using kafka-clients 0.10.0.0_1, but the Kafka server was still at 0.9.0.0. So I upgraded the Kafka server to 0.10, and the problem was resolved.

    <dependency>
        <groupId>org.apache.servicemix.bundles</groupId>
        <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
        <version>0.10.0.0_1</version>
    </dependency>
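
If you are not sure which kafka-clients version actually ends up on the classpath (a transitive dependency can pull in an older one), you can check it at runtime. Here is a minimal sketch (the class name is just for illustration) using AppInfoParser, the utility class the client itself uses to report its version:

    import org.apache.kafka.common.utils.AppInfoParser;

    public class ClientVersionCheck {
        public static void main(String[] args) {
            // Prints the version of the kafka-clients jar that is actually loaded,
            // so it can be compared against the broker version.
            System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
        }
    }

If the printed version does not match the broker, fix the dependency (or upgrade the broker) before digging further.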
+2

It looks like I was setting the wrong properties on the client side, and my server.properties file also had properties that were not meant for the client version I used. Therefore, I decided to change the Java client to version 0.9.0.0 using Maven.

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.9.0.0</version>
    </dependency>

The server.properties file is shown below.

    broker.id=0
    port=9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=false
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=9000
    delete.topic.enable=true
    advertised.host.name=<aws public Ip>
    advertised.port=9092

My producer code looks like this:

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class HelloKafkaProducer {
        public static void main(String args[]) throws InterruptedException, ExecutionException {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            boolean sync = false;
            String topic = "loader1";
            String key = "mykey";
            for (int i = 0; i < 1000; i++) {
                String value = "myvaluehasbeensent" + i + i;
                ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, key, value);
                if (sync) {
                    producer.send(producerRecord).get();
                } else {
                    producer.send(producerRecord);
                }
            }
            producer.close();
        }
    }
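
Assuming the defaults from the server.properties above (ZooKeeper on localhost:2181) and the topic name used in the code, one quick way to verify that the messages actually arrived is the console consumer that ships with Kafka 0.9:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic loader1 --from-beginning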
+1

Make sure you are using the correct version. Suppose you use the following Maven dependency:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>

So the artifact is flink-connector-kafka-0.8_2.10, meaning it is built for Kafka 0.8 and Scala 2.10.

Now check if you are using the correct version of Kafka:

 cd /KAFKA_HOME/libs 

Now find kafka_YOUR-VERSION-sources.jar.

In my case, I have kafka_2.10-0.8.2.1-sources.jar, so the versions match and it works fine. :) If you are using different versions, just change the Maven dependencies or download the correct version of Kafka.
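
For example, listing the jars and filtering for Kafka shows the version in one step (the path is the same placeholder as above):

    ls /KAFKA_HOME/libs | grep kafka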

+1

I solved this problem by editing the /etc/hosts file. Check whether the hostnames and IPs of ZooKeeper and the other brokers are missing from this file.
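
For reference, the entries look like the following. The hostnames and IPs here are made-up placeholders, not values from this setup:

    # hypothetical /etc/hosts entries; use your actual broker and ZooKeeper hosts
    10.0.0.11   kafka-broker-1
    10.0.0.12   zookeeper-1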

-1
