diff --git a/Changelog b/Changelog index 07e32997..f518f8dd 100644 --- a/Changelog +++ b/Changelog @@ -32,6 +32,18 @@ Contributed by Rumyana Neykova. +- Consumer now supports a ``on_message`` callback that can be used to process + raw undecoded messages. + + Other callbacks specified using the ``callbacks`` argument, and + the ``receive`` method will be not be called when a on message callback + is present. + +- New utility :func:`kombu.common.ignore_errors` ignores connection and + channel errors. + + Must only be used for cleanup actions at shutdown or on connection loss. + - Support for exchange-to-exchange bindings. The :class:`~kombu.entity.Exchange` entity gained ``bind_to`` diff --git a/docs/userguide/serialization.rst b/docs/userguide/serialization.rst index d5ade3b6..b3648d14 100644 --- a/docs/userguide/serialization.rst +++ b/docs/userguide/serialization.rst @@ -98,6 +98,7 @@ for the raw data:: The `Message` object returned by the `Consumer` class will have a `content_type` and `content_encoding` attribute. +.. _serialization-entrypoints: Creating extensions using Setuptools entry-points ================================================= diff --git a/kombu/common.py b/kombu/common.py index 7dcbf388..1c26bc97 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -15,6 +15,7 @@ import socket import threading from collections import deque +from contextlib import contextmanager from functools import partial from itertools import count @@ -192,13 +193,43 @@ def _ensure_errback(exc, interval): exc_info=True) -def ignore_errors(conn, fun, *args, **kwargs): +@contextmanager +def _ignore_errors(conn): try: - fun(*args, **kwargs) + yield except (AttributeError, ) + conn.connection_errors + conn.channel_errors: pass +def ignore_errors(conn, fun=None, *args, **kwargs): + """Ignore connection and channel errors. + + The first argument must be a connection object, or any other object + with ``connection_error`` and ``channel_error`` attributes. + + Can be used as a function:: + + >>> ignore_errors(conn, consumer.channel.close) + + or as a context manager:: + + >>> with ignore_errors(conn): + ... consumer.channel.close() + + + .. note:: + + Connection and channel errors should be properly handled, + and not ignored. Using this function is only acceptible in a cleanup + phase, like when a connection is lost or at shutdown. + + """ + if fun: + with _ignore_errors(conn): + return fun(*args, **kwargs) + return _ignore_errors(conn) + + def revive_connection(connection, channel, on_revive=None): if on_revive: on_revive(channel) @@ -244,14 +275,48 @@ def entry_to_queue(queue, **options): class QoS(object): """Thread safe increment/decrement of a channels prefetch_count. - :param consumer: A :class:`kombu.messaging.Consumer` instance. + :param callback: Function used to set new prefetch count, + e.g. ``consumer.qos`` or ``channel.basic_qos``. Will be called + with a single ``prefetch_count`` keyword argument. :param initial_value: Initial prefetch count value. + **Example usage** + + .. code-block:: python + + >>> consumer = Consumer(connection) + >>> qos = QoS(consumer.qos, initial_prefetch_count=2) + >>> qos.update() # set initial + + >>> qos.value + 2 + + >>> def in_some_thread(): + ... qos.increment_eventually() + + >>> def in_some_other_thread(): + ... qos.decrement_eventually() + + >>> while some_loop: + ... if qos.prev != qos.value: + ... qos.update() # prefetch changed so update. + + It can be used with any function supporting a ``prefetch_count`` keyword + argument:: + + >>> channel = connection.channel() + >>> QoS(channel.basic_qos, 10) + + + >>> def set_qos(prefetch_count): + ... some_object.change(prefetch=prefetch_count) + >>> QoS(set_qos, 10) + """ prev = None - def __init__(self, consumer, initial_value): - self.consumer = consumer + def __init__(self, callback, initial_value): + self.callback = callback self._mutex = threading.RLock() self.value = initial_value or 0 @@ -288,7 +353,7 @@ class QoS(object): PREFETCH_COUNT_MAX) new_value = 0 klogger.debug('basic.qos: prefetch_count->%s', new_value) - self.consumer.qos(prefetch_count=new_value) + self.callback(prefetch_count=new_value) self.prev = pcount return pcount diff --git a/kombu/mixins.py b/kombu/mixins.py index 50f1d499..dbd4e34c 100644 --- a/kombu/mixins.py +++ b/kombu/mixins.py @@ -17,6 +17,7 @@ from contextlib import contextmanager from functools import partial from itertools import count +from .common import ignore_errors from .messaging import Consumer from .log import LogMixin from .utils import cached_property, nested @@ -192,14 +193,8 @@ class ConsumerMixin(LogMixin): self.debug('consume exiting') def maybe_conn_error(self, fun): - """Applies function but ignores any connection or channel - errors raised.""" - try: - fun() - except (AttributeError, ) + \ - self.connection_errors + \ - self.channel_errors: - pass + """Use :func:`kombu.common.ignore_errors` instead.""" + return ignore_errors(self, fun) @contextmanager def establish_connection(self):