MAINT Split conftest.py into modules (#2418)

This commit is contained in:
Gyeongjae Choi 2022-05-08 16:52:08 +09:00 committed by GitHub
parent be59fae4f4
commit b20b43bd66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
56 changed files with 957 additions and 1019 deletions

View File

@ -215,7 +215,7 @@ jobs:
--verbose \
-k 'not (chrome or firefox or node)' \
--cov=pyodide_build --cov=pyodide \
src pyodide-build packages/micropip/
src pyodide-build packages/micropip/ pyodide-test-runner
- store_test_results:
path: test-results
@ -411,7 +411,7 @@ workflows:
- test-main:
name: test-core-chrome
test-params: -k "chrome and not webworker" src packages/micropip packages/fpcast-test packages/sharedlib-test-py/ packages/cpp-exceptions-test/
test-params: -k "chrome and not webworker" src packages/micropip packages/fpcast-test packages/sharedlib-test-py/ packages/cpp-exceptions-test/ pyodide-test-runner
requires:
- build-core
filters:
@ -420,7 +420,7 @@ workflows:
- test-main:
name: test-core-firefox
test-params: -k "firefox and not webworker" src packages/micropip packages/fpcast-test packages/sharedlib-test-py/ packages/cpp-exceptions-test/
test-params: -k "firefox and not webworker" src packages/micropip packages/fpcast-test packages/sharedlib-test-py/ packages/cpp-exceptions-test/ pyodide-test-runner
requires:
- build-core
filters:
@ -429,7 +429,7 @@ workflows:
- test-main:
name: test-core-node
test-params: -k node src packages/micropip packages/fpcast-test packages/sharedlib-test-py/ packages/cpp-exceptions-test/
test-params: -k node src packages/micropip packages/fpcast-test packages/sharedlib-test-py/ packages/cpp-exceptions-test/ pyodide-test-runner
requires:
- build-core
filters:

View File

@ -7,8 +7,13 @@ from pathlib import Path
from time import time
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
sys.path.append(str(Path(__file__).resolve().parents[1] / "pyodide-test-runner"))
import conftest # noqa: E402
from pyodide_test_runner import ( # noqa: E402
ChromeWrapper,
FirefoxWrapper,
spawn_web_server,
)
SKIP = {"fft", "hyantes"}
@ -150,6 +155,11 @@ def parse_args(benchmarks):
type=int,
help="Browser timeout(sec) for each benchmark (default: %(default)s)",
)
parser.add_argument(
"--dist-dir",
default=str(Path(__file__).parents[1] / "dist"),
help="Pyodide dist directory (default: %(default)s)",
)
return parser.parse_args()
@ -171,11 +181,11 @@ def main():
results = {}
selenium_backends = {}
browser_cls = [
("firefox", conftest.FirefoxWrapper),
("chrome", conftest.ChromeWrapper),
("firefox", FirefoxWrapper),
("chrome", ChromeWrapper),
]
with conftest.spawn_web_server() as (hostname, port, log_path):
with spawn_web_server(args.dist_dir) as (hostname, port, log_path):
# selenium initialization time
result = {"native": float("NaN")}

View File

