Kafka Java SimpleConsumer weird encoding

I am trying to use SimpleConsumer in Kafka 0.9 so that users can replay events from a chosen time offset. But the messages I receive from Kafka are in a very strange encoding:

7icf-test-testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7\ W>8      {"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819330373,"context":{"userid":0,"username":"testUser"}} !} a     {"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819331637,"context":{"userid":1,"username":"testUser"}}   r     {"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819332754,"context":{"userid":2,"username":"testUser"}}        {"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819333868,"context":{"userid":3,"username":"testUser"}} p=       {"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819334997,"context":{"userid":4,"username" 

Using KafkaConsumer, these messages are handled just fine. Here is the code I use to retrieve messages using SimpleConsumer:

  for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
      long currentOffset = messageAndOffset.offset();
      if (currentOffset < readOffset) {
          log.debug("Found an old offset - skip");
          continue;
      }
      readOffset = messageAndOffset.nextOffset();
      int payloadOffset = 14 + messageAndOffset.message().keySize(); // remove first x bytes, schema Id
      byte[] data = messageAndOffset.message().payload().array();
      byte[] realData = Arrays.copyOfRange(data, payloadOffset, data.length - payloadOffset);
      log.debug("Read " + new String(realData, "UTF-8"));
  }

I added the code to skip the first x bytes after I kept getting UTF-32 errors about bytes being too large, which I assume happens because Kafka prepends information such as the message size to the payload. Is this an Avro artifact?

java apache-kafka kafka-consumer-api
3 answers

I never found a good answer for this, but I switched to using SimpleConsumer only to ask Kafka for the offsets I need (per partition ... though the implementation is ugly), and then use the native KafkaConsumer with seek(TopicPartition, offset) or seekToBeginning(TopicPartition) to receive the messages. I hope they add the ability to consume messages from a given timestamp to the native client in the next version.
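For illustration, a minimal sketch of that seek-based approach against the 0.9 consumer API; the broker address, group id, topic, partition, and start offset below are placeholder assumptions, with the offset standing in for one discovered via SimpleConsumer:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class ReplayFromOffset {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("group.id", "replay-group");            // placeholder group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            TopicPartition tp = new TopicPartition("test", 0); // placeholder topic/partition
            long startOffset = 42L; // stands in for an offset discovered via SimpleConsumer

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // assign() rather than subscribe(): we manage the partition and offset ourselves
                consumer.assign(Collections.singletonList(tp));
                consumer.seek(tp, startOffset); // or consumer.seekToBeginning(tp) for a full replay

                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                }
            }
        }
    }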


Are you looking for this? The likely problem is that payload() returns a ByteBuffer, and calling array() on it hands back the buffer's entire backing array, including the header and key bytes that sit before the payload's position; reading through the buffer with get() respects its position and limit, so you get exactly the payload bytes:

 readOffset = messageAndOffset.nextOffset();
 ByteBuffer payload = messageAndOffset.message().payload();
 if (payload == null) {
     System.err.println("Message is null : " + readOffset);
     continue;
 }
 final byte[] realData = new byte[payload.limit()];
 payload.get(realData);
 System.out.println("Read " + new String(realData, "UTF-8"));

You could periodically record the partition offset that you commit together with the timestamp of the message (perhaps not on every commit), and then later rewind your consumer offsets to an approximate point in time. I am guessing this is for production debugging.
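As a rough sketch of that bookkeeping (the OffsetTimeIndex helper and all of its names are hypothetical, not a Kafka API): keep a sparse timestamp-to-offset map per partition, then look up the newest entry at or before the time you want to rewind to:

    import java.util.Map;
    import java.util.TreeMap;

    // Hypothetical helper: a sparse message-timestamp -> offset index for one partition,
    // populated every N commits, used later to rewind a consumer to roughly a given time.
    public class OffsetTimeIndex {
        private final TreeMap<Long, Long> index = new TreeMap<>();

        // messageTimestampMs could be the "received" field from the events above
        public void record(long messageTimestampMs, long offset) {
            index.put(messageTimestampMs, offset);
        }

        // Latest recorded offset at or before the requested time; 0 (earliest) if none.
        public long offsetFor(long timestampMs) {
            Map.Entry<Long, Long> floor = index.floorEntry(timestampMs);
            return floor != null ? floor.getValue() : 0L;
        }
    }

You would then rewind with something like consumer.seek(tp, index.offsetFor(targetTimestamp)).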

I doubt they will add such a feature; it seems impractical given how Kafka works, although I could be wrong, there are always clever ideas. I would file a ticket and ask.

