diff --git a/.coveragerc b/.coveragerc index 4d450b1c..75cf0c1f 100644 --- a/.coveragerc +++ b/.coveragerc @@ -9,7 +9,7 @@ omit = */python?.?/* */site-packages/* */pypy/* - *kombu/async/http/curl.py + *kombu/async/http/urllib3_client.py *kombu/five.py *kombu/transport/mongodb.py *kombu/transport/filesystem.py diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml index 966045d8..0e145bdf 100644 --- a/.github/workflows/linter.yml +++ b/.github/workflows/linter.yml @@ -20,7 +20,7 @@ jobs: python-version: ["3.12"] steps: - name: Install system packages - run: sudo apt-get update && sudo apt-get install libcurl4-openssl-dev libssl-dev + run: sudo apt-get update && sudo apt-get install libssl-dev - name: Check out code from GitHub uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index aed2e12e..81e82652 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -39,7 +39,7 @@ jobs: steps: - name: Install apt packages if: startsWith(matrix.os, 'blacksmith-4vcpu-ubuntu') - run: sudo apt-get update && sudo apt-get install libcurl4-openssl-dev libssl-dev + run: sudo apt-get update && sudo apt-get install libssl-dev - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} uses: useblacksmith/setup-python@v6 @@ -98,7 +98,7 @@ jobs: steps: - name: Install apt packages - run: sudo apt-get update && sudo apt-get install libcurl4-openssl-dev libssl-dev + run: sudo apt-get update && sudo apt-get install libssl-dev - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} diff --git a/docs/reference/index.rst b/docs/reference/index.rst index 8a79c2dd..bbf60aa9 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -71,7 +71,7 @@ Kombu Asynchronous kombu.asynchronous.debug kombu.asynchronous.http kombu.asynchronous.http.base - kombu.asynchronous.http.curl + kombu.asynchronous.http.urllib3_client kombu.asynchronous.aws kombu.asynchronous.aws.connection kombu.asynchronous.aws.sqs diff --git a/docs/reference/kombu.asynchronous.http.urllib3_client.rst b/docs/reference/kombu.asynchronous.http.urllib3_client.rst new file mode 100644 index 00000000..46d4e1af --- /dev/null +++ b/docs/reference/kombu.asynchronous.http.urllib3_client.rst @@ -0,0 +1,11 @@ +============================================================ + Urllib3 HTTP Client Pool - ``kombu.asynchronous.http.urllib3_client`` +============================================================ + +.. contents:: + :local: +.. currentmodule:: kombu.asynchronous.http.urllib3_client + +.. automodule:: kombu.asynchronous.http.urllib3_client + :members: + :undoc-members: diff --git a/kombu/asynchronous/aws/connection.py b/kombu/asynchronous/aws/connection.py index df7d3b24..88d3d309 100644 --- a/kombu/asynchronous/aws/connection.py +++ b/kombu/asynchronous/aws/connection.py @@ -7,7 +7,7 @@ from email.mime.message import MIMEMessage from vine import promise, transform -from kombu.asynchronous.aws.ext import AWSRequest, get_response +from kombu.asynchronous.aws.ext import AWSRequest, get_cert_path, get_response from kombu.asynchronous.http import Headers, Request, get_client @@ -92,7 +92,8 @@ class AsyncHTTPSConnection: headers = Headers(self.headers) return self.Request(self.path, method=self.method, headers=headers, body=self.body, connect_timeout=self.timeout, - request_timeout=self.timeout, validate_cert=False) + request_timeout=self.timeout, + validate_cert=True, ca_certs=get_cert_path(True)) def getresponse(self, callback=None): request = self.getrequest() diff --git a/kombu/asynchronous/aws/ext.py b/kombu/asynchronous/aws/ext.py index 1fa4a57e..d82e48f3 100644 --- a/kombu/asynchronous/aws/ext.py +++ b/kombu/asynchronous/aws/ext.py @@ -6,6 +6,7 @@ try: import boto3 from botocore import exceptions from botocore.awsrequest import AWSRequest + from botocore.httpsession import get_cert_path from botocore.response import get_response except ImportError: boto3 = None @@ -19,8 +20,9 @@ except ImportError: exceptions.BotoCoreError = BotoCoreError AWSRequest = _void() get_response = _void() + get_cert_path = _void() __all__ = ( - 'exceptions', 'AWSRequest', 'get_response' + 'exceptions', 'AWSRequest', 'get_response', 'get_cert_path', ) diff --git a/kombu/asynchronous/http/__init__.py b/kombu/asynchronous/http/__init__.py index 67d8b219..de33bb92 100644 --- a/kombu/asynchronous/http/__init__.py +++ b/kombu/asynchronous/http/__init__.py @@ -1,24 +1,19 @@ from __future__ import annotations -from typing import TYPE_CHECKING - from kombu.asynchronous import get_event_loop -from kombu.asynchronous.http.base import Headers, Request, Response +from kombu.asynchronous.http.base import BaseClient, Headers, Request, Response from kombu.asynchronous.hub import Hub -if TYPE_CHECKING: - from kombu.asynchronous.http.curl import CurlClient - -__all__ = ('Client', 'Headers', 'Response', 'Request') +__all__ = ('Client', 'Headers', 'Response', 'Request', 'get_client') -def Client(hub: Hub | None = None, **kwargs: int) -> CurlClient: +def Client(hub: Hub | None = None, **kwargs: int) -> BaseClient: """Create new HTTP client.""" - from .curl import CurlClient - return CurlClient(hub, **kwargs) + from .urllib3_client import Urllib3Client + return Urllib3Client(hub, **kwargs) -def get_client(hub: Hub | None = None, **kwargs: int) -> CurlClient: +def get_client(hub: Hub | None = None, **kwargs: int) -> BaseClient: """Get or create HTTP client bound to the current event loop.""" hub = hub or get_event_loop() try: diff --git a/kombu/asynchronous/http/base.py b/kombu/asynchronous/http/base.py index 345e07f6..3d9fe213 100644 --- a/kombu/asynchronous/http/base.py +++ b/kombu/asynchronous/http/base.py @@ -16,7 +16,7 @@ from kombu.utils.functional import maybe_list, memoize if TYPE_CHECKING: from types import TracebackType -__all__ = ('Headers', 'Response', 'Request') +__all__ = ('Headers', 'Response', 'Request', 'BaseClient') PYPY = hasattr(sys, 'pypy_version_info') @@ -236,6 +236,12 @@ def header_parser(keyt=normalize_header): class BaseClient: + """Base class for HTTP clients. + + This class provides the basic structure and functionality for HTTP clients. + Subclasses should implement specific HTTP client behavior. + """ + Headers = Headers Request = Request Response = Response diff --git a/kombu/asynchronous/http/curl.py b/kombu/asynchronous/http/curl.py deleted file mode 100644 index 6f879fa9..00000000 --- a/kombu/asynchronous/http/curl.py +++ /dev/null @@ -1,289 +0,0 @@ -"""HTTP Client using pyCurl.""" - -from __future__ import annotations - -from collections import deque -from functools import partial -from io import BytesIO -from time import time - -from kombu.asynchronous.hub import READ, WRITE, Hub, get_event_loop -from kombu.exceptions import HttpError -from kombu.utils.encoding import bytes_to_str - -from .base import BaseClient - -try: - import pycurl -except ImportError: # pragma: no cover - pycurl = Curl = METH_TO_CURL = None -else: - from pycurl import Curl - - METH_TO_CURL = { - 'GET': pycurl.HTTPGET, - 'POST': pycurl.POST, - 'PUT': pycurl.UPLOAD, - 'HEAD': pycurl.NOBODY, - } - -__all__ = ('CurlClient',) - -DEFAULT_USER_AGENT = 'Mozilla/5.0 (compatible; pycurl)' -EXTRA_METHODS = frozenset(['DELETE', 'OPTIONS', 'PATCH']) - - -class CurlClient(BaseClient): - """Curl HTTP Client.""" - - Curl = Curl - - def __init__(self, hub: Hub | None = None, max_clients: int = 10): - if pycurl is None: - raise ImportError('The curl client requires the pycurl library.') - hub = hub or get_event_loop() - super().__init__(hub) - self.max_clients = max_clients - - self._multi = pycurl.CurlMulti() - self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout) - self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket) - self._curls = [self.Curl() for i in range(max_clients)] - self._free_list = self._curls[:] - self._pending = deque() - self._fds = {} - - self._socket_action = self._multi.socket_action - self._timeout_check_tref = self.hub.call_repeatedly( - 1.0, self._timeout_check, - ) - - # pycurl 7.29.0 workaround - dummy_curl_handle = pycurl.Curl() - self._multi.add_handle(dummy_curl_handle) - self._multi.remove_handle(dummy_curl_handle) - - def close(self): - self._timeout_check_tref.cancel() - for _curl in self._curls: - _curl.close() - self._multi.close() - - def add_request(self, request): - self._pending.append(request) - self._process_queue() - self._set_timeout(0) - return request - - # the next two methods are used for linux/epoll workaround: - # we temporarily remove all curl fds from hub, so curl cannot - # close a fd which is still inside epoll - def _pop_from_hub(self): - for fd in self._fds: - self.hub.remove(fd) - - def _push_to_hub(self): - for fd, events in self._fds.items(): - if events & READ: - self.hub.add_reader(fd, self.on_readable, fd) - if events & WRITE: - self.hub.add_writer(fd, self.on_writable, fd) - - def _handle_socket(self, event, fd, multi, data, _pycurl=pycurl): - if event == _pycurl.POLL_REMOVE: - if fd in self._fds: - self._fds.pop(fd, None) - else: - if event == _pycurl.POLL_IN: - self._fds[fd] = READ - elif event == _pycurl.POLL_OUT: - self._fds[fd] = WRITE - elif event == _pycurl.POLL_INOUT: - self._fds[fd] = READ | WRITE - - def _set_timeout(self, msecs): - self.hub.call_later(msecs, self._timeout_check) - - def _timeout_check(self, _pycurl=pycurl): - self._pop_from_hub() - try: - while 1: - try: - ret, _ = self._multi.socket_all() - except pycurl.error as exc: - ret = exc.args[0] - if ret != _pycurl.E_CALL_MULTI_PERFORM: - break - finally: - self._push_to_hub() - self._process_pending_requests() - - def on_readable(self, fd, _pycurl=pycurl): - return self._on_event(fd, _pycurl.CSELECT_IN) - - def on_writable(self, fd, _pycurl=pycurl): - return self._on_event(fd, _pycurl.CSELECT_OUT) - - def _on_event(self, fd, event, _pycurl=pycurl): - self._pop_from_hub() - try: - while 1: - try: - ret, _ = self._socket_action(fd, event) - except pycurl.error as exc: - ret = exc.args[0] - if ret != _pycurl.E_CALL_MULTI_PERFORM: - break - finally: - self._push_to_hub() - self._process_pending_requests() - - def _process_pending_requests(self): - while 1: - q, succeeded, failed = self._multi.info_read() - for curl in succeeded: - self._process(curl) - for curl, errno, reason in failed: - self._process(curl, errno, reason) - if q == 0: - break - self._process_queue() - - def _process_queue(self): - while 1: - started = 0 - while self._free_list and self._pending: - started += 1 - curl = self._free_list.pop() - request = self._pending.popleft() - headers = self.Headers() - buf = BytesIO() - curl.info = { - 'headers': headers, - 'buffer': buf, - 'request': request, - 'curl_start_time': time(), - } - self._setup_request(curl, request, buf, headers) - self._multi.add_handle(curl) - if not started: - break - - def _process(self, curl, errno=None, reason=None, _pycurl=pycurl): - info, curl.info = curl.info, None - self._multi.remove_handle(curl) - self._free_list.append(curl) - buffer = info['buffer'] - if errno: - code = 599 - error = HttpError(code, reason) - error.errno = errno - effective_url = None - buffer.close() - buffer = None - else: - error = None - code = curl.getinfo(_pycurl.HTTP_CODE) - effective_url = curl.getinfo(_pycurl.EFFECTIVE_URL) - buffer.seek(0) - # try: - request = info['request'] - request.on_ready(self.Response( - request=request, code=code, headers=info['headers'], - buffer=buffer, effective_url=effective_url, error=error, - )) - - def _setup_request(self, curl, request, buffer, headers, _pycurl=pycurl): - setopt = curl.setopt - setopt(_pycurl.URL, bytes_to_str(request.url)) - - # see tornado curl client - request.headers.setdefault('Expect', '') - request.headers.setdefault('Pragma', '') - - setopt( - _pycurl.HTTPHEADER, - ['{}: {}'.format(*h) for h in request.headers.items()], - ) - - setopt( - _pycurl.HEADERFUNCTION, - partial(request.on_header or self.on_header, request.headers), - ) - setopt( - _pycurl.WRITEFUNCTION, request.on_stream or buffer.write, - ) - setopt( - _pycurl.FOLLOWLOCATION, request.follow_redirects, - ) - setopt( - _pycurl.USERAGENT, - bytes_to_str(request.user_agent or DEFAULT_USER_AGENT), - ) - if request.network_interface: - setopt(_pycurl.INTERFACE, request.network_interface) - setopt( - _pycurl.ENCODING, 'gzip,deflate' if request.use_gzip else 'none', - ) - if request.proxy_host: - if not request.proxy_port: - raise ValueError('Request with proxy_host but no proxy_port') - setopt(_pycurl.PROXY, request.proxy_host) - setopt(_pycurl.PROXYPORT, request.proxy_port) - if request.proxy_username: - setopt(_pycurl.PROXYUSERPWD, '{}:{}'.format( - request.proxy_username, request.proxy_password or '')) - - setopt(_pycurl.SSL_VERIFYPEER, 1 if request.validate_cert else 0) - setopt(_pycurl.SSL_VERIFYHOST, 2 if request.validate_cert else 0) - if request.ca_certs is not None: - setopt(_pycurl.CAINFO, request.ca_certs) - - setopt(_pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER) - - for meth in METH_TO_CURL.values(): - setopt(meth, False) - try: - meth = METH_TO_CURL[request.method] - except KeyError: - curl.setopt(_pycurl.CUSTOMREQUEST, request.method) - else: - curl.unsetopt(_pycurl.CUSTOMREQUEST) - setopt(meth, True) - - if request.method in ('POST', 'PUT'): - body = request.body.encode('utf-8') if request.body else b'' - reqbuffer = BytesIO(body) - setopt(_pycurl.READFUNCTION, reqbuffer.read) - if request.method == 'POST': - - def ioctl(cmd): - if cmd == _pycurl.IOCMD_RESTARTREAD: - reqbuffer.seek(0) - setopt(_pycurl.IOCTLFUNCTION, ioctl) - setopt(_pycurl.POSTFIELDSIZE, len(body)) - else: - setopt(_pycurl.INFILESIZE, len(body)) - elif request.method == 'GET': - assert not request.body - - if request.auth_username is not None: - auth_mode = { - 'basic': _pycurl.HTTPAUTH_BASIC, - 'digest': _pycurl.HTTPAUTH_DIGEST - }[request.auth_mode or 'basic'] - setopt(_pycurl.HTTPAUTH, auth_mode) - userpwd = '{}:{}'.format( - request.auth_username, request.auth_password or '', - ) - setopt(_pycurl.USERPWD, userpwd) - else: - curl.unsetopt(_pycurl.USERPWD) - - if request.client_cert is not None: - setopt(_pycurl.SSLCERT, request.client_cert) - if request.client_key is not None: - setopt(_pycurl.SSLKEY, request.client_key) - - if request.on_prepare is not None: - request.on_prepare(curl) diff --git a/kombu/asynchronous/http/urllib3_client.py b/kombu/asynchronous/http/urllib3_client.py new file mode 100644 index 00000000..673bf1c7 --- /dev/null +++ b/kombu/asynchronous/http/urllib3_client.py @@ -0,0 +1,219 @@ +from __future__ import annotations + +from collections import deque +from io import BytesIO + +import urllib3 + +from kombu.asynchronous.hub import Hub, get_event_loop +from kombu.exceptions import HttpError + +from .base import BaseClient, Request + +__all__ = ('Urllib3Client',) + +from ...utils.encoding import bytes_to_str + +DEFAULT_USER_AGENT = 'Mozilla/5.0 (compatible; urllib3)' +EXTRA_METHODS = frozenset(['DELETE', 'OPTIONS', 'PATCH']) + + +def _get_pool_key_parts(request: Request) -> list[str]: + _pool_key_parts = [] + + if request.network_interface: + _pool_key_parts.append(f"interface={request.network_interface}") + + if request.validate_cert: + _pool_key_parts.append("validate_cert=True") + else: + _pool_key_parts.append("validate_cert=False") + + if request.ca_certs: + _pool_key_parts.append(f"ca_certs={request.ca_certs}") + + if request.client_cert: + _pool_key_parts.append(f"client_cert={request.client_cert}") + + if request.client_key: + _pool_key_parts.append(f"client_key={request.client_key}") + + return _pool_key_parts + + +class Urllib3Client(BaseClient): + """Urllib3 HTTP Client.""" + + _pools = {} + + def __init__(self, hub: Hub | None = None, max_clients: int = 10): + hub = hub or get_event_loop() + super().__init__(hub) + self.max_clients = max_clients + self._pending = deque() + self._timeout_check_tref = self.hub.call_repeatedly( + 1.0, self._timeout_check, + ) + + def pools_close(self): + for pool in self._pools.values(): + pool.close() + self._pools.clear() + + def close(self): + self._timeout_check_tref.cancel() + self.pools_close() + + def add_request(self, request): + self._pending.append(request) + self._process_queue() + return request + + def get_pool(self, request: Request): + _pool_key_parts = _get_pool_key_parts(request=request) + + _proxy_url = None + proxy_headers = None + if request.proxy_host: + _proxy_url = urllib3.util.Url( + scheme=None, + host=request.proxy_host, + port=request.proxy_port, + ) + if request.proxy_username: + proxy_headers = urllib3.make_headers( + proxy_basic_auth=( + f"{request.proxy_username}" + f":{request.proxy_password}" + ) + ) + else: + proxy_headers = None + + _proxy_url = _proxy_url.url + + _pool_key_parts.append(f"proxy={_proxy_url}") + if proxy_headers: + _pool_key_parts.append(f"proxy_headers={str(proxy_headers)}") + + _pool_key = "|".join(_pool_key_parts) + if _pool_key in self._pools: + return self._pools[_pool_key] + + # create new pool + if _proxy_url: + _pool = urllib3.ProxyManager( + proxy_url=_proxy_url, + num_pools=self.max_clients, + proxy_headers=proxy_headers + ) + else: + _pool = urllib3.PoolManager(num_pools=self.max_clients) + + # Network Interface + if request.network_interface: + _pool.connection_pool_kw['source_address'] = ( + request.network_interface, + 0 + ) + + # SSL Verification + if request.validate_cert: + _pool.connection_pool_kw['cert_reqs'] = 'CERT_REQUIRED' + else: + _pool.connection_pool_kw['cert_reqs'] = 'CERT_NONE' + + # CA Certificates + if request.ca_certs is not None: + _pool.connection_pool_kw['ca_certs'] = request.ca_certs + elif request.validate_cert is True: + try: + from certifi import where + _pool.connection_pool_kw['ca_certs'] = where() + except ImportError: + pass + + # Client Certificates + if request.client_cert is not None: + _pool.connection_pool_kw['cert_file'] = request.client_cert + if request.client_key is not None: + _pool.connection_pool_kw['key_file'] = request.client_key + + self._pools[_pool_key] = _pool + + return _pool + + def _timeout_check(self): + self._process_pending_requests() + + def _process_pending_requests(self): + while self._pending: + request = self._pending.popleft() + self._process_request(request) + + def _process_request(self, request: Request): + # Prepare headers + headers = request.headers + headers.setdefault( + 'User-Agent', + bytes_to_str(request.user_agent or DEFAULT_USER_AGENT) + ) + headers.setdefault( + 'Accept-Encoding', + 'gzip,deflate' if request.use_gzip else 'none' + ) + + # Authentication + if request.auth_username is not None: + headers.update( + urllib3.util.make_headers( + basic_auth=( + f"{request.auth_username}" + f":{request.auth_password or ''}" + ) + ) + ) + + # Make the request using urllib3 + try: + _pool = self.get_pool(request=request) + response = _pool.request( + request.method, + request.url, + headers=headers, + body=request.body, + preload_content=False, + redirect=request.follow_redirects, + ) + buffer = BytesIO(response.data) + response_obj = self.Response( + request=request, + code=response.status, + headers=response.headers, + buffer=buffer, + effective_url=response.geturl(), + error=None + ) + except urllib3.exceptions.HTTPError as e: + response_obj = self.Response( + request=request, + code=599, + headers={}, + buffer=None, + effective_url=None, + error=HttpError(599, str(e)) + ) + + request.on_ready(response_obj) + + def _process_queue(self): + self._process_pending_requests() + + def on_readable(self, fd): + pass + + def on_writable(self, fd): + pass + + def _setup_request(self, curl, request, buffer, headers): + pass diff --git a/requirements/docs.txt b/requirements/docs.txt index 70c0c7d5..12722116 100644 --- a/requirements/docs.txt +++ b/requirements/docs.txt @@ -3,6 +3,4 @@ git+https://github.com/celery/sphinx_celery.git -r extras/mongodb.txt -r extras/sqlalchemy.txt -r extras/azureservicebus.txt -# we cannot use directly extras/sqs.txt -# since readthedocs cannot install pycurl -boto3>=1.26.143 +-r extras/sqs.txt diff --git a/requirements/extras/sqs.txt b/requirements/extras/sqs.txt index 77f75030..22a5270e 100644 --- a/requirements/extras/sqs.txt +++ b/requirements/extras/sqs.txt @@ -1,3 +1,2 @@ boto3>=1.26.143 -pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython" urllib3>=1.26.16 diff --git a/requirements/test-ci.txt b/requirements/test-ci.txt index 8882410e..78037a68 100644 --- a/requirements/test-ci.txt +++ b/requirements/test-ci.txt @@ -8,7 +8,6 @@ pymongo>=4.1.1; sys_platform != 'win32' -r extras/azureservicebus.txt -r extras/azurestoragequeues.txt boto3>=1.26.143; sys_platform != 'win32' -pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython" urllib3>=1.26.16; sys_platform != 'win32' -r extras/consul.txt -r extras/zookeeper.txt diff --git a/t/unit/asynchronous/aws/case.py b/t/unit/asynchronous/aws/case.py index 220cd700..95cd104b 100644 --- a/t/unit/asynchronous/aws/case.py +++ b/t/unit/asynchronous/aws/case.py @@ -5,7 +5,7 @@ import pytest import t.skip pytest.importorskip('boto3') -pytest.importorskip('pycurl') +pytest.importorskip('urllib3') @t.skip.if_pypy diff --git a/t/unit/asynchronous/aws/test_connection.py b/t/unit/asynchronous/aws/test_connection.py index 260908b3..59493f6f 100644 --- a/t/unit/asynchronous/aws/test_connection.py +++ b/t/unit/asynchronous/aws/test_connection.py @@ -90,6 +90,13 @@ class test_AsyncHTTPSConnection(AWSCase): validate_cert=VALIDATES_CERT, ) + def test_request_with_cert_path_https(self): + x = AsyncHTTPSConnection("https://example.com") + request = x.getrequest() + assert request.validate_cert is True + assert request.ca_certs is not None + assert request.ca_certs.endswith('.pem') + def test_getresponse(self): client = Mock(name='client') client.add_request = passthrough(name='client.add_request') diff --git a/t/unit/asynchronous/http/test_curl.py b/t/unit/asynchronous/http/test_curl.py deleted file mode 100644 index 51f9128e..00000000 --- a/t/unit/asynchronous/http/test_curl.py +++ /dev/null @@ -1,157 +0,0 @@ -from __future__ import annotations - -from io import BytesIO -from unittest.mock import ANY, Mock, call, patch - -import pytest - -import t.skip -from kombu.asynchronous.http.curl import READ, WRITE, CurlClient - -pytest.importorskip('pycurl') - - -@t.skip.if_pypy -@pytest.mark.usefixtures('hub') -class test_CurlClient: - - class Client(CurlClient): - Curl = Mock(name='Curl') - - def test_when_pycurl_missing(self, patching): - patching('kombu.asynchronous.http.curl.pycurl', None) - with pytest.raises(ImportError): - self.Client() - - def test_max_clients_set(self): - x = self.Client(max_clients=303) - assert x.max_clients == 303 - - def test_init(self): - with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl: - x = self.Client() - assert x._multi is not None - assert x._pending is not None - assert x._free_list is not None - assert x._fds is not None - assert x._socket_action == x._multi.socket_action - assert len(x._curls) == x.max_clients - assert x._timeout_check_tref - - x._multi.setopt.assert_has_calls([ - call(_pycurl.M_TIMERFUNCTION, x._set_timeout), - call(_pycurl.M_SOCKETFUNCTION, x._handle_socket), - ]) - - def test_close(self): - with patch('kombu.asynchronous.http.curl.pycurl'): - x = self.Client() - x._timeout_check_tref = Mock(name='timeout_check_tref') - x.close() - x._timeout_check_tref.cancel.assert_called_with() - for _curl in x._curls: - _curl.close.assert_called_with() - x._multi.close.assert_called_with() - - def test_add_request(self): - with patch('kombu.asynchronous.http.curl.pycurl'): - x = self.Client() - x._process_queue = Mock(name='_process_queue') - x._set_timeout = Mock(name='_set_timeout') - request = Mock(name='request') - x.add_request(request) - assert request in x._pending - x._process_queue.assert_called_with() - x._set_timeout.assert_called_with(0) - - def test_handle_socket(self): - with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl: - x = self.Client() - fd = Mock(name='fd1') - - # POLL_REMOVE - x._fds[fd] = fd - x._handle_socket(_pycurl.POLL_REMOVE, fd, x._multi, None, _pycurl) - assert fd not in x._fds - x._handle_socket(_pycurl.POLL_REMOVE, fd, x._multi, None, _pycurl) - - # POLL_IN - fds = [fd, Mock(name='fd2'), Mock(name='fd3')] - x._fds = {f: f for f in fds} - x._handle_socket(_pycurl.POLL_IN, fd, x._multi, None, _pycurl) - assert x._fds[fd] == READ - - # POLL_OUT - x._handle_socket(_pycurl.POLL_OUT, fd, x._multi, None, _pycurl) - assert x._fds[fd] == WRITE - - # POLL_INOUT - x._handle_socket(_pycurl.POLL_INOUT, fd, x._multi, None, _pycurl) - assert x._fds[fd] == READ | WRITE - - # UNKNOWN EVENT - x._handle_socket(0xff3f, fd, x._multi, None, _pycurl) - - # FD NOT IN FDS - x._fds.clear() - x._handle_socket(0xff3f, fd, x._multi, None, _pycurl) - - def test_set_timeout(self): - hub = Mock(name='hub') - x = self.Client(hub) - x._set_timeout(100) - hub.call_later.assert_called_with(100, x._timeout_check) - - def test_timeout_check(self): - with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl: - hub = Mock(name='hub') - x = self.Client(hub) - fd1, fd2 = Mock(name='fd1'), Mock(name='fd2') - x._fds = {fd1: READ} - x._process_pending_requests = Mock(name='process_pending') - - def _side_effect(): - x._fds = {fd2: WRITE} - return 333, 1 - - x._multi.socket_all.side_effect = _side_effect - _pycurl.error = KeyError - - x._timeout_check(_pycurl=_pycurl) - hub.remove.assert_called_with(fd1) - hub.add_writer.assert_called_with(fd2, x.on_writable, fd2) - - x._multi.socket_all.return_value = None - x._multi.socket_all.side_effect = _pycurl.error(333) - x._timeout_check(_pycurl=_pycurl) - - def test_on_readable_on_writeable(self): - with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl: - x = self.Client() - x._on_event = Mock(name='on_event') - fd = Mock(name='fd') - x.on_readable(fd, _pycurl=_pycurl) - x._on_event.assert_called_with(fd, _pycurl.CSELECT_IN) - x.on_writable(fd, _pycurl=_pycurl) - x._on_event.assert_called_with(fd, _pycurl.CSELECT_OUT) - - def test_setup_request_sets_proxy_when_specified(self): - with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl: - x = self.Client() - proxy_host = 'http://www.example.com' - request = Mock( - name='request', headers={}, auth_mode=None, proxy_host=None - ) - proxied_request = Mock( - name='request', headers={}, auth_mode=None, - proxy_host=proxy_host, proxy_port=123 - ) - x._setup_request( - x.Curl, request, BytesIO(), x.Headers(), _pycurl=_pycurl - ) - with pytest.raises(AssertionError): - x.Curl.setopt.assert_any_call(_pycurl.PROXY, ANY) - x._setup_request( - x.Curl, proxied_request, BytesIO(), x.Headers(), _pycurl - ) - x.Curl.setopt.assert_any_call(_pycurl.PROXY, proxy_host) diff --git a/t/unit/asynchronous/http/test_http.py b/t/unit/asynchronous/http/test_http.py index 816bf89d..53b75311 100644 --- a/t/unit/asynchronous/http/test_http.py +++ b/t/unit/asynchronous/http/test_http.py @@ -147,7 +147,7 @@ class test_BaseClient: class test_Client: def test_get_client(self, hub): - pytest.importorskip('pycurl') + pytest.importorskip('urllib3') client = http.get_client() assert client.hub is hub client2 = http.get_client(hub) diff --git a/t/unit/asynchronous/http/test_urllib3.py b/t/unit/asynchronous/http/test_urllib3.py new file mode 100644 index 00000000..54b10cd1 --- /dev/null +++ b/t/unit/asynchronous/http/test_urllib3.py @@ -0,0 +1,257 @@ +from __future__ import annotations + +from io import BytesIO +from unittest.mock import Mock, patch + +import pytest +import urllib3 + +import t.skip +from kombu.asynchronous.http.urllib3_client import (Urllib3Client, + _get_pool_key_parts) + + +@t.skip.if_pypy +@pytest.mark.usefixtures('hub') +class test_Urllib3Client: + class Client(Urllib3Client): + urllib3 = Mock(name='urllib3') + + def test_max_clients_set(self): + x = self.Client(max_clients=303) + assert x.max_clients == 303 + + def test_init(self): + x = self.Client() + assert x._pools is not None + assert x._pending is not None + assert x._timeout_check_tref + + def test_close(self): + with patch( + 'kombu.asynchronous.http.urllib3_client.urllib3.PoolManager' + ): + x = self.Client() + x._timeout_check_tref = Mock(name='timeout_check_tref') + x.close() + x._timeout_check_tref.cancel.assert_called_with() + for pool in x._pools.values(): + pool.close.assert_called_with() + + def test_add_request(self): + with patch( + 'kombu.asynchronous.http.urllib3_client.urllib3.PoolManager' + ): + x = self.Client() + x._process_queue = Mock(name='_process_queue') + request = Mock(name='request') + x.add_request(request) + assert request in x._pending + x._process_queue.assert_called_with() + + def test_timeout_check(self): + with patch( + 'kombu.asynchronous.http.urllib3_client.urllib3.PoolManager' + ): + hub = Mock(name='hub') + x = self.Client(hub) + x._process_pending_requests = Mock(name='process_pending') + x._timeout_check() + x._process_pending_requests.assert_called_with() + + def test_process_request(self): + with patch( + 'kombu.asynchronous.http.urllib3_client.urllib3.PoolManager' + ) as _pool_manager: + x = self.Client() + request = Mock( + name='request', + method='GET', + url='http://example.com', + headers={}, + body=None, + follow_redirects=True, + auth_username=None, + auth_password=None, + user_agent=None, + use_gzip=False, + network_interface=None, + validate_cert=True, + ca_certs=None, + client_cert=None, + client_key=None, + proxy_host=None, + proxy_port=None, + proxy_username=None, + proxy_password=None, + on_ready=Mock(name='on_ready') + ) + response = Mock( + name='response', + status=200, + headers={}, + data=b'content' + ) + response.geturl.return_value = 'http://example.com' + _pool_manager.return_value.request.return_value = response + + x._process_request(request) + response_obj = x.Response( + request=request, + code=200, + headers={}, + buffer=BytesIO(b'content'), + effective_url='http://example.com', + error=None + ) + request.on_ready.assert_called() + called_response = request.on_ready.call_args[0][0] + assert called_response.code == response_obj.code + assert called_response.headers == response_obj.headers + assert ( + called_response.buffer.getvalue() == + response_obj.buffer.getvalue() + ) + assert called_response.effective_url == response_obj.effective_url + assert called_response.error == response_obj.error + + def test_process_request_with_error(self): + with patch( + 'kombu.asynchronous.http.urllib3_client.urllib3.PoolManager' + ) as _pool_manager: + x = self.Client() + x.close() + request = Mock( + name='request', + method='GET', + url='http://example.com', + headers={}, + body=None, + follow_redirects=True, + auth_username=None, + auth_password=None, + user_agent=None, + use_gzip=False, + network_interface=None, + validate_cert=True, + ca_certs=None, + client_cert=None, + client_key=None, + proxy_host=None, + proxy_port=None, + proxy_username=None, + proxy_password=None, + on_ready=Mock(name='on_ready') + ) + _pool_manager.return_value.request.side_effect = urllib3.exceptions.HTTPError("Test Error") + + x._process_request(request) + request.on_ready.assert_called() + called_response = request.on_ready.call_args[0][0] + assert called_response.code == 599 + assert called_response.error is not None + assert called_response.error.message == "Test Error" + + def test_on_readable_on_writable(self): + x = self.Client() + x.on_readable(Mock(name='fd')) + x.on_writable(Mock(name='fd')) + + def test_get_pool_with_proxy(self): + with patch( + 'kombu.asynchronous.http.urllib3_client.urllib3.ProxyManager' + ) as _proxy_manager: + x = self.Client() + request = Mock( + name='request', + proxy_host='proxy.example.com', + proxy_port=8080, + proxy_username='user', + proxy_password='pass' + ) + x.get_pool(request) + _proxy_manager.assert_called_with( + proxy_url='proxy.example.com:8080', + num_pools=x.max_clients, + proxy_headers=urllib3.make_headers( + proxy_basic_auth="user:pass" + ) + ) + + def test_get_pool_without_proxy(self): + with patch( + 'kombu.asynchronous.http.urllib3_client.urllib3.PoolManager' + ) as _pool_manager: + x = self.Client() + request = Mock(name='request', proxy_host=None) + x.get_pool(request) + _pool_manager.assert_called_with(num_pools=x.max_clients) + + def test_process_request_with_proxy(self): + with patch( + 'kombu.asynchronous.http.urllib3_client.urllib3.ProxyManager' + ) as _proxy_manager: + x = self.Client() + request = Mock( + name='request', + method='GET', + url='http://example.com', + headers={}, + body=None, + follow_redirects=True, + proxy_host='proxy.example.com', + proxy_port=8080, + proxy_username='user', + proxy_password='pass', + on_ready=Mock(name='on_ready') + ) + response = Mock( + name='response', + status=200, + headers={}, + data=b'content' + ) + response.geturl.return_value = 'http://example.com' + _proxy_manager.return_value.request.return_value = response + + x._process_request(request) + response_obj = x.Response( + request=request, + code=200, + headers={}, + buffer=BytesIO(b'content'), + effective_url='http://example.com', + error=None + ) + request.on_ready.assert_called() + called_response = request.on_ready.call_args[0][0] + assert called_response.code == response_obj.code + assert called_response.headers == response_obj.headers + assert ( + called_response.buffer.getvalue() + == response_obj.buffer.getvalue() + ) + assert called_response.effective_url == response_obj.effective_url + assert called_response.error == response_obj.error + + def test_pool_key_parts(self): + request = Mock( + name='request', + method='GET', + url='http://example.com', + headers={}, + body=None, + network_interface='test', + validate_cert=False, + ca_certs='test0.pem', + client_cert='test1.pem', + client_key='some_key', + ) + pool_key = _get_pool_key_parts(request) + assert pool_key == [ + "interface=test", + "validate_cert=False", + "ca_certs=test0.pem", + "client_cert=test1.pem", + "client_key=some_key" + ]