What exactly does .select() do?

When using .select() I came across unexpected behavior:

    >>> my_df.show()
    +---+---+---+
    |  a|  b|  c|
    +---+---+---+
    |  1|  3|  5|
    |  2|  4|  6|
    +---+---+---+
    >>> a_c = my_df.select(col("a"), col("c"))  # removing column b
    >>> a_c.show()
    +---+---+
    |  a|  c|
    +---+---+
    |  1|  5|
    |  2|  6|
    +---+---+
    >>> a_c.filter(col("b") == 3).show()  # I can still filter on "b"!
    +---+---+
    |  a|  c|
    +---+---+
    |  1|  5|
    +---+---+

This behavior surprised me. Are the following points correct?

DataFrames are just views; a simple DataFrame is a view of itself. In my case, a_c is just a view into my_df.

When I created a_c, no new data was created; a_c just points to the same data that my_df points to.

If you have any additional relevant information, please add it!

+7
apache-spark pyspark
2 answers

This is due to the lazy nature of Spark. It is smart enough to push the filter down so that it occurs at a lower level, before the projection*. Since this all happens within the same stage of execution, the column can still be resolved. In fact, you can see this in explain:

    == Physical Plan ==
    *Project [a#0, c#2]
    +- *Filter (b#1 = 3)   <--- Filter before Project
       +- LocalTableScan [A#0, B#1, C#2]
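To make the plan above a bit more concrete, here is a toy model in plain Python. It is not Spark's Catalyst code, and the names Scan, Project, Filter, and add_filter are made up for illustration. Under those assumptions, it sketches how a filter on a column that the projection dropped can still be resolved by placing the filter below the projection:

```python
from dataclasses import dataclass
from typing import Dict, List, Union

Row = Dict[str, int]


@dataclass
class Scan:
    """Leaf node: holds the source rows and their column names."""
    columns: List[str]
    rows: List[Row]

    def output(self) -> List[str]:
        return list(self.columns)

    def execute(self) -> List[Row]:
        return [dict(r) for r in self.rows]


@dataclass
class Filter:
    """Keeps rows where `column` equals `value`."""
    column: str
    value: int
    child: "Plan"

    def output(self) -> List[str]:
        return self.child.output()

    def execute(self) -> List[Row]:
        return [r for r in self.child.execute() if r[self.column] == self.value]


@dataclass
class Project:
    """Keeps only the listed columns."""
    columns: List[str]
    child: "Plan"

    def output(self) -> List[str]:
        return list(self.columns)

    def execute(self) -> List[Row]:
        return [{c: r[c] for c in self.columns} for r in self.child.execute()]


Plan = Union[Scan, Filter, Project]


def add_filter(plan: Plan, column: str, value: int) -> Plan:
    """Hypothetical helper: attach a filter, looking one level below a Project."""
    if column in plan.output():
        return Filter(column, value, plan)
    if isinstance(plan, Project) and column in plan.child.output():
        # The column was projected away but still exists underneath,
        # so the filter goes below the projection.
        return Project(plan.columns, Filter(column, value, plan.child))
    raise LookupError(f"cannot resolve column {column!r}")


my_df = Scan(["a", "b", "c"], [{"a": 1, "b": 3, "c": 5}, {"a": 2, "b": 4, "c": 6}])
a_c = Project(["a", "c"], my_df)
filtered = add_filter(a_c, "b", 3)
result = filtered.execute()
print(result)  # [{'a': 1, 'c': 5}]
```

The resulting tree is Project over Filter over Scan, the same shape as the physical plan. A column that does not exist anywhere underneath (for example "z") raises an error instead.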

You can force a shuffle and a new stage, though, and then see your filter fail. The failure is even caught up front, when the query is analyzed, before any job runs. Here is an example:

 a_c.groupBy("a","c").count.filter(col("b") === 3) 

* There is also projection pruning, which pushes the selection down to the database layers if Spark determines that a column is not needed at any point. However, I believe the filter would cause it to "need" the column and so not prune it, but I have not tested this.

+6

Let's start with some basics of Spark's internals. This will make things easier to understand. RDD: at the heart of Spark core is a data structure called the RDD, which is lazily evaluated. Lazy evaluation means that an RDD is computed only when an action is called (for example, count on an RDD or show on a Dataset).

A Dataset or DataFrame (which is a Dataset[Row]) also uses RDDs at its core.

This means that each transformation (for example, filter) is materialized only when an action (such as show) is triggered.
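As a rough illustration of that idea, here is a plain-Python sketch. It is not Spark code: the LazyData class and its filter, map, and collect methods are invented for this example. Transformations are only recorded, and nothing is computed until the action collect is called:

```python
from typing import Callable, Iterable, List, Tuple


class LazyData:
    """Toy stand-in for a lazily evaluated dataset (hypothetical, not Spark)."""

    def __init__(self, source: Iterable[int], steps: Tuple = ()):
        self._source = list(source)
        self._steps = tuple(steps)

    def filter(self, predicate: Callable[[int], bool]) -> "LazyData":
        # Transformation: record the step, compute nothing.
        return LazyData(self._source, self._steps + (("filter", predicate),))

    def map(self, func: Callable[[int], int]) -> "LazyData":
        # Transformation: record the step, compute nothing.
        return LazyData(self._source, self._steps + (("map", func),))

    def collect(self) -> List[int]:
        # Action: only now are the recorded steps actually run.
        rows = iter(self._source)
        for kind, fn in self._steps:
            rows = filter(fn, rows) if kind == "filter" else map(fn, rows)
        return list(rows)


calls = []


def is_even(x: int) -> bool:
    calls.append(x)  # track when the predicate really runs
    return x % 2 == 0


pipeline = LazyData([1, 2, 3, 4]).filter(is_even).map(lambda x: x * 10)
calls_before_action = len(calls)  # still 0: nothing has been evaluated yet
result = pipeline.collect()       # the action triggers the computation
print(calls_before_action, result)  # 0 [20, 40]
```

The predicate is not called while the pipeline is being built, only when collect runs, which is the same split between transformations and actions described above.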

So, regarding your point "When I created a_c, no new data was created, a_c just points to the same data as my_df": at that point no data has been materialized. It has to be materialized to be brought into memory. Your filter works on the original DataFrame. The only way to make a_c.filter(col("b") == 3).show() throw a runtime exception is to cache your intermediate DataFrame using dataframe.cache. For example:

    // Scala: cache the intermediate DataFrame, then filter on the dropped column
    val a_c = my_df.select(col("a"), col("c")).cache
    a_c.filter(col("b") === 3).show()

With this, Spark throws: Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot resolve column name.

+1