import asyncio import io import os import socket from uvloop import _testbase as tb # All tests are copied from asyncio (mostly as-is) class MyReadPipeProto(asyncio.Protocol): done = None def __init__(self, loop=None): self.state = ['INITIAL'] self.nbytes = 0 self.transport = None if loop is not None: self.done = asyncio.Future(loop=loop) def connection_made(self, transport): self.transport = transport assert self.state == ['INITIAL'], self.state self.state.append('CONNECTED') def data_received(self, data): assert self.state == ['INITIAL', 'CONNECTED'], self.state self.nbytes += len(data) def eof_received(self): assert self.state == ['INITIAL', 'CONNECTED'], self.state self.state.append('EOF') def connection_lost(self, exc): if 'EOF' not in self.state: self.state.append('EOF') # It is okay if EOF is missed. assert self.state == ['INITIAL', 'CONNECTED', 'EOF'], self.state self.state.append('CLOSED') if self.done: self.done.set_result(None) class MyWritePipeProto(asyncio.BaseProtocol): done = None paused = False def __init__(self, loop=None): self.state = 'INITIAL' self.transport = None if loop is not None: self.done = asyncio.Future(loop=loop) def connection_made(self, transport): self.transport = transport assert self.state == 'INITIAL', self.state self.state = 'CONNECTED' def connection_lost(self, exc): assert self.state == 'CONNECTED', self.state self.state = 'CLOSED' if self.done: self.done.set_result(None) def pause_writing(self): self.paused = True def resume_writing(self): self.paused = False class _BasePipeTest: def test_read_pipe(self): proto = MyReadPipeProto(loop=self.loop) rpipe, wpipe = os.pipe() pipeobj = io.open(rpipe, 'rb', 1024) async def connect(): t, p = await self.loop.connect_read_pipe( lambda: proto, pipeobj) self.assertIs(p, proto) self.assertIs(t, proto.transport) self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) self.assertEqual(0, proto.nbytes) self.loop.run_until_complete(connect()) os.write(wpipe, b'1') tb.run_until(self.loop, lambda: proto.nbytes >= 1) self.assertEqual(1, proto.nbytes) os.write(wpipe, b'2345') tb.run_until(self.loop, lambda: proto.nbytes >= 5) self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) self.assertEqual(5, proto.nbytes) os.close(wpipe) self.loop.run_until_complete(proto.done) self.assertEqual( ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state) # extra info is available self.assertIsNotNone(proto.transport.get_extra_info('pipe')) def test_read_pty_output(self): proto = MyReadPipeProto(loop=self.loop) master, slave = os.openpty() master_read_obj = io.open(master, 'rb', 0) async def connect(): t, p = await self.loop.connect_read_pipe( lambda: proto, master_read_obj) self.assertIs(p, proto) self.assertIs(t, proto.transport) self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) self.assertEqual(0, proto.nbytes) self.loop.run_until_complete(connect()) os.write(slave, b'1') tb.run_until(self.loop, lambda: proto.nbytes) self.assertEqual(1, proto.nbytes) os.write(slave, b'2345') tb.run_until(self.loop, lambda: proto.nbytes >= 5) self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) self.assertEqual(5, proto.nbytes) # On Linux, transport raises EIO when slave is closed -- # ignore it. self.loop.set_exception_handler(lambda loop, ctx: None) os.close(slave) proto.transport.close() self.loop.run_until_complete(proto.done) self.assertEqual( ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state) # extra info is available self.assertIsNotNone(proto.transport.get_extra_info('pipe')) def test_write_pipe(self): rpipe, wpipe = os.pipe() os.set_blocking(rpipe, False) pipeobj = io.open(wpipe, 'wb', 1024) proto = MyWritePipeProto(loop=self.loop) connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) transport, p = self.loop.run_until_complete(connect) self.assertIs(p, proto) self.assertIs(transport, proto.transport) self.assertEqual('CONNECTED', proto.state) transport.write(b'1') data = bytearray() def reader(data): try: chunk = os.read(rpipe, 1024) except BlockingIOError: return len(data) data += chunk return len(data) tb.run_until(self.loop, lambda: reader(data) >= 1) self.assertEqual(b'1', data) transport.write(b'2345') tb.run_until(self.loop, lambda: reader(data) >= 5) self.assertEqual(b'12345', data) self.assertEqual('CONNECTED', proto.state) os.close(rpipe) # extra info is available self.assertIsNotNone(proto.transport.get_extra_info('pipe')) # close connection proto.transport.close() self.loop.run_until_complete(proto.done) self.assertEqual('CLOSED', proto.state) def test_write_pipe_disconnect_on_close(self): rsock, wsock = socket.socketpair() rsock.setblocking(False) pipeobj = io.open(wsock.detach(), 'wb', 1024) proto = MyWritePipeProto(loop=self.loop) connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) transport, p = self.loop.run_until_complete(connect) self.assertIs(p, proto) self.assertIs(transport, proto.transport) self.assertEqual('CONNECTED', proto.state) transport.write(b'1') data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024)) self.assertEqual(b'1', data) rsock.close() self.loop.run_until_complete(proto.done) self.assertEqual('CLOSED', proto.state) def test_write_pty(self): master, slave = os.openpty() os.set_blocking(master, False) slave_write_obj = io.open(slave, 'wb', 0) proto = MyWritePipeProto(loop=self.loop) connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj) transport, p = self.loop.run_until_complete(connect) self.assertIs(p, proto) self.assertIs(transport, proto.transport) self.assertEqual('CONNECTED', proto.state) transport.write(b'1') data = bytearray() def reader(data): try: chunk = os.read(master, 1024) except BlockingIOError: return len(data) data += chunk return len(data) tb.run_until(self.loop, lambda: reader(data) >= 1, timeout=10) self.assertEqual(b'1', data) transport.write(b'2345') tb.run_until(self.loop, lambda: reader(data) >= 5, timeout=10) self.assertEqual(b'12345', data) self.assertEqual('CONNECTED', proto.state) os.close(master) # extra info is available self.assertIsNotNone(proto.transport.get_extra_info('pipe')) # close connection proto.transport.close() self.loop.run_until_complete(proto.done) self.assertEqual('CLOSED', proto.state) def test_write_buffer_full(self): rpipe, wpipe = os.pipe() pipeobj = io.open(wpipe, 'wb', 1024) proto = MyWritePipeProto(loop=self.loop) connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) transport, p = self.loop.run_until_complete(connect) self.assertIs(p, proto) self.assertIs(transport, proto.transport) self.assertEqual('CONNECTED', proto.state) for i in range(32): transport.write(b'x' * 32768) if proto.paused: transport.write(b'x' * 32768) break else: self.fail("Didn't reach a full buffer") os.close(rpipe) self.loop.run_until_complete(asyncio.wait_for(proto.done, 1)) self.assertEqual('CLOSED', proto.state) class Test_UV_Pipes(_BasePipeTest, tb.UVTestCase): pass class Test_AIO_Pipes(_BasePipeTest, tb.AIOTestCase): pass