Kafka Avro Consumer with decoder problems

When I run a Kafka consumer with Avro on data that has the appropriate schema, it returns the error "AvroRuntimeException: Malformed data. Length is negative: -40". I see that others have had similar problems converting a byte array to JSON, writing and reading Avro, and with the Kafka Avro binary encoder. I also referred to this consumer group example; all of these were helpful, but none has resolved this error. The code works up to this part (line 73).

Decoder decoder = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);

I tried other decoders and printed the contents of the byteArrayInputStream variable, which looks, in my opinion, like what you would expect serialized Avro data to look like (in the message I can see the schema, some data, and some garbled data). I printed the available bytes using the .available() method, which returns 594. I am having trouble understanding why this error occurs. Apache NiFi is used to create the Kafka stream from HDFS with the same schema. I would be grateful for any help.

+7
java apache-kafka kafka-consumer-api avro apache-nifi
2 answers

Perhaps the problem is a mismatch between how the Avro data is written (encoded) by NiFi and how your consumer application reads (decodes) it.

In short, the Avro API provides two different approaches to serialization:

  • Option 1, for writing proper Avro files: this encodes the data records and also embeds the Avro schema as a preamble (via org.apache.avro.file.{DataFileWriter/DataFileReader}). Embedding the schema in Avro files makes a lot of sense because (a) the "payload" of an Avro file is usually orders of magnitude larger than the embedded schema, and (b) you can copy or move those files around to your heart's content and still be sure you can read them again without consulting anyone or anything.
  • Option 2, for encoding only the data records, i.e. without embedding the schema (via org.apache.avro.io.{BinaryEncoder/BinaryDecoder}; note the difference in the package name: io here versus file above). This approach is often preferred when Avro-encoding messages that are written to a Kafka topic because, compared to option 1 above, you do not incur the overhead of re-embedding the Avro schema in every single message, assuming your (very reasonable) policy is that, within the same Kafka topic, messages are formatted/encoded with the same Avro schema. This is a significant advantage because, in a stream-data context, a data record in motion is typically much smaller (commonly 100 bytes to a few hundred KB) than an Avro data file at rest, as described above (often hundreds or thousands of MB); the Avro schema is therefore relatively large, and you do not want to embed it 2000 times when writing 2000 data records to Kafka. The drawback is that you must "somehow" track how Avro schemas map to Kafka topics, or more precisely, you must somehow track which Avro schema a given message was encoded with, without going down the path of embedding the schema directly. The good news is that there is tooling in the Kafka ecosystem (an Avro schema registry) to do this transparently.
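The two options above can be sketched side by side. This is a minimal illustration, not NiFi's or the asker's actual code: the `User` schema, class name, and record contents are made up for the example.

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class AvroEncodingStyles {
    // Toy schema, purely for illustration.
    static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
        + "[{\"name\":\"name\",\"type\":\"string\"}]}");

    // Option 1: an Avro *container file* -- the schema is embedded as a preamble.
    public static byte[] writeContainerFile(GenericRecord record) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(SCHEMA))) {
            writer.create(SCHEMA, out); // writes the schema preamble
            writer.append(record);
        }
        return out.toByteArray();
    }

    // Option 2: raw binary encoding -- only the record bytes, no schema.
    public static byte[] writeRawBinary(GenericRecord record) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(SCHEMA).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        GenericRecord user = new GenericData.Record(SCHEMA);
        user.put("name", "alice");
        System.out.println("container file: " + writeContainerFile(user).length + " bytes");
        System.out.println("raw binary:     " + writeRawBinary(user).length + " bytes");
    }
}
```

Running this shows the size difference directly: the raw binary payload is only a handful of bytes, while the container-file payload carries the whole schema as overhead, which is exactly why option 2 is attractive per message on Kafka.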

    I am not very familiar with Apache NiFi, but a quick look at its source code tells me that it uses option 1, i.e. it embeds the Avro schema along with the Avro records. Your consumer code, however, uses DecoderFactory.get().binaryDecoder() and therefore option 2 (no embedded schema).

    Perhaps this explains the error you encountered?
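If that is indeed the mismatch, one way to fix it on the consumer side is to read the message value as a complete Avro container file (DataFileStream over the byte array) instead of calling binaryDecoder(). The following is a hedged sketch under that assumption; the class and method names, and the sample-bytes helper standing in for a NiFi-produced message, are invented for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class EmbeddedSchemaConsumer {
    // Decode a message value that is a complete Avro container file
    // (schema preamble included), instead of using binaryDecoder().
    public static List<GenericRecord> readRecords(byte[] messageValue) throws Exception {
        List<GenericRecord> records = new ArrayList<>();
        try (DataFileStream<GenericRecord> stream = new DataFileStream<>(
                new ByteArrayInputStream(messageValue),
                new GenericDatumReader<GenericRecord>())) {
            // No reader schema is supplied: DataFileStream takes it
            // from the preamble embedded in the bytes themselves.
            while (stream.hasNext()) {
                records.add(stream.next());
            }
        }
        return records;
    }

    // Stand-in for a NiFi-produced message: a container file with one record.
    public static byte[] sampleContainerBytes() throws Exception {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
            + "[{\"name\":\"name\",\"type\":\"string\"}]}");
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "alice");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
            writer.create(schema, out);
            writer.append(user);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        List<GenericRecord> records = readRecords(sampleContainerBytes());
        System.out.println("decoded " + records.size() + " record(s): " + records);
    }
}
```

Note that this only works if each Kafka message value really does start with the Avro container-file magic bytes; if the producer is later switched to option 2 (for example, via a schema registry), the consumer must switch back to binaryDecoder() with the registered schema.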

+16

Could you provide the fully modified byteArrayToDatum method? I have the same problem, but I can't figure out what to change beyond the two lines that should use DataFileReader instead.

0
