OutOfBoundsException with ALS - Flink MLlib

I'm building a movie recommendation system using the publicly available MovieLens datasets: http://grouplens.org/datasets/movielens/

To compute the recommendations, I use Flink's ML library in Scala, in particular the ALS algorithm (org.apache.flink.ml.recommendation.ALS).

First, I map the movie ratings into a DataSet[(Int, Int, Double)] and then create a trainingSet and a testSet (see code below).

My problem is that ALS.fit works fine when I use the whole data set (all ratings), but if I remove just one rating, the fit function no longer works, and I don't understand why.

Do you have any ideas? :)

Code used:

Rating.scala

case class Rating(userId: Int, movieId: Int, rating: Double)

Preprocessing.scala

import org.apache.flink.api.scala._

object PreProcessing {

  // Reads userId, movieId and rating (the first three CSV columns) into Rating objects
  def getRatings(env: ExecutionEnvironment, ratingsPath: String): DataSet[Rating] = {
    env.readCsvFile[(Int, Int, Double)](
      ratingsPath,
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2))
      .map { r => Rating(r._1, r._2, r._3) }
  }
}

Processing.scala

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.ParameterMap
import org.apache.flink.ml.recommendation.ALS

object Processing {
  private val ratingsPath: String = "Path_to_ratings.csv"

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath)

    // Keep all ratings; ratings.count() triggers a separate job to get the size
    val trainingSet: DataSet[(Int, Int, Double)] =
      ratings
        .map(r => (r.userId, r.movieId, r.rating))
        .sortPartition(0, Order.ASCENDING)
        .first(ratings.count().toInt)

    val als = ALS()
      .setIterations(10)
      .setNumFactors(10)
      .setBlocks(150)
      .setTemporaryPath("/tmp/tmpALS")

    val parameters = ParameterMap()
      .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
      .add(ALS.Seed, 42L)

    als.fit(trainingSet, parameters)
  }
}

When I remove just one rating:

val trainingSet : DataSet[(Int, Int, Double)] =
  ratings
    .map(r => (r.userId, r.movieId, r.rating))
    .sortPartition(0, Order.ASCENDING)
    .first((ratings.count()-1).toInt)

I get the following error:

06/19/2015 15:00:24 CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:570)) (4/4) FAILED

java.lang.ArrayIndexOutOfBoundsException: 5
    at org.apache.flink.ml.recommendation.ALS$BlockRating.apply(ALS.scala:358)
    at org.apache.flink.ml.recommendation.ALS$$anon$111.coGroup(ALS.scala:635)
    at org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)
    ...


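The problem is the first operator in combination with the setTemporaryPath parameter of Flink's ALS implementation. To understand the problem, let me quickly explain how the blocking ALS algorithm works.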

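The blocking implementation of alternating least squares first partitions the given ratings matrix user-wise and item-wise into blocks. For these blocks, routing information is calculated. This routing information says which user/item block receives which input from which item/user block, respectively. Afterwards, the ALS iteration is started.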
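To make the blocking step more concrete, here is a small plain-Scala sketch of the idea. It is not Flink's ALS code: the names BlockingSketch, userRouting and itemRouting are hypothetical, and assigning ids to blocks by `id mod numBlocks` is an assumption made for illustration. It only shows, for each user block, which item blocks it needs input from, and vice versa.

```scala
// Hypothetical illustration of user-wise/item-wise blocking and routing
// information. This is NOT Flink's ALS implementation, only a sketch of the idea.
object BlockingSketch {
  // A rating as (userId, itemId, value), like DataSet[(Int, Int, Double)].
  type RatingT = (Int, Int, Double)

  // Assumption: ids are assigned to blocks by modulo; Flink may differ.
  def blockOf(id: Int, numBlocks: Int): Int = java.lang.Math.floorMod(id, numBlocks)

  // For every user block: the set of item blocks it receives input from.
  def userRouting(ratings: Seq[RatingT], numBlocks: Int): Map[Int, Set[Int]] =
    ratings
      .groupBy { case (u, _, _) => blockOf(u, numBlocks) }
      .map { case (ub, rs) => ub -> rs.map { case (_, i, _) => blockOf(i, numBlocks) }.toSet }

  // Symmetric: for every item block, the set of user blocks it receives input from.
  def itemRouting(ratings: Seq[RatingT], numBlocks: Int): Map[Int, Set[Int]] =
    ratings
      .groupBy { case (_, i, _) => blockOf(i, numBlocks) }
      .map { case (ib, rs) => ib -> rs.map { case (u, _, _) => blockOf(u, numBlocks) }.toSet }

  def main(args: Array[String]): Unit = {
    val ratings = Seq((1, 10, 4.0), (2, 11, 3.5), (3, 10, 5.0), (4, 13, 2.0))
    println(s"user routing: ${userRouting(ratings, 2)}")
    println(s"item routing: ${itemRouting(ratings, 2)}")
  }
}
```

Both tables are derived from the same ratings, which is why they only fit together if both computations see exactly the same input.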

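Since Flink's underlying execution engine is a parallel streaming dataflow engine, it tries to execute as many parts of the dataflow as possible in a pipelined fashion. This requires all operators of the pipeline to be online at the same time. The advantage is that Flink avoids materializing intermediate results, which might be prohibitively large. The disadvantage is that the available memory has to be shared among all running operators. In the case of ALS, where the individual DataSet elements (e.g. user/item blocks) are rather large, this is not desirable.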

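To solve this problem, not all operators of the implementation are executed at the same time if you have set a temporaryPath. The path defines where the intermediate results can be stored. Thus, if you have defined a temporary path, ALS first calculates the routing information for the user blocks and writes it to disk, then calculates the routing information for the item blocks and writes it to disk, and finally starts the ALS iteration, which reads the routing information from the temporary path.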

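The calculation of the routing information for both the user and the item blocks depends on the given ratings data set. In your case, when the user routing information is calculated, Flink first reads the ratings data set and applies the first operator to it. The first operator returns n arbitrary elements of the underlying data set. The problem is that Flink currently does not store the result of this first operation for the calculation of the item routing information. Instead, when the item routing information is calculated, Flink re-executes the dataflow starting from its sources. This means it reads the ratings data set from disk again and applies the first operator to it again. In many cases this yields a different set of ratings than the previous execution of first. Consequently, the generated routing information is inconsistent and ALS fails. This also explains why your job works with all ratings: if n equals the size of the data set, first returns every element, so both executions see the same ratings regardless of their order.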
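The failure mode can be mimicked without Flink. The sketch below is hypothetical (RerunSketch, readSource, first and userPositions are illustrative names, not Flink API) and assumes that each re-execution reads the source in a rotated order, standing in for the nondeterministic order of a parallel read. A position computed in one execution is then used to index an array sized in the other, which raises an ArrayIndexOutOfBoundsException similar to the one in the question. It does not reproduce Flink's BlockRating code.

```scala
// Hypothetical simulation, not Flink code: two re-executions of the same
// source -> first(n) pipeline that see the input in a different order.
object RerunSketch {
  type RatingT = (Int, Int, Double)

  val source: Vector[RatingT] =
    Vector((1, 10, 4.0), (1, 11, 3.0), (2, 10, 5.0), (2, 12, 1.0), (3, 11, 2.5))

  // Assumption: execution `run` reads the source rotated by `run` positions.
  def readSource(run: Int): Vector[RatingT] = {
    val k = run % source.size
    source.drop(k) ++ source.take(k)
  }

  // first(n): whatever n elements arrive first in this execution.
  def first(n: Int, run: Int): Vector[RatingT] = readSource(run).take(n)

  // Dense positions for the users present in one execution's input.
  def userPositions(ratings: Vector[RatingT]): Map[Int, Int] =
    ratings.map(_._1).distinct.sorted.zipWithIndex.toMap

  def main(args: Array[String]): Unit = {
    val n = source.size - 1 // "remove just one rating"

    // Execution 1 (user routing) sizes the user factor array.
    val userFactors = new Array[Double](userPositions(first(n, run = 0)).size)

    // Execution 2 (item routing) computes user positions from its own input.
    val positions = userPositions(first(n, run = 1))

    // Mixing both: a position from execution 2 indexes an array from execution 1.
    try {
      positions.values.foreach(p => userFactors(p) += 1.0)
    } catch {
      case e: ArrayIndexOutOfBoundsException => println(s"inconsistent routing: $e")
    }
  }
}
```

With n equal to source.size, first(n, 0) and first(n, 1) return the same set of ratings, matching the observation that the full data set works.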

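You can circumvent the problem by materializing the result of the first operator and using this result as the input for the ALS algorithm. The object FlinkMLTools contains a method persist, which takes a DataSet, writes it to the given path and then returns a new DataSet that reads the just-written DataSet. This allows you to break up the resulting dataflow graph.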

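import org.apache.flink.ml.common.FlinkMLTools

val firstTrainingSet : DataSet[(Int, Int, Double)] =
  ratings
    .map(r => (r.userId, r.movieId, r.rating))
    .first((ratings.count()-1).toInt)

// Write the non-deterministic result of first to disk once, so that all
// later stages of ALS read exactly the same ratings
val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training")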

val als = ALS()
  .setIterations(10)
  .setNumFactors(10)
  .setBlocks(150)
  .setTemporaryPath("/tmp/tmpALS/")

val parameters = ParameterMap()
  .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
  .add(ALS.Seed, 42L)

als.fit(trainingSet, parameters)

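Alternatively, you can leave the temporaryPath unset. Then all steps (routing information calculation and ALS iteration) are executed in a pipelined fashion, which means that the user and item routing information calculations both use the same input data set resulting from the first operator.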
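Both workarounds share one property: the result of first is produced once and every consumer sees that same result. A hypothetical plain-Scala sketch (SharedInputSketch, readAndTakeFirst, recomputed and shared are illustrative names, not Flink API; the rotating read order is an assumption) contrasts re-executing the source for each consumer with computing it once and sharing it, which is roughly what persist or a single pipelined job achieves.

```scala
// Hypothetical sketch, not Flink code: re-execution per consumer versus
// computing the first(n) result once and sharing it.
object SharedInputSketch {
  type RatingT = (Int, Int, Double)

  private val source: Vector[RatingT] =
    Vector((1, 10, 4.0), (1, 11, 3.0), (2, 10, 5.0), (2, 12, 1.0), (3, 11, 2.5))

  var reads = 0 // how many times the "source" has been executed

  // Assumption: every read sees the data in a different (rotated) order.
  def readAndTakeFirst(n: Int): Vector[RatingT] = {
    val k = reads % source.size
    reads += 1
    (source.drop(k) ++ source.take(k)).take(n)
  }

  // Re-executed per consumer (staged execution with a bare first).
  def recomputed(n: Int): (Vector[RatingT], Vector[RatingT]) =
    (readAndTakeFirst(n), readAndTakeFirst(n))

  // Computed once and shared (the effect of persist or a pipelined job).
  def shared(n: Int): (Vector[RatingT], Vector[RatingT]) = {
    val once = readAndTakeFirst(n)
    (once, once)
  }

  def main(args: Array[String]): Unit = {
    val (u1, i1) = recomputed(4)
    val (u2, i2) = shared(4)
    println(s"recomputed inputs agree: ${u1.toSet == i1.toSet}")
    println(s"shared inputs agree: ${u2.toSet == i2.toSet}")
  }
}
```

The trade-off in Flink is the one described above: sharing through a pipelined job needs all operators in memory at once, while persist pays for a write and a re-read of the training set.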

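The Flink community is currently working on keeping intermediate results of operators in memory. This will allow pinning the result of the first operator so that it won't be calculated twice and, thus, won't give differing results due to its non-deterministic nature.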
