2011-09-07 14:21:47 +00:00
|
|
|
from __future__ import absolute_import
|
2013-01-17 13:16:32 +00:00
|
|
|
from __future__ import with_statement
|
2010-11-10 15:29:16 +00:00
|
|
|
|
2012-01-13 19:04:49 +00:00
|
|
|
import socket
|
|
|
|
|
2012-06-24 15:32:17 +00:00
|
|
|
from kombu import Connection
|
2012-03-20 14:53:00 +00:00
|
|
|
from kombu import pidbox
|
|
|
|
from kombu.utils import uuid
|
2011-09-07 14:21:47 +00:00
|
|
|
|
2012-01-15 16:22:47 +00:00
|
|
|
from .utils import TestCase
|
2012-01-13 19:04:49 +00:00
|
|
|
from .utils import Mock
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
|
2012-01-15 16:22:47 +00:00
|
|
|
class test_Mailbox(TestCase):
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
def _handler(self, state):
|
2012-06-15 17:32:40 +00:00
|
|
|
return self.stats['var']
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
|
|
|
|
class Mailbox(pidbox.Mailbox):
|
|
|
|
|
|
|
|
def _collect(self, *args, **kwargs):
|
2012-06-15 17:32:40 +00:00
|
|
|
return 'COLLECTED'
|
2010-11-10 15:29:16 +00:00
|
|
|
|
2012-06-15 17:32:40 +00:00
|
|
|
self.mailbox = Mailbox('test_pidbox')
|
2012-06-24 15:32:17 +00:00
|
|
|
self.connection = Connection(transport='memory')
|
2012-06-15 17:32:40 +00:00
|
|
|
self.state = {'var': 1}
|
|
|
|
self.handlers = {'mymethod': self._handler}
|
2010-11-10 15:29:16 +00:00
|
|
|
self.bound = self.mailbox(self.connection)
|
|
|
|
self.default_chan = self.connection.channel()
|
2012-06-15 17:32:40 +00:00
|
|
|
self.node = self.bound.Node('test_pidbox', state=self.state,
|
2013-01-17 13:16:32 +00:00
|
|
|
handlers=self.handlers,
|
|
|
|
channel=self.default_chan)
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
def test_reply__collect(self):
|
2012-06-15 17:32:40 +00:00
|
|
|
mailbox = pidbox.Mailbox('test_reply__collect')(self.connection)
|
2010-11-10 15:29:16 +00:00
|
|
|
exchange = mailbox.reply_exchange.name
|
2012-10-24 13:51:06 +00:00
|
|
|
channel = self.connection.channel()
|
|
|
|
mailbox.reply_queue(channel).declare()
|
2010-11-10 15:29:16 +00:00
|
|
|
|
2011-09-07 13:11:20 +00:00
|
|
|
ticket = uuid()
|
2012-10-24 13:51:06 +00:00
|
|
|
mailbox._publish_reply({'foo': 'bar'}, exchange, mailbox.oid, ticket)
|
2010-11-10 15:29:16 +00:00
|
|
|
_callback_called = [False]
|
2010-11-11 12:06:46 +00:00
|
|
|
|
2010-11-10 15:29:16 +00:00
|
|
|
def callback(body):
|
|
|
|
_callback_called[0] = True
|
|
|
|
|
2013-01-17 13:16:32 +00:00
|
|
|
reply = mailbox._collect(ticket, limit=1, callback=callback,
|
|
|
|
channel=channel)
|
2012-06-15 17:32:40 +00:00
|
|
|
self.assertEqual(reply, [{'foo': 'bar'}])
|
2010-11-10 15:29:16 +00:00
|
|
|
self.assertTrue(_callback_called[0])
|
|
|
|
|
2011-09-07 13:11:20 +00:00
|
|
|
ticket = uuid()
|
2012-10-24 13:51:06 +00:00
|
|
|
mailbox._publish_reply({'biz': 'boz'}, exchange, mailbox.oid, ticket)
|
2010-11-10 15:29:16 +00:00
|
|
|
reply = mailbox._collect(ticket, limit=1, channel=channel)
|
2012-06-15 17:32:40 +00:00
|
|
|
self.assertEqual(reply, [{'biz': 'boz'}])
|
2010-11-10 15:29:16 +00:00
|
|
|
|
2012-01-13 19:04:49 +00:00
|
|
|
de = mailbox.connection.drain_events = Mock()
|
|
|
|
de.side_effect = socket.timeout
|
|
|
|
mailbox._collect(ticket, limit=1, channel=channel)
|
|
|
|
|
2010-11-10 15:29:16 +00:00
|
|
|
def test_constructor(self):
|
|
|
|
self.assertIsNone(self.mailbox.connection)
|
|
|
|
self.assertTrue(self.mailbox.exchange.name)
|
|
|
|
self.assertTrue(self.mailbox.reply_exchange.name)
|
|
|
|
|
|
|
|
def test_bound(self):
|
|
|
|
bound = self.mailbox(self.connection)
|
|
|
|
self.assertIs(bound.connection, self.connection)
|
|
|
|
|
|
|
|
def test_Node(self):
|
|
|
|
self.assertTrue(self.node.hostname)
|
|
|
|
self.assertTrue(self.node.state)
|
|
|
|
self.assertIs(self.node.mailbox, self.bound)
|
|
|
|
self.assertTrue(self.handlers)
|
|
|
|
|
|
|
|
# No initial handlers
|
2012-06-15 17:32:40 +00:00
|
|
|
node2 = self.bound.Node('test_pidbox2', state=self.state)
|
2010-11-10 15:29:16 +00:00
|
|
|
self.assertDictEqual(node2.handlers, {})
|
|
|
|
|
|
|
|
def test_Node_consumer(self):
|
|
|
|
consumer1 = self.node.Consumer()
|
|
|
|
self.assertIs(consumer1.channel, self.default_chan)
|
|
|
|
self.assertTrue(consumer1.no_ack)
|
|
|
|
|
|
|
|
chan2 = self.connection.channel()
|
|
|
|
consumer2 = self.node.Consumer(channel=chan2, no_ack=False)
|
|
|
|
self.assertIs(consumer2.channel, chan2)
|
|
|
|
self.assertFalse(consumer2.no_ack)
|
|
|
|
|
|
|
|
def test_handler(self):
|
2012-06-15 17:32:40 +00:00
|
|
|
node = self.bound.Node('test_handler', state=self.state)
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
@node.handler
|
|
|
|
def my_handler_name(state):
|
|
|
|
return 42
|
|
|
|
|
2012-06-15 17:32:40 +00:00
|
|
|
self.assertIn('my_handler_name', node.handlers)
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
def test_dispatch(self):
|
2012-06-15 17:32:40 +00:00
|
|
|
node = self.bound.Node('test_dispatch', state=self.state)
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
@node.handler
|
|
|
|
def my_handler_name(state, x=None, y=None):
|
|
|
|
return x + y
|
|
|
|
|
2012-06-15 17:32:40 +00:00
|
|
|
self.assertEqual(node.dispatch('my_handler_name',
|
|
|
|
arguments={'x': 10, 'y': 10}), 20)
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
def test_dispatch_raising_SystemExit(self):
|
2012-06-15 17:32:40 +00:00
|
|
|
node = self.bound.Node('test_dispatch_raising_SystemExit',
|
2010-11-10 15:29:16 +00:00
|
|
|
state=self.state)
|
|
|
|
|
|
|
|
@node.handler
|
|
|
|
def my_handler_name(state):
|
|
|
|
raise SystemExit
|
|
|
|
|
2012-01-13 17:54:48 +00:00
|
|
|
with self.assertRaises(SystemExit):
|
2012-06-15 17:32:40 +00:00
|
|
|
node.dispatch('my_handler_name')
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
def test_dispatch_raising(self):
|
2012-06-15 17:32:40 +00:00
|
|
|
node = self.bound.Node('test_dispatch_raising', state=self.state)
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
@node.handler
|
|
|
|
def my_handler_name(state):
|
2012-06-15 17:32:40 +00:00
|
|
|
raise KeyError('foo')
|
2010-11-10 15:29:16 +00:00
|
|
|
|
2012-06-15 17:32:40 +00:00
|
|
|
res = node.dispatch('my_handler_name')
|
|
|
|
self.assertIn('error', res)
|
|
|
|
self.assertIn('KeyError', res['error'])
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
def test_dispatch_replies(self):
|
|
|
|
_replied = [False]
|
2010-11-11 12:06:46 +00:00
|
|
|
|
2010-11-10 15:29:16 +00:00
|
|
|
def reply(data, **options):
|
|
|
|
_replied[0] = True
|
|
|
|
|
2012-06-15 17:32:40 +00:00
|
|
|
node = self.bound.Node('test_dispatch', state=self.state)
|
2010-11-10 15:29:16 +00:00
|
|
|
node.reply = reply
|
|
|
|
|
|
|
|
@node.handler
|
|
|
|
def my_handler_name(state, x=None, y=None):
|
|
|
|
return x + y
|
|
|
|
|
2012-06-15 17:32:40 +00:00
|
|
|
node.dispatch('my_handler_name',
|
|
|
|
arguments={'x': 10, 'y': 10},
|
|
|
|
reply_to={'exchange': 'foo', 'routing_key': 'bar'})
|
2010-11-10 15:29:16 +00:00
|
|
|
self.assertTrue(_replied[0])
|
|
|
|
|
|
|
|
def test_reply(self):
|
|
|
|
_replied = [(None, None, None)]
|
2010-11-11 12:06:46 +00:00
|
|
|
|
2012-10-24 13:51:06 +00:00
|
|
|
def publish_reply(data, exchange, routing_key, ticket, **kwargs):
|
|
|
|
_replied[0] = (data, exchange, routing_key, ticket)
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
mailbox = self.mailbox(self.connection)
|
|
|
|
mailbox._publish_reply = publish_reply
|
2012-06-15 17:32:40 +00:00
|
|
|
node = mailbox.Node('test_reply')
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
@node.handler
|
|
|
|
def my_handler_name(state):
|
|
|
|
return 42
|
|
|
|
|
2012-06-15 17:32:40 +00:00
|
|
|
node.dispatch('my_handler_name',
|
|
|
|
reply_to={'exchange': 'exchange',
|
2012-10-24 13:51:06 +00:00
|
|
|
'routing_key': 'rkey'},
|
|
|
|
ticket='TICKET')
|
|
|
|
data, exchange, routing_key, ticket = _replied[0]
|
2012-06-15 17:32:40 +00:00
|
|
|
self.assertEqual(data, {'test_reply': 42})
|
|
|
|
self.assertEqual(exchange, 'exchange')
|
|
|
|
self.assertEqual(routing_key, 'rkey')
|
2012-10-24 13:51:06 +00:00
|
|
|
self.assertEqual(ticket, 'TICKET')
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
def test_handle_message(self):
|
2012-06-15 17:32:40 +00:00
|
|
|
node = self.bound.Node('test_dispatch_from_message')
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
@node.handler
|
|
|
|
def my_handler_name(state, x=None, y=None):
|
|
|
|
return x * y
|
|
|
|
|
2012-06-15 17:32:40 +00:00
|
|
|
body = {'method': 'my_handler_name',
|
|
|
|
'arguments': {'x': 64, 'y': 64}}
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
self.assertEqual(node.handle_message(body, None), 64 * 64)
|
|
|
|
|
|
|
|
# message not for me should not be processed.
|
2012-06-15 17:32:40 +00:00
|
|
|
body['destination'] = ['some_other_node']
|
2010-11-10 15:29:16 +00:00
|
|
|
self.assertIsNone(node.handle_message(body, None))
|
|
|
|
|
|
|
|
def test_listen(self):
|
|
|
|
consumer = self.node.listen()
|
|
|
|
self.assertEqual(consumer.callbacks[0],
|
|
|
|
self.node.handle_message)
|
|
|
|
self.assertEqual(consumer.channel, self.default_chan)
|
|
|
|
|
|
|
|
def test_cast(self):
|
2012-06-15 17:32:40 +00:00
|
|
|
self.bound.cast(['somenode'], 'mymethod')
|
2010-11-10 15:29:16 +00:00
|
|
|
consumer = self.node.Consumer()
|
|
|
|
self.assertIsCast(self.get_next(consumer))
|
|
|
|
|
|
|
|
def test_abcast(self):
|
2012-06-15 17:32:40 +00:00
|
|
|
self.bound.abcast('mymethod')
|
2010-11-10 15:29:16 +00:00
|
|
|
consumer = self.node.Consumer()
|
|
|
|
self.assertIsCast(self.get_next(consumer))
|
|
|
|
|
|
|
|
def test_call_destination_must_be_sequence(self):
|
2012-01-13 17:54:48 +00:00
|
|
|
with self.assertRaises(ValueError):
|
2012-06-15 17:32:40 +00:00
|
|
|
self.bound.call('some_node', 'mymethod')
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
def test_call(self):
|
2013-01-17 13:16:32 +00:00
|
|
|
self.assertEqual(self.bound.call(['some_node'], 'mymethod'),
|
|
|
|
'COLLECTED')
|
2010-11-10 15:29:16 +00:00
|
|
|
consumer = self.node.Consumer()
|
|
|
|
self.assertIsCall(self.get_next(consumer))
|
|
|
|
|
|
|
|
def test_multi_call(self):
|
2012-06-15 17:32:40 +00:00
|
|
|
self.assertEqual(self.bound.multi_call('mymethod'), 'COLLECTED')
|
2010-11-10 15:29:16 +00:00
|
|
|
consumer = self.node.Consumer()
|
|
|
|
self.assertIsCall(self.get_next(consumer))
|
|
|
|
|
|
|
|
def get_next(self, consumer):
|
|
|
|
m = consumer.queues[0].get()
|
|
|
|
if m:
|
|
|
|
return m.payload
|
|
|
|
|
|
|
|
def assertIsCast(self, message):
|
2012-06-15 17:32:40 +00:00
|
|
|
self.assertTrue(message['method'])
|
2010-11-10 15:29:16 +00:00
|
|
|
|
|
|
|
def assertIsCall(self, message):
|
2012-06-15 17:32:40 +00:00
|
|
|
self.assertTrue(message['method'])
|
|
|
|
self.assertTrue(message['reply_to'])
|