Using Spark ML OneHotEncoder on multiple columns

I managed to create a pipeline that indexes several string columns at once, but I am stuck on encoding them because, unlike the indexer, the encoder is not an estimator, so I never call fit on it, according to the OneHotEncoder example in the documentation.

    import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
    import org.apache.spark.ml.Pipeline

    val data = sqlContext.read.parquet("s3n://map2-test/forecaster/intermediate_data")
    val df = data.select("win", "bid_price", "domain", "size", "form_factor").na.drop()

    // indexing columns
    val stringColumns = Array("domain", "size", "form_factor")
    val index_transformers: Array[org.apache.spark.ml.PipelineStage] = stringColumns.map(
      cname => new StringIndexer()
        .setInputCol(cname)
        .setOutputCol(s"${cname}_index")
    )

    // Add the rest of your pipeline like VectorAssembler and algorithm
    val index_pipeline = new Pipeline().setStages(index_transformers)
    val index_model = index_pipeline.fit(df)
    val df_indexed = index_model.transform(df)

    // encoding columns
    val indexColumns = df_indexed.columns.filter(x => x contains "index")
    val one_hot_encoders: Array[org.apache.spark.ml.PipelineStage] = indexColumns.map(
      cname => new OneHotEncoder()
        .setInputCol(cname)
        .setOutputCol(s"${cname}_vec")
    )

    val one_hot_pipeline = new Pipeline().setStages(one_hot_encoders)
    val df_encoded = one_hot_pipeline.transform(df_indexed)

The OneHotEncoder object has no fit method, so putting it in the same pipeline as the indexers does not work: it throws an error when I call fit on the pipeline. I also cannot call transform on the pipeline that I built from the array of pipeline stages, one_hot_encoders.

I have not found a good way to use OneHotEncoder without creating an encoder and calling transform individually for every column I want to encode.

1 answer

Spark >= 3.0:

In Spark 3.0, OneHotEncoderEstimator has been renamed to OneHotEncoder:

    import org.apache.spark.ml.feature.{OneHotEncoder, OneHotEncoderModel}

    val encoder = new OneHotEncoder()
      .setInputCols(indexColumns)
      .setOutputCols(indexColumns map (name => s"${name}_vec"))
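
Because the renamed OneHotEncoder is an estimator, it can sit in the same Pipeline as the indexers and be fitted together with them. Below is a minimal end-to-end sketch for Spark 3.x. The local SparkSession, the app name, and the toy rows are assumptions made up for illustration; only the column names come from the question.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation (spark-shell already provides `spark`).
val spark = SparkSession.builder().master("local[*]").appName("ohe-sketch").getOrCreate()
import spark.implicits._

// Assumption: made-up rows that only mimic the string columns from the question.
val df = Seq(
  ("a.com", "300x250", "phone"),
  ("b.com", "728x90", "tablet"),
  ("a.com", "300x250", "desktop")
).toDF("domain", "size", "form_factor")

val stringColumns = Array("domain", "size", "form_factor")
val indexColumns = stringColumns.map(c => s"${c}_index")
val vecColumns = indexColumns.map(c => s"${c}_vec")

// One StringIndexer per string column, as in the question.
val indexers: Array[PipelineStage] = stringColumns.map { c =>
  new StringIndexer().setInputCol(c).setOutputCol(s"${c}_index")
}

// A single encoder handles all index columns at once.
val encoder = new OneHotEncoder()
  .setInputCols(indexColumns)
  .setOutputCols(vecColumns)

// Indexers first, then the encoder; fit runs the stages in order.
val stages: Array[PipelineStage] = indexers :+ encoder
val model = new Pipeline().setStages(stages).fit(df)
val encoded = model.transform(df)
```

Calling `encoded.select(vecColumns.head, vecColumns.tail: _*).show(false)` should then display one sparse vector column per original string column.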

Spark >= 2.3:

Spark 2.3 introduced the new classes OneHotEncoderEstimator and OneHotEncoderModel, which require fitting even when they are used outside of a Pipeline, and which operate on multiple columns at once.

    import org.apache.spark.ml.feature.{OneHotEncoderEstimator, OneHotEncoderModel}

    val encoder = new OneHotEncoderEstimator()
      .setInputCols(indexColumns)
      .setOutputCols(indexColumns map (name => s"${name}_vec"))

    encoder.fit(df_indexed).transform(df_indexed)

Spark < 2.3:

Even if the transformers you use do not require fitting, you have to call the fit method to create a PipelineModel, which you can then use to transform the data.

    one_hot_pipeline.fit(df_indexed).transform(df_indexed)

Alternatively, you can combine indexing and encoding into a single Pipeline:

    val pipeline = new Pipeline()
      .setStages(index_transformers ++ one_hot_encoders)

    val model = pipeline.fit(df)
    model.transform(df)

Edit:

The error you see means that one of your columns contains an empty String. The indexer accepts it, but it cannot be used for encoding. Depending on your requirements, you can drop such rows or replace the empty strings with a placeholder value. Unfortunately, you cannot use NULLs until SPARK-11569 is resolved.
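
The placeholder option can be sketched roughly as follows. `fillEmpty` and the `"__empty__"` label are hypothetical names invented for this example, not part of Spark; the helper only uses the standard `when`/`otherwise` column functions and leaves NULL values untouched.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, when}

// Hypothetical helper: replaces empty strings in the given columns
// with a placeholder label so they can be indexed and encoded.
def fillEmpty(
    df: DataFrame,
    columns: Seq[String],
    placeholder: String = "__empty__"): DataFrame =
  columns.foldLeft(df) { (acc, c) =>
    // Rows where the column is not an empty string keep their value.
    acc.withColumn(c, when(col(c) === "", lit(placeholder)).otherwise(col(c)))
  }
```

With the question's names this would be applied before fitting, for example `pipeline.fit(fillEmpty(df, stringColumns))`. To drop the affected rows instead, a filter such as `df.filter(!(col("domain") === ""))` per column should work.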
