diff --git a/docs/reference/index.rst b/docs/reference/index.rst index 1db1b0ae..85b07bc7 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -50,7 +50,6 @@ kombu.transport.SQS kombu.transport.SLMQ kombu.transport.pyro - kombu.transport.amqplib kombu.transport.base kombu.transport.virtual kombu.transport.virtual.exchange diff --git a/docs/reference/kombu.transport.amqplib.rst b/docs/reference/kombu.transport.amqplib.rst deleted file mode 100644 index 06fc48bf..00000000 --- a/docs/reference/kombu.transport.amqplib.rst +++ /dev/null @@ -1,36 +0,0 @@ -.. currentmodule:: kombu.transport.amqplib - -.. automodule:: kombu.transport.amqplib - - .. contents:: - :local: - - Transport - --------- - - .. autoclass:: Transport - :members: - :undoc-members: - - Connection - ---------- - - .. autoclass:: Connection - :members: - :undoc-members: - :inherited-members: - - Channel - ------- - - .. autoclass:: Channel - :members: - :undoc-members: - - Message - ------- - - .. autoclass:: Message - :members: - :undoc-members: - diff --git a/funtests/tests/test_amqplib.py b/funtests/tests/test_amqplib.py deleted file mode 100644 index eb7e91eb..00000000 --- a/funtests/tests/test_amqplib.py +++ /dev/null @@ -1,11 +0,0 @@ -from __future__ import absolute_import, unicode_literals - -from funtests import transport - -from kombu.tests.case import skip - - -@skip.unless_module('amqplib') -class test_amqplib(transport.TransportCase): - transport = 'amqplib' - prefix = 'amqplib' diff --git a/kombu/connection.py b/kombu/connection.py index 052ade55..ab27d0bf 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -70,7 +70,7 @@ class Connection(object): .. admonition:: SSL compatibility - SSL currently only works with the py-amqp, amqplib, and qpid + SSL currently only works with the py-amqp, and qpid transports. For other transports you can use stunnel. :keyword ssl: Use SSL to connect to the server. Default is ``False``. diff --git a/kombu/mixins.py b/kombu/mixins.py index 0bc0bbc1..3f664155 100644 --- a/kombu/mixins.py +++ b/kombu/mixins.py @@ -243,9 +243,6 @@ class ConsumerMixin(object): @cached_property def restart_limit(self): - # the AttributeError that can be catched from amqplib - # poses problems for the too often restarts protection - # in Connection.ensure_connection return TokenBucket(1) @cached_property diff --git a/kombu/tests/transport/test_amqplib.py b/kombu/tests/transport/test_amqplib.py deleted file mode 100644 index 41b6c1ce..00000000 --- a/kombu/tests/transport/test_amqplib.py +++ /dev/null @@ -1,156 +0,0 @@ -from __future__ import absolute_import, unicode_literals - -import sys - -from kombu import Connection - -from kombu.tests.case import Case, Mock, mock, skip - - -class MockConnection(dict): - - def __setattr__(self, key, value): - self[key] = value - -try: - __import__('amqplib') -except ImportError: - amqplib = Channel = None -else: - from kombu.transport import amqplib - - class Channel(amqplib.Channel): - wait_returns = [] - - def _x_open(self, *args, **kwargs): - pass - - def wait(self, *args, **kwargs): - return self.wait_returns - - def _send_method(self, *args, **kwargs): - pass - - -@skip.unless_module('amqplib') -class amqplibCase(Case): - pass - - -class test_Channel(amqplibCase): - - def setup(self): - self.conn = Mock() - self.conn.channels = {} - self.channel = Channel(self.conn, 0) - - def test_init(self): - self.assertFalse(self.channel.no_ack_consumers) - - def test_prepare_message(self): - self.assertTrue(self.channel.prepare_message( - 'foobar', 10, 'application/data', 'utf-8', - properties={}, - )) - - def test_message_to_python(self): - message = Mock() - message.headers = {} - message.properties = {} - self.assertTrue(self.channel.message_to_python(message)) - - def test_close_resolves_connection_cycle(self): - self.assertIsNotNone(self.channel.connection) - self.channel.close() - self.assertIsNone(self.channel.connection) - - def test_basic_consume_registers_ack_status(self): - self.channel.wait_returns = 'my-consumer-tag' - self.channel.basic_consume('foo', no_ack=True) - self.assertIn('my-consumer-tag', self.channel.no_ack_consumers) - - self.channel.wait_returns = 'other-consumer-tag' - self.channel.basic_consume('bar', no_ack=False) - self.assertNotIn('other-consumer-tag', self.channel.no_ack_consumers) - - self.channel.basic_cancel('my-consumer-tag') - self.assertNotIn('my-consumer-tag', self.channel.no_ack_consumers) - - -class test_Transport(amqplibCase): - - def setup(self): - self.connection = Connection('amqplib://') - self.transport = self.connection.transport - - def test_create_channel(self): - connection = Mock() - self.transport.create_channel(connection) - connection.channel.assert_called_with() - - def test_drain_events(self): - connection = Mock() - self.transport.drain_events(connection, timeout=10.0) - connection.drain_events.assert_called_with(timeout=10.0) - - def test_dnspython_localhost_resolve_bug(self): - - class Conn(object): - - def __init__(self, **kwargs): - vars(self).update(kwargs) - - self.transport.Connection = Conn - self.transport.client.hostname = 'localhost' - conn1 = self.transport.establish_connection() - self.assertEqual(conn1.host, '127.0.0.1:5672') - - self.transport.client.hostname = 'example.com' - conn2 = self.transport.establish_connection() - self.assertEqual(conn2.host, 'example.com:5672') - - def test_close_connection(self): - connection = Mock() - connection.client = Mock() - self.transport.close_connection(connection) - - self.assertIsNone(connection.client) - connection.close.assert_called_with() - - def test_verify_connection(self): - connection = Mock() - connection.channels = None - self.assertFalse(self.transport.verify_connection(connection)) - - connection.channels = {1: 1, 2: 2} - self.assertTrue(self.transport.verify_connection(connection)) - - @mock.mask_modules('ssl') - def test_import_no_ssl(self): - pm = sys.modules.pop('kombu.transport.amqplib') - try: - from kombu.transport.amqplib import SSLError - self.assertEqual(SSLError.__module__, 'kombu.transport.amqplib') - finally: - if pm is not None: - sys.modules['kombu.transport.amqplib'] = pm - - -class test_amqplib(amqplibCase): - - def test_default_port(self): - - class Transport(amqplib.Transport): - Connection = MockConnection - - c = Connection(port=None, transport=Transport).connect() - self.assertEqual(c['host'], - '127.0.0.1:%s' % (Transport.default_port,)) - - def test_custom_port(self): - - class Transport(amqplib.Transport): - Connection = MockConnection - - c = Connection(port=1337, transport=Transport).connect() - self.assertEqual(c['host'], '127.0.0.1:1337') diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 0c87751c..1d076e7e 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -35,7 +35,6 @@ TRANSPORT_ALIASES = { 'SLMQ': 'kombu.transport.SLMQ.Transport', 'slmq': 'kombu.transport.SLMQ.Transport', 'filesystem': 'kombu.transport.filesystem:Transport', - 'amqplib': 'kombu.transport.amqplib:Transport', 'qpid': 'kombu.transport.qpid:Transport', 'sentinel': 'kombu.transport.redis:SentinelTransport', 'consul': 'kombu.transport.consul:Transport' diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py deleted file mode 100644 index ed452655..00000000 --- a/kombu/transport/amqplib.py +++ /dev/null @@ -1,406 +0,0 @@ -""" -kombu.transport.amqplib -======================= - -amqplib transport. - -""" -from __future__ import absolute_import, unicode_literals - -import errno -import socket - -try: - from ssl import SSLError -except ImportError: - class SSLError(Exception): # noqa - pass -from struct import unpack - -from kombu.five import items -from kombu.utils.encoding import str_to_bytes -from kombu.utils.amq_manager import get_manager - -from . import base - - -class NA(object): - pass - -try: - from amqplib import client_0_8 as amqp - from amqplib.client_0_8 import transport - from amqplib.client_0_8.channel import Channel as _Channel - from amqplib.client_0_8.exceptions import AMQPConnectionException - from amqplib.client_0_8.exceptions import AMQPChannelException -except ImportError: # pragma: no cover - - class NAx(object): - pass - amqp = NA - amqp.Connection = NA - transport = _Channel = NA # noqa - # Sphinx crashes if this is NA, must be different class - transport.TCPTransport = transport.SSLTransport = NAx - AMQPConnectionException = AMQPChannelException = NA # noqa - - -DEFAULT_PORT = 5672 -HAS_MSG_PEEK = hasattr(socket, 'MSG_PEEK') - -# amqplib's handshake mistakenly identifies as protocol version 1191, -# this breaks in RabbitMQ tip, which no longer falls back to -# 0-8 for unknown ids. -transport.AMQP_PROTOCOL_HEADER = str_to_bytes('AMQP\x01\x01\x08\x00') - - -# - fixes warnings when socket is not connected. -class TCPTransport(transport.TCPTransport): - - def read_frame(self): - frame_type, channel, size = unpack('>BHI', self._read(7, True)) - payload = self._read(size) - ch = ord(self._read(1)) - if ch == 206: # '\xce' - return frame_type, channel, payload - else: - raise Exception( - 'Framing Error, received 0x%02x while expecting 0xce' % ch) - - def _read(self, n, initial=False): - read_buffer = self._read_buffer - while len(read_buffer) < n: - try: - s = self.sock.recv(n - len(read_buffer)) - except socket.error as exc: - if not initial and exc.errno in (errno.EAGAIN, errno.EINTR): - continue - raise - if not s: - raise IOError('Socket closed') - read_buffer += s - - result = read_buffer[:n] - self._read_buffer = read_buffer[n:] - - return result - - def __del__(self): - try: - self.close() - except Exception: - pass - finally: - self.sock = None - -transport.TCPTransport = TCPTransport - - -class SSLTransport(transport.SSLTransport): - - def __init__(self, host, connect_timeout, ssl): - if isinstance(ssl, dict): - self.sslopts = ssl - self.sslobj = None - - transport._AbstractTransport.__init__(self, host, connect_timeout) - - def read_frame(self): - frame_type, channel, size = unpack('>BHI', self._read(7, True)) - payload = self._read(size) - ch = ord(self._read(1)) - if ch == 206: # '\xce' - return frame_type, channel, payload - else: - raise Exception( - 'Framing Error, received 0x%02x while expecting 0xce' % ch) - - def _read(self, n, initial=False): - result = '' - - while len(result) < n: - try: - s = self.sslobj.read(n - len(result)) - except socket.error as exc: - if not initial and exc.errno in (errno.EAGAIN, errno.EINTR): - continue - raise - if not s: - raise IOError('Socket closed') - result += s - - return result - - def __del__(self): - try: - self.close() - except Exception: - pass - finally: - self.sock = None -transport.SSLTransport = SSLTransport - - -class Connection(amqp.Connection): # pragma: no cover - connected = True - - def _do_close(self, *args, **kwargs): - # amqplib does not ignore socket errors when connection - # is closed on the remote end. - try: - super(Connection, self)._do_close(*args, **kwargs) - except socket.error: - pass - - def _dispatch_basic_return(self, channel, args, msg): - reply_code = args.read_short() - reply_text = args.read_shortstr() - exchange = args.read_shortstr() - routing_key = args.read_shortstr() - - exc = AMQPChannelException(reply_code, reply_text, (50, 60)) - if channel.events['basic_return']: - for callback in channel.events['basic_return']: - callback(exc, exchange, routing_key, msg) - else: - raise exc - - def __init__(self, *args, **kwargs): - super(Connection, self).__init__(*args, **kwargs) - self._method_override = {(60, 50): self._dispatch_basic_return} - - def drain_events(self, timeout=None): - """Wait for an event on a channel.""" - chanmap = self.channels - chanid, method_sig, args, content = self._wait_multiple( - chanmap, None, timeout=timeout) - - channel = chanmap[chanid] - - if (content and - channel.auto_decode and - hasattr(content, 'content_encoding')): - try: - content.body = content.body.decode(content.content_encoding) - except Exception: - pass - - amqp_method = self._method_override.get(method_sig) or \ - channel._METHOD_MAP.get(method_sig, None) - - if amqp_method is None: - raise Exception('Unknown AMQP method (%d, %d)' % method_sig) - - if content is None: - return amqp_method(channel, args) - else: - return amqp_method(channel, args, content) - - def read_timeout(self, timeout=None): - if timeout is None: - return self.method_reader.read_method() - sock = self.transport.sock - prev = sock.gettimeout() - if prev != timeout: - sock.settimeout(timeout) - try: - try: - return self.method_reader.read_method() - except SSLError as exc: - # http://bugs.python.org/issue10272 - if 'timed out' in str(exc): - raise socket.timeout() - # Non-blocking SSL sockets can throw SSLError - if 'The operation did not complete' in str(exc): - raise socket.timeout() - raise - finally: - if prev != timeout: - sock.settimeout(prev) - - def _wait_multiple(self, channels, allowed_methods, timeout=None): - for channel_id, channel in items(channels): - method_queue = channel.method_queue - for queued_method in method_queue: - method_sig = queued_method[0] - if (allowed_methods is None or - method_sig in allowed_methods or - method_sig == (20, 40)): - method_queue.remove(queued_method) - method_sig, args, content = queued_method - return channel_id, method_sig, args, content - - # Nothing queued, need to wait for a method from the peer - read_timeout = self.read_timeout - wait = self.wait - while 1: - channel, method_sig, args, content = read_timeout(timeout) - - if (channel in channels and - allowed_methods is None or - method_sig in allowed_methods or - method_sig == (20, 40)): - return channel, method_sig, args, content - - # Not the channel and/or method we were looking for. Queue - # this method for later - channels[channel].method_queue.append((method_sig, args, content)) - - # - # If we just queued up a method for channel 0 (the Connection - # itself) it's probably a close method in reaction to some - # error, so deal with it right away. - # - if channel == 0: - wait() - - def channel(self, channel_id=None): - try: - return self.channels[channel_id] - except KeyError: - return Channel(self, channel_id) - - -class Message(base.Message): - - def __init__(self, channel, msg, **kwargs): - props = msg.properties - super(Message, self).__init__( - channel, - body=msg.body, - delivery_tag=msg.delivery_tag, - content_type=props.get('content_type'), - content_encoding=props.get('content_encoding'), - delivery_info=msg.delivery_info, - properties=msg.properties, - headers=props.get('application_headers') or {}, - **kwargs) - - -class Channel(_Channel, base.StdChannel): - Message = Message - events = {'basic_return': set()} - - def __init__(self, *args, **kwargs): - self.no_ack_consumers = set() - super(Channel, self).__init__(*args, **kwargs) - - def prepare_message(self, body, priority=None, content_type=None, - content_encoding=None, headers=None, properties=None): - """Encapsulate data into a AMQP message.""" - return amqp.Message(body, priority=priority, - content_type=content_type, - content_encoding=content_encoding, - application_headers=headers, - **properties) - - def message_to_python(self, raw_message): - """Convert encoded message body back to a Python value.""" - return self.Message(self, raw_message) - - def close(self): - try: - super(Channel, self).close() - finally: - self.connection = None - - def basic_consume(self, *args, **kwargs): - consumer_tag = super(Channel, self).basic_consume(*args, **kwargs) - if kwargs['no_ack']: - self.no_ack_consumers.add(consumer_tag) - return consumer_tag - - def basic_cancel(self, consumer_tag, **kwargs): - self.no_ack_consumers.discard(consumer_tag) - return super(Channel, self).basic_cancel(consumer_tag, **kwargs) - - -class Transport(base.Transport): - Connection = Connection - - default_port = DEFAULT_PORT - - # it's very annoying that amqplib sometimes raises AttributeError - # if the connection is lost, but nothing we can do about that here. - connection_errors = ( - base.Transport.connection_errors + ( - AMQPConnectionException, - socket.error, IOError, OSError, AttributeError) - ) - channel_errors = base.Transport.channel_errors + (AMQPChannelException,) - - driver_name = 'amqplib' - driver_type = 'amqp' - - implements = base.Transport.implements.extend( - async=True, - heartbeats=False, - ) - - def __init__(self, client, **kwargs): - self.client = client - self.default_port = kwargs.get('default_port') or self.default_port - - if amqp is NA: - raise ImportError('Missing amqplib library (pip install amqplib)') - - def create_channel(self, connection): - return connection.channel() - - def drain_events(self, connection, **kwargs): - return connection.drain_events(**kwargs) - - def establish_connection(self): - """Establish connection to the AMQP broker.""" - conninfo = self.client - for name, default_value in items(self.default_connection_params): - if not getattr(conninfo, name, None): - setattr(conninfo, name, default_value) - if conninfo.hostname == 'localhost': - conninfo.hostname = '127.0.0.1' - conn = self.Connection(host=conninfo.host, - userid=conninfo.userid, - password=conninfo.password, - login_method=conninfo.login_method, - virtual_host=conninfo.virtual_host, - insist=conninfo.insist, - ssl=conninfo.ssl, - connect_timeout=conninfo.connect_timeout) - conn.client = self.client - return conn - - def close_connection(self, connection): - """Close the AMQP broker connection.""" - connection.client = None - connection.close() - - def is_alive(self, connection): - if HAS_MSG_PEEK: - sock = connection.transport.sock - prev = sock.gettimeout() - sock.settimeout(0.0001) - try: - sock.recv(1, socket.MSG_PEEK) - except socket.timeout: - pass - except socket.error: - return False - finally: - sock.settimeout(prev) - return True - - def verify_connection(self, connection): - return connection.channels is not None and self.is_alive(connection) - - def register_with_event_loop(self, connection, loop): - loop.add_reader(connection.method_reader.source.sock, - self.on_readable, connection, loop) - - @property - def default_connection_params(self): - return {'userid': 'guest', 'password': 'guest', - 'port': self.default_port, - 'hostname': 'localhost', 'login_method': 'AMQPLAIN'} - - def get_manager(self, *args, **kwargs): - return get_manager(self.client, *args, **kwargs)