I am trying to use SimpleConsumer in Kafka 0.9 so that users can replay events from a time-based offset. However, the messages I receive from Kafka are in a very strange encoding:
7icf-test-testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7\ W>8 {"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819330373,"context":{"userid":0,"username":"testUser"}} !} a {"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819331637,"context":{"userid":1,"username":"testUser"}} r {"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819332754,"context":{"userid":2,"username":"testUser"}} {"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819333868,"context":{"userid":3,"username":"testUser"}} p= {"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819334997,"context":{"userid":4,"username"
Using KafkaConsumer, these messages are handled just fine. Here is the code I use to retrieve messages using SimpleConsumer:
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) {
        log.debug("Found an old offset - skip");
        continue;
    }
    readOffset = messageAndOffset.nextOffset();
    int payloadOffset = 14 + messageAndOffset.message().keySize();
    // ... the payload is then decoded starting at payloadOffset
}
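For comparison, here is a minimal sketch of the variant used in the official SimpleConsumer example, which reads the value through the payload() ByteBuffer instead of skipping bytes by hand; it would replace the payloadOffset arithmetic inside the loop above (requires java.nio.ByteBuffer and java.nio.charset.StandardCharsets):

// message().payload() returns a ByteBuffer positioned at the value,
// so no manual header arithmetic should be needed here
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
String value = new String(bytes, StandardCharsets.UTF_8);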
I added the code that skips the first x bytes after I kept getting UTF-32 errors about byte values being too large, which I assume happens because Kafka prepends information such as the message size to the payload. Is this an Avro artifact?
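For completeness, this is roughly how the same topic is read with KafkaConsumer on 0.9, which returns the JSON values cleanly; the bootstrap address and group id below are placeholders, and topic is the same variable as in the loop above:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder
props.put("group.id", "test-group");              // placeholder
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> records = consumer.poll(1000); // 0.9 API: poll(long timeoutMs)
for (ConsumerRecord<String, String> record : records) {
    // record.value() is the plain JSON string, with no framing bytes
    System.out.println(record.offset() + ": " + record.value());
}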
java apache-kafka kafka-consumer-api
Gandalf