Here is my approach using Spark's internal dependencies. You will need to import the linear algebra library for the matrix operation used later, i.e. multiplying the per-tree predictions by the learning rate (the tree weights).
import org.apache.spark.mllib.linalg.{Vectors, Matrices}
import org.apache.spark.mllib.linalg.distributed.{RowMatrix}
Suppose you created a model with GBT:
// Train the gradient-boosted trees model.
// NOTE: `trainingData` and `boostingStrategy` are not defined in this snippet and
// are assumed to exist already; `GradientBoostedTrees` must also be imported
// (org.apache.spark.mllib.tree.GradientBoostedTrees).
val model = GradientBoostedTrees.train(trainingData, boostingStrategy)
To calculate probability using a model object:
// Computes the positive-class probability for every point in `testData` from a
// trained GBT `model`, then thresholds it into a 0.0 / 1.0 label.
// NOTE: assumes `model` was trained with MLlib's classification loss (LogLoss);
// the margin-to-probability conversion below is specific to that loss.

// Get the raw prediction of each tree: one Array[Double] per test point,
// ordered like model.trees.
val treePredictions = testData.map { point => model.trees.map(_.predict(point.features)) }

// Transform the arrays into a distributed matrix for multiplication
// (one row per test point, one column per tree).
val treePredictionsVector = treePredictions.map(array => Vectors.dense(array))
val treePredictionsMatrix = new RowMatrix(treePredictionsVector)

// One weight per tree, laid out as a (numTrees x 1) column matrix.
val learningRate = model.treeWeights
val learningRateMatrix = Matrices.dense(learningRate.size, 1, learningRate)

// (numPoints x numTrees) * (numTrees x 1) => the ensemble margin F(x) per point.
val weightedTreePredictions = treePredictionsMatrix.multiply(learningRateMatrix)

// Convert the margin into P(label = 1).
// MLlib's LogLoss is 2 * log(1 + exp(-2 * y * F(x))) with y in {-1, +1}, so the
// margin F(x) is *half* the log-odds and the probability is
// 1 / (1 + exp(-2 * F(x))). Using exp(-F(x)) would squash every probability
// towards 0.5 (the 0.5 decision boundary itself is unaffected).
val classProb = weightedTreePredictions.rows
  .flatMap(_.toArray)
  .map(margin => 1.0 / (1.0 + math.exp(-2.0 * margin)))
classProb.collect

// You may tweak your decision boundary for different class labels
val classLabel = classProb.map(x => if (x > 0.5) 1.0 else 0.0)
classLabel.collect
Here is a snippet of code that you can copy and paste directly into the spark shell:
// Self-contained spark-shell script: trains a binary GBT classifier on the
// sample tree data, then derives both hard labels and class probabilities.
// Expects the shell-provided SparkContext `sc`.
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vectors, Matrices}
import org.apache.spark.mllib.linalg.distributed.{RowMatrix}
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel

// Load and parse the data file.
// Each line is comma-separated doubles: the first value is the label,
// the remaining values are the features.
val csvData = sc.textFile("data/mllib/sample_tree_data.csv")
val data = csvData.map { line =>
  val parts = line.split(',').map(_.toDouble)
  LabeledPoint(parts(0), Vectors.dense(parts.tail))
}

// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))

// Train a GBT model.
val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.numIterations = 50
boostingStrategy.treeStrategy.numClasses = 2
boostingStrategy.treeStrategy.maxDepth = 6
// Empty map: no feature is declared categorical.
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()

val model = GradientBoostedTrees.train(trainingData, boostingStrategy)

// Get class label from raw predict function
val predictedLabels = model.predict(testData.map(_.features))
predictedLabels.collect

// Get class probability.
// Step 1: raw prediction of each tree, one Array[Double] per test point.
val treePredictions = testData.map { point => model.trees.map(_.predict(point.features)) }

// Step 2: pack into a distributed matrix (one row per point, one column per tree).
val treePredictionsVector = treePredictions.map(array => Vectors.dense(array))
val treePredictionsMatrix = new RowMatrix(treePredictionsVector)

// Step 3: weight every tree by its tree weight via a (numTrees x 1) column
// matrix, giving the ensemble margin F(x) per point.
val learningRate = model.treeWeights
val learningRateMatrix = Matrices.dense(learningRate.size, 1, learningRate)
val weightedTreePredictions = treePredictionsMatrix.multiply(learningRateMatrix)

// Step 4: convert the margin into P(label = 1).
// MLlib's LogLoss is 2 * log(1 + exp(-2 * y * F(x))) with y in {-1, +1}, so the
// margin is half the log-odds and the probability is 1 / (1 + exp(-2 * F(x))).
// Using exp(-F(x)) would squash every probability towards 0.5.
val classProb = weightedTreePredictions.rows
  .flatMap(_.toArray)
  .map(margin => 1.0 / (1.0 + math.exp(-2.0 * margin)))

// Threshold at 0.5 to recover hard labels.
val classLabel = classProb.map(x => if (x > 0.5) 1.0 else 0.0)
classLabel.collect
source share