chore(ci): fix lint job (#1718)

* chore(ci): align python version between matrix and tox config

* chore(pydocstyle): upgrade pydocstyle to 6.3.0

* chore(pydocstyle): exclude rules

* fix(mypy): fix `[truthy-function]` error

* fix(pydocstyle): fix D205 rule

* fix(pydocstyle): fix D212 rule

* fix(pydocstyle): fix D407 rule

* fix(pydocstyle): fix D412 rule

* fix(pydocstyle): fix D406 rule

* fix(pydocstyle): fix D411 rule

---------

Co-authored-by: Stevie Gayet <stegayet@users.noreply.github.com>
This commit is contained in:
Stevie Gayet 2023-05-15 04:11:34 +02:00 committed by GitHub
parent 8426032ae2
commit c03335e167
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 227 additions and 65 deletions

View File

@ -53,7 +53,7 @@ class Object:
setattr(self, name, None)
def as_dict(self, recurse: bool = False) -> dict[str, Any]:
def f(obj: Any, type: Callable[[Any], Any]) -> Any:
def f(obj: Any, type: Callable[[Any], Any] | None = None) -> Any:
if recurse and isinstance(obj, Object):
return obj.as_dict(recurse=True)
return type(obj) if type and obj is not None else obj

View File

@ -44,10 +44,12 @@ class Request:
"""A HTTP Request.
Arguments:
---------
url (str): The URL to request.
method (str): The HTTP method to use (defaults to ``GET``).
Keyword Arguments:
-----------------
headers (Dict, ~kombu.asynchronous.http.Headers): Optional headers for
this request
body (str): Optional body for this request.
@ -138,7 +140,8 @@ class Request:
class Response:
"""HTTP Response.
Arguments:
Arguments
---------
request (~kombu.asynchronous.http.Request): See :attr:`request`.
code (int): See :attr:`code`.
headers (~kombu.asynchronous.http.Headers): See :attr:`headers`.
@ -146,7 +149,8 @@ class Response:
effective_url (str): See :attr:`effective_url`.
status (str): See :attr:`status`.
Attributes:
Attributes
----------
request (~kombu.asynchronous.http.Request): object used to
get this response.
code (int): HTTP response code (e.g. 200, 404, or 500).
@ -182,7 +186,8 @@ class Response:
def raise_for_error(self):
"""Raise if the request resulted in an HTTP error code.
Raises:
Raises
------
:class:`~kombu.exceptions.HttpError`
"""
if self.error:
@ -193,6 +198,7 @@ class Response:
"""The full contents of the response body.
Note:
----
Accessing this property will evaluate the buffer
and subsequent accesses will be cached.
"""

View File

@ -57,6 +57,7 @@ class Hub:
"""Event loop object.
Arguments:
---------
timer (kombu.asynchronous.Timer): Specify custom timer instance.
"""

View File

@ -26,6 +26,7 @@ class LaxBoundedSemaphore:
range even if released more times than it was acquired.
Example:
-------
>>> x = LaxBoundedSemaphore(2)
>>> x.acquire(print, 'HELLO 1')
@ -61,6 +62,7 @@ class LaxBoundedSemaphore:
until the semaphore is released.
Arguments:
---------
callback (Callable): The callback to apply.
*partial_args (Any): partial arguments to callback.
"""
@ -77,6 +79,7 @@ class LaxBoundedSemaphore:
"""Release semaphore.
Note:
----
If there are any waiters this will apply the first waiter
that is waiting for the resource (FIFO order).
"""

View File

@ -149,6 +149,7 @@ class Timer:
"""Enter function into the scheduler.
Arguments:
---------
entry (~kombu.asynchronous.timer.Entry): Item to enter.
eta (datetime.datetime): Scheduled time.
priority (int): Unused.

View File

@ -18,6 +18,7 @@ class timetuple(tuple):
Can be used as part of a heap to keep events ordered.
Arguments:
---------
clock (Optional[int]): Event clock value.
timestamp (float): Event UNIX timestamp value.
id (str): Event host id (e.g. ``hostname:pid``).
@ -85,7 +86,8 @@ class LamportClock:
process receives a message, it resynchronizes its logical clock with
the sender.
See Also:
See Also
--------
* `Lamport timestamps`_
* `Lamports distributed mutex`_

View File

@ -66,6 +66,7 @@ class Broadcast(Queue):
and both the queue and exchange is configured with auto deletion.
Arguments:
---------
name (str): This is used as the name of the exchange.
queue (str): By default a unique id is used for the queue
name for every consumer. You can specify a custom
@ -203,7 +204,8 @@ def eventloop(conn, limit=None, timeout=None, ignore_timeouts=False):
``eventloop`` is a generator.
Examples:
Examples
--------
>>> from kombu.common import eventloop
>>> def run(conn):
@ -219,7 +221,8 @@ def eventloop(conn, limit=None, timeout=None, ignore_timeouts=False):
for _ in eventloop(connection, limit=1, timeout=1):
pass
See Also:
See Also
--------
:func:`itermessages`, which is an event loop bound to one or more
consumers, that yields any messages received.
"""
@ -236,6 +239,7 @@ def send_reply(exchange, req, msg,
"""Send reply for request.
Arguments:
---------
exchange (kombu.Exchange, str): Reply exchange
req (~kombu.Message): Original request, a message with
a ``reply_to`` property.
@ -309,6 +313,7 @@ def ignore_errors(conn, fun=None, *args, **kwargs):
Note:
----
Connection and channel errors should be properly handled,
and not ignored. Using this function is only acceptable in a cleanup
phase, like when a connection is lost or at shutdown.
@ -348,12 +353,14 @@ class QoS:
"""Thread safe increment/decrement of a channels prefetch_count.
Arguments:
---------
callback (Callable): Function used to set new prefetch count,
e.g. ``consumer.qos`` or ``channel.basic_qos``. Will be called
with a single ``prefetch_count`` keyword argument.
initial_value (int): Initial prefetch count value..
Example:
-------
>>> from kombu import Consumer, Connection
>>> connection = Connection('amqp://')
>>> consumer = Consumer(connection)
@ -396,6 +403,7 @@ class QoS:
"""Increment the value, but do not update the channels QoS.
Note:
----
The MainThread will be responsible for calling :meth:`update`
when necessary.
"""
@ -408,6 +416,7 @@ class QoS:
"""Decrement the value, but do not update the channels QoS.
Note:
----
The MainThread will be responsible for calling :meth:`update`
when necessary.
"""

View File

@ -18,6 +18,7 @@ def register(encoder, decoder, content_type, aliases=None):
"""Register new compression method.
Arguments:
---------
encoder (Callable): Function used to compress text.
decoder (Callable): Function used to decompress previously
compressed text.
@ -52,6 +53,7 @@ def compress(body, content_type):
"""Compress text.
Arguments:
---------
body (AnyStr): The text to compress.
content_type (str): mime-type of compression method to use.
"""
@ -63,6 +65,7 @@ def decompress(body, content_type):
"""Decompress compressed text.
Arguments:
---------
body (AnyStr): Previously compressed text to uncompress.
content_type (str): mime-type of compression method used.
"""

View File

@ -64,6 +64,7 @@ class Connection:
"""A connection to the broker.
Example:
-------
>>> Connection('amqp://guest:guest@localhost:5672//')
>>> Connection('amqp://foo;amqp://bar',
... failover_strategy='round-robin')
@ -80,13 +81,16 @@ class Connection:
... })
Note:
----
SSL currently only works with the py-amqp, and qpid
transports. For other transports you can use stunnel.
Arguments:
---------
URL (str, Sequence): Broker URL, or a list of URLs.
Keyword Arguments:
-----------------
ssl (bool/dict): Use SSL to connect to the server.
Default is ``False``.
May not be supported by the specified transport.
@ -102,6 +106,7 @@ class Connection:
around once per second.
Note:
----
The connection is established lazily when needed. If you need the
connection to be established, then force it by calling
:meth:`connect`::
@ -233,9 +238,11 @@ class Connection:
"""Switch connection parameters to use a new URL or hostname.
Note:
----
Does not reconnect!
Arguments:
---------
conn_str (str): either a hostname or URL.
"""
self.close()
@ -311,6 +318,7 @@ class Connection:
this is a noop operation.
Arguments:
---------
rate (int): Rate is how often the tick is called
compared to the actual heartbeat value. E.g. if
the heartbeat is set to 3 seconds, and the tick
@ -323,9 +331,11 @@ class Connection:
"""Wait for a single event from the server.
Arguments:
---------
timeout (float): Timeout in seconds before we give up.
Raises:
Raises
------
socket.timeout: if the timeout is exceeded.
"""
return self.transport.drain_events(self.connection, **kwargs)
@ -408,6 +418,7 @@ class Connection:
specified.
Arguments:
---------
errback (Callable): Optional callback called each time the
connection can't be established. Arguments provided are
the exception raised and the interval that will be
@ -491,6 +502,7 @@ class Connection:
the function.
Arguments:
---------
obj: The object to ensure an action on.
fun (Callable): Method to apply.
@ -515,7 +527,8 @@ class Connection:
regardless of the connection state. Must provide max_retries
if this is specified.
Examples:
Examples
--------
>>> from kombu import Connection, Producer
>>> conn = Connection('amqp://')
>>> producer = Producer(conn)
@ -601,10 +614,12 @@ class Connection:
If a ``channel`` is not provided, then one will be automatically
acquired (remember to close it afterwards).
See Also:
See Also
--------
:meth:`ensure` for the full list of supported keyword arguments.
Example:
-------
>>> channel = connection.channel()
>>> try:
... ret, channel = connection.autoretry(
@ -730,14 +745,17 @@ class Connection:
def Pool(self, limit=None, **kwargs):
"""Pool of connections.
See Also:
See Also
--------
:class:`ConnectionPool`.
Arguments:
---------
limit (int): Maximum number of active connections.
Default is no limit.
Example:
-------
>>> connection = Connection('amqp://')
>>> pool = connection.Pool(2)
>>> c1 = pool.acquire()
@ -756,14 +774,17 @@ class Connection:
def ChannelPool(self, limit=None, **kwargs):
"""Pool of channels.
See Also:
See Also
--------
:class:`ChannelPool`.
Arguments:
---------
limit (int): Maximum number of active channels.
Default is no limit.
Example:
-------
>>> connection = Connection('amqp://')
>>> pool = connection.ChannelPool(2)
>>> c1 = pool.acquire()
@ -802,6 +823,7 @@ class Connection:
also it will be used as the default routing key.
Arguments:
---------
name (str, kombu.Queue): Name of the queue/or a queue.
no_ack (bool): Disable acknowledgments. Default is false.
queue_opts (Dict): Additional keyword arguments passed to the
@ -828,7 +850,8 @@ class Connection:
Create new :class:`~kombu.simple.SimpleQueue` using a channel
from this connection.
See Also:
See Also
--------
Same as :meth:`SimpleQueue`, but configured with buffering
semantics. The resulting queue and exchange will not be durable,
also auto delete is enabled. Messages will be transient (not
@ -901,6 +924,7 @@ class Connection:
"""The underlying connection object.
Warning:
-------
This instance is transport specific, so do not
depend on the interface of this object.
"""
@ -925,6 +949,7 @@ class Connection:
Created upon access and closed when the connection is closed.
Note:
----
Can be used for automatic channel handling when you only need one
channel, and also it is the channel implicitly used if
a connection is passed instead of a channel, to functions that

View File

@ -42,6 +42,7 @@ class Exchange(MaybeChannelBound):
"""An Exchange declaration.
Arguments:
---------
name (str): See :attr:`name`.
type (str): See :attr:`type`.
channel (kombu.Connection, ChannelT): See :attr:`channel`.
@ -51,7 +52,8 @@ class Exchange(MaybeChannelBound):
arguments (Dict): See :attr:`arguments`.
no_declare (bool): See :attr:`no_declare`
Attributes:
Attributes
----------
name (str): Name of the exchange.
Default is no name (the default exchange).
@ -190,6 +192,7 @@ class Exchange(MaybeChannelBound):
"""Bind the exchange to another exchange.
Arguments:
---------
nowait (bool): If set the server will not respond, and the call
will not block waiting for a response.
Default is :const:`False`.
@ -221,6 +224,7 @@ class Exchange(MaybeChannelBound):
"""Create message instance to be sent with :meth:`publish`.
Arguments:
---------
body (Any): Message body.
delivery_mode (bool): Set custom delivery mode.
@ -259,6 +263,7 @@ class Exchange(MaybeChannelBound):
"""Publish message.
Arguments:
---------
message (Union[kombu.Message, str, bytes]):
Message to publish.
routing_key (str): Message routing key.
@ -280,6 +285,7 @@ class Exchange(MaybeChannelBound):
"""Delete the exchange declaration on server.
Arguments:
---------
if_unused (bool): Delete only if the exchange has no bindings.
Default is :const:`False`.
nowait (bool): If set the server will not respond, and a
@ -322,6 +328,7 @@ class binding(Object):
"""Represents a queue or exchange binding.
Arguments:
---------
exchange (Exchange): Exchange to bind to.
routing_key (str): Routing key used as binding key.
arguments (Dict): Arguments for bind operation.
@ -376,6 +383,7 @@ class Queue(MaybeChannelBound):
"""A Queue declaration.
Arguments:
---------
name (str): See :attr:`name`.
exchange (Exchange, str): See :attr:`exchange`.
routing_key (str): See :attr:`routing_key`.
@ -394,7 +402,8 @@ class Queue(MaybeChannelBound):
max_length_bytes (int): See :attr:`max_length_bytes`.
max_priority (int): See :attr:`max_priority`.
Attributes:
Attributes
----------
name (str): Name of the queue.
Default is no name (default queue destination).
@ -628,6 +637,7 @@ class Queue(MaybeChannelBound):
"""Declare queue on the server.
Arguments:
---------
nowait (bool): Do not wait for a reply.
passive (bool): If set, the server will not create the queue.
The client can use this to check whether a queue exists
@ -684,11 +694,13 @@ class Queue(MaybeChannelBound):
specific types of applications where synchronous functionality
is more important than performance.
Returns:
Returns
-------
~kombu.Message: if a message was available,
or :const:`None` otherwise.
Arguments:
---------
no_ack (bool): If enabled the broker will
automatically ack messages.
accept (Set[str]): Custom list of accepted content types.
@ -717,6 +729,7 @@ class Queue(MaybeChannelBound):
until the client cancels them.
Arguments:
---------
consumer_tag (str): Unique identifier for the consumer.
The consumer tag is local to a connection, so two clients
can use the same consumer tags. If this field is empty
@ -747,6 +760,7 @@ class Queue(MaybeChannelBound):
"""Delete the queue.
Arguments:
---------
if_unused (bool): If set, the server will only delete the queue
if it has no consumers. A channel error will be raised
if the queue has consumers.

View File

@ -19,7 +19,7 @@ class Message:
"""Base class for received messages.
Keyword Arguments:
-----------------
channel (ChannelT): If message was received, this should be the
channel that the message was received on.
@ -103,7 +103,8 @@ class Message:
This will remove the message from the queue.
Raises:
Raises
------
MessageStateError: If the message has already been
acknowledged/requeued/rejected.
"""
@ -148,7 +149,8 @@ class Message:
The message will be discarded by the server.
Raises:
Raises
------
MessageStateError: If the message has already been
acknowledged/requeued/rejected.
"""
@ -166,10 +168,12 @@ class Message:
"""Reject this message and put it back on the queue.
Warning:
-------
You must not use this method as a means of selecting messages
to process.
Raises:
Raises
------
MessageStateError: If the message has already been
acknowledged/requeued/rejected.
"""
@ -189,6 +193,7 @@ class Message:
Returning the original python structure sent by the publisher.
Note:
----
The return value is memoized, use `_decode` to force
re-evaluation.
"""

View File

@ -23,6 +23,7 @@ class Producer:
"""Message Producer.
Arguments:
---------
channel (kombu.Connection, ChannelT): Connection or channel.
exchange (kombu.entity.Exchange, str): Optional default exchange.
routing_key (str): Optional default routing key.
@ -93,6 +94,7 @@ class Producer:
"""Declare the exchange.
Note:
----
This happens automatically at instantiation when
the :attr:`auto_declare` flag is enabled.
"""
@ -126,6 +128,7 @@ class Producer:
"""Publish message to the specified exchange.
Arguments:
---------
body (Any): Message body.
routing_key (str): Message routing key.
delivery_mode (enum): See :attr:`delivery_mode`.
@ -293,6 +296,7 @@ class Consumer:
"""Message consumer.
Arguments:
---------
channel (kombu.Connection, ChannelT): see :attr:`channel`.
queues (Sequence[kombu.Queue]): see :attr:`queues`.
no_ack (bool): see :attr:`no_ack`.
@ -426,6 +430,7 @@ class Consumer:
"""Declare queues, exchanges and bindings.
Note:
----
This is done automatically at instantiation
when :attr:`auto_declare` is set.
"""
@ -436,6 +441,7 @@ class Consumer:
"""Register a new callback to be called when a message is received.
Note:
----
The signature of the callback needs to accept two arguments:
`(body, message)`, which is the decoded message body
and the :class:`~kombu.Message` instance.
@ -464,6 +470,7 @@ class Consumer:
"""Add a queue to the list of queues to consume from.
Note:
----
This will not start consuming from the queue,
for that you will have to call :meth:`consume` after.
"""
@ -482,6 +489,7 @@ class Consumer:
use :meth:`cancel_by_queue`).
Arguments:
---------
no_ack (bool): See :attr:`no_ack`.
"""
queues = list(self._queues.values())
@ -497,6 +505,7 @@ class Consumer:
"""End all active queue consumers.
Note:
----
This does not affect already delivered messages, but it does
mean the server will not send any more messages for this consumer.
"""
@ -530,6 +539,7 @@ class Consumer:
"""Purge messages from all queues.
Warning:
-------
This will *delete all ready messages*, there is no undo operation.
"""
return sum(queue.purge() for queue in self._queues.values())
@ -559,6 +569,7 @@ class Consumer:
The prefetch window is Ignored if the :attr:`no_ack` option is set.
Arguments:
---------
prefetch_size (int): Specify the prefetch window in octets.
The server will send a message in advance if it is equal to
or smaller in size than the available prefetch size (and
@ -582,6 +593,7 @@ class Consumer:
on the specified channel.
Arguments:
---------
requeue (bool): By default the messages will be redelivered
to the original recipient. With `requeue` set to true, the
server will attempt to requeue the message, potentially then
@ -595,10 +607,12 @@ class Consumer:
This dispatches to the registered :attr:`callbacks`.
Arguments:
---------
body (Any): The decoded message body.
message (~kombu.Message): The message instance.
Raises:
Raises
------
NotImplementedError: If no consumer callbacks have been
registered.
"""

