Creating a temporary asynchronous timer timer callback for a related method using python-asyncio

I am trying to create some kind of timer callback for the bound async method using the asyncio event asyncio . Now the problem is that the associated asynchronous method should not contain a strong reference to the instance, otherwise the latter will never be deleted. The timer callback should only work up to the parent instance. I found a solution, but I do not think it is beautiful:

 import asyncio import functools import weakref class ClassWithTimer: def __init__(self): asyncio.ensure_future( functools.partial( ClassWithTimer.update, weakref.ref(self) )() ) def __del__(self): print("deleted ClassWithTimer!") async def update(self): while True: await asyncio.sleep(1) if self() is None: break print("IN update of object " + repr(self())) async def run(): foo = ClassWithTimer() await asyncio.sleep(5) del foo loop = asyncio.get_event_loop() loop.run_until_complete(run()) 

Is there a better, more pythonic way to do this? The timer callback really needs to be asynchronous. Without asyncio , weakref.WeakMethod probably be the way to go. But asyncio.ensure_future requires an accompanying object, so in this case it will not work.

+7
python timer weak-references python-asyncio
source share
4 answers

Based on answers from Huazuo Gao and germn, I implemented ensure_weakly_binding_future , which is basically the same as ensure_future but does not contain a reference to an instance of the associated method. It does not change the general binding (for example, a decorator-based solution) and correctly undoes the future when the parent instance is deleted:

 import asyncio import weakref def ensure_weakly_binding_future(method): class Canceller: def __call__(self, proxy): self.future.cancel() canceller = Canceller() proxy_object = weakref.proxy(method.__self__, canceller) weakly_bound_method = method.__func__.__get__(proxy_object) future = asyncio.ensure_future(weakly_bound_method()) canceller.future = future class ClassWithTimer: def __init__(self): ensure_weakly_binding_future(self.update) def __del__(self): print("deleted ClassWithTimer!", flush=True) async def update(self): while True: await asyncio.sleep(1) print("IN update of object " + repr(self), flush=True) async def run(): foo = ClassWithTimer() await asyncio.sleep(5.5) del foo await asyncio.sleep(2.5) loop = asyncio.get_event_loop() loop.run_until_complete(run()) 
+1
source share

Sorry, I'm not sure if I understood your question correctly. Is this the solution you are looking for?

 import asyncio class ClassWithTimer: async def __aenter__(self): self.task = asyncio.ensure_future(self.update()) async def __aexit__(self, *args): try: self.task.cancel() # Just cancel updating when we don't need it. await self.task except asyncio.CancelledError: # Ignore CancelledError rised by cancelled task. pass del self # I think you don't need this in real life: instance should be normally deleted by GC. def __del__(self): print("deleted ClassWithTimer!") async def update(self): while True: await asyncio.sleep(1) print("IN update of object " + repr(self)) async def run(): async with ClassWithTimer(): # Use context manager to handle when we need updating. await asyncio.sleep(5) loop = asyncio.get_event_loop() loop.run_until_complete(run()) 

Output:

 IN update of object <__main__.ClassWithTimer object at 0x000000468D0FB208> IN update of object <__main__.ClassWithTimer object at 0x000000468D0FB208> IN update of object <__main__.ClassWithTimer object at 0x000000468D0FB208> IN update of object <__main__.ClassWithTimer object at 0x000000468D0FB208> deleted ClassWithTimer! [Finished in 5.2s] 

Another way without a context manager:

 import asyncio class ClassWithTimer: def __init__(self): self.task = asyncio.ensure_future(self.update()) async def release(self): try: self.task.cancel() await self.task except asyncio.CancelledError: pass del self def __del__(self): print("deleted ClassWithTimer!") async def update(self): while True: await asyncio.sleep(1) print("IN update of object " + repr(self)) async def run(): foo = ClassWithTimer() await asyncio.sleep(5) await foo.release() loop = asyncio.get_event_loop() loop.run_until_complete(run()) 
+2
source share

I ran into almost the same problem a few months ago, and I wrote a decorator to get around it:

 def weakmethod(f): @property def get(self): return f.__get__(weakref.proxy(self)) # self = weakref.proxy(self) # if hasattr(f, '__get__'): # raise RuntimeWarning( # 'weakref may not work unless you implement ' # 'the property protocol carefully by youself!' # ) # return f.__get__(self) # if asyncio.iscoroutinefunction(f): # #Make the returned method a coroutine function, optional # async def g(*arg, **kwarg): # return await f(self, *arg, **kwarg) # else: # def g(*arg, **kwarg): # return f(self, *arg, **kwarg) # return g # #Still some situations not taken into account? return get 

Your code could be rewritten in a very natural way:

 class ClassWithTimer: def __init__(self): asyncio.ensure_future(self.update()) def __del__(self): print("deleted ClassWithTimer!") @weakmethod async def update(self): while True: await asyncio.sleep(1) print("IN update of object ", self) 

Important:

  • weakref.proxy does not stop you from getting a strong link. In addition, it is not guaranteed that it will behave exactly the same as the original object.

  • My implementation did not cover all the possibilities.

+2
source share

`

 import asyncio import warnings class Timeout: def __init__(self, timeout, *, raise_error=False, loop=None): self._timeout = timeout if loop is None: loop = asyncio.get_event_loop() self._loop = loop self._raise_error = raise_error self._task = None self._cancelled = False self._cancel_handler = None async def __aenter__(self): self._task = asyncio.Task.current_task(loop=loop) self._cancel_handler = self._loop.call_later( self._cancel, self._timeout) async def __aexit__(self, exc_type, exc_val, exc_tb): if self._cancelled: if self._raise_error: raise asyncio.TimeoutError else: # suppress return True else: self._cancel_handler.cancel() # raise all other errors def __del__(self): if self._task: # just for preventing improper usage warnings.warn("Use async with") def _cancel(self): self._cancelled = self._task.cancel() async def long_running_task(): while True: asyncio.sleep(5) async def run(): async with Timeout(1): await long_running_task() loop = asyncio.get_event_loop() loop.run_until_complete(run()) 
0
source share

All Articles