I am trying to set up a system that elegantly drops database operations in a separate thread to avoid blocking during Twisted callbacks.
So far, here is my approach:
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from twisted.internet.threads import deferToThread
_engine = create_engine(initialization_string)
Session = scoped_session(sessionmaker(bind=_engine))
@contextmanager
def transaction_context():
session = Session()
try:
yield session
session.commit()
except:
raise
finally:
session.remove()
def threaded(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
return deferToThread(fn, *args, **kwargs)
return wrapper
This should allow me to wrap the function with a decorator threaded, and then use the context manager transaction_contextin the specified function body. The following is an example:
from __future__ import print_function
from my_lib.orm import User, transaction_context, threaded
from twisted.internet import reactor
@threaded
def get_n_users(n):
with transaction_context() as session:
return session.query(User).limit(n).all()
if __name__ == '__main__':
get_n_users(n).addBoth(len)
reactor.run()
However, when I run the above script, I get a failure containing the following trace:
Unhandled error in Deferred:
Unhandled Error
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 781, in __bootstrap
self.__bootstrap_inner()
File "/usr/lib/python2.7/threading.py", line 808, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 761, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/local/lib/python2.7/dist-packages/twisted/python/threadpool.py", line 191, in _worker
result = context.call(ctx, function, *args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/twisted/python/context.py", line 118, in callWithContext
return self.currentContext().callWithContext(ctx, func, *args, **kw)
File "/usr/local/lib/python2.7/dist-packages/twisted/python/context.py", line 81, in callWithContext
return func(*args,**kw)
File "testaccess.py", line 9, in get_n_users
return session.query(User).limit(n).all()
File "/usr/lib/python2.7/contextlib.py", line 24, in __exit__
self.gen.next()
File "/home/louis/Documents/Python/knacki/knacki/db.py", line 36, in transaction_context
session.remove()
exceptions.AttributeError: 'Session' object has no attribute 'remove'
I did not expect this. What am I missing? I did not create an instance scoped_sessioncorrectly?
: - Twisted. , .