Merge branch '2.5'

Conflicts:
	pavement.py
This commit is contained in:
Ask Solem 2013-02-07 15:38:15 +00:00
commit 67d2fa4744
42 changed files with 291 additions and 206 deletions

View File

@ -5,6 +5,7 @@
Adam Nelson <adam@varud.com>
Adam Wentz
Alex Koshelev <daevaorn@gmail.com>
Alexandre Bourget <alexandre.bourget@savoirfairelinux.com>
Andrew Watts
Andrey Antukh <niwi@niwi.be>

View File

@ -5,9 +5,7 @@ import os
testing = False
DONT_TOUCH = (
'./index.txt',
)
DONT_TOUCH = ('./index.txt', )
def target_name(fn):

View File

@ -95,8 +95,10 @@ def fixliterals(fname):
replace_type in ("class", "func", "meth"):
default = default[:-2]
replace_value = raw_input(
colorize("Text <target> [", fg="yellow") + default + \
colorize("]: ", fg="yellow")).strip()
colorize("Text <target> [", fg="yellow") +
default +
colorize("]: ", fg="yellow"),
).strip()
if not replace_value:
replace_value = default
new.append(":%s:`%s`" % (replace_type, replace_value))

View File

@ -62,8 +62,8 @@ html_use_modindex = True
html_use_index = True
latex_documents = [
('index', 'Kombu.tex', ur'Kombu Documentation',
ur'Ask Solem', 'manual'),
('index', 'Kombu.tex', u'Kombu Documentation',
u'Ask Solem', 'manual'),
]
html_theme = "celery"

View File

@ -14,10 +14,11 @@ def send_as_task(connection, fun, args=(), kwargs={}, priority='mid'):
with producers[connection].acquire(block=True) as producer:
maybe_declare(task_exchange, producer.channel)
producer.publish(payload, serializer='pickle',
compression='bzip2',
exchange=task_exchange,
routing_key=routing_key)
producer.publish(payload,
serializer='pickle',
compression='bzip2',
exchange=task_exchange,
routing_key=routing_key)
if __name__ == '__main__':
from kombu import Connection

View File

@ -147,7 +147,7 @@ def bump(*files, **kwargs):
v.write(next)
print(cmd("git", "commit", "-m", "Bumps version to %s" % (to_str(next), ),
*[f.filename for f in files]))
*[f.filename for f in files]))
print(cmd("git", "tag", "v%s" % (to_str(next), )))

View File

@ -37,7 +37,7 @@ class FlakePP(object):
re_with = compile(RE_WITH)
re_noqa = compile(RE_NOQA)
map = {"abs": True, "print": False,
"with": False, "with-used": False}
"with": False, "with-used": False}
def __init__(self, verbose=False):
self.verbose = verbose

View File

@ -8,7 +8,7 @@ except ImportError:
from ez_setup import use_setuptools
use_setuptools()
from setuptools import setup # noqa
from setuptools.command.install import install # noqa
from setuptools.command.install import install # noqa
class no_install(install):

View File

@ -40,7 +40,7 @@ def consumeN(conn, consumer, n=1, timeout=30):
except socket.timeout:
seconds += 1
msg = "Received %s/%s messages. %s seconds passed." % (
len(messages), n, seconds)
len(messages), n, seconds)
if seconds >= timeout:
raise socket.timeout(msg)
if seconds > 1:
@ -113,7 +113,7 @@ class TransportCase(unittest.TestCase):
self.after_connect(self.connection)
except self.connection.connection_errors:
self.skip_test_reason = "%s transport can't connect" % (
self.transport, )
self.transport, )
else:
self.connected = True
@ -161,14 +161,15 @@ class TransportCase(unittest.TestCase):
return _digest(data).hexdigest()
@skip_if_quick
def test_produce__consume_large_messages(self, bytes=1048576, n=10,
def test_produce__consume_large_messages(
self, bytes=1048576, n=10,
charset=string.punctuation + string.letters + string.digits):
if not self.verify_alive():
return
bytes = min(filter(None, [bytes, self.message_size_limit]))
messages = ["".join(random.choice(charset)
for j in xrange(bytes)) + "--%s" % n
for i in xrange(n)]
for j in xrange(bytes)) + "--%s" % n
for i in xrange(n)]
digests = []
chan1 = self.connection.channel()
consumer = chan1.Consumer(self.queue)
@ -180,7 +181,7 @@ class TransportCase(unittest.TestCase):
digests.append(self._digest(message))
received = [(msg["i"], msg["text"])
for msg in consumeN(self.connection, consumer, n)]
for msg in consumeN(self.connection, consumer, n)]
self.assertEqual(len(received), n)
ordering = [i for i, _ in received]
if ordering != range(n) and not self.suppress_disorder_warning:
@ -229,8 +230,9 @@ class TransportCase(unittest.TestCase):
chan = self.connection.channel()
self.purge([self.queue.name])
consumer = chan.Consumer(self.queue)
self.assertRaises(socket.timeout, self.connection.drain_events,
timeout=0.3)
self.assertRaises(
socket.timeout, self.connection.drain_events, timeout=0.3,
)
consumer.cancel()
chan.close()

