OneHotEncoder on a Spark DataFrame in a Pipeline

I am trying to run an example in Spark and Scala with the Adult dataset.

Using Scala 2.11.8 and Spark 1.6.1.

The problem (for now) is the number of categorical features in this dataset, all of which must be encoded as numbers before a Spark ML algorithm can do its job.

So far I have this:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object Adult {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Adult example").setMaster("local[*]")
    val sparkContext = new SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)

    val data = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true") // Use first line of all files as header
      .option("inferSchema", "true") // Automatically infer data types
      .load("src/main/resources/adult.data")

    val categoricals = data.dtypes filter (_._2 == "StringType")
    val encoders = categoricals map (cat => new OneHotEncoder().setInputCol(cat._1).setOutputCol(cat._1 + "_encoded"))
    val features = data.dtypes filterNot (_._1 == "label") map (tuple => if(tuple._2 == "StringType") tuple._1 + "_encoded" else tuple._1)

    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.01)
    val pipeline = new Pipeline()
      .setStages(encoders ++ Array(lr))

    val model = pipeline.fit(data) // throws: the string columns have not been indexed
  }
}

However, this does not work. When pipeline.fit is called, the data still contains the original string features, so it throws an exception. How can I remove these "StringType" columns in the pipeline? Or maybe I'm doing this completely wrong; if anyone has a different suggestion, I'm happy to hear it :).



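The encoder does not work on raw strings: the columns have to be indexed first, and only then encoded. As the API documentation puts it:

one-hot encoder (...) maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index.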

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder}

// toDF needs sqlContext.implicits._ (already imported in spark-shell)
val df = Seq((1L, "foo"), (2L, "bar")).toDF("id", "x")

val categoricals = df.dtypes.filter (_._2 == "StringType") map (_._1)

val indexers = categoricals.map (
  c => new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx")
)

val encoders = categoricals.map (
  c => new OneHotEncoder().setInputCol(s"${c}_idx").setOutputCol(s"${c}_enc")
)

val pipeline = new Pipeline().setStages(indexers ++ encoders)

val transformed = pipeline.fit(df).transform(df)
transformed.show

// +---+---+-----+-------------+
// | id|  x|x_idx|        x_enc|
// +---+---+-----+-------------+
// |  1|foo|  1.0|    (1,[],[])|
// |  2|bar|  0.0|(1,[0],[1.0])|
// +---+---+-----+-------------+
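The x_enc column may look odd at first: (1,[],[]) is a sparse vector of size 1 with no non-zero entries, because OneHotEncoder drops the last category by default (dropLast is true). The following plain-Scala sketch imitates the two steps without Spark to show where those values come from. It is only an illustration, not Spark's implementation; the names OneHotSketch, index and encode are made up for this example, and the alphabetical tie-breaking is an assumption chosen so that the toy data reproduces the indices shown above.

```scala
// Plain-Scala imitation of "index, then encode". Not Spark's implementation.
object OneHotSketch {

  // Assign indices by descending frequency (most frequent value gets 0).
  // Ties are broken alphabetically here; that rule is an assumption of this sketch.
  def index(values: Seq[String]): Map[String, Int] =
    values.groupBy(identity).toSeq
      .sortBy { case (value, occurrences) => (-occurrences.size, value) }
      .map(_._1)
      .zipWithIndex
      .toMap

  // Dense one-hot vector for a category index.
  // With dropLast = true the vector has numCategories - 1 slots,
  // so the last category is encoded as all zeros.
  def encode(idx: Int, numCategories: Int, dropLast: Boolean = true): Vector[Double] = {
    val size = if (dropLast) numCategories - 1 else numCategories
    Vector.tabulate(size)(i => if (i == idx) 1.0 else 0.0)
  }

  def main(args: Array[String]): Unit = {
    val indices = index(Seq("foo", "bar"))
    indices.toSeq.sortBy(_._2).foreach { case (value, idx) =>
      println(s"$value -> $idx -> ${encode(idx, indices.size)}")
    }
  }
}
```

With two categories and dropLast enabled, each row needs only one slot: index 0 sets it to 1.0 and index 1 leaves it empty, which matches the sparse vectors in the table above.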

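As the output shows, there is no need to drop the string columns from the pipeline. In practice, OneHotEncoder requires a numeric input column with a NominalAttribute, a BinaryAttribute, or no type attribute at all.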
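To tie this back to the LogisticRegression in the question, here is a hedged sketch of how the indexed and encoded columns could be fed to the classifier. It makes several assumptions that are not in the original post: the DataFrame has a numeric column named "label", every other non-string column is numeric, and a VectorAssembler (my addition) is used to build the "features" column that LogisticRegression reads by default. The names buildPipeline and featureCols are hypothetical.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.DataFrame

// Sketch: index -> encode -> assemble -> classify.
// Assumes `data` has a numeric column named "label" (hypothetical setup).
def buildPipeline(data: DataFrame): Pipeline = {
  val categoricals = data.dtypes.filter(_._2 == "StringType").map(_._1).toSeq

  // One StringIndexer and one OneHotEncoder per string column.
  val indexers = categoricals.map(c =>
    new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx"))
  val encoders = categoricals.map(c =>
    new OneHotEncoder().setInputCol(s"${c}_idx").setOutputCol(s"${c}_enc"))

  // Use the encoded vector for string columns and the raw column otherwise.
  val featureCols = data.dtypes.filterNot(_._1 == "label").map {
    case (name, "StringType") => s"${name}_enc"
    case (name, _)            => name
  }

  // Combine all feature columns into the single vector column "features".
  val assembler = new VectorAssembler()
    .setInputCols(featureCols)
    .setOutputCol("features")

  val lr = new LogisticRegression()
    .setMaxIter(10)
    .setRegParam(0.01)

  val stages: Seq[PipelineStage] = indexers ++ encoders ++ Seq(assembler, lr)
  new Pipeline().setStages(stages.toArray)
}
```

Under those assumptions, buildPipeline(data).fit(data) would train the model. The original string columns simply stay in the DataFrame and are ignored by the assembler.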
