PySpark socket timeout exception after the application has been running for a while

I use PySpark to estimate the parameters of a logistic regression model. I use Spark to compute the probabilities and gradients, and then use SciPy's minimize function (with the L-BFGS-B method) for the optimization.
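To give an idea of the structure, the code looks roughly like this. It is only a simplified sketch: make_objective and contribution are illustrative names, and only sc, distData, params0, and res correspond to the real names in my program (they appear in the traceback below).

    import numpy as np
    from scipy.optimize import minimize

    # distData: RDD of (feature_vector, label) pairs; params0: initial parameter vector.
    # Spark sums the per-record negative log-likelihood and gradient contributions;
    # SciPy's L-BFGS-B drives the optimization and calls this objective repeatedly.

    def make_objective(sc, distData):
        def objective(params):
            bparams = sc.broadcast(params)

            def contribution(record):
                x, y = record
                p = 1.0 / (1.0 + np.exp(-np.dot(x, bparams.value)))
                f = -(y * np.log(p) + (1.0 - y) * np.log(1.0 - p))  # neg. log-likelihood term
                g = (p - y) * np.asarray(x)                         # gradient term
                return f, g

            f, g = distData.map(contribution).reduce(
                lambda a, b: (a[0] + b[0], a[1] + b[1]))
            return f, g
        return objective

    res = minimize(make_objective(sc, distData), params0,
                   method='L-BFGS-B', jac=True, options={'disp': False})

So every call that L-BFGS-B makes to the objective launches a Spark job and collects its result back to the driver.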

I run my application in yarn-client mode. The application starts and runs without any problems, but after a while it reports the following error:

    Traceback (most recent call last):
      File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/simulation/20160716-1626/spark_1m_data.py", line 115, in <module>
        res = trainEM2(distData, params0, verbose=True, em_tol=1e-5, opt_method='L-BFGS-B')
      File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 166, in trainEM
        options={'disp': False})
      File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/_minimize.py", line 447, in minimize
        callback=callback, **options)
      File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 330, in _minimize_lbfgsb
        f, g = func_and_grad(x)
      File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 278, in func_and_grad
        f = fun(x, *args)
      File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/optimize.py", line 289, in function_wrapper
        return function(*(wrapper_args + args))
      File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 146, in fun_observedQj
        return dataAndWeightsj_old.map(lambda _: calObservedQj(_[0], _[1], vparamsj, params0)).sum()
      File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 995, in sum
        return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
      File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 869, in fold
        vals = self.mapPartitions(func).collect()
      File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 772, in collect
        return list(_load_from_socket(port, self._jrdd_deserializer))
      File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 142, in _load_from_socket
        for item in serializer.load_stream(rf):
      File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 139, in load_stream
    16/07/16 20:59:10 ERROR python.PythonRDD: Error while sending iterator
    java.net.SocketTimeoutException: Accept timed out
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
        yield self._read_with_length(stream)
      File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 156, in _read_with_length
        length = read_int(stream)
      File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 543, in read_int
        length = stream.read(4)
      File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/socket.py", line 384, in read
        data = self._sock.recv(left)
    socket.timeout: timed out

I also saw a Python broken-pipe error when I set the Spark log level to "ALL".

I am using Spark 1.6.2 and Java 1.8.0_91. Any idea what is going on?

- Update -

I found this to be related to the optimization routine that I use in my program.

I am estimating a statistical model by maximum likelihood using the EM algorithm, which is iterative. During each iteration, I need to update the parameters by solving a minimization problem. Spark is responsible for computing the likelihood and the gradient, which are then passed to the SciPy minimization routine, where I use the L-BFGS-B method. Something in this routine seems to break my Spark job, but I do not know which part of the procedure is responsible for the problem.
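Schematically, one EM iteration looks like this. Again, this is only a sketch: e_step_weights and q_term stand in for my actual E-step and Q-function code, while sc, distData, params0, and em_tol are the real names.

    import numpy as np
    from scipy.optimize import minimize

    em_tol = 1e-5          # same tolerance as in the trainEM2 call above
    params = params0
    for it in range(100):  # illustrative iteration cap
        # E-step: attach posterior membership weights to each record (a Spark job).
        bparams = sc.broadcast(params)
        dataAndWeights = distData.map(
            lambda rec: (rec, e_step_weights(rec, bparams.value))).cache()

        # M-step: every objective evaluation triggers a distributed sum over the RDD.
        def neg_q(theta):
            return -dataAndWeights.map(
                lambda dw: q_term(dw[0], dw[1], theta, params)).sum()

        res = minimize(neg_q, params, method='L-BFGS-B', options={'disp': False})

        if np.linalg.norm(res.x - params) < em_tol:  # EM convergence check
            break
        params = res.x

So within a single EM iteration, L-BFGS-B evaluates neg_q many times, and each evaluation is a separate sum()/collect() on the RDD.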

Another observation: with the same data and the same program, I varied the number of partitions. When the number of partitions is small, my program finishes without any problems. However, when the number of partitions becomes large, the program crashes with the error above.
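For reference, the partition count is just what I pass when the RDD is created; samples stands for my local list of training records, and the concrete numbers below are only illustrative, not the exact values I used.

    # Small number of partitions: the program finishes normally.
    distData = sc.parallelize(samples, 16)

    # Large number of partitions: the run eventually dies with the socket timeout.
    # distData = sc.parallelize(samples, 1000)
    # ...or, equivalently, on an existing RDD:
    # distData = distData.repartition(1000)

    print(distData.getNumPartitions())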

+5
3 answers

I had a similar problem: my job ran iteratively, and some executions took a very long time. Increasing spark.executor.heartbeatInterval seemed to solve the problem. I increased it to 3600s to make sure I would not hit the timeouts again, and everything has been working fine since then.

From http://spark.apache.org/docs/latest/configuration.html:

    spark.executor.heartbeatInterval (default: 10s) -- Interval between each executor's heartbeats to the driver. Heartbeats let the driver know that the executor is still alive and update it with metrics for in-progress tasks.
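For reference, a minimal sketch of applying this from PySpark (the setting must be in place before the SparkContext is created; the app name below is arbitrary, and 3600s is simply the value that worked for me). The same value can also be passed on the command line via spark-submit --conf spark.executor.heartbeatInterval=3600s.

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setMaster("yarn-client").setAppName("my-app")
    # Send executor heartbeats much less often than the 10s default.
    conf.set("spark.executor.heartbeatInterval", "3600s")
    sc = SparkContext(conf=conf)

    # Sanity check that the setting actually took effect.
    print(sc.getConf().get("spark.executor.heartbeatInterval"))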

+3

Check the executor logs for more details. I have seen similar errors when executors die or are killed by the cluster manager (usually for using more memory than the container allows).
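On YARN, the aggregated executor logs can be pulled with "yarn logs -applicationId <your application id>" (with log aggregation enabled) after the application finishes. If they show containers being killed for exceeding memory limits, these are the knobs to look at; the values below are only illustrative and need to be tuned to your cluster.

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setMaster("yarn-client").setAppName("my-app")
    conf.set("spark.executor.memory", "4g")                 # JVM heap per executor
    conf.set("spark.yarn.executor.memoryOverhead", "1024")  # extra MB per YARN container,
                                                            # e.g. for the Python workers
    sc = SparkContext(conf=conf)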

0

I had a similar problem, and this fixed it for me:

    import pyspark as ps

    conf = ps.SparkConf().setMaster("yarn-client").setAppName("sparK-mer")
    # Send executor heartbeats much less often than the 10s default, so that
    # long-running tasks do not trip the timeout.
    conf.set("spark.executor.heartbeatInterval", "3600s")
    # Create the context from the conf; passing a master/appName positionally
    # here would override the values set on the SparkConf above.
    sc = ps.SparkContext(conf=conf)

Additional examples of setting other parameters: https://gist.github.com/robenalt/5b06415f52009c5035910d91f5b919ad

0
