Evenly partitioning a PySpark DataFrame by the number of non-null elements per row

I know there are a thousand questions about how best to split your DataFrames or RDDs using key salting, etc., but I think this situation is different enough to justify its own question.

I am building a collaborative filtering engine in PySpark, which means each user's (row's) unique item ratings need to be compared pairwise. So for a DataFrame of size M (rows) x N (columns), the dataset effectively becomes M x (K choose 2), where K << N is the number of non-null (i.e., rated) elements for a given user.
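To make the blow-up concrete, here is a small self-contained sketch (my own, with made-up toy users; it is not part of the actual pipeline) showing how the number of item pairs grows with K:

    # Standalone illustration of why heavy raters dominate: a user with K rated
    # items contributes K * (K - 1) / 2 item pairs to the comparison workload.
    from itertools import combinations

    light_user = [('Toy Story', 4.5), ('UP', 3.5), ('MIB', 1.0)]   # K = 3
    heavy_user = [('movie_%d' % i, 3.0) for i in range(300)]       # K = 300

    print(len(list(combinations(light_user, 2))))   # 3 pairs
    print(len(list(combinations(heavy_user, 2))))   # 44850 pairs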

My algorithm works reasonably well and efficiently for datasets where users rate roughly the same number of items. However, when a subset of users has rated far more items than the rest (an order of magnitude more than the other users in the same partition), my data becomes badly skewed and the last few partitions take an enormous amount of time. For a simple example, consider the following DataFrame:

    cols = ['id', 'Toy Story', 'UP', 'Die Hard', 'MIB', 'The Shining']
    ratings = [
        (1, 4.5,  3.5,  None, 1.0,  None),  # user 1
        (2, 2.0,  None, 5.0,  4.0,  3.0),   # user 2
        (3, 3.5,  5.0,  1.0,  None, 1.0),   # user 3
        (4, None, None, 4.5,  3.5,  4.0),   # user 4
        (5, None, None, None, None, 4.5)    # user 5
    ]

    R = sc.parallelize(ratings, 2).toDF(cols)

My real situation is a much larger variant of this DataFrame (~1,000,000 users and ~10,000 items), where some users rate a much larger share of the movies than others. First, I split the DataFrame up as follows:

    def _make_ratings(row):
        import numpy as np
        non_null_mask = ~np.isnan(row)
        idcs = np.where(non_null_mask)[0]  # extract the non-null index mask

        # zip the non-null idcs with the corresponding ratings
        rtgs = row[non_null_mask]
        return list(zip(idcs, rtgs))

    def as_array(partition):
        import numpy as np
        for row in partition:
            yield _make_ratings(np.asarray(row, dtype=np.float32))

    # drop the id column, get the RDD, and make the copy of np.ndarrays
    ratings = R.drop('id').rdd\
               .mapPartitions(as_array)\
               .cache()
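For reference, on the toy DataFrame above, user 1's row comes out as roughly [(0, 4.5), (1, 3.5), (3, 1.0)] after dropping id, i.e. each element of the RDD is a list of (column_index, rating) pairs for one user.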

Then I can check the number of co-rating pairs that need to be computed for each partition as follows:

    n_choose_2 = (lambda itrbl: (len(itrbl) * (len(itrbl) - 1)) / 2.)
    sorted(ratings.map(n_choose_2).glom().map(sum).collect(), reverse=True)
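Here glom() turns each partition into a single list of per-user pair counts, so map(sum) yields one total per partition, and sorting the collected totals in descending order makes the skew easy to see.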

Initially, this was the distribution of co-rating pairs per partition that I got:

[Image: first distribution]

As you can see, this simply doesn't scale. So my first attempt at fixing it was to partition my DataFrame more intelligently at the source. I came up with the following function, which shuffles the rows into random partitions:

    def shuffle_partition(X, n_partitions, col_name='shuffle'):
        from pyspark.sql.functions import rand
        X2 = X.withColumn(col_name, rand())
        return X2.repartition(n_partitions, col_name).drop(col_name)
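For reference, a usage sketch (my own; it assumes R is the ratings DataFrame from above, as_array and n_choose_2 are defined as earlier, and 8 is an arbitrary partition count):

    # Hypothetical usage: shuffle the rows into 8 partitions, then re-check
    # the per-partition pair counts with the same probe as before.
    R_shuffled = shuffle_partition(R, n_partitions=8)
    ratings = R_shuffled.drop('id').rdd.mapPartitions(as_array).cache()
    print(sorted(ratings.map(n_choose_2).glom().map(sum).collect(), reverse=True))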

It sort of worked, and after applying it, here is the new distribution:

[Image: second distribution]

This definitely scales a lot better, but I'm still not happy with it. There should be a way to distribute these "power users" more evenly across partitions, but I just can't figure it out. I thought about partitioning by a "number of ratings per user" column, but that would ultimately lump all the heavy raters together rather than spreading them apart.

Am I missing something obvious?

Update

I implemented igrinis' solution in the following function (I'm sure there's a more elegant way to write it, but I'm not very familiar with the DataFrame API, so I went back to RDDs for this part), but the distribution came out roughly the same as the original, so I'm not sure whether I did something wrong...:

    def partition_by_rating_density(X, id_col_name, n_partitions,
                                    partition_col_name='partition'):
        """Segment partitions by rating density. Partitions will be more
        evenly distributed based on the number of ratings for each user.

        Parameters
        ----------
        X : PySpark DataFrame
            The ratings matrix

        id_col_name : str
            The ID column name

        n_partitions : int
            The number of partitions in the new DataFrame.

        partition_col_name : str
            The name of the partitioning column

        Returns
        -------
        with_partition_key : PySpark DataFrame
            The partitioned DataFrame
        """
        ididx = X.columns.index(id_col_name)

        def count_non_null(row):
            sm = sum(1 if v is not None else 0
                     for i, v in enumerate(row) if i != ididx)
            return row[ididx], sm

        # add the count as the last element and id as the first
        counted = X.rdd.map(count_non_null)\
                       .sortBy(lambda r: r[-1], ascending=False)

        # get the count array out, zip it with the index, and then flatMap
        # it out to get the sorted index
        indexed = counted.zipWithIndex()\
                         .map(lambda ti: (ti[0][0], ti[1] % n_partitions))\
                         .toDF([id_col_name, partition_col_name])

        # join back with indexed, which now has the partition column
        counted_indexed = X.join(indexed, on=id_col_name, how='inner')

        # the columns to drop
        return counted_indexed.repartition(n_partitions, partition_col_name)\
                              .drop(partition_col_name)
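Called the same way (again my own sketch, reusing R, as_array and n_choose_2 from above with an arbitrary partition count):

    # Hypothetical usage: repartition by rating density, then re-run the
    # per-partition pair-count probe to compare distributions.
    R_balanced = partition_by_rating_density(R, id_col_name='id', n_partitions=8)
    ratings = R_balanced.drop('id').rdd.mapPartitions(as_array).cache()
    print(sorted(ratings.map(n_choose_2).glom().map(sum).collect(), reverse=True))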
performance python machine-learning pyspark spark-dataframe
1 answer

What you can do is get a list of users sorted by their number of ratings, index them by their position in that sorted order, and take that index modulo the number of partitions. Add the remainder as a column, and then partition with partitionBy() on that column. That way each partition gets a nearly equal share of the total ratings workload.

For 3 partitions, this would give you:

    [1000, 800, 700, 600, 200, 30, 10,  5]  - number of ratings
    [   0,   1,   2,   3,   4,  5,  6,  7]  - position in sorted index
    [   0,   1,   2,   0,   1,  2,  0,  1]  - group to partition by
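One way this could be expressed with the DataFrame API (my own sketch, not the answerer's code; it reuses R and the 'id' column from the question and assumes a small partition count of 3):

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    n_partitions = 3
    rating_cols = [c for c in R.columns if c != 'id']

    # Count the non-null ratings per user.
    counted = R.withColumn(
        'n_ratings',
        sum(F.when(F.col(c).isNotNull(), 1).otherwise(0) for c in rating_cols))

    # Rank users by rating count (heaviest first). A global window like this
    # funnels all rows through one partition while ranking, which is fine for
    # a sketch but worth keeping in mind at scale.
    w = Window.orderBy(F.col('n_ratings').desc())
    grouped = counted.withColumn('grp',
                                 (F.row_number().over(w) - 1) % n_partitions)

    # Repartition on the round-robin group so heavy and light raters are mixed.
    # Spark hash-partitions on 'grp', so the group-to-partition mapping is
    # approximate rather than exact.
    balanced = grouped.repartition(n_partitions, 'grp').drop('n_ratings', 'grp')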
