How to interrupt / stop / end a multithreaded Python program

I have a Python program that implements threads like this:

    class Mythread(threading.Thread):
        def __init__(self, name, q):
            threading.Thread.__init__(self)
            self.name = name
            self.q = q

        def run(self):
            print "Starting %s..." % (self.name)
            while True:
                ## Get data from queue
                data = self.q.get()
                ## do_some_processing with data ###
                process_data(data)
                ## Mark Queue item as done
                self.q.task_done()
            print "Exiting %s..." % (self.name)

    def call_threaded_program():
        ##Setup the threads. Define threads,queue,locks
        threads = []
        q = Queue.Queue()
        thread_count = n #some number
        data_list = [] #some data list containing data

        ##Create Threads
        for thread_id in range(1, thread_count+1):
            thread_name = "Thread-" + str(thread_id)
            thread = Mythread(thread_name,q)
            thread.daemon = True
            thread.start()

        ##Fill data in Queue
        for data_item in data_list:
            q.put(data_item)

        try:
            ##Wait for queue to be exhausted and then exit main program
            q.join()
        except (KeyboardInterrupt, SystemExit) as e:
            print "Interrupt Issued. Exiting Program with error state: %s"%(str(e))
            exit(1)

call_threaded_program() is called from another program.

The code works under normal circumstances. However, if an error or exception occurs in one of the threads, the program gets stuck, because q.join() blocks forever: the failed thread never calls task_done() for its item. The only way to exit the program is to close the terminal itself.
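For illustration, here is a minimal sketch of one way to keep q.join() from hanging when processing fails: call task_done() in a finally block so every item is always marked done. It is written for Python 3 (so the module is `queue`, not `Queue`), and `process_data`, `worker`, `run_all`, and the `errors` list are hypothetical names made up for this example, not part of the code above.

```python
import queue
import threading


def process_data(item):
    # Hypothetical processing step; it fails on one input to simulate a worker error.
    if item == 3:
        raise ValueError("bad item: %r" % (item,))
    return item * 2


def worker(q, errors):
    while True:
        item = q.get()
        try:
            process_data(item)
        except Exception as exc:
            # Record the failure instead of letting it kill the thread.
            errors.append((item, exc))
        finally:
            # Always mark the item as done, so q.join() can return.
            q.task_done()


def run_all(items, thread_count=4):
    q = queue.Queue()
    errors = []
    for _ in range(thread_count):
        # Daemon threads do not keep the process alive once the main thread returns.
        threading.Thread(target=worker, args=(q, errors), daemon=True).start()
    for item in items:
        q.put(item)
    q.join()  # returns once task_done() has been called for every item
    return errors
```

Calling `run_all(range(6))` returns a list with one `(item, exception)` pair for the failing item instead of blocking, and the caller can decide whether to exit with an error state.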

What is the best way to terminate this program when a thread fails? Is there a different way to do this? I know this question has been asked many times, but I still cannot find a convincing answer. I would really appreciate any help.

EDIT: I tried removing the join on the queue and used a global exit flag instead, as suggested in "Is there a way to kill a Thread in Python?" However, the behavior is now so strange that I cannot understand what is happening.

    import threading
    import Queue
    import time

    exit_flag = False

    class Mythread (threading.Thread):
        def __init__(self,name,q):
            threading.Thread.__init__(self)
            self.name = name
            self.q = q

        def run(self):
            try:
                # Start Thread
                print "Starting %s...."%(self.name)
                # Do Some Processing
                while not exit_flag:
                    data = self.q.get()
                    print "%s processing %s"%(self.name,str(data))
                    self.q.task_done()
                # Exit thread
                print "Exiting %s..."%(self.name)
            except Exception as e:
                print "Exiting %s due to Error: %s"%(self.name,str(e))

    def main():
        global exit_flag

        ##Setup the threads. Define threads,queue,locks
        threads = []
        q = Queue.Queue()
        thread_count = 20
        data_list = range(1,50)

        ##Create Threads
        for thread_id in range(1,thread_count+1):
            thread_name = "Thread-" + str(thread_id)
            thread = Mythread(thread_name,q)
            thread.daemon = True
            threads.append(thread)
            thread.start()

        ##Fill data in Queue
        for data_item in data_list:
            q.put(data_item)

        try:
            ##Wait for queue to be exhausted and then exit main program
            while not q.empty():
                pass
            # Stop the threads
            exit_flag = True
            # Wait for threads to finish
            print "Waiting for threads to finish..."
            while threading.activeCount() > 1:
                print "Active Threads:",threading.activeCount()
                time.sleep(1)
                pass
            print "Finished Successfully"
        except (KeyboardInterrupt, SystemExit) as e:
            print "Interrupt Issued. Exiting Program with error state: %s"%(str(e))

    if __name__ == '__main__':
        main()

The program output is as follows:

    #Threads get started correctly
    #The output also is getting processed but then towards the end, All i see are
    Active Threads: 16
    Active Threads: 16
    Active Threads: 16...

The program then either hangs or keeps printing the active thread count. Since the exit flag is set to True, the loop in each thread's run method should no longer be executing, so I do not know what keeps these threads alive or what is happening.

EDIT: I found the problem. In the above code, the queue's get method was blocking, so the threads never got a chance to check the exit flag and terminate. Using get with a timeout instead did the trick. Here is just the run method, which is the only part I changed:

    def run(self):
        try:
            # Start Thread
            print "Starting %s..."%(self.name)
            # Do Some processing
            while not exit_flag:
                try:
                    data = self.q.get(True,self.timeout)
                    print "%s processing %s"%(self.name,str(data))
                    self.q.task_done()
                except:
                    print "Queue Empty or Timeout Occurred. Try Again for %s"%(self.name)
            # Exit thread
            print "Exiting %s..."%(self.name)
        except Exception as e:
            print "Exiting %s due to Error: %s"%(self.name,str(e))
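The same idea can be sketched end to end in Python 3. This is only an illustration under a few assumptions: it swaps the global exit flag for a `threading.Event`, catches `queue.Empty` specifically rather than using a bare except, and uses made-up names (`worker`, `run`, `results`) plus a placeholder computation in place of the real processing.

```python
import queue
import threading


def worker(q, stop_event, results, timeout=0.1):
    # Loop until asked to stop; the timeout lets the loop re-check the flag
    # instead of blocking forever inside q.get().
    while not stop_event.is_set():
        try:
            item = q.get(timeout=timeout)
        except queue.Empty:
            continue  # nothing to do yet, check the stop flag again
        try:
            results.append(item * 2)  # placeholder for the real processing
        finally:
            q.task_done()


def run(items, thread_count=4):
    q = queue.Queue()
    stop_event = threading.Event()
    results = []
    threads = []
    for _ in range(thread_count):
        t = threading.Thread(target=worker, args=(q, stop_event, results))
        t.start()
        threads.append(t)
    for item in items:
        q.put(item)
    try:
        q.join()  # wait until every queued item has been processed
    finally:
        # Runs on normal completion and on KeyboardInterrupt alike.
        stop_event.set()
        for t in threads:
            t.join()
    return sorted(results)
```

Because every worker wakes up at least once per timeout, setting the event is enough for all threads to exit, and the main thread can join them instead of polling the active thread count.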
2 answers

If you want all threads to be forced to exit when the process exits, you can set each thread's daemon flag to True before starting the thread.

http://docs.python.org/2/library/threading.html#threading.Thread.daemon
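A minimal sketch of setting the flag (Python 3 syntax; `background_loop` and `start_daemon_worker` are hypothetical names used only for this example):

```python
import threading
import time


def background_loop():
    # Stand-in for a worker that never returns on its own.
    while True:
        time.sleep(0.05)


def start_daemon_worker():
    t = threading.Thread(target=background_loop)
    # The flag must be set before start(); changing it on a running
    # thread raises RuntimeError.
    t.daemon = True
    t.start()
    return t
```

Note that daemon threads are stopped abruptly when the interpreter exits, so this suits workers that hold no resources needing cleanup.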


I did this once in C. Basically, I had a main process that started the other processes and kept track of them, i.e. it stored each PID and waited for the return code. If a process fails, its return code will tell you so, and you can then stop all the other processes. Hope this helps.
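As a rough Python analogue of that approach (a sketch only, not the original C code), the standard `subprocess` module can start child processes, poll their return codes, and terminate the rest as soon as one fails. The function name `supervise` and the polling interval are assumptions made for this example.

```python
import subprocess
import sys
import time


def supervise(commands, poll_interval=0.05):
    """Run all commands; return True if all exit with 0, False if any fails."""
    procs = [subprocess.Popen(cmd) for cmd in commands]
    try:
        while True:
            codes = [p.poll() for p in procs]  # None means still running
            if any(c is not None and c != 0 for c in codes):
                return False  # at least one child failed
            if all(c == 0 for c in codes):
                return True  # every child finished successfully
            time.sleep(poll_interval)
    finally:
        # Stop whatever is still running, then reap all children.
        for p in procs:
            if p.poll() is None:
                p.terminate()
        for p in procs:
            p.wait()
```

For example, `supervise([[sys.executable, "-c", "import sys; sys.exit(2)"], [sys.executable, "-c", "import time; time.sleep(30)"]])` returns False shortly after the first child exits and terminates the sleeping one.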

Edit:

Sorry, I overlooked in your question that you are using threads. But I think it still applies. You can either wrap or modify the thread so that it gives you a return value, or you can use a thread pool library.

How to get the return value from a thread in Python?

Python thread exit code
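The answer does not name a specific pool library. As one possibility, assuming Python 3, the standard `concurrent.futures.ThreadPoolExecutor` hands back both return values and exceptions through `Future.result()`. The names `process_data` and `run_pool` below are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_data(item):
    # Hypothetical processing step; it fails on one input to simulate an error.
    if item == 3:
        raise ValueError("bad item: %r" % (item,))
    return item * 2


def run_pool(items, max_workers=4):
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to the item it is processing.
        futures = {pool.submit(process_data, item): item for item in items}
        for future in as_completed(futures):
            item = futures[future]
            try:
                # result() re-raises any exception raised inside the worker.
                results[item] = future.result()
            except Exception as exc:
                errors[item] = exc
    return results, errors
```

With this shape the main thread sees a worker failure as an ordinary exception, so it can log it, cancel the remaining work, or exit with an error state rather than waiting on a queue that never drains.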
