How to prepare training data in MLlib

TL;DR: How do I use MLlib to prepare my wiki data (text and category) so that I can train a model and predict against tweets?

I am having trouble figuring out how to convert my tokenized wiki data so that it can be used to train NaiveBayes or LogisticRegression. My goal is to use the trained model to classify tweets*. I have tried pipelines with LR, and HashingTF with IDF for NaiveBayes, but I keep getting wrong predictions. Here is what I have tried:

* Note that I would like to use the many categories in the wiki data as my labels. I have only seen binary classification (one category or the other). Is it possible to do what I want?

Pipeline with LR

    import org.apache.spark.rdd.RDD
    import org.apache.spark.SparkContext
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.feature.HashingTF
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.ml.feature.RegexTokenizer

    case class WikiData(category: String, text: String)
    case class LabeledData(category: String, text: String, label: Double)

    val wikiData = sc.parallelize(List(
      WikiData("Spark", "this is about spark"),
      WikiData("Hadoop", "then there is hadoop")))

    val categoryMap = wikiData
      .map(x=>x.category)
      .distinct
      .zipWithIndex
      .mapValues(x=>x.toDouble/1000)
      .collectAsMap

    val labeledData = wikiData.map(x=>LabeledData(
      x.category, x.text, categoryMap.get(x.category).getOrElse(0.0))).toDF

    val tokenizer = new RegexTokenizer()
      .setInputCol("text")
      .setOutputCol("words")
      .setPattern("\\W+")

    val hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")

    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.01)

    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, lr))

    val model = pipeline.fit(labeledData)

    model.transform(labeledData).show

Naive Bayes

    val hashingTF = new HashingTF()
    val tf: RDD[Vector] = hashingTF.transform(documentsAsWordSequenceAlready)

    import org.apache.spark.mllib.feature.IDF

    tf.cache()
    val idf = new IDF().fit(tf)
    val tfidf: RDD[Vector] = idf.transform(tf)

    tf.cache()
    val idf = new IDF(minDocFreq = 2).fit(tf)
    val tfidf: RDD[Vector] = idf.transform(tf)

    // to create tfidfLabeled (below) I ran a map to set the labels...
    // but again it seems they have to be 1.0 or 0.0?
    NaiveBayes.train(tfidfLabeled)
      .predict(hashingTF.transform(tweet))
      .collect
apache-spark apache-spark-mllib apache-spark-ml
1 answer

ML LogisticRegression does not yet support multinomial classification, but both MLlib NaiveBayes and LogisticRegressionWithLBFGS do. With the former it should work by default:

    import org.apache.spark.mllib.classification.NaiveBayes

    val nbModel = new NaiveBayes()
      .setModelType("multinomial") // This is the default value
      .run(train)

but for logistic regression you have to specify the number of classes:

    import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

    val model = new LogisticRegressionWithLBFGS()
      .setNumClasses(n) // Set the number of classes
      .run(trainingData)

As for the preprocessing steps, this is a pretty broad topic, and it is hard to give you meaningful advice without access to your data, so everything you find below is just a wild guess:

  • As I understand it, you use wiki data for training and tweets for testing. If that is true, it is generally a bad idea. You can expect the two sets to use significantly different vocabulary, grammar, and spelling.
  • A simple regex tokenizer may perform well on standardized text, but in my experience it won't work well on informal text like tweets.
  • HashingTF can be a good way to get a baseline model, but it is an extremely simplified approach, especially if you don't apply any filtering steps. If you decide to use it, you should at least increase the number of features or use the default value (2^20).
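
The last point can be illustrated without Spark. Below is a minimal sketch of the hashing trick in plain Scala. The helper `bucket` is hypothetical: it uses `String.hashCode` modulo the number of features, which is a simplification and not necessarily the hash function HashingTF applies internally. The vocabulary is synthetic. The sketch only counts how many distinct terms are forced to share a bucket for a small and a large feature space.

```scala
object HashingCollisions {
  // Hypothetical stand-in for the hashing trick: map a term to a bucket index
  // in the range [0, numFeatures).
  def bucket(term: String, numFeatures: Int): Int = {
    val raw = term.hashCode % numFeatures
    if (raw < 0) raw + numFeatures else raw
  }

  // Number of distinct terms that do not get a bucket of their own.
  def collisions(terms: Seq[String], numFeatures: Int): Int = {
    val distinctTerms = terms.distinct
    distinctTerms.size - distinctTerms.map(bucket(_, numFeatures)).distinct.size
  }

  def main(args: Array[String]): Unit = {
    // Synthetic vocabulary of 5000 distinct terms, for illustration only.
    val vocabulary = (1 to 5000).map(i => s"term$i")
    println(s"1000 features: ${collisions(vocabulary, 1000)} collisions")
    println(s"2^20 features: ${collisions(vocabulary, 1 << 20)} collisions")
  }
}
```

With 5000 distinct terms and only 1000 buckets, at least 4000 terms must share a bucket with another term, whatever hash function is used. That is why a feature count of 1000 is likely too small for a realistic vocabulary.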

EDIT (Preparing data for Naive Bayes with IDF)

  • using ML pipelines:

        import org.apache.spark.mllib.regression.LabeledPoint
        import org.apache.spark.mllib.linalg.Vector
        import org.apache.spark.ml.Pipeline
        import org.apache.spark.ml.feature.{HashingTF, IDF}
        import org.apache.spark.sql.Row

        val tokenizer = ???

        val hashingTF = new HashingTF()
          .setNumFeatures(1000)
          .setInputCol(tokenizer.getOutputCol)
          .setOutputCol("rawFeatures")

        val idf = new IDF()
          .setInputCol(hashingTF.getOutputCol)
          .setOutputCol("features")

        val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, idf))
        val model = pipeline.fit(labeledData)

        // Convert the transformed rows to LabeledPoints for MLlib
        model
          .transform(labeledData)
          .select($"label", $"features")
          .map{case Row(label: Double, features: Vector) => LabeledPoint(label, features)}

  • using MLlib transformers:

        import org.apache.spark.mllib.feature.HashingTF
        import org.apache.spark.mllib.linalg.Vector
        import org.apache.spark.mllib.regression.LabeledPoint
        import org.apache.spark.mllib.feature.{IDF, IDFModel}

        val labeledData = wikiData.map(x =>
          LabeledData(x.category, x.text, categoryMap.get(x.category).getOrElse(0.0)))

        // Split the text on non-word characters
        val p = "\\W+".r
        val raw = labeledData.map{
          case LabeledData(_, text, label) => (label, p.split(text))}

        val hashingTF: org.apache.spark.mllib.feature.HashingTF = new HashingTF(1000)
        val tf = raw.map{case (label, text) => (label, hashingTF.transform(text))}

        val idf: org.apache.spark.mllib.feature.IDFModel = new IDF().fit(tf.map(_._2))
        tf.map{
          case (label, rawFeatures) => LabeledPoint(label, idf.transform(rawFeatures))}

Note: Since the transformers require JVM access, the MLlib version won't work in PySpark. If you prefer Python, you have to split the data, transform it, and zip the results back together.

EDIT (data preparation for ML algorithms):

While the following piece of code looks valid at first glance

    val categoryMap = wikiData
      .map(x=>x.category)
      .distinct
      .zipWithIndex
      .mapValues(x=>x.toDouble/1000)
      .collectAsMap

    val labeledData = wikiData.map(x=>LabeledData(
      x.category, x.text, categoryMap.get(x.category).getOrElse(0.0))).toDF

it will not generate valid labels for ML algorithms.

First of all, ML expects labels to be in (0.0, 1.0, ..., n - 1), where n is the number of classes. In your example pipeline, where one of the classes gets the label 0.001, you will get an error like this:

ERROR LogisticRegression: Classification labels should be in {0 to 0 Found 1 invalid labels.

The obvious solution is to avoid division when creating a mapping

 .mapValues(x=>x.toDouble) 
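
To see what the division does to the labels, here is a small sketch using plain Scala collections (no Spark required). The category names are made up for illustration, `zipWithIndex` on a local `Seq` stands in for the RDD version, and `isValidLabel` is a hypothetical helper that encodes the "whole number below the number of classes" requirement described above.

```scala
object LabelMapping {
  // Hypothetical categories, for illustration only.
  val categories: Seq[String] = Seq("Spark", "Hadoop", "Spark", "Flink")

  // Mapping as in the question: indices divided by 1000.
  val scaled: Map[String, Double] =
    categories.distinct.zipWithIndex.map { case (c, i) => c -> i.toDouble / 1000 }.toMap

  // Mapping without the division: labels are 0.0, 1.0, 2.0, ...
  val indexed: Map[String, Double] =
    categories.distinct.zipWithIndex.map { case (c, i) => c -> i.toDouble }.toMap

  // A label is usable as a class index only if it is a non-negative whole
  // number below the number of classes.
  def isValidLabel(label: Double, numClasses: Int): Boolean =
    label >= 0 && label < numClasses && label == math.floor(label)

  def main(args: Array[String]): Unit = {
    val n = indexed.size
    println(s"scaled:  $scaled, all valid: ${scaled.values.forall(isValidLabel(_, n))}")
    println(s"indexed: $indexed, all valid: ${indexed.values.forall(isValidLabel(_, n))}")
  }
}
```

The size of the mapping is also a natural candidate for the number of classes that has to be passed to an algorithm such as LogisticRegressionWithLBFGS.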

While this will work for LogisticRegression, other ML algorithms will still fail. For example, with RandomForestClassifier you will get

RandomForestClassifier was given input with invalid label column label, without the number of classes specified. See StringIndexer.

Interestingly, the ML version of RandomForestClassifier, unlike its MLlib counterpart, doesn't provide a method to set the number of classes. It turns out that it expects special attributes to be set on the DataFrame column. The simplest approach is to use the StringIndexer mentioned in the error message:

    import org.apache.spark.ml.feature.StringIndexer

    val indexer = new StringIndexer()
      .setInputCol("category")
      .setOutputCol("label")

    val pipeline = new Pipeline()
      .setStages(Array(indexer, tokenizer, hashingTF, idf, lr))

    val model = pipeline.fit(wikiData.toDF)
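
For intuition about the labels StringIndexer produces: by default it orders the distinct values by frequency, so the most frequent category gets 0.0. The following plain Scala sketch mimics only that mapping. `FrequencyIndexer` is a hypothetical name, the sample column is made up, and the alphabetical tie-break is a choice made for this sketch rather than documented Spark behavior. The real StringIndexer also attaches the column metadata that RandomForestClassifier relies on, which this sketch does not attempt.

```scala
object FrequencyIndexer {
  // Build a category -> label mapping, most frequent category first.
  // Ties are broken alphabetically; that is an assumption of this sketch.
  def fit(categories: Seq[String]): Map[String, Double] =
    categories
      .groupBy(identity)
      .map { case (c, occurrences) => (c, occurrences.size) }
      .toSeq
      .sortBy { case (c, count) => (-count, c) }
      .zipWithIndex
      .map { case ((c, _), i) => c -> i.toDouble }
      .toMap

  def main(args: Array[String]): Unit = {
    // Hypothetical category column, for illustration only.
    val column = Seq("Spark", "Hadoop", "Spark", "Flink", "Spark", "Hadoop")
    println(fit(column))
  }
}
```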