View File

@ -380,7 +380,10 @@ class Consumer(object):
return self
def __exit__(self, *exc_info):
self.cancel()
try:
self.cancel()
except Exception:
pass
def add_queue(self, queue):
queue = queue(self.channel)

View File

@ -1,4 +1,4 @@
from __future__ import absolute_import, print_function
from __future__ import absolute_import
import anyjson
import os
@ -33,8 +33,7 @@ def find_distribution_modules(name=__name__, file=__file__):
def import_all_modules(name=__name__, file=__file__, skip=[]):
for module in find_distribution_modules(name, file):
if module not in skip:
print('preimporting {0} for coverage...'.format(module),
file=sys.stderr)
print('preimporting %r for coverage...' % (module, ))
try:
__import__(module)
except (ImportError, VersionMismatch, AttributeError):

View File

@ -27,7 +27,7 @@ class Channel(base.StdChannel):
def __init__(self, connection):
self.connection = connection
self.called = []
self.deliveries = count(1)
self.deliveries = count(1).next
self.to_deliver = []
self.events = {'basic_return': set()}
self.channel_id = self._ids()
@ -105,7 +105,7 @@ class Channel(base.StdChannel):
def message_to_python(self, message, *args, **kwargs):
self._called('message_to_python')
return Message(self, body=anyjson.dumps(message),
delivery_tag=next(self.deliveries),
delivery_tag=self.deliveries(),
throw_decode_error=self.throw_decode_error,
content_type='application/json',
content_encoding='utf-8')

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
import socket
@ -156,13 +157,13 @@ class test_replies(TestCase):
body, message = Mock(), Mock()
itermessages.return_value = [(body, message)]
it = collect_replies(conn, channel, queue, no_ack=False)
m = next(it)
m = it.next()
self.assertIs(m, body)
itermessages.assert_called_with(conn, channel, queue, no_ack=False)
message.ack.assert_called_with()
with self.assertRaises(StopIteration):
next(it)
it.next()
channel.after_reply_message_received.assert_called_with(queue.name)
@ -172,7 +173,7 @@ class test_replies(TestCase):
body, message = Mock(), Mock()
itermessages.return_value = [(body, message)]
it = collect_replies(conn, channel, queue)
m = next(it)
m = it.next()
self.assertIs(m, body)
itermessages.assert_called_with(conn, channel, queue, no_ack=True)
self.assertFalse(message.ack.called)
@ -183,7 +184,7 @@ class test_replies(TestCase):
itermessages.return_value = []
it = collect_replies(conn, channel, queue)
with self.assertRaises(StopIteration):
next(it)
it.next()
self.assertFalse(channel.after_reply_message_received.called)
@ -317,11 +318,11 @@ class test_itermessages(TestCase):
it = common.itermessages(conn, channel, 'q', limit=1,
Consumer=MockConsumer)
ret = next(it)
ret = it.next()
self.assertTupleEqual(ret, ('body', 'message'))
with self.assertRaises(StopIteration):
next(it)
it.next()
def test_when_raises_socket_timeout(self):
conn = self.MockConnection()
@ -332,7 +333,7 @@ class test_itermessages(TestCase):
Consumer=MockConsumer)
with self.assertRaises(StopIteration):
next(it)
it.next()
@patch('kombu.common.deque')
def test_when_raises_IndexError(self, deque):
@ -344,7 +345,7 @@ class test_itermessages(TestCase):
Consumer=MockConsumer)
with self.assertRaises(StopIteration):
next(it)
it.next()
class test_entry_to_queue(TestCase):

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
from mock import patch
@ -30,7 +31,7 @@ class test_misc(TestCase):
conn = MyConnection()
consumer = Consumer()
it = compat._iterconsume(conn, consumer)
self.assertEqual(next(it), 1)
self.assertEqual(it.next(), 1)
self.assertTrue(consumer.active)
it2 = compat._iterconsume(conn, consumer, limit=10)
@ -242,7 +243,7 @@ class test_Consumer(TestCase):
c = C(self.connection,
queue=n, exchange=n, routing_key='rkey')
self.assertEqual(c.wait(10), list(range(10)))
self.assertEqual(c.wait(10), range(10))
c.close()
def test_iterqueue(self, n='test_iterqueue'):
@ -257,7 +258,7 @@ class test_Consumer(TestCase):
c = C(self.connection,
queue=n, exchange=n, routing_key='rkey')
self.assertEqual(list(c.iterqueue(limit=10)), list(range(10)))
self.assertEqual(list(c.iterqueue(limit=10)), range(10))
c.close()

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
import errno
import pickle
@ -10,7 +11,6 @@ from nose import SkipTest
from kombu import Connection, Consumer, Producer, parse_url
from kombu.connection import Resource
from kombu.five import items
from .mocks import Transport
from .utils import TestCase
@ -43,7 +43,7 @@ class test_connection_utils(TestCase):
def test_parse_generated_as_uri(self):
conn = Connection(self.url)
info = conn.info()
for k, v in items(self.expected):
for k, v in self.expected.items():
self.assertEqual(info[k], v)
# by default almost the same- no password
self.assertEqual(conn.as_uri(), self.nopass)
@ -60,7 +60,7 @@ class test_connection_utils(TestCase):
def assert_info(self, conn, **fields):
info = conn.info()
for field, expected in items(fields):
for field, expected in fields.iteritems():
self.assertEqual(info[field], expected)
def test_rabbitmq_example_urls(self):
@ -505,7 +505,7 @@ class ResourceCase(TestCase):
return
P = self.create_resource(10, 0)
self.assertState(P, 10, 0)
chans = [P.acquire() for _ in range(10)]
chans = [P.acquire() for _ in xrange(10)]
self.assertState(P, 0, 10)
with self.assertRaises(P.LimitExceeded):
P.acquire()

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
import pickle
@ -243,7 +244,8 @@ class test_Queue(TestCase):
arguments=None,
type='direct',
durable=True,
), chan.exchange_declare.call_args_list,
),
chan.exchange_declare.call_args_list,
)
def test_can_cache_declaration(self):
@ -265,9 +267,8 @@ class test_Queue(TestCase):
)
def test_binds_at_instantiation(self):
self.assertTrue(
Queue('foo', self.exchange, channel=get_conn().channel()).is_bound,
)
self.assertTrue(Queue('foo', self.exchange,
channel=get_conn().channel()).is_bound)
def test_also_binds_exchange(self):
chan = get_conn().channel()

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import, unicode_literals
from __future__ import absolute_import
from __future__ import with_statement
import anyjson
import pickle
@ -67,7 +68,7 @@ class test_Producer(TestCase):
'p.declare() declares exchange')
def test_prepare(self):
message = {'the quick brown fox': 'jumps over the lazy dog'}
message = {u'the quick brown fox': u'jumps over the lazy dog'}
channel = self.connection.channel()
p = Producer(channel, self.exchange, serializer='json')
m, ctype, cencoding = p._prepare(message, headers={})
@ -76,7 +77,7 @@ class test_Producer(TestCase):
self.assertEqual(cencoding, 'utf-8')
def test_prepare_compression(self):
message = {'the quick brown fox': 'jumps over the lazy dog'}
message = {u'the quick brown fox': u'jumps over the lazy dog'}
channel = self.connection.channel()
p = Producer(channel, self.exchange, serializer='json')
headers = {}
@ -106,16 +107,15 @@ class test_Producer(TestCase):
self.assertEqual(cencoding, 'alien')
def test_prepare_is_already_unicode(self):
message = 'the quick brown fox'
message = u'the quick brown fox'
channel = self.connection.channel()
p = Producer(channel, self.exchange, serializer='json')
m, ctype, cencoding = p._prepare(message, content_type='text/plain')
self.assertEqual(m, message.encode('utf-8'))
self.assertEqual(ctype, 'text/plain')
self.assertEqual(cencoding, 'utf-8')
m, ctype, cencoding = p._prepare(
message, content_type='text/plain', content_encoding='utf-8',
)
m, ctype, cencoding = p._prepare(message, content_type='text/plain',
content_encoding='utf-8')
self.assertEqual(m, message.encode('utf-8'))
self.assertEqual(ctype, 'text/plain')
self.assertEqual(cencoding, 'utf-8')
@ -177,7 +177,7 @@ class test_Producer(TestCase):
def test_publish(self):
channel = self.connection.channel()
p = Producer(channel, self.exchange, serializer='json')
message = {'the quick brown fox': 'jumps over the lazy dog'}
message = {u'the quick brown fox': u'jumps over the lazy dog'}
ret = p.publish(message, routing_key='process')
self.assertIn('prepare_message', channel)
self.assertIn('basic_publish', channel)
@ -409,11 +409,11 @@ class test_Consumer(TestCase):
message.payload # trigger cache
consumer.register_callback(callback)
consumer._receive_callback({'foo': 'bar'})
consumer._receive_callback({u'foo': u'bar'})
self.assertIn('basic_ack', channel)
self.assertIn('message_to_python', channel)
self.assertEqual(received[0], {'foo': 'bar'})
self.assertEqual(received[0], {u'foo': u'bar'})
def test_basic_ack_twice(self):
channel = self.connection.channel()

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
import socket
@ -28,9 +29,11 @@ class test_Mailbox(TestCase):
self.handlers = {'mymethod': self._handler}
self.bound = self.mailbox(self.connection)
self.default_chan = self.connection.channel()
self.node = self.bound.Node('test_pidbox', state=self.state,
handlers=self.handlers,
channel=self.default_chan)
self.node = self.bound.Node(
'test_pidbox',
state=self.state, handlers=self.handlers,
channel=self.default_chan,
)
def test_reply__collect(self):
mailbox = pidbox.Mailbox('test_reply__collect')(self.connection)

