Merge branch 'master' of github.com:celery/kombu

This commit is contained in:
Ask Solem 2016-08-18 16:18:40 -07:00
commit d7ffc4d75b
3 changed files with 1456 additions and 1611 deletions

File diff suppressed because it is too large Load Diff

View File

@ -25,15 +25,9 @@ or to install the requirements manually:
to underlying dependencies not being compatible. This version is
tested and works with with Python 2.7.
.. admonition:: Potential Deadlock
This transport should be used with caution due to a known
potential deadlock. See `Issue 2199`_ for more details.
.. _`Qpid`: http://qpid.apache.org/
.. _`qpid-python`: http://pypi.python.org/pypi/qpid-python/
.. _`qpid-tools`: http://pypi.python.org/pypi/qpid-tools/
.. _`Issue 2199`: https://github.com/celery/celery/issues/2199
Authentication
==============
@ -83,33 +77,27 @@ The :attr:`~kombu.Connection.transport_options` argument to the
options override and replace any other default or specified values. If using
Celery, this can be accomplished by setting the
*BROKER_TRANSPORT_OPTIONS* Celery option.
>>>>>>> ba4fa60... [qpid] Fixes rst syntax errors in docstrings
"""
from __future__ import absolute_import, unicode_literals
from collections import OrderedDict
import os
import select
import socket
import ssl
import sys
import threading
import time
import uuid
from collections import OrderedDict
from itertools import count
from gettext import gettext as _
import amqp.protocol
from kombu.five import Empty, items
from kombu.log import get_logger
from kombu.transport.virtual import Base64, Message
from kombu.transport import base
try:
import fcntl
except ImportError:
fcntl = None
fcntl = None # noqa
try:
import qpidtoollibs
@ -117,24 +105,29 @@ except ImportError: # pragma: no cover
qpidtoollibs = None # noqa
try:
from qpid.messaging.exceptions import ConnectionError
from qpid.messaging.exceptions import ConnectionError, NotFound
from qpid.messaging.exceptions import Empty as QpidEmpty
from qpid.messaging.exceptions import SessionClosed
except ImportError: # pragma: no cover
ConnectionError = None
NotFound = None
QpidEmpty = None
# ## The Following Import Applies Monkey Patches at Import Time ##
import kombu.transport.qpid_patches # noqa
################################################################
SessionClosed = None
try:
import qpid
except ImportError: # pragma: no cover
qpid = None
from kombu.five import Empty, items
from kombu.log import get_logger
from kombu.transport.virtual import Base64, Message
from kombu.transport import base
logger = get_logger(__name__)
DEFAULT_PORT = 5672
OBJECT_ALREADY_EXISTS_STRING = 'object already exists'
@ -144,72 +137,21 @@ __version__ = '.'.join(map(str, VERSION))
PY3 = sys.version_info[0] == 3
E_AUTH = """
Unable to authenticate to qpid using the following mechanisms: %s
"""
def dependency_is_none(dependency):
"""Return True if the dependency is None, otherwise False. This is done
using a function so that tests can mock this behavior easily.
E_UNREACHABLE = """
Unable to connect to qpid with SASL mechanism %s
"""
:param dependency: The module to check if it is None
:return: True if dependency is None otherwise False.
"""
return dependency is None
class AuthenticationFailure(Exception):
pass
class QpidMessagingExceptionHandler(object):
"""An exception handling decorator that silences some exceptions.
An exception handling class designed to silence specific exceptions
that qpid.messaging raises as part of normal operation. qpid.messaging
exceptions require string parsing, and are not machine consumable.
This is designed to be used as a decorator, and accepts a whitelist
string as an argument.
Usage:
@QpidMessagingExceptionHandler('whitelist string goes here')
"""
def __init__(self, allowed_exception_string):
"""Instantiate a QpidMessagingExceptionHandler object.
:param allowed_exception_string: a string that, if present in the
exception message, will be silenced.
:type allowed_exception_string: str
"""
self.allowed_exception_string = allowed_exception_string
def __call__(self, original_func):
"""The decorator method.
Method that wraps the actual function with exception silencing
functionality. Any exception that contains the string
:attr:`allowed_exception_string` in the message will be silenced.
:param original_func: function that is automatically passed in
when this object is used as a decorator.
:type original_func: function
:return: A function that decorates (wraps) the original function.
:rtype: function
"""
def decorator(*args, **kwargs):
"""A runtime-built function that will be returned which contains
a reference to the original function, and wraps a call to it in
a try/except block that can silence errors."""
try:
return original_func(*args, **kwargs)
except Exception as exc:
if self.allowed_exception_string not in str(exc):
raise
return decorator
class QoS(object):
"""A helper object for message prefetch and ACKing purposes.
@ -230,7 +172,7 @@ class QoS(object):
ACKed asynchronously through a call to :meth:`ack`. Messages that are
received, but not ACKed will not be delivered by the broker to another
consumer until an ACK is received, or the session is closed. Messages
are referred to using delivery_tag integers, which are unique per
are referred to using delivery_tag, which are unique per
:class:`Channel`. Delivery tags are managed outside of this object and
are passed in with a message to :meth:`append`. Un-ACKed messages can
be looked up from QoS using :meth:`get` and can be rejected and
@ -282,13 +224,13 @@ class QoS(object):
def append(self, message, delivery_tag):
"""Append message to the list of un-ACKed messages.
Add a message, referenced by the integer delivery_tag, for ACKing,
Add a message, referenced by the delivery_tag, for ACKing,
rejecting, or getting later. Messages are saved into an
:class:`collections.OrderedDict` by delivery_tag.
:param message: A received message that has not yet been ACKed.
:type message: qpid.messaging.Message
:param delivery_tag: An integer number to refer to this message by
:param delivery_tag: A UUID to refer to this message by
upon receipt.
:type delivery_tag: uuid.UUID
@ -301,7 +243,7 @@ class QoS(object):
:param delivery_tag: The delivery tag associated with the message
to be returned.
:type delivery_tag: int
:type delivery_tag: uuid.UUID
:return: An un-ACKed message that is looked up by delivery_tag.
:rtype: qpid.messaging.Message
@ -336,7 +278,7 @@ class QoS(object):
:param delivery_tag: The delivery tag associated with the message
to be rejected.
:type delivery_tag: int
:type delivery_tag: uuid.UUID
:keyword requeue: If True, the broker will be notified to requeue
the message. If False, the broker will be told to drop the
message entirely. In both cases, the message will be removed
@ -389,10 +331,9 @@ class Channel(base.StdChannel):
Messages sent using this channel are assigned a delivery_tag. The
delivery_tag is generated for a message as they are prepared for
sending by :meth:`basic_publish`. The delivery_tag is unique per
Channel instance using :meth:`~itertools.count`. The delivery_tag has
no meaningful context in other objects, and is only maintained in the
memory of this object, and the underlying :class:`QoS` object that
provides support.
channel instance. The delivery_tag has no meaningful context in other
objects, and is only maintained in the memory of this object, and the
underlying :class:`QoS` object that provides support.
Each channel object instantiates exactly one :class:`QoS` object for
prefetch limiting, and asynchronous ACKing. The :class:`QoS` object is
@ -423,8 +364,8 @@ class Channel(base.StdChannel):
Each call to :meth:`basic_consume` creates a consumer, which is given a
consumer tag that is identified by the caller of :meth:`basic_consume`.
Already started consumers can be canceled using by their consumer_tag
using :meth:`basic_cancel`. Cancelation of a consumer causes the
Already started consumers can be cancelled using by their consumer_tag
using :meth:`basic_cancel`. Cancellation of a consumer causes the
:class:`~qpid.messaging.endpoints.Receiver` object to be closed.
Asynchronous message ACKing is supported through :meth:`basic_ack`,
@ -447,9 +388,6 @@ class Channel(base.StdChannel):
#: Binary <-> ASCII codecs.
codecs = {'base64': Base64()}
#: counter used to generate delivery tags for this channel.
_delivery_tags = count(1)
def __init__(self, connection, transport):
self.connection = connection
self.transport = transport
@ -529,13 +467,11 @@ class Channel(base.StdChannel):
"""
if not exchange:
address = '%s; {assert: always, node: {type: queue}}' % (
routing_key,
)
routing_key,)
msg_subject = None
else:
address = '%s/%s; {assert: always, node: {type: topic}}' % (
exchange, routing_key,
)
exchange, routing_key)
msg_subject = str(routing_key)
sender = self.transport.session.sender(address)
qpid_message = qpid.messaging.Message(content=message,
@ -549,11 +485,14 @@ class Channel(base.StdChannel):
"""Purge all undelivered messages from a queue specified by name.
An internal method to purge all undelivered messages from a queue
specified by name. The queue message depth is first checked,
and then the broker is asked to purge that number of messages. The
integer number of messages requested to be purged is returned. The
actual number of messages purged may be different than the
requested number of messages to purge (see below).
specified by name. If the queue does not exist a
:class:`qpid.messaging.exceptions.NotFound` exception is raised.
The queue message depth is first checked, and then the broker is
asked to purge that number of messages. The integer number of
messages requested to be purged is returned. The actual number of
messages purged may be different than the requested number of
messages to purge (see below).
Sometimes delivered messages are asked to be purged, but are not.
This case fails silently, which is the correct behavior when a
@ -577,6 +516,9 @@ class Channel(base.StdChannel):
"""
queue_to_purge = self._broker.getQueue(queue)
if queue_to_purge is None:
error_text = "NOT_FOUND - no queue '{0}'".format(queue)
raise NotFound(code=404, text=error_text)
message_count = queue_to_purge.values['msgDepth']
if message_count > 0:
queue_to_purge.purge(message_count)
@ -661,7 +603,7 @@ class Channel(base.StdChannel):
'exclusive' flag always implies 'auto-delete'. Default is False.
If auto_delete is True, the queue is deleted when all consumers
have finished using it. The last consumer can be canceled either
have finished using it. The last consumer can be cancelled either
explicitly or because its channel is closed. If there was no
consumer ever on the queue, it won't be deleted. Default is True.
@ -720,7 +662,7 @@ class Channel(base.StdChannel):
self._broker.addQueue(queue, options=options)
except Exception as exc:
if OBJECT_ALREADY_EXISTS_STRING not in str(exc):
raise
raise exc
queue_to_check = self._broker.getQueue(queue)
message_count = queue_to_check.values['msgDepth']
consumer_count = queue_to_check.values['consumerCount']
@ -755,7 +697,6 @@ class Channel(base.StdChannel):
return
self._delete(queue)
@QpidMessagingExceptionHandler(OBJECT_ALREADY_EXISTS_STRING)
def exchange_declare(self, exchange='', type='direct', durable=False,
**kwargs):
"""Create a new exchange.
@ -772,18 +713,23 @@ class Channel(base.StdChannel):
functionality.
:keyword type: The exchange type. Valid values include 'direct',
'topic', and 'fanout'.
'topic', and 'fanout'.
:type type: str
:keyword exchange: The name of the exchange to be created. If no
exchange is specified, then a blank string will be used as the name.
exchange is specified, then a blank string will be used as the
name.
:type exchange: str
:keyword durable: True if the exchange should be durable, or False
otherwise.
otherwise.
:type durable: bool
"""
options = {'durable': durable}
self._broker.addExchange(type, exchange, options)
try:
self._broker.addExchange(type, exchange, options)
except Exception as exc:
if OBJECT_ALREADY_EXISTS_STRING not in str(exc):
raise exc
def exchange_delete(self, exchange_name, **kwargs):
"""Delete an exchange specified by name
@ -839,12 +785,12 @@ class Channel(base.StdChannel):
def queue_purge(self, queue, **kwargs):
"""Remove all undelivered messages from queue.
Purge all undelivered messages from a queue specified by name. The
queue message depth is first checked, and then the broker is asked
to purge that number of messages. The integer number of messages
requested to be purged is returned. The actual number of messages
purged may be different than the requested number of messages to
purge.
Purge all undelivered messages from a queue specified by name. If the
queue does not exist an exception is raised. The queue message
depth is first checked, and then the broker is asked to purge that
number of messages. The integer number of messages requested to be
purged is returned. The actual number of messages purged may be
different than the requested number of messages to purge.
Sometimes delivered messages are asked to be purged, but are not.
This case fails silently, which is the correct behavior when a
@ -939,7 +885,7 @@ class Channel(base.StdChannel):
:param delivery_tag: The delivery tag associated with the message
to be rejected.
:type delivery_tag: int
:type delivery_tag: uuid.UUID
:keyword requeue: If False, the rejected message will be dropped by
the broker and not delivered to any other consumers. If True,
then the rejected message will be requeued for delivery to
@ -980,10 +926,9 @@ class Channel(base.StdChannel):
handled by the caller of :meth:`~Transport.drain_events`. Messages
can be ACKed after being received through a call to :meth:`basic_ack`.
If no_ack is True, the messages are immediately ACKed to avoid a
memory leak in qpid.messaging when messages go un-ACKed. The no_ack
flag indicates that the receiver of the message does not intent to
call :meth:`basic_ack`.
If no_ack is True, The no_ack flag indicates that the receiver of
the message will not call :meth:`basic_ack` later. Since the
message will not be ACKed later, it is ACKed immediately.
:meth:`basic_consume` transforms the message object type prior to
calling the callback. Initially the message comes in as a
@ -1020,9 +965,7 @@ class Channel(base.StdChannel):
delivery_tag = message.delivery_tag
self.qos.append(qpid_message, delivery_tag)
if no_ack:
# Celery will not ack this message later, so we should to
# avoid a memory leak in qpid.messaging due to un-ACKed
# messages.
# Celery will not ack this message later, so we should ack now
self.basic_ack(delivery_tag)
return callback(message)
@ -1041,7 +984,7 @@ class Channel(base.StdChannel):
This method also cleans up all lingering references of the consumer.
:param consumer_tag: The tag which refers to the consumer to be
canceled. Originally specified when the consumer was created
cancelled. Originally specified when the consumer was created
as a parameter to :meth:`basic_consume`.
:type consumer_tag: an immutable object
@ -1053,7 +996,7 @@ class Channel(base.StdChannel):
self.connection._callbacks.pop(queue, None)
def close(self):
"""Close Channel and all associated messages.
"""Cancel all associated messages and close the Channel.
This cancels all consumers by calling :meth:`basic_cancel` for each
known consumer_tag. It also closes the self._broker sessions. Closing
@ -1136,13 +1079,11 @@ class Channel(base.StdChannel):
info = properties.setdefault('delivery_info', {})
info['priority'] = priority or 0
return {
'body': body,
'content-encoding': content_encoding,
'content-type': content_type,
'headers': headers or {},
'properties': properties or {},
}
return {'body': body,
'content-encoding': content_encoding,
'content-type': content_type,
'headers': headers or {},
'properties': properties or {}}
def basic_publish(self, message, exchange, routing_key, **kwargs):
"""Publish message onto an exchange using a routing key.
@ -1155,7 +1096,7 @@ class Channel(base.StdChannel):
- wraps the body as a buffer object, so that
:class:`qpid.messaging.endpoints.Sender` uses a content type
that can support arbitrarily large messages.
- assigns a delivery_tag generated through self._delivery_tags
- sets delivery_tag to a random uuid.UUID
- sets the exchange and routing_key info as delivery_info
Internally uses :meth:`_put` to send the message synchronously. This
@ -1182,7 +1123,7 @@ class Channel(base.StdChannel):
props = message['properties']
props.update(
body_encoding=body_encoding,
delivery_tag=next(self._delivery_tags),
delivery_tag=uuid.uuid4(),
)
props['delivery_info'].update(
exchange=exchange,
@ -1262,7 +1203,7 @@ class Channel(base.StdChannel):
qpid_exchange = self._broker.getExchange(exchange)
if qpid_exchange:
qpid_exchange_attributes = qpid_exchange.getAttributes()
return qpid_exchange_attributes["type"]
return qpid_exchange_attributes['type']
else:
return default
@ -1334,7 +1275,6 @@ class Connection(object):
# A class reference to the :class:`Channel` object
Channel = Channel
SASL_ANONYMOUS_MECH = 'ANONYMOUS'
def __init__(self, **connection_options):
self.connection_options = connection_options
@ -1343,55 +1283,43 @@ class Connection(object):
self._qpid_conn = None
establish = qpid.messaging.Connection.establish
# There is a behavior difference in qpid.messaging's sasl_mechanism
# selection method and cyrus-sasl's. The former will put PLAIN before
# ANONYMOUS if a username and password is given, but the latter will
# simply take whichever mech is listed first. Thus, if we had
# "ANONYMOUS PLAIN" as the default, the user would never be able to
# select PLAIN if cyrus-sasl was installed.
# There are several inconsistent behaviors in the sasl libraries
# used on different systems. Although qpid.messaging allows
# multiple space separated sasl mechanisms, this implementation
# only advertises one type to the server. These are either
# ANONYMOUS, PLAIN, or an overridden value specified by the user.
# The following code will put ANONYMOUS last in the mech list, and then
# try sasl mechs one by one. This should still result in secure
# behavior since it will select the first suitable mech. Unsuitable
# mechs will be rejected by the server.
sasl_mech = connection_options['sasl_mechanisms']
sasl_mechanisms = [
x for x in connection_options['sasl_mechanisms'].split()
if x != self.SASL_ANONYMOUS_MECH
]
if self.SASL_ANONYMOUS_MECH in \
connection_options['sasl_mechanisms'].split():
sasl_mechanisms.append(self.SASL_ANONYMOUS_MECH)
for sasl_mech in sasl_mechanisms:
try:
logger.debug(
'Attempting to connect to qpid with SASL mechanism %s',
sasl_mech,
)
modified_conn_opts = self.connection_options
modified_conn_opts['sasl_mechanisms'] = sasl_mech
self._qpid_conn = establish(**modified_conn_opts)
# connection was successful if we got this far
logger.info(
'Connected to qpid with SASL mechanism %s', sasl_mech)
break
except ConnectionError as exc:
if self._is_unreachable_error(exc):
logger.debug(E_UNREACHABLE, sasl_mech)
else:
raise
if not self.get_qpid_connection():
logger.error(E_AUTH, sasl_mechanisms, exc_info=1)
raise AuthenticationFailure(sys.exc_info()[1])
def _is_unreachable_error(self, exc):
return (
getattr(exc, 'code', None) == 320 or
'Authentication failed' in exc.text or
'sasl negotiation failed: no mechanism agreed' in exc.text
)
try:
msg = _('Attempting to connect to qpid with '
'SASL mechanism %s') % sasl_mech
logger.debug(msg)
self._qpid_conn = establish(**self.connection_options)
# connection was successful if we got this far
msg = _('Connected to qpid with SASL '
'mechanism %s') % sasl_mech
logger.info(msg)
except ConnectionError as conn_exc:
# if we get one of these errors, do not raise an exception.
# Raising will cause the connection to be retried. Instead,
# just continue on to the next mech.
coded_as_auth_failure = getattr(conn_exc, 'code', None) == 320
contains_auth_fail_text = \
'Authentication failed' in conn_exc.text
contains_mech_fail_text = \
'sasl negotiation failed: no mechanism agreed' \
in conn_exc.text
contains_mech_unavail_text = 'no mechanism available' \
in conn_exc.text
if coded_as_auth_failure or \
contains_auth_fail_text or contains_mech_fail_text or \
contains_mech_unavail_text:
msg = _('Unable to connect to qpid with SASL '
'mechanism %s') % sasl_mech
logger.error(msg)
raise AuthenticationFailure(sys.exc_info()[1])
raise
def get_qpid_connection(self):
"""Return the existing connection (singleton).
@ -1409,7 +1337,7 @@ class Connection(object):
receivers used by the Connection.
"""
return self._qpid_conn
self._qpid_conn.close()
def close_channel(self, channel):
"""Close a Channel.
@ -1429,88 +1357,6 @@ class Connection(object):
channel.connection = None
class ReceiversMonitor(threading.Thread):
"""A monitoring thread that reads and handles messages from all receivers.
A single instance of ReceiversMonitor is expected to be created by
:class:`Transport`.
In :meth:`monitor_receivers`, the thread monitors all receivers
associated with the session created by the Transport using the blocking
call to session.next_receiver(). When any receiver has messages
available, a symbol '0' is written to the self._w_fd file descriptor. The
:meth:`monitor_receivers` is designed not to exit, and loops over
session.next_receiver() forever.
The entry point of the thread is :meth:`run` which calls
:meth:`monitor_receivers` and catches and logs all exceptions raised.
After an exception is logged, the method sleeps for 10 seconds, and
re-enters :meth:`monitor_receivers`
The thread is designed to be daemonized, and will be forcefully killed
when all non-daemon threads have already exited.
"""
def __init__(self, session, w):
"""Instantiate a ReceiversMonitor object
:param session: The session which needs all of its receivers
monitored.
:type session: :class:`qpid.messaging.endpoints.Session`
:param w: The file descriptor to write the '0' into when
next_receiver unblocks.
:type w: int
"""
super(ReceiversMonitor, self).__init__()
self._session = session
self._w_fd = w
def run(self):
"""Thread entry point for ReceiversMonitor
Calls :meth:`monitor_receivers` with a log-and-reenter behavior. This
guards against unexpected exceptions which could cause this thread to
exit unexpectedly.
If a recoverable error occurs, then the exception needs to be
propagated to the Main Thread where an exception handler can properly
handle it. An Exception is checked if it is recoverable, and if so,
it is stored as saved_exception on the self._session object. The
character 'e' is then written to the self.w_fd file descriptor
causing Main Thread to raise the saved exception. Once the Exception
info is saved and the file descriptor is written, this Thread
gracefully exits.
Typically recoverable errors are connection errors, and can be
recovered through a call to Transport.establish_connection which will
spawn a new ReceiversMonitor Thread.
"""
while 1:
try:
self.monitor_receivers()
except Transport.connection_errors as exc:
self._session.saved_exception = exc
os.write(self._w_fd, 'e')
return
except Exception as exc:
logger.error(exc, exc_info=1)
time.sleep(10)
def monitor_receivers(self):
"""Monitor all receivers, and write to _w_fd when a message is ready.
The call to next_receiver() blocks until a message is ready. Once a
message is ready, write a '0' to _w_fd.
"""
while 1:
self._session.next_receiver()
os.write(self._w_fd, '0')
class Transport(base.Transport):
"""Kombu native transport for a Qpid broker.
@ -1533,20 +1379,23 @@ class Transport(base.Transport):
The Transport can create :class:`Channel` objects to communicate with the
broker with using the :meth:`create_channel` method.
The Transport identifies recoverable errors, allowing for error recovery
when certain exceptions occur. These exception types are stored in the
Transport class attribute connection_errors. This adds support for Kombu
to retry an operation if a ConnectionError occurs. ConnectionErrors occur
when the Transport cannot communicate with the Qpid broker.
The Transport identifies recoverable connection errors and recoverable
channel errors according to the Kombu 3.0 interface. These exception are
listed as tuples and store in the Transport class attribute
`recoverable_connection_errors` and `recoverable_channel_errors`
respectively. Any exception raised that is not a member of one of these
tuples is considered non-recoverable. This allows Kombu support for
automatic retry of certain operations to function correctly.
For backwards compatibility to the pre Kombu 3.0 exception interface, the
recoverable errors are also listed as `connection_errors` and
`channel_errors`.
"""
# Reference to the class that should be used as the Connection object
Connection = Connection
# The default port
default_port = DEFAULT_PORT
# This Transport does not specify a polling interval.
polling_interval = None
@ -1557,33 +1406,75 @@ class Transport(base.Transport):
driver_type = 'qpid'
driver_name = 'qpid'
connection_errors = (
# Exceptions that can be recovered from, but where the connection must be
# closed and re-established first.
recoverable_connection_errors = (
ConnectionError,
select.error
select.error,
)
# Exceptions that can be automatically recovered from without
# re-establishing the connection.
recoverable_channel_errors = (
NotFound,
)
# Support the pre 3.0 Kombu exception labeling interface which treats
# connection_errors and channel_errors both as recoverable via a
# reconnect.
connection_errors = recoverable_connection_errors
channel_errors = recoverable_channel_errors
def __init__(self, *args, **kwargs):
self.verify_runtime_environment()
super(Transport, self).__init__(*args, **kwargs)
self.r, self._w = os.pipe()
if fcntl is not None:
fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK)
self.use_async_interface = False
def verify_runtime_environment(self):
"""Verify that the runtime environment is acceptable.
This method is called as part of __init__ and raises a RuntimeError
in Python 3 or PyPi environments. This module is not compatible with
Python 3 or PyPi. The RuntimeError identifies this to the user up
front along with suggesting Python 2.7 be used instead.
in Python3 or PyPi environments. This module is not compatible with
Python3 or PyPi. The RuntimeError identifies this to the user up
front along with suggesting Python 2.6+ be used instead.
This method also checks that the dependencies qpidtoollibs and
qpid.messaging are installed. If either one is not installed a
RuntimeError is raised.
:raises: RuntimeError if the runtime environment is not acceptable.
"""
if getattr(sys, 'pypy_version_info', None):
raise RuntimeError('The Qpid transport for Kombu does not '
'support PyPy. Try using Python 2.7')
raise RuntimeError(
'The Qpid transport for Kombu does not '
'support PyPy. Try using Python 2.6+',
)
if PY3:
raise RuntimeError('The Qpid transport for Kombu does not '
'support Python 3. Try using Python 2.7')
raise RuntimeError(
'The Qpid transport for Kombu does not '
'support Python 3. Try using Python 2.6+',
)
if dependency_is_none(qpidtoollibs):
raise RuntimeError(
'The Python package "qpidtoollibs" is missing. Install it '
'with your package manager. You can also try `pip install '
'qpid-tools`.')
if dependency_is_none(qpid):
raise RuntimeError(
'The Python package "qpid.messaging" is missing. Install it '
'with your package manager. You can also try `pip install '
'qpid-python`.')
def _qpid_message_ready_handler(self, session):
if self.use_async_interface:
os.write(self._w, '0')
def _qpid_async_exception_notify_handler(self, obj_with_exception, exc):
if self.use_async_interface:
os.write(self._w, 'e')
def on_readable(self, connection, loop):
"""Handle any messages associated with this Transport.
@ -1591,17 +1482,13 @@ class Transport(base.Transport):
This method clears a single message from the externally monitored
file descriptor by issuing a read call to the self.r file descriptor
which removes a single '0' character that was placed into the pipe
by :class:`ReceiversMonitor`. Once a '0' is read, all available
events are drained through a call to :meth:`drain_events`.
by the Qpid session message callback handler. Once a '0' is read,
all available events are drained through a call to
:meth:`drain_events`.
The behavior of self.r is adjusted in __init__ to be non-blocking,
ensuring that an accidental call to this method when no more messages
will arrive will not cause indefinite blocking.
If the self.r file descriptor returns the character 'e', a
recoverable error occurred in the background thread, and this thread
should raise the saved exception. The exception is stored as
saved_exception on the session object.
The file descriptor self.r is modified to be non-blocking, ensuring
that an accidental call to this method when no more messages will
not cause indefinite blocking.
Nothing is expected to be returned from :meth:`drain_events` because
:meth:`drain_events` handles messages by calling callbacks that are
@ -1638,9 +1525,7 @@ class Transport(base.Transport):
:type loop: kombu.async.Hub
"""
symbol = os.read(self.r, 1)
if symbol == 'e':
raise self.session.saved_exception
os.read(self.r, 1)
try:
self.drain_events(connection)
except socket.timeout:
@ -1652,7 +1537,7 @@ class Transport(base.Transport):
Register the callback self.on_readable to be called when an
external epoll loop sees that the file descriptor registered is
ready for reading. The file descriptor is created by this Transport,
and is updated by the ReceiversMonitor thread.
and is written to when a message is available.
Because supports_ev == True, Celery expects to call this method to
give the Transport an opportunity to register a read file descriptor
@ -1672,24 +1557,29 @@ class Transport(base.Transport):
:type loop: kombu.async.hub.Hub
"""
self.r, self._w = os.pipe()
if fcntl is not None:
fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK)
self.use_async_interface = True
loop.add_reader(self.r, self.on_readable, connection, loop)
def establish_connection(self):
"""Establish a Connection object.
Determines the correct options to use when creating any connections
needed by this Transport, and create a :class:`Connection` object
which saves those values for connections generated as they are
needed. The options are a mixture of what is passed in through the
creator of the Transport, and the defaults provided by
Determines the correct options to use when creating any
connections needed by this Transport, and create a
:class:`Connection` object which saves those values for
connections generated as they are needed. The options are a
mixture of what is passed in through the creator of the
Transport, and the defaults provided by
:meth:`default_connection_params`. Options cover broker network
settings, timeout behaviors, authentication, and identity
verification settings.
This method also creates and stores a
:class:`~qpid.messaging.endpoints.Session` using the
:class:`~qpid.messaging.endpoints.Connection` created by this method.
The Session is stored on self.
:class:`~qpid.messaging.endpoints.Connection` created by this
method. The Session is stored on self.
:return: The created :class:`Connection` object is returned.
:rtype: :class:`Connection`
@ -1699,8 +1589,6 @@ class Transport(base.Transport):
for name, default_value in items(self.default_connection_params):
if not getattr(conninfo, name, None):
setattr(conninfo, name, default_value)
if conninfo.hostname == 'localhost':
conninfo.hostname = '127.0.0.1'
if conninfo.ssl:
conninfo.qpid_transport = 'ssl'
conninfo.transport_options['ssl_keyfile'] = conninfo.ssl[
@ -1715,19 +1603,51 @@ class Transport(base.Transport):
conninfo.transport_options['ssl_skip_hostname_check'] = True
else:
conninfo.qpid_transport = 'tcp'
opts = dict({'host': conninfo.hostname, 'port': conninfo.port,
'username': conninfo.userid,
'password': conninfo.password,
'transport': conninfo.qpid_transport,
'timeout': conninfo.connect_timeout,
'sasl_mechanisms': conninfo.sasl_mechanisms},
**conninfo.transport_options or {})
credentials = {}
if conninfo.login_method is None:
if conninfo.userid is not None and conninfo.password is not None:
sasl_mech = 'PLAIN'
credentials['username'] = conninfo.userid
credentials['password'] = conninfo.password
elif conninfo.userid is None and conninfo.password is not None:
raise Exception(
'Password configured but no username. SASL PLAIN '
'requires a username when using a password.')
elif conninfo.userid is not None and conninfo.password is None:
raise Exception(
'Username configured but no password. SASL PLAIN '
'requires a password when using a username.')
else:
sasl_mech = 'ANONYMOUS'
else:
sasl_mech = conninfo.login_method
if conninfo.userid is not None:
credentials['username'] = conninfo.userid
opts = {
'host': conninfo.hostname,
'port': conninfo.port,
'sasl_mechanisms': sasl_mech,
'timeout': conninfo.connect_timeout,
'transport': conninfo.qpid_transport
}
opts.update(credentials)
opts.update(conninfo.transport_options)
conn = self.Connection(**opts)
conn.client = self.client
self.session = conn.get_qpid_connection().session()
monitor_thread = ReceiversMonitor(self.session, self._w)
monitor_thread.daemon = True
monitor_thread.start()
self.session.set_message_received_notify_handler(
self._qpid_message_ready_handler
)
conn.get_qpid_connection().set_async_exception_notify_handler(
self._qpid_async_exception_notify_handler
)
self.session.set_async_exception_notify_handler(
self._qpid_async_exception_notify_handler
)
return conn
def close_connection(self, connection):
@ -1737,8 +1657,7 @@ class Transport(base.Transport):
:type connection: :class:`kombu.transport.qpid.Connection`
"""
for channel in connection.channels:
channel.close()
connection.close()
def drain_events(self, connection, timeout=0, **kwargs):
"""Handle and call callbacks for all ready Transport messages.
@ -1801,18 +1720,21 @@ class Transport(base.Transport):
These connection parameters will be used whenever the creator of
Transport does not specify a required parameter.
NOTE: password is set to '' by default instead of None so the a
connection is attempted[1]. An empty password is considered valid for
qpidd if "auth=no" is set on the server.
[1] https://issues.apache.org/jira/browse/QPID-6109
:return: A dict containing the default parameters.
:rtype: dict
"""
return {
'userid': 'guest', 'password': '',
'port': self.default_port, 'virtual_host': '',
'hostname': 'localhost', 'sasl_mechanisms': 'PLAIN ANONYMOUS',
'hostname': 'localhost',
'port': 5672,
}
def __del__(self):
"""Ensure file descriptors opened in __init__() are closed."""
if self.use_async_interface:
for fd in (self.r, self._w):
try:
os.close(fd)
except OSError:
# ignored
pass

View File

@ -1,167 +0,0 @@
# This module applies two patches to qpid.messaging that are required for
# correct operation. Each patch fixes a bug. See links to the bugs below:
# https://issues.apache.org/jira/browse/QPID-5637
# https://issues.apache.org/jira/browse/QPID-5557
# ## Begin Monkey Patch 1 ###
# https://issues.apache.org/jira/browse/QPID-5637
#############################################################################
# _ _ ___ _____ _____
# | \ | |/ _ \_ _| ____|
# | \| | | | || | | _|
# | |\ | |_| || | | |___
# |_| \_|\___/ |_| |_____|
#
# If you have code that also uses qpid.messaging and imports kombu,
# or causes this file to be imported, then you need to make sure that this
# import occurs first.
#
# Failure to do this will cause the following exception:
# AttributeError: 'Selector' object has no attribute '_current_pid'
#
# Fix this by importing this module prior to using qpid.messaging in other
# code that also uses this module.
#############################################################################
# this import is needed for Python 2.6. Without it, qpid.py will "mask" the
# system's qpid lib
from __future__ import absolute_import, unicode_literals
import os
# Imports for Monkey Patch 1
try:
from qpid.selector import Selector
except ImportError: # pragma: no cover
Selector = None # noqa
import atexit
# Prepare for Monkey Patch 1
def default_monkey(): # pragma: no cover
Selector.lock.acquire()
try:
if Selector.DEFAULT is None:
sel = Selector()
atexit.register(sel.stop)
sel.start()
Selector.DEFAULT = sel
Selector._current_pid = os.getpid()
elif Selector._current_pid != os.getpid():
sel = Selector()
atexit.register(sel.stop)
sel.start()
Selector.DEFAULT = sel
Selector._current_pid = os.getpid()
return Selector.DEFAULT
finally:
Selector.lock.release()
# Apply Monkey Patch 1
try:
import qpid.selector
qpid.selector.Selector.default = staticmethod(default_monkey)
except ImportError: # pragma: no cover
pass
# ## End Monkey Patch 1 ###
# ## Begin Monkey Patch 2 ###
# https://issues.apache.org/jira/browse/QPID-5557
# Imports for Monkey Patch 2
try:
from qpid.ops import ExchangeQuery, QueueQuery
except ImportError: # pragma: no cover
ExchangeQuery = None
QueueQuery = None
try:
from qpid.messaging.exceptions import (
NotFound, AssertionFailed, ConnectionError,
)
except ImportError: # pragma: no cover
NotFound = None
AssertionFailed = None
ConnectionError = None
# Prepare for Monkey Patch 2
def resolve_declare_monkey(self, sst, lnk, dir, action): # pragma: no cover
declare = lnk.options.get('create') in ('always', dir)
assrt = lnk.options.get('assert') in ('always', dir)
requested_type = lnk.options.get('node', {}).get('type')
def do_resolved(type, subtype):
err = None
if type is None:
if declare:
err = self.declare(sst, lnk, action)
else:
err = NotFound(text='no such queue: %s' % lnk.name)
else:
if assrt:
expected = lnk.options.get('node', {}).get('type')
if expected and type != expected:
err = AssertionFailed(
text='expected %s, got %s' % (expected, type))
if err is None:
action(type, subtype)
if err:
tgt = lnk.target
tgt.error = err
del self._attachments[tgt]
tgt.closed = True
return
self.resolve(sst, lnk.name, do_resolved, node_type=requested_type,
force=declare)
def resolve_monkey(self, sst, name, action, force=False,
node_type=None): # pragma: no cover
if not force and not node_type:
try:
type, subtype = self.address_cache[name]
action(type, subtype)
return
except KeyError:
pass
args = []
def do_result(r):
args.append(r)
def do_action(r):
do_result(r)
er, qr = args
if node_type == 'topic' and not er.not_found:
type, subtype = 'topic', er.type
elif node_type == 'queue' and qr.queue:
type, subtype = 'queue', None
elif er.not_found and not qr.queue:
type, subtype = None, None
elif qr.queue:
type, subtype = 'queue', None
else:
type, subtype = 'topic', er.type
if type is not None:
self.address_cache[name] = (type, subtype)
action(type, subtype)
sst.write_query(ExchangeQuery(name), do_result)
sst.write_query(QueueQuery(name), do_action)
# Apply monkey patch 2
try:
import qpid.messaging.driver
qpid.messaging.driver.Engine.resolve_declare = resolve_declare_monkey
qpid.messaging.driver.Engine.resolve = resolve_monkey
except ImportError: # pragma: no cover
pass
# ## End Monkey Patch 2 ###