I have a problem with LinearRegressionWithSGD in Spark MLlib. I followed their example from https://spark.apache.org/docs/latest/mllib-linear-methods.html (using the Python interface).
In their example, all features are roughly scaled, with a mean of about 0 and a standard deviation of around 1. Now, if I scale one of them up by a factor of 10, the regression breaks (it gives NaNs or very large coefficients):
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from numpy import array


def parsePoint(line):
    """Parse one comma/space separated text line into a LabeledPoint.

    The first value on the line is the label; the remaining values are the
    features. Feature index 3 is deliberately multiplied by 10 to demonstrate
    how a single unscaled feature makes SGD diverge.
    """
    values = [float(x) for x in line.replace(',', ' ').split(' ')]
    values[3] *= 10  # blow up one feature's scale to provoke the divergence
    return LabeledPoint(values[0], values[1:])


data = sc.textFile(spark_home + "data/mllib/ridge-data/lpsa.data")
parsedData = data.map(parsePoint)
model = LinearRegressionWithSGD.train(parsedData)

# Evaluate the model on the training data.
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
# NOTE: Python 3 removed tuple-unpacking lambdas (`lambda (v, p): ...`),
# so index into the (label, prediction) pair instead.
MSE = (valuesAndPreds.map(lambda vp: (vp[0] - vp[1]) ** 2)
       .reduce(lambda x, y: x + y) / valuesAndPreds.count())
print("Mean Squared Error = " + str(MSE))
print("Model coefficients:", str(model))
So, I think I need to do feature scaling. If I pre-scale the features, this works (because the features are back on comparable scales). However, now I do not know how to get the coefficients back in the original (unscaled) feature space.
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from numpy import array
# BUG FIX: Vectors.dense is used below but Vectors was never imported.
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.feature import StandardScalerModel


def parseToDenseVector(line):
    """Parse one comma/space separated text line into a dense vector.

    Keeps ALL values (label included) so the whole row can be fed to
    StandardScaler. NOTE(review): this means the label in column 0 is
    standardized too, so the trained model predicts in *scaled* label space.
    """
    values = [float(x) for x in line.replace(',', ' ').split(' ')]
    values[3] *= 10  # same deliberate de-scaling as the first example
    return Vectors.dense(values)


def parseToLabel(values):
    """Split a scaled row vector into (label, features) as a LabeledPoint."""
    return LabeledPoint(values[0], values[1:])


data = sc.textFile(spark_home + "data/mllib/ridge-data/lpsa.data")
parsedData = data.map(parseToDenseVector)

# Standardize to zero mean / unit variance (withMean=True, withStd=True).
scaler = StandardScaler(True, True)
scaler_model = scaler.fit(parsedData)
parsedData_scaled = scaler_model.transform(parsedData)
parsedData_scaled_transformed = parsedData_scaled.map(parseToLabel)

model = LinearRegressionWithSGD.train(parsedData_scaled_transformed)

# Evaluate on the (scaled) training data.
valuesAndPreds = parsedData_scaled_transformed.map(
    lambda p: (p.label, model.predict(p.features)))
# NOTE: Python 3 removed tuple-unpacking lambdas, so index the pair instead.
MSE = (valuesAndPreds.map(lambda vp: (vp[0] - vp[1]) ** 2)
       .reduce(lambda x, y: x + y) / valuesAndPreds.count())
print("Mean Squared Error = " + str(MSE))
print("Model coefficients:", str(model))
So, the question is: how can I convert the coefficients obtained in the scaled space back to the original feature space? Presumably I would need the per-feature means and standard deviations that scaler_model (a StandardScalerModel) used, but I do not see how to retrieve them or apply the inverse transformation to the model's weights and intercept.