I have a simple class for using messages from the kafka server. Most of the codes are copied from the comments org.apache.kafka.clients.consumer.KafkaConsumer.java.
public class Demo { public static void main(String[] args) { Properties props = new Properties(); props.put("metadata.broker.list", "192.168.144.10:29092"); props.put("group.id", "test"); props.put("session.timeout.ms", "1000"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "10000"); KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props); consumer.subscribe("voltdbexportAUDIT", "voltdbexportTEST"); boolean isRunning = true; while (isRunning) { Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100); process(records); } consumer.close(); } private static Map<TopicPartition, Long> process(Map<String, ConsumerRecords<byte[], byte[]>> records) { Map<TopicPartition, Long> processedOffsets = new HashMap<>(); for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> recordMetadata : records.entrySet()) { List<ConsumerRecord<byte[], byte[]>> recordsPerTopic = recordMetadata.getValue().records(); for (int i = 0; i < recordsPerTopic.size(); i++) { ConsumerRecord<byte[], byte[]> record = recordsPerTopic.get(i);
I am using 'org.apache.kafka: kafka-clients: 0.8.2.0'. he throws an exception
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value. at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124) at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48) at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:430) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:413) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:400) at kafka.integration.Demo.main(Demo.java:26) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
How do I configure key.deserializer?
java apache-kafka
David
source share