How to run multi-threaded jobs in Apache Spark using Scala or Python?

I have run into a concurrency problem in Spark that prevents me from using it in production, but I know there must be a way around it. I am trying to run Spark ALS for 7 million users and a billion products using order history. First I take a list of distinct users, and then loop over these users to get recommendations. This is slow and would take days to cover all users. I also tried taking the Cartesian product of users and products to get recommendations for everyone at once, but before sending the results to Elasticsearch I still have to filter and sort the records for each user, and only then can I push them to Elasticsearch for other APIs to consume.

Therefore, please suggest a solution that scales well for this case and can be used in production with real-time recommendations.

Here is my Scala code snippet, which should give an idea of how I am currently approaching the problem:

    // buy_values -> RDD with Rating(<int user_id>, <int product_id>, <double rating>)
    def recommend_for_user(user: Int): Unit = {
      println("Recommendations for User ID: " + user);
      // Product IDs which are not bought by user
      val candidates = buys_values
        .filter(x => x("customer_id").toString.toInt != user)
        .map(x => x("product_id").toString.toInt)
        .distinct().map((user, _))
      // find 30 products with top rating
      val recommendations = bestModel.get
        .predict(candidates)
        .takeOrdered(30)(Ordering[Double].reverse.on(x => x.rating))
      var i = 1
      var ESMap = Map[String, String]()
      recommendations.foreach { r =>
        ESMap += r.product.toString -> bitem_ids.value(r.product)
      }
      // push to elasticsearch with user as id
      client.execute {
        index into "recommendation" / "items" id user fields ESMap
      }.await
      // remove candidate RDD from memory
      candidates.unpersist()
    }
    // iterate on each user to get recommendations for the user [slow process]
    user_ids.foreach(recommend_for_user)
2 answers

It's pretty clear that the bottleneck in your program is the search for candidates. Given the Spark architecture, it severely limits your ability to parallelize and adds substantial overhead by starting a Spark job for each user.

Assuming a typical scenario, with 7 million users and a billion products, most of the time you will predict over the whole range of products minus the few already bought by the user. At least in my opinion, the important question is why even bother with filtering. Even if you recommend a product that has been purchased before, is it really harmful?

Unless you have very strict requirements, I would simply ignore the problem and use MatrixFactorizationModel.recommendProductsForUsers, which pretty much does all the work for you, excluding data export. After that, you can perform a bulk export and you are good to go.
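For the bulk export step, here is a minimal sketch of building an Elasticsearch bulk request body in plain Scala. It assumes the recommendations have already been reduced to (user, Seq[(productId, itemId)]) pairs and that the target is a typeless index named "recommendation". The helper names jsonString, bulkEntry and bulkBody are hypothetical; only the newline-delimited action/source layout comes from the bulk API.

```scala
// Hypothetical helpers; not part of Spark or of any Elasticsearch client.

// Quote a string as a JSON string literal.
// Simplification: only backslashes and double quotes are escaped.
def jsonString(s: String): String =
  "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

// One bulk entry is an action line followed by a source line,
// each terminated by a newline. The user ID becomes the document ID.
// Assumption: a typeless index; older clusters with mapping types
// would also need a type in the action line or in the request path.
def bulkEntry(index: String, userId: Int, items: Seq[(Int, String)]): String = {
  val action =
    s"""{"index":{"_index":${jsonString(index)},"_id":${jsonString(userId.toString)}}}"""
  val fields = items
    .map { case (productId, itemId) =>
      s"${jsonString(productId.toString)}:${jsonString(itemId)}"
    }
    .mkString(",")
  s"$action\n{$fields}\n"
}

// Concatenate the entries for a batch of users into one request body.
def bulkBody(index: String, batch: Seq[(Int, Seq[(Int, String)])]): String =
  batch.map { case (userId, items) => bulkEntry(index, userId, items) }.mkString

// Example batch with made-up IDs.
val body = bulkBody(
  "recommendation",
  Seq(
    1 -> Seq(10 -> "item-10", 11 -> "item-11"),
    2 -> Seq(12 -> "item-12")
  )
)
```

The resulting string would be sent as the body of one bulk request per batch, instead of issuing one indexing call per user as in the question.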

Now let's say that you have a clear no-duplicates policy. Working under the assumption that a typical user has purchased only a relatively small number of products, you can start by collecting the set of purchased products for each user:

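    import org.apache.spark.mllib.recommendation.Rating

    // Collect the set of purchased product IDs for each user
    val userProdSet = buy_values
      .map { case Rating(user, product, _) => (user, product) }
      .aggregateByKey(Set.empty[Int])((s, e) => s + e, (s1, s2) => s1 ++ s2)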

Then you can simply map over userProdSet to get predictions:

    // Number of predictions for each user
    val nPred = 30

    // Caveat: MatrixFactorizationModel keeps its factors in RDDs, so calling it
    // inside another RDD's map fails at runtime; read this as the per-user logic.
    userProdSet.map { case (user, prodSet) =>
      val recommended = model
        // Find recommendations for user
        .recommendProducts(user, nPred + prodSet.size)
        // Filter to remove already purchased
        .filter(rating => !prodSet.contains(rating.product))
        // Sort and limit
        .sortBy(_.rating)
        .reverse
        .take(nPred)
      (user, recommended)
    }

You can improve this further by using mutable sets for the aggregation and by broadcasting the model, but that is the general idea.
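The per-user step (drop already purchased products, sort by score, keep the top N) can be isolated as a small pure function. The sketch below runs without Spark: ScoredProduct is a hypothetical stand-in for org.apache.spark.mllib.recommendation.Rating, and topUnpurchased is an illustrative name, not a library call.

```scala
// Stand-in for Spark's Rating so the sketch is self-contained (assumption).
final case class ScoredProduct(user: Int, product: Int, rating: Double)

// Keep the n best-rated recommendations the user has not bought yet.
def topUnpurchased(
    recs: Seq[ScoredProduct],
    purchased: Set[Int],
    n: Int
): Seq[ScoredProduct] =
  recs
    .filterNot(r => purchased.contains(r.product)) // remove already purchased
    .sortBy(r => -r.rating)                        // highest rating first
    .take(n)                                       // limit to n results

// Example with made-up scores: the user already owns products 10 and 12.
val recs = Seq(
  ScoredProduct(1, 10, 4.9),
  ScoredProduct(1, 11, 4.7),
  ScoredProduct(1, 12, 4.2),
  ScoredProduct(1, 13, 3.8)
)
val kept = topUnpurchased(recs, purchased = Set(10, 12), n = 2)
```

One way to apply this at scale, assuming Spark 1.4 or later, would be to join the output of model.recommendProductsForUsers(k) with userProdSet and run the function on each joined pair. Here k has to be large enough to survive the filtering, for example nPred plus the largest purchase set, which may be impractical if a few users have bought a very large number of products.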

If the number of users in user_ids is lower than the number of users in the whole set (buy_values), you can simply filter userProdSet to keep only a subset of users.


Spark 1.4 has recommendAll for generating all recommendations at once, so that they can be served from a key-value store.
