Python script using SQLAlchemy and multiprocessing hangs

Consider the following Python script that uses SQLAlchemy and the Python multiprocessing module. This is with Python 2.6.6-8+b1 (default) and SQLAlchemy 0.6.3-3 (default) on Debian squeeze. This is a simplified version of some actual code.

    import multiprocessing
    from sqlalchemy import *
    from sqlalchemy.orm import *

    dbuser = ...
    password = ...
    dbname = ...
    dbstring = "postgresql://%s:%s@localhost:5432/%s" % (dbuser, password, dbname)

    db = create_engine(dbstring)
    m = MetaData(db)

    def make_foo(i):
        t1 = Table('foo%s' % i, m, Column('a', Integer, primary_key=True))

    conn = db.connect()
    for i in range(10):
        conn.execute("DROP TABLE IF EXISTS foo%s" % i)
    conn.close()

    db.dispose()

    for i in range(10):
        make_foo(i)

    m.create_all()

    def do(kwargs):
        i, dbstring = kwargs['i'], kwargs['dbstring']
        db = create_engine(dbstring)
        Session = scoped_session(sessionmaker())
        Session.configure(bind=db)
        Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
        Session.commit()
        db.dispose()

    pool = multiprocessing.Pool(processes=5)  # start 5 worker processes
    results = []
    arglist = []
    for i in range(10):
        arglist.append({'i': i, 'dbstring': dbstring})

    r = pool.map_async(do, arglist, callback=results.append)
    r.get()
    r.wait()
    pool.close()
    pool.join()

This script hangs with the following error message.

    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
        self.run()
      File "/usr/lib/python2.6/threading.py", line 484, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/usr/lib/python2.6/multiprocessing/pool.py", line 259, in _handle_results
        task = get()
    TypeError: ('__init__() takes at least 4 arguments (2 given)', <class 'sqlalchemy.exc.ProgrammingError'>, ('(ProgrammingError) syntax error at or near "%"\nLINE 1: COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;\n                                   ^\n',))

Of course, the syntax error here is the literal TRUNCATE foo%s;. My question is: why does the process hang, and can I convince it to exit with an error instead, without doing major surgery on my code? This behavior is very similar to that of my actual code.

Note that the hang does not occur if the statement is replaced with something harmless like print foobarbaz. Also, the hang still occurs if we replace

 Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;") Session.commit() db.dispose() 

with just Session.execute("TRUNCATE foo%s;").

I am using the longer version because it is closer to what my actual code is doing.

Also, removing multiprocessing from the picture and looping over the tables sequentially makes the hang go away; the script just exits with an error.
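(For concreteness, by "looping over the tables sequentially" I mean calling do directly instead of going through the pool. A minimal sketch, reusing do, dbstring and the other names from the script above:)

    # Sequential version: no multiprocessing.Pool involved.
    # The ProgrammingError from the bad SQL propagates normally,
    # so the script exits with a traceback instead of hanging.
    for i in range(10):
        do({'i': i, 'dbstring': dbstring})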

I am also puzzled by the form of the error, especially the TypeError: ('__init__() takes at least 4 arguments (2 given)' bit. Where does this error come from? It looks like it comes from somewhere in the multiprocessing code.

The PostgreSQL logs are no help. I see plenty of lines like

    2012-01-09 14:16:34.174 IST [7810] 4f0aa96a.1e82/1 12/583 0 ERROR:  syntax error at or near "%" at character 28
    2012-01-09 14:16:34.175 IST [7810] 4f0aa96a.1e82/2 12/583 0 STATEMENT:  COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;

but nothing else seems relevant.

UPDATE 1: Thanks to lbolla and his insightful analysis, I was able to file a Python bug report about this. See sbt's follow-up in that report, and also here. See also the Python bug report Fix exception pickling. So, following sbt's explanation, we can reproduce the original error with

    import sqlalchemy.exc
    e = sqlalchemy.exc.ProgrammingError("", {}, None)
    type(e)(*e.args)

which gives

    Traceback (most recent call last):
      File "<stdin>", line 9, in <module>
    TypeError: __init__() takes at least 4 arguments (2 given)

UPDATE 2: This has been fixed, at least for SQLAlchemy, by Mike Bayer; see the SQLAlchemy bug report for the StatementError exceptions un-pickleable fix. At Mike's suggestion I also reported a similar bug against psycopg2, though I did not have (and do not have) an actual example of breakage. Regardless, they have apparently fixed it, although they gave no details about the fix; see psycopg exceptions cannot be pickled. For good measure, I also reported a Python bug, ConfigParser exceptions are not pickleable, corresponding to the SO question lbolla mentioned. There seems to be some willingness there to look into it.

In any case, this seems likely to remain a problem for the foreseeable future, since Python developers, by and large, are not aware of the issue and therefore do not guard against it. Surprisingly, there do not seem to be enough people using multiprocessing for this to be a well-known problem, or maybe they just put up with it. I hope the Python developers get around to fixing it, at least for Python 3, because it is annoying.

I accepted lbolla's answer, since without his explanation of how the problem relates to exception handling, I would most likely never have figured this out. I also want to thank sbt for explaining the Python issue with pickling exceptions. I am very grateful to both of them; please vote up their answers. Thanks.

UPDATE 3: I have posted a follow-up question: Catching unpickleable exceptions and re-raising.

+7
4 answers

I believe the TypeError comes from multiprocessing's get.

I removed all the DB code from your script. Take a look at this:

    import multiprocessing
    import sqlalchemy.exc

    def do(kwargs):
        i = kwargs['i']
        print i
        raise sqlalchemy.exc.ProgrammingError("", {}, None)
        return i

    pool = multiprocessing.Pool(processes=5)  # start 5 worker processes
    results = []
    arglist = []
    for i in range(10):
        arglist.append({'i': i})

    r = pool.map_async(do, arglist, callback=results.append)

    # Use get or wait?
    # r.get()
    r.wait()

    pool.close()
    pool.join()

    print results

Using r.wait returns the expected result, but using r.get raises the TypeError. As described in the Python docs, use r.wait after map_async.

Edit: I have to amend my previous answer. I now believe the TypeError comes from SQLAlchemy. I have modified my script to reproduce the error.

Edit 2: It seems the problem is that multiprocessing.Pool does not play well if a worker raises an exception whose constructor requires a parameter (see also here).

I modified my script to highlight this.

    import multiprocessing

    class BadExc(Exception):
        def __init__(self, a):
            '''Non-optional param in the constructor.'''
            self.a = a

    class GoodExc(Exception):
        def __init__(self, a=None):
            '''Optional param in the constructor.'''
            self.a = a

    def do(kwargs):
        i = kwargs['i']
        print i
        raise BadExc('a')
        # raise GoodExc('a')
        return i

    pool = multiprocessing.Pool(processes=5)
    results = []
    arglist = []
    for i in range(10):
        arglist.append({'i': i})

    r = pool.map_async(do, arglist, callback=results.append)
    try:
        # set a timeout in order to be able to catch C-c
        r.get(1e100)
    except KeyboardInterrupt:
        pass

    print results

In your case, given that your code raises a SQLAlchemy exception, the only solution I can think of is to catch all exceptions in the do function and re-raise them as plain Exceptions. Something like this:

    import multiprocessing

    class BadExc(Exception):
        def __init__(self, a):
            '''Non-optional param in the constructor.'''
            self.a = a

    def do(kwargs):
        try:
            i = kwargs['i']
            print i
            raise BadExc('a')
            return i
        except Exception as e:
            raise Exception(repr(e))

    pool = multiprocessing.Pool(processes=5)
    results = []
    arglist = []
    for i in range(10):
        arglist.append({'i': i})

    r = pool.map_async(do, arglist, callback=results.append)
    try:
        # set a timeout in order to be able to catch C-c
        r.get(1e100)
    except KeyboardInterrupt:
        pass

    print results

Edit 3: So it looks like a bug with Python, but proper exceptions in SQLAlchemy would work around it; therefore, I have also raised the issue with SQLAlchemy.

As a workaround for the problem, I think the solution at the end of Edit 2 would do: wrap the worker body in try/except and re-raise a plain Exception.
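The same idea can be packaged as a decorator, so any worker function passed to the pool gets the wrapping without repeating the boilerplate. This is just a sketch of that workaround; the decorator name is mine, and BadExc is the same class defined in Edit 2 above:

    import functools

    class BadExc(Exception):
        def __init__(self, a):
            '''Non-optional param in the constructor (as in Edit 2).'''
            self.a = a

    def plain_exceptions(func):
        '''Re-raise anything from the worker as a plain, picklable Exception.'''
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                # repr(e) preserves the original type and message as text;
                # a bare Exception always survives the pickle round-trip.
                raise Exception(repr(e))
        return wrapper

    @plain_exceptions
    def do(kwargs):
        raise BadExc('a')

    # do can now be passed to pool.map_async exactly as before.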

+11

The error TypeError: ('__init__() takes at least 4 arguments (2 given)' is not related to the SQL you are trying to execute; it is related to how you are using the SQLAlchemy API.

The problem is that you are trying to call execute on the session class rather than on an instance of that session.

Try the following:

    session = Session()
    session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
    session.commit()

From the docs :

The sessionmaker() function is typically called within the global scope of an application, and the returned class made available to the rest of the application as the single class used to instantiate sessions.

So Session = sessionmaker() returns a new session class, and session = Session() returns an instance of that class, on which you can then call execute.
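Put together with the names from the question, the instance-based pattern would look roughly like this. This is only a sketch; it also fills in the %s placeholder with i, which the original code never does:

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    def do(kwargs):
        i, dbstring = kwargs['i'], kwargs['dbstring']
        db = create_engine(dbstring)
        Session = sessionmaker(bind=db)   # Session is a class...
        session = Session()               # ...and session is an instance of it
        session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;" % i)
        session.commit()
        session.close()
        db.dispose()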

+2

I do not know the cause of the original exception. However, multiprocessing's problems with "bad" exceptions really come down to how pickling works. I think the sqlalchemy exception class is broken.

If an exception class has an __init__() method which does not call BaseException.__init__() (directly or indirectly), then self.args will probably not be set properly. BaseException.__reduce__() (which is used by the pickle protocol) assumes that a copy of the exception e can be recreated by just doing

 type(e)(*e.args) 

for example

    >>> e = ValueError("bad value")
    >>> e
    ValueError('bad value',)
    >>> type(e)(*e.args)
    ValueError('bad value',)

If this invariant does not hold, then pickling/unpickling will fail. So instances of

    class BadExc(Exception):
        def __init__(self, a):
            '''Non-optional param in the constructor.'''
            self.a = a

can be pickled, but the result cannot be unpickled:

    >>> from cPickle import loads, dumps
    >>> class BadExc(Exception):
    ...     def __init__(self, a):
    ...         '''Non-optional param in the constructor.'''
    ...         self.a = a
    ...
    >>> loads(dumps(BadExc(1)))
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    TypeError: ('__init__() takes exactly 2 arguments (1 given)', <class '__main__.BadExc'>, ())

But instances of

    class GoodExc1(Exception):
        def __init__(self, a):
            '''Non-optional param in the constructor.'''
            Exception.__init__(self, a)
            self.a = a

or

    class GoodExc2(Exception):
        def __init__(self, a):
            '''Non-optional param in the constructor.'''
            self.args = (a,)
            self.a = a

can be pickled and unpickled successfully.

So you should ask the SQLAlchemy developers to fix their exception classes. In the meantime, you can probably use copy_reg.pickle() to override BaseException.__reduce__() for the troublesome classes.

+1

(This is in answer to Faheem Mitha's question in a comment about how to use copy_reg to work around the broken exception classes.)

The __init__() methods of the SQLAlchemy exception classes seem to call their base classes' __init__() methods, but with different arguments. This messes up pickling.

To customize the pickling of the SQLAlchemy exception classes, you can use copy_reg to register your own reduce functions for those classes.

A reduce function takes an argument obj and returns a pair (callable_obj, args) such that a copy of obj can be created by doing callable_obj(*args). For example,

    class StatementError(SQLAlchemyError):
        def __init__(self, message, statement, params, orig):
            SQLAlchemyError.__init__(self, message)
            self.statement = statement
            self.params = params
            self.orig = orig
        ...

can be "fixed" by doing

    import copy_reg, sqlalchemy.exc

    def reduce_StatementError(e):
        message = e.args[0]
        args = (message, e.statement, e.params, e.orig)
        return (type(e), args)

    copy_reg.pickle(sqlalchemy.exc.StatementError, reduce_StatementError)

There are several other classes in sqlalchemy.exc which need to be fixed in the same way, but I hope you get the idea.
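For instance, the DBAPIError subclasses, such as the ProgrammingError from the question, could be handled similarly. This is only a sketch: it assumes their constructors take (statement, params, orig), which the "takes at least 4 arguments (2 given)" TypeError suggests, but check sqlalchemy/exc.py for the exact signatures before relying on it:

    import copy_reg, sqlalchemy.exc

    def reduce_DBAPIError(e):
        # Assumed constructor signature: (statement, params, orig).
        return (type(e), (e.statement, e.params, e.orig))

    for cls in (sqlalchemy.exc.ProgrammingError,
                sqlalchemy.exc.OperationalError,
                sqlalchemy.exc.IntegrityError):
        copy_reg.pickle(cls, reduce_DBAPIError)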


Alternatively, instead of fixing each class individually, you can probably just monkey-patch the __reduce__() method of the base exception class:

    import sqlalchemy.exc

    def rebuild_exc(cls, args, dic):
        e = Exception.__new__(cls)
        e.args = args
        e.__dict__.update(dic)
        return e

    def __reduce__(e):
        return (rebuild_exc, (type(e), e.args, e.__dict__))

    sqlalchemy.exc.SQLAlchemyError.__reduce__ = __reduce__
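A quick sanity check of the patch is a pickle round-trip of the same exception the question constructs; without the patch the loads() call fails with the familiar TypeError. A sketch, assuming the monkey-patch above has already been applied:

    from cPickle import loads, dumps
    import sqlalchemy.exc

    e = sqlalchemy.exc.ProgrammingError("", {}, None)
    e2 = loads(dumps(e))            # TypeError here without the patch
    print type(e2) is type(e), e2.args == e.args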
+1
