fix(UrlRequest): Add "on_finish" and add alternative implementation (#7913)

* fix(UrlRequest): Add "on_finish" and add alternative implementation

- Added "on_finish" callback to URlRequest, it fires always when the request is done
- Added alternative implementation for UrlRequest based on REQUESTS lib
- It have also implemented "on_finish" and "auth" params for easy BASICAUTH handling
- Refactored tests of new implementation to be mocked by RESPONSES, so we dont need real network to test nor we dont ping real servers during tests
- Made selectable implementations of URLlib and Requests.
- Urllib is still default implementation
- Refactored baseclass to be used with "plugins"

* fix docs and add "implementation" in config

- There was missing docs in some places
- Minor "unused variables" fixes
- Add "implementation" key to config.network
This commit is contained in:
NomadDemon 2022-08-23 01:07:59 +02:00 committed by GitHub
parent 49a5f807d6
commit 4841427812
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 713 additions and 195 deletions

View File

@ -326,6 +326,9 @@ Available configuration tokens
Check the specific module's documentation for a list of accepted
arguments.
.. versionchanged:: 2.2.0
`implementation` has been added to the network section.
.. versionchanged:: 2.1.0
`vsync` has been added to the graphics section.
`verify_gl_main_thread` has been added to the graphics section.
@ -335,6 +338,7 @@ Available configuration tokens
to the `graphics` section.
`kivy_clock` has been added to the kivy section.
`default_font` has beed added to the kivy section.
`useragent` has been added to the network section.
.. versionchanged:: 1.9.0
`borderless` and `window_state` have been added to the graphics section.
@ -369,15 +373,16 @@ try:
from ConfigParser import ConfigParser as PythonConfigParser
except ImportError:
from configparser import RawConfigParser as PythonConfigParser
from collections import OrderedDict
from os import environ
from os.path import exists
from kivy import kivy_config_fn
from kivy.logger import Logger, logger_config_update
from collections import OrderedDict
from kivy.utils import platform
from kivy.compat import PY2, string_types
from weakref import ref
from kivy import kivy_config_fn
from kivy.compat import PY2, string_types
from kivy.logger import Logger, logger_config_update
from kivy.utils import platform
_is_rpi = exists('/opt/vc/include/bcm_host.h')
# Version number of current configuration format
@ -721,7 +726,7 @@ if not environ.get('KIVY_DOC_INCLUDE'):
'KIVY_NO_CONFIG' not in environ):
try:
Config.read(kivy_config_fn)
except Exception as e:
except Exception:
Logger.exception('Core: error while reading local'
'configuration')
@ -909,6 +914,9 @@ if not environ.get('KIVY_DOC_INCLUDE'):
Config.setdefault('graphics', 'custom_titlebar', '0')
Config.setdefault('graphics', 'custom_titlebar_border', '5')
elif version == 24:
Config.setdefault("network", "implementation", "default")
else:
# for future.
break
@ -928,7 +936,7 @@ if not environ.get('KIVY_DOC_INCLUDE'):
try:
Config.filename = kivy_config_fn
Config.write()
except Exception as e:
except Exception:
Logger.exception('Core: Error while saving default config file')
# Load configuration from env

View File

