Restore uvloop.new_event_loop and other missing uvloop members to typing (#573)

This commit is contained in:
Thomas Grainger 2023-10-15 09:40:25 -07:00 committed by GitHub
parent 5ddf38bcca
commit 5c500ee257
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 67 deletions

View File

@ -13,6 +13,9 @@ from ._version import __version__ # NOQA
__all__ = ('new_event_loop', 'install', 'EventLoopPolicy')
_T = _typing.TypeVar("_T")
class Loop(__BaseLoop, __asyncio.AbstractEventLoop): # type: ignore[misc]
pass
@ -34,69 +37,84 @@ def install() -> None:
__asyncio.set_event_loop_policy(EventLoopPolicy())
def run(main, *, loop_factory=new_event_loop, debug=None, **run_kwargs):
"""The preferred way of running a coroutine with uvloop."""
if _typing.TYPE_CHECKING:
def run(
main: _typing.Coroutine[_typing.Any, _typing.Any, _T],
*,
loop_factory: _typing.Optional[
_typing.Callable[[], Loop]
] = new_event_loop,
debug: _typing.Optional[bool]=None,
) -> _T:
"""The preferred way of running a coroutine with uvloop."""
else:
def run(main, *, loop_factory=new_event_loop, debug=None, **run_kwargs):
"""The preferred way of running a coroutine with uvloop."""
async def wrapper():
# If `loop_factory` is provided we want it to return
# either uvloop.Loop or a subtype of it, assuming the user
# is using `uvloop.run()` intentionally.
loop = __asyncio._get_running_loop()
if not isinstance(loop, Loop):
raise TypeError('uvloop.run() uses a non-uvloop event loop')
return await main
async def wrapper():
# If `loop_factory` is provided we want it to return
# either uvloop.Loop or a subtype of it, assuming the user
# is using `uvloop.run()` intentionally.
loop = __asyncio._get_running_loop()
if not isinstance(loop, Loop):
raise TypeError('uvloop.run() uses a non-uvloop event loop')
return await main
vi = _sys.version_info[:2]
vi = _sys.version_info[:2]
if vi <= (3, 10):
# Copied from python/cpython
if vi <= (3, 10):
# Copied from python/cpython
if __asyncio._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
if __asyncio._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
if not __asyncio.iscoroutine(main):
raise ValueError("a coroutine was expected, got {!r}".format(main))
if not __asyncio.iscoroutine(main):
raise ValueError(
"a coroutine was expected, got {!r}".format(main)
)
loop = loop_factory()
try:
__asyncio.set_event_loop(loop)
if debug is not None:
loop.set_debug(debug)
return loop.run_until_complete(wrapper())
finally:
loop = loop_factory()
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
if hasattr(loop, 'shutdown_default_executor'):
loop.run_until_complete(loop.shutdown_default_executor())
__asyncio.set_event_loop(loop)
if debug is not None:
loop.set_debug(debug)
return loop.run_until_complete(wrapper())
finally:
__asyncio.set_event_loop(None)
loop.close()
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
if hasattr(loop, 'shutdown_default_executor'):
loop.run_until_complete(
loop.shutdown_default_executor()
)
finally:
__asyncio.set_event_loop(None)
loop.close()
elif vi == (3, 11):
if __asyncio._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
elif vi == (3, 11):
if __asyncio._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
with __asyncio.Runner(
loop_factory=loop_factory,
debug=debug,
**run_kwargs
) as runner:
return runner.run(wrapper())
with __asyncio.Runner(
loop_factory=loop_factory,
debug=debug,
**run_kwargs
) as runner:
return runner.run(wrapper())
else:
assert vi >= (3, 12)
return __asyncio.run(
wrapper(),
loop_factory=loop_factory,
debug=debug,
**run_kwargs
)
else:
assert vi >= (3, 12)
return __asyncio.run(
wrapper(),
loop_factory=loop_factory,
debug=debug,
**run_kwargs
)
def _cancel_all_tasks(loop):
def _cancel_all_tasks(loop: __asyncio.AbstractEventLoop) -> None:
# Copied from python/cpython
to_cancel = __asyncio.all_tasks(loop)

View File

@ -1,17 +0,0 @@
import sys
from asyncio import AbstractEventLoop
from collections.abc import Callable, Coroutine
from contextvars import Context
from typing import Any, TypeVar
_T = TypeVar('_T')
def run(
main: Coroutine[Any, Any, _T],
*,
debug: bool | None = ...,
loop_factory: Callable[[], AbstractEventLoop] | None = ...,
) -> _T: ...