One way to do this is with a metaclass-based wrapper. A class that uses it is declared like this:
class NB_Hardware(object):
    """Wrapper around ``Hardware`` built by the ``NonBlockBuilder`` metaclass.

    The metaclass reads the class attributes below: ``delegate`` is the
    class being wrapped and ``nb_funcs`` lists the delegate methods that
    should get a generated ``nb_<name>`` non-blocking counterpart.
    """

    # Python 2 style metaclass declaration.
    __metaclass__ = NonBlockBuilder
    # Class whose instances do the actual (blocking) work.
    delegate = Hardware
    # Delegate methods that need an ``nb_``-prefixed wrapper.
    nb_funcs = ['blocking_command']
Under the hood, a Python 3 version would use a concurrent.futures.ThreadPoolExecutor (for example, together with asyncio); for Python 2 this uses a concurrent.futures.ProcessPoolExecutor. The implementation:
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor
def runner(self, cb, *args, **kwargs):
    """Call ``cb`` on behalf of ``self`` and return whatever it returns.

    Args:
        self: The object whose method should be invoked.
        cb: Either the name of an attribute of ``self`` to look up and
            call, or an already-resolved callable (for example a bound
            method), which is called as-is.
        *args: Positional arguments forwarded to the call.
        **kwargs: Keyword arguments forwarded to the call.

    Returns:
        The result of the call.

    Raises:
        AttributeError: If ``cb`` is a name that ``self`` does not provide.
    """
    # A callable cannot be used as an attribute name, so call it directly
    # instead of letting getattr() fail with a TypeError.
    target = cb if callable(cb) else getattr(self, cb)
    return target(*args, **kwargs)
class _ExecutorMixin():
""" A Mixin that provides asynchronous functionality.
This mixin provides methods that allow a class to run
blocking methods in a ProcessPoolExecutor.
It also provides methods that attempt to keep the object
picklable despite having a non-picklable ProcessPoolExecutor
as part of its state.
"""
pool_workers = cpu_count()
def run_in_executor(self, callback, *args, **kwargs):
""" Runs a function in an Executor.
Returns a concurrent.Futures.Future
"""
if not hasattr(self, '_executor'):
self._executor = self._get_executor()
return self._executor.submit(runner, self, callback, *args, **kwargs)
def _get_executor(self):
return ProcessPoolExecutor(max_workers=self.pool_workers)
def __getattr__(self, attr):
if (self._obj and hasattr(self._obj, attr) and
not attr.startswith("__")):
return getattr(self._obj, attr)
raise AttributeError(attr)
def __getstate__(self):
self_dict = self.__dict__
self_dict['_executor'] = None
return self_dict
def __setstate__(self, state):
self.__dict__.update(state)
self._executor = self._get_executor()
class NonBlockBuilder(type):
    """ Metaclass for adding non-blocking versions of methods to a class.

    Expects to find the following class attributes:
    nb_funcs - A list containing methods that need non-blocking wrappers
    delegate - The class to wrap (add non-blocking methods to)
    pool_workers - (optional) how many workers to put in the internal pool.

    The metaclass inserts a mixin (_ExecutorMixin) into the inheritance
    hierarchy of cls. This mixin provides methods that allow
    the non-blocking wrappers to do their work.
    """

    def __new__(cls, clsname, bases, dct, **kwargs):
        """ Create the class, adding the mixin and the nb_* wrappers.

        A wrapper named ``nb_<func>`` is generated for every entry of
        ``nb_funcs`` found on the class or its direct bases, unless an
        attribute of that name already exists there. Extra keyword
        arguments are accepted and ignored.
        """
        # Copy the list: extending the object stored in dct would silently
        # rewrite the class's own nb_funcs attribute.
        nbfunc_list = list(dct.get('nb_funcs', []))
        existing_nbfuncs = set(attr for attr in dct
                               if attr.startswith("nb_"))
        for b in bases:
            b_dct = b.__dict__
            nbfunc_list.extend(b_dct.get('nb_funcs', []))
            existing_nbfuncs.update(attr for attr in b_dct
                                    if attr.startswith("nb_"))

        if _ExecutorMixin not in bases:
            # Keep ``object`` (if listed) after the mixin; listing it first
            # gives an inconsistent MRO when the mixin is a new-style class.
            others = tuple(b for b in bases if b is not object)
            tail = (object,) if len(others) != len(bases) else ()
            bases = others + (_ExecutorMixin,) + tail

        for func in nbfunc_list:
            nb_name = 'nb_{}'.format(func)
            if nb_name not in existing_nbfuncs:
                dct[nb_name] = cls.nbfunc_maker(func)
        return super(NonBlockBuilder, cls).__new__(cls, clsname, bases, dct)

    def __init__(cls, name, bases, dct, **kwargs):
        """ Properly initialize a non-blocking wrapper.

        Sets pool_workers and delegate on the class, and also
        adds an __init__ method to it that instantiates the
        delegate with the proper context. Values missing from the class
        body are taken from the direct bases (``object`` excluded).
        Extra keyword arguments are accepted and ignored, mirroring
        __new__.
        """
        super(NonBlockBuilder, cls).__init__(name, bases, dct)
        pool_workers = dct.get('pool_workers')
        delegate = dct.get('delegate')
        old_init = dct.get('__init__')
        for b in bases:
            if b is object:
                continue
            b_dct = b.__dict__
            if not pool_workers:
                pool_workers = b_dct.get('pool_workers')
            if not delegate:
                delegate = b_dct.get('delegate')
            if not old_init:
                inherited = b_dct.get('__init__')
                # A base built by this metaclass exposes the generated
                # wrapper; use the __init__ it wraps so the delegate is
                # not instantiated once per level of the hierarchy.
                old_init = getattr(inherited, '_nb_wrapped_init', inherited)

        cls.delegate = delegate
        if pool_workers:
            cls.pool_workers = pool_workers

        def init_func(self, *args, **kwargs):
            # Run the user's __init__ first, then build the delegate with
            # the same arguments.
            if old_init:
                old_init(self, *args, **kwargs)
            if self.delegate:
                self._obj = self.delegate(*args, **kwargs)

        # Remember what was wrapped so subclasses can unwrap it.
        init_func._nb_wrapped_init = old_init
        cls.__init__ = init_func

    @staticmethod
    def nbfunc_maker(func):
        """ Return a method that runs ``func`` through run_in_executor.

        ``func`` is the name of the method to run; the generated method
        forwards its arguments and returns what run_in_executor returns.
        """
        def nb_func(self, *args, **kwargs):
            return self.run_in_executor(func, *args, **kwargs)
        return nb_func
Example usage:
from nb_helper import NonBlockBuilder
import time
class Hardware:
    """Example class with one slow method and one fast method."""

    def __init__(self, other_init_args):
        """Store ``other_init_args`` on the instance as ``other``."""
        self.other = other_init_args

    def blocking_command(self, arg_1, arg_2, arg_3):
        """Print a start message, sleep for 5 seconds, return "blocking".

        The three arguments are accepted but not used.
        """
        print("start blocking")
        time.sleep(5)
        return "blocking"

    def normal_command(self):
        """Return "normal" immediately."""
        return "normal"
class NBHardware(object):
    """Wrapper around ``Hardware`` built by the ``NonBlockBuilder`` metaclass.

    ``delegate`` names the wrapped class and ``nb_funcs`` lists the
    delegate methods for which an ``nb_<name>`` wrapper is generated.
    """

    # Python 2 style metaclass declaration.
    __metaclass__ = NonBlockBuilder
    # Class whose instances do the actual (blocking) work.
    delegate = Hardware
    # Delegate methods that need an ``nb_``-prefixed wrapper.
    nb_funcs = ['blocking_command']
if __name__ == "__main__":
    # Demo: call the slow method directly, then through its nb_ wrapper.
    h = NBHardware("abc")

    print("doing blocking call")
    # Plain call: nothing else runs until it returns.
    print(h.blocking_command(1, 2, 3))
    print("done")

    print("doing non-block call")
    # The wrapper returns right away with a future-like object ...
    x = h.nb_blocking_command(1, 2, 3)
    # ... so this runs while the slow call is still in progress.
    print(h.normal_command())
    # Wait for the slow call and show its result.
    print(x.result())
Output:
doing blocking call
start blocking
< 5 second delay >
blocking
done
doing non-block call
start blocking
normal
< 5 second delay >
blocking
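One caveat: for this to work, Hardware must be picklable. If it is not, you can probably make it so by implementing __getstate__, __setstate__, etc. on it, as _ExecutorMixin does.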
On Python 2.x you will also need the backport of concurrent.futures.
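Note that the metaclass also supports inheritance, a custom __init__, and overriding the generated nb_* methods. For example: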
class AioBaseLock(object):
    """Base wrapper for lock-like delegates, with a custom acquire.

    Subclasses supply the ``delegate`` class. The instance tracks in
    ``_threaded_acquire`` whether an acquire issued through
    ``coro_acquire`` succeeded.
    """

    # Python 2 style metaclass declaration.
    __metaclass__ = NonBlockBuilder
    pool_workers = 1
    # NOTE(review): the NonBlockBuilder shown in this file reads
    # ``nb_funcs``, not ``coroutines`` - confirm which name is intended.
    coroutines = ['acquire', 'release']

    def __init__(self, *args, **kwargs):
        """Initialise the acquire flag and register a fork hook.

        The arguments are accepted but not used here.
        """
        self._threaded_acquire = False

        def _after_fork(obj):
            # Reset the flag on the object handed to the hook.
            obj._threaded_acquire = False

        register_after_fork(self, _after_fork)

    def coro_acquire(self, *args, **kwargs):
        """Acquire the delegate through the executor.

        Returns the future from ``run_in_executor``; once it completes
        with a true result, ``_threaded_acquire`` is set to True.
        """
        def lock_acquired(fut):
            if fut.result():
                self._threaded_acquire = True

        # Pass the method *name*: ``runner`` resolves it with getattr(),
        # which rejects a bound method such as ``self._obj.acquire``.
        out = self.run_in_executor('acquire', *args, **kwargs)
        out.add_done_callback(lock_acquired)
        return out
class AioLock(AioBaseLock):
    """``AioBaseLock`` whose delegate class is ``Lock``."""

    delegate = Lock
class AioRLock(AioBaseLock):
    """``AioBaseLock`` whose delegate class is ``RLock``."""

    delegate = RLock
If you do not need any of that, the metaclass can be simplified to:
class NonBlockBuilder(type):
    """ Metaclass for adding non-blocking versions of methods to a class.

    Expects to find the following class attributes:
    nb_funcs - A list containing methods that need non-blocking wrappers
    delegate - The class to wrap (add non-blocking methods to)
    pool_workers - (optional) how many workers to put in the internal pool.

    The metaclass inserts a mixin (_ExecutorMixin) into the inheritance
    hierarchy of cls. This mixin provides methods that allow
    the non-blocking wrappers to do their work.
    """

    def __new__(cls, clsname, bases, dct, **kwargs):
        """ Create the class, adding the mixin and the nb_* wrappers.

        One ``nb_<func>`` wrapper is generated for every entry of the
        class's own ``nb_funcs``. Extra keyword arguments are accepted
        and ignored.
        """
        nbfunc_list = dct.get('nb_funcs', [])

        if _ExecutorMixin not in bases:
            # Keep ``object`` (if listed) after the mixin; listing it first
            # gives an inconsistent MRO when the mixin is a new-style class.
            others = tuple(b for b in bases if b is not object)
            tail = (object,) if len(others) != len(bases) else ()
            bases = others + (_ExecutorMixin,) + tail

        for func in nbfunc_list:
            nb_name = 'nb_{}'.format(func)
            dct[nb_name] = cls.nbfunc_maker(func)
        return super(NonBlockBuilder, cls).__new__(cls, clsname, bases, dct)

    def __init__(cls, name, bases, dct):
        """ Properly initialize a non-blocking wrapper.

        Sets pool_workers and delegate on the class, and also
        adds an __init__ method to it that instantiates the
        delegate with the proper context.

        Raises KeyError if the class body does not define ``delegate``.
        """
        super(NonBlockBuilder, cls).__init__(name, bases, dct)
        if 'delegate' not in dct:
            # Same exception type as a plain dct['delegate'] lookup, but
            # with a message that names the offending class.
            raise KeyError(
                "class %s must define a 'delegate' attribute" % name)
        pool_workers = dct.get('pool_workers')
        cls.delegate = dct['delegate']
        if pool_workers:
            cls.pool_workers = pool_workers

        def init_func(self, *args, **kwargs):
            # Build the delegate with the wrapper's constructor arguments.
            self._obj = self.delegate(*args, **kwargs)

        cls.__init__ = init_func

    @staticmethod
    def nbfunc_maker(func):
        """ Return a method that runs ``func`` through run_in_executor.

        ``func`` is the name of the method to run; the generated method
        forwards its arguments and returns what run_in_executor returns.
        """
        def nb_func(self, *args, **kwargs):
            return self.run_in_executor(func, *args, **kwargs)
        return nb_func
* , .