diff --git a/AUTHORS b/AUTHORS index cc72758a..25214355 100644 --- a/AUTHORS +++ b/AUTHORS @@ -5,6 +5,7 @@ Adam Nelson Adam Wentz +Alex Koshelev Alexandre Bourget Andrew Watts Andrey Antukh diff --git a/docs/_ext/applyxrefs.py b/docs/_ext/applyxrefs.py index 027490b8..93222a33 100644 --- a/docs/_ext/applyxrefs.py +++ b/docs/_ext/applyxrefs.py @@ -5,9 +5,7 @@ import os testing = False -DONT_TOUCH = ( - './index.txt', - ) +DONT_TOUCH = ('./index.txt', ) def target_name(fn): diff --git a/docs/_ext/literals_to_xrefs.py b/docs/_ext/literals_to_xrefs.py index e9dc7ca0..f1497565 100644 --- a/docs/_ext/literals_to_xrefs.py +++ b/docs/_ext/literals_to_xrefs.py @@ -95,8 +95,10 @@ def fixliterals(fname): replace_type in ("class", "func", "meth"): default = default[:-2] replace_value = raw_input( - colorize("Text [", fg="yellow") + default + \ - colorize("]: ", fg="yellow")).strip() + colorize("Text [", fg="yellow") + + default + + colorize("]: ", fg="yellow"), + ).strip() if not replace_value: replace_value = default new.append(":%s:`%s`" % (replace_type, replace_value)) diff --git a/docs/conf.py b/docs/conf.py index 27534746..bf39ded2 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -62,8 +62,8 @@ html_use_modindex = True html_use_index = True latex_documents = [ - ('index', 'Kombu.tex', ur'Kombu Documentation', - ur'Ask Solem', 'manual'), + ('index', 'Kombu.tex', u'Kombu Documentation', + u'Ask Solem', 'manual'), ] html_theme = "celery" diff --git a/examples/simple_task_queue/client.py b/examples/simple_task_queue/client.py index 8ed33f36..8cad069e 100644 --- a/examples/simple_task_queue/client.py +++ b/examples/simple_task_queue/client.py @@ -14,10 +14,11 @@ def send_as_task(connection, fun, args=(), kwargs={}, priority='mid'): with producers[connection].acquire(block=True) as producer: maybe_declare(task_exchange, producer.channel) - producer.publish(payload, serializer='pickle', - compression='bzip2', - exchange=task_exchange, - routing_key=routing_key) + producer.publish(payload, + serializer='pickle', + compression='bzip2', + exchange=task_exchange, + routing_key=routing_key) if __name__ == '__main__': from kombu import Connection diff --git a/extra/release/bump_version.py b/extra/release/bump_version.py index abda578e..24d0ebcb 100755 --- a/extra/release/bump_version.py +++ b/extra/release/bump_version.py @@ -147,7 +147,7 @@ def bump(*files, **kwargs): v.write(next) print(cmd("git", "commit", "-m", "Bumps version to %s" % (to_str(next), ), - *[f.filename for f in files])) + *[f.filename for f in files])) print(cmd("git", "tag", "v%s" % (to_str(next), ))) diff --git a/extra/release/flakeplus.py b/extra/release/flakeplus.py index 6fe1f1fc..4e1efd47 100755 --- a/extra/release/flakeplus.py +++ b/extra/release/flakeplus.py @@ -37,7 +37,7 @@ class FlakePP(object): re_with = compile(RE_WITH) re_noqa = compile(RE_NOQA) map = {"abs": True, "print": False, - "with": False, "with-used": False} + "with": False, "with-used": False} def __init__(self, verbose=False): self.verbose = verbose diff --git a/funtests/setup.py b/funtests/setup.py index fdb7a98c..a7ad8b4e 100644 --- a/funtests/setup.py +++ b/funtests/setup.py @@ -8,7 +8,7 @@ except ImportError: from ez_setup import use_setuptools use_setuptools() from setuptools import setup # noqa - from setuptools.command.install import install # noqa + from setuptools.command.install import install # noqa class no_install(install): diff --git a/funtests/transport.py b/funtests/transport.py index 5924a72a..f6ae1e17 100644 --- a/funtests/transport.py +++ b/funtests/transport.py @@ -40,7 +40,7 @@ def consumeN(conn, consumer, n=1, timeout=30): except socket.timeout: seconds += 1 msg = "Received %s/%s messages. %s seconds passed." % ( - len(messages), n, seconds) + len(messages), n, seconds) if seconds >= timeout: raise socket.timeout(msg) if seconds > 1: @@ -113,7 +113,7 @@ class TransportCase(unittest.TestCase): self.after_connect(self.connection) except self.connection.connection_errors: self.skip_test_reason = "%s transport can't connect" % ( - self.transport, ) + self.transport, ) else: self.connected = True @@ -161,14 +161,15 @@ class TransportCase(unittest.TestCase): return _digest(data).hexdigest() @skip_if_quick - def test_produce__consume_large_messages(self, bytes=1048576, n=10, + def test_produce__consume_large_messages( + self, bytes=1048576, n=10, charset=string.punctuation + string.letters + string.digits): if not self.verify_alive(): return bytes = min(filter(None, [bytes, self.message_size_limit])) messages = ["".join(random.choice(charset) - for j in xrange(bytes)) + "--%s" % n - for i in xrange(n)] + for j in xrange(bytes)) + "--%s" % n + for i in xrange(n)] digests = [] chan1 = self.connection.channel() consumer = chan1.Consumer(self.queue) @@ -180,7 +181,7 @@ class TransportCase(unittest.TestCase): digests.append(self._digest(message)) received = [(msg["i"], msg["text"]) - for msg in consumeN(self.connection, consumer, n)] + for msg in consumeN(self.connection, consumer, n)] self.assertEqual(len(received), n) ordering = [i for i, _ in received] if ordering != range(n) and not self.suppress_disorder_warning: @@ -229,8 +230,9 @@ class TransportCase(unittest.TestCase): chan = self.connection.channel() self.purge([self.queue.name]) consumer = chan.Consumer(self.queue) - self.assertRaises(socket.timeout, self.connection.drain_events, - timeout=0.3) + self.assertRaises( + socket.timeout, self.connection.drain_events, timeout=0.3, + ) consumer.cancel() chan.close() diff --git a/kombu/messaging.py b/kombu/messaging.py index b3fba4bc..e39625a3 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -380,7 +380,10 @@ class Consumer(object): return self def __exit__(self, *exc_info): - self.cancel() + try: + self.cancel() + except Exception: + pass def add_queue(self, queue): queue = queue(self.channel) diff --git a/kombu/tests/__init__.py b/kombu/tests/__init__.py index f6db3faa..6a13a750 100644 --- a/kombu/tests/__init__.py +++ b/kombu/tests/__init__.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import, print_function +from __future__ import absolute_import import anyjson import os @@ -33,8 +33,7 @@ def find_distribution_modules(name=__name__, file=__file__): def import_all_modules(name=__name__, file=__file__, skip=[]): for module in find_distribution_modules(name, file): if module not in skip: - print('preimporting {0} for coverage...'.format(module), - file=sys.stderr) + print('preimporting %r for coverage...' % (module, )) try: __import__(module) except (ImportError, VersionMismatch, AttributeError): diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py index 5366226a..4d38da56 100644 --- a/kombu/tests/mocks.py +++ b/kombu/tests/mocks.py @@ -27,7 +27,7 @@ class Channel(base.StdChannel): def __init__(self, connection): self.connection = connection self.called = [] - self.deliveries = count(1) + self.deliveries = count(1).next self.to_deliver = [] self.events = {'basic_return': set()} self.channel_id = self._ids() @@ -105,7 +105,7 @@ class Channel(base.StdChannel): def message_to_python(self, message, *args, **kwargs): self._called('message_to_python') return Message(self, body=anyjson.dumps(message), - delivery_tag=next(self.deliveries), + delivery_tag=self.deliveries(), throw_decode_error=self.throw_decode_error, content_type='application/json', content_encoding='utf-8') diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py index 6f1311d3..978687e3 100644 --- a/kombu/tests/test_common.py +++ b/kombu/tests/test_common.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement import socket @@ -156,13 +157,13 @@ class test_replies(TestCase): body, message = Mock(), Mock() itermessages.return_value = [(body, message)] it = collect_replies(conn, channel, queue, no_ack=False) - m = next(it) + m = it.next() self.assertIs(m, body) itermessages.assert_called_with(conn, channel, queue, no_ack=False) message.ack.assert_called_with() with self.assertRaises(StopIteration): - next(it) + it.next() channel.after_reply_message_received.assert_called_with(queue.name) @@ -172,7 +173,7 @@ class test_replies(TestCase): body, message = Mock(), Mock() itermessages.return_value = [(body, message)] it = collect_replies(conn, channel, queue) - m = next(it) + m = it.next() self.assertIs(m, body) itermessages.assert_called_with(conn, channel, queue, no_ack=True) self.assertFalse(message.ack.called) @@ -183,7 +184,7 @@ class test_replies(TestCase): itermessages.return_value = [] it = collect_replies(conn, channel, queue) with self.assertRaises(StopIteration): - next(it) + it.next() self.assertFalse(channel.after_reply_message_received.called) @@ -317,11 +318,11 @@ class test_itermessages(TestCase): it = common.itermessages(conn, channel, 'q', limit=1, Consumer=MockConsumer) - ret = next(it) + ret = it.next() self.assertTupleEqual(ret, ('body', 'message')) with self.assertRaises(StopIteration): - next(it) + it.next() def test_when_raises_socket_timeout(self): conn = self.MockConnection() @@ -332,7 +333,7 @@ class test_itermessages(TestCase): Consumer=MockConsumer) with self.assertRaises(StopIteration): - next(it) + it.next() @patch('kombu.common.deque') def test_when_raises_IndexError(self, deque): @@ -344,7 +345,7 @@ class test_itermessages(TestCase): Consumer=MockConsumer) with self.assertRaises(StopIteration): - next(it) + it.next() class test_entry_to_queue(TestCase): diff --git a/kombu/tests/test_compat.py b/kombu/tests/test_compat.py index 790e76ce..7e80e21f 100644 --- a/kombu/tests/test_compat.py +++ b/kombu/tests/test_compat.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement from mock import patch @@ -30,7 +31,7 @@ class test_misc(TestCase): conn = MyConnection() consumer = Consumer() it = compat._iterconsume(conn, consumer) - self.assertEqual(next(it), 1) + self.assertEqual(it.next(), 1) self.assertTrue(consumer.active) it2 = compat._iterconsume(conn, consumer, limit=10) @@ -242,7 +243,7 @@ class test_Consumer(TestCase): c = C(self.connection, queue=n, exchange=n, routing_key='rkey') - self.assertEqual(c.wait(10), list(range(10))) + self.assertEqual(c.wait(10), range(10)) c.close() def test_iterqueue(self, n='test_iterqueue'): @@ -257,7 +258,7 @@ class test_Consumer(TestCase): c = C(self.connection, queue=n, exchange=n, routing_key='rkey') - self.assertEqual(list(c.iterqueue(limit=10)), list(range(10))) + self.assertEqual(list(c.iterqueue(limit=10)), range(10)) c.close() diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index d0aac2ed..f3c4120a 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement import errno import pickle @@ -10,7 +11,6 @@ from nose import SkipTest from kombu import Connection, Consumer, Producer, parse_url from kombu.connection import Resource -from kombu.five import items from .mocks import Transport from .utils import TestCase @@ -43,7 +43,7 @@ class test_connection_utils(TestCase): def test_parse_generated_as_uri(self): conn = Connection(self.url) info = conn.info() - for k, v in items(self.expected): + for k, v in self.expected.items(): self.assertEqual(info[k], v) # by default almost the same- no password self.assertEqual(conn.as_uri(), self.nopass) @@ -60,7 +60,7 @@ class test_connection_utils(TestCase): def assert_info(self, conn, **fields): info = conn.info() - for field, expected in items(fields): + for field, expected in fields.iteritems(): self.assertEqual(info[field], expected) def test_rabbitmq_example_urls(self): @@ -505,7 +505,7 @@ class ResourceCase(TestCase): return P = self.create_resource(10, 0) self.assertState(P, 10, 0) - chans = [P.acquire() for _ in range(10)] + chans = [P.acquire() for _ in xrange(10)] self.assertState(P, 0, 10) with self.assertRaises(P.LimitExceeded): P.acquire() diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py index ff1a160f..8e89f84e 100644 --- a/kombu/tests/test_entities.py +++ b/kombu/tests/test_entities.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement import pickle @@ -243,7 +244,8 @@ class test_Queue(TestCase): arguments=None, type='direct', durable=True, - ), chan.exchange_declare.call_args_list, + ), + chan.exchange_declare.call_args_list, ) def test_can_cache_declaration(self): @@ -265,9 +267,8 @@ class test_Queue(TestCase): ) def test_binds_at_instantiation(self): - self.assertTrue( - Queue('foo', self.exchange, channel=get_conn().channel()).is_bound, - ) + self.assertTrue(Queue('foo', self.exchange, + channel=get_conn().channel()).is_bound) def test_also_binds_exchange(self): chan = get_conn().channel() diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index 256352b3..0fb8a657 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -1,4 +1,5 @@ -from __future__ import absolute_import, unicode_literals +from __future__ import absolute_import +from __future__ import with_statement import anyjson import pickle @@ -67,7 +68,7 @@ class test_Producer(TestCase): 'p.declare() declares exchange') def test_prepare(self): - message = {'the quick brown fox': 'jumps over the lazy dog'} + message = {u'the quick brown fox': u'jumps over the lazy dog'} channel = self.connection.channel() p = Producer(channel, self.exchange, serializer='json') m, ctype, cencoding = p._prepare(message, headers={}) @@ -76,7 +77,7 @@ class test_Producer(TestCase): self.assertEqual(cencoding, 'utf-8') def test_prepare_compression(self): - message = {'the quick brown fox': 'jumps over the lazy dog'} + message = {u'the quick brown fox': u'jumps over the lazy dog'} channel = self.connection.channel() p = Producer(channel, self.exchange, serializer='json') headers = {} @@ -106,16 +107,15 @@ class test_Producer(TestCase): self.assertEqual(cencoding, 'alien') def test_prepare_is_already_unicode(self): - message = 'the quick brown fox' + message = u'the quick brown fox' channel = self.connection.channel() p = Producer(channel, self.exchange, serializer='json') m, ctype, cencoding = p._prepare(message, content_type='text/plain') self.assertEqual(m, message.encode('utf-8')) self.assertEqual(ctype, 'text/plain') self.assertEqual(cencoding, 'utf-8') - m, ctype, cencoding = p._prepare( - message, content_type='text/plain', content_encoding='utf-8', - ) + m, ctype, cencoding = p._prepare(message, content_type='text/plain', + content_encoding='utf-8') self.assertEqual(m, message.encode('utf-8')) self.assertEqual(ctype, 'text/plain') self.assertEqual(cencoding, 'utf-8') @@ -177,7 +177,7 @@ class test_Producer(TestCase): def test_publish(self): channel = self.connection.channel() p = Producer(channel, self.exchange, serializer='json') - message = {'the quick brown fox': 'jumps over the lazy dog'} + message = {u'the quick brown fox': u'jumps over the lazy dog'} ret = p.publish(message, routing_key='process') self.assertIn('prepare_message', channel) self.assertIn('basic_publish', channel) @@ -409,11 +409,11 @@ class test_Consumer(TestCase): message.payload # trigger cache consumer.register_callback(callback) - consumer._receive_callback({'foo': 'bar'}) + consumer._receive_callback({u'foo': u'bar'}) self.assertIn('basic_ack', channel) self.assertIn('message_to_python', channel) - self.assertEqual(received[0], {'foo': 'bar'}) + self.assertEqual(received[0], {u'foo': u'bar'}) def test_basic_ack_twice(self): channel = self.connection.channel() diff --git a/kombu/tests/test_pidbox.py b/kombu/tests/test_pidbox.py index 1eaec780..d250eedb 100644 --- a/kombu/tests/test_pidbox.py +++ b/kombu/tests/test_pidbox.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement import socket @@ -28,9 +29,11 @@ class test_Mailbox(TestCase): self.handlers = {'mymethod': self._handler} self.bound = self.mailbox(self.connection) self.default_chan = self.connection.channel() - self.node = self.bound.Node('test_pidbox', state=self.state, - handlers=self.handlers, - channel=self.default_chan) + self.node = self.bound.Node( + 'test_pidbox', + state=self.state, handlers=self.handlers, + channel=self.default_chan, + ) def test_reply__collect(self): mailbox = pidbox.Mailbox('test_reply__collect')(self.connection) diff --git a/kombu/tests/test_pools.py b/kombu/tests/test_pools.py index a9386602..1ec34395 100644 --- a/kombu/tests/test_pools.py +++ b/kombu/tests/test_pools.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import +from __future__ import with_statement from kombu import Connection, Producer from kombu import pools diff --git a/kombu/tests/test_serialization.py b/kombu/tests/test_serialization.py index 3d920cf8..de7ed343 100644 --- a/kombu/tests/test_serialization.py +++ b/kombu/tests/test_serialization.py @@ -1,29 +1,22 @@ #!/usr/bin/python # -*- coding: utf-8 -*- -from __future__ import absolute_import, unicode_literals +from __future__ import absolute_import +from __future__ import with_statement -from kombu.serialization import ( - registry, - register, - SerializerNotInstalled, - raw_encode, - register_yaml, - register_msgpack, - decode, - bytes_t, - pickle, - pickle_protocol, - unregister, - register_pickle, -) +import sys + +from kombu.serialization import (registry, register, SerializerNotInstalled, + raw_encode, register_yaml, register_msgpack, + decode, bytes_t, pickle, pickle_protocol, + unregister, register_pickle) from .utils import TestCase from .utils import mask_modules, skip_if_not_module # For content_encoding tests -unicode_string = 'abcdé\u8463' +unicode_string = u'abcdé\u8463' unicode_string_as_utf8 = unicode_string.encode('utf-8') -latin_string = 'abcdé' +latin_string = u'abcdé' latin_string_as_latin1 = latin_string.encode('latin-1') latin_string_as_utf8 = latin_string.encode('utf-8') @@ -33,39 +26,48 @@ py_data = { 'string': 'The quick brown fox jumps over the lazy dog', 'int': 10, 'float': 3.14159265, - 'unicode': 'Thé quick brown fox jumps over thé lazy dog', + 'unicode': u'Thé quick brown fox jumps over thé lazy dog', 'list': ['george', 'jerry', 'elaine', 'cosmo'], } # JSON serialization tests -json_data = ('{"int": 10, "float": 3.1415926500000002, ' - '"list": ["george", "jerry", "elaine", "cosmo"], ' - '"string": "The quick brown fox jumps over the lazy ' - 'dog", "unicode": "Th\\u00e9 quick brown fox jumps over ' - 'th\\u00e9 lazy dog"}') +json_data = """\ +{"int": 10, "float": 3.1415926500000002, \ +"list": ["george", "jerry", "elaine", "cosmo"], \ +"string": "The quick brown fox jumps over the lazy \ +dog", "unicode": "Th\\u00e9 quick brown fox jumps over \ +th\\u00e9 lazy dog"}\ +""" # Pickle serialization tests pickle_data = pickle.dumps(py_data, protocol=pickle_protocol) # YAML serialization tests -yaml_data = ('float: 3.1415926500000002\nint: 10\n' - 'list: [george, jerry, elaine, cosmo]\n' - 'string: The quick brown fox jumps over the lazy dog\n' - 'unicode: "Th\\xE9 quick brown fox ' - 'jumps over th\\xE9 lazy dog"\n') +yaml_data = """\ +float: 3.1415926500000002 +int: 10 +list: [george, jerry, elaine, cosmo] +string: The quick brown fox jumps over the lazy dog +unicode: "Th\\xE9 quick brown fox jumps over th\\xE9 lazy dog" +""" -msgpack_py_data = { - 'string': 'The quick brown fox jumps over the lazy dog', - 'int': 10, - 'float': 3.14159265, - 'unicode': 'Thé quick brown fox jumps over thé lazy dog', - 'list': ['george', 'jerry', 'elaine', 'cosmo'], -} +msgpack_py_data = dict(py_data) # msgpack only supports tuples msgpack_py_data['list'] = tuple(msgpack_py_data['list']) # Unicode chars are lost in transmit :( msgpack_py_data['unicode'] = 'Th quick brown fox jumps over th lazy dog' +msgpack_data = """\ +\x85\xa3int\n\xa5float\xcb@\t!\xfbS\xc8\xd4\xf1\xa4list\ +\x94\xa6george\xa5jerry\xa6elaine\xa5cosmo\xa6string\xda\ +\x00+The quick brown fox jumps over the lazy dog\xa7unicode\ +\xda\x00)Th quick brown fox jumps over th lazy dog\ +""" + + +def say(m): + sys.stderr.write('%s\n' % (m, )) + registry.register('testS', lambda s: s, lambda s: 'decoded', 'application/testS', 'utf-8') @@ -91,11 +93,13 @@ class test_Serialization(TestCase): registry.disable('testS') with self.assertRaises(SerializerNotInstalled): - registry.decode('xxd', 'application/testS', 'utf-8', - force=False) + registry.decode( + 'xxd', 'application/testS', 'utf-8', force=False, + ) - ret = registry.decode('xxd', 'application/testS', 'utf-8', - force=True) + ret = registry.decode( + 'xxd', 'application/testS', 'utf-8', force=True, + ) self.assertEqual(ret, 'decoded') finally: disabled.clear() @@ -106,17 +110,15 @@ class test_Serialization(TestCase): def test_content_type_decoding(self): self.assertEqual( unicode_string, - registry.decode( - unicode_string_as_utf8, - content_type='plain/text', - content_encoding='utf-8'), + registry.decode(unicode_string_as_utf8, + content_type='plain/text', + content_encoding='utf-8'), ) self.assertEqual( latin_string, - registry.decode( - latin_string_as_latin1, - content_type='application/data', - content_encoding='latin-1'), + registry.decode(latin_string_as_latin1, + content_type='application/data', + content_encoding='latin-1'), ) def test_content_type_binary(self): @@ -158,10 +160,9 @@ class test_Serialization(TestCase): def test_json_decode(self): self.assertEqual( py_data, - registry.decode( - json_data, - content_type='application/json', - content_encoding='utf-8'), + registry.decode(json_data, + content_type='application/json', + content_encoding='utf-8'), ) def test_json_encode(self): @@ -174,18 +175,35 @@ class test_Serialization(TestCase): registry.decode( json_data, content_type='application/json', - content_encoding='utf-8'), + content_encoding='utf-8', + ), ) @skip_if_not_module('msgpack') def test_msgpack_decode(self): register_msgpack() - decoded = registry.decode( - registry.encode(msgpack_py_data, serializer='msgpack')[-1], - content_type='application/x-msgpack', - content_encoding='binary', + self.assertEqual( + msgpack_py_data, + registry.decode(msgpack_data, + content_type='application/x-msgpack', + content_encoding='binary'), + ) + + @skip_if_not_module('msgpack') + def test_msgpack_encode(self): + register_msgpack() + self.assertEqual( + registry.decode( + registry.encode(msgpack_py_data, serializer='msgpack')[-1], + content_type='application/x-msgpack', + content_encoding='binary', + ), + registry.decode( + msgpack_data, + content_type='application/x-msgpack', + content_encoding='binary', + ), ) - self.assertEqual(msgpack_py_data, decoded) @skip_if_not_module('yaml') def test_yaml_decode(self): @@ -209,16 +227,16 @@ class test_Serialization(TestCase): registry.decode( yaml_data, content_type='application/x-yaml', - content_encoding='utf-8'), + content_encoding='utf-8', + ), ) def test_pickle_decode(self): self.assertEqual( py_data, - registry.decode( - pickle_data, - content_type='application/x-python-serialize', - content_encoding='binary'), + registry.decode(pickle_data, + content_type='application/x-python-serialize', + content_encoding='binary'), ) def test_pickle_encode(self): @@ -248,9 +266,10 @@ class test_Serialization(TestCase): registry.encode('foo', serializer='nonexisting') def test_raw_encode(self): - self.assertTupleEqual(raw_encode('foo'.encode('utf-8')), - ('application/data', 'binary', - 'foo'.encode('utf-8'))) + self.assertTupleEqual( + raw_encode('foo'.encode('utf-8')), + ('application/data', 'binary', 'foo'.encode('utf-8')), + ) @mask_modules('yaml') def test_register_yaml__no_yaml(self): diff --git a/kombu/tests/test_simple.py b/kombu/tests/test_simple.py index 381d8a20..26b09080 100644 --- a/kombu/tests/test_simple.py +++ b/kombu/tests/test_simple.py @@ -1,7 +1,9 @@ from __future__ import absolute_import +from __future__ import with_statement + +from Queue import Empty from kombu import Connection, Exchange, Queue -from kombu.five import Empty from .utils import TestCase from .utils import Mock diff --git a/kombu/tests/test_utils.py b/kombu/tests/test_utils.py index 338c58e7..298fc2bc 100644 --- a/kombu/tests/test_utils.py +++ b/kombu/tests/test_utils.py @@ -1,14 +1,19 @@ -from __future__ import absolute_import, unicode_literals +from __future__ import absolute_import +from __future__ import with_statement import pickle import sys from functools import wraps -from io import BytesIO, StringIO from mock import Mock, patch +if sys.version_info >= (3, 0): + from io import StringIO, BytesIO +else: + from StringIO import StringIO, StringIO as BytesIO # noqa + from kombu import utils -from kombu.five import string_t +from kombu.utils.compat import next from .utils import ( TestCase, @@ -57,7 +62,7 @@ class test_utils(TestCase): [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]) def test_reprkwargs(self): - self.assertTrue(utils.reprkwargs({'foo': 'bar', 1: 2, 'k': 'v'})) + self.assertTrue(utils.reprkwargs({'foo': 'bar', 1: 2, u'k': 'v'})) def test_reprcall(self): self.assertTrue( @@ -88,7 +93,7 @@ class test_UUID(TestCase): self.assertIsNone(ctypes) tid = uuid() self.assertTrue(tid) - self.assertIsInstance(tid, string_t) + self.assertIsInstance(tid, basestring) try: with_ctypes_masked() @@ -103,8 +108,8 @@ class test_Misc(TestCase): def f(**kwargs): return kwargs - kw = {'foo': 'foo', - 'bar': 'bar'} + kw = {u'foo': 'foo', + u'bar': 'bar'} self.assertTrue(f(**utils.kwdict(kw))) @@ -142,8 +147,7 @@ class test_emergency_dump_state(TestCase): {'foo': 'bar'}, open_file=lambda n, m: fh, dump=raise_something ) - # replace at the end removes u' from repr on Py2 - self.assertIn("'foo': 'bar'", fh.getvalue().replace("u'", "'")) + self.assertIn("'foo': 'bar'", fh.getvalue()) self.assertTrue(stderr.getvalue()) self.assertFalse(stdout.getvalue()) @@ -196,8 +200,8 @@ class test_retry_over_time(TestCase): utils.count.return_value = range(10) cb = Mock() x = utils.retry_over_time(self.myfun, self.Predicate, - errback=self.errback, interval_max=14, - callback=cb) + errback=self.errback, callback=cb, + interval_max=14) self.assertEqual(x, 42) self.assertEqual(self.index, 9) cb.assert_called_with() @@ -287,8 +291,10 @@ class test_symbol_by_name(TestCase): def test_returns_default(self): default = object() - self.assertIs(utils.symbol_by_name('xyz.ryx.qedoa.weq:foz', - default=default), default) + self.assertIs( + utils.symbol_by_name('xyz.ryx.qedoa.weq:foz', default=default), + default, + ) def test_no_default(self): with self.assertRaises(ImportError): @@ -302,17 +308,19 @@ class test_symbol_by_name(TestCase): def test_package(self): from kombu.entity import Exchange - self.assertIs(utils.symbol_by_name('.entity:Exchange', - package='kombu'), Exchange) + self.assertIs( + utils.symbol_by_name('.entity:Exchange', package='kombu'), + Exchange, + ) self.assertTrue(utils.symbol_by_name(':Consumer', package='kombu')) class test_ChannelPromise(TestCase): def test_repr(self): - self.assertIn( - "'foo'", + self.assertEqual( repr(utils.ChannelPromise(lambda: 'foo')), + "", ) diff --git a/kombu/tests/transport/test_amqplib.py b/kombu/tests/transport/test_amqplib.py index 626050fc..185cc349 100644 --- a/kombu/tests/transport/test_amqplib.py +++ b/kombu/tests/transport/test_amqplib.py @@ -57,11 +57,10 @@ class test_Channel(amqplibCase): self.assertFalse(self.channel.no_ack_consumers) def test_prepare_message(self): - x = self.channel.prepare_message( + self.assertTrue(self.channel.prepare_message( 'foobar', 10, 'application/data', 'utf-8', properties={}, - ) - self.assertTrue(x) + )) def test_message_to_python(self): message = Mock() diff --git a/kombu/tests/transport/test_base.py b/kombu/tests/transport/test_base.py index 2ba7156a..c34871d7 100644 --- a/kombu/tests/transport/test_base.py +++ b/kombu/tests/transport/test_base.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement from kombu import Connection, Consumer, Producer, Queue from kombu.transport.base import Message, StdChannel, Transport diff --git a/kombu/tests/transport/test_filesystem.py b/kombu/tests/transport/test_filesystem.py index 7bdad8f5..b1d7c0cd 100644 --- a/kombu/tests/transport/test_filesystem.py +++ b/kombu/tests/transport/test_filesystem.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement import tempfile diff --git a/kombu/tests/transport/test_memory.py b/kombu/tests/transport/test_memory.py index 970681d7..d138e494 100644 --- a/kombu/tests/transport/test_memory.py +++ b/kombu/tests/transport/test_memory.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement import socket diff --git a/kombu/tests/transport/test_pyamqp.py b/kombu/tests/transport/test_pyamqp.py index 21764c83..bd4e86bf 100644 --- a/kombu/tests/transport/test_pyamqp.py +++ b/kombu/tests/transport/test_pyamqp.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement import sys @@ -49,11 +50,10 @@ class test_Channel(TestCase): self.assertFalse(self.channel.no_ack_consumers) def test_prepare_message(self): - x = self.channel.prepare_message( + self.assertTrue(self.channel.prepare_message( 'foobar', 10, 'application/data', 'utf-8', properties={}, - ) - self.assertTrue(x) + )) def test_message_to_python(self): message = Mock() diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py index 8cb8fab5..90e3a318 100644 --- a/kombu/tests/transport/test_redis.py +++ b/kombu/tests/transport/test_redis.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement import socket import types @@ -6,10 +7,10 @@ import types from anyjson import dumps from collections import defaultdict from itertools import count +from Queue import Empty, Queue as _Queue from kombu import Connection, Exchange, Queue, Consumer, Producer from kombu.exceptions import InconsistencyError, VersionMismatch -from kombu.five import Empty, Queue as _Queue from kombu.utils import eventio # patch poll from kombu.tests.utils import TestCase @@ -128,10 +129,10 @@ class Client(object): class _socket(object): blocking = True - filenos = count(30) + next_fileno = count(30).next def __init__(self, *args): - self._fileno = next(self.filenos) + self._fileno = self.next_fileno() self.data = [] def fileno(self): @@ -264,24 +265,28 @@ class test_Channel(TestCase): self.assertDictEqual( self.channel._handle_message( self.channel.subclient, - ['pmessage', 'pattern', 'channel', 'data'] + ['pmessage', 'pattern', 'channel', 'data'], ), - {'type': 'pmessage', - 'pattern': 'pattern', - 'channel': 'channel', - 'data': 'data'}, + { + 'type': 'pmessage', + 'pattern': 'pattern', + 'channel': 'channel', + 'data': 'data', + }, ) def test_handle_message(self): self.assertDictEqual( self.channel._handle_message( self.channel.subclient, - ['type', 'channel', 'data'] + ['type', 'channel', 'data'], ), - {'type': 'type', - 'pattern': None, - 'channel': 'channel', - 'data': 'data'}, + { + 'type': 'type', + 'pattern': None, + 'channel': 'channel', + 'data': 'data', + }, ) def test_brpop_start_but_no_queues(self): @@ -290,9 +295,8 @@ class test_Channel(TestCase): def test_receive(self): s = self.channel.subclient = Mock() self.channel._fanout_to_queue['a'] = 'b' - s.parse_response.return_value = [ - 'message', 'a', dumps({'hello': 'world'}), - ] + s.parse_response.return_value = ['message', 'a', + dumps({'hello': 'world'})] payload, queue = self.channel._receive() self.assertDictEqual(payload, {'hello': 'world'}) self.assertEqual(queue, 'b') diff --git a/kombu/tests/transport/test_sqlalchemy.py b/kombu/tests/transport/test_sqlalchemy.py index f3d7d239..27cc4323 100644 --- a/kombu/tests/transport/test_sqlalchemy.py +++ b/kombu/tests/transport/test_sqlalchemy.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement from mock import patch from nose import SkipTest diff --git a/kombu/tests/transport/test_transport.py b/kombu/tests/transport/test_transport.py index 608b9767..11cdcbe3 100644 --- a/kombu/tests/transport/test_transport.py +++ b/kombu/tests/transport/test_transport.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement from mock import patch diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index 571c6e17..28714cc4 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement import warnings @@ -8,7 +9,9 @@ from kombu import Connection from kombu.exceptions import StdChannelError from kombu.transport import virtual from kombu.utils import uuid +from kombu.compression import compress +from kombu.tests.compat import catch_warnings from kombu.tests.utils import TestCase from kombu.tests.utils import Mock, redirect_stdouts @@ -64,7 +67,7 @@ class test_QoS(TestCase): self.q.append(i + 1, uuid()) self.assertFalse(self.q.can_consume()) - tag1 = next(iter(self.q._delivered)) + tag1 = iter(self.q._delivered).next() self.q.ack(tag1) self.assertTrue(self.q.can_consume()) @@ -120,13 +123,15 @@ class test_Message(TestCase): def test_serializable(self): c = client().channel() - data = c.prepare_message('the quick brown fox...') + body, content_type = compress('the quick brown fox...', 'gzip') + data = c.prepare_message(body, headers={'compression': content_type}) tag = data['properties']['delivery_tag'] = uuid() message = c.message_to_python(data) dict_ = message.serializable() self.assertEqual(dict_['body'], 'the quick brown fox...'.encode('utf-8')) self.assertEqual(dict_['properties']['delivery_tag'], tag) + self.assertFalse('compression' in dict_['headers']) class test_AbstractChannel(TestCase): @@ -304,8 +309,8 @@ class test_Channel(TestCase): consumer_tag = uuid() - c.basic_consume(n + '2', False, consumer_tag=consumer_tag, - callback=lambda *a: None) + c.basic_consume(n + '2', False, + consumer_tag=consumer_tag, callback=lambda *a: None) self.assertIn(n + '2', c._active_queues) r2, _ = c.drain_events() r2 = c.message_to_python(r2) @@ -406,7 +411,7 @@ class test_Channel(TestCase): def test_lookup__undeliverable(self, n='test_lookup__undeliverable'): warnings.resetwarnings() - with warnings.catch_warnings(record=True) as log: + with catch_warnings(record=True) as log: self.assertListEqual( self.channel._lookup(n, n, 'ae.undeliver'), ['ae.undeliver'], diff --git a/kombu/tests/transport/virtual/test_exchange.py b/kombu/tests/transport/virtual/test_exchange.py index d59aaa06..90127ba0 100644 --- a/kombu/tests/transport/virtual/test_exchange.py +++ b/kombu/tests/transport/virtual/test_exchange.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement from kombu import Connection from kombu.transport.virtual import exchange @@ -68,8 +69,10 @@ class test_Fanout(ExchangeCase): class test_Topic(ExchangeCase): type = exchange.TopicExchange - table = [('stock.#', None, 'rFoo'), - ('stock.us.*', None, 'rBar')] + table = [ + ('stock.#', None, 'rFoo'), + ('stock.us.*', None, 'rBar'), + ] def setUp(self): super(test_Topic, self).setUp() @@ -125,18 +128,24 @@ class test_ExchangeType(ExchangeCase): ) def test_equivalent(self): - e1 = dict(type='direct', - durable=True, - auto_delete=True, - arguments={}) + e1 = dict( + type='direct', + durable=True, + auto_delete=True, + arguments={}, + ) self.assertTrue( - self.e.equivalent(e1, 'eFoo', 'direct', True, True, {})) + self.e.equivalent(e1, 'eFoo', 'direct', True, True, {}), + ) self.assertFalse( - self.e.equivalent(e1, 'eFoo', 'topic', True, True, {})) + self.e.equivalent(e1, 'eFoo', 'topic', True, True, {}), + ) self.assertFalse( - self.e.equivalent(e1, 'eFoo', 'direct', False, True, {})) + self.e.equivalent(e1, 'eFoo', 'direct', False, True, {}), + ) self.assertFalse( - self.e.equivalent(e1, 'eFoo', 'direct', True, False, {})) + self.e.equivalent(e1, 'eFoo', 'direct', True, False, {}), + ) self.assertFalse( self.e.equivalent(e1, 'eFoo', 'direct', True, True, {'expires': 3000}), diff --git a/kombu/tests/transport/virtual/test_scheduling.py b/kombu/tests/transport/virtual/test_scheduling.py index 1a6c32bc..6f2d24f1 100644 --- a/kombu/tests/transport/virtual/test_scheduling.py +++ b/kombu/tests/transport/virtual/test_scheduling.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement from kombu.transport.virtual.scheduling import FairCycle diff --git a/kombu/tests/utilities/test_amq_manager.py b/kombu/tests/utilities/test_amq_manager.py index e030810e..ccf4ec08 100644 --- a/kombu/tests/utilities/test_amq_manager.py +++ b/kombu/tests/utilities/test_amq_manager.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement from mock import patch diff --git a/kombu/tests/utilities/test_debug.py b/kombu/tests/utilities/test_debug.py index d364400f..dbe8a2af 100644 --- a/kombu/tests/utilities/test_debug.py +++ b/kombu/tests/utilities/test_debug.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import with_statement import logging diff --git a/kombu/tests/utilities/test_encoding.py b/kombu/tests/utilities/test_encoding.py index 5d42e026..b2942b67 100644 --- a/kombu/tests/utilities/test_encoding.py +++ b/kombu/tests/utilities/test_encoding.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- -from __future__ import absolute_import, unicode_literals +from __future__ import absolute_import +from __future__ import with_statement import sys @@ -45,16 +46,16 @@ class test_encoding_utils(TestCase): def test_str_to_bytes(self): with clean_encoding() as e: - self.assertIsInstance(e.str_to_bytes('foobar'), str) + self.assertIsInstance(e.str_to_bytes(u'foobar'), str) self.assertIsInstance(e.str_to_bytes('foobar'), str) def test_from_utf8(self): with clean_encoding() as e: - self.assertIsInstance(e.from_utf8('foobar'), str) + self.assertIsInstance(e.from_utf8(u'foobar'), str) def test_default_encode(self): with clean_encoding() as e: - self.assertTrue(e.default_encode(b'foo')) + self.assertTrue(e.default_encode('foo')) class test_safe_str(TestCase): @@ -63,10 +64,10 @@ class test_safe_str(TestCase): self.assertEqual(safe_str('foo'), 'foo') def test_when_unicode(self): - self.assertIsInstance(safe_str('foo'), str) + self.assertIsInstance(safe_str(u'foo'), str) def test_when_containing_high_chars(self): - s = 'The quiæk fåx jømps øver the lazy dåg' + s = u'The quiæk fåx jømps øver the lazy dåg' res = safe_str(s) self.assertIsInstance(res, str) diff --git a/kombu/tests/utils.py b/kombu/tests/utils.py index 82830dad..a8f92a37 100644 --- a/kombu/tests/utils.py +++ b/kombu/tests/utils.py @@ -1,10 +1,12 @@ from __future__ import absolute_import +import __builtin__ import os import sys import types from functools import wraps +from StringIO import StringIO import mock @@ -16,8 +18,6 @@ try: except AttributeError: import unittest2 as unittest # noqa -from kombu.five import StringIO, builtins, string_t, module_name_t - class TestCase(unittest.TestCase): @@ -76,8 +76,8 @@ def module_exists(*modules): @wraps(fun) def __inner(*args, **kwargs): for module in modules: - if isinstance(module, string_t): - module = types.ModuleType(module_name_t(module)) + if isinstance(module, basestring): + module = types.ModuleType(module) sys.modules[module.__name__] = module try: return fun(*args, **kwargs) @@ -95,19 +95,19 @@ def mask_modules(*modnames): @wraps(fun) def __inner(*args, **kwargs): - realimport = builtins.__import__ + realimport = __builtin__.__import__ def myimp(name, *args, **kwargs): if name in modnames: - raise ImportError('No module named {0}'.format(name)) + raise ImportError('No module named %s' % name) else: return realimport(name, *args, **kwargs) - builtins.__import__ = myimp + __builtin__.__import__ = myimp try: return fun(*args, **kwargs) finally: - builtins.__import__ = realimport + __builtin__.__import__ = realimport return __inner return _inner @@ -120,7 +120,7 @@ def skip_if_environ(env_var_name): @wraps(fun) def _skips_if_environ(*args, **kwargs): if os.environ.get(env_var_name): - raise SkipTest('SKIP {0}: {1} set'.format( + raise SkipTest('SKIP %s: %s set\n' % ( fun.__name__, env_var_name)) return fun(*args, **kwargs) @@ -135,7 +135,7 @@ def skip_if_module(module): def _skip_if_module(*args, **kwargs): try: __import__(module) - raise SkipTest('SKIP {0}: {1} available'.format( + raise SkipTest('SKIP %s: %s available\n' % ( fun.__name__, module)) except ImportError: pass @@ -151,7 +151,7 @@ def skip_if_not_module(module): try: __import__(module) except ImportError: - raise SkipTest('SKIP {0}: {1} available'.format( + raise SkipTest('SKIP %s: %s available\n' % ( fun.__name__, module)) return fun(*args, **kwargs) return _skip_if_not_module diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 12380702..9432ba12 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -240,6 +240,13 @@ class Channel(virtual.Channel): return payload raise Empty() + def _restore(self, message, + unwanted_delivery_info=('sqs_message', 'sqs_queue')): + for unwanted_key in unwanted_delivery_info: + # Remove objects that aren't JSON serializable (Issue #1108). + message.delivery_info.pop(unwanted_key, None) + return super(Channel, self)._restore(message) + def basic_ack(self, delivery_tag): delivery_info = self.qos.get(delivery_tag).delivery_info try: diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index f3dbf58d..b9012a2d 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -131,9 +131,9 @@ class QoS(object): def append(self, message, delivery_tag): """Append message to transactional state.""" - self._delivered[delivery_tag] = message if self._dirty: self._flush() + self._delivered[delivery_tag] = message def get(self, delivery_tag): return self._delivered[delivery_tag] @@ -238,7 +238,7 @@ class Message(base.Message): 'properties': props, 'content-type': self.content_type, 'content-encoding': self.content_encoding, - 'headers': self.headers} + 'headers': headers} class AbstractChannel(object): @@ -510,10 +510,13 @@ class Channel(AbstractChannel, base.StdChannel): pass self.connection._callbacks.pop(queue, None) - def basic_get(self, queue, **kwargs): + def basic_get(self, queue, no_ack=False, **kwargs): """Get message by direct access (synchronous).""" try: - return self._get(queue) + message = self.Message(self, self._get(queue)) + if not no_ack: + self.qos.append(message, message.delivery_tag) + return message except Empty: pass diff --git a/kombu/utils/amq_manager.py b/kombu/utils/amq_manager.py index 30419ae6..08ee3939 100644 --- a/kombu/utils/amq_manager.py +++ b/kombu/utils/amq_manager.py @@ -8,7 +8,7 @@ def get_manager(client, hostname=None, port=None, userid=None, def get(name, val, default): return (val if val is not None - else opt.get('manager_%s' % name) + else opt('manager_%s' % name) or getattr(client, name, None) or default) host = get('hostname', hostname, 'localhost') diff --git a/pavement.py b/pavement.py index f15cbe68..57f8ce97 100644 --- a/pavement.py +++ b/pavement.py @@ -7,7 +7,7 @@ from paver.setuputils import setup # noqa PYCOMPILE_CACHES = ['*.pyc', '*$py.class'] options( - sphinx=Bunch(builddir='.build'), + sphinx=Bunch(builddir='.build'), ) @@ -91,9 +91,15 @@ def readme(options): ]) def bump(options): s = "-- '%s'" % (options.custom, ) \ +<<<<<<< HEAD if getattr(options, 'custom', None) else '' sh('extra/release/bump_version.py \ kombu/__init__.py README.rst %s' % (s, )) +======= + if getattr(options, "custom", None) else "" + sh("extra/release/bump_version.py \ + kombu/__init__.py README.rst %s" % (s, )) +>>>>>>> 2.5 @task @@ -129,6 +135,7 @@ def flake8(options): }{exit $FOUND_FLAKE; '""" % (complexity, migrations_path), ignore_error=noerror) + @task @cmdopts([ ('noerror', 'E', 'Ignore errors'), diff --git a/setup.py b/setup.py index cf4c9b61..c53190f3 100644 --- a/setup.py +++ b/setup.py @@ -86,8 +86,9 @@ for dirpath, dirnames, filenames in os.walk(src_dir): if filename.endswith('.py'): packages.append('.'.join(fullsplit(dirpath))) else: - data_files.append([dirpath, [os.path.join(dirpath, f) for f in - filenames]]) + data_files.append( + [dirpath, [os.path.join(dirpath, f) for f in filenames]], + ) if os.path.exists('README.rst'): long_description = codecs.open('README.rst', 'r', 'utf-8').read()