How can SQLAlchemy learn to recover from a disconnect?

According to http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html#disconnect-handling-pessimistic , SQLAlchemy can be connected if the entry in the connection pool is no longer valid. To do this, I create the following test case:

import subprocess from sqlalchemy import create_engine, event from sqlalchemy import exc from sqlalchemy.pool import Pool @event.listens_for(Pool, "checkout") def ping_connection(dbapi_connection, connection_record, connection_proxy): cursor = dbapi_connection.cursor() try: print "pinging server" cursor.execute("SELECT 1") except: print "raising disconnect error" raise exc.DisconnectionError() cursor.close() engine = create_engine('postgresql:// postgres@localhost /test') connection = engine.connect() subprocess.check_call(['psql', str(engine.url), '-c', "select pg_terminate_backend(pid) from pg_stat_activity " + "where pid <> pg_backend_pid() " + "and datname='%s';" % engine.url.database], stdout=subprocess.PIPE) result = connection.execute("select 'OK'") for row in result: print "Success!", " ".join(row) 

But instead of recovering, I get this exception:

 sqlalchemy.exc.OperationalError: (OperationalError) terminating connection due to administrator command server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. 

Since a pinging server is printed on the terminal, it seems safe to conclude that the event listener is connected. How can SQLAlchemy learn to recover from a disconnect?

+7
python psycopg2 sqlalchemy
source share
1 answer

It looks like the checkout method is only called when you first get a connection from the pool (for example, the string connection = engine.connect() )

If you later lose the connection, you will have to explicitly replace it, so you can just take a new one and repeat your sql:

 try: result = connection.execute("select 'OK'") except sqlalchemy.exc.OperationalError: # may need more exceptions here connection = engine.connect() # grab a new connection result = connection.execute("select 'OK'") # and retry 

This would be a pain around every sql bit, so you could wrap your database queries using something like:

 def db_execute(conn, query): try: result = conn.execute(query) except sqlalchemy.exc.OperationalError: # may need more exceptions here (or trap all) conn = engine.connect() # replace your connection result = conn.execute(query) # and retry return result 

Following:

 result = db_execute(connection, "select 'OK'") 

It should work out.

Another option would also be to listen for the invalidate method and take some action at this time to replace your connection.

+4
source share

All Articles