Spark MLlib: Building Classifiers for Each Data Group

I have vectors (LabeledPoints) tagged with a group number. For each group I need to train a separate logistic regression classifier:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object Scratch {

  val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Scratch")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    val trainRDD = sc.parallelize(train)
    val modelByGroup = trainRDD.groupByKey().map({
      case (group, iter) => (group, new LogisticRegressionWithLBFGS().run(iter))
    })
  }
}

LogisticRegressionWithLBFGS().run(iter) does not compile because run works with an RDD, not with the Iterable that groupByKey returns. Please advise how to create as many classifiers as there are groups (tags) in the input.
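For reference, here is the type mismatch spelled out (a sketch based on MLlib's API; the grouped value is my own naming, not part of the original code):

// groupByKey produces one local Iterable of points per key:
val grouped: RDD[(Int, Iterable[LabeledPoint])] = trainRDD.groupByKey()

// GeneralizedLinearAlgorithm, which LogisticRegressionWithLBFGS extends,
// declares training against a distributed dataset:
//   def run(input: RDD[LabeledPoint]): LogisticRegressionModel
// so the Iterable[LabeledPoint] inside each group cannot be passed to run.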

Update: the code below shows that nested RDD iteration does not work:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object Scratch {

  val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Scratch")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    val trainRDD = sc.parallelize(train)
    val keys: RDD[Int] = trainRDD.map({ case (key, _) => key }).distinct
    for (key <- keys) { // key is Int here!
      // Get train data for the current group (key):
      val groupTrain = trainRDD.filter({ case (x, _) => x == key }).cache()

      /**
       * Which results in org.apache.spark.SparkException:
       * RDD transformations and actions can only be invoked by the driver,
       * not inside of other transformations; for example,
       * rdd1.map(x => rdd2.values.count() * x) is invalid because the values
       * transformation and count action cannot be performed inside of the
       * rdd1.map transformation.
       * For more information, see SPARK-5063.
       * at org.apache.spark.rdd.RDD.sc(RDD.scala:87)
       */
    }
  }
}

It seems it is impossible to use transformations inside other transformations, right?

1 answer

If you want one classifier per group, you do not need MLlib. MLlib is designed for distributed datasets, while each of your groups is a local dataset small enough to live on a single worker. You can simply use a local machine-learning library, such as Weka, for each group inside a map function.
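As a sketch of that idea (my illustration, not the original answer's code): trainLocalModel and LocalModel below are hypothetical stand-ins for whatever single-machine learner you choose, e.g. a Weka classifier.

// Hypothetical stand-ins for a local (non-distributed) learner and its model:
type LocalModel = AnyRef
def trainLocalModel(points: Seq[LabeledPoint]): LocalModel = ???

// Each group's points arrive as a plain local collection inside mapValues,
// so a single-machine library can consume them directly:
val modelByGroup: RDD[(Int, LocalModel)] =
  trainRDD.groupByKey().mapValues(points => trainLocalModel(points.toSeq))

This works as long as every single group fits in the memory of one worker.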

EDIT:

val keys = wholeRDD.map(_._1).distinct.collect
var models = List.empty[(Int, LogisticRegressionModel)]
for (key <- keys) {
  // Keep only this group's points and drop the key column:
  val valuesForKey = wholeRDD.filter(_._1 == key).values
  // Train an MLlib model on this single-group RDD:
  val model = new LogisticRegressionWithLBFGS().run(valuesForKey)
  models = (key, model) :: models
}
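To use the trained models afterwards (my addition, assuming the loop above collects (key, model) pairs into models), turn the list into a map keyed by group:

// Look up the model for group 1 and score a new point:
val modelMap = models.toMap
val score = modelMap(1).predict(Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))

Note that the loop trains the groups one after another on the driver, and each iteration re-scans wholeRDD in the filter, so cache wholeRDD if there are many keys.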
