mirror of https://github.com/celery/kombu.git
99% coverage for the Redis transport
This commit is contained in:
parent
d1780fcddf
commit
180aabec1f
|
@ -3,16 +3,18 @@ from __future__ import absolute_import
|
|||
import socket
|
||||
import types
|
||||
|
||||
from anyjson import serialize
|
||||
from itertools import count
|
||||
from Queue import Empty, Queue as _Queue
|
||||
|
||||
from ..connection import BrokerConnection
|
||||
from ..entity import Exchange, Queue
|
||||
from ..exceptions import VersionMismatch
|
||||
from ..messaging import Consumer, Producer
|
||||
from ..utils import eventio # patch poll
|
||||
|
||||
from .utils import unittest
|
||||
from .utils import module_exists
|
||||
from .utils import Mock, module_exists, skip_if_not_module
|
||||
|
||||
|
||||
class _poll(eventio._select):
|
||||
|
@ -188,6 +190,237 @@ class Transport(redis.Transport):
|
|||
return ((KeyError, ), (IndexError, ))
|
||||
|
||||
|
||||
class test_Channel(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.connection = BrokerConnection(transport=Transport)
|
||||
self.channel = self.connection.channel()
|
||||
|
||||
def test_basic_consume_when_fanout_queue(self):
|
||||
self.channel.exchange_declare(exchange="txconfan", type="fanout")
|
||||
self.channel.queue_declare(queue="txconfanq")
|
||||
self.channel.queue_bind(queue="txconfanq", exchange="txconfan")
|
||||
|
||||
self.assertIn("txconfanq", self.channel._fanout_queues)
|
||||
self.channel.basic_consume("txconfanq", False, None, 1)
|
||||
self.assertIn("txconfanq", self.channel.active_fanout_queues)
|
||||
self.assertEqual(self.channel._fanout_to_queue.get("txconfan"),
|
||||
"txconfanq")
|
||||
|
||||
def test_basic_cancel_unknown_delivery_tag(self):
|
||||
self.assertIsNone(self.channel.basic_cancel("txaseqwewq"))
|
||||
|
||||
def test_subscribe_no_queues(self):
|
||||
self.channel.subclient = Mock()
|
||||
self.channel.active_fanout_queues.clear()
|
||||
self.channel._subscribe()
|
||||
|
||||
self.assertFalse(self.channel.subclient.subscribe.called)
|
||||
|
||||
def test_subscribe(self):
|
||||
self.channel.subclient = Mock()
|
||||
self.channel.active_fanout_queues.add("a")
|
||||
self.channel.active_fanout_queues.add("b")
|
||||
self.channel._fanout_queues.update(a="a", b="b")
|
||||
|
||||
self.channel._subscribe()
|
||||
self.channel.subclient.subscribe.assert_called_with(["a", "b"])
|
||||
|
||||
self.channel.subclient.connection._sock = None
|
||||
self.channel._subscribe()
|
||||
self.channel.subclient.connection.connect.assert_called_with()
|
||||
|
||||
def test_handle_unsubscribe_message(self):
|
||||
s = self.channel.subclient
|
||||
s.subscribed = True
|
||||
self.channel._handle_message(s, ["unsubscribe", "a", 0])
|
||||
self.assertFalse(s.subscribed)
|
||||
|
||||
def test_handle_pmessage_message(self):
|
||||
self.assertDictEqual(self.channel._handle_message(
|
||||
self.channel.subclient,
|
||||
["pmessage", "pattern", "channel", "data"]),
|
||||
{"type": "pmessage",
|
||||
"pattern": "pattern",
|
||||
"channel": "channel",
|
||||
"data": "data"})
|
||||
|
||||
def test_handle_message(self):
|
||||
self.assertDictEqual(self.channel._handle_message(
|
||||
self.channel.subclient,
|
||||
["type", "channel", "data"]),
|
||||
{"type": "type",
|
||||
"pattern": None,
|
||||
"channel": "channel",
|
||||
"data": "data"})
|
||||
|
||||
def test_brpop_start_but_no_queues(self):
|
||||
self.channel.active_queues.clear()
|
||||
self.assertIsNone(self.channel._brpop_start())
|
||||
|
||||
def test_receive(self):
|
||||
s = self.channel.subclient = Mock()
|
||||
self.channel._fanout_to_queue["a"] = "b"
|
||||
s.parse_response.return_value = ["message", "a",
|
||||
serialize({"hello": "world"})]
|
||||
payload, queue = self.channel._receive()
|
||||
self.assertDictEqual(payload, {"hello": "world"})
|
||||
self.assertEqual(queue, "b")
|
||||
|
||||
def test_receive_raises(self):
|
||||
self.channel._in_listen = True
|
||||
s = self.channel.subclient = Mock()
|
||||
s.parse_response.side_effect = KeyError("foo")
|
||||
|
||||
with self.assertRaises(redis.Empty):
|
||||
self.channel._receive()
|
||||
self.assertFalse(self.channel._in_listen)
|
||||
|
||||
def test_receive_empty(self):
|
||||
s = self.channel.subclient = Mock()
|
||||
s.parse_response.return_value = None
|
||||
|
||||
with self.assertRaises(redis.Empty):
|
||||
self.channel._receive()
|
||||
|
||||
def test_receive_different_message_Type(self):
|
||||
s = self.channel.subclient = Mock()
|
||||
s.parse_response.return_value = ["pmessage", "/foo/", 0, "data"]
|
||||
|
||||
with self.assertRaises(redis.Empty):
|
||||
self.channel._receive()
|
||||
|
||||
def test_brpop_read_raises(self):
|
||||
c = self.channel.client = Mock()
|
||||
c.parse_response.side_effect = KeyError("foo")
|
||||
|
||||
with self.assertRaises(redis.Empty):
|
||||
self.channel._brpop_read()
|
||||
|
||||
c.connection.disconnect.assert_called_with()
|
||||
|
||||
def test_brpop_read_gives_None(self):
|
||||
c = self.channel.client = Mock()
|
||||
c.parse_response.return_value = None
|
||||
|
||||
with self.assertRaises(redis.Empty):
|
||||
self.channel._brpop_read()
|
||||
|
||||
def test_poll_error(self):
|
||||
c = self.channel.client = Mock()
|
||||
c.parse_response = Mock()
|
||||
self.channel._poll_error("BRPOP")
|
||||
|
||||
c.parse_response.assert_called_with("BRPOP")
|
||||
|
||||
c.parse_response.side_effect = KeyError("foo")
|
||||
self.assertIsNone(self.channel._poll_error("BRPOP"))
|
||||
|
||||
def test_put_fanout(self):
|
||||
self.channel._in_poll = False
|
||||
c = self.channel.client = Mock()
|
||||
|
||||
body = {"hello": "world"}
|
||||
self.channel._put_fanout("exchange", body)
|
||||
c.publish.assert_called_with("exchange", serialize(body))
|
||||
|
||||
def test_delete(self):
|
||||
x = self.channel
|
||||
self.channel._in_poll = False
|
||||
c = x.client = Mock()
|
||||
|
||||
x._delete("queue", "exchange", "routing_key", None)
|
||||
c.delete.assert_called_with("queue")
|
||||
c.srem.assert_called_with(x.keyprefix_queue % ("exchange", ),
|
||||
x.sep.join(["routing_key", "", "queue"]))
|
||||
|
||||
def test_has_queue(self):
|
||||
self.channel._in_poll = False
|
||||
c = self.channel.client = Mock()
|
||||
c.exists.return_value = True
|
||||
self.assertTrue(self.channel._has_queue("foo"))
|
||||
c.exists.assert_called_with("foo")
|
||||
|
||||
c.exists.return_value = False
|
||||
self.assertFalse(self.channel._has_queue("foo"))
|
||||
|
||||
def test_close_when_closed(self):
|
||||
self.channel.closed = True
|
||||
self.channel.close()
|
||||
|
||||
def test_close_client_close_raises(self):
|
||||
c = self.channel.client = Mock()
|
||||
c.connection.disconnect.side_effect = self.channel.ResponseError()
|
||||
|
||||
self.channel.close()
|
||||
c.connection.disconnect.assert_called_with()
|
||||
|
||||
def test_invalid_database_raises_ValueError(self):
|
||||
self.channel.connection.client.virtual_host = "xfeqwewkfk"
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
self.channel._create_client()
|
||||
|
||||
@skip_if_not_module("redis")
|
||||
def test_get_client(self):
|
||||
import redis as R
|
||||
KombuRedis = redis.Channel._get_client(self.channel)
|
||||
self.assertTrue(KombuRedis)
|
||||
|
||||
Rv = getattr(R, "__version__")
|
||||
try:
|
||||
R.__version__ = "2.4.0"
|
||||
with self.assertRaises(VersionMismatch):
|
||||
redis.Channel._get_client(self.channel)
|
||||
finally:
|
||||
if Rv is not None:
|
||||
R.__version__ = Rv
|
||||
|
||||
@skip_if_not_module("redis")
|
||||
def test_get_response_error(self):
|
||||
from redis.exceptions import ResponseError
|
||||
self.assertIs(redis.Channel._get_response_error(self.channel),
|
||||
ResponseError)
|
||||
|
||||
def test_avail_client_when_not_in_poll(self):
|
||||
self.channel._in_poll = False
|
||||
c = self.channel.client = Mock()
|
||||
|
||||
self.assertIs(self.channel._avail_client, c)
|
||||
|
||||
def test_avail_client_when_in_poll(self):
|
||||
self.channel._in_poll = True
|
||||
cc = self.channel._create_client = Mock()
|
||||
|
||||
self.assertTrue(self.channel._avail_client)
|
||||
cc.assert_called_with()
|
||||
|
||||
@skip_if_not_module("redis")
|
||||
def test_transport_get_errors(self):
|
||||
self.assertTrue(redis.Transport._get_errors(self.connection.transport))
|
||||
|
||||
@skip_if_not_module("redis")
|
||||
def test_transport_get_errors_when_InvalidData_used(self):
|
||||
from redis import exceptions
|
||||
|
||||
class ID(Exception):
|
||||
pass
|
||||
|
||||
DataError = getattr(exceptions, "DataError", None)
|
||||
InvalidData = getattr(exceptions, "InvalidData", None)
|
||||
exceptions.InvalidData = ID
|
||||
exceptions.DataError = None
|
||||
try:
|
||||
errors = redis.Transport._get_errors(self.connection.transport)
|
||||
self.assertTrue(errors)
|
||||
self.assertIn(ID, errors[1])
|
||||
finally:
|
||||
if DataError is not None:
|
||||
exceptions.DataError = DataError
|
||||
if InvalidData is not None:
|
||||
exceptions.InvalidData = InvalidData
|
||||
|
||||
|
||||
class test_Redis(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
@ -346,3 +579,165 @@ def _redis_modules():
|
|||
myredis.Redis = Redis
|
||||
|
||||
return myredis, exceptions
|
||||
|
||||
|
||||
class test_MultiChannelPoller(unittest.TestCase):
|
||||
Poller = redis.MultiChannelPoller
|
||||
|
||||
def test_close_unregisters_fds(self):
|
||||
p = self.Poller()
|
||||
poller = p._poller = Mock()
|
||||
p._chan_to_sock.update({1: 1, 2: 2, 3: 3})
|
||||
|
||||
p.close()
|
||||
|
||||
self.assertEqual(poller.unregister.call_count, 3)
|
||||
u_args = poller.unregister.call_args_list
|
||||
|
||||
self.assertListEqual(u_args, [((1, ), {}),
|
||||
((2, ), {}),
|
||||
((3, ), {})])
|
||||
|
||||
def test_close_when_unregister_raises_KeyError(self):
|
||||
p = self.Poller()
|
||||
poller = p._poller = Mock()
|
||||
p._chan_to_sock.update({1: 1})
|
||||
p._poller.unregister.side_effect = KeyError(1)
|
||||
p.close()
|
||||
|
||||
def test_close_resets_state(self):
|
||||
p = self.Poller()
|
||||
p._poller = Mock()
|
||||
p._channels = Mock()
|
||||
p._fd_to_chan = Mock()
|
||||
p._chan_to_sock = Mock()
|
||||
|
||||
p._chan_to_sock.itervalues.return_value = []
|
||||
|
||||
p.close()
|
||||
p._channels.clear.assert_called_with()
|
||||
p._fd_to_chan.clear.assert_called_with()
|
||||
p._chan_to_sock.clear.assert_called_with()
|
||||
self.assertIsNone(p._poller)
|
||||
|
||||
def test_register_when_registered_reregisters(self):
|
||||
p = self.Poller()
|
||||
p._poller = Mock()
|
||||
channel, client, type = Mock(), Mock(), Mock()
|
||||
sock = client.connection._sock = Mock()
|
||||
sock.fileno.return_value = 10
|
||||
|
||||
p._chan_to_sock = {(channel, client, type): 6}
|
||||
p._register(channel, client, type)
|
||||
p._poller.unregister.assert_called_with(6)
|
||||
self.assertTupleEqual(p._fd_to_chan[10], (channel, type))
|
||||
self.assertEqual(p._chan_to_sock[(channel, client, type)], sock)
|
||||
p._poller.register.assert_called_with(sock, p.eventflags)
|
||||
|
||||
# when client not connected yet
|
||||
client.connection._sock = None
|
||||
|
||||
def after_connected():
|
||||
client.connection._sock = Mock()
|
||||
client.connection.connect.side_effect = after_connected
|
||||
|
||||
p._register(channel, client, type)
|
||||
client.connection.connect.assert_called_with()
|
||||
|
||||
def test_register_BRPOP(self):
|
||||
p = self.Poller()
|
||||
channel = Mock()
|
||||
channel.client.connection._sock = None
|
||||
p._register = Mock()
|
||||
|
||||
channel._in_poll = False
|
||||
p._register_BRPOP(channel)
|
||||
self.assertEqual(channel._brpop_start.call_count, 1)
|
||||
self.assertEqual(p._register.call_count, 1)
|
||||
|
||||
channel.client.connection._sock = Mock()
|
||||
p._chan_to_sock[(channel, channel.client, "BRPOP")] = True
|
||||
channel._in_poll = True
|
||||
p._register_BRPOP(channel)
|
||||
self.assertEqual(channel._brpop_start.call_count, 1)
|
||||
self.assertEqual(p._register.call_count, 1)
|
||||
|
||||
def test_register_LISTEN(self):
|
||||
p = self.Poller()
|
||||
channel = Mock()
|
||||
channel.subclient.connection._sock = None
|
||||
channel._in_listen = False
|
||||
p._register = Mock()
|
||||
|
||||
p._register_LISTEN(channel)
|
||||
p._register.assert_called_with(channel, channel.subclient, "LISTEN")
|
||||
self.assertEqual(p._register.call_count, 1)
|
||||
self.assertEqual(channel._subscribe.call_count, 1)
|
||||
|
||||
channel._in_listen = True
|
||||
channel.subclient.connection._sock = Mock()
|
||||
p._register_LISTEN(channel)
|
||||
self.assertEqual(p._register.call_count, 1)
|
||||
self.assertEqual(channel._subscribe.call_count, 1)
|
||||
|
||||
def create_get(self, events=None, queues=None,
|
||||
fanouts=None):
|
||||
_pr = [] if events is None else events
|
||||
_aq = [] if queues is None else queues
|
||||
_af = [] if fanouts is None else fanouts
|
||||
p = self.Poller()
|
||||
p._poller = Mock()
|
||||
p._poller.poll.return_value = _pr
|
||||
|
||||
p._register_BRPOP = Mock()
|
||||
p._register_LISTEN = Mock()
|
||||
|
||||
channel = Mock()
|
||||
p._channels = [channel]
|
||||
channel.active_queues = _aq
|
||||
channel.active_fanout_queues = _af
|
||||
|
||||
return p, channel
|
||||
|
||||
def test_get_no_actions(self):
|
||||
p, channel = self.create_get()
|
||||
|
||||
with self.assertRaises(redis.Empty):
|
||||
p.get()
|
||||
|
||||
def test_get_brpop_qos_allow(self):
|
||||
p, channel = self.create_get(queues=["a_queue"])
|
||||
channel.qos.can_consume.return_value = True
|
||||
|
||||
with self.assertRaises(redis.Empty):
|
||||
p.get()
|
||||
|
||||
p._register_BRPOP.assert_called_with(channel)
|
||||
|
||||
def test_get_brpop_qos_disallow(self):
|
||||
p, channel = self.create_get(queues=["a_queue"])
|
||||
channel.qos.can_consume.return_value = False
|
||||
|
||||
with self.assertRaises(redis.Empty):
|
||||
p.get()
|
||||
|
||||
self.assertFalse(p._register_BRPOP.called)
|
||||
|
||||
def test_get_listen(self):
|
||||
p, channel = self.create_get(fanouts=["f_queue"])
|
||||
|
||||
with self.assertRaises(redis.Empty):
|
||||
p.get()
|
||||
|
||||
p._register_LISTEN.assert_called_with(channel)
|
||||
|
||||
|
||||
def test_get_receives_POLL_ERR(self):
|
||||
p, channel = self.create_get(events=[(1, eventio.POLL_ERR)])
|
||||
p._fd_to_chan[1] = (channel, "BRPOP")
|
||||
|
||||
with self.assertRaises(redis.Empty):
|
||||
p.get()
|
||||
|
||||
channel._poll_error.assert_called_with("BRPOP")
|
||||
|
||||
|
|
|
@ -115,7 +115,7 @@ class MultiChannelPoller(object):
|
|||
chan, type = self._fd_to_chan[fileno]
|
||||
if chan.qos.can_consume():
|
||||
return chan.handlers[type](), self
|
||||
elif event & eventio.POLL_HUP:
|
||||
elif event & eventio.POLL_ERR:
|
||||
chan, type = self._fd_to_chan[fileno]
|
||||
chan._poll_error(type)
|
||||
break
|
||||
|
@ -323,8 +323,7 @@ class Channel(virtual.Channel):
|
|||
import redis
|
||||
|
||||
version = getattr(redis, "__version__", (0, 0, 0))
|
||||
if version:
|
||||
version = tuple(map(int, version.split(".")))
|
||||
version = tuple(map(int, version.split(".")))
|
||||
if version < (2, 4, 4):
|
||||
raise VersionMismatch(
|
||||
"Redis transport requires redis-py versions 2.4.4 or later. "
|
||||
|
@ -332,7 +331,7 @@ class Channel(virtual.Channel):
|
|||
|
||||
# KombuRedis maintains a connection attribute on it's instance and
|
||||
# uses that when executing commands
|
||||
class KombuRedis(redis.Redis):
|
||||
class KombuRedis(redis.Redis): # pragma: no cover
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(KombuRedis, self).__init__(*args, **kwargs)
|
||||
|
|
|
@ -395,7 +395,7 @@ class Channel(AbstractChannel, base.StdChannel):
|
|||
def after_reply_message_received(self, queue):
|
||||
self.queue_delete(queue)
|
||||
|
||||
def queue_bind(self, queue, exchange, routing_key, arguments=None,
|
||||
def queue_bind(self, queue, exchange, routing_key="", arguments=None,
|
||||
**kwargs):
|
||||
"""Bind `queue` to `exchange` with `routing key`."""
|
||||
if queue in self.state.bindings:
|
||||
|
|
Loading…
Reference in New Issue