I am using Spark 1.5.
I have a column of 30 identifiers that I load as integers from the database:
val numsRDD = sqlContext
  .table(constants.SOURCE_DB + "." + IDS)
  .select("id")
  .distinct
  .map(row => row.getInt(0))
These are the contents of numsRDD:
numsRDD.collect.foreach(println(_))

643761
30673603
30736590
30773400
30832624
31104189
31598495
31723487
32776244
32801792
32879386
32981901
33469224
34213505
34709608
37136455
37260344
37471301
37573190
37578690
37582274
37600896
37608984
37616677
37618105
37644500
37647770
37648497
37720353
37741608
Next, I want to create all combinations of 3 of these ids, save each combination as a tuple of the form <tripletID: String, triplet: Array(Int)>, and convert the result to a DataFrame, which I do as follows:
// |combinationsDF| = 4060 combinations
val combinationsDF = sc
  .parallelize(numsRDD
    .collect
    .combinations(3)
    .toArray
    .map(row => row.sorted)
    .map(row => (
      List(row(0), row(1), row(2)).mkString(","),
      List(row(0), row(1), row(2)).toArray)))
  .toDF("tripletID", "triplet")
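For reference, the combination step can be checked on its own in plain Scala, without Spark. The following is only a sketch: `ids` is a hypothetical stand-in for `numsRDD.collect`, filled with the 30 values printed above, and `triplets` is an illustrative name that is not part of my actual job.

```scala
// Plain Scala, no Spark involved.
// `ids` is a hypothetical stand-in for numsRDD.collect (the 30 ids printed above).
val ids: Array[Int] = Array(
  643761, 30673603, 30736590, 30773400, 30832624, 31104189,
  31598495, 31723487, 32776244, 32801792, 32879386, 32981901,
  33469224, 34213505, 34709608, 37136455, 37260344, 37471301,
  37573190, 37578690, 37582274, 37600896, 37608984, 37616677,
  37618105, 37644500, 37647770, 37648497, 37720353, 37741608
)

// Build (tripletID, triplet) pairs the same way as in the Spark snippet:
// every 3-element combination, sorted, with the id string joined by commas.
val triplets: Array[(String, Array[Int])] = ids
  .combinations(3)
  .map(_.sorted)
  .map(t => (t.mkString(","), t))
  .toArray

// 30 choose 3 = 4060 combinations are expected.
println(triplets.length)

// Every id string should be exactly the comma-joined triplet.
println(triplets.forall { case (id, t) => id == t.mkString(",") })
```

On the driver side this gives 4060 pairs whose id strings match their triplets, which is consistent with the problem appearing only after the data goes through the DataFrame.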
Once this is done, I print some of the contents of combinationsDF to make sure everything is as it should be:
combinationsDF.show
which returns:
+--------------------+--------------------+
|           tripletID|             triplet|
+--------------------+--------------------+
|,37136455,3758227...|[32776244, 371364...|
|,37136455,3761667...|[32776244, 371364...|
|,32776244,3713645...|[31723487, 327762...|
|,37136455,3757869...|[32776244, 371364...|
|,32776244,3713645...|[31598495, 327762...|
|,37136455,3760089...|[32776244, 371364...|
|,37136455,3764849...|[32776244, 371364...|
|,37136455,3764450...|[32776244, 371364...|
|,37136455,3747130...|[32776244, 371364...|
|,32981901,3713645...|[32776244, 329819...|
|,37136455,3761810...|[32776244, 371364...|
|,34213505,3713645...|[32776244, 342135...|
|,37136455,3726034...|[32776244, 371364...|
|,37136455,3772035...|[32776244, 371364...|
| 2776244,37136455...|[643761, 32776244...|
|,37136455,3764777...|[32776244, 371364...|
|,37136455,3760898...|[32776244, 371364...|
|,32879386,3713645...|[32776244, 328793...|
|,32776244,3713645...|[31104189, 327762...|
|,32776244,3713645...|[30736590, 327762...|
+--------------------+--------------------+
only showing top 20 rows
As you can see, the first element of each tripletID is missing. To be 100% sure, I use take(20) as follows:
combinationsDF.take(20).foreach(println(_))
which returns a more detailed view, as shown below:
[,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
[,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
[,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
[,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
[,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
[,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
[,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
[,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
[,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
[,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
[,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
[,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
[,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
[,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
[2776244,37136455,WrappedArray(643761, 32776244, 37136455)]
[,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
[,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
[,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
[,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
[,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]
So now I'm sure that the first id of each tripletID is being dropped for some reason. However, if I use collect instead of take(20):
combinationsDF.collect.foreach(println(_))
everything looks fine again (!!!):
[32776244,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
[32776244,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
[31723487,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
[32776244,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
[31598495,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
[32776244,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
[32776244,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
[32776244,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
[32776244,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
[32776244,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
[32776244,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
[32776244,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
[32776244,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
[32776244,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
[643761,32776244,37136455,WrappedArray(643761, 32776244, 37136455)]
[32776244,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
[32776244,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
[32776244,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
[31104189,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
[30736590,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]
...
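To make the discrepancy between take(20) and collect easier to quantify, a small consistency check along these lines could be used. It is a sketch only: `isConsistent` is a hypothetical helper (not a Spark API), and the sample rows are copied by hand from the two outputs above rather than read from the DataFrame.

```scala
// Hypothetical helper: a row is consistent when its tripletID is exactly
// the comma-joined triplet it was built from.
def isConsistent(tripletID: String, triplet: Seq[Int]): Boolean =
  tripletID == triplet.mkString(",")

// Two rows as returned by take(20), copied from the output above.
val fromTake: Seq[(String, Seq[Int])] = Seq(
  (",37136455,37582274", Seq(32776244, 37136455, 37582274)),
  ("2776244,37136455", Seq(643761, 32776244, 37136455))
)

// The same two rows as returned by collect.
val fromCollect: Seq[(String, Seq[Int])] = Seq(
  ("32776244,37136455,37582274", Seq(32776244, 37136455, 37582274)),
  ("643761,32776244,37136455", Seq(643761, 32776244, 37136455))
)

// Count the rows whose id string no longer matches the triplet.
val brokenTake = fromTake.count { case (id, t) => !isConsistent(id, t) }
val brokenCollect = fromCollect.count { case (id, t) => !isConsistent(id, t) }

println(s"take: $brokenTake inconsistent, collect: $brokenCollect inconsistent")
```

Against the real DataFrame, the same helper could presumably be applied to each row via something like `r.getString(0)` and `r.getAs[Seq[Int]](1)`, although I have not verified that variant here.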
1. I exhaustively checked each step before parallelizing the array of combinations into an RDD, and everything is correct.
2. I also printed the output immediately after parallelize, and again everything is fine.
3. The problem seems to be related to converting the RDD to a DataFrame and, despite all my efforts, I can't resolve it.
4. I also could not reproduce the problem with mock data using the same code fragment.
So, first: what causes this problem? And second: how do I fix it?