Simple Kafka consumer example not working

I have a simple class for consuming messages from a Kafka server. Most of the code is copied from the comments in org.apache.kafka.clients.consumer.KafkaConsumer.java.

    public class Demo {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "192.168.144.10:29092");
            props.put("group.id", "test");
            props.put("session.timeout.ms", "1000");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "10000");
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
            consumer.subscribe("voltdbexportAUDIT", "voltdbexportTEST");
            boolean isRunning = true;
            while (isRunning) {
                Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
                process(records);
            }
            consumer.close();
        }

        private static Map<TopicPartition, Long> process(Map<String, ConsumerRecords<byte[], byte[]>> records) {
            Map<TopicPartition, Long> processedOffsets = new HashMap<>();
            for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> recordMetadata : records.entrySet()) {
                List<ConsumerRecord<byte[], byte[]>> recordsPerTopic = recordMetadata.getValue().records();
                for (int i = 0; i < recordsPerTopic.size(); i++) {
                    ConsumerRecord<byte[], byte[]> record = recordsPerTopic.get(i);
                    // process record
                    try {
                        processedOffsets.put(record.topicAndPartition(), record.offset());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            return processedOffsets;
        }
    }

I am using 'org.apache.kafka:kafka-clients:0.8.2.0'. It throws an exception:

    Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:430)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:413)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:400)
        at kafka.integration.Demo.main(Demo.java:26)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

How do I configure key.deserializer?

+8
java apache-kafka
3 answers

This works without having to write your own deserializers:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("partition.assignment.strategy", "range");
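Since the exception in the question is raised for a missing configuration key, it can help to check the properties before passing them to the KafkaConsumer constructor. The following is a minimal sketch using only java.util. The class ConsumerConfigCheck and the method missingKeys are hypothetical names, not part of the Kafka client, and the list of keys is an assumption taken from the configuration above; which keys are actually mandatory depends on the client version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
class ConsumerConfigCheck {

    // Assumed set of keys to check, taken from the answer's configuration.
    static final String[] REQUIRED = {
        "bootstrap.servers", "key.deserializer", "value.deserializer"
    };

    // Returns the keys from REQUIRED that are absent or blank in props.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // The properties from the question, which lack the deserializer keys.
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.144.10:29092");
        props.put("group.id", "test");
        System.out.println("Missing: " + missingKeys(props));
    }
}
```

Run against the properties from the question, the check reports all three keys as missing, which matches the ConfigException about key.deserializer.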
+10

Both your key and your value are byte arrays, so you need a byte array deserializer for each of them.

You can add these properties.

For the key:

    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

For the value:

    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
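For a KafkaConsumer<byte[], byte[]> like the one in the question, both settings would name a deserializer class. Here is a minimal sketch that assembles such a configuration with java.util.Properties only. The class ByteArrayConsumerProps and its build method are hypothetical names introduced for illustration, and using bootstrap.servers for the broker address is an assumption based on the other answer.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
class ByteArrayConsumerProps {

    static final String BYTE_ARRAY_DESERIALIZER =
        "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    // Builds consumer properties for byte[] keys and byte[] values.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // A consumer reads bytes off the wire, so both entries are deserializers.
        props.put("key.deserializer", BYTE_ARRAY_DESERIALIZER);
        props.put("value.deserializer", BYTE_ARRAY_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        // Broker address and group id taken from the question.
        Properties props = build("192.168.144.10:29092", "test");
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

The resulting Properties object could then be passed to new KafkaConsumer<byte[], byte[]>(props) in place of the one built in the question.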
+2

You need to set the properties:

    props.put("serializer.class", "my.own.serializer.StringSupport");
    props.put("key.serializer.class", "my.own.serializer.LongSupport");

in your main method so that they are passed to the producer's constructor. Of course, you will need to specify the correct encoders. The serializer.class converts the message into a byte array, and the key.serializer.class converts the key object into a byte array. Typically, you would also want these classes to be able to reverse the process.
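The conversion described above, from an object to a byte array and back, can be sketched with plain JDK classes. This is an illustration of the idea only: SerializationSketch and its methods are hypothetical, they do not implement any Kafka encoder or serializer interface, and the choice of UTF-8 for strings and 8 big-endian bytes for longs is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration; does not implement any Kafka interface.
class SerializationSketch {

    // Message side: String <-> UTF-8 bytes.
    static byte[] serializeString(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    static String deserializeString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Key side: long <-> 8 bytes (ByteBuffer defaults to big-endian order).
    static byte[] serializeLong(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    static long deserializeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] keyBytes = serializeLong(42L);
        byte[] valueBytes = serializeString("audit");
        // Round trip: the original key and message are recovered from the bytes.
        System.out.println(deserializeLong(keyBytes) + " -> " + deserializeString(valueBytes));
    }
}
```

A real serializer class would wrap the same conversions behind whatever interface the producer or consumer version in use expects.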

+1