mirror of https://github.com/python/cpython.git
396 lines
13 KiB
Python
396 lines
13 KiB
Python
import argparse
|
|
import asyncio
|
|
import json
|
|
import plistlib
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
|
|
DECODE_ARGS = ("UTF-8", "backslashreplace")
|
|
|
|
|
|
# Work around a bug involving sys.exit and TaskGroups
|
|
# (https://github.com/python/cpython/issues/101515).
|
|
def exit(*args):
|
|
raise MySystemExit(*args)
|
|
|
|
|
|
class MySystemExit(Exception):
|
|
pass
|
|
|
|
|
|
# All subprocesses are executed through this context manager so that no matter
|
|
# what happens, they can always be cancelled from another task, and they will
|
|
# always be cleaned up on exit.
|
|
@asynccontextmanager
|
|
async def async_process(*args, **kwargs):
|
|
process = await asyncio.create_subprocess_exec(*args, **kwargs)
|
|
try:
|
|
yield process
|
|
finally:
|
|
if process.returncode is None:
|
|
# Allow a reasonably long time for Xcode to clean itself up,
|
|
# because we don't want stale emulators left behind.
|
|
timeout = 10
|
|
process.terminate()
|
|
try:
|
|
await asyncio.wait_for(process.wait(), timeout)
|
|
except TimeoutError:
|
|
print(
|
|
f"Command {args} did not terminate after {timeout} seconds "
|
|
f" - sending SIGKILL"
|
|
)
|
|
process.kill()
|
|
|
|
# Even after killing the process we must still wait for it,
|
|
# otherwise we'll get the warning "Exception ignored in __del__".
|
|
await asyncio.wait_for(process.wait(), timeout=1)
|
|
|
|
|
|
async def async_check_output(*args, **kwargs):
|
|
async with async_process(
|
|
*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs
|
|
) as process:
|
|
stdout, stderr = await process.communicate()
|
|
if process.returncode == 0:
|
|
return stdout.decode(*DECODE_ARGS)
|
|
else:
|
|
raise subprocess.CalledProcessError(
|
|
process.returncode,
|
|
args,
|
|
stdout.decode(*DECODE_ARGS),
|
|
stderr.decode(*DECODE_ARGS),
|
|
)
|
|
|
|
|
|
# Return a list of UDIDs associated with booted simulators
|
|
async def list_devices():
|
|
# List the testing simulators, in JSON format
|
|
raw_json = await async_check_output(
|
|
"xcrun", "simctl", "--set", "testing", "list", "-j"
|
|
)
|
|
json_data = json.loads(raw_json)
|
|
|
|
# Filter out the booted iOS simulators
|
|
return [
|
|
simulator["udid"]
|
|
for runtime, simulators in json_data["devices"].items()
|
|
for simulator in simulators
|
|
if runtime.split(".")[-1].startswith("iOS") and simulator["state"] == "Booted"
|
|
]
|
|
|
|
|
|
async def find_device(initial_devices):
|
|
while True:
|
|
new_devices = set(await list_devices()).difference(initial_devices)
|
|
if len(new_devices) == 0:
|
|
await asyncio.sleep(1)
|
|
elif len(new_devices) == 1:
|
|
udid = new_devices.pop()
|
|
print(f"{datetime.now():%Y-%m-%d %H:%M:%S}: New test simulator detected")
|
|
print(f"UDID: {udid}")
|
|
return udid
|
|
else:
|
|
exit(f"Found more than one new device: {new_devices}")
|
|
|
|
|
|
async def log_stream_task(initial_devices):
|
|
# Wait up to 5 minutes for the build to complete and the simulator to boot.
|
|
udid = await asyncio.wait_for(find_device(initial_devices), 5 * 60)
|
|
|
|
# Stream the iOS device's logs, filtering out messages that come from the
|
|
# XCTest test suite (catching NSLog messages from the test method), or
|
|
# Python itself (catching stdout/stderr content routed to the system log
|
|
# with config->use_system_logger).
|
|
args = [
|
|
"xcrun",
|
|
"simctl",
|
|
"--set",
|
|
"testing",
|
|
"spawn",
|
|
udid,
|
|
"log",
|
|
"stream",
|
|
"--style",
|
|
"compact",
|
|
"--predicate",
|
|
(
|
|
'senderImagePath ENDSWITH "/iOSTestbedTests.xctest/iOSTestbedTests"'
|
|
' OR senderImagePath ENDSWITH "/Python.framework/Python"'
|
|
),
|
|
]
|
|
|
|
async with async_process(
|
|
*args,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
) as process:
|
|
suppress_dupes = False
|
|
while line := (await process.stdout.readline()).decode(*DECODE_ARGS):
|
|
# The iOS log streamer can sometimes lag; when it does, it outputs
|
|
# a warning about messages being dropped... often multiple times.
|
|
# Only print the first of these duplicated warnings.
|
|
if line.startswith("=== Messages dropped "):
|
|
if not suppress_dupes:
|
|
suppress_dupes = True
|
|
sys.stdout.write(line)
|
|
else:
|
|
suppress_dupes = False
|
|
sys.stdout.write(line)
|
|
sys.stdout.flush()
|
|
|
|
|
|
async def xcode_test(location, simulator, verbose):
|
|
# Run the test suite on the named simulator
|
|
print("Starting xcodebuild...")
|
|
args = [
|
|
"xcodebuild",
|
|
"test",
|
|
"-project",
|
|
str(location / "iOSTestbed.xcodeproj"),
|
|
"-scheme",
|
|
"iOSTestbed",
|
|
"-destination",
|
|
f"platform=iOS Simulator,name={simulator}",
|
|
"-resultBundlePath",
|
|
str(location / f"{datetime.now():%Y%m%d-%H%M%S}.xcresult"),
|
|
"-derivedDataPath",
|
|
str(location / "DerivedData"),
|
|
]
|
|
if not verbose:
|
|
args += ["-quiet"]
|
|
|
|
async with async_process(
|
|
*args,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
) as process:
|
|
while line := (await process.stdout.readline()).decode(*DECODE_ARGS):
|
|
sys.stdout.write(line)
|
|
sys.stdout.flush()
|
|
|
|
status = await asyncio.wait_for(process.wait(), timeout=1)
|
|
exit(status)
|
|
|
|
|
|
def clone_testbed(
|
|
source: Path,
|
|
target: Path,
|
|
framework: Path,
|
|
apps: list[Path],
|
|
) -> None:
|
|
if target.exists():
|
|
print(f"{target} already exists; aborting without creating project.")
|
|
sys.exit(10)
|
|
|
|
if framework is None:
|
|
if not (
|
|
source / "Python.xcframework/ios-arm64_x86_64-simulator/bin"
|
|
).is_dir():
|
|
print(
|
|
f"The testbed being cloned ({source}) does not contain "
|
|
f"a simulator framework. Re-run with --framework"
|
|
)
|
|
sys.exit(11)
|
|
else:
|
|
if not framework.is_dir():
|
|
print(f"{framework} does not exist.")
|
|
sys.exit(12)
|
|
elif not (
|
|
framework.suffix == ".xcframework"
|
|
or (framework / "Python.framework").is_dir()
|
|
):
|
|
print(
|
|
f"{framework} is not an XCframework, "
|
|
f"or a simulator slice of a framework build."
|
|
)
|
|
sys.exit(13)
|
|
|
|
print("Cloning testbed project:")
|
|
print(f" Cloning {source}...", end="", flush=True)
|
|
shutil.copytree(source, target, symlinks=True)
|
|
print(" done")
|
|
|
|
if framework is not None:
|
|
if framework.suffix == ".xcframework":
|
|
print(" Installing XCFramework...", end="", flush=True)
|
|
xc_framework_path = (target / "Python.xcframework").resolve()
|
|
if xc_framework_path.is_dir():
|
|
shutil.rmtree(xc_framework_path)
|
|
else:
|
|
xc_framework_path.unlink()
|
|
xc_framework_path.symlink_to(
|
|
framework.relative_to(xc_framework_path.parent, walk_up=True)
|
|
)
|
|
print(" done")
|
|
else:
|
|
print(" Installing simulator framework...", end="", flush=True)
|
|
sim_framework_path = (
|
|
target / "Python.xcframework" / "ios-arm64_x86_64-simulator"
|
|
).resolve()
|
|
if sim_framework_path.is_dir():
|
|
shutil.rmtree(sim_framework_path)
|
|
else:
|
|
sim_framework_path.unlink()
|
|
sim_framework_path.symlink_to(
|
|
framework.relative_to(sim_framework_path.parent, walk_up=True)
|
|
)
|
|
print(" done")
|
|
else:
|
|
print(" Using pre-existing iOS framework.")
|
|
|
|
for app_src in apps:
|
|
print(f" Installing app {app_src.name!r}...", end="", flush=True)
|
|
app_target = target / f"iOSTestbed/app/{app_src.name}"
|
|
if app_target.is_dir():
|
|
shutil.rmtree(app_target)
|
|
shutil.copytree(app_src, app_target)
|
|
print(" done")
|
|
|
|
print(f"Successfully cloned testbed: {target.resolve()}")
|
|
|
|
|
|
def update_plist(testbed_path, args):
|
|
# Add the test runner arguments to the testbed's Info.plist file.
|
|
info_plist = testbed_path / "iOSTestbed" / "iOSTestbed-Info.plist"
|
|
with info_plist.open("rb") as f:
|
|
info = plistlib.load(f)
|
|
|
|
info["TestArgs"] = args
|
|
|
|
with info_plist.open("wb") as f:
|
|
plistlib.dump(info, f)
|
|
|
|
|
|
async def run_testbed(simulator: str, args: list[str], verbose: bool=False):
|
|
location = Path(__file__).parent
|
|
print("Updating plist...", end="", flush=True)
|
|
update_plist(location, args)
|
|
print(" done.")
|
|
|
|
# Get the list of devices that are booted at the start of the test run.
|
|
# The simulator started by the test suite will be detected as the new
|
|
# entry that appears on the device list.
|
|
initial_devices = await list_devices()
|
|
|
|
try:
|
|
async with asyncio.TaskGroup() as tg:
|
|
tg.create_task(log_stream_task(initial_devices))
|
|
tg.create_task(xcode_test(location, simulator=simulator, verbose=verbose))
|
|
except* MySystemExit as e:
|
|
raise SystemExit(*e.exceptions[0].args) from None
|
|
except* subprocess.CalledProcessError as e:
|
|
# Extract it from the ExceptionGroup so it can be handled by `main`.
|
|
raise e.exceptions[0]
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description=(
|
|
"Manages the process of testing a Python project in the iOS simulator."
|
|
),
|
|
)
|
|
|
|
subcommands = parser.add_subparsers(dest="subcommand")
|
|
|
|
clone = subcommands.add_parser(
|
|
"clone",
|
|
description=(
|
|
"Clone the testbed project, copying in an iOS Python framework and"
|
|
"any specified application code."
|
|
),
|
|
help="Clone a testbed project to a new location.",
|
|
)
|
|
clone.add_argument(
|
|
"--framework",
|
|
help=(
|
|
"The location of the XCFramework (or simulator-only slice of an "
|
|
"XCFramework) to use when running the testbed"
|
|
),
|
|
)
|
|
clone.add_argument(
|
|
"--app",
|
|
dest="apps",
|
|
action="append",
|
|
default=[],
|
|
help="The location of any code to include in the testbed project",
|
|
)
|
|
clone.add_argument(
|
|
"location",
|
|
help="The path where the testbed will be cloned.",
|
|
)
|
|
|
|
run = subcommands.add_parser(
|
|
"run",
|
|
usage="%(prog)s [-h] [--simulator SIMULATOR] -- <test arg> [<test arg> ...]",
|
|
description=(
|
|
"Run a testbed project. The arguments provided after `--` will be "
|
|
"passed to the running iOS process as if they were arguments to "
|
|
"`python -m`."
|
|
),
|
|
help="Run a testbed project",
|
|
)
|
|
run.add_argument(
|
|
"--simulator",
|
|
default="iPhone SE (3rd Generation)",
|
|
help="The name of the simulator to use (default: 'iPhone SE (3rd Generation)')",
|
|
)
|
|
run.add_argument(
|
|
"-v", "--verbose",
|
|
action="store_true",
|
|
help="Enable verbose output",
|
|
)
|
|
|
|
try:
|
|
pos = sys.argv.index("--")
|
|
testbed_args = sys.argv[1:pos]
|
|
test_args = sys.argv[pos + 1 :]
|
|
except ValueError:
|
|
testbed_args = sys.argv[1:]
|
|
test_args = []
|
|
|
|
context = parser.parse_args(testbed_args)
|
|
|
|
if context.subcommand == "clone":
|
|
clone_testbed(
|
|
source=Path(__file__).parent,
|
|
target=Path(context.location),
|
|
framework=Path(context.framework).resolve() if context.framework else None,
|
|
apps=[Path(app) for app in context.apps],
|
|
)
|
|
elif context.subcommand == "run":
|
|
if test_args:
|
|
if not (
|
|
Path(__file__).parent / "Python.xcframework/ios-arm64_x86_64-simulator/bin"
|
|
).is_dir():
|
|
print(
|
|
f"Testbed does not contain a compiled iOS framework. Use "
|
|
f"`python {sys.argv[0]} clone ...` to create a runnable "
|
|
f"clone of this testbed."
|
|
)
|
|
sys.exit(20)
|
|
|
|
asyncio.run(
|
|
run_testbed(
|
|
simulator=context.simulator,
|
|
verbose=context.verbose,
|
|
args=test_args,
|
|
)
|
|
)
|
|
else:
|
|
print(f"Must specify test arguments (e.g., {sys.argv[0]} run -- test)")
|
|
print()
|
|
parser.print_help(sys.stderr)
|
|
sys.exit(21)
|
|
else:
|
|
parser.print_help(sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|