How to limit the execution time of a function call in Python

There is a function call in my code related to sockets. The function comes from another module, so it is out of my control. The problem is that it blocks from time to time, which is completely unacceptable. How can I limit the execution time of a function call from my code? I assume the solution will need to use another thread.

+48
python multithreading
Dec 14 '08 at 16:20
9 answers

I'm not sure how cross-platform this is, but using signals and alarm may be a good way to look at this. With a little work you could make it completely generic and usable in any situation.

http://docs.python.org/library/signal.html

Your code would look something like this:

    import signal

    def signal_handler(signum, frame):
        raise Exception("Timed out!")

    signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(10)  # Ten seconds
    try:
        long_function_call()
    except Exception as msg:
        print("Timed out!")
+32
Dec 14 '08 at 17:27

An improvement on @rik.the.vik's answer is to use the with statement to give the timeout function some syntactic sugar:

    import signal
    from contextlib import contextmanager

    class TimeoutException(Exception):
        pass

    @contextmanager
    def time_limit(seconds):
        def signal_handler(signum, frame):
            raise TimeoutException("Timed out!")
        signal.signal(signal.SIGALRM, signal_handler)
        signal.alarm(seconds)
        try:
            yield
        finally:
            signal.alarm(0)

    try:
        with time_limit(10):
            long_function_call()
    except TimeoutException as e:
        print("Timed out!")
+64
Mar 02 '09 at 3:14

Here is the Linux/OSX way of limiting a function's running time. This is for the case where you don't want to use threads, and want your program to wait until the function ends or the time limit expires.

    from multiprocessing import Process
    from time import sleep

    def f(time):
        sleep(time)

    def run_with_limited_time(func, args, kwargs, time):
        """Runs a function with a time limit.

        :param func: The function to run
        :param args: The function's args, given as a tuple
        :param kwargs: The function's keywords, given as a dict
        :param time: The time limit in seconds
        :return: True if the function ended successfully. False if it was terminated.
        """
        p = Process(target=func, args=args, kwargs=kwargs)
        p.start()
        p.join(time)
        if p.is_alive():
            p.terminate()
            return False
        return True

    if __name__ == '__main__':
        print(run_with_limited_time(f, (1.5,), {}, 2.5))  # True
        print(run_with_limited_time(f, (3.5,), {}, 2.5))  # False
+14
Oct 30 '14 at 22:05

Doing this from within a signal handler is dangerous: you might be inside an exception handler at the time the exception is raised, and leave things in a broken state. For example:

    def function_with_enforced_timeout():
        f = open_temporary_file()
        try:
            ...
        finally:
            here()
            unlink(f.filename)

If your exception is raised inside here(), the temporary file will never be deleted.

The solution here would be for asynchronous exceptions to be postponed while the code is inside exception-handling code (an except or finally block), but Python does not do that.

Note that this won't interrupt anything while native code is executing; it will only interrupt when the function returns, so this may not help this particular case. (SIGALRM itself can interrupt the blocking call — but socket code typically simply retries after EINTR.)

Doing this with threads is a better idea, since it's more portable than signals. Since you start a worker thread and block until it completes, there are none of the usual concurrency worries. Unfortunately, there's no way to deliver an exception asynchronously to another thread in Python (other threading APIs can do this). It will also have the same issue of delivering an exception during an exception handler, and will require the same fix.

+7
Jul 11 '09 at 20:30

I prefer the context manager approach because it allows executing multiple Python statements inside a with time_limit statement. Since there is no SIGALRM on Windows, a more portable, and perhaps simpler, method is to use a Timer:

    from contextlib import contextmanager
    import threading
    import _thread

    class TimeoutException(Exception):
        def __init__(self, msg=''):
            self.msg = msg

    @contextmanager
    def time_limit(seconds, msg=''):
        timer = threading.Timer(seconds, lambda: _thread.interrupt_main())
        timer.start()
        try:
            yield
        except KeyboardInterrupt:
            raise TimeoutException("Timed out for operation {}".format(msg))
        finally:
            # if the action ends within the specified time, the timer is canceled
            timer.cancel()

    import time
    # ends after 5 seconds
    with time_limit(5, 'sleep'):
        for i in range(10):
            time.sleep(1)

    # this will actually end after 10 seconds
    with time_limit(5, 'sleep'):
        time.sleep(10)

The key point here is to use _thread.interrupt_main to interrupt the main thread from the timer thread. One caveat is that the main thread does not always respond quickly to the KeyboardInterrupt raised by the Timer. For example, time.sleep() calls a system function, so the KeyboardInterrupt will only be handled after the sleep call returns.
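A small self-contained demonstration of this caveat (the timings here are arbitrary choices for illustration): with many short sleeps instead of one long one, the KeyboardInterrupt raised by _thread.interrupt_main() is processed between the calls, so the loop stops shortly after the timer fires instead of running its full course.

```python
import _thread
import threading
import time

interrupted = False
# fire interrupt_main() from a timer thread after 0.2 seconds
timer = threading.Timer(0.2, _thread.interrupt_main)
timer.start()
start = time.monotonic()
try:
    # short sleeps: the KeyboardInterrupt is handled between them,
    # so we stop soon after the timer fires instead of after 10 seconds
    for _ in range(200):
        time.sleep(0.05)
except KeyboardInterrupt:
    interrupted = True
finally:
    timer.cancel()
elapsed = time.monotonic() - start
print(interrupted, elapsed < 5)
```

Had the loop been a single time.sleep(10), the interrupt would only have been handled once the full ten seconds elapsed.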

+6
Jun 06 '16 at 1:41

The only "safe" way to do this, in any language, is to use a secondary process to do the timing out; otherwise you need to build your code in such a way that it times out safely by itself, for instance by checking the elapsed time in a loop or similar. If changing the method isn't an option, a thread will not suffice.

Why? Because you risk leaving things in a bad state when you do. If the thread is simply killed mid-method, locks being held, etc., will just stay held and can't be released.

So look at the process approach, not the thread approach.
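The "checking the elapsed time in a loop" idea mentioned above can be sketched like this (count_up_to is a hypothetical work function, chunked so it can check the clock between units of work):

```python
import time

def count_up_to(n, deadline_seconds):
    """Do work in small units, checking the clock between units so the
    function times itself out cleanly: no killed threads, no held locks."""
    deadline = time.monotonic() + deadline_seconds
    i = 0
    while i < n:
        if time.monotonic() >= deadline:
            return ('timed out', i)   # stop safely, report partial progress
        i += 1                        # one small unit of work
    return ('finished', i)

print(count_up_to(10000, 5.0))   # finishes well within the deadline
print(count_up_to(10**9, 0.1))   # hits the deadline first
```

The trade-off is that the check only happens between units of work, so a single long-blocking call inside a unit still can't be interrupted this way.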

+4
Dec 14 '08 at 17:20

You don't have to use threads. You can use another process to do the blocking work, for instance perhaps using the subprocess module. If you want to share data structures between different parts of your program, then Twisted is a great library for giving yourself control of this, and I'd recommend it if you care about blocking and expect to run into this problem a lot. The bad news with Twisted is that you have to rewrite your code to avoid any blocking, and there is a fair learning curve.

You can use threads to avoid blocking, but I'd regard this as a last resort, since it exposes you to a whole world of pain. Read a good book on concurrency before even thinking about using threads in production, e.g. Jean Bacon's "Concurrent Systems". I work with a bunch of people who do really cool high-performance stuff with threads, and we don't introduce threads into projects unless we really need them.
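The subprocess route suggested above got much easier in later Python versions: since Python 3.5, subprocess.run accepts a timeout argument and kills the child process when it expires. A minimal sketch:

```python
import subprocess
import sys

# Run a child Python process that blocks for too long; subprocess.run
# kills it when the timeout expires and raises TimeoutExpired.
try:
    subprocess.run([sys.executable, "-c", "import time; time.sleep(10)"],
                   timeout=1)
    timed_out = False
except subprocess.TimeoutExpired:
    timed_out = True

print(timed_out)
```

Because the blocking work lives in a separate process, terminating it cannot corrupt the parent's state, which is exactly the safety property the answers above argue for.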

+3
Dec 14 '08 at 17:13

Here's a timeout function that I think I found via Google, and it works for me.

From: http://code.activestate.com/recipes/473878/

    def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
        '''This function will spawn a thread and run the given function
        using the args and kwargs, and return the given default value if
        the timeout_duration is exceeded.
        '''
        import threading

        class InterruptableThread(threading.Thread):
            def __init__(self):
                threading.Thread.__init__(self)
                self.result = default

            def run(self):
                try:
                    self.result = func(*args, **kwargs)
                except Exception:
                    self.result = default

        it = InterruptableThread()
        it.start()
        it.join(timeout_duration)
        # if the thread is still alive, self.result still holds the default;
        # otherwise it holds the function's return value
        return it.result
+2
Dec 15 '08 at 4:41

I usually prefer using the context manager as suggested by @josh-lee.

But in case someone is interested in having this implemented as a decorator, here's an alternative.

Here's how it would look:

    import time
    from timeout import timeout

    class Test(object):
        @timeout(2)
        def test_a(self, foo, bar):
            print(foo)
            time.sleep(1)
            print(bar)
            return 'A Done'

        @timeout(2)
        def test_b(self, foo, bar):
            print(foo)
            time.sleep(3)
            print(bar)
            return 'B Done'

    t = Test()
    print(t.test_a('python', 'rocks'))
    print(t.test_b('timing', 'out'))

And this is the timeout.py module:

    import threading

    class TimeoutError(Exception):
        pass

    class InterruptableThread(threading.Thread):
        def __init__(self, func, *args, **kwargs):
            threading.Thread.__init__(self)
            self._func = func
            self._args = args
            self._kwargs = kwargs
            self._result = None

        def run(self):
            self._result = self._func(*self._args, **self._kwargs)

        @property
        def result(self):
            return self._result

    class timeout(object):
        def __init__(self, sec):
            self._sec = sec

        def __call__(self, f):
            def wrapped_f(*args, **kwargs):
                it = InterruptableThread(f, *args, **kwargs)
                it.start()
                it.join(self._sec)
                if not it.is_alive():
                    return it.result
                raise TimeoutError('execution expired')
            return wrapped_f

Output:

    python
    rocks
    A Done
    timing
    Traceback (most recent call last):
      ...
    timeout.TimeoutError: execution expired
    out

Note that even though TimeoutError is raised, the decorated method will keep running in the other thread. If you also want that thread to be "stopped", see: Is there a way to kill a thread in Python?
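For what it's worth, the same result-or-timeout pattern (including the worker carrying on in the background after the timeout) is available out of the box in the standard library's concurrent.futures; a minimal sketch:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
import time

def slow(seconds):
    time.sleep(seconds)
    return 'done'

results = []
with ThreadPoolExecutor(max_workers=2) as ex:
    quick = ex.submit(slow, 0.1)
    results.append(quick.result(timeout=2))   # finishes in time
    stuck = ex.submit(slow, 1.0)
    try:
        stuck.result(timeout=0.2)             # give up waiting after 0.2 s
    except FutureTimeout:
        results.append('timed out')
    # note: exiting the with-block still waits for the 1-second worker,
    # exactly like the decorated thread above keeps running

print(results)
```

As with the decorator, the timeout only stops the caller from waiting; the worker thread itself is not killed.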

0
Jan 27 '16 at 13:38


