mirror of https://github.com/MagicStack/uvloop.git
tests: Another attempt to fix test_signals_restore on Travis
This commit is contained in:
parent
e7202088a3
commit
cc10f73bf3
|
@ -7,12 +7,13 @@ import uvloop
|
|||
|
||||
from uvloop import _testbase as tb
|
||||
|
||||
DELAY = 0.01
|
||||
DELAY = 0.1
|
||||
|
||||
|
||||
class _TestSignal:
|
||||
NEW_LOOP = None
|
||||
|
||||
@tb.silence_long_exec_warning()
|
||||
def test_signals_sigint_pycode_stop(self):
|
||||
async def runner():
|
||||
PROG = R"""\
|
||||
|
@ -20,13 +21,22 @@ import asyncio
|
|||
import uvloop
|
||||
import time
|
||||
|
||||
from uvloop import _testbase as tb
|
||||
|
||||
async def worker():
|
||||
print('READY', flush=True)
|
||||
time.sleep(200)
|
||||
|
||||
loop = """ + self.NEW_LOOP + """
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_until_complete(worker())
|
||||
@tb.silence_long_exec_warning()
|
||||
def run():
|
||||
loop = """ + self.NEW_LOOP + """
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
loop.run_until_complete(worker())
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
run()
|
||||
"""
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
|
@ -44,6 +54,7 @@ loop.run_until_complete(worker())
|
|||
|
||||
self.loop.run_until_complete(runner())
|
||||
|
||||
@tb.silence_long_exec_warning()
|
||||
def test_signals_sigint_pycode_continue(self):
|
||||
async def runner():
|
||||
PROG = R"""\
|
||||
|
@ -51,6 +62,8 @@ import asyncio
|
|||
import uvloop
|
||||
import time
|
||||
|
||||
from uvloop import _testbase as tb
|
||||
|
||||
async def worker():
|
||||
print('READY', flush=True)
|
||||
try:
|
||||
|
@ -60,9 +73,16 @@ async def worker():
|
|||
await asyncio.sleep(0.5)
|
||||
print('done')
|
||||
|
||||
loop = """ + self.NEW_LOOP + """
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_until_complete(worker())
|
||||
@tb.silence_long_exec_warning()
|
||||
def run():
|
||||
loop = """ + self.NEW_LOOP + """
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
loop.run_until_complete(worker())
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
run()
|
||||
"""
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
|
@ -80,6 +100,7 @@ loop.run_until_complete(worker())
|
|||
|
||||
self.loop.run_until_complete(runner())
|
||||
|
||||
@tb.silence_long_exec_warning()
|
||||
def test_signals_sigint_uvcode(self):
|
||||
async def runner():
|
||||
PROG = R"""\
|
||||
|
@ -119,6 +140,7 @@ finally:
|
|||
|
||||
self.loop.run_until_complete(runner())
|
||||
|
||||
@tb.silence_long_exec_warning()
|
||||
def test_signals_sigint_and_custom_handler(self):
|
||||
async def runner():
|
||||
PROG = R"""\
|
||||
|
@ -172,6 +194,7 @@ finally:
|
|||
|
||||
self.loop.run_until_complete(runner())
|
||||
|
||||
@tb.silence_long_exec_warning()
|
||||
def test_signals_and_custom_handler_1(self):
|
||||
async def runner():
|
||||
PROG = R"""\
|
||||
|
@ -243,90 +266,6 @@ finally:
|
|||
class Test_UV_Signals(_TestSignal, tb.UVTestCase):
|
||||
NEW_LOOP = 'uvloop.new_event_loop()'
|
||||
|
||||
def test_signals_restore(self):
|
||||
# Test that uvloop restores signals installed with the signals
|
||||
# module after the loop is done running.
|
||||
|
||||
async def runner():
|
||||
PROG = R"""\
|
||||
import asyncio
|
||||
import uvloop
|
||||
import signal
|
||||
import time
|
||||
|
||||
srv = None
|
||||
|
||||
async def worker():
|
||||
global srv
|
||||
cb = lambda *args: None
|
||||
srv = await asyncio.start_server(cb, '127.0.0.1', 0)
|
||||
print('READY', flush=True)
|
||||
|
||||
def py_handler(signum, frame):
|
||||
print('pyhandler', flush=True)
|
||||
|
||||
def aio_handler():
|
||||
print('aiohandler', flush=True)
|
||||
loop.stop()
|
||||
|
||||
signal.signal(signal.SIGUSR1, py_handler)
|
||||
|
||||
print('step1', flush=True)
|
||||
print(input(), flush=True)
|
||||
loop = """ + self.NEW_LOOP + """
|
||||
loop.add_signal_handler(signal.SIGUSR1, aio_handler)
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.create_task(worker())
|
||||
try:
|
||||
loop.run_forever()
|
||||
finally:
|
||||
srv.close()
|
||||
loop.run_until_complete(srv.wait_closed())
|
||||
loop.close()
|
||||
print('step3', flush=True)
|
||||
print(input(), flush=True)
|
||||
"""
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
sys.executable, b'-c', PROG,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
loop=self.loop)
|
||||
|
||||
ln = await proc.stdout.readline()
|
||||
self.assertEqual(ln, b'step1\n')
|
||||
|
||||
proc.send_signal(signal.SIGUSR1)
|
||||
ln = await proc.stdout.readline()
|
||||
self.assertEqual(ln, b'pyhandler\n')
|
||||
|
||||
proc.stdin.write(b'test\n')
|
||||
ln = await proc.stdout.readline()
|
||||
self.assertEqual(ln, b'test\n')
|
||||
|
||||
ln = await proc.stdout.readline()
|
||||
self.assertEqual(ln, b'READY\n')
|
||||
|
||||
proc.send_signal(signal.SIGUSR1)
|
||||
ln = await proc.stdout.readline()
|
||||
self.assertEqual(ln, b'aiohandler\n')
|
||||
|
||||
ln = await proc.stdout.readline()
|
||||
self.assertEqual(ln, b'step3\n')
|
||||
|
||||
proc.send_signal(signal.SIGUSR1)
|
||||
ln = await proc.stdout.readline()
|
||||
self.assertEqual(ln, b'pyhandler\n')
|
||||
|
||||
proc.stdin.write(b'done\n')
|
||||
|
||||
out, err = await proc.communicate()
|
||||
self.assertEqual(out, b'done\n')
|
||||
self.assertEqual(err, b'')
|
||||
|
||||
self.loop.run_until_complete(runner())
|
||||
|
||||
|
||||
class Test_AIO_Signals(_TestSignal, tb.AIOTestCase):
|
||||
NEW_LOOP = 'asyncio.new_event_loop()'
|
||||
|
|
|
@ -68,6 +68,23 @@ def _cert_fullname(name):
|
|||
return fullname
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def silence_long_exec_warning():
|
||||
|
||||
class Filter(logging.Filter):
|
||||
def filter(self, record):
|
||||
return not (record.msg.startswith('Executing') and
|
||||
record.msg.endswith('seconds'))
|
||||
|
||||
logger = logging.getLogger('asyncio')
|
||||
filter = Filter()
|
||||
logger.addFilter(filter)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
logger.removeFilter(filter)
|
||||
|
||||
|
||||
class SSLTestCase:
|
||||
|
||||
ONLYCERT = _cert_fullname('ssl_cert.pem')
|
||||
|
|
Loading…
Reference in New Issue