@ -58,43 +58,35 @@ If you want a synchronous request, you can call the wait() method.
from base64 import b64encode
from collections import deque
from threading import Thread, Event
from http.client import HTTPConnection
from json import loads
from threading import Event, Thread
from time import sleep
from kivy.compat import PY2
from kivy.config import Config
from urllib.parse import urlparse, urlunparse
if PY2:
from httplib import HTTPConnection
from urlparse import urlparse, urlunparse
else:
from http.client import HTTPConnection
from urllib.parse import urlparse, urlunparse
import requests
from kivy.clock import Clock
from kivy.config import Config
from kivy.logger import Logger
from kivy.utils import platform
from kivy.weakmethod import WeakMethod
try:
import ssl
HTTPSConnection = None
if PY2:
from httplib import HTTPSConnection
else:
from http.client import HTTPSConnection
from http.client import HTTPSConnection
except ImportError:
# depending the platform, if openssl support wasn't compiled before python,
# this class is not available.
pass
from kivy.clock import Clock
from kivy.weakmethod import WeakMethod
from kivy.logger import Logger
from kivy.utils import platform
# list to save UrlRequest and prevent GC on un-referenced objects
g_requests = []
class UrlRequest(Thread):
class UrlRequestBase(Thread):
'''A UrlRequest. See module documentation for usage.
.. versionchanged:: 1.5.1
@ -123,6 +115,11 @@ class UrlRequest(Thread):
Parameters `on_cancel` added.
.. versionchanged:: 2.2.0
Parameters `on_finish` added.
Parameters `auth` added.
:Parameters:
`url`: str
Complete url string to call.
@ -143,6 +140,8 @@ class UrlRequest(Thread):
`on_cancel`: callback(request)
Callback function to call if user requested to cancel the download
operation via the .cancel() method.
`on_finish`: callback(request)
Additional callback function to call if request is done.
`req_body`: str, defaults to None
Data to sent in the request. If it's not None, a POST will be done
instead of a GET.
@ -179,16 +178,21 @@ class UrlRequest(Thread):
`proxy_headers`: dict, defaults to None
If set, and `proxy_host` is also set, the headers to send to the
proxy server in the ``CONNECT`` request.
`auth`: HTTPBasicAuth, defaults to None
If set, request will use basicauth to authenticate.
Only used in "Requests" implementation
'''
def __init__(self, url, on_success=None, on_redirect=None,
on_failure=None, on_error=None, on_progress=None,
req_body=None, req_headers=None, chunk_size=8192,
timeout=None, method=None, decode=True, debug=False,
file_path=None, ca_file=None, verify=True, proxy_host=None,
proxy_port=None, proxy_headers=None, user_agent=None,
on_cancel=None, cookies=None):
super(UrlRequest, self).__init__()
def __init__(
self, url, on_success=None, on_redirect=None,
on_failure=None, on_error=None, on_progress=None,
req_body=None, req_headers=None, chunk_size=8192,
timeout=None, method=None, decode=True, debug=False,
file_path=None, ca_file=None, verify=True, proxy_host=None,
proxy_port=None, proxy_headers=None, user_agent=None,
on_cancel=None, on_finish=None, cookies=None, auth=None
):
super().__init__()
self._queue = deque()
self._trigger_result = Clock.create_trigger(self._dispatch_result, 0)
self.daemon = True
@ -198,6 +202,7 @@ class UrlRequest(Thread):
self.on_error = WeakMethod(on_error) if on_error else None
self.on_progress = WeakMethod(on_progress) if on_progress else None
self.on_cancel = WeakMethod(on_cancel) if on_cancel else None
self.on_finish = WeakMethod(on_finish) if on_finish else None
self.decode = decode
self.file_path = file_path
self._debug = debug
@ -217,6 +222,8 @@ class UrlRequest(Thread):
self._cancel_event = Event()
self._user_agent = user_agent
self._cookies = cookies
self._requested_url = url
self._auth = auth
if platform in ['android', 'ios']:
import certifi
@ -284,34 +291,12 @@ class UrlRequest(Thread):
if self in g_requests:
g_requests.remove(self)
def _parse_url(self, url):
parse = urlparse(url)
host = parse.hostname
port = parse.port
userpass = None
# append user + pass to hostname if specified
if parse.username and parse.password:
userpass = {
"Authorization": "Basic {}".format(b64encode(
"{}:{}".format(
parse.username,
parse.password
).encode('utf-8')
).decode('utf-8'))
}
return host, port, userpass, parse
def _fetch_url(self, url, body, headers, q):
# Parse and fetch the current url
trigger = self._trigger_result
chunk_size = self._chunk_size
report_progress = self.on_progress is not None
timeout = self._timeout
file_path = self.file_path
ca_file = self.ca_file
verify = self.verify
if self._debug:
Logger.debug('UrlRequest: {0} Fetch url <{1}>'.format(
@ -321,104 +306,27 @@ class UrlRequest(Thread):
Logger.debug('UrlRequest: {0} - headers: {1}'.format(
id(self), headers))
# parse url
host, port, userpass, parse = self._parse_url(url)
if userpass and not headers:
headers = userpass
elif userpass and headers:
key = list(userpass.keys())[0]
headers[key] = userpass[key]
# translate scheme to connection class
cls = self.get_connection_for_scheme(parse.scheme)
# reconstruct path to pass on the request
path = parse.path
if parse.params:
path += ';' + parse.params
if parse.query:
path += '?' + parse.query
if parse.fragment:
path += '#' + parse.fragment
# create connection instance
args = {}
if timeout is not None:
args['timeout'] = timeout
if (ca_file is not None and hasattr(ssl, 'create_default_context') and
parse.scheme == 'https'):
ctx = ssl.create_default_context(cafile=ca_file)
ctx.verify_mode = ssl.CERT_REQUIRED
args['context'] = ctx
if not verify and parse.scheme == 'https' and (
hasattr(ssl, 'create_default_context')):
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
args['context'] = ctx
if self._proxy_host:
Logger.debug('UrlRequest: {0} - proxy via {1}:{2}'.format(
id(self), self._proxy_host, self._proxy_port
))
req = cls(self._proxy_host, self._proxy_port, **args)
if parse.scheme == 'https':
req.set_tunnel(host, port, self._proxy_headers)
else:
path = urlunparse(parse)
else:
req = cls(host, port, **args)
# send request
method = self._method
if method is None:
method = 'GET' if body is None else 'POST'
req.request(method, path, body, headers or {})
# read header
resp = req.getresponse()
req, resp = self.call_request(body, headers)
# read content
if report_progress or file_path is not None:
try:
total_size = int(resp.getheader('content-length'))
except:
total_size = -1
total_size = self.get_total_size(resp)
# before starting the download, send a fake progress to permit the
# user to initialize his ui
if report_progress:
q(('progress', resp, (0, total_size)))
def get_chunks(fd=None):
bytes_so_far = 0
result = b''
while 1:
chunk = resp.read(chunk_size)
if not chunk:
break
if fd:
fd.write(chunk)
else:
result += chunk
bytes_so_far += len(chunk)
# report progress to user
if report_progress:
q(('progress', resp, (bytes_so_far, total_size)))
trigger()
if self._cancel_event.is_set():
break
return bytes_so_far, result
if file_path is not None:
with open(file_path, 'wb') as fd:
bytes_so_far, result = get_chunks(fd)
bytes_so_far, result = self.get_chunks(
resp, chunk_size, total_size, report_progress, q,
trigger, fd=fd
)
else:
bytes_so_far, result = get_chunks()
bytes_so_far, result = self.get_chunks(
resp, chunk_size, total_size, report_progress, q, trigger
)
# ensure that results are dispatched for the last chunk,
# avoid trigger
@ -426,32 +334,19 @@ class UrlRequest(Thread):
q(('progress', resp, (bytes_so_far, total_size)))
trigger()
else:
result = resp.read()
result = self.get_response(resp)
try:
if isinstance(result, bytes):
result = result.decode('utf-8')
except UnicodeDecodeError:
# if it's an image? decoding would not work
pass
req.close()
self.close_connection(req)
# return everything
return result, resp
def get_connection_for_scheme(self, scheme):
'''Return the Connection class for a particular scheme.
This is an internal function that can be expanded to support custom
schemes.
Actual supported schemes: http, https.
'''
if scheme == 'http':
return HTTPConnection
elif scheme == 'https' and HTTPSConnection is not None:
return HTTPSConnection
else:
raise Exception('No class for scheme %s' % scheme)
def decode_result(self, result, resp):
'''Decode the result fetched from url according to his Content-Type.
Currently supports only application/json.
@ -459,7 +354,7 @@ class UrlRequest(Thread):
# Entry to decode url from the content type.
# For example, if the content type is a json, it will be automatically
# decoded.
content_type = resp.getheader('Content-Type', None)
content_type = self.get_content_type(resp)
if content_type is not None:
ct = content_type.split(';')[0]
if ct == 'application/json':
@ -467,7 +362,7 @@ class UrlRequest(Thread):
result = result.decode('utf-8')
try:
return loads(result)
except:
except Exception:
return result
return result
@ -479,12 +374,13 @@ class UrlRequest(Thread):
result, resp, data = self._queue.pop()
except IndexError:
return
if resp:
# Small workaround in order to prevent the situation mentioned
# in the comment below
final_cookies = ""
parsed_headers = []
for key, value in resp.getheaders():
for key, value in self.get_all_headers(resp):
if key == "Set-Cookie":
final_cookies += "{};".format(value)
else:
@ -496,15 +392,17 @@ class UrlRequest(Thread):
# ? http://stackoverflow.com/questions/2454494/..
# ..urllib2-multiple-set-cookie-headers-in-response
self._resp_headers = dict(parsed_headers)
self._resp_status = resp.status
self._resp_status = self.get_status_code(resp)
if result == 'success':
status_class = resp.status // 100
status_class = self.get_status_code(resp) // 100
if status_class in (1, 2):
if self._debug:
Logger.debug('UrlRequest: {0} Download finished with'
' {1} datalen'.format(id(self),
len(data)))
Logger.debug(
'UrlRequest: {0} Download finished with '
'{1} datalen'.format(id(self), data)
)
self._is_finished = True
self._result = data
if self.on_success:
@ -525,9 +423,13 @@ class UrlRequest(Thread):
elif status_class in (4, 5):
if self._debug:
Logger.debug('UrlRequest: {} Download failed with '
'http error {}'.format(id(self),
resp.status))
Logger.debug(
'UrlRequest: {} Download failed with '
'http error {}'.format(
id(self),
self.get_status_code(resp)
)
)
self._is_finished = True
self._result = data
if self.on_failure:
@ -566,6 +468,13 @@ class UrlRequest(Thread):
else:
assert 0
if result != "progress" and self.on_finish:
if self._debug:
Logger.debug('UrlRequest: Request is finished')
func = self.on_finish()
if func:
func(self)
@property
def is_finished(self):
'''Return True if the request has finished, whether it's a
@ -633,26 +542,250 @@ class UrlRequest(Thread):
self._cancel_event.set()
if __name__ == '__main__':
class UrlRequestUrllib(UrlRequestBase):
from pprint import pprint
def get_chunks(
self, resp, chunk_size, total_size, report_progress, q,
trigger, fd=None
):
bytes_so_far = 0
result = b''
def on_success(req, result):
pprint('Got the result:')
pprint(result)
while 1:
chunk = resp.read(chunk_size)
if not chunk:
break
def on_error(req, error):
pprint('Got an error:')
pprint(error)
if fd:
fd.write(chunk)
Clock.start_clock()
req = UrlRequest('https://en.wikipedia.org/w/api.php?format'
'=json&action=query&titles=Kivy&prop=revisions&rvprop=content',
on_success, on_error)
while not req.is_finished:
sleep(1)
Clock.tick()
Clock.stop_clock()
else:
result += chunk
print('result =', req.result)
print('error =', req.error)
bytes_so_far += len(chunk)
if report_progress:
q(('progress', resp, (bytes_so_far, total_size)))
trigger()
if self._cancel_event.is_set():
break
return bytes_so_far, result
def get_response(self, resp):
return resp.read()
def get_total_size(self, resp):
try:
return int(resp.getheader('content-length'))
except Exception:
return -1
def get_content_type(self, resp):
return resp.getheader('Content-Type', None)
def get_status_code(self, resp):
return resp.status
def get_all_headers(self, resp):
return resp.getheaders()
def close_connection(self, req):
req.close()
def _parse_url(self, url):
parse = urlparse(url)
host = parse.hostname
port = parse.port
userpass = None
# append user + pass to hostname if specified
if parse.username and parse.password:
userpass = {
"Authorization": "Basic {}".format(b64encode(
"{}:{}".format(
parse.username,
parse.password
).encode('utf-8')
).decode('utf-8'))
}
return host, port, userpass, parse
def _get_connection_for_scheme(self, scheme):
'''Return the Connection class for a particular scheme.
This is an internal function that can be expanded to support custom
schemes.
Actual supported schemes: http, https.
'''
if scheme == 'http':
return HTTPConnection
elif scheme == 'https' and HTTPSConnection is not None:
return HTTPSConnection
else:
raise Exception('No class for scheme %s' % scheme)
def call_request(self, body, headers):
timeout = self._timeout
ca_file = self.ca_file
verify = self.verify
url = self._requested_url
# parse url
host, port, userpass, parse = self._parse_url(url)
if userpass and not headers:
headers = userpass
elif userpass and headers:
key = list(userpass.keys())[0]
headers[key] = userpass[key]
# translate scheme to connection class
cls = self._get_connection_for_scheme(parse.scheme)
# reconstruct path to pass on the request
path = parse.path
if parse.params:
path += ';' + parse.params
if parse.query:
path += '?' + parse.query
if parse.fragment:
path += '#' + parse.fragment
# create connection instance
args = {}
if timeout is not None:
args['timeout'] = timeout
if (ca_file is not None and hasattr(ssl, 'create_default_context') and
parse.scheme == 'https'):
ctx = ssl.create_default_context(cafile=ca_file)
ctx.verify_mode = ssl.CERT_REQUIRED
args['context'] = ctx
if not verify and parse.scheme == 'https' and (
hasattr(ssl, 'create_default_context')):
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
args['context'] = ctx
if self._proxy_host:
Logger.debug('UrlRequest: {0} - proxy via {1}:{2}'.format(
id(self), self._proxy_host, self._proxy_port
))
req = cls(self._proxy_host, self._proxy_port, **args)
if parse.scheme == 'https':
req.set_tunnel(host, port, self._proxy_headers)
else:
path = urlunparse(parse)
else:
req = cls(host, port, **args)
# send request
method = self._method
if method is None:
method = 'GET' if body is None else 'POST'
req.request(method, path, body, headers or {})
# read header
return req, req.getresponse()
class UrlRequestRequests(UrlRequestBase):
def get_chunks(
self, resp, chunk_size, total_size, report_progress, q,
trigger, fd=None
):
bytes_so_far = 0
result = b''
for chunk in resp.iter_content(chunk_size):
if not chunk:
break
if fd:
fd.write(chunk)
else:
result += chunk
bytes_so_far += len(chunk)
if report_progress:
q(('progress', resp, (bytes_so_far, total_size)))
trigger()
if self._cancel_event.is_set():
break
return bytes_so_far, result
def get_response(self, resp):
return resp.content
def get_total_size(self, resp):
return int(resp.headers.get('Content-Length', -1))
def get_content_type(self, resp):
return resp.headers.get('Content-Type', None)
def get_status_code(self, resp):
return resp.status_code
def get_all_headers(self, resp):
return resp.headers.items()
def close_connection(self, req):
pass
def call_request(self, body, headers):
timeout = self._timeout
ca_file = self.ca_file
verify = self.verify
url = self._requested_url
auth = self._auth
req = requests
kwargs = {}
# get method
if self._method is None:
method = 'get' if body is None else 'post'
else:
method = self._method.lower()
req_call = getattr(req, method)
if auth:
kwargs["auth"] = auth
# send request
response = req_call(
url,
data=body,
headers=headers,
timeout=timeout,
verify=verify,
cert=ca_file,
**kwargs
)
return None, response
implementation_map = {
"default": UrlRequestUrllib,
"requests": UrlRequestRequests,
"urllib": UrlRequestUrllib,
}
if Config.has_option('network', 'implementation'):
prefered_implementation = Config.get("network", "implementation")
UrlRequest = implementation_map.get(prefered_implementation)
else:
UrlRequest = implementation_map["default"]

View File

@ -0,0 +1,378 @@
"""
UrlRequest tests
================
"""
import threading
from base64 import b64encode
from datetime import datetime
from time import sleep
import certifi
import pytest
import responses
from kivy.network.urlrequest import UrlRequestRequests as UrlRequest
from requests.auth import HTTPBasicAuth
from responses import matchers
class UrlRequestQueue:
def __init__(self, queue):
self.queue = queue
def _on_success(self, req, *args):
self.queue.append((threading.get_ident(), "success", args))
def _on_redirect(self, req, *args):
self.queue.append((threading.get_ident(), "redirect", args))
def _on_error(self, req, *args):
self.queue.append((threading.get_ident(), "error", args))
def _on_failure(self, req, *args):
self.queue.append((threading.get_ident(), "failure", args))
def _on_progress(self, req, *args):
self.queue.append((threading.get_ident(), "progress", args))
def _on_finish(self, req, *args):
self.queue.append((threading.get_ident(), "finish", args))
class TestCallbacks:
url = "https://example.com"
def _ensure_called_from_thread(self, queue):
tid = threading.get_ident()
for item in queue:
assert item[0] == tid
def _check_queue_values(self, queue_element, status):
assert queue_element[1] == status
def wait_request_is_finished(self, kivy_clock, request, timeout=10):
start_time = datetime.now()
timed_out = False
while not request.is_finished and not timed_out:
kivy_clock.tick()
sleep(0.1)
timed_out = (datetime.now() - start_time).total_seconds() > timeout
assert request.is_finished
@responses.activate
def test_on_success(self, kivy_clock):
_queue = UrlRequestQueue([])
responses.get(
self.url,
body="{}",
status=200,
content_type="application/json",
)
req = UrlRequest(
self.url,
on_success=_queue._on_success,
debug=True,
)
self.wait_request_is_finished(kivy_clock, req)
processed_queue = _queue.queue
assert len(processed_queue) == 1
self._ensure_called_from_thread(processed_queue)
self._check_queue_values(processed_queue[0], "success")
@responses.activate
def test_on_success_with_finish(self, kivy_clock):
_queue = UrlRequestQueue([])
responses.get(
self.url,
body="{}",
status=200,
content_type="application/json",
)
req = UrlRequest(
self.url,
on_success=_queue._on_success,
on_finish=_queue._on_finish,
debug=True,
)
self.wait_request_is_finished(kivy_clock, req)
processed_queue = _queue.queue
assert len(processed_queue) == 2
self._ensure_called_from_thread(processed_queue)
self._check_queue_values(processed_queue[0], "success")
self._check_queue_values(processed_queue[1], "finish")
@responses.activate
def test_on_redirect(self, kivy_clock):
_queue = UrlRequestQueue([])
responses.get(
self.url,
body="{}",
status=301,
content_type="application/json",
)
req = UrlRequest(
self.url,
on_redirect=_queue._on_redirect,
debug=True,
)
self.wait_request_is_finished(kivy_clock, req)
processed_queue = _queue.queue
assert len(processed_queue) == 1
self._ensure_called_from_thread(processed_queue)
self._check_queue_values(processed_queue[0], "redirect")
@responses.activate
def test_on_redirect_with_finish(self, kivy_clock):
_queue = UrlRequestQueue([])
responses.get(
self.url,
body="{}",
status=301,
content_type="application/json",
)
req = UrlRequest(
self.url,
on_redirect=_queue._on_redirect,
on_finish=_queue._on_finish,
debug=True,
)
self.wait_request_is_finished(kivy_clock, req)
processed_queue = _queue.queue
assert len(processed_queue) == 2
self._ensure_called_from_thread(processed_queue)
self._check_queue_values(processed_queue[0], "redirect")
self._check_queue_values(processed_queue[1], "finish")
@responses.activate
def test_on_error(self, kivy_clock):
_queue = UrlRequestQueue([])
responses.get(
self.url,
body=Exception("..."),
status=400,
content_type="application/json",
)
req = UrlRequest(
self.url,
on_error=_queue._on_error,
debug=True,
)
self.wait_request_is_finished(kivy_clock, req)
processed_queue = _queue.queue
assert len(processed_queue) == 1
self._ensure_called_from_thread(processed_queue)
self._check_queue_values(processed_queue[0], "error")
@responses.activate
def test_on_error_with_finis(self, kivy_clock):
_queue = UrlRequestQueue([])
responses.get(
self.url,
body=Exception("..."),
status=400,
content_type="application/json",
)
req = UrlRequest(
self.url,
on_error=_queue._on_error,
on_finish=_queue._on_finish,
debug=True,
)
self.wait_request_is_finished(kivy_clock, req)
processed_queue = _queue.queue
assert len(processed_queue) == 2
self._ensure_called_from_thread(processed_queue)
self._check_queue_values(processed_queue[0], "error")
self._check_queue_values(processed_queue[1], "finish")
@responses.activate
def test_on_failure(self, kivy_clock):
_queue = UrlRequestQueue([])
responses.get(
self.url,
body="{}",
status=400,
content_type="application/json",
)
req = UrlRequest(
self.url,
on_failure=_queue._on_failure,
debug=True,
)
self.wait_request_is_finished(kivy_clock, req)
processed_queue = _queue.queue
assert len(processed_queue) == 1
self._ensure_called_from_thread(processed_queue)
self._check_queue_values(processed_queue[0], "failure")
@responses.activate
def test_on_failure_with_finish(self, kivy_clock):
_queue = UrlRequestQueue([])
responses.get(
self.url,
body="{}",
status=400,
content_type="application/json",
)
req = UrlRequest(
self.url,
on_failure=_queue._on_failure,
on_finish=_queue._on_finish,
debug=True,
)
self.wait_request_is_finished(kivy_clock, req)
processed_queue = _queue.queue
assert len(processed_queue) == 2
self._ensure_called_from_thread(processed_queue)
self._check_queue_values(processed_queue[0], "failure")
self._check_queue_values(processed_queue[1], "finish")
@responses.activate
def test_on_progress(self, kivy_clock):
_queue = UrlRequestQueue([])
responses.get(
self.url,
body="x" * 100,
status=200,
content_type="text/plain",
auto_calculate_content_length=True
)
req = UrlRequest(
self.url,
on_progress=_queue._on_progress,
chunk_size=70,
debug=True,
)
self.wait_request_is_finished(kivy_clock, req)
processed_queue = _queue.queue
assert len(processed_queue) == 4
self._ensure_called_from_thread(processed_queue)
self._check_queue_values(processed_queue[0], "progress")
self._check_queue_values(processed_queue[1], "progress")
self._check_queue_values(processed_queue[2], "progress")
self._check_queue_values(processed_queue[3], "progress")
@responses.activate
def test_on_progress_with_finish(self, kivy_clock):
_queue = UrlRequestQueue([])
responses.get(
self.url,
body="x" * 100,
status=200,
content_type="text/plain",
auto_calculate_content_length=True
)
req = UrlRequest(
self.url,
on_progress=_queue._on_progress,
on_finish=_queue._on_finish,
chunk_size=70,
debug=True,
)
self.wait_request_is_finished(kivy_clock, req)
processed_queue = _queue.queue
assert len(processed_queue) == 5
self._ensure_called_from_thread(processed_queue)
self._check_queue_values(processed_queue[0], "progress")
self._check_queue_values(processed_queue[1], "progress")
self._check_queue_values(processed_queue[2], "progress")
self._check_queue_values(processed_queue[3], "progress")
self._check_queue_values(processed_queue[4], "finish")
@responses.activate
def test_on_finish(self, kivy_clock):
_queue = UrlRequestQueue([])
responses.get(
self.url,
body="{}",
status=400,
content_type="application/json",
)
req = UrlRequest(
self.url,
on_finish=_queue._on_finish,
debug=True,
)
self.wait_request_is_finished(kivy_clock, req)
processed_queue = _queue.queue
assert len(processed_queue) == 1
self._ensure_called_from_thread(processed_queue)
self._check_queue_values(processed_queue[0], "finish")
@responses.activate
def test_auth_header(self, kivy_clock):
_queue = UrlRequestQueue([])
head = {
"Authorization": "Basic {}".format(
b64encode(b"exampleuser:examplepassword").decode("utf-8")
)
}
responses.get(
self.url,
body="{}",
status=400,
content_type="application/json",
match=[matchers.header_matcher(head)],
)
req = UrlRequest(
self.url,
req_headers=head,
on_finish=_queue._on_finish,
debug=True,
auth=HTTPBasicAuth("exampleuser", "examplepassword")
)
self.wait_request_is_finished(kivy_clock, req)
processed_queue = _queue.queue
assert len(processed_queue) == 1
self._ensure_called_from_thread(processed_queue)
self._check_queue_values(processed_queue[0], "finish")
@pytest.mark.parametrize("scheme", ("http", "https"))
@responses.activate
def test_ca_file(self, scheme, kivy_clock):
_queue = UrlRequestQueue([])
responses.get(
f"{scheme}://example.com",
body="{}",
status=400,
content_type="application/json",
)
req = UrlRequest(
f"{scheme}://example.com",
on_finish=_queue._on_finish,
ca_file=certifi.where(),
debug=True,
)
self.wait_request_is_finished(kivy_clock, req)
processed_queue = _queue.queue
assert len(processed_queue) == 1
self._ensure_called_from_thread(processed_queue)
self._check_queue_values(processed_queue[0], "finish")

View File

@ -2,13 +2,14 @@
UrlRequest tests
================
'''
import pytest
import os
import threading
from base64 import b64encode
from datetime import datetime
from time import sleep
from base64 import b64encode
import os
import pytest
from kivy.network.urlrequest import UrlRequestUrllib as UrlRequest
def wait_request_is_finished(kivy_clock, request, timeout=10):
@ -62,7 +63,6 @@ class UrlRequestQueue:
@pytest.mark.skipif(os.environ.get('NONETWORK'), reason="No network")
def test_callbacks(kivy_clock):
from kivy.network.urlrequest import UrlRequest
obj = UrlRequestQueue([])
queue = obj.queue
req = UrlRequest('http://google.com',
@ -82,7 +82,6 @@ def test_callbacks(kivy_clock):
@pytest.mark.skipif(os.environ.get('NONETWORK'), reason="No network")
def test_auth_header(kivy_clock):
from kivy.network.urlrequest import UrlRequest
obj = UrlRequestQueue([])
queue = obj.queue
head = {
@ -111,7 +110,6 @@ def test_auth_header(kivy_clock):
@pytest.mark.skipif(os.environ.get('NONETWORK'), reason="No network")
def test_auth_auto(kivy_clock):
from kivy.network.urlrequest import UrlRequest
obj = UrlRequestQueue([])
queue = obj.queue
req = UrlRequest(
@ -136,7 +134,6 @@ def test_auth_auto(kivy_clock):
@pytest.mark.parametrize("scheme", ("http", "https"))
def test_ca_file(kivy_clock, scheme):
"""Passing a `ca_file` should not crash on http scheme, refs #6946"""
from kivy.network.urlrequest import UrlRequest
import certifi
obj = UrlRequestQueue([])
queue = obj.queue

View File

@ -52,8 +52,10 @@ dev =
kivy_deps.glew_dev~=0.3.1; sys_platform == "win32"
flake8
pre-commit
responses
base =
pillow
requests
docutils
pygments
kivy_deps.angle~=0.3.2; sys_platform == "win32"