ML LogisticRegression doesn't support multinomial classification yet, but it is supported by both MLlib NaiveBayes and LogisticRegressionWithLBFGS. In the first case it should work by default:
import org.apache.spark.mllib.classification.NaiveBayes

val nbModel = new NaiveBayes()
  .setModelType("multinomial") // This is the default value
  .run(train)
but for logistic regression you have to explicitly set the number of classes:
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(n) // Set number of classes
  .run(trainingData)
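If you don't know the number of classes up front, one simple way to obtain it is to count the distinct labels in the training set. A minimal sketch, assuming trainingData is an RDD[LabeledPoint] with labels already encoded as 0.0 ... (n - 1).0:

// Count distinct labels to determine the number of classes
val n = trainingData.map(_.label).distinct.count.toInt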
As for the preprocessing steps, this is a pretty broad topic and it's hard to give you meaningful advice without access to your data, so everything you find below is just a wild guess:
- As I understand it, you use wiki data for training and tweets for testing. If that's true it is, generally speaking, a bad idea: you can expect both sets to use significantly different vocabulary, grammar, and spelling.
- a simple regex tokenizer may work well on standardized text, but in my experience it won't work well on informal text like tweets.
- HashingTF can be a good way to obtain a baseline model, but it is an extremely simplified approach, especially if you don't apply any filtering steps. If you decide to use it you should at least increase the number of features or use the default value (2^20); see the sketch below.
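For illustration, a slightly richer ML preprocessing front end could look roughly like this. It is only a sketch of my assumptions about your setup: the column names ("text", "words", "filtered", "rawFeatures") and the feature count are placeholders, and StopWordsRemover requires a reasonably recent Spark version.

import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover, HashingTF}

// Tokenize on non-word characters; informal text like tweets may need extra cleanup
val tokenizer = new RegexTokenizer()
  .setInputCol("text")
  .setOutputCol("words")
  .setPattern("\\W+")

// A simple filtering step: drop common stop words
val remover = new StopWordsRemover()
  .setInputCol("words")
  .setOutputCol("filtered")

// Use a much larger feature space than 1000 to reduce hash collisions
val hashingTF = new HashingTF()
  .setInputCol("filtered")
  .setOutputCol("rawFeatures")
  .setNumFeatures(1 << 20) // 2^20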
EDIT (Preparing data for Naive Bayes with IDF)
- using ML transformers:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, IDF}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row

val tokenizer = ???

val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("rawFeatures")

val idf = new IDF()
  .setInputCol(hashingTF.getOutputCol)
  .setOutputCol("features")

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, idf))
val model = pipeline.fit(labeledData)

model
  .transform(labeledData)
  .select($"label", $"features")
  .map{case Row(label: Double, features: Vector) =>
    LabeledPoint(label, features)}
- using MLlib transformers:
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.feature.{IDF, IDFModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint

val labeledData = wikiData.map(x =>
  LabeledData(x.category, x.text, categoryMap.get(x.category).getOrElse(0.0)))

val p = "\\W+".r
val raw = labeledData.map{
  case LabeledData(_, text, label) => (label, p.split(text))}

val hashingTF: org.apache.spark.mllib.feature.HashingTF = new HashingTF(1000)
val tf = raw.map{case (label, text) => (label, hashingTF.transform(text))}

val idf: org.apache.spark.mllib.feature.IDFModel = new IDF().fit(tf.map(_._2))
tf.map{
  case (label, rawFeatures) => LabeledPoint(label, idf.transform(rawFeatures))}
Note: since transformers require JVM access, the MLlib version won't work in PySpark. If you prefer Python, you have to split the data transformation into separate steps and zip the results back together.
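Either way the result is an RDD[LabeledPoint], which can then be fed to the MLlib NaiveBayes shown at the beginning. A minimal sketch, assuming the transformed RDD above is assigned to a value named points:

// `points` is the RDD[LabeledPoint] produced by either of the snippets above
val nbModel = new NaiveBayes()
  .setModelType("multinomial")
  .run(points)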
EDIT (data preparation for ML algorithms):
While the following code snippet looks valid at a glance
val categoryMap = wikiData
  .map(x => x.category)
  .distinct
  .zipWithIndex
  .mapValues(x => x.toDouble / 1000)
  .collectAsMap

val labeledData = wikiData.map(x => LabeledData(
  x.category, x.text, categoryMap.get(x.category).getOrElse(0.0))).toDF
it will not generate valid labels for ML algorithms.
First of all, ML expects labels to be in (0.0, 1.0, ..., (n - 1).0), where n is the number of classes. With your example pipeline, where one of the classes gets the label 0.001, you'll get the following error:
ERROR LogisticRegression: Classification labels should be in {0 to 0 Found 1 invalid labels.
The obvious solution is to avoid the division when you create the mapping:
.mapValues(x=>x.toDouble)
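Put together, the corrected mapping is just your original snippet with the division dropped:

val categoryMap = wikiData
  .map(x => x.category)
  .distinct
  .zipWithIndex
  .mapValues(x => x.toDouble) // labels become 0.0, 1.0, ..., (n - 1).0
  .collectAsMap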
While this works for LogisticRegression, other ML algorithms will still fail. For example, with RandomForestClassifier you get
RandomForestClassifier was given input with invalid label column label, without the number of classes specified. See StringIndexer.
Interestingly, the ML version of RandomForestClassifier, unlike its MLlib counterpart, doesn't provide a method to set the number of classes. It turns out it expects special attributes to be set on the DataFrame column. The simplest approach is to use the StringIndexer mentioned in the error message:
import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("label")

val pipeline = new Pipeline()
  .setStages(Array(indexer, tokenizer, hashingTF, idf, lr))

val model = pipeline.fit(wikiData.toDF)
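If you later want the original category names back from the numeric predictions, the labels learned by the indexer can be reused, for example through IndexToString. This is only a sketch; the stage index, the testData DataFrame, and the column names are assumptions:

import org.apache.spark.ml.feature.{IndexToString, StringIndexerModel}

// The fitted StringIndexerModel is the first stage of the pipeline model
val indexerModel = model.stages(0).asInstanceOf[StringIndexerModel]

val labelConverter = new IndexToString()
  .setInputCol("prediction")         // default output column of the classifier
  .setOutputCol("predictedCategory")
  .setLabels(indexerModel.labels)

// testData: a DataFrame with the same schema as wikiData.toDF
labelConverter
  .transform(model.transform(testData))
  .select("predictedCategory")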