Pyspark Column.isin () for a large set

code:

views = sdf \ .where(sdf['PRODUCT_ID'].isin(PRODUCTS)) \ .rdd \ .groupBy(lambda x: x['SESSION_ID']) \ .toLocalIterator() for sess_id, rows in views: # do something 

PRODUCTS is set . It is large, about 10,000 items.

Code Failure:

 --> 9 for sess_id, rows in views: /usr/local/spark/python/pyspark/rdd.py in _load_from_socket(port, serializer) --> 142 for item in serializer.load_stream(rf): /usr/local/spark/python/pyspark/serializers.py in load_stream(self, stream) --> 139 yield self._read_with_length(stream) /usr/local/spark/python/pyspark/serializers.py in _read_with_length(self, stream) --> 156 length = read_int(stream) /usr/local/spark/python/pyspark/serializers.py in read_int(stream) --> 543 length = stream.read(4) /opt/conda/lib/python3.5/socket.py in readinto(self, b) 574 try: --> 575 return self._sock.recv_into(b) 576 except timeout: 577 self._timeout_occurred = True timeout: timed out 

But when I do PRODUCTS set less, everything is fine. I tried to change some timeout values ​​in the Spark configuration. It did not help. How to avoid such failures?

UPDATE

 PRODUCTS = sdf.sort(['TIMESTAMP']).select('PRODUCT_ID').limit(10000).drop_duplicates() views = sdf \ .join(PRODUCTS, 'PRODUCT_ID', 'inner') \ .rdd \ .groupBy(lambda x: x['SESSION_ID']) \ .toLocalIterator() for sess_id, rows in views: # do ... 

PRODUCTS is now a data frame. And I use join . Received the same error.

UPDATE 2

Try this solution:

 views = sdf \ .join(PRODUCTS, 'PRODUCT_ID', 'inner') \ .rdd \ .groupBy(lambda x: x['SESSION_ID']) views.cache() for sess_id, rows in views.toLocalIterator(): pass 

After a while, a very long error occurred:

 Py4JJavaError: An error occurred while calling o289.javaToPython. : org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) .... 

This error appeared only once! Now I get the same timeout exceptions!

+5
source share
2 answers

I believe this is mainly due to a bug in the implementation of toLocalIterator() in pyspark 2.0.2 . You can read more here: [SPARK-18281] [SQL] [PySpark] Delete the timeout for reading data through the socket for the local iterator .

It seems that the fix will be available in the next update after 2.0.2 and in release 2.1.x If you want to fix it temporarily, you can apply the changes from the above question:

Replace this around line 138 of rdd.py (on the actual spark cluster, it seems you need to update rdd.py inside pyspark.zip :

 try: rf = sock.makefile("rb", 65536) for item in serializer.load_stream(rf): yield item finally: sock.close() 

with this:

 sock.settimeout(None) # << this is they key line that disables timeout after the initial connection return serializer.load_stream(sock.makefile("rb", 65536)) 
+1
source

As @eliasah said in his comment. You should try to join DataFrames to exclude what is not in your PRODUCT table.

 views = sdf \ .join(PRODUCTS) \ .where(sdf['PRODUCT_ID']) \ .rdd \ .groupBy(lambda x: x['SESSION_ID']) \ .toLocalIterator() 
0
source

All Articles