Python threading.Timer - repeat a function every n seconds

I am having difficulties with the Python timer and would greatly appreciate some advice or help :D

I don't know much about how threads work, but I just want to fire off a function every 0.5 seconds and be able to start, stop and reset the timer.

However, I keep getting `RuntimeError: threads can only be started once` when I call the timer's start() twice. Is there a workaround for this? I tried calling cancel() on the timer before each start.

Pseudocode:

 # Pseudocode from the question; it intentionally reproduces the reported error.
t = threading.Timer(0.5, function)
while True:
    t.cancel()
    # A Timer is a Thread, so the second start() on the same object raises
    # "RuntimeError: threads can only be started once".
    t.start()
+69
Sep 15
source share
12 answers

The best way is to start the timer thread once. Inside your timer thread you would code the following:

 # Repeating timer implemented as a single long-lived thread.
from threading import Thread


class MyThread(Thread):
    """Thread that does its work periodically until a stop event is set.

    Args:
        event: ``threading.Event`` used as the stop flag.
        interval: Seconds to wait between iterations (default 0.5).
    """

    def __init__(self, event, interval=0.5):
        Thread.__init__(self)
        self.stopped = event
        self.interval = interval

    def run(self):
        # Event.wait() returns False on timeout (keep looping) and True once
        # the event is set (leave the loop), so it acts as an interruptible
        # sleep.
        while not self.stopped.wait(self.interval):
            print("my thread")
            # call a function

In the code that started the timer, you can then set the stop event to stop the timer.

 # Start the repeating thread, then stop it by setting its event.
from threading import Event

stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# this will stop the timer
stopFlag.set()
+89
Sep 15
source share

Using timer threads -

 # Repeating timer that arms a fresh threading.Timer after every call.
from threading import Timer, Thread, Event


class perpetualTimer():
    """Call ``hFunction`` every ``t`` seconds until ``cancel()`` is called.

    Args:
        t: Delay in seconds between calls.
        hFunction: Zero-argument callable to invoke.
    """

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self._cancelled = False
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        """Run the callback, then schedule the next run unless cancelled."""
        self.hFunction()
        # cancel() may have been called while the callback was running; if
        # so, do not re-arm, otherwise the cancellation would be lost.
        if self._cancelled:
            return
        # A Timer fires only once, so a new one is needed for every period.
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        """Arm the first timer."""
        self.thread.start()

    def cancel(self):
        """Stop the pending timer and prevent any further re-arming."""
        self._cancelled = True
        self.thread.cancel()


def printer():
    print('ipsem lorem')


if __name__ == "__main__":
    t = perpetualTimer(5, printer)
    t.start()

It can be stopped with t.cancel().

+26
Jun 30 '14 at 10:29
source share

From the equivalent of setInterval in python :

 # Decorator that turns a function into a periodically repeated background call.
import functools
import threading


def setInterval(interval):
    """Return a decorator that repeats the function every ``interval`` seconds.

    Calling the decorated function starts a daemon thread that invokes the
    original function with the given arguments after each interval, and
    returns the ``threading.Event`` that stops the loop when set.
    """
    def decorator(function):
        @functools.wraps(function)  # keep the wrapped function's name/docstring
        def wrapper(*args, **kwargs):
            stopped = threading.Event()

            def loop():  # executed in another thread
                while not stopped.wait(interval):  # until stopped
                    function(*args, **kwargs)

            t = threading.Thread(target=loop)
            t.daemon = True  # stop if the program exits
            t.start()
            return stopped
        return wrapper
    return decorator

Using:

 # Using the setInterval decorator: each call starts a new repeating timer.
@setInterval(.5)
def function():
    "..."


stop = function()  # start timer, the first call is in .5 seconds
stop.set()  # stop the loop

stop = function()  # start new timer
# ...
stop.set()

Or here is the same functionality, but as a separate function instead of a decorator :

 # Plain-function variant: returns a callable that cancels the future calls.
from threading import Event, Thread


def call_repeatedly(interval, func, *args):
    """Call ``func(*args)`` every ``interval`` seconds on a background thread.

    Returns:
        A zero-argument callable; calling it stops any future calls.
    """
    stopped = Event()

    def loop():
        # wait() returns False on timeout (call again), True once stopped.
        while not stopped.wait(interval):
            func(*args)

    worker = Thread(target=loop)
    worker.daemon = True  # do not keep the program alive just for the timer
    worker.start()
    return stopped.set


