Spark: how to get the number of lines written?

I am wondering if there is a way to find out the number of lines written by a Spark save operation. I know that it is enough to perform a count on the RDD before writing it, but I would like to know if there is a way to get the same information without doing that.

Thanks Marco

+5
3 answers

If you really want to, you can add a custom listener and extract the number of lines written from outputMetrics. A very simple example might look like this:

 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

 var recordsWrittenCount = 0L
 sc.addSparkListener(new SparkListener() {
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
     synchronized {
       recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten
     }
   }
 })

 sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
 recordsWrittenCount // Long = 10

Note, however, that this part of the API is intended for internal use.
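If you only need the count for a single save, you can also unregister the listener once the job finishes, so the reliance on the internal API stays contained. A minimal sketch, assuming Spark 2.2+ (where SparkContext.removeSparkListener is available); note that listener events are delivered asynchronously, so the counter may lag slightly behind the action:

 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

 var recordsWrittenCount = 0L
 // Keep a reference to the listener so it can be removed afterwards.
 val listener = new SparkListener() {
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
     synchronized {
       recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten
     }
   }
 }
 sc.addSparkListener(listener)
 try {
   sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
 } finally {
   sc.removeSparkListener(listener) // stop receiving events once the job is done
 }
 recordsWrittenCount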

+6

The accepted answer more closely matches the OP's specific needs (as indicated in various comments); however, this answer will suit most use cases.

The most efficient approach is to use an Accumulator: http://spark.apache.org/docs/latest/programming-guide.html#accumulators

 val accum = sc.accumulator(0L)

 data.map { x =>
   accum += 1
   x
 }.saveAsTextFile(path)

 val count = accum.value

Then you can wrap this in a useful pimp:

 import org.apache.spark.rdd.RDD

 implicit class PimpedStringRDD(rdd: RDD[String]) {
   def saveAsTextFileAndCount(p: String): Long = {
     val accum = rdd.sparkContext.accumulator(0L)
     rdd.map { x =>
       accum += 1
       x
     }.saveAsTextFile(p)
     accum.value
   }
 }

So you can do

 val count = data.saveAsTextFileAndCount(path) 
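Two caveats worth noting. First, accumulator updates performed inside a transformation such as map can be applied more than once if a task is retried or speculatively executed, so the count may overstate the true number of lines. Second, sc.accumulator is deprecated as of Spark 2.0; a sketch of the same pimp written against the newer LongAccumulator API (the name PimpedStringRDD2 is just illustrative):

 import org.apache.spark.rdd.RDD

 implicit class PimpedStringRDD2(rdd: RDD[String]) {
   def saveAsTextFileAndCount(p: String): Long = {
     // longAccumulator is the non-deprecated Spark 2.x accumulator API
     val accum = rdd.sparkContext.longAccumulator("linesWritten")
     rdd.map { x =>
       accum.add(1) // may over-count if a task is retried
       x
     }.saveAsTextFile(p)
     accum.value
   }
 }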
+5

If you look at

 taskEnd.taskInfo.accumulables

you will see that it holds the following AccumulableInfo entries in a ListBuffer, in order:

 AccumulableInfo(1,Some(internal.metrics.executorDeserializeTime),Some(33),Some(33),true,true,None),
 AccumulableInfo(2,Some(internal.metrics.executorDeserializeCpuTime),Some(32067956),Some(32067956),true,true,None),
 AccumulableInfo(3,Some(internal.metrics.executorRunTime),Some(325),Some(325),true,true,None),
 AccumulableInfo(4,Some(internal.metrics.executorCpuTime),Some(320581946),Some(320581946),true,true,None),
 AccumulableInfo(5,Some(internal.metrics.resultSize),Some(1459),Some(1459),true,true,None),
 AccumulableInfo(7,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None),
 AccumulableInfo(0,Some(number of output rows),Some(3),Some(3),true,true,Some(sql))

You can see that the number of output rows sits in the 7th position of the ListBuffer (index 6), so the way to get the count of rows written is

 taskEnd.taskInfo.accumulables(6).value.get 

We can then get the rows written as follows (a slight modification of @zero323's answer):

 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

 var recordsWrittenCount = 0L
 sc.addSparkListener(new SparkListener() {
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
     synchronized {
       recordsWrittenCount += taskEnd.taskInfo.accumulables(6).value.get.asInstanceOf[Long]
     }
   }
 })

 sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
 recordsWrittenCount
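Bear in mind that the hard-coded index 6 depends on the exact set of internal metrics, which can differ across Spark versions and task types. A more defensive sketch (my variation, not part of the original answer) looks the accumulable up by name instead, assuming the write exposes the "number of output rows" SQL metric:

 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

 var rowsWritten = 0L
 sc.addSparkListener(new SparkListener() {
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
     synchronized {
       // Look the metric up by name rather than by position in the ListBuffer
       taskEnd.taskInfo.accumulables
         .find(_.name.contains("number of output rows"))
         .flatMap(_.value)
         .foreach(v => rowsWritten += v.toString.toLong)
     }
   }
 })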
0
