Creating a separate database connection for each celery worker

I keep running into strange MySQL errors when workers execute tasks right after they are created.

We use Django 1.3, Celery 3.1.17, and djorm-ext-pool 0.5.

We start the Celery process with concurrency 3. So far, I have observed that when the worker processes start, they all get the same MySQL connection. We log the DB connection ID as shown below.

    from django.db import connection
    connection.cursor()
    logger.info("Task %s processing with db connection %s", str(task_id), str(connection.connection.thread_id()))

When all workers receive tasks, the first one succeeds and the other two fail with strange MySQL errors. These are either "MySQL server has gone away" errors or cases where Django raises a "DoesNotExist" error, even though the objects Django is querying clearly exist.

After this error, each worker gets its own database connection, and we see no further problems.

What is the default behavior of Celery? Is it designed to share the same database connection? If so, how is communication between processes handled? Ideally, I would prefer a separate database connection for each worker.

I tried the code from the link below, but it did not work. Celery Producers Database Connection Pool

We also applied the Celery fix suggested below. https://github.com/celery/celery/issues/2453

To those who downvoted the question, please let me know the reason for the downvote.

python mysql sqlalchemy celery django-celery
1 answer

Celery is started with the command below:

 celery -A myproject worker --loglevel=debug --concurrency=3 -Q testqueue 

myproject.py, running in the main process, made some queries against the MySQL database before the worker processes were forked.

As part of that query flow in the main Django process, the ORM creates a SQLAlchemy connection pool if one does not already exist. The worker processes are created after that.

Celery, as part of its Django fixups, closes existing connections.

    def close_database(self, **kwargs):
        if self._close_old_connections:
            return self._close_old_connections()  # Django 1.6
        if not self.db_reuse_max:
            return self._close_database()
        if self._db_recycles >= self.db_reuse_max * 2:
            self._db_recycles = 0
            self._close_database()
        self._db_recycles += 1

What is probably happening is that the SQLAlchemy pool object, holding one idle DB connection, is copied into the 3 worker processes when they are forked. As a result, 3 separate pools each hold a connection object that points to the same file descriptor.

When the workers execute tasks and ask for a DB connection, each of them receives that same idle connection from its copy of the SQLAlchemy pool, because the pool considers it unused. Since all of these connections point to the same file descriptor, the workers interfere with each other, which produces the "MySQL server has gone away" errors.

Connections created after this point are genuinely new and do not point to the same socket file descriptor.
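The descriptor sharing described above can be reproduced without MySQL or Celery. The following is only a minimal sketch, assuming a POSIX system with `os.fork`: a `socket.socketpair()` stands in for the single idle pooled connection, and the function name `messages_on_shared_socket` is made up for this illustration. Every forked child writes through the socket it inherited, and all messages arrive on the one server-side endpoint, which is why the server sees several clients talking over what it believes is a single connection.

```python
import os
import socket


def messages_on_shared_socket(n_children=3):
    """Fork children after opening a socket; return what the peer receives."""
    # Stand-in for the one idle pooled connection opened before the fork.
    client_side, server_side = socket.socketpair()

    pids = []
    for i in range(n_children):
        pid = os.fork()
        if pid == 0:
            # Child: the inherited object refers to the very same socket.
            client_side.sendall(b"worker-%d\n" % i)
            os._exit(0)
        pids.append(pid)

    # Wait for all children, then close the parent's copy so the peer sees EOF.
    for pid in pids:
        os.waitpid(pid, 0)
    client_side.close()

    received = b""
    while True:
        chunk = server_side.recv(1024)
        if not chunk:
            break
        received += chunk
    server_side.close()
    return sorted(received.decode().splitlines())
```

With a real database protocol the interleaved traffic is not harmless text: one worker can read the reply meant for another, which would be consistent with both the dropped connections and the spurious "DoesNotExist" errors reported in the question.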

Solution:

In the main process, add

    from django.db import connection
    connection.cursor()

before performing any other imports, i.e. before even the djorm-ext-pool module is imported.

This way, all DB queries in the main process use the connection that Django created outside the pool. When the Celery Django fixup closes that connection, it is actually closed rather than returned to the SQLAlchemy pool. The pool is therefore empty when it is copied into the workers at fork time. When the workers later ask for a DB connection, SQLAlchemy hands each of them a newly created one.
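Why an empty pool at fork time helps can be shown with a toy model. This is a sketch under stated assumptions, not the SQLAlchemy or djorm-ext-pool API: `ToyPool` and `children_reusing_parent_connection` are hypothetical names, a "connection" is just a dict recording which process opened it, and a POSIX `os.fork` is assumed.

```python
import os


class ToyPool:
    """Hypothetical stand-in for a connection pool (not the SQLAlchemy API)."""

    def __init__(self):
        self.idle = []

    def checkout(self):
        # Reuse an idle connection if one exists, otherwise open a new one.
        if self.idle:
            return self.idle.pop()
        return {"opened_by_pid": os.getpid()}

    def checkin(self, conn):
        self.idle.append(conn)


def children_reusing_parent_connection(prefill, n_children=3):
    """Return, per child, whether it got a connection opened by the parent."""
    pool = ToyPool()
    if prefill:
        # The parent ran a query through the pool: one idle connection remains.
        pool.checkin(pool.checkout())
    parent_pid = os.getpid()

    reused = []
    for _ in range(n_children):
        read_fd, write_fd = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Child: works on its own copy of the pool, made at fork time.
            os.close(read_fd)
            conn = pool.checkout()
            inherited = conn["opened_by_pid"] == parent_pid
            os.write(write_fd, b"1" if inherited else b"0")
            os._exit(0)
        os.close(write_fd)
        reused.append(os.read(read_fd, 1) == b"1")
        os.close(read_fd)
        os.waitpid(pid, 0)
    return reused
```

With `prefill=True` every child checks out the connection that the parent opened, which mirrors the failing setup; with `prefill=False` each child opens its own, which mirrors the state after the fix.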