if __name__ == "__main__":
    cancel_future_calls = call_repeatedly(60, print, "Hello, World")
    # ...
    cancel_future_calls()

Here's how to do it without using threads .

+23
May 03 '13 at 22:40
source share

Improving a little on Hans Then's answer, we can just subclass Timer. The following becomes our entire "repeat timer" code, and it can be used as a drop-in replacement for threading.Timer with all the same arguments:

 # Timer subclass that keeps firing until it is cancelled.
from threading import Timer


class RepeatTimer(Timer):
    """``threading.Timer`` that calls its function after every interval.

    Takes the same constructor arguments as ``Timer``; ``cancel()`` stops it.
    """

    def run(self):
        # Timer.cancel() sets self.finished, which makes wait() return True
        # and ends the loop; a timeout returns False and triggers the call.
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)

Usage example:

 # Example: print the default message once per second for about five seconds.
import time


def dummyfn(msg="foo"):
    print(msg)


timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()

produces the following output:

 foo foo foo foo 

as well as

 # Same example, passing a positional argument through to the callback.
timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
time.sleep(5)
timer.cancel()

produces

 bar bar bar bar 
+8
Feb 12 '18 at 6:57
source share

In the interest of providing the correct answer using a timer as requested by the OP, I will improve the response of swapnil jariwala :

 # Restartable repeating timer built from one-shot threading.Timer objects.
from threading import Timer
import time


class InfiniteTimer():
    """A Timer class that does not stop, unless you want it to.

    Args:
        seconds: Delay in seconds between calls.
        target: Zero-argument callable to invoke.
    """

    def __init__(self, seconds, target):
        self._should_continue = False
        self.is_running = False
        self.seconds = seconds
        self.target = target
        self.thread = None

    def _handle_target(self):
        """Run the target once, then arm the next timer if still enabled."""
        self.is_running = True
        try:
            self.target()
        finally:
            # Reset even if the target raises; otherwise start() would
            # refuse to run again because is_running stayed True.
            self.is_running = False
        self._start_timer()

    def _start_timer(self):
        # Code could have been running when cancel was called.
        if self._should_continue:
            self.thread = Timer(self.seconds, self._handle_target)
            self.thread.start()

    def start(self):
        """Start the timer unless it is already started or mid-call."""
        if not self._should_continue and not self.is_running:
            self._should_continue = True
            self._start_timer()
        else:
            print("Timer already started or running, please wait if you're restarting.")

    def cancel(self):
        """Stop the pending timer and prevent further rescheduling."""
        if self.thread is not None:
            # Just in case thread is running and cancel fails.
            self._should_continue = False
            self.thread.cancel()
        else:
            print("Timer never started or failed to initialize.")


def tick():
    print('ipsem lorem')


# Example Usage
if __name__ == "__main__":
    t = InfiniteTimer(0.5, tick)
    t.start()

The `import time` line is optional if you leave out the example usage.

+4
Jan 03 '17 at 19:05
source share

I changed some of swapnil-jariwala's code to make a little console clock.

 # Console clock: prints the current time once per second.
from threading import Timer, Thread, Event
from datetime import datetime


class PT():
    """Call ``hFunction`` every ``t`` seconds until ``cancel()`` is called."""

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self._cancelled = False
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        """Run the callback, then schedule the next run unless cancelled."""
        self.hFunction()
        if self._cancelled:
            return
        # A Timer fires only once, so a new one is needed for every period.
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        """Stop the pending timer and prevent any further re-arming."""
        self._cancelled = True
        self.thread.cancel()


def printer():
    """Print the current local time as zero-padded HH:MM:SS."""
    # strftime pads each field, so 11:39:05 is not printed as 11:39:5.
    print(datetime.today().strftime("%H:%M:%S"))


if __name__ == "__main__":
    t = PT(1, printer)
    t.start()

Output:

 >>> 11:39:11 11:39:12 11:39:13 11:39:14 11:39:15 11:39:16 ... 

Tkinter graphical timer

This code puts the clock timer in a small window with tkinter

 # Clock shown in a small tkinter window, refreshed by a repeating timer.
from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk

app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()


class perpetualTimer():
    """Call ``hFunction`` every ``t`` seconds until ``cancel()`` is called."""

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self._cancelled = False
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        """Run the callback, then schedule the next run unless cancelled."""
        self.hFunction()
        # Honour a cancel() issued while the callback was running.
        if self._cancelled:
            return
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self._cancelled = True
        self.thread.cancel()


