Set spark.streaming.kafka.maxRatePerPartition for createDirectStream

I need to increase the input rate per partition for my application, so I use .set("spark.streaming.kafka.maxRatePerPartition",100) in the configuration. The batch duration is 10 s, so I expect 5*100*10=5000 messages per batch. However, the input rate I actually get is only about 500. Can you suggest any changes to increase this rate?

2 answers

The batch duration is 10 s, so I expect 5 * 100 * 10 = 5000 messages per batch.

That is not what the setting means. It means "how many elements each partition can have per batch", not per second. I assume you have 5 partitions, so you are getting 5 * 100 = 500. If you want 5000, set maxRatePerPartition to 1000.

From "Exactly-once Spark Streaming from Apache Kafka" (written by Cody, the author of the Direct Stream approach, emphasis mine):

For rate limiting, you can use the Spark configuration variable spark.streaming.kafka.maxRatePerPartition to set the maximum number of messages per partition per batch.

Edit:

After the comment by @avrs, I looked at the code that determines the maximum rate. As it turns out, the heuristic is a bit more complicated than what the blog post and the docs state.

There are two branches. If backpressure is enabled alongside maxRate, then the effective rate is the minimum of the current backpressure rate calculated by the RateEstimator and the maxRate set by the user. If backpressure is not enabled, it takes maxRate as configured.
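For reference, here is a minimal configuration sketch that sets both options. The application name and the 10-second batch interval are assumptions taken from the question, and the master URL is assumed to be supplied by spark-submit; the two property keys are the standard Spark Streaming ones.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical application name; the master is expected to come from spark-submit.
val conf = new SparkConf()
  .setAppName("direct-stream-rate-limit-example")
  // Upper bound on the read rate for each Kafka partition.
  // SparkConf.set takes String values in Scala.
  .set("spark.streaming.kafka.maxRatePerPartition", "100")
  // Optional: let the rate estimator lower the rate below the cap when batches fall behind.
  .set("spark.streaming.backpressure.enabled", "true")

// 10-second batches, as in the question.
val ssc = new StreamingContext(conf, Seconds(10))
```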

Now, after selecting the rate, it is always multiplied by the batch duration in seconds, which effectively makes it a rate per second:

    if (effectiveRateLimitPerPartition.values.sum > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some(effectiveRateLimitPerPartition.map {
        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
      })
    } else {
      None
    }
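To make the arithmetic concrete, here is a simplified, Spark-free sketch of the two steps described above. The object and method names are hypothetical, and the backpressure rate is assumed to be already expressed per partition; this illustrates the logic and is not the actual Spark implementation.

```scala
object RateLimitSketch {
  // Step 1: pick the per-partition rate (messages per second).
  // backpressureRate is Some(rate) only when backpressure is enabled and has an estimate.
  // A maxRatePerPartition <= 0 is treated as "no cap configured" (assumption).
  def effectiveRate(maxRatePerPartition: Long, backpressureRate: Option[Long]): Long =
    backpressureRate match {
      case Some(estimated) if maxRatePerPartition > 0 =>
        math.min(estimated, maxRatePerPartition)
      case Some(estimated) => estimated
      case None            => maxRatePerPartition
    }

  // Step 2: scale the per-second rate by the batch duration and the partition count.
  def maxMessagesPerBatch(ratePerPartition: Long, partitions: Int, batchMillis: Long): Long = {
    val secsPerBatch = batchMillis.toDouble / 1000
    partitions * (secsPerBatch * ratePerPartition).toLong
  }
}
```

With the numbers from the question (rate 100, 5 partitions, 10 s batches), `RateLimitSketch.maxMessagesPerBatch(100, 5, 10000)` evaluates to 5000, so 5000 is the cap per batch, not 500.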

The property fetches N messages from a partition per second. If I have M partitions and the batch interval is B seconds, then the total number of messages I can see in a batch is N * M * B.

There are a few things you should check.

  1. Is your input rate actually greater than 500 messages per 10 seconds?
  2. Is the Kafka topic properly partitioned?