View File

@ -1,4 +1,4 @@
from __future__ import absolute_import
from __future__ import with_statement
from kombu import Connection, Producer
from kombu import pools

View File

@ -1,29 +1,22 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
from __future__ import absolute_import
from __future__ import with_statement
from kombu.serialization import (
registry,
register,
SerializerNotInstalled,
raw_encode,
register_yaml,
register_msgpack,
decode,
bytes_t,
pickle,
pickle_protocol,
unregister,
register_pickle,
)
import sys
from kombu.serialization import (registry, register, SerializerNotInstalled,
raw_encode, register_yaml, register_msgpack,
decode, bytes_t, pickle, pickle_protocol,
unregister, register_pickle)
from .utils import TestCase
from .utils import mask_modules, skip_if_not_module
# For content_encoding tests
unicode_string = 'abcdé\u8463'
unicode_string = u'abcdé\u8463'
unicode_string_as_utf8 = unicode_string.encode('utf-8')
latin_string = 'abcdé'
latin_string = u'abcdé'
latin_string_as_latin1 = latin_string.encode('latin-1')
latin_string_as_utf8 = latin_string.encode('utf-8')
@ -33,39 +26,48 @@ py_data = {
'string': 'The quick brown fox jumps over the lazy dog',
'int': 10,
'float': 3.14159265,
'unicode': 'Thé quick brown fox jumps over thé lazy dog',
'unicode': u'Thé quick brown fox jumps over thé lazy dog',
'list': ['george', 'jerry', 'elaine', 'cosmo'],
}
# JSON serialization tests
json_data = ('{"int": 10, "float": 3.1415926500000002, '
'"list": ["george", "jerry", "elaine", "cosmo"], '
'"string": "The quick brown fox jumps over the lazy '
'dog", "unicode": "Th\\u00e9 quick brown fox jumps over '
'th\\u00e9 lazy dog"}')
json_data = """\
{"int": 10, "float": 3.1415926500000002, \
"list": ["george", "jerry", "elaine", "cosmo"], \
"string": "The quick brown fox jumps over the lazy \
dog", "unicode": "Th\\u00e9 quick brown fox jumps over \
th\\u00e9 lazy dog"}\
"""
# Pickle serialization tests
pickle_data = pickle.dumps(py_data, protocol=pickle_protocol)
# YAML serialization tests
yaml_data = ('float: 3.1415926500000002\nint: 10\n'
'list: [george, jerry, elaine, cosmo]\n'
'string: The quick brown fox jumps over the lazy dog\n'
'unicode: "Th\\xE9 quick brown fox '
'jumps over th\\xE9 lazy dog"\n')
yaml_data = """\
float: 3.1415926500000002
int: 10
list: [george, jerry, elaine, cosmo]
string: The quick brown fox jumps over the lazy dog
unicode: "Th\\xE9 quick brown fox jumps over th\\xE9 lazy dog"
"""
msgpack_py_data = {
'string': 'The quick brown fox jumps over the lazy dog',
'int': 10,
'float': 3.14159265,
'unicode': 'Thé quick brown fox jumps over thé lazy dog',
'list': ['george', 'jerry', 'elaine', 'cosmo'],
}
msgpack_py_data = dict(py_data)
# msgpack only supports tuples
msgpack_py_data['list'] = tuple(msgpack_py_data['list'])
# Unicode chars are lost in transmit :(
msgpack_py_data['unicode'] = 'Th quick brown fox jumps over th lazy dog'
msgpack_data = """\
\x85\xa3int\n\xa5float\xcb@\t!\xfbS\xc8\xd4\xf1\xa4list\
\x94\xa6george\xa5jerry\xa6elaine\xa5cosmo\xa6string\xda\
\x00+The quick brown fox jumps over the lazy dog\xa7unicode\
\xda\x00)Th quick brown fox jumps over th lazy dog\
"""
def say(m):
sys.stderr.write('%s\n' % (m, ))
registry.register('testS', lambda s: s, lambda s: 'decoded',
'application/testS', 'utf-8')
@ -91,11 +93,13 @@ class test_Serialization(TestCase):
registry.disable('testS')
with self.assertRaises(SerializerNotInstalled):
registry.decode('xxd', 'application/testS', 'utf-8',
force=False)
registry.decode(
'xxd', 'application/testS', 'utf-8', force=False,
)
ret = registry.decode('xxd', 'application/testS', 'utf-8',
force=True)
ret = registry.decode(
'xxd', 'application/testS', 'utf-8', force=True,
)
self.assertEqual(ret, 'decoded')
finally:
disabled.clear()
@ -106,17 +110,15 @@ class test_Serialization(TestCase):
def test_content_type_decoding(self):
self.assertEqual(
unicode_string,
registry.decode(
unicode_string_as_utf8,
content_type='plain/text',
content_encoding='utf-8'),
registry.decode(unicode_string_as_utf8,
content_type='plain/text',
content_encoding='utf-8'),
)
self.assertEqual(
latin_string,
registry.decode(
latin_string_as_latin1,
content_type='application/data',
content_encoding='latin-1'),
registry.decode(latin_string_as_latin1,
content_type='application/data',
content_encoding='latin-1'),
)
def test_content_type_binary(self):
@ -158,10 +160,9 @@ class test_Serialization(TestCase):
def test_json_decode(self):
self.assertEqual(
py_data,
registry.decode(
json_data,
content_type='application/json',
content_encoding='utf-8'),
registry.decode(json_data,
content_type='application/json',
content_encoding='utf-8'),
)
def test_json_encode(self):
@ -174,18 +175,35 @@ class test_Serialization(TestCase):
registry.decode(
json_data,
content_type='application/json',
content_encoding='utf-8'),
content_encoding='utf-8',
),
)
@skip_if_not_module('msgpack')
def test_msgpack_decode(self):
register_msgpack()
decoded = registry.decode(
registry.encode(msgpack_py_data, serializer='msgpack')[-1],
content_type='application/x-msgpack',
content_encoding='binary',
self.assertEqual(
msgpack_py_data,
registry.decode(msgpack_data,
content_type='application/x-msgpack',
content_encoding='binary'),
)
@skip_if_not_module('msgpack')
def test_msgpack_encode(self):
register_msgpack()
self.assertEqual(
registry.decode(
registry.encode(msgpack_py_data, serializer='msgpack')[-1],
content_type='application/x-msgpack',
content_encoding='binary',
),
registry.decode(
msgpack_data,
content_type='application/x-msgpack',
content_encoding='binary',
),
)
self.assertEqual(msgpack_py_data, decoded)
@skip_if_not_module('yaml')
def test_yaml_decode(self):
@ -209,16 +227,16 @@ class test_Serialization(TestCase):
registry.decode(
yaml_data,
content_type='application/x-yaml',
content_encoding='utf-8'),
content_encoding='utf-8',
),
)
def test_pickle_decode(self):
self.assertEqual(
py_data,
registry.decode(
pickle_data,
content_type='application/x-python-serialize',
content_encoding='binary'),
registry.decode(pickle_data,
content_type='application/x-python-serialize',
content_encoding='binary'),
)
def test_pickle_encode(self):
@ -248,9 +266,10 @@ class test_Serialization(TestCase):
registry.encode('foo', serializer='nonexisting')
def test_raw_encode(self):
self.assertTupleEqual(raw_encode('foo'.encode('utf-8')),
('application/data', 'binary',
'foo'.encode('utf-8')))
self.assertTupleEqual(
raw_encode('foo'.encode('utf-8')),
('application/data', 'binary', 'foo'.encode('utf-8')),
)
@mask_modules('yaml')
def test_register_yaml__no_yaml(self):

