MinMax normalization in Scala

I have an org.apache.spark.sql.DataFrame with multiple columns. I want to scale one column (lat_long_dist) using MinMax normalization, or any other technique that scales the data between -1 and 1, and keep the result as an org.apache.spark.sql.DataFrame.

scala> val df = sqlContext.csvFile("tenop.csv")
df: org.apache.spark.sql.DataFrame = [gst_id_matched: string, ip_crowding: string, lat_long_dist: double, stream_name_1: string]

I found StandardScaler, but it requires transforming the dataset before I can apply it. Is there a simple way to do this?
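Roughly, the StandardScaler route seems to require wrapping the column in a Vector column first, something like the sketch below (the lat_long_dist_vec / lat_long_dist_std names are just placeholders, and StandardScaler produces z-scores rather than a fixed -1 to 1 range):

import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.functions.udf

// StandardScaler expects a Vector input column, so wrap the Double first
val toVec = udf((d: Double) => Vectors.dense(d))
val vectorized = df.withColumn("lat_long_dist_vec", toVec($"lat_long_dist"))

val standardized = new StandardScaler()
  .setInputCol("lat_long_dist_vec")
  .setOutputCol("lat_long_dist_std")
  .setWithMean(true)  // center to zero mean
  .setWithStd(true)   // scale to unit standard deviation
  .fit(vectorized)
  .transform(vectorized)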

2 answers

I assume you want something like this:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{min, max, lit}

val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")

val (vMin, vMax) = df.agg(min($"v"), max($"v")).first match {
  case Row(x: Double, y: Double) => (x, y)
}

val scaledRange = lit(2)  // Range of the scaled variable
val scaledMin = lit(-1)   // Min value of the scaled variable
val vNormalized = ($"v" - vMin) / (vMax - vMin)  // v normalized to (0, 1) range

val vScaled = scaledRange * vNormalized + scaledMin

df.withColumn("vScaled", vScaled).show

// +---+-----+--------------------+
// |  k|    v|             vScaled|
// +---+-----+--------------------+
// |  1|  0.5| -0.3093093093093092|
// |  2| 10.2| 0.27327327327327344|
// |  3|  5.7|0.003003003003003...|
// |  4|-11.0|                -1.0|
// |  5| 22.3|                 1.0|
// +---+-----+--------------------+
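Applied to the column from the question, the same pattern would look roughly like this (a sketch assuming lat_long_dist is a plain Double column, as the schema above suggests; lat_long_dist_scaled is just an illustrative name):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{min, max, lit}

val (dMin, dMax) = df.agg(min($"lat_long_dist"), max($"lat_long_dist")).first match {
  case Row(x: Double, y: Double) => (x, y)
}

// Map (dMin, dMax) linearly onto (-1, 1); the result is still a DataFrame
val scaled = df.withColumn(
  "lat_long_dist_scaled",
  lit(2) * ($"lat_long_dist" - dMin) / (dMax - dMin) + lit(-1)
)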

Here's another suggestion, since you are already working with Spark anyway.

Why don't you use the MinMaxScaler in the ml package?

Try this with the same example data as zero323 used above.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.sql.functions.udf

val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")

// MinMaxScaler works on a Vector column, so wrap each value in a dense vector
val vectorizeCol = udf( (v: Double) => Vectors.dense(Array(v)) )
val df2 = df.withColumn("vVec", vectorizeCol(df("v")))

val scaler = new MinMaxScaler()
  .setInputCol("vVec")
  .setOutputCol("vScaled")
  .setMax(1)
  .setMin(-1)

scaler.fit(df2).transform(df2).show

+---+-----+-------+--------------------+
|  k|    v|   vVec|             vScaled|
+---+-----+-------+--------------------+
|  1|  0.5|  [0.5]|[-0.3093093093093...|
|  2| 10.2| [10.2]|[0.27327327327327...|
|  3|  5.7|  [5.7]|[0.00300300300300...|
|  4|-11.0|[-11.0]|              [-1.0]|
|  5| 22.3| [22.3]|               [1.0]|
+---+-----+-------+--------------------+
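Note that MinMaxScaler writes its output as a one-element Vector. If you want a plain Double column back, one possible way (a sketch reusing df2 and scaler from above; firstElement is a made-up helper name) is another small UDF:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.functions.udf

// Pull the single element back out of the Vector column produced by MinMaxScaler
val firstElement = udf((v: Vector) => v(0))

val scaled = scaler.fit(df2).transform(df2)
scaled.withColumn("vScaledDouble", firstElement($"vScaled")).show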

You can also scale multiple columns at once:

val df = sc.parallelize(Seq(
  (1.0, -1.0, 2.0),
  (2.0, 0.0, 0.0),
  (0.0, 1.0, -1.0)
)).toDF("a", "b", "c")

import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
  .setInputCols(Array("a", "b", "c"))
  .setOutputCol("features")

val df2 = assembler.transform(df)

// Reusing the scaler instance above with the same min(-1) and max(1)
scaler.setInputCol("features").setOutputCol("scaledFeatures").fit(df2).transform(df2).show

+---+----+----+--------------+--------------------+
|  a|   b|   c|      features|      scaledFeatures|
+---+----+----+--------------+--------------------+
|1.0|-1.0| 2.0|[1.0,-1.0,2.0]|      [0.0,-1.0,1.0]|
|2.0| 0.0| 0.0| [2.0,0.0,0.0]|[1.0,0.0,-0.33333...|
|0.0| 1.0|-1.0|[0.0,1.0,-1.0]|     [-1.0,1.0,-1.0]|
+---+----+----+--------------+--------------------+
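If this is part of a larger workflow, the same two stages can also be chained in an ml Pipeline; a minimal sketch with the assembler above and a fresh scaler:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.MinMaxScaler

// Chain assembly and scaling so a single fit/transform covers both steps
val pipeline = new Pipeline().setStages(Array(
  assembler,
  new MinMaxScaler()
    .setInputCol("features")
    .setOutputCol("scaledFeatures")
    .setMin(-1)
    .setMax(1)
))

pipeline.fit(df).transform(df).show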
