I am trying to write something as idiomatic as possible in order to collect results from futures stored in a dict.
Suppose I have the following code:
    import asyncio


    async def sleep(seconds):
        print(f'sleeping for {seconds} seconds')
        await asyncio.sleep(seconds)
        print(f'finished sleeping {seconds} seconds')


    async def run():
        tasks = {
            '4': sleep(4),
            '3': sleep(3),
            '2': sleep(2),
            '1': sleep(1),
        }
        print(await gather_from_dict(tasks))


    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(run())
Expected Result:
    sleeping for 2 seconds
    sleeping for 1 seconds
    sleeping for 4 seconds
    sleeping for 3 seconds
    finished sleeping 1 seconds
    finished sleeping 2 seconds
    finished sleeping 3 seconds
    finished sleeping 4 seconds
    {'4': None, '3': None, '2': None, '1': None}
So far, the cleanest solution I have found is:
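    from typing import Awaitable, Dict, Hashable


    # Note: the loop argument of asyncio.gather was deprecated in
    # Python 3.8 and removed in 3.10.
    async def gather_from_dict(tasks: Dict[Hashable, Awaitable],
                               loop=None,
                               return_exceptions=False) -> Dict:
        # Run all awaitables concurrently; results come back in input order.
        results = await asyncio.gather(
            *tasks.values(),
            loop=loop,
            return_exceptions=return_exceptions
        )
        # Pair each key with its result.
        return dict(zip(tasks.keys(), results))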
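For comparison, here is a minimal sketch of the same helper without the `loop` argument, which `asyncio.gather` no longer accepts on Python 3.10+. It relies on `asyncio.gather` returning results in the order its arguments were passed, so the keys can be zipped back on. The `double_later` coroutine and `demo` function are hypothetical names added only to show that the values end up under the right keys; they are not part of my original code.

```python
import asyncio
from typing import Any, Awaitable, Dict, Hashable


async def gather_from_dict(
    tasks: Dict[Hashable, Awaitable[Any]],
    return_exceptions: bool = False,
) -> Dict[Hashable, Any]:
    # Snapshot the keys once so keys and results are guaranteed to line up.
    keys = list(tasks)
    # gather() runs the awaitables concurrently and preserves input order.
    results = await asyncio.gather(
        *(tasks[key] for key in keys),
        return_exceptions=return_exceptions,
    )
    return dict(zip(keys, results))


# Hypothetical coroutine used only for illustration.
async def double_later(x: int) -> int:
    await asyncio.sleep(0.01 * x)
    return x * 2


# Hypothetical driver used only for illustration.
async def demo() -> Dict[Hashable, Any]:
    return await gather_from_dict({
        'a': double_later(1),
        'b': double_later(2),
    })


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

With `return_exceptions=True`, a failing awaitable would show up as an exception object under its key instead of aborting the whole call, which is the same behaviour `asyncio.gather` has for lists.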
Any ideas on how to make this easier? Thanks!!!
jordixou