mirror of https://github.com/celery/kombu.git
99% overall coverage
This commit is contained in:
parent
d348cb4c12
commit
399f979814
|
@ -0,0 +1,8 @@
|
|||
[run]
|
||||
branch = True
|
||||
|
||||
[report]
|
||||
exclude_lines =
|
||||
pragma: no cover
|
||||
|
||||
for infinity
|
|
@ -23,7 +23,7 @@ __all__ = ["Publisher", "Consumer"]
|
|||
|
||||
def _iterconsume(connection, consumer, no_ack=False, limit=None):
|
||||
consumer.consume(no_ack=no_ack)
|
||||
for iteration in count(0):
|
||||
for iteration in count(0): # for infinity
|
||||
if limit and iteration >= limit:
|
||||
raise StopIteration
|
||||
yield connection.drain_events()
|
||||
|
@ -163,7 +163,7 @@ class Consumer(messaging.Consumer):
|
|||
return list(it)
|
||||
|
||||
def iterqueue(self, limit=None, infinite=False):
|
||||
for items_since_start in count():
|
||||
for items_since_start in count(): # for infinity
|
||||
item = self.fetch()
|
||||
if (not infinite and item is None) or \
|
||||
(limit and items_since_start >= limit):
|
||||
|
|
|
@ -23,7 +23,7 @@ from Queue import Empty
|
|||
from urlparse import urlparse
|
||||
try:
|
||||
from urlparse import parse_qsl
|
||||
except ImportError:
|
||||
except ImportError: # pragma: no cover
|
||||
from cgi import parse_qsl # noqa
|
||||
|
||||
from . import exceptions
|
||||
|
@ -289,7 +289,7 @@ class BrokerConnection(object):
|
|||
@wraps(fun)
|
||||
def _ensured(*args, **kwargs):
|
||||
got_connection = 0
|
||||
for retries in count(0):
|
||||
for retries in count(0): # for infinity
|
||||
try:
|
||||
return fun(*args, **kwargs)
|
||||
except self.connection_errors + self.channel_errors, exc:
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
from __future__ import absolute_import
|
||||
from __future__ import with_statement
|
||||
|
||||
from mock import patch
|
||||
|
||||
from .. import BrokerConnection, Exchange
|
||||
from .. import compat
|
||||
|
||||
from .mocks import Transport, Channel
|
||||
from .utils import unittest
|
||||
from .utils import Mock
|
||||
|
||||
|
||||
class test_misc(unittest.TestCase):
|
||||
|
@ -104,6 +107,10 @@ class test_Publisher(unittest.TestCase):
|
|||
exchange=explicit)
|
||||
self.assertEqual(pub3.exchange, explicit)
|
||||
|
||||
compat.Publisher(self.connection,
|
||||
exchange="test_Publisher_constructor3",
|
||||
channel=self.connection.default_channel)
|
||||
|
||||
def test_send(self):
|
||||
pub = compat.Publisher(self.connection,
|
||||
exchange="test_Publisher_send",
|
||||
|
@ -127,6 +134,12 @@ class test_Consumer(unittest.TestCase):
|
|||
def setUp(self):
|
||||
self.connection = BrokerConnection(transport=Transport)
|
||||
|
||||
@patch("kombu.compat._iterconsume")
|
||||
def test_iterconsume_calls__iterconsume(self, it, n="test_iterconsume"):
|
||||
c = compat.Consumer(self.connection, queue=n, exchange=n)
|
||||
c.iterconsume(limit=10, no_ack=True)
|
||||
it.assert_called_with(c.connection, c, True, 10)
|
||||
|
||||
def test_constructor(self, n="test_Consumer_constructor"):
|
||||
c = compat.Consumer(self.connection, queue=n, exchange=n,
|
||||
routing_key="rkey")
|
||||
|
@ -158,6 +171,20 @@ class test_Consumer(unittest.TestCase):
|
|||
self.assertIn("close", c.backend)
|
||||
self.assertTrue(c._closed)
|
||||
|
||||
def test_revive(self, n="test_revive"):
|
||||
c = compat.Consumer(self.connection, queue=n, exchange=n)
|
||||
|
||||
with self.connection.channel() as c2:
|
||||
c.revive(c2)
|
||||
self.assertIs(c.backend, c2)
|
||||
|
||||
def test__iter__(self, n="test__iter__"):
|
||||
c = compat.Consumer(self.connection, queue=n, exchange=n)
|
||||
c.iterqueue = Mock()
|
||||
|
||||
c.__iter__()
|
||||
c.iterqueue.assert_called_with(infinite=True)
|
||||
|
||||
def test_iter(self, n="test_iterqueue"):
|
||||
c = compat.Consumer(self.connection, queue=n, exchange=n,
|
||||
routing_key="rkey")
|
||||
|
@ -241,6 +268,21 @@ class test_ConsumerSet(unittest.TestCase):
|
|||
def setUp(self):
|
||||
self.connection = BrokerConnection(transport=Transport)
|
||||
|
||||
@patch("kombu.compat._iterconsume")
|
||||
def test_iterconsume(self, _iterconsume, n="test_iterconsume"):
|
||||
c = compat.Consumer(self.connection, queue=n, exchange=n)
|
||||
cs = compat.ConsumerSet(self.connection, consumers=[c])
|
||||
cs.iterconsume(limit=10, no_ack=True)
|
||||
_iterconsume.assert_called_with(c.connection, cs, True, 10)
|
||||
|
||||
def test_revive(self, n="test_revive"):
|
||||
c = compat.Consumer(self.connection, queue=n, exchange=n)
|
||||
cs = compat.ConsumerSet(self.connection, consumers=[c])
|
||||
|
||||
with self.connection.channel() as c2:
|
||||
cs.revive(c2)
|
||||
self.assertIs(cs.backend, c2)
|
||||
|
||||
def test_constructor(self, prefix="0daf8h21"):
|
||||
dcon = {"%s.xyx" % prefix: {"exchange": "%s.xyx" % prefix,
|
||||
"routing_key": "xyx"},
|
||||
|
|
|
@ -42,6 +42,10 @@ class test_connection_utils(unittest.TestCase):
|
|||
self.assertEqual(conn.as_uri(), self.nopass)
|
||||
self.assertEqual(conn.as_uri(include_password=True), self.url)
|
||||
|
||||
def test_as_uri_when_mongodb(self):
|
||||
x = BrokerConnection("mongodb://localhost")
|
||||
self.assertTrue(x.as_uri())
|
||||
|
||||
def test_bogus_scheme(self):
|
||||
with self.assertRaises(KeyError):
|
||||
BrokerConnection("bogus://localhost:7421").transport
|
||||
|
@ -153,6 +157,22 @@ class test_Connection(unittest.TestCase):
|
|||
with self.assertRaises(_ConnectionError):
|
||||
ensured()
|
||||
|
||||
def test_autoretry(self):
|
||||
myfun = Mock()
|
||||
myfun.__name__ = "test_autoretry"
|
||||
|
||||
self.conn.transport.connection_errors = (KeyError, )
|
||||
|
||||
def on_call(*args, **kwargs):
|
||||
myfun.side_effect = None
|
||||
raise KeyError("foo")
|
||||
|
||||
myfun.side_effect = on_call
|
||||
insured = self.conn.autoretry(myfun)
|
||||
insured()
|
||||
|
||||
self.assertTrue(myfun.called)
|
||||
|
||||
def test_SimpleQueue(self):
|
||||
conn = self.conn
|
||||
q = conn.SimpleQueue("foo")
|
||||
|
@ -221,6 +241,12 @@ class test_Connection_with_transport_options(unittest.TestCase):
|
|||
self.assertEqual(conn.transport_options, self.transport_options)
|
||||
|
||||
|
||||
class xResource(Resource):
|
||||
|
||||
def setup(self):
|
||||
pass
|
||||
|
||||
|
||||
class ResourceCase(unittest.TestCase):
|
||||
abstract = True
|
||||
|
||||
|
@ -250,6 +276,75 @@ class ResourceCase(unittest.TestCase):
|
|||
[chan.release() for chan in chans]
|
||||
self.assertState(P, 10, 0)
|
||||
|
||||
def test_acquire_no_limit(self):
|
||||
if self.abstract:
|
||||
return
|
||||
P = self.create_resource(None, 0)
|
||||
P.acquire().release()
|
||||
|
||||
def test_replace_when_limit(self):
|
||||
if self.abstract:
|
||||
return
|
||||
P = self.create_resource(10, 0)
|
||||
r = P.acquire()
|
||||
P._dirty = Mock()
|
||||
P.close_resource = Mock()
|
||||
|
||||
P.replace(r)
|
||||
P._dirty.discard.assert_called_with(r)
|
||||
P.close_resource.assert_called_with(r)
|
||||
|
||||
def test_replace_no_limit(self):
|
||||
if self.abstract:
|
||||
return
|
||||
P = self.create_resource(None, 0)
|
||||
r = P.acquire()
|
||||
P._dirty = Mock()
|
||||
P.close_resource = Mock()
|
||||
|
||||
P.replace(r)
|
||||
self.assertFalse(P._dirty.discard.called)
|
||||
P.close_resource.assert_called_with(r)
|
||||
|
||||
def test_interface_prepare(self):
|
||||
if not self.abstract:
|
||||
return
|
||||
x = xResource()
|
||||
self.assertEqual(x.prepare(10), 10)
|
||||
|
||||
def test_force_close_all_handles_AttributeError(self):
|
||||
if self.abstract:
|
||||
return
|
||||
P = self.create_resource(10, 10)
|
||||
cr = P.close_resource = Mock()
|
||||
cr.side_effect = AttributeError("x")
|
||||
|
||||
P.acquire()
|
||||
self.assertTrue(P._dirty)
|
||||
|
||||
P.force_close_all()
|
||||
|
||||
def test_force_close_all_no_mutex(self):
|
||||
if self.abstract:
|
||||
return
|
||||
P = self.create_resource(10, 10)
|
||||
P.close_resource = Mock()
|
||||
|
||||
m = P._resource = Mock()
|
||||
m.mutex = None
|
||||
m.queue.pop.side_effect = IndexError
|
||||
|
||||
P.force_close_all()
|
||||
|
||||
def test_add_when_empty(self):
|
||||
if self.abstract:
|
||||
return
|
||||
P = self.create_resource(None, None)
|
||||
P._resource.queue[:] = []
|
||||
self.assertFalse(P._resource.queue)
|
||||
P._add_when_empty()
|
||||
self.assertTrue(P._resource.queue)
|
||||
|
||||
|
||||
class test_ConnectionPool(ResourceCase):
|
||||
abstract = False
|
||||
|
|
|
@ -3,13 +3,16 @@ from __future__ import with_statement
|
|||
|
||||
import warnings
|
||||
|
||||
from mock import patch
|
||||
|
||||
from ..connection import BrokerConnection
|
||||
from ..exceptions import StdChannelError
|
||||
from ..transport import virtual
|
||||
from ..utils import uuid
|
||||
|
||||
from .compat import catch_warnings
|
||||
from .utils import unittest
|
||||
from .utils import redirect_stdouts
|
||||
from .utils import Mock, redirect_stdouts
|
||||
|
||||
|
||||
def client():
|
||||
|
@ -109,6 +112,11 @@ class test_Message(unittest.TestCase):
|
|||
"the quick brown fox...".encode("utf-8"))
|
||||
self.assertTrue(message.delivery_tag, tag)
|
||||
|
||||
def test_create_no_body(self):
|
||||
virtual.Message(Mock(), {
|
||||
"body": None,
|
||||
"properties": {"delivery_tag": 1}})
|
||||
|
||||
def test_serializable(self):
|
||||
c = client().channel()
|
||||
data = c.prepare_message("the quick brown fox...")
|
||||
|
@ -314,6 +322,39 @@ class test_Channel(unittest.TestCase):
|
|||
self.channel.basic_recover(requeue=True)
|
||||
self.assertTrue(self.channel._qos.was_restored)
|
||||
|
||||
def test_restore_unacked_raises_BaseException(self):
|
||||
q = self.channel.qos
|
||||
q._flush = Mock()
|
||||
q._delivered = {1: 1}
|
||||
|
||||
q.channel._restore = Mock()
|
||||
q.channel._restore.side_effect = SystemExit
|
||||
|
||||
errors = q.restore_unacked()
|
||||
self.assertIsInstance(errors[0][0], SystemExit)
|
||||
self.assertEqual(errors[0][1], 1)
|
||||
self.assertFalse(q._delivered)
|
||||
|
||||
@patch("kombu.transport.virtual.emergency_dump_state")
|
||||
@patch("kombu.transport.virtual.say")
|
||||
def test_restore_unacked_once_when_unrestored(self, say,
|
||||
emergency_dump_state):
|
||||
q = self.channel.qos
|
||||
q._flush = Mock()
|
||||
|
||||
class State(dict):
|
||||
restored = False
|
||||
|
||||
q._delivered = State({1: 1})
|
||||
ru = q.restore_unacked = Mock()
|
||||
exc = KeyError()
|
||||
ru.return_value = [(exc, 1)]
|
||||
|
||||
self.channel.do_restore = True
|
||||
q.restore_unacked_once()
|
||||
self.assertTrue(say.called)
|
||||
self.assertTrue(emergency_dump_state.called)
|
||||
|
||||
def test_basic_recover(self):
|
||||
with self.assertRaises(NotImplementedError):
|
||||
self.channel.basic_recover(requeue=False)
|
||||
|
@ -355,6 +396,64 @@ class test_Channel(unittest.TestCase):
|
|||
with self.assertRaises(NotImplementedError):
|
||||
self.channel.flow(False)
|
||||
|
||||
def test_close_when_no_connection(self):
|
||||
self.channel.connection = None
|
||||
self.channel.close()
|
||||
self.assertTrue(self.channel.closed)
|
||||
|
||||
def test_drain_events_has_get_many(self):
|
||||
c = self.channel
|
||||
c._get_many = Mock()
|
||||
c._poll = Mock()
|
||||
c._consumers = [1]
|
||||
c._qos = Mock()
|
||||
c._qos.can_consume.return_value = True
|
||||
|
||||
c.drain_events(timeout=10.0)
|
||||
c._get_many.assert_called_with(c._active_queues, timeout=10.0)
|
||||
|
||||
def test_get_exchanges(self):
|
||||
self.channel.exchange_declare(exchange="foo")
|
||||
self.assertTrue(self.channel.get_exchanges())
|
||||
|
||||
def test_basic_cancel_not_in_active_queues(self):
|
||||
c = self.channel
|
||||
c._consumers.add("x")
|
||||
c._tag_to_queue["x"] = "foo"
|
||||
c._active_queues = Mock()
|
||||
c._active_queues.remove.side_effect = ValueError()
|
||||
c.auto_delete_queues["foo"] = 3
|
||||
|
||||
c.basic_cancel("x")
|
||||
self.assertEqual(c.auto_delete_queues["foo"], 2)
|
||||
c._active_queues.remove.assert_called_with("foo")
|
||||
|
||||
def test_basic_cancel_unknown_ctag(self):
|
||||
self.assertIsNone(self.channel.basic_cancel("unknown-tag"))
|
||||
|
||||
def test_list_bindings(self):
|
||||
c = self.channel
|
||||
c.exchange_declare(exchange="foo")
|
||||
c.queue_declare(queue="q")
|
||||
c.queue_bind(queue="q", exchange="foo", routing_key="rk")
|
||||
|
||||
self.assertIn(("q", "foo", "rk"), list(c.list_bindings()))
|
||||
|
||||
def test_after_reply_message_received(self):
|
||||
c = self.channel
|
||||
c.queue_delete = Mock()
|
||||
c.after_reply_message_received("foo")
|
||||
c.queue_delete.assert_called_with("foo")
|
||||
|
||||
def test_queue_delete_unknown_queue(self):
|
||||
self.assertIsNone(self.channel.queue_delete("xiwjqjwel"))
|
||||
|
||||
def test_queue_declare_passive(self):
|
||||
has_queue = self.channel._has_queue = Mock()
|
||||
has_queue.return_value = False
|
||||
with self.assertRaises(StdChannelError):
|
||||
self.channel.queue_declare(queue="21wisdjwqe", passive=True)
|
||||
|
||||
|
||||
class test_Transport(unittest.TestCase):
|
||||
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
from __future__ import absolute_import
|
||||
|
||||
import pickle
|
||||
|
||||
from ..utils.functional import promise, maybe_promise
|
||||
from .utils import unittest
|
||||
|
||||
|
||||
def double(x):
|
||||
return x * 2
|
||||
|
||||
|
||||
class test_promise(unittest.TestCase):
|
||||
|
||||
def test__str__(self):
|
||||
self.assertEqual(str(promise(lambda: "the quick brown fox")),
|
||||
"the quick brown fox")
|
||||
|
||||
def test__repr__(self):
|
||||
self.assertEqual(repr(promise(lambda: "fi fa fo")),
|
||||
"'fi fa fo'")
|
||||
|
||||
def test_evaluate(self):
|
||||
self.assertEqual(promise(lambda: 2 + 2)(), 4)
|
||||
self.assertEqual(promise(lambda x: x * 4, 2), 8)
|
||||
self.assertEqual(promise(lambda x: x * 8, 2)(), 16)
|
||||
|
||||
def test_cmp(self):
|
||||
self.assertEqual(promise(lambda: 10), promise(lambda: 10))
|
||||
self.assertNotEqual(promise(lambda: 10), promise(lambda: 20))
|
||||
|
||||
def test__reduce__(self):
|
||||
x = promise(double, 4)
|
||||
y = pickle.loads(pickle.dumps(x))
|
||||
self.assertEqual(x(), y())
|
||||
|
||||
def test__deepcopy__(self):
|
||||
from copy import deepcopy
|
||||
x = promise(double, 4)
|
||||
y = deepcopy(x)
|
||||
self.assertEqual(x._fun, y._fun)
|
||||
self.assertEqual(x._args, y._args)
|
||||
self.assertEqual(x(), y())
|
||||
|
||||
|
||||
class test_maybe_promise(unittest.TestCase):
|
||||
|
||||
def test_evaluates(self):
|
||||
self.assertEqual(maybe_promise(promise(lambda: 10)), 10)
|
||||
self.assertEqual(maybe_promise(20), 20)
|
|
@ -37,6 +37,12 @@ class StdChannel(object):
|
|||
after transient reply message received."""
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *exc_info):
|
||||
self.close()
|
||||
|
||||
|
||||
class Message(object):
|
||||
"""Base class for received messages."""
|
||||
|
|
|
@ -160,7 +160,7 @@ class QoS(object):
|
|||
|
||||
try:
|
||||
self.channel._restore(message)
|
||||
except (KeyboardInterrupt, SystemExit, Exception), exc:
|
||||
except BaseException, exc:
|
||||
errors.append((exc, message))
|
||||
delivered.clear()
|
||||
return errors
|
||||
|
@ -175,7 +175,7 @@ class QoS(object):
|
|||
self._flush()
|
||||
state = self._delivered
|
||||
|
||||
if not self.channel.do_restore or getattr(state, "restored"):
|
||||
if not self.channel.do_restore or getattr(state, "restored", None):
|
||||
assert not state
|
||||
return
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ class FairCycle(object):
|
|||
raise self.predicate()
|
||||
|
||||
def get(self, **kwargs):
|
||||
for tried in count(0):
|
||||
for tried in count(0): # for infinity
|
||||
resource = self._next()
|
||||
|
||||
try:
|
||||
|
|
|
@ -153,7 +153,7 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
|
|||
interval_max + interval_start,
|
||||
interval_step, repeatlast=True)
|
||||
|
||||
for retries, interval in enumerate(interval_range):
|
||||
for retries, interval in enumerate(interval_range): # for infinity
|
||||
try:
|
||||
return fun(*args, **kwargs)
|
||||
except catch, exc:
|
||||
|
|
|
@ -1,2 +1,2 @@
|
|||
anyjson>=0.3.1
|
||||
kamqp
|
||||
amqplib>=1.0
|
||||
|
|
|
@ -8,7 +8,6 @@ cover3-package = kombu
|
|||
cover3-exclude = kombu
|
||||
kombu.utils.compat
|
||||
kombu.utils.eventio
|
||||
kombu.utils.functional
|
||||
kombu.utils.finalize
|
||||
kombu.transport.pika
|
||||
kombu.transport.couchdb
|
||||
|
@ -27,4 +26,4 @@ upload-dir = docs/.build/html
|
|||
|
||||
[bdist_rpm]
|
||||
requires = anyjson >= 0.3.1
|
||||
kamqp
|
||||
amqplib >= 1.0.0
|
||||
|
|
Loading…
Reference in New Issue