@ -1,31 +1,34 @@
"""
Various common utilities for testing.
"""
import contextlib
import functools
import json
import multiprocessing
import os
import pathlib
import queue
import re
import shutil
import sys
import tempfile
import textwrap
import time
import pexpect
import pytest
ROOT_PATH = pathlib.Path(__file__).parents[0].resolve()
TEST_PATH = ROOT_PATH / "src" / "tests"
DIST_PATH = ROOT_PATH / "dist"
sys.path.append(str(ROOT_PATH / "pyodide-test-runner"))
sys.path.append(str(ROOT_PATH / "pyodide-build"))
sys.path.append(str(ROOT_PATH / "src" / "py"))
from pyodide_build.testing import parse_driver_timeout, set_webdriver_script_timeout
from pyodide_test_runner.fixture import ( # noqa: F401
script_type,
selenium,
selenium_common,
selenium_context_manager,
selenium_esm,
selenium_module_scope,
selenium_standalone,
selenium_standalone_noload,
selenium_standalone_noload_common,
selenium_webworker_standalone,
web_server_main,
web_server_secondary,
)
from pyodide_test_runner.utils import maybe_skip_test
from pyodide_test_runner.utils import package_is_built as _package_is_built
def pytest_addoption(parser):
@ -50,6 +53,8 @@ def pytest_configure(config):
it to reduce the verbosity of the test names in the table. This leaves
enough room to see the information about the test failure in the summary.
"""
global CONFIG
old_cwd_relative_nodeid = config.cwd_relative_nodeid
def cwd_relative_nodeid(*args):
@ -61,6 +66,8 @@ def pytest_configure(config):
config.cwd_relative_nodeid = cwd_relative_nodeid
pytest.pyodide_dist_dir = config.getoption("--dist-dir")
def pytest_collection_modifyitems(config, items):
"""Called after collect is completed.
@ -70,400 +77,7 @@ def pytest_collection_modifyitems(config, items):
items : list of collected items
"""
for item in items:
_maybe_skip_test(item, delayed=True)
@functools.cache
def built_packages() -> list[str]:
"""Returns the list of built package names from packages.json"""
packages_json_path = DIST_PATH / "packages.json"
if not packages_json_path.exists():
return []
return list(json.loads(packages_json_path.read_text())["packages"].keys())
def _package_is_built(package_name: str) -> bool:
return package_name.lower() in built_packages()
class JavascriptException(Exception):
def __init__(self, msg, stack):
self.msg = msg
self.stack = stack
# In chrome the stack contains the message
if self.stack and self.stack.startswith(self.msg):
self.msg = ""
def __str__(self):
return "\n\n".join(x for x in [self.msg, self.stack] if x)
class SeleniumWrapper:
JavascriptException = JavascriptException
def __init__(
self,
server_port,
server_hostname="127.0.0.1",
server_log=None,
load_pyodide=True,
script_timeout=20,
script_type="classic",
):
self.server_port = server_port
self.server_hostname = server_hostname
self.base_url = f"http://{self.server_hostname}:{self.server_port}"
self.server_log = server_log
self.script_type = script_type
self.driver = self.get_driver() # type: ignore[attr-defined]
self.set_script_timeout(script_timeout)
self.script_timeout = script_timeout
self.prepare_driver()
self.javascript_setup()
if load_pyodide:
self.load_pyodide()
self.initialize_global_hiwire_objects()
self.save_state()
self.restore_state()
SETUP_CODE = pathlib.Path(ROOT_PATH / "tools/testsetup.js").read_text()
def prepare_driver(self):
if self.script_type == "classic":
self.driver.get(f"{self.base_url}/test.html")
elif self.script_type == "module":
self.driver.get(f"{self.base_url}/module_test.html")
else:
raise Exception("Unknown script type to load!")
def set_script_timeout(self, timeout):
self.driver.set_script_timeout(timeout)
def quit(self):
self.driver.quit()
def refresh(self):
self.driver.refresh()
self.javascript_setup()
def javascript_setup(self):
self.run_js(
SeleniumWrapper.SETUP_CODE,
pyodide_checks=False,
)
def load_pyodide(self):
self.run_js(
"""
let pyodide = await loadPyodide({ fullStdLib: false, jsglobals : self });
self.pyodide = pyodide;
globalThis.pyodide = pyodide;
pyodide._api.inTestHoist = true; // improve some error messages for tests
"""
)
def initialize_global_hiwire_objects(self):
"""
There are a bunch of global objects that occasionally enter the hiwire cache
but never leave. The refcount checks get angry about them if they aren't preloaded.
We need to go through and touch them all once to keep everything okay.
"""
self.run_js(
"""
pyodide.globals.get;
pyodide.pyodide_py.eval_code;
pyodide.pyodide_py.eval_code_async;
pyodide.pyodide_py.register_js_module;
pyodide.pyodide_py.unregister_js_module;
pyodide.pyodide_py.find_imports;
pyodide._api.importlib.invalidate_caches;
pyodide._api.package_loader.unpack_buffer;
pyodide._api.package_loader.get_dynlibs;
pyodide.runPython("");
"""
)
@property
def pyodide_loaded(self):
return self.run_js("return !!(self.pyodide && self.pyodide.runPython);")
@property
def logs(self):
logs = self.run_js("return self.logs;", pyodide_checks=False)
if logs is not None:
return "\n".join(str(x) for x in logs)
return ""
def clean_logs(self):
self.run_js("self.logs = []", pyodide_checks=False)
def run(self, code):
return self.run_js(
f"""
let result = pyodide.runPython({code!r});
if(result && result.toJs){{
let converted_result = result.toJs();
if(pyodide.isPyProxy(converted_result)){{
converted_result = undefined;
}}
result.destroy();
return converted_result;
}}
return result;
"""
)
def run_async(self, code):
return self.run_js(
f"""
await pyodide.loadPackagesFromImports({code!r})
let result = await pyodide.runPythonAsync({code!r});
if(result && result.toJs){{
let converted_result = result.toJs();
if(pyodide.isPyProxy(converted_result)){{
converted_result = undefined;
}}
result.destroy();
return converted_result;
}}
return result;
"""
)
def run_js(self, code, pyodide_checks=True):
"""Run JavaScript code and check for pyodide errors"""
if isinstance(code, str) and code.startswith("\n"):
# we have a multiline string, fix indentation
code = textwrap.dedent(code)
if pyodide_checks:
check_code = """
if(globalThis.pyodide && pyodide._module && pyodide._module._PyErr_Occurred()){
try {
pyodide._module._pythonexc2js();
} catch(e){
console.error(`Python exited with error flag set! Error was:\n${e.message}`);
// Don't put original error message in new one: we want
// "pytest.raises(xxx, match=msg)" to fail
throw new Error(`Python exited with error flag set!`);
}
}
"""
else:
check_code = ""
return self.run_js_inner(code, check_code)
def run_js_inner(self, code, check_code):
wrapper = """
let cb = arguments[arguments.length - 1];
let run = async () => { %s }
(async () => {
try {
let result = await run();
%s
cb([0, result]);
} catch (e) {
cb([1, e.toString(), e.stack, e.message]);
}
})()
"""
retval = self.driver.execute_async_script(wrapper % (code, check_code))
if retval[0] == 0:
return retval[1]
else:
print("JavascriptException message: ", retval[3])
raise JavascriptException(retval[1], retval[2])
def get_num_hiwire_keys(self):
return self.run_js("return pyodide._module.hiwire.num_keys();")
@property
def force_test_fail(self) -> bool:
return self.run_js("return !!pyodide._api.fail_test;")
def clear_force_test_fail(self):
self.run_js("pyodide._api.fail_test = false;")
def save_state(self):
self.run_js("self.__savedState = pyodide._api.saveState();")
def restore_state(self):
self.run_js(
"""
if(self.__savedState){
pyodide._api.restoreState(self.__savedState)
}
"""
)
def get_num_proxies(self):
return self.run_js("return pyodide._module.pyproxy_alloc_map.size")
def enable_pyproxy_tracing(self):
self.run_js("pyodide._module.enable_pyproxy_allocation_tracing()")
def disable_pyproxy_tracing(self):
self.run_js("pyodide._module.disable_pyproxy_allocation_tracing()")
def run_webworker(self, code):
if isinstance(code, str) and code.startswith("\n"):
# we have a multiline string, fix indentation
code = textwrap.dedent(code)
worker_file = (
"webworker_dev.js"
if self.script_type == "classic"
else "module_webworker_dev.js"
)
return self.run_js(
"""
let worker = new Worker('{}', {{ type: '{}' }});
let res = new Promise((res, rej) => {{
worker.onerror = e => rej(e);
worker.onmessage = e => {{
if (e.data.results) {{
res(e.data.results);
}} else {{
rej(e.data.error);
}}
}};
worker.postMessage({{ python: {!r} }});
}});
return await res
""".format(
f"http://{self.server_hostname}:{self.server_port}/{worker_file}",
self.script_type,
code,
),
pyodide_checks=False,
)
def load_package(self, packages):
self.run_js(f"await pyodide.loadPackage({packages!r})")
@property
def urls(self):
for handle in self.driver.window_handles:
self.driver.switch_to.window(handle)
yield self.driver.current_url
class FirefoxWrapper(SeleniumWrapper):
browser = "firefox"
def get_driver(self):
from selenium.webdriver import Firefox
from selenium.webdriver.firefox.options import Options
options = Options()
options.add_argument("--headless")
return Firefox(executable_path="geckodriver", options=options)
class ChromeWrapper(SeleniumWrapper):
browser = "chrome"
def get_driver(self):
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.options import Options
options = Options()
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--js-flags=--expose-gc")
return Chrome(options=options)
def collect_garbage(self):
self.driver.execute_cdp_cmd("HeapProfiler.collectGarbage", {})
class NodeWrapper(SeleniumWrapper):
browser = "node"
def init_node(self):
self.p = pexpect.spawn("/bin/bash", timeout=60)
self.p.setecho(False)
self.p.delaybeforesend = None
# disable canonical input processing mode to allow sending longer lines
# See: https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.send
self.p.sendline("stty -icanon")
self.p.sendline(
f"node --expose-gc --experimental-wasm-bigint ./src/test-js/node_test_driver.js {self.base_url}",
)
try:
self.p.expect_exact("READY!!")
except pexpect.exceptions.EOF:
raise JavascriptException("", self.p.before.decode())
def get_driver(self):
self._logs = []
self.init_node()
class NodeDriver:
def __getattr__(self, x):
raise NotImplementedError()
return NodeDriver()
def prepare_driver(self):
pass
def set_script_timeout(self, timeout):
self._timeout = timeout
def quit(self):
self.p.sendeof()
def refresh(self):
self.quit()
self.init_node()
self.javascript_setup()
def collect_garbage(self):
self.run_js("gc()")
@property
def logs(self):
return "\n".join(self._logs)
def clean_logs(self):
self._logs = []
def run_js_inner(self, code, check_code):
check_code = ""
wrapped = """
let result = await (async () => {{ {} }})();
{}
return result;
""".format(
code,
check_code,
)
from uuid import uuid4
cmd_id = str(uuid4())
try:
self.p.sendline(cmd_id)
self.p.sendline(wrapped)
self.p.sendline(cmd_id)
self.p.expect_exact(f"{cmd_id}:UUID\r\n", timeout=self._timeout)
self.p.expect_exact(f"{cmd_id}:UUID\r\n")
if self.p.before:
self._logs.append(self.p.before.decode()[:-2].replace("\r", ""))
self.p.expect("[01]\r\n")
success = int(self.p.match[0].decode()[0]) == 0
self.p.expect_exact(f"\r\n{cmd_id}:UUID\r\n")
except pexpect.exceptions.EOF:
raise JavascriptException("", self.p.before.decode())
if success:
return json.loads(self.p.before.decode().replace("undefined", "null"))
else:
raise JavascriptException("", self.p.before.decode())
maybe_skip_test(item, config.getoption("--dist-dir"), delayed=True)
@pytest.hookimpl(hookwrapper=True)
@ -479,342 +93,57 @@ def pytest_runtest_call(item):
Pytest issue #5044:
https://github.com/pytest-dev/pytest/issues/5044
"""
selenium = None
browser = None
for fixture in item._fixtureinfo.argnames:
if fixture.startswith("selenium"):
selenium = item.funcargs[fixture]
browser = item.funcargs[fixture]
break
if selenium and selenium.pyodide_loaded:
if browser and browser.pyodide_loaded:
trace_pyproxies = pytest.mark.skip_pyproxy_check.mark not in item.own_markers
trace_hiwire_refs = (
trace_pyproxies
and pytest.mark.skip_refcount_check.mark not in item.own_markers
)
yield from extra_checks_test_wrapper(
selenium, trace_hiwire_refs, trace_pyproxies
browser, trace_hiwire_refs, trace_pyproxies
)
else:
yield
def extra_checks_test_wrapper(selenium, trace_hiwire_refs, trace_pyproxies):
def extra_checks_test_wrapper(browser, trace_hiwire_refs, trace_pyproxies):
"""Extra conditions for test to pass:
1. No explicit request for test to fail
2. No leaked JsRefs
3. No leaked PyProxys
"""
selenium.clear_force_test_fail()
init_num_keys = selenium.get_num_hiwire_keys()
browser.clear_force_test_fail()
init_num_keys = browser.get_num_hiwire_keys()
if trace_pyproxies:
selenium.enable_pyproxy_tracing()
init_num_proxies = selenium.get_num_proxies()
browser.enable_pyproxy_tracing()
init_num_proxies = browser.get_num_proxies()
a = yield
try:
# If these guys cause a crash because the test really screwed things up,
# we override the error message with the better message returned by
# a.result() in the finally block.
selenium.disable_pyproxy_tracing()
selenium.restore_state()
browser.disable_pyproxy_tracing()
browser.restore_state()
finally:
# if there was an error in the body of the test, flush it out by calling
# get_result (we don't want to override the error message by raising a
# different error here.)
a.get_result()
if selenium.force_test_fail:
if browser.force_test_fail:
raise Exception("Test failure explicitly requested but no error was raised.")
if trace_pyproxies and trace_hiwire_refs:
delta_proxies = selenium.get_num_proxies() - init_num_proxies
delta_keys = selenium.get_num_hiwire_keys() - init_num_keys
delta_proxies = browser.get_num_proxies() - init_num_proxies
delta_keys = browser.get_num_hiwire_keys() - init_num_keys
assert (delta_proxies, delta_keys) == (0, 0) or delta_keys < 0
if trace_hiwire_refs:
delta_keys = selenium.get_num_hiwire_keys() - init_num_keys
delta_keys = browser.get_num_hiwire_keys() - init_num_keys
assert delta_keys <= 0
def _maybe_skip_test(item, delayed=False):
"""If necessary skip test at the fixture level, to avoid
loading the selenium_standalone fixture which takes a long time.
"""
skip_msg = None
# Testing a package. Skip the test if the package is not built.
match = re.match(
r".*/packages/(?P<name>[\w\-]+)/test_[\w\-]+\.py", str(item.parent.fspath)
)
if match:
package_name = match.group("name")
if not _package_is_built(package_name):
skip_msg = f"package '{package_name}' is not built."
# Common package import test. Skip it if the package is not built.
if (
skip_msg is None
and str(item.fspath).endswith("test_packages_common.py")
and item.name.startswith("test_import")
):
match = re.match(
r"test_import\[(firefox|chrome|node)-(?P<name>[\w-]+)\]", item.name
)
if match:
package_name = match.group("name")
if not _package_is_built(package_name):
# If the test is going to be skipped remove the
# selenium_standalone as it takes a long time to initialize
skip_msg = f"package '{package_name}' is not built."
else:
raise AssertionError(
f"Couldn't parse package name from {item.name}. This should not happen!"
)
# TODO: also use this hook to skip doctests we cannot run (or run them
# inside the selenium wrapper)
if skip_msg is not None:
if delayed:
item.add_marker(pytest.mark.skip(reason=skip_msg))
else:
pytest.skip(skip_msg)
@contextlib.contextmanager
def selenium_common(request, web_server_main, load_pyodide=True, script_type="classic"):
"""Returns an initialized selenium object.
If `_should_skip_test` indicate that the test will be skipped,
return None, as initializing Pyodide for selenium is expensive
"""
server_hostname, server_port, server_log = web_server_main
cls: type[SeleniumWrapper]
if request.param == "firefox":
cls = FirefoxWrapper
elif request.param == "chrome":
cls = ChromeWrapper
elif request.param == "node":
cls = NodeWrapper
else:
raise AssertionError(f"Unknown browser: {request.param}")
selenium = cls(
server_port=server_port,
server_hostname=server_hostname,
server_log=server_log,
load_pyodide=load_pyodide,
script_type=script_type,
)
try:
yield selenium
finally:
selenium.quit()
@pytest.fixture(params=["firefox", "chrome", "node"], scope="function")
def selenium_standalone(request, web_server_main):
# Avoid loading the fixture if the test is going to be skipped
_maybe_skip_test(request.node)
with selenium_common(request, web_server_main) as selenium:
with set_webdriver_script_timeout(
selenium, script_timeout=parse_driver_timeout(request)
):
try:
yield selenium
finally:
print(selenium.logs)
@pytest.fixture(params=["firefox", "chrome", "node"], scope="module")
def selenium_esm(request, web_server_main):
# Avoid loading the fixture if the test is going to be skipped
_maybe_skip_test(request.node)
with selenium_common(
request, web_server_main, load_pyodide=True, script_type="module"
) as selenium:
with set_webdriver_script_timeout(
selenium, script_timeout=parse_driver_timeout(request)
):
try:
yield selenium
finally:
print(selenium.logs)
@contextlib.contextmanager
def selenium_standalone_noload_common(request, web_server_main, script_type="classic"):
# Avoid loading the fixture if the test is going to be skipped
_maybe_skip_test(request.node)
with selenium_common(
request, web_server_main, load_pyodide=False, script_type=script_type
) as selenium:
with set_webdriver_script_timeout(
selenium, script_timeout=parse_driver_timeout(request)
):
try:
yield selenium
finally:
print(selenium.logs)
@pytest.fixture(params=["firefox", "chrome"], scope="function")
def selenium_webworker_standalone(request, web_server_main, script_type):
# Avoid loading the fixture if the test is going to be skipped
if request.param == "firefox" and script_type == "module":
pytest.skip("firefox does not support module type web worker")
_maybe_skip_test(request.node)
with selenium_standalone_noload_common(
request, web_server_main, script_type=script_type
) as selenium:
yield selenium
@pytest.fixture(params=["classic", "module"], scope="module")
def script_type(request):
return request.param
@pytest.fixture(params=["firefox", "chrome", "node"], scope="function")
def selenium_standalone_noload(request, web_server_main):
"""Only difference between this and selenium_webworker_standalone is that
this also tests on node."""
# Avoid loading the fixture if the test is going to be skipped
_maybe_skip_test(request.node)
with selenium_standalone_noload_common(request, web_server_main) as selenium:
yield selenium
# selenium instance cached at the module level
@pytest.fixture(params=["firefox", "chrome", "node"], scope="module")
def selenium_module_scope(request, web_server_main):
with selenium_common(request, web_server_main) as selenium:
yield selenium
# Hypothesis is unhappy with function scope fixtures. Instead, use the
# module scope fixture `selenium_module_scope` and use:
# `with selenium_context_manager(selenium_module_scope) as selenium`
@contextlib.contextmanager
def selenium_context_manager(selenium_module_scope):
try:
selenium_module_scope.clean_logs()
yield selenium_module_scope
finally:
print(selenium_module_scope.logs)
@pytest.fixture
def selenium(request, selenium_module_scope):
with selenium_context_manager(selenium_module_scope) as selenium:
with set_webdriver_script_timeout(
selenium, script_timeout=parse_driver_timeout(request)
):
yield selenium
@pytest.fixture(scope="session")
def web_server_main(request):
"""Web server that serves files in the dist/ directory"""
with spawn_web_server(request.config.option.dist_dir) as output:
yield output
@pytest.fixture(scope="session")
def web_server_secondary(request):
"""Secondary web server that serves files dist/ directory"""
with spawn_web_server(request.config.option.dist_dir) as output:
yield output
@pytest.fixture(scope="session")
def web_server_tst_data(request):
"""Web server that serves files in the src/tests/data/ directory"""
with spawn_web_server(TEST_PATH / "data") as output:
yield output
@contextlib.contextmanager
def spawn_web_server(dist_dir=None):
if dist_dir is None:
dist_dir = DIST_PATH
tmp_dir = tempfile.mkdtemp()
log_path = pathlib.Path(tmp_dir) / "http-server.log"
q: multiprocessing.Queue[str] = multiprocessing.Queue()
p = multiprocessing.Process(target=run_web_server, args=(q, log_path, dist_dir))
try:
p.start()
port = q.get()
hostname = "127.0.0.1"
print(
f"Spawning webserver at http://{hostname}:{port} "
f"(see logs in {log_path})"
)
yield hostname, port, log_path
finally:
q.put("TERMINATE")
p.join()
shutil.rmtree(tmp_dir)
def run_web_server(q, log_filepath, dist_dir):
"""Start the HTTP web server
Parameters
----------
q : Queue
communication queue
log_path : pathlib.Path
path to the file where to store the logs
"""
import http.server
import socketserver
os.chdir(dist_dir)
log_fh = log_filepath.open("w", buffering=1)
sys.stdout = log_fh
sys.stderr = log_fh
class Handler(http.server.SimpleHTTPRequestHandler):
def log_message(self, format_, *args):
print(
"[%s] source: %s:%s - %s"
% (self.log_date_time_string(), *self.client_address, format_ % args)
)
def end_headers(self):
# Enable Cross-Origin Resource Sharing (CORS)
self.send_header("Access-Control-Allow-Origin", "*")
super().end_headers()
with socketserver.TCPServer(("", 0), Handler) as httpd:
host, port = httpd.server_address
print(f"Starting webserver at http://{host}:{port}")
httpd.server_name = "test-server" # type: ignore[attr-defined]
httpd.server_port = port # type: ignore[attr-defined]
q.put(port)
def service_actions():
try:
if q.get(False) == "TERMINATE":
print("Stopping server...")
sys.exit(0)
except queue.Empty:
pass
httpd.service_actions = service_actions # type: ignore[assignment]
httpd.serve_forever()
if (
__name__ == "__main__"
and multiprocessing.current_process().name == "MainProcess"
and not hasattr(sys, "_pytest_session")
):
with spawn_web_server():
# run forever
while True:
time.sleep(1)
def package_is_built(package_name):
return _package_is_built(package_name, pytest.pyodide_dist_dir)