View File

@ -49,6 +49,7 @@ class ConsumerMixin:
channels can be used for different QoS requirements.
Example:
-------
.. code-block:: python
class Worker(ConsumerMixin):
@ -65,8 +66,8 @@ class ConsumerMixin:
print('Got task: {0!r}'.format(body))
message.ack()
Methods:
Methods
-------
* :meth:`extra_context`
Optional extra context manager that will be entered
@ -257,6 +258,7 @@ class ConsumerProducerMixin(ConsumerMixin):
publishing messages.
Example:
-------
.. code-block:: python
class Worker(ConsumerProducerMixin):

View File

@ -67,12 +67,14 @@ class Resource:
"""Acquire resource.
Arguments:
---------
block (bool): If the limit is exceeded,
then block until there is an available item.
timeout (float): Timeout to wait
if ``block`` is true. Default is :const:`None` (forever).
Raises:
Raises
------
LimitExceeded: if block is false and the limit has been exceeded.
"""
if self._closed:
@ -103,6 +105,7 @@ class Resource:
"""Release resource so it can be used by another thread.
Warnings:
--------
The caller is responsible for discarding the object,
and to never use the resource again. A new resource must
be acquired if so needed.

View File

@ -72,6 +72,7 @@ class SerializerRegistry:
"""Register a new encoder/decoder.
Arguments:
---------
name (str): A convenience name for the serialization method.
encoder (callable): A method that will be passed a python data
@ -114,9 +115,11 @@ class SerializerRegistry:
"""Unregister registered encoder/decoder.
Arguments:
---------
name (str): Registered serialization method name.
Raises:
Raises
------
SerializerNotInstalled: If a serializer by that name
cannot be found.
"""
@ -134,11 +137,13 @@ class SerializerRegistry:
"""Set the default serialization method used by this library.
Arguments:
---------
name (str): The name of the registered serialization method.
For example, `json` (default), `pickle`, `yaml`, `msgpack`,
or any custom methods registered using :meth:`register`.
Raises:
Raises
------
SerializerNotInstalled: If the serialization method
requested is not available.
"""
@ -156,6 +161,7 @@ class SerializerRegistry:
as an AMQP message body.
Arguments:
---------
data (List, Dict, str): The message data to send.
serializer (str): An optional string representing
@ -171,12 +177,14 @@ class SerializerRegistry:
serialization method will be used even if a :class:`str`
or :class:`unicode` object is passed in.
Returns:
Returns
-------
Tuple[str, str, str]: A three-item tuple containing the
content type (e.g., `application/json`), content encoding, (e.g.,
`utf-8`) and a string containing the serialized data.
Raises:
Raises
------
SerializerNotInstalled: If the serialization method
requested is not available.
"""
@ -220,6 +228,7 @@ class SerializerRegistry:
based on `content_type`.
Arguments:
---------
data (bytes, buffer, str): The message data to deserialize.
content_type (str): The content-type of the data.
@ -230,10 +239,12 @@ class SerializerRegistry:
accept (Set): List of content-types to accept.
Raises:
Raises
------
ContentDisallowed: If the content-type is not accepted.
Returns:
Returns
-------
Any: The unserialized data.
"""
content_type = (bytes_to_str(content_type) if content_type
@ -343,7 +354,8 @@ def register_pickle():
def register_msgpack():
"""Register msgpack serializer.
See Also:
See Also
--------
https://msgpack.org/.
"""
pack = unpack = None
@ -391,6 +403,7 @@ def enable_insecure_serializers(choices=NOTSET):
"""Enable serializers that are considered to be unsafe.
Note:
----
Will enable ``pickle``, ``yaml`` and ``msgpack`` by default, but you
can also specify a list of serializers (by name or content type)
to enable.
@ -411,6 +424,7 @@ def disable_insecure_serializers(allowed=NOTSET):
or you can specify a list of deserializers to allow.
Note:
----
Producers will still be able to serialize data
in these formats, but consumers will not accept
incoming data using the untrusted content types.
@ -434,7 +448,8 @@ for ep, args in entrypoints('kombu.serializers'): # pragma: no cover
def prepare_accept_content(content_types, name_to_type=None):
"""Replace aliases of content_types with full names from registry.
Raises:
Raises
------
SerializerNotInstalled: If the serialization method
requested is not available.
"""

View File

@ -305,7 +305,8 @@ class Channel(virtual.Channel):
def drain_events(self, timeout=None, callback=None, **kwargs):
"""Return a single payload message from one of our queues.
Raises:
Raises
------
Queue.Empty: if no messages available.
"""
# If we're not allowed to consume or have no consumers, raise Empty
@ -318,7 +319,8 @@ class Channel(virtual.Channel):
def _reset_cycle(self):
"""Reset the consume cycle.
Returns:
Returns
-------
FairCycle: object that points to our _get_bulk() method
rather than the standard _get() method. This allows for
multiple messages to be returned at once from SQS (
@ -341,11 +343,12 @@ class Channel(virtual.Channel):
return self.entity_name(self.queue_name_prefix + queue_name)
def _new_queue(self, queue, **kwargs):
"""
Ensure a queue with given name exists in SQS.
"""Ensure a queue with given name exists in SQS.
Arguments:
---------
queue (str): the AMQP queue name
Returns:
Returns
str: the SQS queue URL
"""
# Translate to SQS name for consistency with initial
@ -481,10 +484,12 @@ class Channel(virtual.Channel):
the 'ack' settings for that queue.
Arguments:
---------
messages (SQSMessage): A list of SQS Message objects.
queue (str): Name representing the queue they came from.
Returns:
Returns
-------
List: A list of Payload objects
"""
q_url = self._new_queue(queue)
@ -501,15 +506,18 @@ class Channel(virtual.Channel):
prefetch_count).
Note:
----
Ignores QoS limits so caller is responsible for checking
that we are allowed to consume at least one message from the
queue. get_bulk will then ask QoS for an estimate of
the number of extra messages that we can consume.
Arguments:
---------
queue (str): The queue name to pull from.
Returns:
Returns
-------
List[Message]
"""
# drain_events calls `can_consume` first, consuming

View File

@ -52,6 +52,7 @@ def resolve_transport(transport: str | None = None) -> str | None:
"""Get transport by name.
Arguments:
---------
transport (Union[str, type]): This can be either
an actual transport class, or the fully qualified
path to a transport class, or the alias of a transport.

View File

@ -54,7 +54,8 @@ def to_rabbitmq_queue_arguments(arguments, **options):
max_priority (int): Max priority steps for queue.
This will be converted to ``x-max-priority`` int.
Returns:
Returns
-------
Dict: RabbitMQ compatible queue arguments.
"""
prepared = dictfilter(dict(
@ -95,7 +96,8 @@ class StdChannel:
def after_reply_message_received(self, queue):
"""Callback called after RPC reply received.
Notes:
Notes
-----
Reply queue semantics: can be used to delete the queue
after transient reply message received.
"""

View File

@ -97,9 +97,11 @@ class Channel(virtual.Channel):
read-consistency between the nodes.
Arguments:
---------
queue (str): The name of the Queue.
Returns:
Returns
-------
str: The ID of the session.
"""
try:
@ -134,13 +136,16 @@ class Channel(virtual.Channel):
means that they have to wait before the lock is released.
Arguments:
---------
queue (str): The name of the Queue.
raising (Exception): Set custom lock error class.
Raises:
Raises
------
LockError: if the lock cannot be acquired.
Returns:
Returns
-------
bool: success?
"""
self._acquire_lock(queue, raising=raising)
@ -170,6 +175,7 @@ class Channel(virtual.Channel):
It does so by simply removing the lock key in Consul.
Arguments:
---------
queue (str): The name of the queue we want to release
the lock from.
"""
@ -182,6 +188,7 @@ class Channel(virtual.Channel):
Will release all locks it still might hold.
Arguments:
---------
queue (str): The name of the Queue.
"""
logger.debug('Destroying session %s', self.queues[queue]['session_id'])

View File

@ -78,6 +78,7 @@ class Channel(virtual.Channel):
"""Create and return the `queue` with the proper prefix.
Arguments:
---------
queue (str): The name of the queue.
"""
return f'{self.prefix}/{queue}'
@ -93,6 +94,7 @@ class Channel(virtual.Channel):
means that they have to wait before the lock is released.
Arguments:
---------
queue (str): The name of the queue.
"""
lock = etcd.Lock(self.client, queue)
@ -109,6 +111,7 @@ class Channel(virtual.Channel):
"""Create a new `queue` if the `queue` doesn't already exist.
Arguments:
---------
queue (str): The name of the queue.
"""
self.queues[queue] = queue
@ -123,7 +126,8 @@ class Channel(virtual.Channel):
def _has_queue(self, queue, **kwargs):
"""Verify that queue exists.
Returns:
Returns
-------
bool: Should return :const:`True` if the queue exists
or :const:`False` otherwise.
"""
@ -137,6 +141,7 @@ class Channel(virtual.Channel):
"""Delete a `queue`.
Arguments:
---------
queue (str): The name of the queue.
"""
self.queues.pop(queue, None)
@ -148,6 +153,7 @@ class Channel(virtual.Channel):
This simply writes a key to the Etcd store
Arguments:
---------
queue (str): The name of the queue.
payload (dict): Message data which will be dumped to etcd.
"""
@ -166,6 +172,7 @@ class Channel(virtual.Channel):
only one node reads at the same time. This is for read consistency
Arguments:
---------
queue (str): The name of the queue.
timeout (int): Optional seconds to wait for a response.
"""
@ -196,6 +203,7 @@ class Channel(virtual.Channel):
"""Remove all `message`s from a `queue`.
Arguments:
---------
queue (str): The name of the queue.
"""
with self._queue_lock(queue):
@ -207,6 +215,7 @@ class Channel(virtual.Channel):
"""Return the size of the `queue`.
Arguments:
---------
queue (str): The name of the queue.
"""
with self._queue_lock(queue):

View File

@ -448,6 +448,7 @@ class Channel(virtual.Channel):
"""Get expiration header named `argument` of queue definition.
Note:
----
`queue` must be either queue name or options itself.
"""
if isinstance(queue, str):

View File

@ -1362,7 +1362,7 @@ class SentinelChannel(Channel):
* `master_name` - name of the redis group to poll
Example:
-------
.. code-block:: python
>>> import kombu

View File

@ -20,8 +20,9 @@ Connection String
sqlalchemy+SQL_ALCHEMY_CONNECTION_STRING
For details about ``SQL_ALCHEMY_CONNECTION_STRING`` see SQLAlchemy Engine Configuration documentation.
Examples:
Examples
--------
.. code-block::
# PostgreSQL with default driver

View File

@ -159,6 +159,7 @@ class QoS:
Only supports `prefetch_count` at this point.
Arguments:
---------
channel (ChannelT): Connection channel.
prefetch_count (int): Initial prefetch count (defaults to 0).
"""
@ -211,7 +212,8 @@ class QoS:
bulk 'get message' calls are preferred to many individual 'get message'
calls - like SQS.
Returns:
Returns
-------
int: greater than zero.
"""
pcount = self.prefetch_count
@ -273,6 +275,7 @@ class QoS:
"""Restore all unacknowledged messages at shutdown/gc collect.
Note:
----
Can only be called once for each instance, subsequent
calls will be ignored.
"""
@ -306,6 +309,7 @@ class QoS:
To be filled in for visibility_timeout style implementations.
Note:
----
This is implementation optional, and currently only
used by the Redis transport.
"""
@ -355,6 +359,7 @@ class AbstractChannel:
you'd usually want to implement in a virtual channel.
Note:
----
Do not subclass directly, but rather inherit
from :class:`Channel`.
"""
@ -379,6 +384,7 @@ class AbstractChannel:
"""Delete `queue`.
Note:
----
This just purges the queue, if you need to do more you can
override this method.
"""
@ -388,6 +394,7 @@ class AbstractChannel:
"""Create new queue.
Note:
----
Your transport can override this method if it needs
to do something whenever a new queue is declared.
"""
@ -395,7 +402,8 @@ class AbstractChannel:
def _has_queue(self, queue, **kwargs):
"""Verify that queue exists.
Returns:
Returns
-------
bool: Should return :const:`True` if the queue exists
or :const:`False` otherwise.
"""
@ -414,6 +422,7 @@ class Channel(AbstractChannel, base.StdChannel):
"""Virtual channel.
Arguments:
---------
connection (ConnectionT): The transport instance this
channel is part of.
"""
@ -675,6 +684,7 @@ class Channel(AbstractChannel, base.StdChannel):
"""Change QoS settings for this channel.
Note:
----
Only `prefetch_count` is supported.
"""
self.qos.prefetch_count = prefetch_count
@ -697,7 +707,8 @@ class Channel(AbstractChannel, base.StdChannel):
def _lookup(self, exchange, routing_key, default=None):
"""Find all queues matching `routing_key` for the given `exchange`.
Returns:
Returns
-------
list[str]: queue names -- must return `[default]`
if default is set and no queues matched.
"""
@ -765,7 +776,8 @@ class Channel(AbstractChannel, base.StdChannel):
def flow(self, active=True):
"""Enable/disable message flow.
Raises:
Raises
------
NotImplementedError: as flow
is not implemented by the base virtual implementation.
"""
@ -838,6 +850,7 @@ class Channel(AbstractChannel, base.StdChannel):
The value is limited to within a boundary of 0 to 9.
Note:
----
Higher value has more priority.
"""
try:
@ -887,6 +900,7 @@ class Transport(base.Transport):
"""Virtual transport.
Arguments:
---------
client (kombu.Connection): The client this is a transport for.
"""

View File

@ -17,6 +17,7 @@ class ExchangeType:
Implements the specifics for an exchange type.
Arguments:
---------
channel (ChannelT): AMQ Channel.
"""
@ -28,7 +29,8 @@ class ExchangeType:
def lookup(self, table, exchange, routing_key, default):
"""Lookup all queues matching `routing_key` in `exchange`.
Returns:
Returns
-------
str: queue name, or 'default' if no queues matched.
"""
raise NotImplementedError('subclass responsibility')
@ -36,7 +38,8 @@ class ExchangeType:
def prepare_bind(self, queue, exchange, routing_key, arguments):
"""Prepare queue-binding.
Returns:
Returns
-------
Tuple[str, Pattern, str]: of `(routing_key, regex, queue)`
to be stored for bindings to this exchange.
"""
@ -137,7 +140,8 @@ class FanoutExchange(ExchangeType):
attribute is set to true, and the `Channel._queue_bind` and
`Channel.get_table` methods are implemented.
See Also:
See Also
--------
the redis backend for an example implementation of these methods.
"""

View File

@ -45,6 +45,7 @@ class LRUCache(UserDict):
"""LRU Cache implementation using a doubly linked list to track access.
Arguments:
---------
limit (int): The maximum number of keys to keep in the cache.
When a new key is inserted and the limit has been exceeded,
the *Least Recently Used* key will be discarded from the
@ -221,6 +222,7 @@ def is_list(obj, scalars=(Mapping, str), iters=(Iterable,)):
"""Return true if the object is iterable.
Note:
----
Returns false if object is a mapping or string.
"""
return isinstance(obj, iters) and not isinstance(obj, scalars or ())
@ -279,11 +281,13 @@ def retry_over_time(fun, catch, args=None, kwargs=None, errback=None,
is increased for every retry until the max seconds is reached.
Arguments:
---------
fun (Callable): The function to try
catch (Tuple[BaseException]): Exceptions to catch, can be either
tuple or a single exception class.
Keyword Arguments:
-----------------
args (Tuple): Positional arguments passed on to the function.
kwargs (Dict): Keyword arguments passed on to the function.
errback (Callable): Callback for when an exception in ``catch``

View File

@ -28,7 +28,8 @@ def symbol_by_name(name, aliases=None, imp=None, package=None,
If `aliases` is provided, a dict containing short name/long name
mappings, the name is looked up in the aliases first.
Examples:
Examples
--------
>>> symbol_by_name('celery.concurrency.processes.TaskPool')
<class 'celery.concurrency.processes.TaskPool'>

View File

@ -11,13 +11,15 @@ __all__ = ('TokenBucket',)
class TokenBucket:
"""Token Bucket Algorithm.
See Also:
See Also
--------
https://en.wikipedia.org/wiki/Token_Bucket
Most of this code was stolen from an entry in the ASPN Python Cookbook:
https://code.activestate.com/recipes/511490/
Warning:
-------
Thread Safety: This implementation is not thread safe.
Access to a `TokenBucket` instance should occur within the critical
section of any multithreaded code.
@ -51,7 +53,8 @@ class TokenBucket:
def can_consume(self, tokens=1):
"""Check if one or more tokens can be consumed.
Returns:
Returns
-------
bool: true if the number of tokens can be consumed
from the bucket. If they can be consumed, a call will also
consume the requested number of tokens from the bucket.
@ -67,7 +70,8 @@ class TokenBucket:
def expected_time(self, tokens=1):
"""Return estimated time of token availability.
Returns:
Returns
-------
float: the time in seconds.
"""
_tokens = self._get_tokens()

View File

@ -24,6 +24,7 @@ class FairCycle:
an equal chance to be consumed from.
Arguments:
---------
fun (Callable): Callback to call.
resources (Sequence[Any]): List of resources.
predicate (type): Exception predicate.

View File

@ -22,7 +22,8 @@ def escape_regex(p, white=''):
def fmatch_iter(needle: str, haystack: Iterable[str], min_ratio: float = 0.6) -> Iterator[tuple[float, str]]:
"""Fuzzy match: iteratively.
Yields:
Yields
------
Tuple: of ratio and key.
"""
for key in haystack:

View File

@ -8,7 +8,8 @@ from uuid import UUID, uuid4
def uuid(_uuid: Callable[[], UUID] = uuid4) -> str:
"""Generate unique id in UUID4 format.
See Also:
See Also
--------
For now this is provided by :func:`uuid.uuid4`.
"""
return str(_uuid())

View File

@ -4,5 +4,5 @@ flake8==5.0.4
tox>=4.4.8
sphinx2rst>=1.0
bumpversion
pydocstyle==1.1.1
pydocstyle==6.3.0
mypy==1.2.0

View File

@ -48,7 +48,7 @@ files =
[pep257]
ignore = D102,D104,D203,D105,D213
ignore = D102,D107,D104,D203,D105,D213,D401,D413,D417
[bdist_rpm]
requires = amqp >= 5.

View File

@ -45,9 +45,9 @@ basepython =
pypy3.9: pypy3.9
pypy3.8: pypy3.8
3.7: python3.7
3.8,mypy: python3.8
3.9,apicheck,pydocstyle,flake8,linkcheck,cov: python3.9
3.10: python3.10
3.8: python3.8
3.9: python3.9
3.10,apicheck,pydocstyle,flake8,linkcheck,cov,mypy: python3.10
3.11: python3.11
install_command = python -m pip --disable-pip-version-check install {opts} {packages}