View File

@ -1,7 +1,9 @@
from __future__ import absolute_import
from __future__ import with_statement
from Queue import Empty
from kombu import Connection, Exchange, Queue
from kombu.five import Empty
from .utils import TestCase
from .utils import Mock

View File

@ -1,14 +1,19 @@
from __future__ import absolute_import, unicode_literals
from __future__ import absolute_import
from __future__ import with_statement
import pickle
import sys
from functools import wraps
from io import BytesIO, StringIO
from mock import Mock, patch
if sys.version_info >= (3, 0):
from io import StringIO, BytesIO
else:
from StringIO import StringIO, StringIO as BytesIO # noqa
from kombu import utils
from kombu.five import string_t
from kombu.utils.compat import next
from .utils import (
TestCase,
@ -57,7 +62,7 @@ class test_utils(TestCase):
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0])
def test_reprkwargs(self):
self.assertTrue(utils.reprkwargs({'foo': 'bar', 1: 2, 'k': 'v'}))
self.assertTrue(utils.reprkwargs({'foo': 'bar', 1: 2, u'k': 'v'}))
def test_reprcall(self):
self.assertTrue(
@ -88,7 +93,7 @@ class test_UUID(TestCase):
self.assertIsNone(ctypes)
tid = uuid()
self.assertTrue(tid)
self.assertIsInstance(tid, string_t)
self.assertIsInstance(tid, basestring)
try:
with_ctypes_masked()
@ -103,8 +108,8 @@ class test_Misc(TestCase):
def f(**kwargs):
return kwargs
kw = {'foo': 'foo',
'bar': 'bar'}
kw = {u'foo': 'foo',
u'bar': 'bar'}
self.assertTrue(f(**utils.kwdict(kw)))
@ -142,8 +147,7 @@ class test_emergency_dump_state(TestCase):
{'foo': 'bar'},
open_file=lambda n, m: fh, dump=raise_something
)
# replace at the end removes u' from repr on Py2
self.assertIn("'foo': 'bar'", fh.getvalue().replace("u'", "'"))
self.assertIn("'foo': 'bar'", fh.getvalue())
self.assertTrue(stderr.getvalue())
self.assertFalse(stdout.getvalue())
@ -196,8 +200,8 @@ class test_retry_over_time(TestCase):
utils.count.return_value = range(10)
cb = Mock()
x = utils.retry_over_time(self.myfun, self.Predicate,
errback=self.errback, interval_max=14,
callback=cb)
errback=self.errback, callback=cb,
interval_max=14)
self.assertEqual(x, 42)
self.assertEqual(self.index, 9)
cb.assert_called_with()
@ -287,8 +291,10 @@ class test_symbol_by_name(TestCase):
def test_returns_default(self):
default = object()
self.assertIs(utils.symbol_by_name('xyz.ryx.qedoa.weq:foz',
default=default), default)
self.assertIs(
utils.symbol_by_name('xyz.ryx.qedoa.weq:foz', default=default),
default,
)
def test_no_default(self):
with self.assertRaises(ImportError):
@ -302,17 +308,19 @@ class test_symbol_by_name(TestCase):
def test_package(self):
from kombu.entity import Exchange
self.assertIs(utils.symbol_by_name('.entity:Exchange',
package='kombu'), Exchange)
self.assertIs(
utils.symbol_by_name('.entity:Exchange', package='kombu'),
Exchange,
)
self.assertTrue(utils.symbol_by_name(':Consumer', package='kombu'))
class test_ChannelPromise(TestCase):
def test_repr(self):
self.assertIn(
"'foo'",
self.assertEqual(
repr(utils.ChannelPromise(lambda: 'foo')),
"<promise: 'foo'>",
)

View File

@ -57,11 +57,10 @@ class test_Channel(amqplibCase):
self.assertFalse(self.channel.no_ack_consumers)
def test_prepare_message(self):
x = self.channel.prepare_message(
self.assertTrue(self.channel.prepare_message(
'foobar', 10, 'application/data', 'utf-8',
properties={},
)
self.assertTrue(x)
))
def test_message_to_python(self):
message = Mock()

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
from kombu import Connection, Consumer, Producer, Queue
from kombu.transport.base import Message, StdChannel, Transport

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
import tempfile

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
import socket

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
import sys
@ -49,11 +50,10 @@ class test_Channel(TestCase):
self.assertFalse(self.channel.no_ack_consumers)
def test_prepare_message(self):
x = self.channel.prepare_message(
self.assertTrue(self.channel.prepare_message(
'foobar', 10, 'application/data', 'utf-8',
properties={},
)
self.assertTrue(x)
))
def test_message_to_python(self):
message = Mock()

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
import socket
import types
@ -6,10 +7,10 @@ import types
from anyjson import dumps
from collections import defaultdict
from itertools import count
from Queue import Empty, Queue as _Queue
from kombu import Connection, Exchange, Queue, Consumer, Producer
from kombu.exceptions import InconsistencyError, VersionMismatch
from kombu.five import Empty, Queue as _Queue
from kombu.utils import eventio # patch poll
from kombu.tests.utils import TestCase
@ -128,10 +129,10 @@ class Client(object):
class _socket(object):
blocking = True
filenos = count(30)
next_fileno = count(30).next
def __init__(self, *args):
self._fileno = next(self.filenos)
self._fileno = self.next_fileno()
self.data = []
def fileno(self):
@ -264,24 +265,28 @@ class test_Channel(TestCase):
self.assertDictEqual(
self.channel._handle_message(
self.channel.subclient,
['pmessage', 'pattern', 'channel', 'data']
['pmessage', 'pattern', 'channel', 'data'],
),
{'type': 'pmessage',
'pattern': 'pattern',
'channel': 'channel',
'data': 'data'},
{
'type': 'pmessage',
'pattern': 'pattern',
'channel': 'channel',
'data': 'data',
},
)
def test_handle_message(self):
self.assertDictEqual(
self.channel._handle_message(
self.channel.subclient,
['type', 'channel', 'data']
['type', 'channel', 'data'],
),
{'type': 'type',
'pattern': None,
'channel': 'channel',
'data': 'data'},
{
'type': 'type',
'pattern': None,
'channel': 'channel',
'data': 'data',
},
)
def test_brpop_start_but_no_queues(self):
@ -290,9 +295,8 @@ class test_Channel(TestCase):
def test_receive(self):
s = self.channel.subclient = Mock()
self.channel._fanout_to_queue['a'] = 'b'
s.parse_response.return_value = [
'message', 'a', dumps({'hello': 'world'}),
]
s.parse_response.return_value = ['message', 'a',
dumps({'hello': 'world'})]
payload, queue = self.channel._receive()
self.assertDictEqual(payload, {'hello': 'world'})
self.assertEqual(queue, 'b')

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
from mock import patch
from nose import SkipTest

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
from mock import patch

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
import warnings
@ -8,7 +9,9 @@ from kombu import Connection
from kombu.exceptions import StdChannelError
from kombu.transport import virtual
from kombu.utils import uuid
from kombu.compression import compress
from kombu.tests.compat import catch_warnings
from kombu.tests.utils import TestCase
from kombu.tests.utils import Mock, redirect_stdouts
@ -64,7 +67,7 @@ class test_QoS(TestCase):
self.q.append(i + 1, uuid())
self.assertFalse(self.q.can_consume())
tag1 = next(iter(self.q._delivered))
tag1 = iter(self.q._delivered).next()
self.q.ack(tag1)
self.assertTrue(self.q.can_consume())
@ -120,13 +123,15 @@ class test_Message(TestCase):
def test_serializable(self):
c = client().channel()
data = c.prepare_message('the quick brown fox...')
body, content_type = compress('the quick brown fox...', 'gzip')
data = c.prepare_message(body, headers={'compression': content_type})
tag = data['properties']['delivery_tag'] = uuid()
message = c.message_to_python(data)
dict_ = message.serializable()
self.assertEqual(dict_['body'],
'the quick brown fox...'.encode('utf-8'))
self.assertEqual(dict_['properties']['delivery_tag'], tag)
self.assertFalse('compression' in dict_['headers'])
class test_AbstractChannel(TestCase):
@ -304,8 +309,8 @@ class test_Channel(TestCase):
consumer_tag = uuid()
c.basic_consume(n + '2', False, consumer_tag=consumer_tag,
callback=lambda *a: None)
c.basic_consume(n + '2', False,
consumer_tag=consumer_tag, callback=lambda *a: None)
self.assertIn(n + '2', c._active_queues)
r2, _ = c.drain_events()
r2 = c.message_to_python(r2)
@ -406,7 +411,7 @@ class test_Channel(TestCase):
def test_lookup__undeliverable(self, n='test_lookup__undeliverable'):
warnings.resetwarnings()
with warnings.catch_warnings(record=True) as log:
with catch_warnings(record=True) as log:
self.assertListEqual(
self.channel._lookup(n, n, 'ae.undeliver'),
['ae.undeliver'],

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
from kombu import Connection
from kombu.transport.virtual import exchange
@ -68,8 +69,10 @@ class test_Fanout(ExchangeCase):
class test_Topic(ExchangeCase):
type = exchange.TopicExchange
table = [('stock.#', None, 'rFoo'),
('stock.us.*', None, 'rBar')]
table = [
('stock.#', None, 'rFoo'),
('stock.us.*', None, 'rBar'),
]
def setUp(self):
super(test_Topic, self).setUp()
@ -125,18 +128,24 @@ class test_ExchangeType(ExchangeCase):
)
def test_equivalent(self):
e1 = dict(type='direct',
durable=True,
auto_delete=True,
arguments={})
e1 = dict(
type='direct',
durable=True,
auto_delete=True,
arguments={},
)
self.assertTrue(
self.e.equivalent(e1, 'eFoo', 'direct', True, True, {}))
self.e.equivalent(e1, 'eFoo', 'direct', True, True, {}),
)
self.assertFalse(
self.e.equivalent(e1, 'eFoo', 'topic', True, True, {}))
self.e.equivalent(e1, 'eFoo', 'topic', True, True, {}),
)
self.assertFalse(
self.e.equivalent(e1, 'eFoo', 'direct', False, True, {}))
self.e.equivalent(e1, 'eFoo', 'direct', False, True, {}),
)
self.assertFalse(
self.e.equivalent(e1, 'eFoo', 'direct', True, False, {}))
self.e.equivalent(e1, 'eFoo', 'direct', True, False, {}),
)
self.assertFalse(
self.e.equivalent(e1, 'eFoo', 'direct', True, True,
{'expires': 3000}),

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
from kombu.transport.virtual.scheduling import FairCycle

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
from mock import patch

View File

@ -1,4 +1,5 @@
from __future__ import absolute_import
from __future__ import with_statement
import logging

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
from __future__ import absolute_import
from __future__ import with_statement
import sys
@ -45,16 +46,16 @@ class test_encoding_utils(TestCase):
def test_str_to_bytes(self):
with clean_encoding() as e:
self.assertIsInstance(e.str_to_bytes('foobar'), str)
self.assertIsInstance(e.str_to_bytes(u'foobar'), str)
self.assertIsInstance(e.str_to_bytes('foobar'), str)
def test_from_utf8(self):
with clean_encoding() as e:
self.assertIsInstance(e.from_utf8('foobar'), str)
self.assertIsInstance(e.from_utf8(u'foobar'), str)
def test_default_encode(self):
with clean_encoding() as e:
self.assertTrue(e.default_encode(b'foo'))
self.assertTrue(e.default_encode('foo'))
class test_safe_str(TestCase):
@ -63,10 +64,10 @@ class test_safe_str(TestCase):
self.assertEqual(safe_str('foo'), 'foo')
def test_when_unicode(self):
self.assertIsInstance(safe_str('foo'), str)
self.assertIsInstance(safe_str(u'foo'), str)
def test_when_containing_high_chars(self):
s = 'The quiæk fåx jømps øver the lazy dåg'
s = u'The quiæk fåx jømps øver the lazy dåg'
res = safe_str(s)
self.assertIsInstance(res, str)

View File

@ -1,10 +1,12 @@
from __future__ import absolute_import
import __builtin__
import os
import sys
import types
from functools import wraps
from StringIO import StringIO
import mock
@ -16,8 +18,6 @@ try:
except AttributeError:
import unittest2 as unittest # noqa
from kombu.five import StringIO, builtins, string_t, module_name_t
class TestCase(unittest.TestCase):
@ -76,8 +76,8 @@ def module_exists(*modules):
@wraps(fun)
def __inner(*args, **kwargs):
for module in modules:
if isinstance(module, string_t):
module = types.ModuleType(module_name_t(module))
if isinstance(module, basestring):
module = types.ModuleType(module)
sys.modules[module.__name__] = module
try:
return fun(*args, **kwargs)
@ -95,19 +95,19 @@ def mask_modules(*modnames):
@wraps(fun)
def __inner(*args, **kwargs):
realimport = builtins.__import__
realimport = __builtin__.__import__
def myimp(name, *args, **kwargs):
if name in modnames:
raise ImportError('No module named {0}'.format(name))
raise ImportError('No module named %s' % name)
else:
return realimport(name, *args, **kwargs)
builtins.__import__ = myimp
__builtin__.__import__ = myimp
try:
return fun(*args, **kwargs)
finally:
builtins.__import__ = realimport
__builtin__.__import__ = realimport
return __inner
return _inner
@ -120,7 +120,7 @@ def skip_if_environ(env_var_name):
@wraps(fun)
def _skips_if_environ(*args, **kwargs):
if os.environ.get(env_var_name):
raise SkipTest('SKIP {0}: {1} set'.format(
raise SkipTest('SKIP %s: %s set\n' % (
fun.__name__, env_var_name))
return fun(*args, **kwargs)
@ -135,7 +135,7 @@ def skip_if_module(module):
def _skip_if_module(*args, **kwargs):
try:
__import__(module)
raise SkipTest('SKIP {0}: {1} available'.format(
raise SkipTest('SKIP %s: %s available\n' % (
fun.__name__, module))
except ImportError:
pass
@ -151,7 +151,7 @@ def skip_if_not_module(module):
try:
__import__(module)
except ImportError:
raise SkipTest('SKIP {0}: {1} available'.format(
raise SkipTest('SKIP %s: %s available\n' % (
fun.__name__, module))
return fun(*args, **kwargs)
return _skip_if_not_module

View File

@ -240,6 +240,13 @@ class Channel(virtual.Channel):
return payload
raise Empty()
def _restore(self, message,
unwanted_delivery_info=('sqs_message', 'sqs_queue')):
for unwanted_key in unwanted_delivery_info:
# Remove objects that aren't JSON serializable (Issue #1108).
message.delivery_info.pop(unwanted_key, None)
return super(Channel, self)._restore(message)
def basic_ack(self, delivery_tag):
delivery_info = self.qos.get(delivery_tag).delivery_info
try:

View File

@ -131,9 +131,9 @@ class QoS(object):
def append(self, message, delivery_tag):
"""Append message to transactional state."""
self._delivered[delivery_tag] = message
if self._dirty:
self._flush()
self._delivered[delivery_tag] = message
def get(self, delivery_tag):
return self._delivered[delivery_tag]
@ -238,7 +238,7 @@ class Message(base.Message):
'properties': props,
'content-type': self.content_type,
'content-encoding': self.content_encoding,
'headers': self.headers}
'headers': headers}
class AbstractChannel(object):
@ -510,10 +510,13 @@ class Channel(AbstractChannel, base.StdChannel):
pass
self.connection._callbacks.pop(queue, None)
def basic_get(self, queue, **kwargs):
def basic_get(self, queue, no_ack=False, **kwargs):
"""Get message by direct access (synchronous)."""
try:
return self._get(queue)
message = self.Message(self, self._get(queue))
if not no_ack:
self.qos.append(message, message.delivery_tag)
return message
except Empty:
pass

View File

@ -8,7 +8,7 @@ def get_manager(client, hostname=None, port=None, userid=None,
def get(name, val, default):
return (val if val is not None
else opt.get('manager_%s' % name)
else opt('manager_%s' % name)
or getattr(client, name, None) or default)
host = get('hostname', hostname, 'localhost')

View File

@ -7,7 +7,7 @@ from paver.setuputils import setup # noqa
PYCOMPILE_CACHES = ['*.pyc', '*$py.class']
options(
sphinx=Bunch(builddir='.build'),
sphinx=Bunch(builddir='.build'),
)
@ -91,9 +91,15 @@ def readme(options):
])
def bump(options):
s = "-- '%s'" % (options.custom, ) \
<<<<<<< HEAD
if getattr(options, 'custom', None) else ''
sh('extra/release/bump_version.py \
kombu/__init__.py README.rst %s' % (s, ))
=======
if getattr(options, "custom", None) else ""
sh("extra/release/bump_version.py \
kombu/__init__.py README.rst %s" % (s, ))
>>>>>>> 2.5
@task
@ -129,6 +135,7 @@ def flake8(options):
}{exit $FOUND_FLAKE;
'""" % (complexity, migrations_path), ignore_error=noerror)
@task
@cmdopts([
('noerror', 'E', 'Ignore errors'),

View File

@ -86,8 +86,9 @@ for dirpath, dirnames, filenames in os.walk(src_dir):
if filename.endswith('.py'):
packages.append('.'.join(fullsplit(dirpath)))
else:
data_files.append([dirpath, [os.path.join(dirpath, f) for f in
filenames]])
data_files.append(
[dirpath, [os.path.join(dirpath, f) for f in filenames]],
)
if os.path.exists('README.rst'):
long_description = codecs.open('README.rst', 'r', 'utf-8').read()