Catch Keyboard Interrupt to Stop Multiprocess Python from Queuing

From several posts found on stackoverflow, I created this code.

Scenario

I want to have multiprocessing.queue on which several workers are "listening"

In the event of a keyboard interruption, the main process should no longer place new elements in the queue and with the help of sentinel objects, the worker should be elegantly stopped.

Problem

My problem is with the current version where I use

signal.signal(signal.SIGINT, signal.SIG_IGN) 

To ignore Ctrl + C, it is also ignored by the main process.

Any ideas? Do I need to use a multiprocessor pool of workers? Some examples indicate that I may have to do this. Can I then use the queue?

 from multiprocessing import Pool, Process,Queue import time import signal # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Queue # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Process class Worker(Process): def __init__(self, queue,ident): super(Worker, self).__init__() # Ignore Signals signal.signal(signal.SIGINT, signal.SIG_IGN) self.queue= queue self.idstr= str(ident) print "Ident" + self.idstr def run(self): print 'Worker started' # do some initialization here print 'Computing things!' for data in iter( self.queue.get, None ): print "#" + self.idstr + " : " + str(data) time.sleep(5) print "#" + self.idstr + "Queue Size: " + str(self.queue.qsize()) print "Worker Done" #### Main #### request_queue = Queue(10) for i in range(4): Worker( request_queue,i ).start() try: for data in range(1000000): request_queue.put( data ) #print "Queue Size: " + str(request_queue.qsize()) # Sentinel objects to allow clean shutdown: 1 per worker. for i in range(4): request_queue.put( None ) except KeyboardInterrupt: print "Caught KeyboardInterrupt, terminating workers" while request_queue.empty()==False: request_queue.get() request_queue.put( None ) 
+4
python multithreading exception
source share
2 answers

I think I found a solution. However, I do not like the fact that I get SIGINT 1 time from the main and 4 times from the Worker, but maybe I need to live with it.

  • I specified a signal handler for the interrupt signal.
  • After receiving the first Sig INT, I ignore more of the SIG Int signal.
  • I set the stop flag to TRUE
  • I exit the queue insert loop
  • I call a stop function that clears the queue and inserts still frames

     from multiprocessing import Pool, Process,Queue import time import signal # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Queue # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Process # Stop Flag for loop stop = False # Define SIGINT def signal_handler(sig, frame): print 'You pressed Ctrl+C!' global stop stop = True # Ignore more Ctrl+C signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGINT, signal_handler) def stopSentinel(request_queue): print "CTRL Stop Queue and insert None" # Empty Existing Queue while request_queue.empty()==False: request_queue.get() # Put One None for each Worker for i in range(4): request_queue.put( None ) class Worker(Process): def __init__(self, queue,ident): super(Worker, self).__init__() self.queue= queue self.idstr= str(ident) print "Ident" + self.idstr def run(self): print 'Worker started' # do some initialization here print 'Computing things!' for data in iter( self.queue.get, None ): print "#" + self.idstr + " : " + str(data) time.sleep(5) print "#" + self.idstr + "Queue Size: " + str(self.queue.qsize()) print "Worker Done" #### Main ##### request_queue = Queue(10) for i in range(4): Worker( request_queue,i ).start() #### Fill Queue with Data #### for data in range(1000000): request_queue.put( data ) #print "Queue Size: " + str(request_queue.qsize()) # Sentinel objects to allow clean shutdown: 1 per worker. # Check for Stop print "Check Breakout" if stop == True: print "Stop Break" break if stop == True: stopSentinel(request_queue) else: print "Normal Stop" for i in range(4): request_queue.put( None ) 
0
source share

Based on your decision (which is good), I added an additional level of protection if the main code does not respond and the user cancels twice:

 global STOP import os, signal def signal_handler(sig, frame): global STOP if STOP: signal.signal(signal.SIGINT, signal.SIG_IGN) os.kill(os.getpid(), signal.SIGTERM) STOP = True signal.signal(signal.SIGINT, signal_handler) 
0
source share

All Articles