Refactor conftest.py (#1055)

This commit is contained in:
Dexter Chua 2021-01-07 06:29:10 +08:00 committed by GitHub
parent 1f816eea9c
commit 6ef0ebbfdf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 80 additions and 192 deletions

View File

@ -48,79 +48,40 @@ except ImportError:
pytest = None
def pyodide_inited(driver):
return driver.execute_script(
"""
return !!(window.PYODIDE_READY || window.PYODIDE_ERROR);
"""
)
class JavascriptException(Exception):
def __init__(self, msg, stack):
self.msg = msg
self.stack = stack
def pyodide_init_get_error(driver):
return driver.execute_script(
"""
if(!window.PYODIDE_ERROR){
return [undefined, undefined];
}
return [window.PYODIDE_ERROR.message, window.PYODIDE_ERROR.stack];
"""
)
def package_loaded(driver):
inited = driver.execute_script("return window.done;")
return bool(inited)
def _display_driver_logs(browser, driver):
if browser == "chrome":
print("# Selenium browser logs")
print(driver.get_log("browser"))
elif browser == "firefox":
# browser logs are not available in GeckoDriver
# https://github.com/mozilla/geckodriver/issues/284
print(
"Accessing raw browser logs with Selenium is not " "supported by Firefox."
)
def __str__(self):
if self.stack:
return self.msg + "\n" + self.stack
else:
return self.msg
class SeleniumWrapper:
JavascriptException = JavascriptException
def __init__(
self, server_port, server_hostname="127.0.0.1", server_log=None, build_dir=None
):
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import TimeoutException
if build_dir is None:
build_dir = BUILD_PATH
driver = self.get_driver()
wait = WebDriverWait(driver, timeout=40)
self.driver = self.get_driver()
self.server_port = server_port
self.server_hostname = server_hostname
self.server_log = server_log
if not (pathlib.Path(build_dir) / "test.html").exists():
# selenium does not expose HTTP response codes
raise ValueError(
f"{(build_dir / 'test.html').resolve()} " f"does not exist!"
)
driver.get(f"http://{server_hostname}:{server_port}/test.html")
try:
wait.until(pyodide_inited)
except TimeoutException:
_display_driver_logs(self.browser, driver)
raise TimeoutException()
[error, traceback] = pyodide_init_get_error(driver)
if error is not None:
raise Exception(
f"""
Pyodide initialization failed: {error}
Traceback:
{traceback}
"""
)
self.wait = wait
self.driver = driver
self.server_port = server_port
self.server_hostname = server_hostname
self.server_log = server_log
self.driver.get(f"http://{server_hostname}:{server_port}/test.html")
self.run_js("Error.stackTraceLimit = Infinity")
self.run_js_async("await languagePluginLoader")
@property
def logs(self):
@ -137,130 +98,80 @@ class SeleniumWrapper:
return self.run_js("return pyodide.runPython({!r})".format(code))
def run_async(self, code):
from selenium.common.exceptions import TimeoutException
self.run_js(
"""
window.done = false;
pyodide.runPythonAsync({!r})
.then(function(output)
{{ window.output = output; window.error = false; }},
function(output)
{{ window.output = output; window.error = true; }})
.finally(() => window.done = true);
""".format(
code
)
)
try:
self.wait.until(package_loaded)
except TimeoutException:
_display_driver_logs(self.browser, self.driver)
print(self.logs)
raise TimeoutException("runPythonAsync timed out")
return self.run_js(
"""
if (window.error) {
throw window.output;
}
return window.output;
"""
)
return self.run_js_async("return pyodide.runPythonAsync({!r})".format(code))
def run_js(self, code):
if isinstance(code, str) and code.startswith("\n"):
# we have a multiline string, fix indentation
code = textwrap.dedent(code)
catch = f"""
wrapper = """
Error.stackTraceLimit = Infinity;
try {{ {code} }}
catch (error) {{ console.log(error.stack); throw error; }}"""
return self.driver.execute_script(catch)
def setup_webworker(self):
hostname = self.server_hostname
port = self.server_port
url = f"http://{hostname}:{port}/webworker_dev.js"
self.run_js(
f"""
window.done = false;
window.pyodideWorker = new Worker( '{url}' );
window.pyodideWorker.onerror = function(e) {{
window.output = e;
window.error = true;
window.done = true;
}};
window.pyodideWorker.onmessage = function(e) {{
if (e.data.results) {{
window.output = e.data.results;
window.error = false;
}} else {{
window.output = e.data.error;
window.error = true;
}}
window.done = true;
}};
let run = () => { %s }
try {
return [0, run()]
} catch (e) {
return [1, e.toString(), e.stack];
}
"""
)
def run_webworker(self, code):
from selenium.common.exceptions import TimeoutException
retval = self.driver.execute_script(wrapper % code)
self.setup_webworker()
if retval[0] == 0:
return retval[1]
else:
raise JavascriptException(retval[1], retval[2])
def run_js_async(self, code):
if isinstance(code, str) and code.startswith("\n"):
# we have a multiline string, fix indentation
code = textwrap.dedent(code)
self.run_js(
"""
var data = {{
python: {!r}
}};
window.pyodideWorker.postMessage(data);
wrapper = """
let cb = arguments[arguments.length - 1];
let run = async () => { %s }
(async () => {
try {{
cb([0, await run()]);
}} catch (e) {{
cb([1, e.toString(), e.stack]);
}}
})()
"""
retval = self.driver.execute_async_script(wrapper % code)
if retval[0] == 0:
return retval[1]
else:
raise JavascriptException(retval[1], retval[2])
def run_webworker(self, code):
if isinstance(code, str) and code.startswith("\n"):
# we have a multiline string, fix indentation
code = textwrap.dedent(code)
return self.run_js_async(
"""
let worker = new Worker( '{}' );
worker.postMessage({{ python: {!r} }});
return new Promise((res, rej) => {{
worker.onerror = e => rej(e);
worker.onmessage = e => {{
if (e.data.results) {{
res(e.data.results);
}} else {{
rej(e.data.error);
}}
}};
}})
""".format(
code
f"http://{self.server_hostname}:{self.server_port}/webworker_dev.js",
code,
)
)
try:
self.wait.until(package_loaded)
except TimeoutException:
_display_driver_logs(self.browser, self.driver)
print(self.logs)
raise TimeoutException("run_webworker timed out")
return self.run_js(
"""
if (window.error) {
if (window.output instanceof Error) {
throw window.output;
} else {
throw new Error(String(window.output))
}
}
return window.output;
"""
)
def load_package(self, packages):
self.run_js(
"window.done = false\n"
+ "pyodide.loadPackage({!r})".format(packages)
+ ".finally(function() { window.done = true; })"
)
__tracebackhide__ = True
self.wait_until_packages_loaded()
def wait_until_packages_loaded(self):
from selenium.common.exceptions import TimeoutException
__tracebackhide__ = True
try:
self.wait.until(package_loaded)
except TimeoutException:
_display_driver_logs(self.browser, self.driver)
print(self.logs)
raise TimeoutException("wait_until_packages_loaded timed out")
self.run_js_async("await pyodide.loadPackage({!r})".format(packages))
@property
def urls(self):
@ -276,13 +187,10 @@ class FirefoxWrapper(SeleniumWrapper):
def get_driver(self):
from selenium.webdriver import Firefox
from selenium.webdriver.firefox.options import Options
from selenium.common.exceptions import JavascriptException
options = Options()
options.add_argument("-headless")
self.JavascriptException = JavascriptException
return Firefox(executable_path="geckodriver", options=options)
@ -293,14 +201,11 @@ class ChromeWrapper(SeleniumWrapper):
def get_driver(self):
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import WebDriverException
options = Options()
options.add_argument("--headless")
options.add_argument("--no-sandbox")
self.JavascriptException = WebDriverException
return Chrome(options=options)

View File

@ -20,17 +20,6 @@
window.languagePluginUrl = './';
</script>
<script src="./pyodide.js"></script>
<script type="text/javascript">
async function main(){
try {
await languagePluginLoader;
window.PYODIDE_READY = true;
} catch(e) {
window.PYODIDE_ERROR = e;
}
}
main();
</script>
</head>
<body></body>
</html>

View File

@ -93,12 +93,7 @@ def test_load_packages_multiple(selenium_standalone, packages):
def test_load_packages_sequential(selenium_standalone, packages):
selenium = selenium_standalone
promises = ",".join('pyodide.loadPackage("{}")'.format(x) for x in packages)
selenium.run_js(
"window.done = false\n"
+ "Promise.all([{}])".format(promises)
+ ".finally(function() { window.done = true; })"
)
selenium.wait_until_packages_loaded()
selenium.run_js_async("return Promise.all([{}])".format(promises))
selenium.run(f"import {packages[0]}")
selenium.run(f"import {packages[1]}")
# The log must show that each package is loaded exactly once,
@ -150,5 +145,5 @@ def test_load_package_unknown(selenium_standalone):
(build_dir / "pyparsing-custom.data").unlink()
assert selenium_standalone.run_js(
"return window.pyodide.loadedPackages." "hasOwnProperty('pyparsing-custom')"
"return window.pyodide.loadedPackages.hasOwnProperty('pyparsing-custom')"
)

View File

@ -1,6 +1,5 @@
# See also test_pyproxy, test_jsproxy, and test_python.
import pytest
from selenium.common.exceptions import WebDriverException
def test_python2js(selenium):
@ -263,19 +262,19 @@ def test_jsproxy_attribute_error(selenium):
)
msg = "AttributeError: z"
with pytest.raises(WebDriverException, match=msg):
with pytest.raises(selenium.JavascriptException, match=msg):
selenium.run("point.z")
selenium.run("del point.y")
msg = "AttributeError: y"
with pytest.raises(WebDriverException, match=msg):
with pytest.raises(selenium.JavascriptException, match=msg):
selenium.run("point.y")
assert selenium.run_js("return point.y;") is None
def test_javascript_error(selenium):
msg = "JsException: Error: This is a js error"
with pytest.raises(WebDriverException, match=msg):
with pytest.raises(selenium.JavascriptException, match=msg):
selenium.run(
"""
from js import Error