Spark: Difference between collect(), take() and show() outputs after converting to DF

I am using Spark 1.5.

I have a column of 30 identifiers that I load as integers from the database:

    val numsRDD = sqlContext
      .table(constants.SOURCE_DB + "." + IDS)
      .select("id")
      .distinct
      .map(row => row.getInt(0))

This is the result of numsRDD:

    numsRDD.collect.foreach(println(_))

    643761
    30673603
    30736590
    30773400
    30832624
    31104189
    31598495
    31723487
    32776244
    32801792
    32879386
    32981901
    33469224
    34213505
    34709608
    37136455
    37260344
    37471301
    37573190
    37578690
    37582274
    37600896
    37608984
    37616677
    37618105
    37644500
    37647770
    37648497
    37720353
    37741608

Next, I want to create all combinations of 3 of these ids, save each combination as a tuple of the form <tripletID: String, triplet: Array[Int]>, and convert it to a DataFrame, which I do as follows:

    // |combinationsDF| = 4060 combinations
    val combinationsDF = sc
      .parallelize(numsRDD
        .collect
        .combinations(3)
        .toArray
        .map(row => row.sorted)
        .map(row => (
          List(row(0), row(1), row(2)).mkString(","),
          List(row(0), row(1), row(2)).toArray)))
      .toDF("tripletID", "triplet")
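(A quick sanity check on the comment above: choosing 3 ids out of 30 gives C(30, 3) = (30 · 29 · 28) / 6 = 4060 combinations, which matches the expected size of combinationsDF.)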

As soon as this is done, I try to print some of the contents of combinationsDF to make sure everything is as it should be. So I try this:

 combinationsDF.show 

which returns:

    +--------------------+--------------------+
    |           tripletID|             triplet|
    +--------------------+--------------------+
    |,37136455,3758227...|[32776244, 371364...|
    |,37136455,3761667...|[32776244, 371364...|
    |,32776244,3713645...|[31723487, 327762...|
    |,37136455,3757869...|[32776244, 371364...|
    |,32776244,3713645...|[31598495, 327762...|
    |,37136455,3760089...|[32776244, 371364...|
    |,37136455,3764849...|[32776244, 371364...|
    |,37136455,3764450...|[32776244, 371364...|
    |,37136455,3747130...|[32776244, 371364...|
    |,32981901,3713645...|[32776244, 329819...|
    |,37136455,3761810...|[32776244, 371364...|
    |,34213505,3713645...|[32776244, 342135...|
    |,37136455,3726034...|[32776244, 371364...|
    |,37136455,3772035...|[32776244, 371364...|
    | 2776244,37136455...|[643761, 32776244...|
    |,37136455,3764777...|[32776244, 371364...|
    |,37136455,3760898...|[32776244, 371364...|
    |,32879386,3713645...|[32776244, 328793...|
    |,32776244,3713645...|[31104189, 327762...|
    |,32776244,3713645...|[30736590, 327762...|
    +--------------------+--------------------+
    only showing top 20 rows

As you can see, the first element of each tripletID is missing. To be 100% sure, I use take(20) as follows:

 combinationsDF.take(20).foreach(println(_)) 

which returns a more detailed view, as shown below:

    [,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
    [,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
    [,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
    [,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
    [,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
    [,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
    [,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
    [,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
    [,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
    [,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
    [,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
    [,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
    [,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
    [,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
    [2776244,37136455,WrappedArray(643761, 32776244, 37136455)]
    [,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
    [,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
    [,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
    [,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
    [,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]

So now I'm sure that the first id of each tripletID is somehow being cut off. But still, if I use collect instead of take(20):

 combinationsDF.collect.foreach(println(_)) 

everything is beautiful again (!!!):

    [32776244,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
    [32776244,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
    [31723487,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
    [32776244,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
    [31598495,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
    [32776244,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
    [32776244,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
    [32776244,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
    [32776244,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
    [32776244,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
    [32776244,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
    [32776244,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
    [32776244,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
    [32776244,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
    [643761,32776244,37136455,WrappedArray(643761, 32776244, 37136455)]
    [32776244,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
    [32776244,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
    [32776244,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
    [31104189,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
    [30736590,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]
    ...

1. I exhaustively checked every step before parallelizing the array of combinations into an RDD, and everything is all right.
2. I also printed the output immediately after the parallelize call, and again everything is fine.
3. The problem seems to be related to converting numsRDD to a DataFrame and, despite all my efforts, I cannot pin it down (see the debugging sketch after this list).
4. I also could not reproduce the problem with mock data using the same code fragment.
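One way to narrow this down (my own suggestion, not something from the original post) is to rule out display truncation: show() cuts cell contents longer than 20 characters, so printing the untruncated values and the raw string lengths should tell whether the leading id is really absent from the data or only from the rendered output. A minimal sketch, assuming Spark 1.5's two-argument show(numRows, truncate) overload:

    // Debugging sketch (hypothetical, not from the original post).
    // show() truncates cells longer than 20 characters; the two-argument
    // overload disables truncation.
    combinationsDF.show(20, false)

    // Print each tripletID with its raw length, to see whether the first id
    // is missing from the data itself or only from the displayed output.
    combinationsDF
      .select("tripletID")
      .take(20)
      .foreach(row => println(s"${row.getString(0)} (length=${row.getString(0).length})"))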

So, first: what causes this problem? And second: how can I fix it?

2 answers

I would look at your original numsRDD; it looks like you might have an empty string or a null value in it. This works for me:

    scala> val numsRDD = sc.parallelize(0 to 30)
    numsRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

    scala> :pa
    // Entering paste mode (ctrl-D to finish)

    val combinationsDF = sc
      .parallelize(numsRDD
        .collect
        .combinations(3)
        .toArray
        .map(row => row.sorted)
        .map(row => (
          List(row(0), row(1), row(2)).mkString(","),
          List(row(0), row(1), row(2)).toArray)))
      .toDF("tripletID","triplet")

    // Exiting paste mode, now interpreting.

    combinationsDF: org.apache.spark.sql.DataFrame = [tripletID: string, triplet: array<int>]

    scala> combinationsDF.show
    +---------+----------+
    |tripletID|   triplet|
    +---------+----------+
    |    0,1,2| [0, 1, 2]|
    |    0,1,3| [0, 1, 3]|
    |    0,1,4| [0, 1, 4]|
    |    0,1,5| [0, 1, 5]|
    |    0,1,6| [0, 1, 6]|
    |    0,1,7| [0, 1, 7]|
    |    0,1,8| [0, 1, 8]|
    |    0,1,9| [0, 1, 9]|
    |   0,1,10|[0, 1, 10]|
    |   0,1,11|[0, 1, 11]|
    |   0,1,12|[0, 1, 12]|
    |   0,1,13|[0, 1, 13]|
    |   0,1,14|[0, 1, 14]|
    |   0,1,15|[0, 1, 15]|
    |   0,1,16|[0, 1, 16]|
    |   0,1,17|[0, 1, 17]|
    |   0,1,18|[0, 1, 18]|
    |   0,1,19|[0, 1, 19]|
    |   0,1,20|[0, 1, 20]|
    |   0,1,21|[0, 1, 21]|
    +---------+----------+
    only showing top 20 rows
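If you want to test the empty-string/null theory directly, a quick check along these lines might help. This is a sketch that reuses the table and column names from the question, so adjust as needed:

    // Hedged sketch: count null ids in the source table before mapping
    // with getInt. constants.SOURCE_DB and IDS come from the question.
    val nullIds = sqlContext
      .table(constants.SOURCE_DB + "." + IDS)
      .select("id")
      .filter("id IS NULL")
      .count()
    println(s"null ids in source: $nullIds")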

The only other thing I can think of is that mkString does not work as you expect. Try string interpolation instead (there is also no need to recreate the List):

    val combinationsDF = sc
      .parallelize(numsRDD
        .collect
        .combinations(3)
        .toArray
        .map(row => row.sorted)
        .map { case List(a, b, c) => (s"$a,$b,$c", Array(a, b, c)) })
      .toDF("tripletID","triplet")

    scala> combinationsDF.show
    +---------+----------+
    |tripletID|   triplet|
    +---------+----------+
    |    0,1,2| [0, 1, 2]|
    |    0,1,3| [0, 1, 3]|
    |    0,1,4| [0, 1, 4]|
    |    0,1,5| [0, 1, 5]|
    |    0,1,6| [0, 1, 6]|
    |    0,1,7| [0, 1, 7]|
    |    0,1,8| [0, 1, 8]|
    |    0,1,9| [0, 1, 9]|
    |   0,1,10|[0, 1, 10]|
    |   0,1,11|[0, 1, 11]|
    |   0,1,12|[0, 1, 12]|
    |   0,1,13|[0, 1, 13]|
    |   0,1,14|[0, 1, 14]|
    |   0,1,15|[0, 1, 15]|
    |   0,1,16|[0, 1, 16]|
    |   0,1,17|[0, 1, 17]|
    |   0,1,18|[0, 1, 18]|
    |   0,1,19|[0, 1, 19]|
    |   0,1,20|[0, 1, 20]|
    |   0,1,21|[0, 1, 21]|
    +---------+----------+
    only showing top 20 rows
1. df.show() shows only the content, e.g.:

    df.show()
    Out[11]:
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
2. df.collect() shows the content along with the structure/metadata, e.g.:

    df.collect()
    Out[11]: [Row(age=None, name=u'Michael'), Row(age=30, name=u'Andy'), Row(age=19, name=u'Justin')]
3. df.take(n) can be used to display the content and structure/metadata for a limited number of rows of a very large dataset. Note that it flattens the result and returns the rows as a list of Row objects, e.g. to view only the first two rows of the DataFrame:

    df.take(2)
    Out[13]: [Row(age=None, name=u'Michael'), Row(age=30, name=u'Andy')]
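Since the question is in Scala while the examples above are PySpark, here is roughly the same comparison in Scala; the Person data is made up purely for illustration:

    // Made-up data to contrast show(), take(n) and collect() in Scala.
    case class Person(age: Option[Int], name: String)

    val df = sqlContext.createDataFrame(Seq(
      Person(None, "Michael"),
      Person(Some(30), "Andy"),
      Person(Some(19), "Justin")))

    df.show()                     // pretty-printed table, long cells truncated
    df.take(2).foreach(println)   // first 2 rows as Row objects
    df.collect.foreach(println)   // all rows as Row objects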
