diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py index bcf9ed5b..fe469fae 100644 --- a/kombu/transport/sqlalchemy/__init__.py +++ b/kombu/transport/sqlalchemy/__init__.py @@ -4,6 +4,7 @@ from __future__ import absolute_import, unicode_literals +import threading from json import loads, dumps from sqlalchemy import create_engine @@ -21,6 +22,8 @@ from .models import (ModelBase, Queue as QueueBase, Message as MessageBase, VERSION = (1, 1, 0) __version__ = '.'.join(map(str, VERSION)) +_MUTEX = threading.Lock() + class Channel(virtual.Channel): """The channel class.""" @@ -127,9 +130,10 @@ class Channel(virtual.Channel): return self._query_all(queue).count() def _declarative_cls(self, name, base, ns): - if name in class_registry: - return class_registry[name] - return type(str(name), (base, ModelBase), ns) + with _MUTEX: + if name in class_registry: + return class_registry[name] + return type(str(name), (base, ModelBase), ns) @cached_property def queue_cls(self):