* Replaced `mktemp` with `mkstemp`
Signed-off-by: fazledyn-or <ataf@openrefactory.com>
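For illustration, the pattern behind this change looks roughly like the following (a minimal sketch; the written contents are made up, not kombu's actual code). `tempfile.mktemp` only returns a name, leaving a window in which another process can create the file first, while `tempfile.mkstemp` creates and opens the file atomically:
---8<------------------------------------------------------------------
import os
import tempfile

# Racy: mktemp() only picks a name; the file is created later, so another
# process can create it first.
# path = tempfile.mktemp()

# Safe: mkstemp() creates and opens the file atomically and returns an fd.
fd, path = tempfile.mkstemp()
try:
    with os.fdopen(fd, 'w') as f:
        f.write('data')
finally:
    os.unlink(path)
---8<------------------------------------------------------------------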
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
---------
Signed-off-by: fazledyn-or <ataf@openrefactory.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
* Use the correct protocol for SQS requests
TL;DR - The use of boto3 in #1759 resulted in relying on blocking
(synchronous) HTTP requests, which caused the performance issue reported
in #1783.
`kombu` previously crafted AWS requests manually, as explained in detail
in #1726, which resulted in an outage when botocore temporarily changed
the default protocol to JSON (before rolling back due to the impact on
celery and airflow). To fix the issue, I submitted #1759, which changes
`kombu` to use `boto3` instead of manually crafting AWS requests. This
way, when boto3 changes the default protocol, kombu won't be impacted.
While working on #1759, I did extensive debugging to understand the
multi-threading nature of kombu. What I discovered is that there is no
actual multi-threading in the true sense of the word, but rather an
event loop that runs on the same thread and process and orchestrates
the communication with SQS. As such, it didn't appear to me that there
was anything to worry about in my change, and the testing I did didn't
uncover any issue. However, it turns out that while kombu's event loop
doesn't do actual multi-threading, its [reliance on
pycurl](https://github.com/celery/kombu/blob/main/kombu/asynchronous/http/curl.py#L48)
(and thus libcurl) meant that the requests to AWS were being made
asynchronously. Boto3 requests, on the other hand, are always made
synchronously, i.e. they are blocking requests.
The above meant that my change introduced blocking on kombu's event
loop. This is fine in most cases, since requests to SQS are pretty
fast. However, when long polling is used, a call to SQS's
ReceiveMessage can last up to 20 seconds (depending on the user
configuration).
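For reference, with boto3 a long-polling receive is a single blocking call that can wait for up to `WaitTimeSeconds` (the region and queue URL below are placeholders):
---8<------------------------------------------------------------------
import boto3

sqs = boto3.client('sqs', region_name='us-east-1')

# With long polling, this synchronous call can block for up to 20 seconds
# if no message arrives, stalling kombu's event loop for that long.
response = sqs.receive_message(
    QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789012/example-queue',
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20,
)
---8<------------------------------------------------------------------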
To solve this problem, I rolled back my earlier changes and, to address
the issue reported in #1726, instead changed the `AsyncSQSConnection`
class such that it crafts either a `query` or a `json` request
depending on the protocol used by the SQS client. Thus, when botocore
changes the default protocol of SQS to JSON, kombu won't be impacted,
since it crafts its own requests and, after my change, explicitly pins
the protocol used to craft them.
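Roughly, the idea looks like the following (a hypothetical sketch with made-up names, not the actual `AsyncSQSConnection` code):
---8<------------------------------------------------------------------
import json
from urllib.parse import urlencode

def make_sqs_request(sqs_client, action, params):
    """Craft headers/body for an SQS call, matching the client's protocol."""
    protocol = sqs_client.meta.service_model.protocol  # 'query' or 'json'
    if protocol == 'json':
        # JSON protocol: the operation is named in a header and the
        # parameters travel as a JSON body.
        headers = {
            'Content-Type': 'application/x-amz-json-1.0',
            'X-Amz-Target': f'AmazonSQS.{action}',
        }
        body = json.dumps(params)
    else:
        # Query protocol: the operation and parameters are form-encoded.
        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        body = urlencode({'Action': action, **params})
    return headers, body
---8<------------------------------------------------------------------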
This shouldn't be the final solution; it is more of a workaround that
does the job for now. The final solution should be to rely completely
on boto3 for all communication with AWS, while ensuring that all
requests are async in nature (non-blocking). This, however, is a
fundamental change that requires a lot of testing, in particular
performance testing.
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
* Update kombu/asynchronous/aws/sqs/connection.py
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
---------
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>
* added Python 3.12 to the CI
* try to make kafka work on py3.12
* skip kafka for the time being as it does not seem to work with py3.12 yet
* using assert_called_once()
* Create a lock on cached_property if not present
This fixes #1804 (fixing breakage caused by the use of undocumented
implementation details of functools.cached_property) by ensuring a lock
is always present on cached_property attributes, which is required to
safely support setting and deleting cached values in addition to
computing them on demand.
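A simplified sketch of the idea (the class name and structure are illustrative, not kombu's exact code): always attach a lock to the descriptor, since Python 3.12 no longer creates one, so that setting and deleting cached values remains thread-safe:
---8<------------------------------------------------------------------
from functools import cached_property
from threading import RLock


class locked_cached_property(cached_property):
    """cached_property variant that also supports set/delete (sketch)."""

    def __init__(self, func):
        super().__init__(func)
        if not hasattr(self, 'lock'):
            # Python 3.12 dropped the undocumented per-property lock;
            # create one so __set__/__delete__ can still synchronize.
            self.lock = RLock()

    def __set__(self, instance, value):
        # Store a value directly, guarding the instance dict with the lock.
        with self.lock:
            instance.__dict__[self.attrname] = value

    def __delete__(self, instance):
        # Drop the cached value so it is recomputed on the next access.
        with self.lock:
            instance.__dict__.pop(self.attrname, None)
---8<------------------------------------------------------------------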
* Add a unit test for cached_property locking
* azure servicebus: use DefaultAzureCredential in documentation
* azure servicebus: only use connection string when using sas key
* azure servicebus: add two small tests for parsing of connection string
* azure servicebus: fix lint issues
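The credential handling in the two Service Bus changes above amounts to roughly the following (an illustrative snippet using the azure-servicebus and azure-identity SDKs directly, not kombu's transport code):
---8<------------------------------------------------------------------
from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient


def make_client(namespace, conn_str=None):
    # A connection string is only used when it actually carries a SAS key...
    if conn_str and 'SharedAccessKey' in conn_str:
        return ServiceBusClient.from_connection_string(conn_str)
    # ...otherwise fall back to DefaultAzureCredential (managed identity,
    # environment variables, Azure CLI login, and so on).
    return ServiceBusClient(
        fully_qualified_namespace=f'{namespace}.servicebus.windows.net',
        credential=DefaultAzureCredential(),
    )
---8<------------------------------------------------------------------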
* [fix #1726] Use boto3 for SQS async requests
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
---------
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Consider the following piece of code, very similar to what can be found
in kombu/utils/objects.py:
---8<------------------------------------------------------------------
$ cat /tmp/x.py
try:
    from functools import _NOT_FOUND
    from functools import cached_property as _cached_property

except ImportError:
    from cached_property import threaded_cached_property as _cached_property
    _NOT_FOUND = object()

print("OK!")
---8<------------------------------------------------------------------
This works well in Python 3.11:
---8<------------------------------------------------------------------
$ podman run -it --rm -v /tmp:/tmp python:3.11.4 python /tmp/x.py
OK!
---8<------------------------------------------------------------------
But fails in Python 3.12:
---8<------------------------------------------------------------------
$ podman run -it --rm -v /tmp:/tmp python:3.12.0b2 python /tmp/x.py
Traceback (most recent call last):
  File "/tmp/x.py", line 2, in <module>
    from functools import _NOT_FOUND
ImportError: cannot import name '_NOT_FOUND' from 'functools' (/usr/local/lib/python3.12/functools.py)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/x.py", line 6, in <module>
    from cached_property import threaded_cached_property as _cached_property
ModuleNotFoundError: No module named 'cached_property'
---8<------------------------------------------------------------------
This is because Python 3.12 removed functools._NOT_FOUND (see commit
056dfc71dce15f81887f0bd6da09d6099d71f979), which prevents
cached_property from being imported from functools in our code. If the
cached_property library is not installed, the imports then fail.
We should be using two different try/except blocks, but since
functools._NOT_FOUND was defined simply as "object()" in the standard
library anyway, let's just not bother importing it.
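A sketch of the resulting imports (simplified; the exact code in kombu/utils/objects.py may differ slightly):
---8<------------------------------------------------------------------
try:
    from functools import cached_property as _cached_property
except ImportError:
    # Python < 3.8: fall back to the external cached_property package.
    from cached_property import threaded_cached_property as _cached_property

# Define the sentinel ourselves instead of importing the private
# functools._NOT_FOUND, which was removed in Python 3.12.
_NOT_FOUND = object()
---8<------------------------------------------------------------------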