def printer():
    """Write the current time into the label; stop the timer on failure."""
    tempo = datetime.today()
    clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
    try:
        lab['text'] = clock
    except RuntimeError:
        # The label can no longer be updated; stop rescheduling instead of
        # relying on the interactive-only exit() helper.
        t.cancel()


t = perpetualTimer(1, printer)
t.start()
app.mainloop()
# The window was closed: make sure no further timer is armed.
t.cancel()

An example of a flashcard-style quiz game (sort of)

 # Flashcard-style quiz: prints a question, then its answer on the next tick.
from threading import Timer, Thread, Event
from datetime import datetime


class perpetualTimer():
    """Call ``hFunction`` every ``t`` seconds until ``cancel()`` is called."""

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self._cancelled = False
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        """Run the callback, then schedule the next run unless cancelled."""
        self.hFunction()
        # Honour a cancel() issued while the callback was running.
        if self._cancelled:
            return
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self._cancelled = True
        self.thread.cancel()


questions = ["What is the capital of Italy?",
             "What is the capital of France?",
             "What is the capital of England?",
             "What is the capital of Spain?"]

answers = "Rome", "Paris", "London", "Madrid"

counter = 0  # index of the next question to print
c2 = 0  # index of the next answer to print
asked = False  # True once at least one question has been printed


def printer():
    """Print the answer to the previous question (if any), then the next question."""
    global counter, asked
    # A flag replaces the wall-clock seconds comparison, which stopped
    # working whenever the minute rolled over between start-up and a tick.
    if asked:
        show_ans()
    print()
    print("-" + questions[counter])
    counter = (counter + 1) % len(questions)
    asked = True


def show_ans():
    """Print the next answer and advance, wrapping around at the end."""
    global c2
    print("It is {}".format(answers[c2]))
    c2 = (c2 + 1) % len(answers)


if __name__ == "__main__":
    print("Get ready to answer")
    t = perpetualTimer(3, printer)
    t.start()

Output:

 Get ready to answer >>> -What is the capital of Italy? It is Rome -What is the capital of France? It is Paris -What is the capital of England? ... 
+3
Aug 09 '17 at 9:40 on
source share

I had to do this for the project. What I ended up with is launching a separate thread for the function

 # Run heartbeat(worker) on its own thread.
t = threading.Thread(target=heartbeat, args=(worker,))
t.start()

**** heartbeat is my function, worker is one of my arguments ****

inside my heartbeat function:

 # Thread body: wait, do the work, repeat.
import threading


def heartbeat(worker, stop_event=None, interval=5):
    """Run the periodic work every ``interval`` seconds.

    Args:
        worker: Argument handed to the thread by its creator.
        stop_event: Optional ``threading.Event``; setting it ends the loop.
            When omitted the loop runs forever.
        interval: Seconds to wait before each iteration (default 5).
    """
    if stop_event is None:
        stop_event = threading.Event()  # never set: loop forever
    # wait() acts as a sleep that is interrupted as soon as the event is set.
    while not stop_event.wait(interval):
        pass  # all of my code

Therefore, when I start the thread, the function will wait 5 seconds, run all my code and do it endlessly. If you want to kill a process, just kill the thread.

+2
Mar 25 '17 at 18:53 on
source share

I implemented a class that works like a timer.

I leave a link here if someone needs it: https://github.com/ivanhalencp/python/tree/master/xTimer

+1
Jun 28 '17 at 17:15
source share
 # Function that schedules a delayed call to itself.
from threading import Timer


def TaskManager():
    """Do the work, then schedule another call to this function in 1 second.

    Returns:
        The ``Timer`` holding the pending call, so a caller can cancel it.
    """
    # do stuff
    t = Timer(1, TaskManager)
    t.start()
    return t


if __name__ == "__main__":
    TaskManager()

Here is a small example, it will help you better understand how it works. The taskManager () function at the end creates a deferred function call for itself.

Try changing the dalay variable and you can see the difference

 # Self-rescheduling task manager that consumes data appended by the main thread.
from threading import Timer
# time.sleep replaces the private threading._sleep, which is not part of
# the threading API.
from time import sleep as _sleep

# ------------------------------------------
DATA = []
dalay = 0.25  # sec
counter = 0
allow_run = True
FIFO = True


