Why is Kafka Direct Stream creating a new decoder for each message?

I have a Spark Streaming application written in Java, running on Spark 2.1. I use KafkaUtils.createDirectStream to read messages from Kafka. The messages are encoded and decoded with a Kryo-based serializer/deserializer, which I configured through the Kafka properties key.deserializer, value.deserializer, key.serializer and value.serializer.

When Spark pulls messages in a micro-batch, they are decoded successfully by the Kryo decoder. However, I noticed that the Spark executor creates a new instance of the Kryo decoder for every message read from Kafka. I verified this by adding a log statement to the decoder's constructor.

This seems strange to me. Shouldn't the same decoder instance be reused for every message and every batch?
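For reference, the constructor check described above can be sketched in plain Java. This is only an illustration: CountingDecoder is a hypothetical stand-in that does not implement any Kafka interface, and the static counter is an addition that makes the number of instances per JVM (that is, per executor process) easy to read.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the Kryo decoder; it implements no Kafka interface.
public class CountingDecoder {
    // Shared by all instances in this JVM, so it counts constructions per process.
    private static final AtomicInteger INSTANCES = new AtomicInteger();

    public CountingDecoder() {
        int n = INSTANCES.incrementAndGet();
        // The same kind of log line the question places in the constructor.
        System.out.println("CountingDecoder #" + n + " created on thread "
                + Thread.currentThread().getName());
    }

    // Placeholder decoding: interprets the payload as UTF-8 text.
    public String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static int instancesCreated() {
        return INSTANCES.get();
    }

    public static void main(String[] args) {
        CountingDecoder decoder = new CountingDecoder();
        // Reusing one instance for several messages does not bump the counter.
        decoder.decode("first".getBytes(StandardCharsets.UTF_8));
        decoder.decode("second".getBytes(StandardCharsets.UTF_8));
        System.out.println("instances created: " + instancesCreated());
    }
}
```

Comparing the counter with the number of records processed shows whether instances are created per message or less often.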

Code where I read from kafka:

JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams));

JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> {
    return new Tuple2<String, Class1>(consRecord.key(), consRecord.value());
});

To see how Spark Kafka fetches the data internally, look at KafkaRDD.compute, the method every RDD implements to tell Spark how to compute that RDD:

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  if (part.fromOffset == part.untilOffset) {
    logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
    s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}

The important part is the else branch, which creates a KafkaRDDIterator. Internally, it has:

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(kc.config.props)
  .asInstanceOf[Decoder[K]]

val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(kc.config.props)
  .asInstanceOf[Decoder[V]]

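As you can see, both the key decoder and the value decoder are created via reflection when the KafkaRDDIterator is constructed, and compute constructs one iterator per partition. So a decoder is created per Kafka partition in each batch, not per message.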
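The reflective construction in the snippet above can be mimicked in plain Java to see how many instances it produces. This is a rough sketch, not Spark's code: UpperCaseDecoder, newDecoder and readPartition are hypothetical names, java.util.Properties stands in for VerifiableProperties, and the sketch assumes one iterator is built per partition, as compute suggests.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

public class PerPartitionDecoderDemo {

    // Hypothetical decoder with a single-argument constructor, like Decoder[K] above.
    public static class UpperCaseDecoder {
        public static final AtomicInteger INSTANCES = new AtomicInteger();

        public UpperCaseDecoder(Properties props) {
            INSTANCES.incrementAndGet();
        }

        public String decode(String raw) {
            return raw.toUpperCase(Locale.ROOT);
        }
    }

    // Same pattern as getConstructor(...).newInstance(...) in the Scala snippet.
    public static UpperCaseDecoder newDecoder(Properties props) {
        try {
            Constructor<UpperCaseDecoder> ctor =
                    UpperCaseDecoder.class.getConstructor(Properties.class);
            return ctor.newInstance(props);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not create decoder", e);
        }
    }

    // Plays the role of one partition iterator: one decoder, many messages.
    public static List<String> readPartition(List<String> messages, Properties props) {
        UpperCaseDecoder decoder = newDecoder(props);
        List<String> decoded = new ArrayList<>();
        for (String message : messages) {
            decoded.add(decoder.decode(message));
        }
        return decoded;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d", "e"));
        for (List<String> partition : partitions) {
            System.out.println(readPartition(partition, props));
        }
        System.out.println("decoders created: " + UpperCaseDecoder.INSTANCES.get());
    }
}
```

In this sketch the instance count follows the number of partitions read, not the number of messages, because the reflective call sits outside the per-message loop.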


