How to print an accumulator variable from inside a task (it seems to "work" without calling the value method)?

I know that accumulator variables are "write-only" from the point of view of tasks, which run on worker nodes. I was doing some testing on this, and I realized that I am able to print the accumulator value in a task.

Here I initialize the accumulator in the driver:

    scala> val accum = sc.accumulator(123)
    accum: org.apache.spark.Accumulator[Int] = 123

Then I go on to define a function 'foo':

    scala> def foo(pair: (String, String)) = { println(accum); pair }
    foo: (pair: (String, String))(String, String)

In this function, I simply print the accumulator and then return the same pair that was received.

Now I have an RDD called myrdd with the following type:

    scala> myrdd
    res13: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[9] at map at <console>:21

And now I call the map transformation on this RDD:

    myrdd.map(foo).collect

The action "collect" is used to assess the strength. So what actually happens here is that during this execution, zero (0) is printed for each RDD line. Since this RDD has 4 elements, it prints 0 4 times. Since the action "collect" is, it also prints all the elements at the end, but in fact this is not the focus. Therefore, I have two questions: -

  • Logically, printing is equivalent to reading, since you can only print what you can read. So why is this allowed? Why wasn't an exception thrown (which is what happens if we try to read the accumulator's value in the function)?
  • Why does it print 0 as the accumulator value when we initialized it to 123 in the driver?
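For completeness, the whole experiment can be reproduced end to end with a snippet like the one below; the contents of myrdd are my own stand-in, since all that matters is that it is a pair RDD with 4 elements:

    val accum = sc.accumulator(123)

    def foo(pair: (String, String)) = { println(accum); pair }

    // Hypothetical stand-in for myrdd; any four (String, String) pairs behave the same.
    val myrdd = sc.parallelize(Seq(("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")))

    // Each task prints "0" (the worker-local value), once per element; in local
    // mode the output appears on the console, on a cluster in the worker logs.
    // collect then returns the four pairs to the driver.
    myrdd.map(foo).collect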

After some experimenting, I found that if I change the function definition to access the actual value property of the accumulator object (accum.value), and then run the RDD action as described above, it does throw an exception:

    scala> def foo(pair: (String, String)) = { println(accum.value); pair }

The exception thrown when evaluating the RDD:

    Can't read accumulator value in task

So what I was doing earlier was printing the accumulator object itself. But the question remains: why did it print 0? After all, at the driver level, if I issue the same command I used in the function definition, I do get the value 123:

    scala> println(accum)
    123

I did not need to call println(accum.value) to make it work. So why, when I issue this command in the function that the task uses, does it print 0?

1 answer

Why does it print 0 as the accumulator value when we initialized it to 123 in the driver?

Because the worker nodes never see the initial value. The only thing passed to the workers is the zero, as defined in AccumulatorParam. For Accumulator[Int] it is simply 0. If you update the accumulator first, you will see the updated local value:

    val acc = sc.accumulator(123)
    val rdd = sc.parallelize(List(1, 2, 3))
    rdd.foreach(i => { acc += i; println(acc) })

It is even clearer if you use a single partition:

    rdd.repartition(1).foreach(i => { acc += i; println(acc) })
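The zero itself comes from the AccumulatorParam in use. As a reference point, here is a minimal sketch modeled on Spark 1.x's built-in IntAccumulatorParam (paraphrased, so treat the details as approximate):

    import org.apache.spark.AccumulatorParam

    // Modeled on Spark 1.x's built-in IntAccumulatorParam (paraphrased).
    object SketchIntAccumulatorParam extends AccumulatorParam[Int] {
      // How two partial values are merged.
      def addInPlace(t1: Int, t2: Int): Int = t1 + t2
      // The value a worker starts from, regardless of the driver's initial
      // value - this is why the tasks print 0 rather than 123.
      def zero(initialValue: Int): Int = 0
    }

With that zero, the single-partition run above prints the running local sums (1, 3, 6, assuming the elements arrive in order), and nothing derived from the driver's 123.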

Why was no exception thrown (...)?

Because the exception is thrown when value is accessed, and toString does not use it at all. Instead, toString uses the private value_ variable, the same one that value returns when the !deserialized check passes.
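To make this concrete, here is a simplified sketch of the relevant parts of the Spark 1.x Accumulable class, paraphrased from memory rather than copied from the source, so the details are approximate:

    import org.apache.spark.AccumulableParam

    // Simplified sketch of org.apache.spark.Accumulable (Spark 1.x), paraphrased.
    class AccumulableSketch[R, T](initialValue: R, param: AccumulableParam[R, T])
      extends Serializable {

      // Local value: starts as initialValue on the driver; readObject resets
      // it to the param's zero when a task deserializes the accumulator.
      @volatile @transient private var value_ : R = initialValue
      val zero = param.zero(initialValue)
      private var deserialized = false

      // Reading value inside a task throws: this is the exception the
      // question hit with println(accum.value).
      def value: R =
        if (!deserialized) value_
        else throw new UnsupportedOperationException("Can't read accumulator value in task")

      def += (term: T): Unit = { value_ = param.addAccumulator(value_, term) }

      // toString bypasses the guard and reads value_ directly, which is why
      // println(accum) "works" in a task and prints the worker-local zero.
      override def toString: String =
        if (value_ == null) "null" else value_.toString

      // Called on a worker when the serialized accumulator arrives with a task.
      private def readObject(in: java.io.ObjectInputStream): Unit = {
        in.defaultReadObject()
        value_ = zero          // the driver's 123 never makes it here
        deserialized = true
      }
    }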
