Stream Error: Unable to start a new topic

Here's MWE much more code that I use. It performs Monte Carlo integration according to KDE ( kernel density estimate ) for all values โ€‹โ€‹below a certain threshold (the integration method was proposed in this question: Integration of a 2D kernel density estimate ) iteratively for several points in the list and returns a list of these results.

 import numpy as np from scipy import stats from multiprocessing import Pool import threading # Define KDE integration function. def kde_integration(m_list): # Put some of the values from the m_list into two new lists. m1, m2 = [], [] for item in m_list: # x data. m1.append(item[0]) # y data. m2.append(item[1]) # Define limits. xmin, xmax = min(m1), max(m1) ymin, ymax = min(m2), max(m2) # Perform a kernel density estimate on the data: x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j] values = np.vstack([m1, m2]) kernel = stats.gaussian_kde(values) # This list will be returned at the end of this function. out_list = [] # Iterate through all points in the list and calculate for each the integral # of the KDE for the domain of points located below the value of that point # in the KDE. for point in m_list: # Compute the point below which to integrate. iso = kernel((point[0], point[1])) # Sample KDE distribution sample = kernel.resample(size=1000) #Choose number of cores and split input array. cores = 4 torun = np.array_split(sample, cores, axis=1) # Print number of active threads. print threading.active_count() #Calculate pool = Pool(processes=cores) results = pool.map(kernel, torun) #Reintegrate and calculate results insample_mp = np.concatenate(results) < iso # Integrate for all values below iso. integral = insample_mp.sum() / float(insample_mp.shape[0]) # Append integral value for this point to list that will return. out_list.append(integral) return out_list # Generate some random two-dimensional data: def measure(n): "Measurement model, return two coupled measurements." m1 = np.random.normal(size=n) m2 = np.random.normal(scale=0.5, size=n) return m1+m2, m1-m2 # Create list to pass to KDE integral function. m_list = [] for i in range(100): m1, m2 = measure(5) m_list.append(m1.tolist()) m_list.append(m2.tolist()) # Call KDE integration function. print 'Integral result: ', kde_integration(m_list) 

Multiprocessing was suggested in the code in this question. Accelerate the kernel evaluation sample to speed up the code (up to ~ 3.4x).

The code works fine until I try to pass a list of more than 62-63 elements to the KDE function (i.e.: I set the value to more than 63 in the for i in range(100) ). If I do this, I get the following error:

 Traceback (most recent call last): File "~/gauss_kde_temp.py", line 78, in <module> print 'Integral result: ', kde_integration(m_list) File "~/gauss_kde_temp.py", line 48, in kde_integration pool = Pool(processes=cores) File "/usr/lib/python2.7/multiprocessing/__init__.py", line 232, in Pool return Pool(processes, initializer, initargs, maxtasksperchild) File "/usr/lib/python2.7/multiprocessing/pool.py", line 144, in __init__ self._worker_handler.start() File "/usr/lib/python2.7/threading.py", line 494, in start _start_new_thread(self.__bootstrap, ()) thread.error: can't start new thread 

usually (9 out of 10 times) around the active thread 374 . I got out of my league in terms of python coding here, and I have no idea how I can fix this problem. Any help would be greatly appreciated.


Add

I tried adding a while to prevent using too many code streams. I did replace the string print threading.active_count() with this bit of code:

  # Print number of active threads. exit_loop = True while exit_loop: if threading.active_count() < 300: exit_loop = False else: # Pause for 10 seconds. time.sleep(10.) print 'waiting: ', threading.active_count() 

When 302 active threads are reached, the code stops (i.e., gets stuck inside the loop). I waited more than 10 minutes, and the code never left the loop, and the number of active threads never dropped from 302 . Shouldn't the number of active threads decrease after some time?

+3
python multithreading numpy montecarlo kernel density
source share

No one has answered this question yet.

See similar questions:

8
Speed โ€‹โ€‹up core grading sample
5
Integrate 2D core density estimation

or similar:

2369
Add new keys to the dictionary?
1989
"implements Runnable" vs "extends Thread" in Java
1487
What is the difference between process and thread?
1266
How to update GUI from another thread?
1192
How to use threads in Python?
761
What is a daemon thread in Java?
715
Multiprocessing vs Threading Python
696
Is there a way to kill a thread?
674
What is a thread safe or insecure thread in PHP?
8
Speed โ€‹โ€‹up core grading sample

All Articles