Error reading field 'topic_metadata': Error reading array of size 1139567, only 45 bytes available

- Consumer

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    String groupId = "consumer-tutorial-group";
    List<String> topics = Arrays.asList("consumer-tutorial");
    props.put("bootstrap.servers", "192.168.1.75:9092");
    props.put("group.id", groupId);
    props.put("enable.auto.commit", "true");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    try {
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
    } catch (Exception e) {
        System.out.println(e.toString());
    } finally {
        consumer.close();
    }

I am trying to run the code above; it is a simple consumer that reads from a topic, but I get a strange exception that I cannot get past:

 org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 1139567, only 45 bytes available 

Here is my producer code as well:

- Producer

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.1.7:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<String, String>("consumer-tutorial",
                Integer.toString(i), Integer.toString(i)));
    producer.close();

Here are the Kafka setup commands:

- run zookeeper

    bin/zookeeper-server-start.sh config/zookeeper.properties

- Start the Kafka server

    bin/kafka-server-start.sh config/server.properties

- Create topic

    bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper 192.168.1.75:2181
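
To rule out a topic-side issue, it can also help to confirm that the topic really exists with the expected partition count; a quick check with the stock tooling, assuming the same ZooKeeper address as above:

    bin/kafka-topics.sh --describe --topic consumer-tutorial --zookeeper 192.168.1.75:2181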

- Kafka 0.10.0 dependencies

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.0</version>
    </dependency>

3 answers

I solved the problem by downgrading to Kafka 0.9.0, but that is still not a satisfying solution for me. If anyone knows an effective way to fix this on Kafka 0.10.0, feel free to post it. Until then, this is my solution:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.9.0.0</version>
    </dependency>
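
As a side note, one way to keep the two Kafka artifacts from drifting apart again is to declare the version once as a Maven property; a minimal sketch (the `kafka.version` property name is my own choice):

    <properties>
        <kafka.version>0.9.0.0</kafka.version>
    </properties>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.version}</version>
    </dependency>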

I had the same problem when using the kafka_2.11 artifact at version 0.10.0.0. It was resolved as soon as I upgraded the Kafka server to 0.10.0.0; it had previously been pointing to 0.9.0.1. It looks like the server version and the version in your pom need to be in sync.
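
If you are not sure which version a broker is actually running, one quick way to check (assuming a standard tarball installation; the path below is a placeholder) is to look at the jar names in the broker's libs directory, since the version is embedded in the file names:

    ls /path/to/kafka/libs | grep '^kafka'
    # e.g. kafka_2.11-0.10.0.0.jar -> Scala 2.11, Kafka 0.10.0.0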

I had the same problem: a compatibility issue between the client jar and the broker, since I was running Kafka server 0.9.0.0 with Kafka client 0.10.0.0. In general, Kafka 0.10.0 introduced a new message format, and a 0.10.0 client is unable to read topic metadata from an older server.

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.0.0.RELEASE</version> <!-- changed due to the lower version of the Kafka server -->
    </dependency>
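
For the opposite mismatch, a 0.10.0 broker that still has to serve 0.9.x clients, the 0.10.0 upgrade notes describe broker-side settings that keep the on-disk message format readable by older consumers; a sketch of the relevant server.properties entries (version values here are illustrative):

    # keep messages in the 0.9.0 format so 0.9.x consumers can still fetch them
    log.message.format.version=0.9.0
    # only needed while rolling-upgrading from 0.9.x brokers
    inter.broker.protocol.version=0.9.0

Note that this does not help the case described above: a 0.10.0 client against a 0.9.x broker fails regardless, so the client jar has to be downgraded.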