Fix: make SQLAlchemy Channel init thread-safe

The init method for the SQLAlchemy transport's Channel implementation
is not currently thread-safe.

This means that if two threads in a process attempt to instantiate a
Channel concurrently, there can be a race condition when registering
the SQLAlchemy model classes, which leaves the SQLAlchemy ORM mapper
in a broken state.

This results in an exception like the following:

```
sqlalchemy.exc.InvalidRequestError: Table 'kombu_message' is already
defined for this MetaData instance.  Specify 'extend_existing=True' to
redefine options and columns on an existing Table object.
```

Any subsequent calls to the SQLAlchemy ORM will then fail with an
exception like this:

```
sqlalchemy.exc.InvalidRequestError: Multiple classes found for path
"Message" in the registry of this declarative base. Please use a fully
module-qualified path.
```

This also applies to any SQLAlchemy calls in the same process, not
just those made by Kombu or Celery, and can only really be recovered
from by killing the process and starting over.

To avoid all of this, introduce a mutex which ensures that only one
thread at a time can register a SQLAlchemy model when instantiating
the Channel class.

Signed-off-by: Antoine Busque <antoinebusque@gmail.com>
This commit is contained in:
Antoine Busque 2020-04-29 17:41:36 -04:00 committed by Asif Saif Uddin
parent 93242848b8
commit 8f1de37a08
1 changed files with 7 additions and 3 deletions

View File

@ -4,6 +4,7 @@
from __future__ import absolute_import, unicode_literals
import threading
from json import loads, dumps
from sqlalchemy import create_engine
@ -21,6 +22,8 @@ from .models import (ModelBase, Queue as QueueBase, Message as MessageBase,
VERSION = (1, 1, 0)
__version__ = '.'.join(map(str, VERSION))
_MUTEX = threading.Lock()
class Channel(virtual.Channel):
"""The channel class."""
@ -127,9 +130,10 @@ class Channel(virtual.Channel):
return self._query_all(queue).count()
def _declarative_cls(self, name, base, ns):
if name in class_registry:
return class_registry[name]
return type(str(name), (base, ModelBase), ns)
with _MUTEX:
if name in class_registry:
return class_registry[name]
return type(str(name), (base, ModelBase), ns)
@cached_property
def queue_cls(self):