mirror of https://github.com/MagicStack/uvloop.git
316 lines
8.1 KiB
Python
316 lines
8.1 KiB
Python
import asyncio
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import uvloop
|
|
|
|
from uvloop import _testbase as tb
|
|
|
|
DELAY = 0.1
|
|
|
|
|
|
class _TestSignal:
|
|
NEW_LOOP = None
|
|
|
|
@tb.silence_long_exec_warning()
|
|
def test_signals_sigint_pycode_stop(self):
|
|
async def runner():
|
|
PROG = R"""\
|
|
import asyncio
|
|
import uvloop
|
|
import time
|
|
|
|
from uvloop import _testbase as tb
|
|
|
|
async def worker():
|
|
print('READY', flush=True)
|
|
time.sleep(200)
|
|
|
|
@tb.silence_long_exec_warning()
|
|
def run():
|
|
loop = """ + self.NEW_LOOP + """
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
loop.run_until_complete(worker())
|
|
finally:
|
|
loop.close()
|
|
|
|
run()
|
|
"""
|
|
|
|
proc = await asyncio.create_subprocess_exec(
|
|
sys.executable, b'-c', PROG,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
loop=self.loop)
|
|
|
|
await proc.stdout.readline()
|
|
time.sleep(DELAY)
|
|
proc.send_signal(signal.SIGINT)
|
|
out, err = await proc.communicate()
|
|
self.assertIn(b'KeyboardInterrupt', err)
|
|
self.assertEqual(out, b'')
|
|
|
|
self.loop.run_until_complete(runner())
|
|
|
|
@tb.silence_long_exec_warning()
|
|
def test_signals_sigint_pycode_continue(self):
|
|
async def runner():
|
|
PROG = R"""\
|
|
import asyncio
|
|
import uvloop
|
|
import time
|
|
|
|
from uvloop import _testbase as tb
|
|
|
|
async def worker():
|
|
print('READY', flush=True)
|
|
try:
|
|
time.sleep(200)
|
|
except KeyboardInterrupt:
|
|
print("oups")
|
|
await asyncio.sleep(0.5)
|
|
print('done')
|
|
|
|
@tb.silence_long_exec_warning()
|
|
def run():
|
|
loop = """ + self.NEW_LOOP + """
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
loop.run_until_complete(worker())
|
|
finally:
|
|
loop.close()
|
|
|
|
run()
|
|
"""
|
|
|
|
proc = await asyncio.create_subprocess_exec(
|
|
sys.executable, b'-c', PROG,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
loop=self.loop)
|
|
|
|
await proc.stdout.readline()
|
|
time.sleep(DELAY)
|
|
proc.send_signal(signal.SIGINT)
|
|
out, err = await proc.communicate()
|
|
self.assertEqual(err, b'')
|
|
self.assertEqual(out, b'oups\ndone\n')
|
|
|
|
self.loop.run_until_complete(runner())
|
|
|
|
@tb.silence_long_exec_warning()
|
|
def test_signals_sigint_uvcode(self):
|
|
async def runner():
|
|
PROG = R"""\
|
|
import asyncio
|
|
import uvloop
|
|
|
|
srv = None
|
|
|
|
async def worker():
|
|
global srv
|
|
cb = lambda *args: None
|
|
srv = await asyncio.start_server(cb, '127.0.0.1', 0)
|
|
print('READY', flush=True)
|
|
|
|
loop = """ + self.NEW_LOOP + """
|
|
asyncio.set_event_loop(loop)
|
|
loop.create_task(worker())
|
|
try:
|
|
loop.run_forever()
|
|
finally:
|
|
srv.close()
|
|
loop.run_until_complete(srv.wait_closed())
|
|
loop.close()
|
|
"""
|
|
|
|
proc = await asyncio.create_subprocess_exec(
|
|
sys.executable, b'-c', PROG,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
loop=self.loop)
|
|
|
|
await proc.stdout.readline()
|
|
time.sleep(DELAY)
|
|
proc.send_signal(signal.SIGINT)
|
|
out, err = await proc.communicate()
|
|
self.assertIn(b'KeyboardInterrupt', err)
|
|
|
|
self.loop.run_until_complete(runner())
|
|
|
|
@tb.silence_long_exec_warning()
|
|
def test_signals_sigint_and_custom_handler(self):
|
|
async def runner():
|
|
PROG = R"""\
|
|
import asyncio
|
|
import signal
|
|
import uvloop
|
|
|
|
srv = None
|
|
|
|
async def worker():
|
|
global srv
|
|
cb = lambda *args: None
|
|
srv = await asyncio.start_server(cb, '127.0.0.1', 0)
|
|
print('READY', flush=True)
|
|
|
|
def handler_sig(say):
|
|
print(say, flush=True)
|
|
exit()
|
|
|
|
def handler_hup(say):
|
|
print(say, flush=True)
|
|
|
|
loop = """ + self.NEW_LOOP + """
|
|
loop.add_signal_handler(signal.SIGINT, handler_sig, '!s-int!')
|
|
loop.add_signal_handler(signal.SIGHUP, handler_hup, '!s-hup!')
|
|
asyncio.set_event_loop(loop)
|
|
loop.create_task(worker())
|
|
try:
|
|
loop.run_forever()
|
|
finally:
|
|
srv.close()
|
|
loop.run_until_complete(srv.wait_closed())
|
|
loop.close()
|
|
"""
|
|
|
|
proc = await asyncio.create_subprocess_exec(
|
|
sys.executable, b'-c', PROG,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
loop=self.loop)
|
|
|
|
await proc.stdout.readline()
|
|
time.sleep(DELAY)
|
|
proc.send_signal(signal.SIGHUP)
|
|
time.sleep(DELAY)
|
|
proc.send_signal(signal.SIGINT)
|
|
out, err = await proc.communicate()
|
|
self.assertEqual(err, b'')
|
|
self.assertIn(b'!s-hup!', out)
|
|
self.assertIn(b'!s-int!', out)
|
|
|
|
self.loop.run_until_complete(runner())
|
|
|
|
@tb.silence_long_exec_warning()
|
|
def test_signals_and_custom_handler_1(self):
|
|
async def runner():
|
|
PROG = R"""\
|
|
import asyncio
|
|
import signal
|
|
import uvloop
|
|
|
|
srv = None
|
|
|
|
async def worker():
|
|
global srv
|
|
cb = lambda *args: None
|
|
srv = await asyncio.start_server(cb, '127.0.0.1', 0)
|
|
print('READY', flush=True)
|
|
|
|
def handler1():
|
|
print("GOTIT", flush=True)
|
|
|
|
def handler2():
|
|
assert loop.remove_signal_handler(signal.SIGUSR1)
|
|
print("REMOVED", flush=True)
|
|
|
|
def handler_hup():
|
|
exit()
|
|
|
|
loop = """ + self.NEW_LOOP + """
|
|
asyncio.set_event_loop(loop)
|
|
loop.add_signal_handler(signal.SIGUSR1, handler1)
|
|
loop.add_signal_handler(signal.SIGUSR2, handler2)
|
|
loop.add_signal_handler(signal.SIGHUP, handler_hup)
|
|
loop.create_task(worker())
|
|
try:
|
|
loop.run_forever()
|
|
finally:
|
|
srv.close()
|
|
loop.run_until_complete(srv.wait_closed())
|
|
loop.close()
|
|
|
|
"""
|
|
|
|
proc = await asyncio.create_subprocess_exec(
|
|
sys.executable, b'-c', PROG,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
loop=self.loop)
|
|
|
|
await proc.stdout.readline()
|
|
|
|
time.sleep(DELAY)
|
|
proc.send_signal(signal.SIGUSR1)
|
|
time.sleep(DELAY)
|
|
proc.send_signal(signal.SIGUSR1)
|
|
time.sleep(DELAY)
|
|
proc.send_signal(signal.SIGUSR2)
|
|
time.sleep(DELAY)
|
|
proc.send_signal(signal.SIGUSR1)
|
|
time.sleep(DELAY)
|
|
proc.send_signal(signal.SIGUSR1)
|
|
time.sleep(DELAY)
|
|
proc.send_signal(signal.SIGHUP)
|
|
|
|
out, err = await proc.communicate()
|
|
self.assertEqual(err, b'')
|
|
self.assertEqual(b'GOTIT\nGOTIT\nREMOVED\n', out)
|
|
|
|
self.loop.run_until_complete(runner())
|
|
|
|
def test_signals_invalid_signal(self):
|
|
with self.assertRaisesRegex(RuntimeError,
|
|
'sig {} cannot be caught'.format(
|
|
signal.SIGKILL)):
|
|
|
|
self.loop.add_signal_handler(signal.SIGKILL, lambda *a: None)
|
|
|
|
def test_signals_coro_callback(self):
|
|
async def coro(): pass
|
|
with self.assertRaisesRegex(TypeError, 'coroutines cannot be used'):
|
|
self.loop.add_signal_handler(signal.SIGHUP, coro)
|
|
|
|
|
|
class Test_UV_Signals(_TestSignal, tb.UVTestCase):
|
|
NEW_LOOP = 'uvloop.new_event_loop()'
|
|
|
|
def test_signals_no_SIGCHLD(self):
|
|
with self.assertRaisesRegex(RuntimeError,
|
|
r"cannot add.*handler.*SIGCHLD"):
|
|
|
|
self.loop.add_signal_handler(signal.SIGCHLD, lambda *a: None)
|
|
|
|
def test_asyncio_add_watcher_SIGCHLD_nop(self):
|
|
async def proc(loop):
|
|
proc = await asyncio.create_subprocess_exec(
|
|
'echo',
|
|
stdout=subprocess.DEVNULL,
|
|
loop=loop)
|
|
await proc.wait()
|
|
|
|
aio_loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(aio_loop)
|
|
try:
|
|
aio_loop.run_until_complete(proc(aio_loop))
|
|
finally:
|
|
aio_loop.close()
|
|
asyncio.set_event_loop(None)
|
|
|
|
try:
|
|
loop = uvloop.new_event_loop()
|
|
with self.assertWarnsRegex(
|
|
RuntimeWarning,
|
|
"asyncio is trying to install its ChildWatcher"):
|
|
asyncio.set_event_loop(loop)
|
|
finally:
|
|
asyncio.set_event_loop(None)
|
|
loop.close()
|
|
|
|
|
|
class Test_AIO_Signals(_TestSignal, tb.AIOTestCase):
|
|
NEW_LOOP = 'asyncio.new_event_loop()'
|