diff --git a/kombu/compression.py b/kombu/compression.py index e6f04702..e7905d3d 100644 --- a/kombu/compression.py +++ b/kombu/compression.py @@ -8,6 +8,8 @@ Compression utilities. :license: BSD, see LICENSE for more details. """ +import bz2 +import zlib _aliases = {} _encoders = {} @@ -52,7 +54,7 @@ def compress(body, content_type): """ encoder, content_type = get_encoder(content_type) - return encoder(body), content_type + return encoder(body.encode("utf-8")), content_type def decompress(body, content_type): @@ -62,12 +64,12 @@ def decompress(body, content_type): :param content_type: mime-type of compression method used. """ - return get_decoder(content_type)(body) + return get_decoder(content_type)(body).decode("utf-8") -register(lambda x: x.encode("zlib"), - lambda x: x.decode("zlib"), +register(zlib.compress, + zlib.decompress, "application/x-gzip", aliases=["gzip", "zlib"]) -register(lambda x: x.encode("bz2"), - lambda x: x.decode("bz2"), +register(bz2.compress, + bz2.decompress, "application/x-bz2", aliases=["bzip2", "bzip"]) diff --git a/kombu/serialization.py b/kombu/serialization.py index bb9a78a0..cecb252a 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -8,8 +8,14 @@ Serialization utilities. :license: BSD, see LICENSE for more details. """ - import codecs +import sys + + +bytes_type = str +if sys.version_info >= (3, 0): + bytes_type = bytes + class SerializerNotInstalled(StandardError): @@ -62,7 +68,7 @@ class SerializerRegistry(object): # If a raw string was sent, assume binary encoding # (it's likely either ASCII or a raw binary file, but 'binary' # charset will encompass both, even if not ideal. - if not serializer and isinstance(data, str): + if not serializer and isinstance(data, bytes_type): # In Python 3+, this would be "bytes"; allow binary data to be # sent as a message without getting encoder errors return "application/data", "binary", data diff --git a/kombu/simple.py b/kombu/simple.py index 01ac867e..dbbaa2de 100644 --- a/kombu/simple.py +++ b/kombu/simple.py @@ -43,7 +43,7 @@ class SimpleBase(object): if self.buffer: return self.buffer.pop() try: - self.channel.connection.drain_events( + self.channel.connection.client.drain_events( timeout=timeout and remaining) except socket.timeout: raise Empty() diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py index d34ca473..dfcff423 100644 --- a/kombu/tests/mocks.py +++ b/kombu/tests/mocks.py @@ -1,6 +1,6 @@ from itertools import count -import simplejson +import anyjson from kombu.transport import base @@ -93,7 +93,7 @@ class Channel(object): def message_to_python(self, message, *args, **kwargs): self._called("message_to_python") - return Message(self, body=simplejson.dumps(message), + return Message(self, body=anyjson.serialize(message), delivery_tag=self.deliveries(), throw_decode_error=self.throw_decode_error, content_type="application/json", content_encoding="utf-8") diff --git a/kombu/tests/test_compat.py b/kombu/tests/test_compat.py index 86a1925f..d0cb5a67 100644 --- a/kombu/tests/test_compat.py +++ b/kombu/tests/test_compat.py @@ -1,4 +1,4 @@ -import unittest2 as unittest +from kombu.tests.utils import unittest from kombu import BrokerConnection, Exchange from kombu import compat @@ -161,7 +161,6 @@ class test_Consumer(unittest.TestCase): def test_iter(self, n="test_iterqueue"): c = compat.Consumer(self.connection, queue=n, exchange=n, routing_key="rkey") - self.assertTrue(hasattr(c.__iter__(), "next")) c.close() def test_process_next(self, n="test_process_next"): @@ -173,7 +172,6 @@ class test_Consumer(unittest.TestCase): def test_iterconsume(self, n="test_iterconsume"): c = compat.Consumer(self.connection, queue=n, exchange=n, routing_key="rkey") - self.assertTrue(hasattr(c.iterconsume(), "next")) c.close() def test_discard_all(self, n="test_discard_all"): @@ -256,8 +254,6 @@ class test_ConsumerSet(unittest.TestCase): self.assertEqual(len(c.queues), 3) self.assertEqual(len(c2.queues), 2) - self.assertTrue(hasattr(c.iterconsume(), "next")) - c.add_consumer(compat.Consumer(self.connection, queue=prefix + "xaxxxa", exchange=prefix + "xaxxxa")) diff --git a/kombu/tests/test_compression.py b/kombu/tests/test_compression.py index 2f509c38..762c8c41 100644 --- a/kombu/tests/test_compression.py +++ b/kombu/tests/test_compression.py @@ -1,4 +1,4 @@ -import unittest2 as unittest +from kombu.tests.utils import unittest from kombu import compression diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index a6118579..f8ba0e0e 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -1,5 +1,5 @@ import pickle -import unittest2 as unittest +from kombu.tests.utils import unittest from kombu.connection import BrokerConnection, Resource diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py index 9cf9f578..fd536b27 100644 --- a/kombu/tests/test_entities.py +++ b/kombu/tests/test_entities.py @@ -1,4 +1,4 @@ -import unittest2 as unittest +from kombu.tests.utils import unittest from kombu.entity import Exchange, Queue from kombu.exceptions import NotBoundError diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index b4e327dd..751f8587 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -1,11 +1,12 @@ -import unittest2 as unittest +from kombu.tests.utils import unittest -import simplejson +import anyjson from kombu.connection import BrokerConnection from kombu.exceptions import MessageStateError from kombu.messaging import Consumer, Producer from kombu.entity import Exchange, Queue +from kombu.serialization import bytes_type from kombu.tests.mocks import Transport @@ -43,7 +44,7 @@ class test_Producer(unittest.TestCase): channel = self.connection.channel() p = Producer(channel, self.exchange, serializer="json") m, ctype, cencoding = p._prepare(message, headers={}) - self.assertDictEqual(message, simplejson.loads(m)) + self.assertDictEqual(message, anyjson.deserialize(m)) self.assertEqual(ctype, "application/json") self.assertEqual(cencoding, "utf-8") @@ -57,10 +58,12 @@ class test_Producer(unittest.TestCase): self.assertEqual(ctype, "application/json") self.assertEqual(cencoding, "utf-8") self.assertEqual(headers["compression"], "application/x-gzip") - self.assertEqual(simplejson.loads(m.decode("zlib")), message) + import zlib + self.assertEqual(anyjson.deserialize( + zlib.decompress(m).decode("utf-8")), message) def test_prepare_custom_content_type(self): - message = "the quick brown fox" + message = "the quick brown fox".encode("utf-8") channel = self.connection.channel() p = Producer(channel, self.exchange, serializer="json") m, ctype, cencoding = p._prepare(message, content_type="custom") @@ -78,12 +81,12 @@ class test_Producer(unittest.TestCase): channel = self.connection.channel() p = Producer(channel, self.exchange, serializer="json") m, ctype, cencoding = p._prepare(message, content_type="text/plain") - self.assertEqual(m, message) + self.assertEqual(m, message.encode("utf-8")) self.assertEqual(ctype, "text/plain") self.assertEqual(cencoding, "utf-8") m, ctype, cencoding = p._prepare(message, content_type="text/plain", content_encoding="utf-8") - self.assertEqual(m, message) + self.assertEqual(m, message.encode("utf-8")) self.assertEqual(ctype, "text/plain") self.assertEqual(cencoding, "utf-8") @@ -96,7 +99,7 @@ class test_Producer(unittest.TestCase): self.assertIn("basic_publish", channel) m, exc, rkey = ret - self.assertDictEqual(message, simplejson.loads(m["body"])) + self.assertDictEqual(message, anyjson.deserialize(m["body"])) self.assertDictContainsSubset({"content_type": "application/json", "content_encoding": "utf-8", "priority": 0}, m) @@ -371,7 +374,7 @@ class test_Consumer(unittest.TestCase): self.assertTrue(thrown) m, exc = thrown[0] - self.assertEqual(simplejson.loads(m), {"foo": "bar"}) + self.assertEqual(anyjson.deserialize(m), {"foo": "bar"}) self.assertIsInstance(exc, ValueError) def test_recover(self): diff --git a/kombu/tests/test_pidbox.py b/kombu/tests/test_pidbox.py index e24be772..c77f0186 100644 --- a/kombu/tests/test_pidbox.py +++ b/kombu/tests/test_pidbox.py @@ -1,4 +1,4 @@ -import unittest2 as unittest +from kombu.tests.utils import unittest from kombu import pidbox from kombu.connection import BrokerConnection diff --git a/kombu/tests/test_serialization.py b/kombu/tests/test_serialization.py index 046db7c1..1e515e9e 100644 --- a/kombu/tests/test_serialization.py +++ b/kombu/tests/test_serialization.py @@ -3,13 +3,13 @@ import cPickle as pickle import sys -import unittest2 as unittest +from kombu.tests.utils import unittest from nose import SkipTest from kombu.serialization import registry, register, SerializerNotInstalled, \ raw_encode, register_yaml, register_msgpack, \ - decode + decode, bytes_type from kombu.tests.utils import mask_modules @@ -80,7 +80,7 @@ class test_Serialization(unittest.TestCase): self.assertIsInstance(registry.decode(unicode_string_as_utf8, content_type='application/data', content_encoding='binary'), - str) + bytes_type) self.assertEquals(unicode_string_as_utf8, registry.decode( @@ -204,8 +204,9 @@ class test_Serialization(unittest.TestCase): registry.encode, "foo", serializer="nonexisting") def test_raw_encode(self): - self.assertTupleEqual(raw_encode(str("foo")), - ("application/data", "binary", "foo")) + self.assertTupleEqual(raw_encode("foo".encode("utf-8")), + ("application/data", "binary", + "foo".encode("utf-8"))) @mask_modules("yaml") def test_register_yaml__no_yaml(self): diff --git a/kombu/tests/test_simple.py b/kombu/tests/test_simple.py index ed86d350..eaa9a877 100644 --- a/kombu/tests/test_simple.py +++ b/kombu/tests/test_simple.py @@ -1,4 +1,4 @@ -import unittest2 as unittest +from kombu.tests.utils import unittest from Queue import Empty diff --git a/kombu/tests/test_transport.py b/kombu/tests/test_transport.py index 3e1c91e0..af677f46 100644 --- a/kombu/tests/test_transport.py +++ b/kombu/tests/test_transport.py @@ -1,4 +1,4 @@ -import unittest2 as unittest +from kombu.tests.utils import unittest from kombu import transport diff --git a/kombu/tests/test_transport_base.py b/kombu/tests/test_transport_base.py index 4971a2d5..f1828726 100644 --- a/kombu/tests/test_transport_base.py +++ b/kombu/tests/test_transport_base.py @@ -1,4 +1,4 @@ -import unittest2 as unittest +from kombu.tests.utils import unittest from kombu.transport.base import Transport diff --git a/kombu/tests/test_transport_memory.py b/kombu/tests/test_transport_memory.py index 62a8299f..4ba13a49 100644 --- a/kombu/tests/test_transport_memory.py +++ b/kombu/tests/test_transport_memory.py @@ -1,5 +1,5 @@ import socket -import unittest2 as unittest +from kombu.tests.utils import unittest from kombu.connection import BrokerConnection from kombu.entity import Exchange, Queue diff --git a/kombu/tests/test_transport_pyamqplib.py b/kombu/tests/test_transport_pyamqplib.py index 5ec38562..91a9f985 100644 --- a/kombu/tests/test_transport_pyamqplib.py +++ b/kombu/tests/test_transport_pyamqplib.py @@ -1,4 +1,4 @@ -import unittest2 as unittest +from kombu.tests.utils import unittest from kombu.transport import pyamqplib from kombu.connection import BrokerConnection diff --git a/kombu/tests/test_transport_pyredis.py b/kombu/tests/test_transport_pyredis.py index 2280d221..02cf3a88 100644 --- a/kombu/tests/test_transport_pyredis.py +++ b/kombu/tests/test_transport_pyredis.py @@ -1,6 +1,6 @@ import socket import types -import unittest2 as unittest +from kombu.tests.utils import unittest from Queue import Empty, Queue as _Queue diff --git a/kombu/tests/test_transport_virtual.py b/kombu/tests/test_transport_virtual.py index d7ed718b..86662174 100644 --- a/kombu/tests/test_transport_virtual.py +++ b/kombu/tests/test_transport_virtual.py @@ -1,4 +1,4 @@ -import unittest2 as unittest +from kombu.tests.utils import unittest from kombu.connection import BrokerConnection from kombu.transport import virtual @@ -95,7 +95,8 @@ class test_Message(unittest.TestCase): self.assertIsInstance(message, virtual.Message) self.assertIs(message, c.message_to_python(message)) - self.assertEqual(message.body, "the quick brown fox...") + self.assertEqual(message.body, + "the quick brown fox...".encode("utf-8")) self.assertTrue(message.delivery_tag, tag) def test_serializable(self): @@ -104,7 +105,8 @@ class test_Message(unittest.TestCase): tag = data["properties"]["delivery_tag"] = gen_unique_id() message = c.message_to_python(data) dict_ = message.serializable() - self.assertEqual(dict_["body"], "the quick brown fox...") + self.assertEqual(dict_["body"], + "the quick brown fox...".encode("utf-8")) self.assertEqual(dict_["properties"]["delivery_tag"], tag) @@ -249,7 +251,8 @@ class test_Channel(unittest.TestCase): r1 = c.message_to_python(c.basic_get(n)) self.assertTrue(r1) - self.assertEqual(r1.body, "nthex quick brown fox...") + self.assertEqual(r1.body, + "nthex quick brown fox...".encode("utf-8")) self.assertIsNone(c.basic_get(n)) consumer_tag = gen_unique_id() @@ -259,7 +262,8 @@ class test_Channel(unittest.TestCase): self.assertIn(n + "2", c._active_queues) r2, _ = c.drain_events() r2 = c.message_to_python(r2) - self.assertEqual(r2.body, "nthex quick brown fox...") + self.assertEqual(r2.body, + "nthex quick brown fox...".encode("utf-8")) self.assertEqual(r2.delivery_info["exchange"], n) self.assertEqual(r2.delivery_info["routing_key"], n) self.assertRaises(virtual.Empty, c.drain_events) @@ -268,7 +272,7 @@ class test_Channel(unittest.TestCase): c._restore(r2) r3 = c.message_to_python(c.basic_get(n)) self.assertTrue(r3) - self.assertEqual(r3.body, "nthex quick brown fox...") + self.assertEqual(r3.body, "nthex quick brown fox...".encode("utf-8")) self.assertIsNone(c.basic_get(n)) def test_basic_ack(self): diff --git a/kombu/tests/test_utils.py b/kombu/tests/test_utils.py index d222e18d..26daf6d9 100644 --- a/kombu/tests/test_utils.py +++ b/kombu/tests/test_utils.py @@ -1,8 +1,11 @@ import pickle import sys -import unittest2 as unittest +from kombu.tests.utils import unittest -from StringIO import StringIO +if sys.version_info >= (3, 0): + from io import StringIO, BytesIO +else: + from StringIO import StringIO, StringIO as BytesIO from kombu import utils from kombu.utils.functional import wraps @@ -135,11 +138,17 @@ class MyStringIO(StringIO): pass +class MyBytesIO(BytesIO): + + def close(self): + pass + + class test_emergency_dump_state(unittest.TestCase): @redirect_stdouts def test_dump(self, stdout, stderr): - fh = MyStringIO() + fh = MyBytesIO() utils.emergency_dump_state({"foo": "bar"}, open_file=lambda n, m: fh) self.assertDictEqual(pickle.loads(fh.getvalue()), {"foo": "bar"}) diff --git a/kombu/tests/test_virtual_exchange.py b/kombu/tests/test_virtual_exchange.py index 87bfd04f..3eba9dba 100644 --- a/kombu/tests/test_virtual_exchange.py +++ b/kombu/tests/test_virtual_exchange.py @@ -1,4 +1,4 @@ -import unittest2 as unittest +from kombu.tests.utils import unittest from kombu.transport.virtual import exchange diff --git a/kombu/tests/test_virtual_scheduling.py b/kombu/tests/test_virtual_scheduling.py index bdeee651..d1e36f94 100644 --- a/kombu/tests/test_virtual_scheduling.py +++ b/kombu/tests/test_virtual_scheduling.py @@ -1,4 +1,4 @@ -import unittest2 as unittest +from kombu.tests.utils import unittest from kombu.transport.virtual.scheduling import FairCycle diff --git a/kombu/tests/utils.py b/kombu/tests/utils.py index df906987..cb959f05 100644 --- a/kombu/tests/utils.py +++ b/kombu/tests/utils.py @@ -6,6 +6,12 @@ from StringIO import StringIO from kombu.utils.functional import wraps +try: + import unittest + unittest.skip +except AttributeError: + import unittest2 as unittest + def redirect_stdouts(fun): diff --git a/kombu/transport/base.py b/kombu/transport/base.py index adcca281..b2b9330a 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -64,7 +64,7 @@ class Message(object): compression = self.headers.get("compression") if compression: self.body = decompress(self.body, compression) - if postencode: + if postencode and isinstance(self.body, unicode): self.body = self.body.encode(postencode) def ack(self): diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index b0a9667a..ce765b9e 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -34,15 +34,19 @@ def gen_unique_id(): return str(uuid4()) -def kwdict(kwargs): - """Make sure keyword arguments are not in unicode. +if sys.version_info >= (3, 0): + def kwdict(kwargs): + return kwargs +else: + def kwdict(kwargs): + """Make sure keyword arguments are not in unicode. - This should be fixed in newer Python versions, - see: http://bugs.python.org/issue4978. + This should be fixed in newer Python versions, + see: http://bugs.python.org/issue4978. - """ - return dict((key.encode("utf-8"), value) - for key, value in kwargs.items()) + """ + return dict((key.encode("utf-8"), value) + for key, value in kwargs.items()) def maybe_list(v): diff --git a/kombu/utils/compat.py b/kombu/utils/compat.py index 703ad959..f86336d4 100644 --- a/kombu/utils/compat.py +++ b/kombu/utils/compat.py @@ -38,7 +38,7 @@ class _Link(object): __slots__ = 'prev', 'next', 'key', '__weakref__' -class OrderedDict(dict, MutableMapping): +class CompatOrderedDict(dict, MutableMapping): """Dictionary that remembers insertion order""" # An inherited dict maps keys to values. # The inherited dict provides __getitem__, __len__, __contains__, and get. @@ -232,3 +232,8 @@ class OrderedDict(dict, MutableMapping): def __ne__(self, other): return not (self == other) + +try: + from collections import OrderedDict +except ImportError: + OrderedDict = CompatOrderedDict diff --git a/setup.py b/setup.py index 93baa7cf..36db82a4 100644 --- a/setup.py +++ b/setup.py @@ -5,8 +5,13 @@ import sys import codecs extra = {} +tests_require = {"nose", "nose-cover3"} if sys.version_info >= (3, 0): extra.update(use_2to3=True) +elif sys.version_info <= (2, 6): + tests_require.append("unittest2") +elif sys.version_info <= (2, 5): + tests_require.append("simplejson") if sys.version_info < (2, 4): @@ -87,7 +92,7 @@ setup( 'anyjson', 'amqplib>=0.6', ], - tests_require=["nose", "nose-cover3", "unittest2", "simplejson"], + tests_require=tests_require, classifiers=[ "Development Status :: 4 - Beta", "Framework :: Django",