mirror of https://github.com/celery/kombu.git
Added serialization support
This commit is contained in:
parent
627ee117f0
commit
e85702c95b
|
@ -135,7 +135,8 @@ class Node(object):
|
|||
|
||||
def reply(self, data, exchange, routing_key, ticket, **kwargs):
|
||||
self.mailbox._publish_reply(data, exchange, routing_key, ticket,
|
||||
channel=self.channel)
|
||||
channel=self.channel,
|
||||
serializer=self.mailbox.serializer)
|
||||
|
||||
|
||||
class Mailbox(object):
|
||||
|
@ -161,8 +162,12 @@ class Mailbox(object):
|
|||
#: Only accepts json messages by default.
|
||||
accept = ['json']
|
||||
|
||||
#: Message serializer
|
||||
serializer = None
|
||||
|
||||
def __init__(self, namespace,
|
||||
type='direct', connection=None, clock=None, accept=None):
|
||||
type='direct', connection=None, clock=None,
|
||||
accept=None, serializer=None):
|
||||
self.namespace = namespace
|
||||
self.connection = connection
|
||||
self.type = type
|
||||
|
@ -172,6 +177,7 @@ class Mailbox(object):
|
|||
self._tls = local()
|
||||
self.unclaimed = defaultdict(deque)
|
||||
self.accept = self.accept if accept is None else accept
|
||||
self.serializer = self.serializer if serializer is None else serializer
|
||||
|
||||
def __call__(self, connection):
|
||||
bound = copy(self)
|
||||
|
@ -242,7 +248,8 @@ class Mailbox(object):
|
|||
pass # queue probably deleted and no one is expecting a reply.
|
||||
|
||||
def _publish(self, type, arguments, destination=None,
|
||||
reply_ticket=None, channel=None, timeout=None):
|
||||
reply_ticket=None, channel=None, timeout=None,
|
||||
serializer=None):
|
||||
message = {'method': type,
|
||||
'arguments': arguments,
|
||||
'destination': destination}
|
||||
|
@ -253,16 +260,18 @@ class Mailbox(object):
|
|||
message.update(ticket=reply_ticket,
|
||||
reply_to={'exchange': self.reply_exchange.name,
|
||||
'routing_key': self.oid})
|
||||
serializer = serializer or self.serializer
|
||||
producer = Producer(chan, auto_declare=False)
|
||||
producer.publish(
|
||||
message, exchange=exchange.name, declare=[exchange],
|
||||
headers={'clock': self.clock.forward(),
|
||||
'expires': time() + timeout if timeout else 0},
|
||||
serializer=serializer,
|
||||
)
|
||||
|
||||
def _broadcast(self, command, arguments=None, destination=None,
|
||||
reply=False, timeout=1, limit=None,
|
||||
callback=None, channel=None):
|
||||
callback=None, channel=None, serializer=None):
|
||||
if destination is not None and \
|
||||
not isinstance(destination, (list, tuple)):
|
||||
raise ValueError(
|
||||
|
@ -277,10 +286,12 @@ class Mailbox(object):
|
|||
if limit is None and destination:
|
||||
limit = destination and len(destination) or None
|
||||
|
||||
serializer = serializer or self.serializer
|
||||
self._publish(command, arguments, destination=destination,
|
||||
reply_ticket=reply_ticket,
|
||||
channel=chan,
|
||||
timeout=timeout)
|
||||
timeout=timeout,
|
||||
serializer=serializer)
|
||||
|
||||
if reply_ticket:
|
||||
return self._collect(reply_ticket, limit=limit,
|
||||
|
|
Loading…
Reference in New Issue