Feature: urllib3 instead of curl (#2134)

* feature(urllib3): add urllib3 client

* test(urllib3): test urllib3 client

* test(urllib3): update http test for urllib3

* test(urllib3): use urllib3 client instead of curl

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* style(urllib3): remove unused imports

* style(urllib3): fix pre-commit errors

* ci(urllib3): remove pycurl dependency

* docs(urllib3): add docs

* style(urllib3): fix failing gh-workflow py3.8

* style(urllib3): add mention of ProxyManager

* style(urllib3): fix pre-commit issues

* style(pycurl): remove curl-related code

* feat(urllib3): add missing request features (header, auth, ssl, proxy, redirects)

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix(urllib3): improve styling

* test(urllib3): add new tests

* fix(urllib3): fix request auth

* fix(aws): validate certificate on request

* style(): add missing exports

* feat(aws): add ssl certificate verification from boto

* feat(urllib): try to use certifi.where() if request.ca_certs are not provided

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* ci(pydocstyle): add missing docstring in public class

* test(urllib3): improve test case

* ci(pydocstyle): fix multi-line docstring summary should start at the first line

* feat(urllib3): remove assert_hostname

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* test(boto): add test for get_cert_path returning .pem file path

* test(urllib3): add test for _get_pool_key_parts method

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>
Co-authored-by: Tomer Nosrati <tomer.nosrati@gmail.com>
This commit is contained in:
Paul Rysiavets 2024-10-14 13:01:25 +02:00 committed by GitHub
parent 3f5d19973e
commit 07c8852be5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 521 additions and 473 deletions

View File

@ -9,7 +9,7 @@ omit =
*/python?.?/*
*/site-packages/*
*/pypy/*
*kombu/async/http/curl.py
*kombu/async/http/urllib3_client.py
*kombu/five.py
*kombu/transport/mongodb.py
*kombu/transport/filesystem.py

View File

@ -20,7 +20,7 @@ jobs:
python-version: ["3.12"]
steps:
- name: Install system packages
run: sudo apt-get update && sudo apt-get install libcurl4-openssl-dev libssl-dev
run: sudo apt-get update && sudo apt-get install libssl-dev
- name: Check out code from GitHub
uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}

View File

@ -39,7 +39,7 @@ jobs:
steps:
- name: Install apt packages
if: startsWith(matrix.os, 'blacksmith-4vcpu-ubuntu')
run: sudo apt-get update && sudo apt-get install libcurl4-openssl-dev libssl-dev
run: sudo apt-get update && sudo apt-get install libssl-dev
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: useblacksmith/setup-python@v6
@ -98,7 +98,7 @@ jobs:
steps:
- name: Install apt packages
run: sudo apt-get update && sudo apt-get install libcurl4-openssl-dev libssl-dev
run: sudo apt-get update && sudo apt-get install libssl-dev
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}

View File

@ -71,7 +71,7 @@ Kombu Asynchronous
kombu.asynchronous.debug
kombu.asynchronous.http
kombu.asynchronous.http.base
kombu.asynchronous.http.curl
kombu.asynchronous.http.urllib3_client
kombu.asynchronous.aws
kombu.asynchronous.aws.connection
kombu.asynchronous.aws.sqs

View File

@ -0,0 +1,11 @@
============================================================
Urllib3 HTTP Client Pool - ``kombu.asynchronous.http.urllib3_client``
============================================================
.. contents::
:local:
.. currentmodule:: kombu.asynchronous.http.urllib3_client
.. automodule:: kombu.asynchronous.http.urllib3_client
:members:
:undoc-members:

View File

@ -7,7 +7,7 @@ from email.mime.message import MIMEMessage
from vine import promise, transform
from kombu.asynchronous.aws.ext import AWSRequest, get_response
from kombu.asynchronous.aws.ext import AWSRequest, get_cert_path, get_response
from kombu.asynchronous.http import Headers, Request, get_client
@ -92,7 +92,8 @@ class AsyncHTTPSConnection:
headers = Headers(self.headers)
return self.Request(self.path, method=self.method, headers=headers,
body=self.body, connect_timeout=self.timeout,
request_timeout=self.timeout, validate_cert=False)
request_timeout=self.timeout,
validate_cert=True, ca_certs=get_cert_path(True))
def getresponse(self, callback=None):
request = self.getrequest()

View File

@ -6,6 +6,7 @@ try:
import boto3
from botocore import exceptions
from botocore.awsrequest import AWSRequest
from botocore.httpsession import get_cert_path
from botocore.response import get_response
except ImportError:
boto3 = None
@ -19,8 +20,9 @@ except ImportError:
exceptions.BotoCoreError = BotoCoreError
AWSRequest = _void()
get_response = _void()
get_cert_path = _void()
__all__ = (
'exceptions', 'AWSRequest', 'get_response'
'exceptions', 'AWSRequest', 'get_response', 'get_cert_path',
)

View File

@ -1,24 +1,19 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from kombu.asynchronous import get_event_loop
from kombu.asynchronous.http.base import Headers, Request, Response
from kombu.asynchronous.http.base import BaseClient, Headers, Request, Response
from kombu.asynchronous.hub import Hub
if TYPE_CHECKING:
from kombu.asynchronous.http.curl import CurlClient
__all__ = ('Client', 'Headers', 'Response', 'Request')
__all__ = ('Client', 'Headers', 'Response', 'Request', 'get_client')
def Client(hub: Hub | None = None, **kwargs: int) -> CurlClient:
def Client(hub: Hub | None = None, **kwargs: int) -> BaseClient:
"""Create new HTTP client."""
from .curl import CurlClient
return CurlClient(hub, **kwargs)
from .urllib3_client import Urllib3Client
return Urllib3Client(hub, **kwargs)
def get_client(hub: Hub | None = None, **kwargs: int) -> CurlClient:
def get_client(hub: Hub | None = None, **kwargs: int) -> BaseClient:
"""Get or create HTTP client bound to the current event loop."""
hub = hub or get_event_loop()
try:

View File

@ -16,7 +16,7 @@ from kombu.utils.functional import maybe_list, memoize
if TYPE_CHECKING:
from types import TracebackType
__all__ = ('Headers', 'Response', 'Request')
__all__ = ('Headers', 'Response', 'Request', 'BaseClient')
PYPY = hasattr(sys, 'pypy_version_info')
@ -236,6 +236,12 @@ def header_parser(keyt=normalize_header):
class BaseClient:
"""Base class for HTTP clients.
This class provides the basic structure and functionality for HTTP clients.
Subclasses should implement specific HTTP client behavior.
"""
Headers = Headers
Request = Request
Response = Response

View File

@ -1,289 +0,0 @@
"""HTTP Client using pyCurl."""
from __future__ import annotations
from collections import deque
from functools import partial
from io import BytesIO
from time import time
from kombu.asynchronous.hub import READ, WRITE, Hub, get_event_loop
from kombu.exceptions import HttpError
from kombu.utils.encoding import bytes_to_str
from .base import BaseClient
try:
import pycurl
except ImportError: # pragma: no cover
pycurl = Curl = METH_TO_CURL = None
else:
from pycurl import Curl
METH_TO_CURL = {
'GET': pycurl.HTTPGET,
'POST': pycurl.POST,
'PUT': pycurl.UPLOAD,
'HEAD': pycurl.NOBODY,
}
__all__ = ('CurlClient',)
DEFAULT_USER_AGENT = 'Mozilla/5.0 (compatible; pycurl)'
EXTRA_METHODS = frozenset(['DELETE', 'OPTIONS', 'PATCH'])
class CurlClient(BaseClient):
"""Curl HTTP Client."""
Curl = Curl
def __init__(self, hub: Hub | None = None, max_clients: int = 10):
if pycurl is None:
raise ImportError('The curl client requires the pycurl library.')
hub = hub or get_event_loop()
super().__init__(hub)
self.max_clients = max_clients
self._multi = pycurl.CurlMulti()
self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
self._curls = [self.Curl() for i in range(max_clients)]
self._free_list = self._curls[:]
self._pending = deque()
self._fds = {}
self._socket_action = self._multi.socket_action
self._timeout_check_tref = self.hub.call_repeatedly(
1.0, self._timeout_check,
)
# pycurl 7.29.0 workaround
dummy_curl_handle = pycurl.Curl()
self._multi.add_handle(dummy_curl_handle)
self._multi.remove_handle(dummy_curl_handle)
def close(self):
self._timeout_check_tref.cancel()
for _curl in self._curls:
_curl.close()
self._multi.close()
def add_request(self, request):
self._pending.append(request)
self._process_queue()
self._set_timeout(0)
return request
# the next two methods are used for linux/epoll workaround:
# we temporarily remove all curl fds from hub, so curl cannot
# close a fd which is still inside epoll
def _pop_from_hub(self):
for fd in self._fds:
self.hub.remove(fd)
def _push_to_hub(self):
for fd, events in self._fds.items():
if events & READ:
self.hub.add_reader(fd, self.on_readable, fd)
if events & WRITE:
self.hub.add_writer(fd, self.on_writable, fd)
def _handle_socket(self, event, fd, multi, data, _pycurl=pycurl):
if event == _pycurl.POLL_REMOVE:
if fd in self._fds:
self._fds.pop(fd, None)
else:
if event == _pycurl.POLL_IN:
self._fds[fd] = READ
elif event == _pycurl.POLL_OUT:
self._fds[fd] = WRITE
elif event == _pycurl.POLL_INOUT:
self._fds[fd] = READ | WRITE
def _set_timeout(self, msecs):
self.hub.call_later(msecs, self._timeout_check)
def _timeout_check(self, _pycurl=pycurl):
self._pop_from_hub()
try:
while 1:
try:
ret, _ = self._multi.socket_all()
except pycurl.error as exc:
ret = exc.args[0]
if ret != _pycurl.E_CALL_MULTI_PERFORM:
break
finally:
self._push_to_hub()
self._process_pending_requests()
def on_readable(self, fd, _pycurl=pycurl):
return self._on_event(fd, _pycurl.CSELECT_IN)
def on_writable(self, fd, _pycurl=pycurl):
return self._on_event(fd, _pycurl.CSELECT_OUT)
def _on_event(self, fd, event, _pycurl=pycurl):
self._pop_from_hub()
try:
while 1:
try:
ret, _ = self._socket_action(fd, event)
except pycurl.error as exc:
ret = exc.args[0]
if ret != _pycurl.E_CALL_MULTI_PERFORM:
break
finally:
self._push_to_hub()
self._process_pending_requests()
def _process_pending_requests(self):
while 1:
q, succeeded, failed = self._multi.info_read()
for curl in succeeded:
self._process(curl)
for curl, errno, reason in failed:
self._process(curl, errno, reason)
if q == 0:
break
self._process_queue()
def _process_queue(self):
while 1:
started = 0
while self._free_list and self._pending:
started += 1
curl = self._free_list.pop()
request = self._pending.popleft()
headers = self.Headers()
buf = BytesIO()
curl.info = {
'headers': headers,
'buffer': buf,
'request': request,
'curl_start_time': time(),
}
self._setup_request(curl, request, buf, headers)
self._multi.add_handle(curl)
if not started:
break
def _process(self, curl, errno=None, reason=None, _pycurl=pycurl):
info, curl.info = curl.info, None
self._multi.remove_handle(curl)
self._free_list.append(curl)
buffer = info['buffer']
if errno:
code = 599
error = HttpError(code, reason)
error.errno = errno
effective_url = None
buffer.close()
buffer = None
else:
error = None
code = curl.getinfo(_pycurl.HTTP_CODE)
effective_url = curl.getinfo(_pycurl.EFFECTIVE_URL)
buffer.seek(0)
# try:
request = info['request']
request.on_ready(self.Response(
request=request, code=code, headers=info['headers'],
buffer=buffer, effective_url=effective_url, error=error,
))
def _setup_request(self, curl, request, buffer, headers, _pycurl=pycurl):
setopt = curl.setopt
setopt(_pycurl.URL, bytes_to_str(request.url))
# see tornado curl client
request.headers.setdefault('Expect', '')
request.headers.setdefault('Pragma', '')
setopt(
_pycurl.HTTPHEADER,
['{}: {}'.format(*h) for h in request.headers.items()],
)
setopt(
_pycurl.HEADERFUNCTION,
partial(request.on_header or self.on_header, request.headers),
)
setopt(
_pycurl.WRITEFUNCTION, request.on_stream or buffer.write,
)
setopt(
_pycurl.FOLLOWLOCATION, request.follow_redirects,
)
setopt(
_pycurl.USERAGENT,
bytes_to_str(request.user_agent or DEFAULT_USER_AGENT),
)
if request.network_interface:
setopt(_pycurl.INTERFACE, request.network_interface)
setopt(
_pycurl.ENCODING, 'gzip,deflate' if request.use_gzip else 'none',
)
if request.proxy_host:
if not request.proxy_port:
raise ValueError('Request with proxy_host but no proxy_port')
setopt(_pycurl.PROXY, request.proxy_host)
setopt(_pycurl.PROXYPORT, request.proxy_port)
if request.proxy_username:
setopt(_pycurl.PROXYUSERPWD, '{}:{}'.format(
request.proxy_username, request.proxy_password or ''))
setopt(_pycurl.SSL_VERIFYPEER, 1 if request.validate_cert else 0)
setopt(_pycurl.SSL_VERIFYHOST, 2 if request.validate_cert else 0)
if request.ca_certs is not None:
setopt(_pycurl.CAINFO, request.ca_certs)
setopt(_pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER)
for meth in METH_TO_CURL.values():
setopt(meth, False)
try:
meth = METH_TO_CURL[request.method]
except KeyError:
curl.setopt(_pycurl.CUSTOMREQUEST, request.method)
else:
curl.unsetopt(_pycurl.CUSTOMREQUEST)
setopt(meth, True)
if request.method in ('POST', 'PUT'):
body = request.body.encode('utf-8') if request.body else b''
reqbuffer = BytesIO(body)
setopt(_pycurl.READFUNCTION, reqbuffer.read)
if request.method == 'POST':
def ioctl(cmd):
if cmd == _pycurl.IOCMD_RESTARTREAD:
reqbuffer.seek(0)
setopt(_pycurl.IOCTLFUNCTION, ioctl)
setopt(_pycurl.POSTFIELDSIZE, len(body))
else:
setopt(_pycurl.INFILESIZE, len(body))
elif request.method == 'GET':
assert not request.body
if request.auth_username is not None:
auth_mode = {
'basic': _pycurl.HTTPAUTH_BASIC,
'digest': _pycurl.HTTPAUTH_DIGEST
}[request.auth_mode or 'basic']
setopt(_pycurl.HTTPAUTH, auth_mode)
userpwd = '{}:{}'.format(
request.auth_username, request.auth_password or '',
)
setopt(_pycurl.USERPWD, userpwd)
else:
curl.unsetopt(_pycurl.USERPWD)
if request.client_cert is not None:
setopt(_pycurl.SSLCERT, request.client_cert)
if request.client_key is not None:
setopt(_pycurl.SSLKEY, request.client_key)
if request.on_prepare is not None:
request.on_prepare(curl)

View File

@ -0,0 +1,219 @@
from __future__ import annotations
from collections import deque
from io import BytesIO
import urllib3
from kombu.asynchronous.hub import Hub, get_event_loop
from kombu.exceptions import HttpError
from .base import BaseClient, Request
__all__ = ('Urllib3Client',)
from ...utils.encoding import bytes_to_str
DEFAULT_USER_AGENT = 'Mozilla/5.0 (compatible; urllib3)'
EXTRA_METHODS = frozenset(['DELETE', 'OPTIONS', 'PATCH'])
def _get_pool_key_parts(request: Request) -> list[str]:
_pool_key_parts = []
if request.network_interface:
_pool_key_parts.append(f"interface={request.network_interface}")
if request.validate_cert:
_pool_key_parts.append("validate_cert=True")
else:
_pool_key_parts.append("validate_cert=False")
if request.ca_certs:
_pool_key_parts.append(f"ca_certs={request.ca_certs}")
if request.client_cert:
_pool_key_parts.append(f"client_cert={request.client_cert}")
if request.client_key:
_pool_key_parts.append(f"client_key={request.client_key}")
return _pool_key_parts
class Urllib3Client(BaseClient):
"""Urllib3 HTTP Client."""
_pools = {}
def __init__(self, hub: Hub | None = None, max_clients: int = 10):
hub = hub or get_event_loop()
super().__init__(hub)
self.max_clients = max_clients
self._pending = deque()
self._timeout_check_tref = self.hub.call_repeatedly(
1.0, self._timeout_check,
)
def pools_close(self):
for pool in self._pools.values():
pool.close()
self._pools.clear()
def close(self):
self._timeout_check_tref.cancel()
self.pools_close()
def add_request(self, request):
self._pending.append(request)
self._process_queue()
return request
def get_pool(self, request: Request):
_pool_key_parts = _get_pool_key_parts(request=request)
_proxy_url = None
proxy_headers = None
if request.proxy_host:
_proxy_url = urllib3.util.Url(
scheme=None,
host=request.proxy_host,
port=request.proxy_port,
)
if request.proxy_username:
proxy_headers = urllib3.make_headers(
proxy_basic_auth=(
f"{request.proxy_username}"
f":{request.proxy_password}"
)
)
else:
proxy_headers = None
_proxy_url = _proxy_url.url
_pool_key_parts.append(f"proxy={_proxy_url}")
if proxy_headers:
_pool_key_parts.append(f"proxy_headers={str(proxy_headers)}")
_pool_key = "|".join(_pool_key_parts)
if _pool_key in self._pools:
return self._pools[_pool_key]
# create new pool
if _proxy_url:
_pool = urllib3.ProxyManager(
proxy_url=_proxy_url,
num_pools=self.max_clients,
proxy_headers=proxy_headers
)
else:
_pool = urllib3.PoolManager(num_pools=self.max_clients)
# Network Interface
if request.network_interface:
_pool.connection_pool_kw['source_address'] = (
request.network_interface,
0
)
# SSL Verification
if request.validate_cert:
_pool.connection_pool_kw['cert_reqs'] = 'CERT_REQUIRED'
else:
_pool.connection_pool_kw['cert_reqs'] = 'CERT_NONE'
# CA Certificates
if request.ca_certs is not None:
_pool.connection_pool_kw['ca_certs'] = request.ca_certs
elif request.validate_cert is True:
try:
from certifi import where
_pool.connection_pool_kw['ca_certs'] = where()
except ImportError:
pass
# Client Certificates
if request.client_cert is not None:
_pool.connection_pool_kw['cert_file'] = request.client_cert
if request.client_key is not None:
_pool.connection_pool_kw['key_file'] = request.client_key
self._pools[_pool_key] = _pool
return _pool
def _timeout_check(self):
self._process_pending_requests()
def _process_pending_requests(self):
while self._pending:
request = self._pending.popleft()
self._process_request(request)
def _process_request(self, request: Request):
# Prepare headers
headers = request.headers
headers.setdefault(
'User-Agent',
bytes_to_str(request.user_agent or DEFAULT_USER_AGENT)
)
headers.setdefault(
'Accept-Encoding',
'gzip,deflate' if request.use_gzip else 'none'
)
# Authentication
if request.auth_username is not None:
headers.update(
urllib3.util.make_headers(
basic_auth=(
f"{request.auth_username}"
f":{request.auth_password or ''}"
)
)
)
# Make the request using urllib3
try:
_pool = self.get_pool(request=request)
response = _pool.request(
request.method,
request.url,
headers=headers,
body=request.body,
preload_content=False,
redirect=request.follow_redirects,
)
buffer = BytesIO(response.data)
response_obj = self.Response(
request=request,
code=response.status,
headers=response.headers,
buffer=buffer,
effective_url=response.geturl(),
error=None
)
except urllib3.exceptions.HTTPError as e:
response_obj = self.Response(
request=request,
code=599,
headers={},
buffer=None,
effective_url=None,
error=HttpError(599, str(e))
)
request.on_ready(response_obj)
def _process_queue(self):
self._process_pending_requests()
def on_readable(self, fd):
pass
def on_writable(self, fd):
pass
def _setup_request(self, curl, request, buffer, headers):
pass

View File

@ -3,6 +3,4 @@ git+https://github.com/celery/sphinx_celery.git
-r extras/mongodb.txt
-r extras/sqlalchemy.txt
-r extras/azureservicebus.txt
# we cannot use directly extras/sqs.txt
# since readthedocs cannot install pycurl
boto3>=1.26.143
-r extras/sqs.txt

View File

@ -1,3 +1,2 @@
boto3>=1.26.143
pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython"
urllib3>=1.26.16

View File

@ -8,7 +8,6 @@ pymongo>=4.1.1; sys_platform != 'win32'
-r extras/azureservicebus.txt
-r extras/azurestoragequeues.txt
boto3>=1.26.143; sys_platform != 'win32'
pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython"
urllib3>=1.26.16; sys_platform != 'win32'
-r extras/consul.txt
-r extras/zookeeper.txt

View File

@ -5,7 +5,7 @@ import pytest
import t.skip
pytest.importorskip('boto3')
pytest.importorskip('pycurl')
pytest.importorskip('urllib3')
@t.skip.if_pypy

View File

@ -90,6 +90,13 @@ class test_AsyncHTTPSConnection(AWSCase):
validate_cert=VALIDATES_CERT,
)
def test_request_with_cert_path_https(self):
x = AsyncHTTPSConnection("https://example.com")
request = x.getrequest()
assert request.validate_cert is True
assert request.ca_certs is not None
assert request.ca_certs.endswith('.pem')
def test_getresponse(self):
client = Mock(name='client')
client.add_request = passthrough(name='client.add_request')

View File

@ -1,157 +0,0 @@
from __future__ import annotations
from io import BytesIO
from unittest.mock import ANY, Mock, call, patch
import pytest
import t.skip
from kombu.asynchronous.http.curl import READ, WRITE, CurlClient
pytest.importorskip('pycurl')
@t.skip.if_pypy
@pytest.mark.usefixtures('hub')
class test_CurlClient:
class Client(CurlClient):
Curl = Mock(name='Curl')
def test_when_pycurl_missing(self, patching):
patching('kombu.asynchronous.http.curl.pycurl', None)
with pytest.raises(ImportError):
self.Client()
def test_max_clients_set(self):
x = self.Client(max_clients=303)
assert x.max_clients == 303
def test_init(self):
with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
x = self.Client()
assert x._multi is not None
assert x._pending is not None
assert x._free_list is not None
assert x._fds is not None
assert x._socket_action == x._multi.socket_action
assert len(x._curls) == x.max_clients
assert x._timeout_check_tref
x._multi.setopt.assert_has_calls([
call(_pycurl.M_TIMERFUNCTION, x._set_timeout),
call(_pycurl.M_SOCKETFUNCTION, x._handle_socket),
])
def test_close(self):
with patch('kombu.asynchronous.http.curl.pycurl'):
x = self.Client()
x._timeout_check_tref = Mock(name='timeout_check_tref')
x.close()
x._timeout_check_tref.cancel.assert_called_with()
for _curl in x._curls:
_curl.close.assert_called_with()
x._multi.close.assert_called_with()
def test_add_request(self):
with patch('kombu.asynchronous.http.curl.pycurl'):
x = self.Client()
x._process_queue = Mock(name='_process_queue')
x._set_timeout = Mock(name='_set_timeout')
request = Mock(name='request')
x.add_request(request)
assert request in x._pending
x._process_queue.assert_called_with()
x._set_timeout.assert_called_with(0)
def test_handle_socket(self):
with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
x = self.Client()
fd = Mock(name='fd1')
# POLL_REMOVE
x._fds[fd] = fd
x._handle_socket(_pycurl.POLL_REMOVE, fd, x._multi, None, _pycurl)
assert fd not in x._fds
x._handle_socket(_pycurl.POLL_REMOVE, fd, x._multi, None, _pycurl)
# POLL_IN
fds = [fd, Mock(name='fd2'), Mock(name='fd3')]
x._fds = {f: f for f in fds}
x._handle_socket(_pycurl.POLL_IN, fd, x._multi, None, _pycurl)
assert x._fds[fd] == READ
# POLL_OUT
x._handle_socket(_pycurl.POLL_OUT, fd, x._multi, None, _pycurl)
assert x._fds[fd] == WRITE
# POLL_INOUT
x._handle_socket(_pycurl.POLL_INOUT, fd, x._multi, None, _pycurl)
assert x._fds[fd] == READ | WRITE
# UNKNOWN EVENT
x._handle_socket(0xff3f, fd, x._multi, None, _pycurl)
# FD NOT IN FDS
x._fds.clear()
x._handle_socket(0xff3f, fd, x._multi, None, _pycurl)
def test_set_timeout(self):
hub = Mock(name='hub')
x = self.Client(hub)
x._set_timeout(100)
hub.call_later.assert_called_with(100, x._timeout_check)
def test_timeout_check(self):
with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
hub = Mock(name='hub')
x = self.Client(hub)
fd1, fd2 = Mock(name='fd1'), Mock(name='fd2')
x._fds = {fd1: READ}
x._process_pending_requests = Mock(name='process_pending')
def _side_effect():
x._fds = {fd2: WRITE}
return 333, 1
x._multi.socket_all.side_effect = _side_effect
_pycurl.error = KeyError
x._timeout_check(_pycurl=_pycurl)
hub.remove.assert_called_with(fd1)
hub.add_writer.assert_called_with(fd2, x.on_writable, fd2)
x._multi.socket_all.return_value = None
x._multi.socket_all.side_effect = _pycurl.error(333)
x._timeout_check(_pycurl=_pycurl)
def test_on_readable_on_writeable(self):
with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
x = self.Client()
x._on_event = Mock(name='on_event')
fd = Mock(name='fd')
x.on_readable(fd, _pycurl=_pycurl)
x._on_event.assert_called_with(fd, _pycurl.CSELECT_IN)
x.on_writable(fd, _pycurl=_pycurl)
x._on_event.assert_called_with(fd, _pycurl.CSELECT_OUT)
def test_setup_request_sets_proxy_when_specified(self):
with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
x = self.Client()
proxy_host = 'http://www.example.com'
request = Mock(
name='request', headers={}, auth_mode=None, proxy_host=None
)
proxied_request = Mock(
name='request', headers={}, auth_mode=None,
proxy_host=proxy_host, proxy_port=123
)
x._setup_request(
x.Curl, request, BytesIO(), x.Headers(), _pycurl=_pycurl
)
with pytest.raises(AssertionError):
x.Curl.setopt.assert_any_call(_pycurl.PROXY, ANY)
x._setup_request(
x.Curl, proxied_request, BytesIO(), x.Headers(), _pycurl
)
x.Curl.setopt.assert_any_call(_pycurl.PROXY, proxy_host)

View File

@ -147,7 +147,7 @@ class test_BaseClient:
class test_Client:
def test_get_client(self, hub):
pytest.importorskip('pycurl')
pytest.importorskip('urllib3')
client = http.get_client()
assert client.hub is hub
client2 = http.get_client(hub)

View File

@ -0,0 +1,257 @@
from __future__ import annotations
from io import BytesIO
from unittest.mock import Mock, patch
import pytest
import urllib3
import t.skip
from kombu.asynchronous.http.urllib3_client import (Urllib3Client,
_get_pool_key_parts)
@t.skip.if_pypy
@pytest.mark.usefixtures('hub')
class test_Urllib3Client:
class Client(Urllib3Client):
urllib3 = Mock(name='urllib3')
def test_max_clients_set(self):
x = self.Client(max_clients=303)
assert x.max_clients == 303
def test_init(self):
x = self.Client()
assert x._pools is not None
assert x._pending is not None
assert x._timeout_check_tref
def test_close(self):
with patch(
'kombu.asynchronous.http.urllib3_client.urllib3.PoolManager'
):
x = self.Client()
x._timeout_check_tref = Mock(name='timeout_check_tref')
x.close()
x._timeout_check_tref.cancel.assert_called_with()
for pool in x._pools.values():
pool.close.assert_called_with()
def test_add_request(self):
with patch(
'kombu.asynchronous.http.urllib3_client.urllib3.PoolManager'
):
x = self.Client()
x._process_queue = Mock(name='_process_queue')
request = Mock(name='request')
x.add_request(request)
assert request in x._pending
x._process_queue.assert_called_with()
def test_timeout_check(self):
with patch(
'kombu.asynchronous.http.urllib3_client.urllib3.PoolManager'
):
hub = Mock(name='hub')
x = self.Client(hub)
x._process_pending_requests = Mock(name='process_pending')
x._timeout_check()
x._process_pending_requests.assert_called_with()
def test_process_request(self):
with patch(
'kombu.asynchronous.http.urllib3_client.urllib3.PoolManager'
) as _pool_manager:
x = self.Client()
request = Mock(
name='request',
method='GET',
url='http://example.com',
headers={},
body=None,
follow_redirects=True,
auth_username=None,
auth_password=None,
user_agent=None,
use_gzip=False,
network_interface=None,
validate_cert=True,
ca_certs=None,
client_cert=None,
client_key=None,
proxy_host=None,
proxy_port=None,
proxy_username=None,
proxy_password=None,
on_ready=Mock(name='on_ready')
)
response = Mock(
name='response',
status=200,
headers={},
data=b'content'
)
response.geturl.return_value = 'http://example.com'
_pool_manager.return_value.request.return_value = response
x._process_request(request)
response_obj = x.Response(
request=request,
code=200,
headers={},
buffer=BytesIO(b'content'),
effective_url='http://example.com',
error=None
)
request.on_ready.assert_called()
called_response = request.on_ready.call_args[0][0]
assert called_response.code == response_obj.code
assert called_response.headers == response_obj.headers
assert (
called_response.buffer.getvalue() ==
response_obj.buffer.getvalue()
)
assert called_response.effective_url == response_obj.effective_url
assert called_response.error == response_obj.error
def test_process_request_with_error(self):
with patch(
'kombu.asynchronous.http.urllib3_client.urllib3.PoolManager'
) as _pool_manager:
x = self.Client()
x.close()
request = Mock(
name='request',
method='GET',
url='http://example.com',
headers={},
body=None,
follow_redirects=True,
auth_username=None,
auth_password=None,
user_agent=None,
use_gzip=False,
network_interface=None,
validate_cert=True,
ca_certs=None,
client_cert=None,
client_key=None,
proxy_host=None,
proxy_port=None,
proxy_username=None,
proxy_password=None,
on_ready=Mock(name='on_ready')
)
_pool_manager.return_value.request.side_effect = urllib3.exceptions.HTTPError("Test Error")
x._process_request(request)
request.on_ready.assert_called()
called_response = request.on_ready.call_args[0][0]
assert called_response.code == 599
assert called_response.error is not None
assert called_response.error.message == "Test Error"
def test_on_readable_on_writable(self):
x = self.Client()
x.on_readable(Mock(name='fd'))
x.on_writable(Mock(name='fd'))
def test_get_pool_with_proxy(self):
with patch(
'kombu.asynchronous.http.urllib3_client.urllib3.ProxyManager'
) as _proxy_manager:
x = self.Client()
request = Mock(
name='request',
proxy_host='proxy.example.com',
proxy_port=8080,
proxy_username='user',
proxy_password='pass'
)
x.get_pool(request)
_proxy_manager.assert_called_with(
proxy_url='proxy.example.com:8080',
num_pools=x.max_clients,
proxy_headers=urllib3.make_headers(
proxy_basic_auth="user:pass"
)
)
def test_get_pool_without_proxy(self):
with patch(
'kombu.asynchronous.http.urllib3_client.urllib3.PoolManager'
) as _pool_manager:
x = self.Client()
request = Mock(name='request', proxy_host=None)
x.get_pool(request)
_pool_manager.assert_called_with(num_pools=x.max_clients)
def test_process_request_with_proxy(self):
with patch(
'kombu.asynchronous.http.urllib3_client.urllib3.ProxyManager'
) as _proxy_manager:
x = self.Client()
request = Mock(
name='request',
method='GET',
url='http://example.com',
headers={},
body=None,
follow_redirects=True,
proxy_host='proxy.example.com',
proxy_port=8080,
proxy_username='user',
proxy_password='pass',
on_ready=Mock(name='on_ready')
)
response = Mock(
name='response',
status=200,
headers={},
data=b'content'
)
response.geturl.return_value = 'http://example.com'
_proxy_manager.return_value.request.return_value = response
x._process_request(request)
response_obj = x.Response(
request=request,
code=200,
headers={},
buffer=BytesIO(b'content'),
effective_url='http://example.com',
error=None
)
request.on_ready.assert_called()
called_response = request.on_ready.call_args[0][0]
assert called_response.code == response_obj.code
assert called_response.headers == response_obj.headers
assert (
called_response.buffer.getvalue()
== response_obj.buffer.getvalue()
)
assert called_response.effective_url == response_obj.effective_url
assert called_response.error == response_obj.error
def test_pool_key_parts(self):
request = Mock(
name='request',
method='GET',
url='http://example.com',
headers={},
body=None,
network_interface='test',
validate_cert=False,
ca_certs='test0.pem',
client_cert='test1.pem',
client_key='some_key',
)
pool_key = _get_pool_key_parts(request)
assert pool_key == [
"interface=test",
"validate_cert=False",
"ca_certs=test0.pem",
"client_cert=test1.pem",
"client_key=some_key"
]