Code:
    views = sdf \
        .where(sdf['PRODUCT_ID'].isin(PRODUCTS)) \
        .rdd \
        .groupBy(lambda x: x['SESSION_ID']) \
        .toLocalIterator()

    for sess_id, rows in views:
PRODUCTS is a Python set. It is large, about 10,000 items.
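For context, a minimal sketch of the assumed setup; the example data and the SparkSession construction here are assumptions, only the column names come from the code above:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Assumed schema: one row per product view within a session.
    sdf = spark.createDataFrame(
        [(1, 'A', 1000), (1, 'B', 1001), (2, 'A', 1002)],
        ['SESSION_ID', 'PRODUCT_ID', 'TIMESTAMP'],
    )

    # PRODUCTS is a plain Python set; in reality it holds about 10,000 ids.
    PRODUCTS = {'A', 'B'}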
The code fails with the following traceback:
    --> 9 for sess_id, rows in views:

    /usr/local/spark/python/pyspark/rdd.py in _load_from_socket(port, serializer)
    --> 142     for item in serializer.load_stream(rf):

    /usr/local/spark/python/pyspark/serializers.py in load_stream(self, stream)
    --> 139     yield self._read_with_length(stream)

    /usr/local/spark/python/pyspark/serializers.py in _read_with_length(self, stream)
    --> 156     length = read_int(stream)

    /usr/local/spark/python/pyspark/serializers.py in read_int(stream)
    --> 543     length = stream.read(4)

    /opt/conda/lib/python3.5/socket.py in readinto(self, b)
        574     try:
    --> 575         return self._sock.recv_into(b)
        576     except timeout:
        577         self._timeout_occurred = True

    timeout: timed out
But when I make the PRODUCTS set smaller, everything works fine. I tried changing some timeout values in the Spark configuration, but it did not help. How can I avoid such failures?
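For example, the configuration changes I tried looked roughly like the sketch below; the specific keys and values are illustrative assumptions, not an exact record of what I ran:

    from pyspark.sql import SparkSession

    # Illustrative timeout-related settings (the values are assumptions).
    spark = SparkSession.builder \
        .config('spark.network.timeout', '600s') \
        .config('spark.executor.heartbeatInterval', '60s') \
        .getOrCreate()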
UPDATE
    PRODUCTS = sdf.sort(['TIMESTAMP']).select('PRODUCT_ID').limit(10000).drop_duplicates()

    views = sdf \
        .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
        .rdd \
        .groupBy(lambda x: x['SESSION_ID']) \
        .toLocalIterator()

    for sess_id, rows in views:
        # do ...
PRODUCTS is now a DataFrame, and I use a join instead of isin. I received the same error.
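Since PRODUCTS is only about 10,000 rows, one variation would be to add a broadcast hint to the join; a minimal sketch (the hint is the only change from the code above, and I do not know whether it affects the timeout):

    from pyspark.sql.functions import broadcast

    views = sdf \
        .join(broadcast(PRODUCTS), 'PRODUCT_ID', 'inner') \
        .rdd \
        .groupBy(lambda x: x['SESSION_ID']) \
        .toLocalIterator()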
UPDATE 2
I tried this solution:
    views = sdf \
        .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
        .rdd \
        .groupBy(lambda x: x['SESSION_ID'])

    views.cache()

    for sess_id, rows in views.toLocalIterator():
        pass
After a while, a very long error occurred:
    Py4JJavaError: An error occurred while calling o289.javaToPython.
    : org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
        ....
This error appeared only once! Now I get the same timeout exceptions!
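For completeness, a DataFrame-only rewrite of what I am trying to do might look like the sketch below; whether collect_list captures everything my per-session processing needs is an assumption, and I have not verified that this avoids the timeout:

    from pyspark.sql.functions import collect_list, struct

    # Group on the DataFrame side instead of the RDD side, then stream the
    # already-aggregated sessions back to the driver.
    grouped = sdf \
        .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
        .groupBy('SESSION_ID') \
        .agg(collect_list(struct('PRODUCT_ID', 'TIMESTAMP')).alias('rows'))

    for row in grouped.toLocalIterator():
        sess_id, rows = row['SESSION_ID'], row['rows']
        # do ...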