I process all the files in a directory, using multiple threads to handle the files in parallel. Everything works fine, except that the threads seem to stay alive, so the number of threads in the process keeps growing until it reaches 1K or so, and then the program fails with thread.error: can't start new thread. I know this error comes from a limit on the number of threads per process, but I can't figure out what in my code is keeping the threads alive. Any ideas? Here is a minimal version of my code.
from Queue import Queue
from threading import Thread
import os

class Worker(Thread):
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        # pull tasks off the queue forever
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            self.tasks.task_done()

class ThreadPool:
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        self.tasks.join()

def foo(filename):
    pool = ThreadPool(32)
    iterable_data = process_file(filename)
    for data in iterable_data:
        pool.add_task(some_function, data)
    pool.wait_completion()

files = os.listdir(directory)
for file in files:
    foo(file)
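To confirm that the workers are piling up, I can watch the live thread count with threading.active_count(). This is just a rough sketch: dummy_task and check_thread_growth are hypothetical stand-ins for some_function and process_file, used only to show how each call to foo() leaves its 32 daemon workers alive.

import threading
from time import sleep

def dummy_task(data):
    # stand-in for some_function: does trivial work
    sleep(0.01)

def check_thread_growth(num_files=5):
    # simulate processing several "files"; each iteration builds a fresh
    # ThreadPool(32), whose daemon workers loop forever and never exit
    for i in range(num_files):
        pool = ThreadPool(32)
        for data in range(100):   # stand-in for iterating process_file(filename)
            pool.add_task(dummy_task, data)
        pool.wait_completion()
        print "after file %d: %d threads alive" % (i, threading.active_count())
        # prints roughly 33, 65, 97, ... since 32 new workers appear per call

check_thread_growth()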