From 47bbea712415e79ee224d21e518470ec70477d41 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 29 Jan 2015 02:56:05 +0100 Subject: [PATCH 01/15] asyncio: sync with Tulip * _SelectorTransport constructor: extra parameter is now optional * Fix _SelectorDatagramTransport constructor. Only start reading after connection_made() has been called. * Fix _SelectorSslTransport.close(). Don't call protocol.connection_lost() if protocol.connection_made() was not called yet: if the SSL handshake failed or is still in progress. The close() method can be called if the creation of the connection is cancelled, by a timeout for example. --- Lib/asyncio/selector_events.py | 13 ++++++++++--- Lib/test/test_asyncio/test_selector_events.py | 15 ++++++++++++++- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index f4996293621..3195f622dae 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -467,7 +467,7 @@ class _SelectorTransport(transports._FlowControlMixin, _buffer_factory = bytearray # Constructs initial value for self._buffer. - def __init__(self, loop, sock, protocol, extra, server=None): + def __init__(self, loop, sock, protocol, extra=None, server=None): super().__init__(extra, loop) self._extra['socket'] = sock self._extra['sockname'] = sock.getsockname() @@ -479,6 +479,7 @@ def __init__(self, loop, sock, protocol, extra, server=None): self._sock = sock self._sock_fd = sock.fileno() self._protocol = protocol + self._protocol_connected = True self._server = server self._buffer = self._buffer_factory() self._conn_lost = 0 # Set when call to connection_lost scheduled. @@ -555,7 +556,8 @@ def _force_close(self, exc): def _call_connection_lost(self, exc): try: - self._protocol.connection_lost(exc) + if self._protocol_connected: + self._protocol.connection_lost(exc) finally: self._sock.close() self._sock = None @@ -718,6 +720,8 @@ def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None, sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs) super().__init__(loop, sslsock, protocol, extra, server) + # the protocol connection is only made after the SSL handshake + self._protocol_connected = False self._server_hostname = server_hostname self._waiter = waiter @@ -797,6 +801,7 @@ def _on_handshake(self, start_time): self._read_wants_write = False self._write_wants_read = False self._loop.add_reader(self._sock_fd, self._read_ready) + self._protocol_connected = True self._loop.call_soon(self._protocol.connection_made, self) # only wake up the waiter when connection_made() has been called self._loop.call_soon(self._wakeup_waiter) @@ -928,8 +933,10 @@ def __init__(self, loop, sock, protocol, address=None, waiter=None, extra=None): super().__init__(loop, sock, protocol, extra) self._address = address - self._loop.add_reader(self._sock_fd, self._read_ready) self._loop.call_soon(self._protocol.connection_made, self) + # only start reading when connection_made() has been called + self._loop.call_soon(self._loop.add_reader, + self._sock_fd, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(waiter._set_result_unless_cancelled, None) diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 51526163953..f64e40dafcd 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -1427,7 +1427,7 @@ def test_write_eof(self): self.assertFalse(tr.can_write_eof()) self.assertRaises(NotImplementedError, tr.write_eof) - def test_close(self): + def check_close(self): tr = self._make_one() tr.close() @@ -1439,6 +1439,19 @@ def test_close(self): self.assertEqual(tr._conn_lost, 1) self.assertEqual(1, self.loop.remove_reader_count[1]) + test_utils.run_briefly(self.loop) + + def test_close(self): + self.check_close() + self.assertTrue(self.protocol.connection_made.called) + self.assertTrue(self.protocol.connection_lost.called) + + def test_close_not_connected(self): + self.sslsock.do_handshake.side_effect = ssl.SSLWantReadError + self.check_close() + self.assertFalse(self.protocol.connection_made.called) + self.assertFalse(self.protocol.connection_lost.called) + @unittest.skipIf(ssl is None, 'No SSL support') def test_server_hostname(self): self.ssl_transport(server_hostname='localhost') From 54a231d5397bda24257f253eb1aaabf1b741a0b5 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 29 Jan 2015 13:33:15 +0100 Subject: [PATCH 02/15] asyncio doc: document Protocol state machine --- Doc/library/asyncio-protocol.rst | 8 ++++++++ Lib/asyncio/protocols.py | 5 +++++ 2 files changed, 13 insertions(+) diff --git a/Doc/library/asyncio-protocol.rst b/Doc/library/asyncio-protocol.rst index 60776d1e9e3..b6fcc4840b5 100644 --- a/Doc/library/asyncio-protocol.rst +++ b/Doc/library/asyncio-protocol.rst @@ -374,6 +374,14 @@ The following callbacks are called on :class:`Protocol` instances: a connection. However, :meth:`eof_received` is called at most once and, if called, :meth:`data_received` won't be called after it. +State machine: + + start -> :meth:`~BaseProtocol.connection_made` + [-> :meth:`~Protocol.data_received` \*] + [-> :meth:`~Protocol.eof_received` ?] + -> :meth:`~BaseProtocol.connection_lost` -> end + + Datagram protocols ------------------ diff --git a/Lib/asyncio/protocols.py b/Lib/asyncio/protocols.py index 52fc25c2ee9..80fcac9a82d 100644 --- a/Lib/asyncio/protocols.py +++ b/Lib/asyncio/protocols.py @@ -78,6 +78,11 @@ class Protocol(BaseProtocol): State machine of calls: start -> CM [-> DR*] [-> ER?] -> CL -> end + + * CM: connection_made() + * DR: data_received() + * ER: eof_received() + * CL: connection_lost() """ def data_received(self, data): From 2934262fd36c35843c01b96657047625ce2e3cf6 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 29 Jan 2015 14:15:19 +0100 Subject: [PATCH 03/15] asyncio: sync with Tulip * Cleanup gather(): use cancelled() method instead of using private Future attribute * Fix _UnixReadPipeTransport and _UnixWritePipeTransport. Only start reading when connection_made() has been called. * Issue #23333: Fix BaseSelectorEventLoop._accept_connection(). Close the transport on error. In debug mode, log errors using call_exception_handler() --- Lib/asyncio/selector_events.py | 44 +++++++++++++++++++---- Lib/asyncio/tasks.py | 2 +- Lib/asyncio/unix_events.py | 19 ++++++---- Lib/test/test_asyncio/test_events.py | 37 ++++++++++++------- Lib/test/test_asyncio/test_unix_events.py | 29 ++++++--------- 5 files changed, 86 insertions(+), 45 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 3195f622dae..914783266d1 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -22,6 +22,7 @@ from . import selectors from . import transports from . import sslproto +from .coroutines import coroutine from .log import logger @@ -181,16 +182,47 @@ def _accept_connection(self, protocol_factory, sock, else: raise # The event loop will catch, log and ignore it. else: + extra = {'peername': addr} + accept = self._accept_connection2(protocol_factory, conn, extra, + sslcontext, server) + self.create_task(accept) + + @coroutine + def _accept_connection2(self, protocol_factory, conn, extra, + sslcontext=None, server=None): + protocol = None + transport = None + try: protocol = protocol_factory() + waiter = futures.Future(loop=self) if sslcontext: - self._make_ssl_transport( - conn, protocol, sslcontext, - server_side=True, extra={'peername': addr}, server=server) + transport = self._make_ssl_transport( + conn, protocol, sslcontext, waiter=waiter, + server_side=True, extra=extra, server=server) else: - self._make_socket_transport( - conn, protocol , extra={'peername': addr}, + transport = self._make_socket_transport( + conn, protocol, waiter=waiter, extra=extra, server=server) - # It's now up to the protocol to handle the connection. + + try: + yield from waiter + except: + transport.close() + raise + + # It's now up to the protocol to handle the connection. + except Exception as exc: + if self.get_debug(): + context = { + 'message': ('Error on transport creation ' + 'for incoming connection'), + 'exception': exc, + } + if protocol is not None: + context['protocol'] = protocol + if transport is not None: + context['transport'] = transport + self.call_exception_handler(context) def add_reader(self, fd, callback, *args): """Add a reader callback.""" diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 63412a97e62..4f19a252ff6 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -592,7 +592,7 @@ def _done_callback(i, fut): fut.exception() return - if fut._state == futures._CANCELLED: + if fut.cancelled(): res = futures.CancelledError() if not return_exceptions: outer.set_exception(res) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 67973f14f3f..7e1265a091f 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -298,8 +298,10 @@ def __init__(self, loop, pipe, protocol, waiter=None, extra=None): _set_nonblocking(self._fileno) self._protocol = protocol self._closing = False - self._loop.add_reader(self._fileno, self._read_ready) self._loop.call_soon(self._protocol.connection_made, self) + # only start reading when connection_made() has been called + self._loop.call_soon(self._loop.add_reader, + self._fileno, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(waiter._set_result_unless_cancelled, None) @@ -401,13 +403,16 @@ def __init__(self, loop, pipe, protocol, waiter=None, extra=None): self._conn_lost = 0 self._closing = False # Set when close() or write_eof() called. - # On AIX, the reader trick only works for sockets. - # On other platforms it works for pipes and sockets. - # (Exception: OS X 10.4? Issue #19294.) - if is_socket or not sys.platform.startswith("aix"): - self._loop.add_reader(self._fileno, self._read_ready) - self._loop.call_soon(self._protocol.connection_made, self) + + # On AIX, the reader trick (to be notified when the read end of the + # socket is closed) only works for sockets. On other platforms it + # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) + if is_socket or not sys.platform.startswith("aix"): + # only start reading when connection_made() has been called + self._loop.call_soon(self._loop.add_reader, + self._fileno, self._read_ready) + if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(waiter._set_result_unless_cancelled, None) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index a38c90ebe8c..12af62b2093 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -886,13 +886,18 @@ def test_create_server_ssl_verify_failed(self): if hasattr(sslcontext_client, 'check_hostname'): sslcontext_client.check_hostname = True + # no CA loaded f_c = self.loop.create_connection(MyProto, host, port, ssl=sslcontext_client) - with test_utils.disable_logger(): - with self.assertRaisesRegex(ssl.SSLError, - 'certificate verify failed '): - self.loop.run_until_complete(f_c) + with mock.patch.object(self.loop, 'call_exception_handler'): + with test_utils.disable_logger(): + with self.assertRaisesRegex(ssl.SSLError, + 'certificate verify failed '): + self.loop.run_until_complete(f_c) + + # execute the loop to log the connection error + test_utils.run_briefly(self.loop) # close connection self.assertIsNone(proto.transport) @@ -919,15 +924,20 @@ def test_create_unix_server_ssl_verify_failed(self): f_c = self.loop.create_unix_connection(MyProto, path, ssl=sslcontext_client, server_hostname='invalid') - with test_utils.disable_logger(): - with self.assertRaisesRegex(ssl.SSLError, - 'certificate verify failed '): - self.loop.run_until_complete(f_c) + with mock.patch.object(self.loop, 'call_exception_handler'): + with test_utils.disable_logger(): + with self.assertRaisesRegex(ssl.SSLError, + 'certificate verify failed '): + self.loop.run_until_complete(f_c) + + # execute the loop to log the connection error + test_utils.run_briefly(self.loop) # close connection self.assertIsNone(proto.transport) server.close() + def test_legacy_create_unix_server_ssl_verify_failed(self): with test_utils.force_legacy_ssl_support(): self.test_create_unix_server_ssl_verify_failed() @@ -949,11 +959,12 @@ def test_create_server_ssl_match_failed(self): # incorrect server_hostname f_c = self.loop.create_connection(MyProto, host, port, ssl=sslcontext_client) - with test_utils.disable_logger(): - with self.assertRaisesRegex( - ssl.CertificateError, - "hostname '127.0.0.1' doesn't match 'localhost'"): - self.loop.run_until_complete(f_c) + with mock.patch.object(self.loop, 'call_exception_handler'): + with test_utils.disable_logger(): + with self.assertRaisesRegex( + ssl.CertificateError, + "hostname '127.0.0.1' doesn't match 'localhost'"): + self.loop.run_until_complete(f_c) # close connection proto.transport.close() diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 126196daac9..41249ff0248 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -350,16 +350,13 @@ def read_pipe_transport(self, waiter=None): return transport def test_ctor(self): - tr = self.read_pipe_transport() - self.loop.assert_reader(5, tr._read_ready) - test_utils.run_briefly(self.loop) - self.protocol.connection_made.assert_called_with(tr) + waiter = asyncio.Future(loop=self.loop) + tr = self.read_pipe_transport(waiter=waiter) + self.loop.run_until_complete(waiter) - def test_ctor_with_waiter(self): - fut = asyncio.Future(loop=self.loop) - tr = self.read_pipe_transport(waiter=fut) - test_utils.run_briefly(self.loop) - self.assertIsNone(fut.result()) + self.protocol.connection_made.assert_called_with(tr) + self.loop.assert_reader(5, tr._read_ready) + self.assertIsNone(waiter.result()) @mock.patch('os.read') def test__read_ready(self, m_read): @@ -502,17 +499,13 @@ def write_pipe_transport(self, waiter=None): return transport def test_ctor(self): - tr = self.write_pipe_transport() - self.loop.assert_reader(5, tr._read_ready) - test_utils.run_briefly(self.loop) - self.protocol.connection_made.assert_called_with(tr) + waiter = asyncio.Future(loop=self.loop) + tr = self.write_pipe_transport(waiter=waiter) + self.loop.run_until_complete(waiter) - def test_ctor_with_waiter(self): - fut = asyncio.Future(loop=self.loop) - tr = self.write_pipe_transport(waiter=fut) + self.protocol.connection_made.assert_called_with(tr) self.loop.assert_reader(5, tr._read_ready) - test_utils.run_briefly(self.loop) - self.assertEqual(None, fut.result()) + self.assertEqual(None, waiter.result()) def test_can_write_eof(self): tr = self.write_pipe_transport() From fa5d6a5ff3ca247d9c2eaf51853ff39c98c09f4a Mon Sep 17 00:00:00 2001 From: Stefan Krah Date: Thu, 29 Jan 2015 14:27:23 +0100 Subject: [PATCH 04/15] Issue #22668: Ensure that format strings survive slicing after casting. --- Include/memoryobject.h | 4 +-- Lib/test/test_memoryview.py | 19 +++++++++++++ Objects/memoryobject.c | 56 ++++++++++++++++++++++++++++++++++--- 3 files changed, 73 insertions(+), 6 deletions(-) diff --git a/Include/memoryobject.h b/Include/memoryobject.h index c2e11944467..382ca92e1f3 100644 --- a/Include/memoryobject.h +++ b/Include/memoryobject.h @@ -45,7 +45,7 @@ typedef struct { } _PyManagedBufferObject; -/* static storage used for casting between formats */ +/* deprecated, removed in 3.5 */ #define _Py_MEMORYVIEW_MAX_FORMAT 3 /* must be >= 3 */ /* memoryview state flags */ @@ -62,7 +62,7 @@ typedef struct { int flags; /* state flags */ Py_ssize_t exports; /* number of buffer re-exports */ Py_buffer view; /* private copy of the exporter's view */ - char format[_Py_MEMORYVIEW_MAX_FORMAT]; /* used for casting */ + char format[_Py_MEMORYVIEW_MAX_FORMAT]; /* deprecated, removed in 3.5 */ PyObject *weakreflist; Py_ssize_t ob_array[1]; /* shape, strides, suboffsets */ } PyMemoryViewObject; diff --git a/Lib/test/test_memoryview.py b/Lib/test/test_memoryview.py index e7df8a762c6..bd9d0d472c0 100644 --- a/Lib/test/test_memoryview.py +++ b/Lib/test/test_memoryview.py @@ -360,6 +360,25 @@ def test_reversed(self): self.assertEqual(list(reversed(m)), aslist) self.assertEqual(list(reversed(m)), list(m[::-1])) + def test_issue22668(self): + m = memoryview(bytes(range(8))) + b = m.cast('H') + c = b[0:2] + d = memoryview(b) + + del b + + self.assertEqual(c[0], 256) + self.assertEqual(d[0], 256) + self.assertEqual(c.format, "H") + self.assertEqual(d.format, "H") + + _ = m.cast('I') + self.assertEqual(c[0], 256) + self.assertEqual(d[0], 256) + self.assertEqual(c.format, "H") + self.assertEqual(d.format, "H") + # Variations on source objects for the buffer: bytes-like objects, then arrays # with itemsize > 1. diff --git a/Objects/memoryobject.c b/Objects/memoryobject.c index cb644b822b7..0be84939ab6 100644 --- a/Objects/memoryobject.c +++ b/Objects/memoryobject.c @@ -1135,6 +1135,51 @@ get_native_fmtchar(char *result, const char *fmt) return -1; } +Py_LOCAL_INLINE(char *) +get_native_fmtstr(const char *fmt) +{ + int at = 0; + + if (fmt[0] == '@') { + at = 1; + fmt++; + } + if (fmt[0] == '\0' || fmt[1] != '\0') { + return NULL; + } + +#define RETURN(s) do { return at ? "@" s : s; } while (0) + + switch (fmt[0]) { + case 'c': RETURN("c"); + case 'b': RETURN("b"); + case 'B': RETURN("B"); + case 'h': RETURN("h"); + case 'H': RETURN("H"); + case 'i': RETURN("i"); + case 'I': RETURN("I"); + case 'l': RETURN("l"); + case 'L': RETURN("L"); + #ifdef HAVE_LONG_LONG + case 'q': RETURN("q"); + case 'Q': RETURN("Q"); + #endif + case 'n': RETURN("n"); + case 'N': RETURN("N"); + case 'f': RETURN("f"); + case 'd': RETURN("d"); + #ifdef HAVE_C99_BOOL + case '?': RETURN("?"); + #else + case '?': RETURN("?"); + #endif + case 'P': RETURN("P"); + } + + return NULL; +} + + /* Cast a memoryview's data type to 'format'. The input array must be C-contiguous. At least one of input-format, output-format must have byte size. The output array is 1-D, with the same byte length as the @@ -1184,10 +1229,13 @@ cast_to_1D(PyMemoryViewObject *mv, PyObject *format) goto out; } - strncpy(mv->format, PyBytes_AS_STRING(asciifmt), - _Py_MEMORYVIEW_MAX_FORMAT); - mv->format[_Py_MEMORYVIEW_MAX_FORMAT-1] = '\0'; - view->format = mv->format; + view->format = get_native_fmtstr(PyBytes_AS_STRING(asciifmt)); + if (view->format == NULL) { + /* NOT_REACHED: get_native_fmtchar() already validates the format. */ + PyErr_SetString(PyExc_RuntimeError, + "memoryview: internal error"); + goto out; + } view->itemsize = itemsize; view->ndim = 1; From fc341bd4c5da88b31aba2806f4288d19e945ad1d Mon Sep 17 00:00:00 2001 From: Stefan Krah Date: Thu, 29 Jan 2015 14:33:37 +0100 Subject: [PATCH 05/15] Whitespace. --- Lib/test/test_memoryview.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_memoryview.py b/Lib/test/test_memoryview.py index bd9d0d472c0..69b9d2dbc28 100644 --- a/Lib/test/test_memoryview.py +++ b/Lib/test/test_memoryview.py @@ -367,12 +367,12 @@ def test_issue22668(self): d = memoryview(b) del b - + self.assertEqual(c[0], 256) self.assertEqual(d[0], 256) self.assertEqual(c.format, "H") self.assertEqual(d.format, "H") - + _ = m.cast('I') self.assertEqual(c[0], 256) self.assertEqual(d[0], 256) From 3c0cf05901ea5cca0694734fd4a64b2bc267cb41 Mon Sep 17 00:00:00 2001 From: Stefan Krah Date: Thu, 29 Jan 2015 17:33:31 +0100 Subject: [PATCH 06/15] Issue #22668: Remove endianness assumption in test. --- Lib/test/test_memoryview.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_memoryview.py b/Lib/test/test_memoryview.py index bd9d0d472c0..4bc31330b78 100644 --- a/Lib/test/test_memoryview.py +++ b/Lib/test/test_memoryview.py @@ -361,18 +361,20 @@ def test_reversed(self): self.assertEqual(list(reversed(m)), list(m[::-1])) def test_issue22668(self): - m = memoryview(bytes(range(8))) + a = array.array('H', [256, 256, 256, 256]) + x = memoryview(a) + m = x.cast('B') b = m.cast('H') c = b[0:2] d = memoryview(b) del b - + self.assertEqual(c[0], 256) self.assertEqual(d[0], 256) self.assertEqual(c.format, "H") self.assertEqual(d.format, "H") - + _ = m.cast('I') self.assertEqual(c[0], 256) self.assertEqual(d[0], 256) From 978a9afc6af6c137065bdcf7ae4ef5450e5b2ec2 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 29 Jan 2015 17:50:58 +0100 Subject: [PATCH 07/15] Issue #23243, asyncio: Emit a ResourceWarning when an event loop or a transport is not explicitly closed. Close also explicitly transports in test_sslproto. --- Lib/asyncio/base_events.py | 11 +++++++++++ Lib/asyncio/base_subprocess.py | 19 ++++++++++++++++++- Lib/asyncio/futures.py | 6 +++--- Lib/asyncio/proactor_events.py | 11 +++++++++++ Lib/asyncio/selector_events.py | 16 ++++++++++++++++ Lib/asyncio/sslproto.py | 13 +++++++++++++ Lib/asyncio/unix_events.py | 19 +++++++++++++++++++ Lib/asyncio/windows_utils.py | 6 +++++- Lib/test/test_asyncio/test_proactor_events.py | 6 +++++- Lib/test/test_asyncio/test_sslproto.py | 7 +++---- 10 files changed, 104 insertions(+), 10 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index e40d3ad5f2b..7108f2516ad 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -26,6 +26,7 @@ import time import traceback import sys +import warnings from . import coroutines from . import events @@ -333,6 +334,16 @@ def is_closed(self): """Returns True if the event loop was closed.""" return self._closed + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if not self.is_closed(): + warnings.warn("unclosed event loop %r" % self, ResourceWarning) + if not self.is_running(): + self.close() + def is_running(self): """Returns True if the event loop is running.""" return (self._owner is not None) diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 81c6f1a71d3..651a9a291ee 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -1,5 +1,7 @@ import collections import subprocess +import sys +import warnings from . import protocols from . import transports @@ -13,6 +15,7 @@ def __init__(self, loop, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): super().__init__(extra) + self._closed = False self._protocol = protocol self._loop = loop self._pid = None @@ -40,7 +43,10 @@ def __init__(self, loop, protocol, args, shell, program, self._pid) def __repr__(self): - info = [self.__class__.__name__, 'pid=%s' % self._pid] + info = [self.__class__.__name__] + if self._closed: + info.append('closed') + info.append('pid=%s' % self._pid) if self._returncode is not None: info.append('returncode=%s' % self._returncode) @@ -70,6 +76,7 @@ def _make_read_subprocess_pipe_proto(self, fd): raise NotImplementedError def close(self): + self._closed = True for proto in self._pipes.values(): if proto is None: continue @@ -77,6 +84,15 @@ def close(self): if self._returncode is None: self.terminate() + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if not self._closed: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self.close() + def get_pid(self): return self._pid @@ -104,6 +120,7 @@ def _kill_wait(self): Function called when an exception is raised during the creation of a subprocess. """ + self._closed = True if self._loop.get_debug(): logger.warning('Exception during subprocess creation, ' 'kill the subprocess %r', diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index 19212a94b9f..2c741fd4226 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -195,9 +195,9 @@ def __repr__(self): info = self._repr_info() return '<%s %s>' % (self.__class__.__name__, ' '.join(info)) - # On Python 3.3 or older, objects with a destructor part of a reference - # cycle are never destroyed. It's not more the case on Python 3.4 thanks to - # the PEP 442. + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. if _PY34: def __del__(self): if not self._log_traceback: diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 0f533a5e590..65de926be8e 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -7,6 +7,8 @@ __all__ = ['BaseProactorEventLoop'] import socket +import sys +import warnings from . import base_events from . import constants @@ -74,6 +76,15 @@ def close(self): self._read_fut.cancel() self._read_fut = None + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if self._sock is not None: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self.close() + def _fatal_error(self, exc, message='Fatal error on pipe transport'): if isinstance(exc, (BrokenPipeError, ConnectionResetError)): if self._loop.get_debug(): diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 914783266d1..4bd6dc8d1cb 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -10,6 +10,8 @@ import errno import functools import socket +import sys +import warnings try: import ssl except ImportError: # pragma: no cover @@ -499,6 +501,11 @@ class _SelectorTransport(transports._FlowControlMixin, _buffer_factory = bytearray # Constructs initial value for self._buffer. + # Attribute used in the destructor: it must be set even if the constructor + # is not called (see _SelectorSslTransport which may start by raising an + # exception) + _sock = None + def __init__(self, loop, sock, protocol, extra=None, server=None): super().__init__(extra, loop) self._extra['socket'] = sock @@ -559,6 +566,15 @@ def close(self): self._conn_lost += 1 self._loop.call_soon(self._call_connection_lost, None) + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if self._sock is not None: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self._sock.close() + def _fatal_error(self, exc, message='Fatal error on transport'): # Should be called from exception handler only. if isinstance(exc, (BrokenPipeError, diff --git a/Lib/asyncio/sslproto.py b/Lib/asyncio/sslproto.py index fc809b9831d..235855e21e1 100644 --- a/Lib/asyncio/sslproto.py +++ b/Lib/asyncio/sslproto.py @@ -1,4 +1,6 @@ import collections +import sys +import warnings try: import ssl except ImportError: # pragma: no cover @@ -295,6 +297,7 @@ def __init__(self, loop, ssl_protocol, app_protocol): self._loop = loop self._ssl_protocol = ssl_protocol self._app_protocol = app_protocol + self._closed = False def get_extra_info(self, name, default=None): """Get optional transport information.""" @@ -308,8 +311,18 @@ def close(self): protocol's connection_lost() method will (eventually) called with None as its argument. """ + self._closed = True self._ssl_protocol._start_shutdown() + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if not self._closed: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self.close() + def pause_reading(self): """Pause the receiving end. diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 7e1265a091f..b06f1b2330d 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -8,6 +8,7 @@ import subprocess import sys import threading +import warnings from . import base_events @@ -353,6 +354,15 @@ def close(self): if not self._closing: self._close(None) + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if self._pipe is not None: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self._pipe.close() + def _fatal_error(self, exc, message='Fatal error on pipe transport'): # should be called by exception handler only if (isinstance(exc, OSError) and exc.errno == errno.EIO): @@ -529,6 +539,15 @@ def close(self): # write_eof is all what we needed to close the write pipe self.write_eof() + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if self._pipe is not None: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self._pipe.close() + def abort(self): self._close(None) diff --git a/Lib/asyncio/windows_utils.py b/Lib/asyncio/windows_utils.py index 5f8327eba63..870cd13abe6 100644 --- a/Lib/asyncio/windows_utils.py +++ b/Lib/asyncio/windows_utils.py @@ -14,6 +14,7 @@ import socket import subprocess import tempfile +import warnings __all__ = ['socketpair', 'pipe', 'Popen', 'PIPE', 'PipeHandle'] @@ -156,7 +157,10 @@ def close(self, *, CloseHandle=_winapi.CloseHandle): CloseHandle(self._handle) self._handle = None - __del__ = close + def __del__(self): + if self._handle is not None: + warnings.warn("unclosed %r" % self, ResourceWarning) + self.close() def __enter__(self): return self diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index 33a8a671ec1..fcd9ab1e18f 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -499,8 +499,12 @@ def test_sock_accept(self): self.proactor.accept.assert_called_with(self.sock) def test_socketpair(self): + class EventLoop(BaseProactorEventLoop): + # override the destructor to not log a ResourceWarning + def __del__(self): + pass self.assertRaises( - NotImplementedError, BaseProactorEventLoop, self.proactor) + NotImplementedError, EventLoop, self.proactor) def test_make_socket_transport(self): tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol()) diff --git a/Lib/test/test_asyncio/test_sslproto.py b/Lib/test/test_asyncio/test_sslproto.py index 148e30dffeb..a72967ea071 100644 --- a/Lib/test/test_asyncio/test_sslproto.py +++ b/Lib/test/test_asyncio/test_sslproto.py @@ -22,7 +22,9 @@ def setUp(self): def ssl_protocol(self, waiter=None): sslcontext = test_utils.dummy_ssl_context() app_proto = asyncio.Protocol() - return sslproto.SSLProtocol(self.loop, app_proto, sslcontext, waiter) + proto = sslproto.SSLProtocol(self.loop, app_proto, sslcontext, waiter) + self.addCleanup(proto._app_transport.close) + return proto def connection_made(self, ssl_proto, do_handshake=None): transport = mock.Mock() @@ -56,9 +58,6 @@ def do_handshake(callback): with test_utils.disable_logger(): self.loop.run_until_complete(handshake_fut) - # Close the transport - ssl_proto._app_transport.close() - def test_eof_received_waiter(self): waiter = asyncio.Future(loop=self.loop) ssl_proto = self.ssl_protocol(waiter) From 47cd10d7a903773f574fc93220dbca850067fa0c Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 30 Jan 2015 00:05:19 +0100 Subject: [PATCH 08/15] asyncio: sync with Tulip Issue #23347: send_signal(), kill() and terminate() methods of BaseSubprocessTransport now check if the transport was closed and if the process exited. Issue #23347: Refactor creation of subprocess transports. Changes on BaseSubprocessTransport: * Add a wait() method to wait until the child process exit * The constructor now accepts an optional waiter parameter. The _post_init() coroutine must not be called explicitly anymore. It makes subprocess transports closer to other transports, and it gives more freedom if we want later to change completly how subprocess transports are created. * close() now kills the process instead of kindly terminate it: the child process may ignore SIGTERM and continue to run. Call explicitly terminate() and wait() if you want to kindly terminate the child process. * close() now logs a warning in debug mode if the process is still running and needs to be killed * _make_subprocess_transport() is now fully asynchronous again: if the creation of the transport failed, wait asynchronously for the process eixt. Before the wait was synchronous. This change requires close() to *kill*, and not terminate, the child process. * Remove the _kill_wait() method, replaced with a more agressive close() method. It fixes _make_subprocess_transport() on error. BaseSubprocessTransport.close() calls the close() method of pipe transports, whereas _kill_wait() closed directly pipes of the subprocess.Popen object without unregistering file descriptors from the selector (which caused severe bugs). These changes simplifies the code of subprocess.py. --- Lib/asyncio/base_subprocess.py | 108 ++++++++++++++--------- Lib/asyncio/subprocess.py | 40 +-------- Lib/asyncio/unix_events.py | 19 ++-- Lib/asyncio/windows_events.py | 7 +- Lib/test/test_asyncio/test_events.py | 33 +++---- Lib/test/test_asyncio/test_subprocess.py | 65 ++++++++++++++ 6 files changed, 167 insertions(+), 105 deletions(-) diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 651a9a291ee..001f9b8c242 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -3,6 +3,7 @@ import sys import warnings +from . import futures from . import protocols from . import transports from .coroutines import coroutine @@ -13,27 +14,32 @@ class BaseSubprocessTransport(transports.SubprocessTransport): def __init__(self, loop, protocol, args, shell, stdin, stdout, stderr, bufsize, - extra=None, **kwargs): + waiter=None, extra=None, **kwargs): super().__init__(extra) self._closed = False self._protocol = protocol self._loop = loop + self._proc = None self._pid = None - + self._returncode = None + self._exit_waiters = [] + self._pending_calls = collections.deque() self._pipes = {} + self._finished = False + if stdin == subprocess.PIPE: self._pipes[0] = None if stdout == subprocess.PIPE: self._pipes[1] = None if stderr == subprocess.PIPE: self._pipes[2] = None - self._pending_calls = collections.deque() - self._finished = False - self._returncode = None + + # Create the child process: set the _proc attribute self._start(args=args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, bufsize=bufsize, **kwargs) self._pid = self._proc.pid self._extra['subprocess'] = self._proc + if self._loop.get_debug(): if isinstance(args, (bytes, str)): program = args @@ -42,6 +48,8 @@ def __init__(self, loop, protocol, args, shell, logger.debug('process %r created: pid %s', program, self._pid) + self._loop.create_task(self._connect_pipes(waiter)) + def __repr__(self): info = [self.__class__.__name__] if self._closed: @@ -77,12 +85,23 @@ def _make_read_subprocess_pipe_proto(self, fd): def close(self): self._closed = True + for proto in self._pipes.values(): if proto is None: continue proto.pipe.close() - if self._returncode is None: - self.terminate() + + if self._proc is not None and self._returncode is None: + if self._loop.get_debug(): + logger.warning('Close running child process: kill %r', self) + + try: + self._proc.kill() + except ProcessLookupError: + pass + + # Don't clear the _proc reference yet because _post_init() may + # still run # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks @@ -105,59 +124,42 @@ def get_pipe_transport(self, fd): else: return None + def _check_proc(self): + if self._closed: + raise ValueError("operation on closed transport") + if self._proc is None: + raise ProcessLookupError() + def send_signal(self, signal): + self._check_proc() self._proc.send_signal(signal) def terminate(self): + self._check_proc() self._proc.terminate() def kill(self): + self._check_proc() self._proc.kill() - def _kill_wait(self): - """Close pipes, kill the subprocess and read its return status. - - Function called when an exception is raised during the creation - of a subprocess. - """ - self._closed = True - if self._loop.get_debug(): - logger.warning('Exception during subprocess creation, ' - 'kill the subprocess %r', - self, - exc_info=True) - - proc = self._proc - if proc.stdout: - proc.stdout.close() - if proc.stderr: - proc.stderr.close() - if proc.stdin: - proc.stdin.close() - - try: - proc.kill() - except ProcessLookupError: - pass - self._returncode = proc.wait() - - self.close() - @coroutine - def _post_init(self): + def _connect_pipes(self, waiter): try: proc = self._proc loop = self._loop + if proc.stdin is not None: _, pipe = yield from loop.connect_write_pipe( lambda: WriteSubprocessPipeProto(self, 0), proc.stdin) self._pipes[0] = pipe + if proc.stdout is not None: _, pipe = yield from loop.connect_read_pipe( lambda: ReadSubprocessPipeProto(self, 1), proc.stdout) self._pipes[1] = pipe + if proc.stderr is not None: _, pipe = yield from loop.connect_read_pipe( lambda: ReadSubprocessPipeProto(self, 2), @@ -166,13 +168,16 @@ def _post_init(self): assert self._pending_calls is not None - self._loop.call_soon(self._protocol.connection_made, self) + loop.call_soon(self._protocol.connection_made, self) for callback, data in self._pending_calls: - self._loop.call_soon(callback, *data) + loop.call_soon(callback, *data) self._pending_calls = None - except: - self._kill_wait() - raise + except Exception as exc: + if waiter is not None and not waiter.cancelled(): + waiter.set_exception(exc) + else: + if waiter is not None and not waiter.cancelled(): + waiter.set_result(None) def _call(self, cb, *data): if self._pending_calls is not None: @@ -197,6 +202,23 @@ def _process_exited(self, returncode): self._call(self._protocol.process_exited) self._try_finish() + # wake up futures waiting for wait() + for waiter in self._exit_waiters: + if not waiter.cancelled(): + waiter.set_result(returncode) + self._exit_waiters = None + + def wait(self): + """Wait until the process exit and return the process return code. + + This method is a coroutine.""" + if self._returncode is not None: + return self._returncode + + waiter = futures.Future(loop=self._loop) + self._exit_waiters.append(waiter) + return (yield from waiter) + def _try_finish(self): assert not self._finished if self._returncode is None: @@ -210,9 +232,9 @@ def _call_connection_lost(self, exc): try: self._protocol.connection_lost(exc) finally: + self._loop = None self._proc = None self._protocol = None - self._loop = None class WriteSubprocessPipeProto(protocols.BaseProtocol): diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index c848a21a8f2..d0c9779c1c9 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -25,8 +25,6 @@ def __init__(self, limit, loop): super().__init__(loop=loop) self._limit = limit self.stdin = self.stdout = self.stderr = None - self.waiter = futures.Future(loop=loop) - self._waiters = collections.deque() self._transport = None def __repr__(self): @@ -61,9 +59,6 @@ def connection_made(self, transport): reader=None, loop=self._loop) - if not self.waiter.cancelled(): - self.waiter.set_result(None) - def pipe_data_received(self, fd, data): if fd == 1: reader = self.stdout @@ -94,16 +89,9 @@ def pipe_connection_lost(self, fd, exc): reader.set_exception(exc) def process_exited(self): - returncode = self._transport.get_returncode() self._transport.close() self._transport = None - # wake up futures waiting for wait() - while self._waiters: - waiter = self._waiters.popleft() - if not waiter.cancelled(): - waiter.set_result(returncode) - class Process: def __init__(self, transport, protocol, loop): @@ -124,30 +112,18 @@ def returncode(self): @coroutine def wait(self): - """Wait until the process exit and return the process return code.""" - returncode = self._transport.get_returncode() - if returncode is not None: - return returncode + """Wait until the process exit and return the process return code. - waiter = futures.Future(loop=self._loop) - self._protocol._waiters.append(waiter) - yield from waiter - return waiter.result() - - def _check_alive(self): - if self._transport.get_returncode() is not None: - raise ProcessLookupError() + This method is a coroutine.""" + return (yield from self._transport.wait()) def send_signal(self, signal): - self._check_alive() self._transport.send_signal(signal) def terminate(self): - self._check_alive() self._transport.terminate() def kill(self): - self._check_alive() self._transport.kill() @coroutine @@ -221,11 +197,6 @@ def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, protocol_factory, cmd, stdin=stdin, stdout=stdout, stderr=stderr, **kwds) - try: - yield from protocol.waiter - except: - transport._kill_wait() - raise return Process(transport, protocol, loop) @coroutine @@ -241,9 +212,4 @@ def create_subprocess_exec(program, *args, stdin=None, stdout=None, program, *args, stdin=stdin, stdout=stdout, stderr=stderr, **kwds) - try: - yield from protocol.waiter - except: - transport._kill_wait() - raise return Process(transport, protocol, loop) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index b06f1b2330d..3ecdfd2e0b1 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -16,6 +16,7 @@ from . import constants from . import coroutines from . import events +from . import futures from . import selector_events from . import selectors from . import transports @@ -175,16 +176,20 @@ def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): with events.get_child_watcher() as watcher: + waiter = futures.Future(loop=self) transp = _UnixSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, - extra=extra, **kwargs) - try: - yield from transp._post_init() - except: - transp.close() - raise + waiter=waiter, extra=extra, + **kwargs) + watcher.add_child_handler(transp.get_pid(), self._child_watcher_callback, transp) + try: + yield from waiter + except: + transp.close() + yield from transp.wait() + raise return transp @@ -774,7 +779,7 @@ def __exit__(self, a, b, c): pass def add_child_handler(self, pid, callback, *args): - self._callbacks[pid] = callback, args + self._callbacks[pid] = (callback, args) # Prevent a race condition in case the child is already terminated. self._do_waitpid(pid) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 94aafb6f5ab..437eb0ac9dd 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -366,13 +366,16 @@ def loop_accept_pipe(f=None): def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): + waiter = futures.Future(loop=self) transp = _WindowsSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, - extra=extra, **kwargs) + waiter=waiter, extra=extra, + **kwargs) try: - yield from transp._post_init() + yield from waiter except: transp.close() + yield from transp.wait() raise return transp diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 12af62b2093..4b957d8f636 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -1551,9 +1551,10 @@ def test_subprocess_exec(self): stdin = transp.get_pipe_transport(0) stdin.write(b'Python The Winner') self.loop.run_until_complete(proto.got_data[1].wait()) - transp.close() + with test_utils.disable_logger(): + transp.close() self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) + self.check_killed(proto.returncode) self.assertEqual(b'Python The Winner', proto.data[1]) def test_subprocess_interactive(self): @@ -1567,21 +1568,20 @@ def test_subprocess_interactive(self): self.loop.run_until_complete(proto.connected) self.assertEqual('CONNECTED', proto.state) - try: - stdin = transp.get_pipe_transport(0) - stdin.write(b'Python ') - self.loop.run_until_complete(proto.got_data[1].wait()) - proto.got_data[1].clear() - self.assertEqual(b'Python ', proto.data[1]) + stdin = transp.get_pipe_transport(0) + stdin.write(b'Python ') + self.loop.run_until_complete(proto.got_data[1].wait()) + proto.got_data[1].clear() + self.assertEqual(b'Python ', proto.data[1]) - stdin.write(b'The Winner') - self.loop.run_until_complete(proto.got_data[1].wait()) - self.assertEqual(b'Python The Winner', proto.data[1]) - finally: + stdin.write(b'The Winner') + self.loop.run_until_complete(proto.got_data[1].wait()) + self.assertEqual(b'Python The Winner', proto.data[1]) + + with test_utils.disable_logger(): transp.close() - self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) + self.check_killed(proto.returncode) def test_subprocess_shell(self): connect = self.loop.subprocess_shell( @@ -1739,9 +1739,10 @@ def test_subprocess_close_client_stream(self): # GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using # WriteFile() we get ERROR_BROKEN_PIPE as expected.) self.assertEqual(b'ERR:OSError', proto.data[2]) - transp.close() + with test_utils.disable_logger(): + transp.close() self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) + self.check_killed(proto.returncode) def test_subprocess_wait_no_same_group(self): # start the new process in a new session diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index ecc2c9d8a85..4f197f394a1 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -4,6 +4,7 @@ from unittest import mock import asyncio +from asyncio import base_subprocess from asyncio import subprocess from asyncio import test_utils try: @@ -23,6 +24,70 @@ 'data = sys.stdin.buffer.read()', 'sys.stdout.buffer.write(data)'))] +class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport): + def _start(self, *args, **kwargs): + self._proc = mock.Mock() + self._proc.stdin = None + self._proc.stdout = None + self._proc.stderr = None + + +class SubprocessTransportTests(test_utils.TestCase): + def setUp(self): + self.loop = self.new_test_loop() + self.set_event_loop(self.loop) + + + def create_transport(self, waiter=None): + protocol = mock.Mock() + protocol.connection_made._is_coroutine = False + protocol.process_exited._is_coroutine = False + transport = TestSubprocessTransport( + self.loop, protocol, ['test'], False, + None, None, None, 0, waiter=waiter) + return (transport, protocol) + + def test_close(self): + waiter = asyncio.Future(loop=self.loop) + transport, protocol = self.create_transport(waiter) + transport._process_exited(0) + transport.close() + + # The loop didn't run yet + self.assertFalse(protocol.connection_made.called) + + # methods must raise ProcessLookupError if the transport was closed + self.assertRaises(ValueError, transport.send_signal, signal.SIGTERM) + self.assertRaises(ValueError, transport.terminate) + self.assertRaises(ValueError, transport.kill) + + self.loop.run_until_complete(waiter) + + def test_proc_exited(self): + waiter = asyncio.Future(loop=self.loop) + transport, protocol = self.create_transport(waiter) + transport._process_exited(6) + self.loop.run_until_complete(waiter) + + self.assertEqual(transport.get_returncode(), 6) + + self.assertTrue(protocol.connection_made.called) + self.assertTrue(protocol.process_exited.called) + self.assertTrue(protocol.connection_lost.called) + self.assertEqual(protocol.connection_lost.call_args[0], (None,)) + + self.assertFalse(transport._closed) + self.assertIsNone(transport._loop) + self.assertIsNone(transport._proc) + self.assertIsNone(transport._protocol) + + # methods must raise ProcessLookupError if the process exited + self.assertRaises(ProcessLookupError, + transport.send_signal, signal.SIGTERM) + self.assertRaises(ProcessLookupError, transport.terminate) + self.assertRaises(ProcessLookupError, transport.kill) + + class SubprocessMixin: def test_stdin_stdout(self): From 0698638d797ca864f069d828dd2ea6d55b87a04e Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 30 Jan 2015 00:11:42 +0100 Subject: [PATCH 09/15] asyncio: Fix ResourceWarning in test_subprocess.test_proc_exit() --- Lib/test/test_asyncio/test_subprocess.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 4f197f394a1..d4b71b7a5a2 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -87,6 +87,8 @@ def test_proc_exited(self): self.assertRaises(ProcessLookupError, transport.terminate) self.assertRaises(ProcessLookupError, transport.kill) + transport.close() + class SubprocessMixin: From 1241ecc21bbb3f86734a3071b93514a12dd1ee23 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 30 Jan 2015 00:16:14 +0100 Subject: [PATCH 10/15] Issue #23347, asyncio: Make BaseSubprocessTransport.wait() private --- Lib/asyncio/base_subprocess.py | 2 +- Lib/asyncio/subprocess.py | 2 +- Lib/asyncio/unix_events.py | 2 +- Lib/asyncio/windows_events.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 001f9b8c242..70676ab3ff5 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -208,7 +208,7 @@ def _process_exited(self, returncode): waiter.set_result(returncode) self._exit_waiters = None - def wait(self): + def _wait(self): """Wait until the process exit and return the process return code. This method is a coroutine.""" diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index d0c9779c1c9..4600a9f417d 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -115,7 +115,7 @@ def wait(self): """Wait until the process exit and return the process return code. This method is a coroutine.""" - return (yield from self._transport.wait()) + return (yield from self._transport._wait()) def send_signal(self, signal): self._transport.send_signal(signal) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 3ecdfd2e0b1..1fc39abe09c 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -188,7 +188,7 @@ def _make_subprocess_transport(self, protocol, args, shell, yield from waiter except: transp.close() - yield from transp.wait() + yield from transp._wait() raise return transp diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 437eb0ac9dd..c4bffc47346 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -375,7 +375,7 @@ def _make_subprocess_transport(self, protocol, args, shell, yield from waiter except: transp.close() - yield from transp.wait() + yield from transp._wait() raise return transp From 7a55b88d9cf55539d28e2aac6ced20c780984158 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 30 Jan 2015 00:37:04 +0100 Subject: [PATCH 11/15] Issue #21962, asyncio doc: Suggest the usage of wait_for() to replace the lack of timeout parameter for locks and queues. --- Doc/library/asyncio-sync.rst | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/Doc/library/asyncio-sync.rst b/Doc/library/asyncio-sync.rst index c63447bbab4..4cc9a9645c4 100644 --- a/Doc/library/asyncio-sync.rst +++ b/Doc/library/asyncio-sync.rst @@ -4,6 +4,29 @@ Synchronization primitives ========================== +Locks: + +* :class:`Lock` +* :class:`Event` +* :class:`Condition` +* :class:`Semaphore` +* :class:`BoundedSemaphore` + +Queues: + +* :class:`Queue` +* :class:`PriorityQueue` +* :class:`LifoQueue` +* :class:`JoinableQueue` + +asyncio locks and queues API were designed to be close to classes of the +:mod:`threading` module (:class:`~threading.Lock`, :class:`~threading.Event`, +:class:`~threading.Condition`, :class:`~threading.Semaphore`, +:class:`~threading.BoundedSemaphore`) and the :mod:`queue` module +(:class:`~queue.Queue`, :class:`~queue.PriorityQueue`, +:class:`~queue.LifoQueue`), but they have no *timeout* parameter. The +:func:`asyncio.wait_for` function can be used to cancel a task after a timeout. + Locks ----- From 1077dee4575ccc43c10515de50d7c100d6ce9455 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 30 Jan 2015 00:55:58 +0100 Subject: [PATCH 12/15] asyncio doc: add a section about task cancellation --- Doc/library/asyncio-dev.rst | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/Doc/library/asyncio-dev.rst b/Doc/library/asyncio-dev.rst index 7083e60061c..72a06f538e8 100644 --- a/Doc/library/asyncio-dev.rst +++ b/Doc/library/asyncio-dev.rst @@ -40,6 +40,43 @@ Examples of effects of the debug mode: `. +Cancellation +------------ + +Cancellation of tasks is not common in classic programming. In asynchronous +programming, not only it is something common, but you have to prepare your +code to handle it. + +Futures and tasks can be cancelled explicitly with their :meth:`Future.cancel` +method. The :func:`wait_for` function cancels the waited task when the timeout +occurs. There are many other cases where a task can be cancelled indirectly. + +Don't call :meth:`~Future.set_result` or :meth:`~Future.set_exception` method +of :class:`Future` if the future is cancelled: it would fail with an exception. +For example, write:: + + if not fut.cancelled(): + fut.set_result('done') + +Don't schedule directly a call to the :meth:`~Future.set_result` or the +:meth:`~Future.set_exception` method of a future with +:meth:`BaseEventLoop.call_soon`: the future can be cancelled before its method +is called. + +If you wait for a future, you should check early if the future was cancelled to +avoid useless operations. Example:: + + @coroutine + def slow_operation(fut): + if fut.cancelled(): + return + # ... slow computation ... + yield from fut + # ... + +The :func:`shield` function can also be used to ignore cancellation. + + .. _asyncio-multithreading: Concurrency and multithreading From f2e43cbbd4b9325da638a6b45b52e766ca91b131 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 30 Jan 2015 01:20:44 +0100 Subject: [PATCH 13/15] Issue #23347, asyncio: send_signal(), terminate(), kill() don't check if the transport was closed. The check broken a Tulip example and this limitation is arbitrary. Check if _proc is None should be enough. Enhance also close(): do nothing when called the second time. --- Lib/asyncio/base_subprocess.py | 7 +++---- Lib/test/test_asyncio/test_subprocess.py | 16 ---------------- 2 files changed, 3 insertions(+), 20 deletions(-) diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 70676ab3ff5..02b9e89f709 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -84,6 +84,8 @@ def _make_read_subprocess_pipe_proto(self, fd): raise NotImplementedError def close(self): + if self._closed: + return self._closed = True for proto in self._pipes.values(): @@ -100,8 +102,7 @@ def close(self): except ProcessLookupError: pass - # Don't clear the _proc reference yet because _post_init() may - # still run + # Don't clear the _proc reference yet: _post_init() may still run # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks @@ -125,8 +126,6 @@ def get_pipe_transport(self, fd): return None def _check_proc(self): - if self._closed: - raise ValueError("operation on closed transport") if self._proc is None: raise ProcessLookupError() diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index d4b71b7a5a2..b467b04f535 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -47,22 +47,6 @@ def create_transport(self, waiter=None): None, None, None, 0, waiter=waiter) return (transport, protocol) - def test_close(self): - waiter = asyncio.Future(loop=self.loop) - transport, protocol = self.create_transport(waiter) - transport._process_exited(0) - transport.close() - - # The loop didn't run yet - self.assertFalse(protocol.connection_made.called) - - # methods must raise ProcessLookupError if the transport was closed - self.assertRaises(ValueError, transport.send_signal, signal.SIGTERM) - self.assertRaises(ValueError, transport.terminate) - self.assertRaises(ValueError, transport.kill) - - self.loop.run_until_complete(waiter) - def test_proc_exited(self): waiter = asyncio.Future(loop=self.loop) transport, protocol = self.create_transport(waiter) From 188f2c0b75c69cfbe082f1eca2ed8f8a04fa1718 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 30 Jan 2015 01:35:14 +0100 Subject: [PATCH 14/15] asyncio doc: document the new ResourceWarning warnings --- Doc/library/asyncio-dev.rst | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/Doc/library/asyncio-dev.rst b/Doc/library/asyncio-dev.rst index 72a06f538e8..ce1275b4e02 100644 --- a/Doc/library/asyncio-dev.rst +++ b/Doc/library/asyncio-dev.rst @@ -372,3 +372,14 @@ traceback where the task was created. Example of log in debug mode:: :ref:`Detect coroutine objects never scheduled `. + +Close transports +---------------- + +When a transport is no more needed, call its ``close()`` method to release +resources. + +If a transport (or an event loop) is not closed explicitly, a +:exc:`ResourceWarning` warning will be emitted in its destructor. The +:exc:`ResourceWarning` warnings are hidden by default: use the ``-Wd`` command +line option of Python to show them. From 756f0b19823dbd44fa1cc32089963971b7169cc4 Mon Sep 17 00:00:00 2001 From: R David Murray Date: Thu, 29 Jan 2015 19:53:33 -0500 Subject: [PATCH 15/15] Fix asyncio doc typo. --- Doc/library/asyncio-eventloop.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 12e60c432b6..4f7fdfe60f2 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -641,7 +641,7 @@ Server Server listening on sockets. Object created by the :meth:`BaseEventLoop.create_server` method and the - :func:`start_server` function. Don't instanciate the class directly. + :func:`start_server` function. Don't instantiate the class directly. .. method:: close()