def taskManager():
    """Consume at most one item from DATA, then reschedule itself.

    Prints the call number together with the consumed item (oldest first when
    FIFO is true, newest first otherwise) or "no data". While ``allow_run``
    is true, another call is scheduled ``dalay`` seconds later.
    """
    global counter
    counter += 1
    if DATA:
        item = DATA.pop(0) if FIFO else DATA.pop()
        print("[" + str(counter) + "] new data: [" + str(item) + "]")
    else:
        print("[" + str(counter) + "] no data")
    if allow_run:
        # delayed method/function call to itself
        t = Timer(dalay, taskManager)
        t.start()
    else:
        print(" END task-manager: disabled")


# ------------------------------------------
def main():
    DATA.append("data from main(): 0")
    _sleep(2)
    DATA.append("data from main(): 1")
    _sleep(2)


# ------------------------------------------
if __name__ == "__main__":
    print(" START task-manager:")
    taskManager()

    _sleep(2)
    DATA.append("first data")
    _sleep(2)
    DATA.append("second data")

    print(" START main():")
    main()
    print(" END main():")

    _sleep(2)
    DATA.append("last data")

    allow_run = False
0
Jun 20 '16 at 18:52
source share

I like right2clicky's answer, especially because it does not require tearing down a thread and creating a new one every time the timer fires. In addition, it is easy to subclass it to create a class with a timer callback that is called periodically. This is my normal use case:

 # Subclass whose own method is the periodic callback.
import time


class MyClass(RepeatTimer):
    """Repeating timer that calls its own ``on_timer`` every ``period`` seconds."""

    def __init__(self, period):
        super().__init__(period, self.on_timer)

    def on_timer(self):
        print("Tick")


if __name__ == "__main__":
    mc = MyClass(1)
    mc.start()
    time.sleep(5)
    mc.cancel()
0
Mar 16 '19 at 19:41
source share

This is an alternative implementation using a function instead of a class. Inspired by @ Andrew Wilkins above.

Because waiting is more accurate than sleep (takes into account the execution time of the function):

 # Function-based repeating task driven by a module-level stop event.
import threading
from time import sleep

# Despite the name, setting this event STOPS the ping loop.
PING_ON = threading.Event()


def ping():
    """Print the current thread id once per second until PING_ON is set."""
    # wait(1) returns False on timeout (keep pinging), True once the event is set.
    while not PING_ON.wait(1):
        print("my thread %s" % str(threading.current_thread().ident))


if __name__ == "__main__":
    t = threading.Thread(target=ping)
    t.start()
    sleep(5)
    PING_ON.set()
0
Jul 02 '19 at 14:51
source share

I have a problem when I try to cancel a thread with another thread:

The code:

 # Two repeating timers; the slower one cancels both after its first tick.
from threading import Timer, Thread, Event
import time

try:
    import RPi.GPIO as GPIO
except ImportError:
    # Not used by this snippet; lets it run away from a Raspberry Pi.
    GPIO = None


class perpetualTimer():
    """Call ``hFunction`` every ``t`` seconds until ``cancel()`` is called."""

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self._cancelled = False
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        """Run the callback, then schedule the next run unless cancelled."""
        self.hFunction()
        # The callback itself may call cancel() (encender does); without
        # this check a new timer would be armed anyway and the loop would
        # never stop.
        if self._cancelled:
            return
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        """Stop the pending timer and prevent any further re-arming."""
        self._cancelled = True
        self.thread.cancel()


def apagar():
    print ('ipsem lorem')


def encender():
    # The method is cancel(); the misspelt "calcel" raised AttributeError.
    s.cancel()
    t.cancel()


if __name__ == "__main__":
    t = perpetualTimer(0.5, apagar)
    s = perpetualTimer(5, encender)
    t.start()
    s.start()

Error:

 pi@proyecto:~/proyecto $ python hilo_infinito_fallo.py ipsem lorem ipsem lorem ipsem lorem ipsem lorem ipsem lorem ipsem lorem ipsem lorem ipsem lorem ipsem lorem Exception in thread Thread-2: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 1073, in run self.function(*self.args, **self.kwargs) File "hilo_infinito_fallo.py", line 15, in handle_function self.hFunction() File "hilo_infinito_fallo.py", line 29, in encender s.calcel() AttributeError: perpetualTimer instance has no attribute 'calcel' ipsem lorem ipsem lorem ipsem lorem ipsem lorem 

How can I cancel a thread at a specific time?

0
Jul 09 '19 at 8:16
source share



All Articles