Does Spark optimize chained transformations?

It is often easier to express a complicated map operation as a series of chained map tasks in code, rather than as one large operation. I know the Spark DAG scheduler performs optimization, but will it also optimize a chain of operations written this way?

Here is a contrived example, where a list of distinct dates is extracted from a CSV field:

    csv.map(row => row.split(","))
       .map(row => row(6)) // extract the proper field
       .map(date_field => DateTime.parse(date_field).withTimeAtStartOfDay())
       .distinct()

Would this example be more efficient as a single map operation followed by distinct()?
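For illustration, here is a minimal sketch of the consolidated alternative I am asking about (assuming the same joda-time DateTime import as the chained snippet above):

    csv.map { row =>
      // split, extract field 6, and parse the date in a single pass
      val fields = row.split(",")
      DateTime.parse(fields(6)).withTimeAtStartOfDay()
    }.distinct()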

scala apache-spark
2 answers

Guess I'll just promote my comment to an answer, since no one else decided to respond. This is basically one of the main points of the lazy DAG architecture. Since nothing is executed until the final action, optimizations such as combining operations that do not require a shuffle are relatively trivial (I will see if I can find the actual code). Say you have many maps in a row: Spark knows it can discard the results of the previous map unless you cache; caching prevents recomputing an RDD when you use it more than once. So consolidating everything down to a single map function would be nothing more than a micro-optimization, and it would probably have no noticeable effect at all when you consider that many MR-style jobs are IO-bound.

UPDATE: looking through the spark-user mailing list, it seems that a Stage can contain several operations; in particular, transformations that can be pipelined together, such as maps, are placed into a single stage.
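A quick way to check this yourself (a rough sketch; the exact lineage output differs between Spark versions) is to print the RDD's debug string and observe that the consecutive maps sit in the same stage, with only distinct() introducing a shuffle boundary:

    val dates = csv.map(_.split(","))
      .map(_(6))
      .map(f => DateTime.parse(f).withTimeAtStartOfDay())
      .distinct()

    // Consecutive narrow transformations show up as pipelined
    // MapPartitionsRDDs within one stage; the shuffle appears only
    // at the distinct() step.
    println(dates.toDebugString)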


Short answer: yes, but only for linear dependencies.

Long answer: compared to the Spark SQL / DataFrame query optimizer? Almost nonexistent.

The Spark core API does not rewrite the DAG execution plan, even when doing so would be clearly advantageous. Here is an example:

Consider this DAG:

    A > B > D
      > C >

Where D is collected and A is not persisted (persisting is an expensive operation, and if you don't know in advance whether D will be collected, you can't decide whether it is worth it). Ideally, the optimizer would convert this DAG into the linear and much cheaper A > Tuple2(B, C) > D. So let's test it:

    val acc = sc.accumulator(0)
    val O = sc.parallelize(1 to 100)
    val A = O.map { v =>
      acc += 1
      v * 2
    }
    val B = A.map(_ * 2)
    val C = A.map(_ * 3)
    val D = B.zip(C).map(v => v._1 + v._2)
    D.collect()
    assert(acc.value == 100)

Result?

 200 did not equal 100 

Evidently, the un-optimized DAG was executed.
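To show that it really is the DAG shape that matters, here is a rough sketch of the rewrite applied by hand, i.e. the linear A > Tuple2(B, C) > D form described above (the names acc2/O2/D2 are mine, just to keep it separate from the test above); with it, the side effect runs exactly 100 times:

    val acc2 = sc.accumulator(0)
    val O2 = sc.parallelize(1 to 100)
    // A is computed once per element; B and C are derived together
    // inside the same map, so A's lineage is traversed only once.
    val D2 = O2.map { v =>
      acc2 += 1
      val a = v * 2            // this is A
      (a * 2, a * 3)           // Tuple2(B, C)
    }.map(t => t._1 + t._2)    // D
    D2.collect()
    assert(acc2.value == 100)  // passes with the fused, linear DAG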

In addition, such a feature (or anything close to it, e.g. an optimizer for choosing between join / shuffle strategies) has never been proposed. Probably because most Spark developers prefer more direct control over execution, or because such optimization has a very limited effect compared to what the SQL query optimizer can do.

