diff --git a/kombu/entity.py b/kombu/entity.py index 72ddb7d2..50e89dc2 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -563,10 +563,10 @@ class Queue(MaybeChannelBound): no_ack = self.no_ack if no_ack is None else no_ack message = self.channel.basic_get(queue=self.name, no_ack=no_ack) if message is not None: - message.accept = prepare_accept_encoding(accept) m2p = getattr(self.channel, 'message_to_python', None) if m2p: message = m2p(message) + message.accept = prepare_accept_encoding(accept) return message def purge(self, nowait=False): diff --git a/kombu/messaging.py b/kombu/messaging.py index b7de2396..2bac95ea 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -13,7 +13,7 @@ from .compression import compress from .connection import maybe_channel, is_connection from .entity import Exchange, Queue, DELIVERY_MODES from .five import int_types, text_t, values -from .serialization import encode, registry +from .serialization import encode, registry, prepare_accept_encoding from .utils import ChannelPromise, maybe_list __all__ = ['Exchange', 'Queue', 'Producer', 'Consumer'] @@ -351,13 +351,7 @@ class Consumer(object): self.auto_declare = auto_declare if on_decode_error is not None: self.on_decode_error = on_decode_error - self.accept = accept - - if self.accept is not None: - self.accept = set( - n if '/' in n else registry.name_to_type[n] - for n in self.accept - ) + self.accept = prepare_accept_encoding(accept) if self.channel: self.revive(self.channel) @@ -579,13 +573,13 @@ class Consumer(object): def _receive_callback(self, message): accept = self.accept - if accept is not None: - message.accept = accept on_m, channel, decoded = self.on_message, self.channel, None try: m2p = getattr(channel, 'message_to_python', None) if m2p: message = m2p(message) + if accept is not None: + message.accept = accept decoded = None if on_m else message.decode() except Exception as exc: if not self.on_decode_error: diff --git a/kombu/tests/test_compat.py b/kombu/tests/test_compat.py index 790e76ce..473b25e4 100644 --- a/kombu/tests/test_compat.py +++ b/kombu/tests/test_compat.py @@ -219,7 +219,8 @@ class test_Consumer(TestCase): callback_called[0] = True c.backend.to_deliver.append('42') - self.assertEqual(c.fetch().payload, '42') + payload = c.fetch().payload + self.assertEqual(payload, '42') c.backend.to_deliver.append('46') c.register_callback(receive) self.assertEqual(c.fetch(enable_callbacks=True).payload, '46') diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index f4bf4bfd..5ed034cf 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -226,8 +226,8 @@ class test_Connection(TestCase): self.assertEqual(c.hostname, 'mysql://some_host') finally: mod.URI_PASSTHROUGH = prev - c = Connection('amqp+sqlite://some_host') - self.assertTrue(c.as_uri().startswith('amqp+')) + c = Connection('pyamqp+sqlite://some_host') + self.assertTrue(c.as_uri().startswith('pyamqp+')) def test_default_ensure_callback(self): with patch('kombu.connection.logger') as logger: diff --git a/kombu/tests/transport/test_sqlalchemy.py b/kombu/tests/transport/test_sqlalchemy.py index 099b4de8..d3b93662 100644 --- a/kombu/tests/transport/test_sqlalchemy.py +++ b/kombu/tests/transport/test_sqlalchemy.py @@ -44,6 +44,7 @@ class test_sqlalchemy(TestCase): assert channel._get('celery') == 'DATA' def test_custom_table_names(self): + raise SkipTest('causes global side effect') conn = Connection('sqlalchemy+sqlite:///:memory:', transport_options={ 'queue_tablename': 'my_custom_queue', 'message_tablename': 'my_custom_message' diff --git a/kombu/tests/utilities/test_encoding.py b/kombu/tests/utilities/test_encoding.py index 2a37fc7c..56348d23 100644 --- a/kombu/tests/utilities/test_encoding.py +++ b/kombu/tests/utilities/test_encoding.py @@ -27,16 +27,16 @@ def clean_encoding(): class test_default_encoding(TestCase): - @patch('sys.getfilesystemencoding') - def test_default(self, getfilesystemencoding): - getfilesystemencoding.return_value = 'ascii' + @patch('sys.getdefaultencoding') + def test_default(self, getdefaultencoding): + getdefaultencoding.return_value = 'ascii' with clean_encoding() as encoding: enc = encoding.default_encoding() if sys.platform.startswith('java'): self.assertEqual(enc, 'utf-8') else: self.assertEqual(enc, 'ascii') - getfilesystemencoding.assert_called_with() + getdefaultencoding.assert_called_with() class test_encoding_utils(TestCase):