How to convert a Spark DataFrame to an RDD of mllib LabeledPoints?

I tried applying PCA to my data and then running RandomForest on the transformed data. However, PCA.transform(data) gives me a DataFrame, but I need an RDD of mllib LabeledPoints to feed into RandomForest. How can I do this? My code is:

    import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.tree.RandomForest
    import org.apache.spark.mllib.tree.model.RandomForestModel
    import org.apache.spark.ml.feature.PCA
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.linalg.Vectors

    val dataset = MLUtils.loadLibSVMFile(sc, "data/mnist/mnist.bz2")

    val splits = dataset.randomSplit(Array(0.7, 0.3))
    val (trainingData, testData) = (splits(0), splits(1))

    val trainingDf = trainingData.toDF()

    val pca = new PCA()
      .setInputCol("features")
      .setOutputCol("pcaFeatures")
      .setK(100)
      .fit(trainingDf)

    val pcaTrainingData = pca.transform(trainingDf)

    val numClasses = 10
    val categoricalFeaturesInfo = Map[Int, Int]()
    val numTrees = 10 // Use more in practice.
    val featureSubsetStrategy = "auto" // Let the algorithm choose.
    val impurity = "gini"
    val maxDepth = 20
    val maxBins = 32

    val model = RandomForest.trainClassifier(pcaTrainingData, numClasses, categoricalFeaturesInfo,
      numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)

    error: type mismatch;
     found   : org.apache.spark.sql.DataFrame
     required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]
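For reference, here is a quick sanity-check sketch (assuming the same pca model and trainingDf as above) that confirms what I get back is a DataFrame, not an RDD:

    // Sanity check (sketch): the ml-package PCA transform returns a DataFrame,
    // not the RDD[LabeledPoint] that RandomForest.trainClassifier expects.
    val pcaTrainingData = pca.transform(trainingDf)
    pcaTrainingData.printSchema()
    // columns: label (double), features (vector), pcaFeatures (vector)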

I tried the following two possible solutions, but they did not work:

    scala> val pcaTrainingData = trainingData.map(p => p.copy(features = pca.transform(p.features)))
    <console>:39: error: overloaded method value transform with alternatives:
      (dataset: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame <and>
      (dataset: org.apache.spark.sql.DataFrame,paramMap: org.apache.spark.ml.param.ParamMap)org.apache.spark.sql.DataFrame <and>
      (dataset: org.apache.spark.sql.DataFrame,firstParamPair: org.apache.spark.ml.param.ParamPair[_],otherParamPairs: org.apache.spark.ml.param.ParamPair[_]*)org.apache.spark.sql.DataFrame
    cannot be applied to (org.apache.spark.mllib.linalg.Vector)

and

    val labeled = pca
      .transform(trainingDf)
      .map(row => LabeledPoint(row.getDouble(0), row(4).asInstanceOf[Vector[Int]]))

    error: type mismatch;
     found   : scala.collection.immutable.Vector[Int]
     required: org.apache.spark.mllib.linalg.Vector

(I did import org.apache.spark.mllib.linalg.Vectors in the case above.)

Any help?

scala pca apache-spark rdd apache-spark-mllib
1 answer

The right approach here is the second one you tried: mapping each Row into a LabeledPoint to get an RDD[LabeledPoint]. However, it has two mistakes:

  • The correct Vector class ( org.apache.spark.mllib.linalg.Vector ) does NOT take type arguments (such as Vector[Int] ), so although you had the right import, the compiler concluded that you meant scala.collection.immutable.Vector , which does.
  • The DataFrame returned from pca.transform() has only 3 columns, yet you tried to access the column at index 4. For example, here are its first 4 rows:

        +-----+--------------------+--------------------+
        |label|            features|         pcaFeatures|
        +-----+--------------------+--------------------+
        |  5.0|(780,[152,153,154...|[880.071111851977...|
        |  1.0|(780,[158,159,160...|[-41.473039034112...|
        |  2.0|(780,[155,156,157...|[931.444898405036...|
        |  1.0|(780,[124,125,126...|[25.5114585648411...|
        +-----+--------------------+--------------------+

    To make this less error-prone, I prefer using column names instead of their indexes (see the short sketch after this list).
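For illustration, a small sketch (against the 3-column DataFrame shown above) of accessing a Row field by name versus by positional index:

    // Sketch: Row access by name vs. by index on the 3-column DataFrame above.
    val firstRow = pca.transform(trainingDf).first()

    // By name: robust to column order.
    val byName = firstRow.getAs[org.apache.spark.mllib.linalg.Vector]("pcaFeatures")

    // By index: 0 = label, 1 = features, 2 = pcaFeatures; index 4 does not exist.
    val byIndex = firstRow.getAs[org.apache.spark.mllib.linalg.Vector](2)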

So, here is the code with the necessary fixes:

    val labeled = pca.transform(trainingDf).rdd.map(row => LabeledPoint(
      row.getAs[Double]("label"),
      row.getAs[org.apache.spark.mllib.linalg.Vector]("pcaFeatures")
    ))
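From there, the rest of your pipeline should compile unchanged. For example, a sketch reusing the parameters from your question:

    // Sketch: train on the converted RDD[LabeledPoint], reusing the
    // parameters defined in the question.
    val model = RandomForest.trainClassifier(
      labeled, numClasses, categoricalFeaturesInfo,
      numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)

    // Note: apply the same fitted PCA model to the test split before evaluating,
    // e.g. pca.transform(testData.toDF()), and convert it the same way as above.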
