* Add more logs
* Launch _on_connection_disconnect in Connection only if channel was added properly to the poller
* Prepare a test which checks the flow of channel removal from the poller
* Change the comment
* ConnectionPool can't be used after .resize(..., reset=True) (resolves #2018)
* Update kombu/resource.py
---------
Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>
When the broker sends a Basic.cancel notification, py-amqp's default
behavior is to raise a ConsumerCancelled exception. It is then treated
as an error and the connection is closed, even if the connection and
channel are still operational.
Basic.cancel may be sent by the broker when a queue is deleted or when a
replicated queue's leader changes. py-amqp's channel.basic_consume
allows defining a callback function for this event. It may be useful to
register this callback from kombu when consuming from a queue.
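The difference between the default behavior and a registered cancel
callback can be sketched as follows. This is a minimal simulation, not
py-amqp or kombu code: `FakeChannel`, `deliver_basic_cancel` and the
local `ConsumerCancelled` class are hypothetical stand-ins, and only the
`on_cancel` argument name is modelled on py-amqp's `basic_consume`.

```python
class ConsumerCancelled(Exception):
    """Local stand-in for the exception raised by default."""


class FakeChannel:
    """Hypothetical channel that only tracks cancel callbacks."""

    def __init__(self):
        self.cancel_callbacks = {}

    def basic_consume(self, queue, consumer_tag, on_cancel=None):
        # Remember the optional callback for this consumer.
        self.cancel_callbacks[consumer_tag] = on_cancel
        return consumer_tag

    def deliver_basic_cancel(self, consumer_tag):
        """Simulate the broker sending Basic.cancel for a consumer."""
        on_cancel = self.cancel_callbacks.pop(consumer_tag, None)
        if on_cancel is None:
            # No callback registered: surface the cancel as an error.
            raise ConsumerCancelled(consumer_tag)
        # Callback registered: notify it and keep the channel usable.
        on_cancel(consumer_tag)


cancelled = []
channel = FakeChannel()
channel.basic_consume("tasks", "ctag-1", on_cancel=cancelled.append)
channel.deliver_basic_cancel("ctag-1")
```

With the callback registered, the cancel is recorded in `cancelled`
instead of being raised as an exception.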
(cherry picked from commit e1fa168ace)
Signed-off-by: julien.cosmao <julien.cosmao@ovhcloud.com>
Co-authored-by: julien.cosmao <julien.cosmao@ovhcloud.com>
* Fix partial closure error when instantiating redis pool for a global_keyprefix sentinel connection
* add tests for redis sentinel connection with global_keyprefix setting
* StrictRedis is deprecated, use Redis class
* add some resource cleanup
* Use the correct protocol for SQS requests
TL;DR - The use of boto3 in #1759 resulted in relying on blocking
(synchronous) HTTP requests, which caused the performance issue reported
in #1783.
`kombu` previously crafted AWS requests manually, as explained in
detail in #1726, which resulted in an outage when botocore temporarily
changed the default protocol to JSON (before rolling back due to the
impact on celery and airflow). To fix the issue, I submitted #1759,
which changes `kombu` to use `boto3` instead of manually crafting AWS
requests. This way, when boto3 changes the default protocol, kombu won't
be impacted.
While working on #1759, I did extensive debugging to understand the
multi-threading nature of kombu. What I discovered is that there isn't
actual multi-threading in the true sense of the word, but an event
loop that runs on the same thread and process and orchestrates the
communication with SQS. As such, it didn't appear to me that there was
anything to worry about with my change, and the testing I did didn't
discover any issue. However, it turns out that while kombu's event loop
doesn't have actual multi-threading, its [reliance on
pycurl](https://github.com/celery/kombu/blob/main/kombu/asynchronous/http/curl.py#L48)
(and thus libcurl) meant that the requests to AWS were being done
asynchronously. On the other hand, boto3 requests are always done
synchronously, i.e. they are blocking requests.
The above meant that my change introduced blocking on the event loop of
kombu. This is fine in most cases, since the requests to SQS are
pretty fast. However, when long-polling is used, a call to SQS's
ReceiveMessage can last up to 20 seconds (depending on the user
configuration).
To solve this problem, I rolled back my earlier changes. Instead, to
address the issue reported in #1726, I changed the
`AsyncSQSConnection` class so that it crafts either a `query` or a
`json` request depending on the protocol used by the SQS client. Thus,
when botocore changes the default protocol of SQS to JSON, kombu won't
be impacted, since it crafts its own request and, after my change, it
uses a hard-coded protocol based on the crafted requests.
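As a rough illustration of crafting a request per protocol, here is a
minimal sketch. It is not the code in `AsyncSQSConnection`:
`build_sqs_request` is a hypothetical helper, request signing and the
endpoint URL are left out, and the header values reflect my
understanding of the two SQS wire formats rather than anything stated
in this commit.

```python
import json
from urllib.parse import urlencode

SQS_API_VERSION = "2012-11-05"


def build_sqs_request(protocol, operation, params):
    """Return (headers, body) for an unsigned SQS request (sketch only)."""
    if protocol == "query":
        # Query protocol: form-encoded body carrying Action and Version.
        form = {"Action": operation, "Version": SQS_API_VERSION, **params}
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        return headers, urlencode(sorted(form.items()))
    if protocol == "json":
        # JSON protocol: the operation goes in a header, params in the body.
        headers = {
            "Content-Type": "application/x-amz-json-1.0",
            "X-Amz-Target": f"AmazonSQS.{operation}",
        }
        return headers, json.dumps(params, sort_keys=True)
    raise ValueError(f"unsupported SQS protocol: {protocol!r}")


query_headers, query_body = build_sqs_request(
    "query", "ReceiveMessage", {"WaitTimeSeconds": 20}
)
json_headers, json_body = build_sqs_request(
    "json", "ReceiveMessage", {"WaitTimeSeconds": 20}
)
```

Because the protocol is chosen explicitly by the caller, a change to the
client library's default does not alter what is sent.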
This shouldn't be the final solution; it is more of a
workaround that does the job for now. The final solution should be to
rely completely on boto3 for any communication with AWS, and to ensure
that all requests are async in nature (non-blocking). This, however, is
a fundamental change that requires a lot of testing, in particular
performance testing.
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
* Update kombu/asynchronous/aws/sqs/connection.py
---------
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>
* added Python 3.12 in the CI
* try to make kafka work on py3.12
* skip kafka for the time being as it seems not to be working with py3.12 yet
* using assert_called_once()
* Create a lock on cached_property if not present
This fixes #1804 (fixing breakage caused by use of undocumented
implementation details of functools.cached_property) by ensuring a lock
is always present on cached_property attributes, which is required to
safely support setting and deleting cached values in addition to
computing them on demand.
* Add a unit test for cached_property locking
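The idea of always having a lock on the `cached_property` can be
sketched as below. This is an illustrative subclass under my own
assumptions, not the code from kombu: it relies on
`functools.cached_property` storing the value in the instance
`__dict__` under `attrname`, and the `Box` class is a hypothetical
example.

```python
import threading
from functools import cached_property as _cached_property


class cached_property(_cached_property):
    """cached_property that also supports locked set and delete."""

    def __init__(self, func):
        super().__init__(func)
        # Older Pythons create a lock here; newer ones do not, so make
        # sure one is always present.
        if not hasattr(self, "lock"):
            self.lock = threading.RLock()

    def __set__(self, instance, value):
        # Overwrite the cached value under the lock.
        with self.lock:
            instance.__dict__[self.attrname] = value

    def __delete__(self, instance):
        # Drop the cached value so it is recomputed on next access.
        with self.lock:
            instance.__dict__.pop(self.attrname, None)


class Box:
    calls = 0

    @cached_property
    def value(self):
        Box.calls += 1
        return 42
```

Reading `Box().value` computes the value once; assigning to it replaces
the cached value, and `del` forces a recomputation on the next read.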
* azure servicebus: use DefaultAzureCredential in documentation
* azure servicebus: only use connection string when using sas key
* azure servicebus: add two small tests for parsing of connection string
* azure servicebus: fix lint issues
* [fix #1726] Use boto3 for SQS async requests
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
---------
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
* Fix #1618: avoid re-fetching queue URL when we already have it
`_get_from_sqs` was unnecessarily calling `get_queue_url` every
time even though the only place which calls `_get_from_sqs`
(that is `_get_async`) actually already knows the queue URL.
This change avoids hundreds of `GetQueueUrl` AWS API calls per hour
when using this SQS backend with celery.
Also `connection` is set by the one-and-only caller (and `queue` is
actually the queue name string now anyway so couldn't ever have
`.connection`) so remove the None default and unused fallback code.
* Clarify that `_new_queue` returns the queue URL
It seems that prior to 129a9e4ed0 it returned a queue
object, but this is no longer the case, so update comments
and variable names accordingly to make it clearer.
Also remove the incorrect fallback which cannot
be correct any more given the return value has to
be the queue URL which must be a string.
* Unit test coverage for SQS async codepath
This key code path (which as far as I can see is
the main route when using celery with SQS) was
missing test coverage.
This test adds coverage for:
`_get_bulk_async` -> `_get_async` -> `_get_from_sqs`
* Main
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
* Fix
* Fix
* Trigger Build
* Fix
* Fix
* Fix
* noqas
* Fix
* remove unused noqa
* re-add import
* Fix
* Fixes
* use pytest-freezer (#1683)
* Main
* Trigger Build
* Fixes
* remove
* lint
* Lint
---------
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>
* Added unit test: test_ensure_retry_errors_is_not_looping_infinitely()
* Added unit test: test_ensure_retry_errors_is_limited_by_max_retries()
* Added retry_errors arg to Connection.ensure() to allow applying retry policy for specific errors additionally
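How extra retryable errors combine with a retry limit can be sketched
as follows. This is a simplified, standalone loop and not the
implementation of `Connection.ensure()`: `call_with_retries` and all of
its parameters are hypothetical names chosen for illustration.

```python
import time


def call_with_retries(fun, recoverable_errors, retry_errors=(),
                      max_retries=None, interval=0.0, sleep=time.sleep):
    """Call fun(), retrying on recoverable errors plus retry_errors."""
    # Errors listed in retry_errors are retried in addition to the
    # usual recoverable ones.
    catchable = tuple(recoverable_errors) + tuple(retry_errors)
    retries = 0
    while True:
        try:
            return fun()
        except catchable:
            # Stop once the retry budget is used up, so the loop can
            # never spin forever when max_retries is set.
            if max_retries is not None and retries >= max_retries:
                raise
            retries += 1
            sleep(interval)
```

With `max_retries=2`, a function that always fails is called three
times (one initial attempt plus two retries) before the error
propagates.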
* Parse credential as a dict when using Azurite emulator
This more flexible credential allows the use of Azurite for integration testing in local docker-compose configurations.
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
* Fix some lint errors
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
* Add separate transport option for retry loop timeout
This only applies when using `Connection.default_channel`.
Before this change, the retry loop timeout was set equal to the TCP connect timeout (`connect_timeout`), meaning that when the first connection attempt timed out, no retry would be attempted.
Now if a new transport option `connect_total_timeout` is provided, this overrides `connect_timeout` for the retry loop (but not for TCP connect).
* Add tests
* Fix isort
* Rename to connect_retry_timeout
* Reformat
* connect_retry_timeout -> connect_retries_timeout
* Fix flake8
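The separation between the per-attempt TCP connect timeout and the
overall retry-loop timeout described above can be sketched like this.
It is a minimal model and not kombu's code: `connect_with_retry`, its
parameters and the injectable `clock` are hypothetical.

```python
import time


def connect_with_retry(connect, connect_timeout, retries_timeout=None,
                       clock=time.monotonic):
    """Retry connect() until it succeeds or the retry budget runs out."""
    # Without a separate budget the loop falls back to connect_timeout,
    # which reproduces the old behavior: one timed-out attempt uses up
    # the whole budget and nothing is retried.
    budget = connect_timeout if retries_timeout is None else retries_timeout
    deadline = clock() + budget
    while True:
        try:
            # Each individual attempt still uses the TCP connect timeout.
            return connect(timeout=connect_timeout)
        except TimeoutError:
            if clock() >= deadline:
                raise
```

For example, with `connect_timeout=5` and `retries_timeout=15`, a first
attempt that times out after 5 seconds still leaves room for further
attempts.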
* Solve Kombu filesystem transport not thread safe
fix: #398
Currently only a write lock is used when message/exchange files are
written, so a read in another thread can return an incomplete result.
1. Add a timeout for the lock acquire.
2. Add shared locks when reading messages from the filesystem.
3. Add a unit test for `lock` and `unlock`.
4. Add a unit test to test the lock during message processing.
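Points 1 and 2 above can be sketched with POSIX advisory locks. This is
a minimal sketch that assumes a POSIX platform with `fcntl.flock`; it is
not the transport's actual `lock`/`unlock` code, and the `timeout` and
`poll_interval` parameters are hypothetical.

```python
import fcntl
import time


def lock(fileobj, flags, timeout=5.0, poll_interval=0.01):
    """Acquire a shared (LOCK_SH) or exclusive (LOCK_EX) lock with a timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # LOCK_NB makes flock fail immediately instead of blocking,
            # so the timeout can be enforced by polling.
            fcntl.flock(fileobj.fileno(), flags | fcntl.LOCK_NB)
            return
        except BlockingIOError:
            if time.monotonic() >= deadline:
                raise TimeoutError("could not acquire file lock in time")
            time.sleep(poll_interval)


def unlock(fileobj):
    """Release a lock previously taken with lock()."""
    fcntl.flock(fileobj.fileno(), fcntl.LOCK_UN)
```

Readers would take `fcntl.LOCK_SH` and writers `fcntl.LOCK_EX`: several
readers can hold the lock together, while a writer has to wait until
all of them have released it, so a reader never sees a half-written
file.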
* Replace deprecated function.
* hub: tick delay fix
Todo and timer callbacks can perform actions that
require a tick callback to be executed right away,
without polling.
The current order can cause issues
when using a single worker with no prefetch (acks late).
Related issue in celery:
https://github.com/celery/celery/issues/7718
* add unit test for hub delay fix
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
* Avoid losing type of UUID when serializing/deserializing
Serializing UUIDs as strs and deserializing them as strs can lead to
somewhat obscure bugs such as the one that led to the creation of this
PR in django-cacheback
https://github.com/codeinthehole/django-cacheback/pull/100 which some
would call unexpected behaviour.
After all, a UUID is an altogether different type from str, and if
bytes got their own serializer/deserializer, the same logic should
apply to UUIDs.
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
* Update documentation for JSON serialization
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
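Preserving the UUID type across a JSON round trip can be sketched with
the standard library alone. This is an illustrative sketch, not the
code in `kombu/utils/json.py`: the `__type__`/`__value__` marker keys
and the helper names are assumptions made for this example.

```python
import json
import uuid


def _default(obj):
    # Tag UUIDs so the decoder can tell them apart from plain strings.
    if isinstance(obj, uuid.UUID):
        return {"__type__": "uuid", "__value__": str(obj)}
    raise TypeError(f"{type(obj).__name__} is not JSON serializable")


def _object_hook(data):
    # Rebuild a UUID from its tagged form; leave other dicts untouched.
    if data.get("__type__") == "uuid":
        return uuid.UUID(data["__value__"])
    return data


def dumps(obj):
    return json.dumps(obj, default=_default)


def loads(text):
    return json.loads(text, object_hook=_object_hook)


original = {"id": uuid.uuid4(), "name": "task"}
restored = loads(dumps(original))
```

After the round trip, `restored["id"]` is a `uuid.UUID` again rather
than a `str`, while plain strings such as `restored["name"]` are left
unchanged.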
* Add DelaySeconds to kwargs
* Add test
* add default value for DelaySeconds
* Fix tests and add check for properties
* Fix flake8 style issue
Co-authored-by: Edmund Lam <2623895+edmundlam@users.noreply.github.com>
* Datetime serialization and deserialization fixed
* Unit test fixed
* Fixed pylint
* Added Undocumented Autodoc Modules
* Update kombu/utils/json.py
Co-authored-by: Omer Katz <omer.katz@omerkatz.com>
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
* Clean and freeze now
Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>
Co-authored-by: Omer Katz <omer.katz@omerkatz.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
* Annotate `abstract.py`
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
* apply pre-commit
* Use quotes
* Add typing_extensions as requirement
* Add quotes
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
* updated azurestoragequeues transport for azure-storage-queues v12 + added basic tests
* fixed flake8 issues
* pinned azure-storage-queue lib to >= v12.0.0
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
* azure-storage-queue>=12.2.0
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>