I know there are a thousand questions out there about how best to partition your DataFrames or RDDs with key salting, etc., but I think this situation is different enough to justify its own question.
I am building a collaborative filtering mechanism in PySpark, which means that each user's unique rated items need to be compared with one another. So, for a DataFrame of size M (rows) x N (columns), the dataset becomes M x (K choose 2), where K << N is the number of non-null (i.e., rated) items per user.
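To put rough numbers on that growth, here is a quick back-of-the-envelope check in plain Python (the rating counts are made up purely for illustration):

from math import comb

# A user who rated K items contributes C(K, 2) item pairs.
# Made-up counts: a typical user with ~20 ratings vs. a heavy rater with ~500.
print(comb(20, 2))    # 190 pairs
print(comb(500, 2))   # 124750 pairs -- roughly 650x the work from a single row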
My algorithm works reasonably well and efficiently for datasets where users rate approximately the same number of items. However, when a subset of users has rated far more items than the rest (an order of magnitude more than the other users in a given partition), my data becomes badly skewed and the last few partitions take enormous amounts of time to finish. For a simple example, consider the following DataFrame:
cols = ['id', 'Toy Story', 'UP', 'Die Hard', 'MIB', 'The Shining']
ratings = [
    (1, 4.5, 3.5, None, 1.0, None),
    # ... additional user rows ...
]
My real situation is a much larger variation of this DataFrame (~1,000,000 users and ~10,000 items), where some users rate a far greater share of the films than others. First, I break up the DataFrame as follows:
def _make_ratings(row):
    import numpy as np
    non_null_mask = ~np.isnan(row)
    idcs = np.where(non_null_mask)[0]
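The helper's final step (presumably pairing each rated item's column index with its rating) and the wiring that produces the `ratings` RDD used below would look roughly like this — my own reconstruction, which assumes an active SparkSession named `spark`:

    # presumably: pair each rated item's column index with its rating value
    return list(zip(idcs, row[non_null_mask]))

import numpy as np

# My own wiring sketch, not the original code: build the toy DataFrame and
# rebind `ratings` to a per-user RDD (one element per user: that user's
# list of (item index, rating) pairs), which the next snippet operates on.
R = spark.createDataFrame(ratings, cols)
ratings = R.rdd.map(lambda r: _make_ratings(np.asarray(r[1:], dtype=float)))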
Then I can check the number of mutual rating pairs that need to be computed for each partition, as follows:
n_choose_2 = (lambda itrbl: (len(itrbl) * (len(itrbl) - 1)) / 2.)
sorted(ratings.map(n_choose_2).glom().map(sum).collect(), reverse=True)
Initially, this was the distribution of mutual rating pairs per partition that I observed:

As you can see, that simply doesn't scale. So my first attempt to fix it was to partition my DataFrame more intelligently at the source. I came up with the following function, which partitions the rows randomly:
def shuffle_partition(X, n_partitions, col_name='shuffle'):
    from pyspark.sql.functions import rand
    # add a random key, hash-repartition on it, then drop the key
    X2 = X.withColumn(col_name, rand())
    return X2.repartition(n_partitions, col_name).drop(col_name)
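Usage is just a call like the following (the partition count here is an arbitrary example, and `R` is the ratings DataFrame from the sketch above):

# example usage -- 50 partitions is an arbitrary choice
R_shuffled = shuffle_partition(R, n_partitions=50)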
It worked, sort of. After applying it, here is the new distribution:

It definitely scales a lot better, but I still don't like it. There must be a way to distribute these "power raters" more evenly across partitions, but I just can't figure it out. I've thought about partitioning on a "number of ratings per user" column, but that would ultimately lump all of the heavy raters together rather than spreading them apart.
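To spell out why I think that fails: repartitioning on such a count column hashes the count value itself, so all the users sharing the same (large) rating count land in the same partition. A minimal sketch of that dead end (the `n_ratings` column name is just for illustration, and `R` is the ratings DataFrame from the earlier sketch):

from pyspark.sql import functions as F

# Count the non-null rating columns per row (everything except 'id').
rating_cols = [c for c in R.columns if c != 'id']
n_ratings = sum(F.when(F.col(c).isNotNull(), 1).otherwise(0) for c in rating_cols)
R_counted = R.withColumn('n_ratings', n_ratings)

# Hash partitioning on 'n_ratings' sends identical counts to the same
# partition, so the heavy raters end up clustered rather than spread out.
R_bad = R_counted.repartition(50, 'n_ratings')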
Am I missing something obvious?
Update
I implemented igrinis' solution in the following function (I'm sure there is a more elegant way to write this, but I'm not super familiar with the DataFrame API, so I went back to RDDs for this part). However, the distribution came out roughly the same as with the original approach, so I'm not sure whether I did something wrong or not...:
def partition_by_rating_density(X, id_col_name, n_partitions,
                                partition_col_name='partition'):
    """Segment partitions by rating density. Partitions will be more
    evenly distributed based on the number of ratings for each user.

    Parameters
    ----------
    X : PySpark DataFrame
        The ratings matrix

    id_col_name : str
        The ID column name

    n_partitions : int
        The number of partitions in the new DataFrame.

    partition_col_name : str
        The name of the partitioning column

    Returns
    -------
    with_partition_key : PySpark DataFrame
        The partitioned DataFrame
    """
    ididx = X.columns.index(id_col_name)

    def count_non_null(row):
        sm = sum(1 if v is not None else 0
                 for i, v in enumerate(row) if i != ididx)
        return row[ididx], sm
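The idea, as I understood it, is to count ratings per user, sort users by that count, deal them out to partitions in round-robin order, and then repartition on that assignment. A sketch of how the rest of such a function could go (the variable and column names below are illustrative, not necessarily the exact code):

    # (continuing inside partition_by_rating_density)
    # (user id, number of ratings) for every user
    counts = X.rdd.map(count_non_null).toDF([id_col_name, 'rating_count'])

    # Sort users by rating count and assign partition indices round-robin,
    # so the heaviest raters are spread across all partitions.
    assignments = (
        counts.rdd
              .sortBy(lambda r: r[1], ascending=False)
              .zipWithIndex()
              .map(lambda ri: (ri[0][0], ri[1] % n_partitions))
              .toDF([id_col_name, partition_col_name])
    )

    # Attach the partition key to the original rows, repartition on it,
    # and drop the helper column before returning.
    with_partition_key = (
        X.join(assignments, on=id_col_name)
         .repartition(n_partitions, partition_col_name)
         .drop(partition_col_name)
    )
    return with_partition_key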