View File

@ -86,10 +86,10 @@ make lint
Many tests simply involve running a chunk of code in Pyodide and ensuring it
doesn't error. In this case, one can use the `run_in_pyodide` decorate from
`pyodide_build.testing`, e.g.
`pyodide_test_runner.decorator`, e.g.
```python
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide
def test_add():
@ -108,7 +108,7 @@ The `packages` option lists packages to load before running the test. For
example,
```python
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(standalone = True, packages = ["regex"])
def test_regex():

View File

@ -54,6 +54,10 @@ substitutions:
translated to negative Python ints.
{pr}`2484`
- {{ BREAKING }} `pyodide_build.testing` is removed. `run_in_pyodide` decorator can now be accessed
through `pyodide_test_runner`.
{pr}`2418`
- {{ Enhancement }} Added the `js_id` attribute to `JsProxy` to allow using
JavaScript object identity as a dictionary key.
{pr}`2515`

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(packages=["Jinja2"])

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(packages=["bitarray-tests"])

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(packages=["boost-histogram"])

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(packages=["cffi"])

View File

@ -1,6 +1,5 @@
import pytest
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
CHROME_FAIL_v90_MSG = (
"Doesn't work in chrome v89 or v90, I think because of "

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(packages=["cloudpickle"])

View File

@ -1,8 +1,7 @@
from hypothesis import HealthCheck, given, settings
from hypothesis.strategies import binary, integers
from conftest import selenium_context_manager
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
from pyodide_test_runner.fixture import selenium_context_manager
@run_in_pyodide(packages=["cryptography"])

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(packages=["fpcast-test"])

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(standalone=True, packages=["numpy", "imageio"])

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(standalone=True, packages=["jedi"])

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(packages=["lazy-object-proxy"])

View File

@ -6,8 +6,7 @@ from pathlib import Path
from typing import Any
import pytest
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide, spawn_web_server
sys.path.append(str(Path(__file__).resolve().parent / "src"))
@ -162,17 +161,15 @@ def test_parse_wheel_url():
@pytest.mark.parametrize("base_url", ["'{base_url}'", "'.'"])
def test_install_custom_url(selenium_standalone_micropip, base_url):
selenium = selenium_standalone_micropip
base_url = base_url.format(base_url=selenium.base_url)
root = Path(__file__).resolve().parents[2]
src = root / "src" / "tests" / "data"
target = root / "dist" / "test_data"
target.symlink_to(src, True)
path = "/test_data/snowballstemmer-2.0.0-py2.py3-none-any.whl"
try:
with spawn_web_server(Path(__file__).parent / "test") as server:
server_hostname, server_port, _ = server
base_url = f"http://{server_hostname}:{server_port}/"
url = base_url + "snowballstemmer-2.0.0-py2.py3-none-any.whl"
selenium.run_js(
f"""
let url = {base_url} + '{path}';
let url = '{url}';
let resp = await fetch(url);
await pyodide.runPythonAsync(`
import micropip
@ -181,25 +178,24 @@ def test_install_custom_url(selenium_standalone_micropip, base_url):
`);
"""
)
finally:
target.unlink()
def test_add_requirement(web_server_tst_data):
def test_add_requirement():
pytest.importorskip("packaging")
from micropip import _micropip
server_hostname, server_port, server_log = web_server_tst_data
base_url = f"http://{server_hostname}:{server_port}/"
url = base_url + "snowballstemmer-2.0.0-py2.py3-none-any.whl"
with spawn_web_server(Path(__file__).parent / "test") as server:
server_hostname, server_port, _ = server
base_url = f"http://{server_hostname}:{server_port}/"
url = base_url + "snowballstemmer-2.0.0-py2.py3-none-any.whl"
transaction: dict[str, Any] = {
"wheels": [],
"locked": {},
}
asyncio.get_event_loop().run_until_complete(
_micropip.PACKAGE_MANAGER.add_requirement(url, {}, transaction)
)
transaction: dict[str, Any] = {
"wheels": [],
"locked": {},
}
asyncio.get_event_loop().run_until_complete(
_micropip.PACKAGE_MANAGER.add_requirement(url, {}, transaction)
)
[name, req, version] = transaction["wheels"][0]
assert name == "snowballstemmer"

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(standalone=True, packages=["msgpack"])

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(standalone=True, packages=["networkx"])

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(

View File

@ -1,6 +1,5 @@
import pytest
from pyodide_build.testing import run_in_pyodide as run_in_pyodide_orig
from pyodide_test_runner import run_in_pyodide as run_in_pyodide_orig
def run_in_pyodide(**kwargs):

View File

@ -1,7 +1,7 @@
import base64
import pathlib
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
REFERENCE_IMAGES_PATH = pathlib.Path(__file__).parent / "reference-images"

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(packages=["python_solvespace"])

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(packages=["regex"])

View File

@ -1,6 +1,6 @@
import os
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
if "CI" in os.environ:
xfail_browsers = {"chrome": "scikit-image takes too long to load in CI "}

View File

@ -1,6 +1,5 @@
import pytest
from conftest import selenium_context_manager
from pyodide_test_runner.fixture import selenium_context_manager
@pytest.mark.driver_timeout(40)

View File

@ -1,6 +1,6 @@
from pyodide_build import testing
from pyodide_test_runner import run_in_pyodide
run_in_pyodide = testing.run_in_pyodide(
run_in_pyodide_scipy = run_in_pyodide(
module_scope=True,
packages=["scipy"],
# xfail_browsers={"chrome": "Times out in chrome"},
@ -8,7 +8,7 @@ run_in_pyodide = testing.run_in_pyodide(
)
@run_in_pyodide
@run_in_pyodide_scipy
def test_scipy_linalg():
import numpy as np
import scipy.linalg
@ -24,28 +24,28 @@ def test_scipy_linalg():
assert_allclose(res, np.identity(N), rtol=1e-07, atol=1e-9)
@run_in_pyodide
@run_in_pyodide_scipy
def test_brentq():
from scipy.optimize import brentq
brentq(lambda x: x, -1, 1)
@run_in_pyodide
@run_in_pyodide_scipy
def test_dlamch():
from scipy.linalg import lapack
lapack.dlamch("Epsilon-Machine")
@run_in_pyodide
@run_in_pyodide_scipy
def test_binom_ppf():
from scipy.stats import binom
assert binom.ppf(0.9, 1000, 0.1) == 112
@testing.run_in_pyodide(module_scope=True, packages=["pytest", "scipy-tests"])
@run_in_pyodide(module_scope=True, packages=["pytest", "scipy-tests"])
def test_scipy_pytest():
import pytest

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(packages=["sharedlib-test-py"])

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(packages=["sqlalchemy"])

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(packages=["test", "ssl"])

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(packages=["sympy"])

View File

@ -3,7 +3,7 @@ import os
import pytest
from conftest import ROOT_PATH, _package_is_built
from conftest import ROOT_PATH, package_is_built
from pyodide_build.io import parse_package_config
PKG_DIR = ROOT_PATH / "packages"
@ -44,7 +44,7 @@ def test_parse_package(name):
@pytest.mark.driver_timeout(60)
@pytest.mark.parametrize("name", registered_packages())
def test_import(name, selenium_standalone):
if not _package_is_built(name):
if not package_is_built(name):
raise AssertionError(
"Implementation error. Test for an unbuilt package "
"should have been skipped in selenium_standalone fixture"

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(packages=["wrapt"])

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide(packages=["numpy", "numcodecs", "zarr"])

View File

@ -1,16 +0,0 @@
from pyodide_build.testing import set_webdriver_script_timeout
class _MockSelenium:
script_timeout = 2
def set_script_timeout(self, value):
self._timeout = value
def test_set_webdriver_script_timeout():
selenium = _MockSelenium()
assert not hasattr(selenium, "_timeout")
with set_webdriver_script_timeout(selenium, script_timeout=10):
assert selenium._timeout == 10
assert selenium._timeout == 2

View File

@ -0,0 +1,16 @@
from .browser import ChromeWrapper, FirefoxWrapper, NodeWrapper, SeleniumWrapper
from .decorator import run_in_pyodide
from .fixture import * # noqa: F403, F401
from .server import spawn_web_server
from .utils import parse_driver_timeout, set_webdriver_script_timeout
__all__ = [
"SeleniumWrapper",
"FirefoxWrapper",
"ChromeWrapper",
"NodeWrapper",
"set_webdriver_script_timeout",
"parse_driver_timeout",
"run_in_pyodide",
"spawn_web_server",
]

View File

@ -0,0 +1,462 @@
import json
import textwrap
from pathlib import Path
import pexpect
TEST_SETUP_CODE = """
Error.stackTraceLimit = Infinity;
// Fix globalThis is messed up in firefox see facebook/react#16606.
// Replace it with window.
globalThis.globalThis = globalThis.window || globalThis;
globalThis.sleep = function (s) {
return new Promise((resolve) => setTimeout(resolve, s));
};
globalThis.assert = function (cb, message = "") {
if (message !== "") {
message = "\\n" + message;
}
if (cb() !== true) {
throw new Error(
`Assertion failed: ${cb.toString().slice(6)}${message}`
);
}
};
globalThis.assertAsync = async function (cb, message = "") {
if (message !== "") {
message = "\\n" + message;
}
if ((await cb()) !== true) {
throw new Error(
`Assertion failed: ${cb.toString().slice(12)}${message}`
);
}
};
function checkError(err, errname, pattern, pat_str, thiscallstr) {
if (typeof pattern === "string") {
pattern = new RegExp(pattern);
}
if (!err) {
throw new Error(`${thiscallstr} failed, no error thrown`);
}
if (err.constructor.name !== errname) {
throw new Error(
`${thiscallstr} failed, expected error ` +
`of type '${errname}' got type '${err.constructor.name}'`
);
}
if (!pattern.test(err.message)) {
throw new Error(
`${thiscallstr} failed, expected error ` +
`message to match pattern ${pat_str} got:\n${err.message}`
);
}
}
globalThis.assertThrows = function (cb, errname, pattern) {
let pat_str = typeof pattern === "string" ? `"${pattern}"` : `${pattern}`;
let thiscallstr = `assertThrows(${cb.toString()}, "${errname}", ${pat_str})`;
let err = undefined;
try {
cb();
} catch (e) {
err = e;
}
checkError(err, errname, pattern, pat_str, thiscallstr);
};
globalThis.assertThrowsAsync = async function (cb, errname, pattern) {
let pat_str = typeof pattern === "string" ? `"${pattern}"` : `${pattern}`;
let thiscallstr = `assertThrowsAsync(${cb.toString()}, "${errname}", ${pat_str})`;
let err = undefined;
try {
await cb();
} catch (e) {
err = e;
}
checkError(err, errname, pattern, pat_str, thiscallstr);
};
""".strip()
class JavascriptException(Exception):
def __init__(self, msg, stack):
self.msg = msg
self.stack = stack
# In chrome the stack contains the message
if self.stack and self.stack.startswith(self.msg):
self.msg = ""
def __str__(self):
return "\n\n".join(x for x in [self.msg, self.stack] if x)
class SeleniumWrapper:
JavascriptException = JavascriptException
def __init__(
self,
server_port,
server_hostname="127.0.0.1",
server_log=None,
load_pyodide=True,
script_timeout=20,
script_type="classic",
dist_dir=None,
):
self.server_port = server_port
self.server_hostname = server_hostname
self.base_url = f"http://{self.server_hostname}:{self.server_port}"
self.server_log = server_log
self.script_type = script_type
self.dist_dir = dist_dir
self.driver = self.get_driver() # type: ignore[attr-defined]
self.set_script_timeout(script_timeout)
self.script_timeout = script_timeout
self.prepare_driver()
self.javascript_setup()
if load_pyodide:
self.load_pyodide()
self.initialize_global_hiwire_objects()
self.save_state()
self.restore_state()
def prepare_driver(self):
if self.script_type == "classic":
self.driver.get(f"{self.base_url}/test.html")
elif self.script_type == "module":
self.driver.get(f"{self.base_url}/module_test.html")
else:
raise Exception("Unknown script type to load!")
def set_script_timeout(self, timeout):
self.driver.set_script_timeout(timeout)
def quit(self):
self.driver.quit()
def refresh(self):
self.driver.refresh()
self.javascript_setup()
def javascript_setup(self):
self.run_js(
TEST_SETUP_CODE,
pyodide_checks=False,
)
def load_pyodide(self):
self.run_js(
"""
let pyodide = await loadPyodide({ fullStdLib: false, jsglobals : self });
self.pyodide = pyodide;
globalThis.pyodide = pyodide;
pyodide._api.inTestHoist = true; // improve some error messages for tests
"""
)
def initialize_global_hiwire_objects(self):
"""
There are a bunch of global objects that occasionally enter the hiwire cache
but never leave. The refcount checks get angry about them if they aren't preloaded.
We need to go through and touch them all once to keep everything okay.
"""
self.run_js(
"""
pyodide.globals.get;
pyodide.pyodide_py.eval_code;
pyodide.pyodide_py.eval_code_async;
pyodide.pyodide_py.register_js_module;
pyodide.pyodide_py.unregister_js_module;
pyodide.pyodide_py.find_imports;
pyodide._api.importlib.invalidate_caches;
pyodide._api.package_loader.unpack_buffer;
pyodide._api.package_loader.get_dynlibs;
pyodide.runPython("");
"""
)
@property
def pyodide_loaded(self):
return self.run_js("return !!(self.pyodide && self.pyodide.runPython);")
@property
def logs(self):
logs = self.run_js("return self.logs;", pyodide_checks=False)
if logs is not None:
return "\n".join(str(x) for x in logs)
return ""
def clean_logs(self):
self.run_js("self.logs = []", pyodide_checks=False)
def run(self, code):
return self.run_js(
f"""
let result = pyodide.runPython({code!r});
if(result && result.toJs){{
let converted_result = result.toJs();
if(pyodide.isPyProxy(converted_result)){{
converted_result = undefined;
}}
result.destroy();
return converted_result;
}}
return result;
"""
)
def run_async(self, code):
return self.run_js(
f"""
await pyodide.loadPackagesFromImports({code!r})
let result = await pyodide.runPythonAsync({code!r});
if(result && result.toJs){{
let converted_result = result.toJs();
if(pyodide.isPyProxy(converted_result)){{
converted_result = undefined;
}}
result.destroy();
return converted_result;
}}
return result;
"""
)
def run_js(self, code, pyodide_checks=True):
"""Run JavaScript code and check for pyodide errors"""
if isinstance(code, str) and code.startswith("\n"):
# we have a multiline string, fix indentation
code = textwrap.dedent(code)
if pyodide_checks:
check_code = """
if(globalThis.pyodide && pyodide._module && pyodide._module._PyErr_Occurred()){
try {
pyodide._module._pythonexc2js();
} catch(e){
console.error(`Python exited with error flag set! Error was:\n${e.message}`);
// Don't put original error message in new one: we want
// "pytest.raises(xxx, match=msg)" to fail
throw new Error(`Python exited with error flag set!`);
}
}
"""
else:
check_code = ""
return self.run_js_inner(code, check_code)
def run_js_inner(self, code, check_code):
wrapper = """
let cb = arguments[arguments.length - 1];
let run = async () => { %s }
(async () => {
try {
let result = await run();
%s
cb([0, result]);
} catch (e) {
cb([1, e.toString(), e.stack, e.message]);
}
})()
"""
retval = self.driver.execute_async_script(wrapper % (code, check_code))
if retval[0] == 0:
return retval[1]
else:
print("JavascriptException message: ", retval[3])
raise JavascriptException(retval[1], retval[2])
def get_num_hiwire_keys(self):
return self.run_js("return pyodide._module.hiwire.num_keys();")
@property
def force_test_fail(self) -> bool:
return self.run_js("return !!pyodide._api.fail_test;")
def clear_force_test_fail(self):
self.run_js("pyodide._api.fail_test = false;")
def save_state(self):
self.run_js("self.__savedState = pyodide._api.saveState();")
def restore_state(self):
self.run_js(
"""
if(self.__savedState){
pyodide._api.restoreState(self.__savedState)
}
"""
)
def get_num_proxies(self):
return self.run_js("return pyodide._module.pyproxy_alloc_map.size")
def enable_pyproxy_tracing(self):
self.run_js("pyodide._module.enable_pyproxy_allocation_tracing()")
def disable_pyproxy_tracing(self):
self.run_js("pyodide._module.disable_pyproxy_allocation_tracing()")
def run_webworker(self, code):
if isinstance(code, str) and code.startswith("\n"):
# we have a multiline string, fix indentation
code = textwrap.dedent(code)
worker_file = (
"webworker_dev.js"
if self.script_type == "classic"
else "module_webworker_dev.js"
)
return self.run_js(
"""
let worker = new Worker('{}', {{ type: '{}' }});
let res = new Promise((res, rej) => {{
worker.onerror = e => rej(e);
worker.onmessage = e => {{
if (e.data.results) {{
res(e.data.results);
}} else {{
rej(e.data.error);
}}
}};
worker.postMessage({{ python: {!r} }});
}});
return await res
""".format(
f"http://{self.server_hostname}:{self.server_port}/{worker_file}",
self.script_type,
code,
),
pyodide_checks=False,
)
def load_package(self, packages):
self.run_js(f"await pyodide.loadPackage({packages!r})")
@property
def urls(self):
for handle in self.driver.window_handles:
self.driver.switch_to.window(handle)
yield self.driver.current_url
class FirefoxWrapper(SeleniumWrapper):
browser = "firefox"
def get_driver(self):
from selenium.webdriver import Firefox
from selenium.webdriver.firefox.options import Options
options = Options()
options.add_argument("--headless")
return Firefox(executable_path="geckodriver", options=options)
class ChromeWrapper(SeleniumWrapper):
browser = "chrome"
def get_driver(self):
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.options import Options
options = Options()
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--js-flags=--expose-gc")
return Chrome(options=options)
def collect_garbage(self):
self.driver.execute_cdp_cmd("HeapProfiler.collectGarbage", {})
class NodeWrapper(SeleniumWrapper):
browser = "node"
def init_node(self):
curdir = Path(__file__).parent
self.p = pexpect.spawn("/bin/bash", timeout=60)
self.p.setecho(False)
self.p.delaybeforesend = None
# disable canonical input processing mode to allow sending longer lines
# See: https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.send
self.p.sendline("stty -icanon")
self.p.sendline(
f"node --expose-gc --experimental-wasm-bigint {curdir}/node_test_driver.js {self.base_url} {self.dist_dir}",
)
try:
self.p.expect_exact("READY!!")
except pexpect.exceptions.EOF:
raise JavascriptException("", self.p.before.decode())
def get_driver(self):
self._logs = []
self.init_node()
class NodeDriver:
def __getattr__(self, x):
raise NotImplementedError()
return NodeDriver()
def prepare_driver(self):
pass
def set_script_timeout(self, timeout):
self._timeout = timeout
def quit(self):
self.p.sendeof()
def refresh(self):
self.quit()
self.init_node()
self.javascript_setup()
def collect_garbage(self):
self.run_js("gc()")
@property
def logs(self):
return "\n".join(self._logs)
def clean_logs(self):
self._logs = []
def run_js_inner(self, code, check_code):
check_code = ""
wrapped = """
let result = await (async () => {{ {} }})();
{}
return result;
""".format(
code,
check_code,
)
from uuid import uuid4
cmd_id = str(uuid4())
self.p.sendline(cmd_id)
self.p.sendline(wrapped)
self.p.sendline(cmd_id)
self.p.expect_exact(f"{cmd_id}:UUID\r\n", timeout=self._timeout)
self.p.expect_exact(f"{cmd_id}:UUID\r\n")
if self.p.before:
self._logs.append(self.p.before.decode()[:-2].replace("\r", ""))
self.p.expect("[01]\r\n")
success = int(self.p.match[0].decode()[0]) == 0
self.p.expect_exact(f"\r\n{cmd_id}:UUID\r\n")
if success:
return json.loads(self.p.before.decode().replace("undefined", "null"))
else:
raise JavascriptException("", self.p.before.decode())

View File

@ -1,29 +1,11 @@
import contextlib
import inspect
from base64 import b64encode
from pprint import pformat
from typing import Callable, Collection
import pytest
def _run_in_pyodide_get_source(f):
lines, start_line = inspect.getsourcelines(f)
num_decorator_lines = 0
for line in lines:
if line.startswith("def") or line.startswith("async def"):
break
num_decorator_lines += 1
start_line += num_decorator_lines - 1
# Remove first line, which is the decorator. Then pad with empty lines to fix line number.
lines = ["\n"] * start_line + lines[num_decorator_lines:]
return "".join(lines)
def chunkstring(string, length):
return (string[0 + i : length + i] for i in range(0, len(string), length))
from pprint import pformat
from .utils import set_webdriver_script_timeout
def run_in_pyodide(
@ -77,7 +59,7 @@ def run_in_pyodide(
source = _run_in_pyodide_get_source(f)
filename = inspect.getsourcefile(f)
encoded = pformat(
list(chunkstring(b64encode(source.encode()).decode(), 100))
list(_chunkstring(b64encode(source.encode()).decode(), 100))
)
selenium.run_js(
@ -128,29 +110,18 @@ def run_in_pyodide(
return decorator
@contextlib.contextmanager
def set_webdriver_script_timeout(selenium, script_timeout: float | None):
"""Set selenium script timeout
Parameters
----------
selenum : SeleniumWrapper
a SeleniumWrapper wrapper instance
script_timeout : int | float
value of the timeout in seconds
"""
if script_timeout is not None:
selenium.set_script_timeout(script_timeout)
yield
# revert to the initial value
if script_timeout is not None:
selenium.set_script_timeout(selenium.script_timeout)
def _chunkstring(string, length):
return (string[0 + i : length + i] for i in range(0, len(string), length))
def parse_driver_timeout(request) -> float | None:
"""Parse driver timeout value from pytest request object"""
mark = request.node.get_closest_marker("driver_timeout")
if mark is None:
return None
else:
return mark.args[0]
def _run_in_pyodide_get_source(f):
lines, start_line = inspect.getsourcelines(f)
num_decorator_lines = 0
for line in lines:
if line.startswith("def") or line.startswith("async def"):
break
num_decorator_lines += 1
start_line += num_decorator_lines - 1
# Remove first line, which is the decorator. Then pad with empty lines to fix line number.
lines = ["\n"] * start_line + lines[num_decorator_lines:]
return "".join(lines)

View File

@ -0,0 +1,149 @@
import contextlib
import pytest
from .browser import ChromeWrapper, FirefoxWrapper, NodeWrapper, SeleniumWrapper
from .server import spawn_web_server
from .utils import parse_driver_timeout, set_webdriver_script_timeout
@contextlib.contextmanager
def selenium_common(request, web_server_main, load_pyodide=True, script_type="classic"):
"""Returns an initialized selenium object.
If `_should_skip_test` indicate that the test will be skipped,
return None, as initializing Pyodide for selenium is expensive
"""
server_hostname, server_port, server_log = web_server_main
cls: type[SeleniumWrapper]
if request.param == "firefox":
cls = FirefoxWrapper
elif request.param == "chrome":
cls = ChromeWrapper
elif request.param == "node":
cls = NodeWrapper
else:
raise AssertionError(f"Unknown browser: {request.param}")
dist_dir = request.config.getoption("--dist-dir")
selenium = cls(
server_port=server_port,
server_hostname=server_hostname,
server_log=server_log,
load_pyodide=load_pyodide,
script_type=script_type,
dist_dir=dist_dir,
)
try:
yield selenium
finally:
selenium.quit()
@pytest.fixture(params=["firefox", "chrome", "node"], scope="function")
def selenium_standalone(request, web_server_main):
with selenium_common(request, web_server_main) as selenium:
with set_webdriver_script_timeout(
selenium, script_timeout=parse_driver_timeout(request)
):
try:
yield selenium
finally:
print(selenium.logs)
@pytest.fixture(params=["firefox", "chrome", "node"], scope="module")
def selenium_esm(request, web_server_main):
with selenium_common(
request, web_server_main, load_pyodide=True, script_type="module"
) as selenium:
with set_webdriver_script_timeout(
selenium, script_timeout=parse_driver_timeout(request)
):
try:
yield selenium
finally:
print(selenium.logs)
@contextlib.contextmanager
def selenium_standalone_noload_common(request, web_server_main, script_type="classic"):
with selenium_common(
request, web_server_main, load_pyodide=False, script_type=script_type
) as selenium:
with set_webdriver_script_timeout(
selenium, script_timeout=parse_driver_timeout(request)
):
try:
yield selenium
finally:
print(selenium.logs)
@pytest.fixture(params=["firefox", "chrome"], scope="function")
def selenium_webworker_standalone(request, web_server_main, script_type):
# Avoid loading the fixture if the test is going to be skipped
if request.param == "firefox" and script_type == "module":
pytest.skip("firefox does not support module type web worker")
with selenium_standalone_noload_common(
request, web_server_main, script_type=script_type
) as selenium:
yield selenium
@pytest.fixture(params=["classic", "module"], scope="module")
def script_type(request):
return request.param
@pytest.fixture(params=["firefox", "chrome", "node"], scope="function")
def selenium_standalone_noload(request, web_server_main):
"""Only difference between this and selenium_webworker_standalone is that
this also tests on node."""
with selenium_standalone_noload_common(request, web_server_main) as selenium:
yield selenium
# selenium instance cached at the module level
@pytest.fixture(params=["firefox", "chrome", "node"], scope="module")
def selenium_module_scope(request, web_server_main):
with selenium_common(request, web_server_main) as selenium:
yield selenium
# Hypothesis is unhappy with function scope fixtures. Instead, use the
# module scope fixture `selenium_module_scope` and use:
# `with selenium_context_manager(selenium_module_scope) as selenium`
@contextlib.contextmanager
def selenium_context_manager(selenium_module_scope):
try:
selenium_module_scope.clean_logs()
yield selenium_module_scope
finally:
print(selenium_module_scope.logs)
@pytest.fixture
def selenium(request, selenium_module_scope):
with selenium_context_manager(selenium_module_scope) as selenium:
with set_webdriver_script_timeout(
selenium, script_timeout=parse_driver_timeout(request)
):
yield selenium
@pytest.fixture(scope="session")
def web_server_main(request):
"""Web server that serves files in the dist directory"""
with spawn_web_server(request.config.option.dist_dir) as output:
yield output
@pytest.fixture(scope="session")
def web_server_secondary(request):
"""Secondary web server that serves files dist directory"""
with spawn_web_server(request.config.option.dist_dir) as output:
yield output

View File

@ -5,11 +5,14 @@ const util = require("util");
const node_fetch = require("node-fetch");
const base64 = require("base-64");
let { loadPyodide } = require("pyodide");
let base_url = process.argv[2];
let baseUrl = process.argv[2];
let distDir = process.argv[3];
let { loadPyodide } = require(`${distDir}/pyodide`);
// node requires full paths.
function fetch(path) {
return node_fetch(new URL(path, base_url).toString());
return node_fetch(new URL(path, baseUrl).toString());
}
const context = {

View File

@ -0,0 +1,82 @@
import contextlib
import http.server
import multiprocessing
import os
import pathlib
import queue
import shutil
import socketserver
import sys
import tempfile
@contextlib.contextmanager
def spawn_web_server(dist_dir):
tmp_dir = tempfile.mkdtemp()
log_path = pathlib.Path(tmp_dir) / "http-server.log"
q: multiprocessing.Queue[str] = multiprocessing.Queue()
p = multiprocessing.Process(target=run_web_server, args=(q, log_path, dist_dir))
try:
p.start()
port = q.get()
hostname = "127.0.0.1"
print(
f"Spawning webserver at http://{hostname}:{port} "
f"(see logs in {log_path})"
)
yield hostname, port, log_path
finally:
q.put("TERMINATE")
p.join()
shutil.rmtree(tmp_dir)
def run_web_server(q, log_filepath, dist_dir):
"""Start the HTTP web server
Parameters
----------
q : Queue
communication queue
log_path : pathlib.Path
path to the file where to store the logs
"""
os.chdir(dist_dir)
log_fh = log_filepath.open("w", buffering=1)
sys.stdout = log_fh
sys.stderr = log_fh
class Handler(http.server.SimpleHTTPRequestHandler):
def log_message(self, format_, *args):
print(
"[%s] source: %s:%s - %s"
% (self.log_date_time_string(), *self.client_address, format_ % args)
)
def end_headers(self):
# Enable Cross-Origin Resource Sharing (CORS)
self.send_header("Access-Control-Allow-Origin", "*")
super().end_headers()
with socketserver.TCPServer(("", 0), Handler) as httpd:
host, port = httpd.server_address
print(f"Starting webserver at http://{host}:{port}")
httpd.server_name = "test-server" # type: ignore[attr-defined]
httpd.server_port = port # type: ignore[attr-defined]
q.put(port)
def service_actions():
try:
if q.get(False) == "TERMINATE":
print("Stopping server...")
sys.exit(0)
except queue.Empty:
pass
httpd.service_actions = service_actions # type: ignore[assignment]
httpd.serve_forever()

View File

@ -1,7 +1,8 @@
import pathlib
from textwrap import dedent
from pyodide_build.testing import _run_in_pyodide_get_source, run_in_pyodide
from pyodide_test_runner import run_in_pyodide
from pyodide_test_runner.decorator import _run_in_pyodide_get_source
def test_web_server_secondary(selenium, web_server_secondary):
@ -89,7 +90,6 @@ def test_assert(selenium):
let threw;
assertThrows(() => { throw new TypeError("aaabbbccc") }, "TypeError", "bbc");
assertThrows(() => { throw new TypeError("aaabbbccc") }, "TypeError", /.{3}.{3}.{3}/);
threw = false;
try {
assertThrows(() => 0, "TypeError", /.*/);
@ -98,7 +98,6 @@ def test_assert(selenium):
assert(() => e.message == `assertThrows(() => 0, "TypeError", /.*/) failed, no error thrown`, e.message);
}
assert(() => threw);
threw = false;
try {
assertThrows(() => { throw new ReferenceError("blah"); }, "TypeError", /.*/);
@ -107,7 +106,6 @@ def test_assert(selenium):
assert(() => e.message.endsWith("expected error of type 'TypeError' got type 'ReferenceError'"));
}
assert(() => threw);
threw = false;
try {
assertThrows(() => { throw new TypeError("blah"); }, "TypeError", "abcd");
@ -117,7 +115,6 @@ def test_assert(selenium):
assert(() => e.message.endsWith(`expected error message to match pattern "abcd" got:\nblah`));
}
assert(() => threw);
threw = false;
try {
assertThrows(() => { throw new TypeError("blah"); }, "TypeError", /a..d/);

View File

@ -0,0 +1,93 @@
import contextlib
import functools
import json
import re
from pathlib import Path
import pytest
@contextlib.contextmanager
def set_webdriver_script_timeout(selenium, script_timeout: float | None):
"""Set selenium script timeout
Parameters
----------
selenum : SeleniumWrapper
a SeleniumWrapper wrapper instance
script_timeout : int | float
value of the timeout in seconds
"""
if script_timeout is not None:
selenium.set_script_timeout(script_timeout)
yield
# revert to the initial value
if script_timeout is not None:
selenium.set_script_timeout(selenium.script_timeout)
def parse_driver_timeout(request) -> float | None:
"""Parse driver timeout value from pytest request object"""
mark = request.node.get_closest_marker("driver_timeout")
if mark is None:
return None
else:
return mark.args[0]
def maybe_skip_test(item, dist_dir, delayed=False):
"""If necessary skip test at the fixture level, to avoid
loading the selenium_standalone fixture which takes a long time.
"""
skip_msg = None
# Testing a package. Skip the test if the package is not built.
match = re.match(
r".*/packages/(?P<name>[\w\-]+)/test_[\w\-]+\.py", str(item.parent.fspath)
)
if match:
package_name = match.group("name")
if not package_is_built(package_name, dist_dir):
skip_msg = f"package '{package_name}' is not built."
# Common package import test. Skip it if the package is not built.
if (
skip_msg is None
and str(item.fspath).endswith("test_packages_common.py")
and item.name.startswith("test_import")
):
match = re.match(
r"test_import\[(firefox|chrome|node)-(?P<name>[\w-]+)\]", item.name
)
if match:
package_name = match.group("name")
if not package_is_built(package_name, dist_dir):
# If the test is going to be skipped remove the
# selenium_standalone as it takes a long time to initialize
skip_msg = f"package '{package_name}' is not built."
else:
raise AssertionError(
f"Couldn't parse package name from {item.name}. This should not happen!"
)
# TODO: also use this hook to skip doctests we cannot run (or run them
# inside the selenium wrapper)
if skip_msg is not None:
if delayed:
item.add_marker(pytest.mark.skip(reason=skip_msg))
else:
pytest.skip(skip_msg)
@functools.cache
def built_packages(dist_dir: Path) -> list[str]:
"""Returns the list of built package names from packages.json"""
packages_json_path = dist_dir / "packages.json"
if not packages_json_path.exists():
return []
return list(json.loads(packages_json_path.read_text())["packages"].keys())
def package_is_built(package_name: str, dist_dir: Path) -> bool:
return package_name.lower() in built_packages(dist_dir)

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide

View File

@ -3,11 +3,11 @@ import sys
import time
import pytest
from pyodide_test_runner import run_in_pyodide
from pyodide_test_runner.fixture import selenium_common
from conftest import selenium_common
from pyodide import CodeRunner, console # noqa: E402
from pyodide.console import Console, _CommandCompiler, _Compile # noqa: E402
from pyodide_build.testing import run_in_pyodide
def test_command_compiler():

View File

@ -1,7 +1,6 @@
# See also test_typeconversions, and test_python.
import pytest
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
def test_jsproxy_dir(selenium):

View File

@ -3,9 +3,9 @@ from textwrap import dedent
from typing import Any, Sequence
import pytest
from pyodide_test_runner import run_in_pyodide
from pyodide import CodeRunner, eval_code, find_imports, should_quiet # noqa: E402
from pyodide_build.testing import run_in_pyodide
def _strip_assertions_stderr(messages: Sequence[str]) -> list[str]:

View File

@ -1,6 +1,5 @@
import pytest
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
def test_open_url(selenium, httpserver):

View File

@ -1,4 +1,4 @@
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
@run_in_pyodide

View File

@ -4,9 +4,8 @@ from typing import Any
import pytest
from hypothesis import assume, given, settings, strategies
from hypothesis.strategies import from_type, text
from conftest import selenium_context_manager
from pyodide_build.testing import run_in_pyodide
from pyodide_test_runner import run_in_pyodide
from pyodide_test_runner.fixture import selenium_context_manager
@given(s=text())

View File

@ -1,75 +0,0 @@
const vm = require("vm");
const readline = require("readline");
const path = require("path");
const util = require("util");
const node_fetch = require("node-fetch");
const base64 = require("base-64");
require(path.resolve("./pyodide.js"));
let base_url = process.argv[2];
// node requires full paths.
function fetch(path) {
return node_fetch(new URL(path, base_url).toString());
}
const context = Object.assign({}, globalThis, {
path,
process,
require,
fetch,
TextDecoder: util.TextDecoder,
TextEncoder: util.TextEncoder,
URL,
atob: base64.decode,
btoa: base64.encode,
});
vm.createContext(context);
vm.runInContext("globalThis.self = globalThis;", context);
// Get rid of all colors in output of console.log, they mess us up.
for (let key of Object.keys(util.inspect.styles)) {
util.inspect.styles[key] = undefined;
}
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false,
});
let cur_code = "";
let cur_uuid;
rl.on("line", async function (line) {
if (!cur_uuid) {
cur_uuid = line;
return;
}
if (line !== cur_uuid) {
cur_code += line + "\n";
} else {
evalCode(cur_uuid, cur_code, context);
cur_code = "";
cur_uuid = undefined;
}
});
async function evalCode(uuid, code, eval_context) {
let p = new Promise((resolve, reject) => {
eval_context.___outer_resolve = resolve;
eval_context.___outer_reject = reject;
});
let wrapped_code = `
(async function(){
${code}
})().then(___outer_resolve).catch(___outer_reject);
`;
let delim = uuid + ":UUID";
console.log(delim);
try {
vm.runInContext(wrapped_code, eval_context);
let result = JSON.stringify(await p);
console.log(`${delim}\n0\n${result}\n${delim}`);
} catch (e) {
console.log(`${delim}\n1\n${e.stack}\n${delim}`);
}
}

View File

@ -1,76 +0,0 @@
Error.stackTraceLimit = Infinity;
// Fix globalThis is messed up in firefox see facebook/react#16606.
// Replace it with window.
globalThis.globalThis = globalThis.window || globalThis;
globalThis.sleep = function (s) {
return new Promise((resolve) => setTimeout(resolve, s));
};
globalThis.assert = function (cb, message = "") {
if (message !== "") {
message = "\n" + message;
}
if (cb() !== true) {
throw new Error(
`Assertion failed: ${cb.toString().slice(6)}${message}`
);
}
};
globalThis.assertAsync = async function (cb, message = "") {
if (message !== "") {
message = "\n" + message;
}
if ((await cb()) !== true) {
throw new Error(
`Assertion failed: ${cb.toString().slice(12)}${message}`
);
}
};
function checkError(err, errname, pattern, pat_str, thiscallstr) {
if (typeof pattern === "string") {
pattern = new RegExp(pattern);
}
if (!err) {
throw new Error(`${thiscallstr} failed, no error thrown`);
}
if (err.constructor.name !== errname) {
throw new Error(
`${thiscallstr} failed, expected error ` +
`of type '${errname}' got type '${err.constructor.name}'`
);
}
if (!pattern.test(err.message)) {
throw new Error(
`${thiscallstr} failed, expected error ` +
`message to match pattern ${pat_str} got:\n${err.message}`
);
}
}
globalThis.assertThrows = function (cb, errname, pattern) {
let pat_str = typeof pattern === "string" ? `"${pattern}"` : `${pattern}`;
let thiscallstr = `assertThrows(${cb.toString()}, "${errname}", ${pat_str})`;
let err = undefined;
try {
cb();
} catch (e) {
err = e;
}
checkError(err, errname, pattern, pat_str, thiscallstr);
};
globalThis.assertThrowsAsync = async function (cb, errname, pattern) {
let pat_str = typeof pattern === "string" ? `"${pattern}"` : `${pattern}`;
let thiscallstr = `assertThrowsAsync(${cb.toString()}, "${errname}", ${pat_str})`;
let err = undefined;
try {
await cb();
} catch (e) {
err = e;
}
checkError(err, errname, pattern, pat_str, thiscallstr);
};