mirror of https://github.com/celery/kombu.git
Updates Changelog
This commit is contained in:
parent
1186cf82b4
commit
d4b0c455d2
12
Changelog
12
Changelog
|
@ -32,6 +32,18 @@
|
||||||
|
|
||||||
Contributed by Rumyana Neykova.
|
Contributed by Rumyana Neykova.
|
||||||
|
|
||||||
|
- Consumer now supports a ``on_message`` callback that can be used to process
|
||||||
|
raw undecoded messages.
|
||||||
|
|
||||||
|
Other callbacks specified using the ``callbacks`` argument, and
|
||||||
|
the ``receive`` method will be not be called when a on message callback
|
||||||
|
is present.
|
||||||
|
|
||||||
|
- New utility :func:`kombu.common.ignore_errors` ignores connection and
|
||||||
|
channel errors.
|
||||||
|
|
||||||
|
Must only be used for cleanup actions at shutdown or on connection loss.
|
||||||
|
|
||||||
- Support for exchange-to-exchange bindings.
|
- Support for exchange-to-exchange bindings.
|
||||||
|
|
||||||
The :class:`~kombu.entity.Exchange` entity gained ``bind_to``
|
The :class:`~kombu.entity.Exchange` entity gained ``bind_to``
|
||||||
|
|
|
@ -98,6 +98,7 @@ for the raw data::
|
||||||
The `Message` object returned by the `Consumer` class will have a
|
The `Message` object returned by the `Consumer` class will have a
|
||||||
`content_type` and `content_encoding` attribute.
|
`content_type` and `content_encoding` attribute.
|
||||||
|
|
||||||
|
.. _serialization-entrypoints:
|
||||||
|
|
||||||
Creating extensions using Setuptools entry-points
|
Creating extensions using Setuptools entry-points
|
||||||
=================================================
|
=================================================
|
||||||
|
|
|
@ -15,6 +15,7 @@ import socket
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
from collections import deque
|
from collections import deque
|
||||||
|
from contextlib import contextmanager
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from itertools import count
|
from itertools import count
|
||||||
|
|
||||||
|
@ -192,13 +193,43 @@ def _ensure_errback(exc, interval):
|
||||||
exc_info=True)
|
exc_info=True)
|
||||||
|
|
||||||
|
|
||||||
def ignore_errors(conn, fun, *args, **kwargs):
|
@contextmanager
|
||||||
|
def _ignore_errors(conn):
|
||||||
try:
|
try:
|
||||||
fun(*args, **kwargs)
|
yield
|
||||||
except (AttributeError, ) + conn.connection_errors + conn.channel_errors:
|
except (AttributeError, ) + conn.connection_errors + conn.channel_errors:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def ignore_errors(conn, fun=None, *args, **kwargs):
|
||||||
|
"""Ignore connection and channel errors.
|
||||||
|
|
||||||
|
The first argument must be a connection object, or any other object
|
||||||
|
with ``connection_error`` and ``channel_error`` attributes.
|
||||||
|
|
||||||
|
Can be used as a function::
|
||||||
|
|
||||||
|
>>> ignore_errors(conn, consumer.channel.close)
|
||||||
|
|
||||||
|
or as a context manager::
|
||||||
|
|
||||||
|
>>> with ignore_errors(conn):
|
||||||
|
... consumer.channel.close()
|
||||||
|
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
Connection and channel errors should be properly handled,
|
||||||
|
and not ignored. Using this function is only acceptible in a cleanup
|
||||||
|
phase, like when a connection is lost or at shutdown.
|
||||||
|
|
||||||
|
"""
|
||||||
|
if fun:
|
||||||
|
with _ignore_errors(conn):
|
||||||
|
return fun(*args, **kwargs)
|
||||||
|
return _ignore_errors(conn)
|
||||||
|
|
||||||
|
|
||||||
def revive_connection(connection, channel, on_revive=None):
|
def revive_connection(connection, channel, on_revive=None):
|
||||||
if on_revive:
|
if on_revive:
|
||||||
on_revive(channel)
|
on_revive(channel)
|
||||||
|
@ -244,14 +275,48 @@ def entry_to_queue(queue, **options):
|
||||||
class QoS(object):
|
class QoS(object):
|
||||||
"""Thread safe increment/decrement of a channels prefetch_count.
|
"""Thread safe increment/decrement of a channels prefetch_count.
|
||||||
|
|
||||||
:param consumer: A :class:`kombu.messaging.Consumer` instance.
|
:param callback: Function used to set new prefetch count,
|
||||||
|
e.g. ``consumer.qos`` or ``channel.basic_qos``. Will be called
|
||||||
|
with a single ``prefetch_count`` keyword argument.
|
||||||
:param initial_value: Initial prefetch count value.
|
:param initial_value: Initial prefetch count value.
|
||||||
|
|
||||||
|
**Example usage**
|
||||||
|
|
||||||
|
.. code-block:: python
|
||||||
|
|
||||||
|
>>> consumer = Consumer(connection)
|
||||||
|
>>> qos = QoS(consumer.qos, initial_prefetch_count=2)
|
||||||
|
>>> qos.update() # set initial
|
||||||
|
|
||||||
|
>>> qos.value
|
||||||
|
2
|
||||||
|
|
||||||
|
>>> def in_some_thread():
|
||||||
|
... qos.increment_eventually()
|
||||||
|
|
||||||
|
>>> def in_some_other_thread():
|
||||||
|
... qos.decrement_eventually()
|
||||||
|
|
||||||
|
>>> while some_loop:
|
||||||
|
... if qos.prev != qos.value:
|
||||||
|
... qos.update() # prefetch changed so update.
|
||||||
|
|
||||||
|
It can be used with any function supporting a ``prefetch_count`` keyword
|
||||||
|
argument::
|
||||||
|
|
||||||
|
>>> channel = connection.channel()
|
||||||
|
>>> QoS(channel.basic_qos, 10)
|
||||||
|
|
||||||
|
|
||||||
|
>>> def set_qos(prefetch_count):
|
||||||
|
... some_object.change(prefetch=prefetch_count)
|
||||||
|
>>> QoS(set_qos, 10)
|
||||||
|
|
||||||
"""
|
"""
|
||||||
prev = None
|
prev = None
|
||||||
|
|
||||||
def __init__(self, consumer, initial_value):
|
def __init__(self, callback, initial_value):
|
||||||
self.consumer = consumer
|
self.callback = callback
|
||||||
self._mutex = threading.RLock()
|
self._mutex = threading.RLock()
|
||||||
self.value = initial_value or 0
|
self.value = initial_value or 0
|
||||||
|
|
||||||
|
@ -288,7 +353,7 @@ class QoS(object):
|
||||||
PREFETCH_COUNT_MAX)
|
PREFETCH_COUNT_MAX)
|
||||||
new_value = 0
|
new_value = 0
|
||||||
klogger.debug('basic.qos: prefetch_count->%s', new_value)
|
klogger.debug('basic.qos: prefetch_count->%s', new_value)
|
||||||
self.consumer.qos(prefetch_count=new_value)
|
self.callback(prefetch_count=new_value)
|
||||||
self.prev = pcount
|
self.prev = pcount
|
||||||
return pcount
|
return pcount
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@ from contextlib import contextmanager
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from itertools import count
|
from itertools import count
|
||||||
|
|
||||||
|
from .common import ignore_errors
|
||||||
from .messaging import Consumer
|
from .messaging import Consumer
|
||||||
from .log import LogMixin
|
from .log import LogMixin
|
||||||
from .utils import cached_property, nested
|
from .utils import cached_property, nested
|
||||||
|
@ -192,14 +193,8 @@ class ConsumerMixin(LogMixin):
|
||||||
self.debug('consume exiting')
|
self.debug('consume exiting')
|
||||||
|
|
||||||
def maybe_conn_error(self, fun):
|
def maybe_conn_error(self, fun):
|
||||||
"""Applies function but ignores any connection or channel
|
"""Use :func:`kombu.common.ignore_errors` instead."""
|
||||||
errors raised."""
|
return ignore_errors(self, fun)
|
||||||
try:
|
|
||||||
fun()
|
|
||||||
except (AttributeError, ) + \
|
|
||||||
self.connection_errors + \
|
|
||||||
self.channel_errors:
|
|
||||||
pass
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def establish_connection(self):
|
def establish_connection(self):
|
||||||
|
|
Loading…
Reference in New Issue