It's pretty clear that the bottleneck in your program is the candidate search. Given the Spark architecture, it severely limits your ability to parallelize and adds significant overhead by starting a Spark job for each user.
Assuming a typical scenario, with 7 million users and a billion products, most of the time you'll be predicting over the whole range of products minus the few the user has already purchased. At least in my opinion, an important question is why bother with filtering at all. Even if you recommend a product that was previously purchased, is it really harmful?
Unless you have very strict requirements, I would simply ignore the problem and use MatrixFactorizationModel.recommendProductsForUsers, which does pretty much all of the work, excluding data export, for you. After that you can do a bulk export and you're good to go.
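For reference, a rough sketch of that approach (model is assumed to be your already trained MatrixFactorizationModel; nPred = 30 and the output path are placeholders):

import org.apache.spark.mllib.recommendation.Rating

// Top-N products for every user in a single call: RDD[(Int, Array[Rating])]
val nPred = 30
val recommendations = model.recommendProductsForUsers(nPred)

// Bulk export, e.g. one CSV line per (user, product, predicted rating)
recommendations
  .flatMap { case (user, recs) => recs.map(r => s"$user,${r.product},${r.rating}") }
  .saveAsTextFile("recommendations")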
Now let's say you have a strict no-duplicates policy. Working under the assumption that a typical user has purchased only a relatively small number of products, you can start by obtaining a set of products for each user:
val userProdSet = buy_values
  .map { case (user, product, _) => (user, product) }
  .aggregateByKey(Set.empty[Int])((s, e) => s + e, (s1, s2) => s1 ++ s2)
Then you can simply map over userProdSet to get predictions:
// Number of predictions for each user
val nPred = 30

userProdSet.map { case (user, prodSet) =>
  val recommended = model
    // Ask for extra recommendations so that filtering out
    // already purchased products still leaves nPred of them
    .recommendProducts(user, nPred + prodSet.size)
    // Filter out already purchased products
    .filter(rating => !prodSet.contains(rating.product))
    // Sort by predicted rating and keep the top nPred
    .sortBy(_.rating)
    .reverse
    .take(nPred)

  (user, recommended)
}
You can further improve this by using mutable sets for the aggregation and by broadcasting the model, but that's the general idea.
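As a rough illustration of the first point, the aggregation above could use mutable sets so it doesn't allocate a new immutable Set for every purchase (same result, just an optimization sketch):

import scala.collection.mutable

val userProdSet = buy_values
  .map { case (user, product, _) => (user, product) }
  .aggregateByKey(mutable.Set.empty[Int])(
    (s, e) => s += e,      // add a single product within a partition
    (s1, s2) => s1 ++= s2) // merge per-partition sets

Broadcasting the model in practice means collecting its productFeatures (and, if needed, userFeatures) to the driver and broadcasting them, since the model itself wraps RDDs; that is more involved, so I won't sketch it here.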
If the number of users in user_ids is smaller than the number of users in the whole dataset (buy_values), you can simply filter userProdSet to keep only that subset of users.
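For example, assuming user_ids is (or can be collected into) a local collection of user ids, something along these lines would do (wanted and userProdSubset are placeholder names):

// Broadcast the ids of interest and keep only those users
val wanted = sc.broadcast(user_ids.toSet)
val userProdSubset = userProdSet
  .filter { case (user, _) => wanted.value.contains(user) }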