diff --git a/Doc/library/asyncio-dev.rst b/Doc/library/asyncio-dev.rst index 7083e60061c..ce1275b4e02 100644 --- a/Doc/library/asyncio-dev.rst +++ b/Doc/library/asyncio-dev.rst @@ -40,6 +40,43 @@ Examples of effects of the debug mode: `. +Cancellation +------------ + +Cancellation of tasks is not common in classic programming. In asynchronous +programming, not only it is something common, but you have to prepare your +code to handle it. + +Futures and tasks can be cancelled explicitly with their :meth:`Future.cancel` +method. The :func:`wait_for` function cancels the waited task when the timeout +occurs. There are many other cases where a task can be cancelled indirectly. + +Don't call :meth:`~Future.set_result` or :meth:`~Future.set_exception` method +of :class:`Future` if the future is cancelled: it would fail with an exception. +For example, write:: + + if not fut.cancelled(): + fut.set_result('done') + +Don't schedule directly a call to the :meth:`~Future.set_result` or the +:meth:`~Future.set_exception` method of a future with +:meth:`BaseEventLoop.call_soon`: the future can be cancelled before its method +is called. + +If you wait for a future, you should check early if the future was cancelled to +avoid useless operations. Example:: + + @coroutine + def slow_operation(fut): + if fut.cancelled(): + return + # ... slow computation ... + yield from fut + # ... + +The :func:`shield` function can also be used to ignore cancellation. + + .. _asyncio-multithreading: Concurrency and multithreading @@ -335,3 +372,14 @@ traceback where the task was created. Example of log in debug mode:: :ref:`Detect coroutine objects never scheduled `. + +Close transports +---------------- + +When a transport is no more needed, call its ``close()`` method to release +resources. + +If a transport (or an event loop) is not closed explicitly, a +:exc:`ResourceWarning` warning will be emitted in its destructor. The +:exc:`ResourceWarning` warnings are hidden by default: use the ``-Wd`` command +line option of Python to show them. diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 12e60c432b6..4f7fdfe60f2 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -641,7 +641,7 @@ Server Server listening on sockets. Object created by the :meth:`BaseEventLoop.create_server` method and the - :func:`start_server` function. Don't instanciate the class directly. + :func:`start_server` function. Don't instantiate the class directly. .. method:: close() diff --git a/Doc/library/asyncio-protocol.rst b/Doc/library/asyncio-protocol.rst index 60776d1e9e3..b6fcc4840b5 100644 --- a/Doc/library/asyncio-protocol.rst +++ b/Doc/library/asyncio-protocol.rst @@ -374,6 +374,14 @@ The following callbacks are called on :class:`Protocol` instances: a connection. However, :meth:`eof_received` is called at most once and, if called, :meth:`data_received` won't be called after it. +State machine: + + start -> :meth:`~BaseProtocol.connection_made` + [-> :meth:`~Protocol.data_received` \*] + [-> :meth:`~Protocol.eof_received` ?] + -> :meth:`~BaseProtocol.connection_lost` -> end + + Datagram protocols ------------------ diff --git a/Doc/library/asyncio-sync.rst b/Doc/library/asyncio-sync.rst index c63447bbab4..4cc9a9645c4 100644 --- a/Doc/library/asyncio-sync.rst +++ b/Doc/library/asyncio-sync.rst @@ -4,6 +4,29 @@ Synchronization primitives ========================== +Locks: + +* :class:`Lock` +* :class:`Event` +* :class:`Condition` +* :class:`Semaphore` +* :class:`BoundedSemaphore` + +Queues: + +* :class:`Queue` +* :class:`PriorityQueue` +* :class:`LifoQueue` +* :class:`JoinableQueue` + +asyncio locks and queues API were designed to be close to classes of the +:mod:`threading` module (:class:`~threading.Lock`, :class:`~threading.Event`, +:class:`~threading.Condition`, :class:`~threading.Semaphore`, +:class:`~threading.BoundedSemaphore`) and the :mod:`queue` module +(:class:`~queue.Queue`, :class:`~queue.PriorityQueue`, +:class:`~queue.LifoQueue`), but they have no *timeout* parameter. The +:func:`asyncio.wait_for` function can be used to cancel a task after a timeout. + Locks ----- diff --git a/Doc/whatsnew/3.5.rst b/Doc/whatsnew/3.5.rst index 98e421d9df3..c309aa80c7c 100644 --- a/Doc/whatsnew/3.5.rst +++ b/Doc/whatsnew/3.5.rst @@ -485,6 +485,12 @@ Changes in the Python API Changes in the C API -------------------- +* The undocumented :c:member:`~PyMemoryViewObject.format` member of the + (non-public) :c:type:`PyMemoryViewObject` structure has been removed. + + All extensions relying on the relevant parts in ``memoryobject.h`` + must be rebuilt. + * The :c:type:`PyMemAllocator` structure was renamed to :c:type:`PyMemAllocatorEx` and a new ``calloc`` field was added. diff --git a/Include/memoryobject.h b/Include/memoryobject.h index c2e11944467..ab5ee0956c6 100644 --- a/Include/memoryobject.h +++ b/Include/memoryobject.h @@ -45,9 +45,6 @@ typedef struct { } _PyManagedBufferObject; -/* static storage used for casting between formats */ -#define _Py_MEMORYVIEW_MAX_FORMAT 3 /* must be >= 3 */ - /* memoryview state flags */ #define _Py_MEMORYVIEW_RELEASED 0x001 /* access to master buffer blocked */ #define _Py_MEMORYVIEW_C 0x002 /* C-contiguous layout */ @@ -62,7 +59,6 @@ typedef struct { int flags; /* state flags */ Py_ssize_t exports; /* number of buffer re-exports */ Py_buffer view; /* private copy of the exporter's view */ - char format[_Py_MEMORYVIEW_MAX_FORMAT]; /* used for casting */ PyObject *weakreflist; Py_ssize_t ob_array[1]; /* shape, strides, suboffsets */ } PyMemoryViewObject; diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index e40d3ad5f2b..7108f2516ad 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -26,6 +26,7 @@ import time import traceback import sys +import warnings from . import coroutines from . import events @@ -333,6 +334,16 @@ def is_closed(self): """Returns True if the event loop was closed.""" return self._closed + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if not self.is_closed(): + warnings.warn("unclosed event loop %r" % self, ResourceWarning) + if not self.is_running(): + self.close() + def is_running(self): """Returns True if the event loop is running.""" return (self._owner is not None) diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 81c6f1a71d3..02b9e89f709 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -1,6 +1,9 @@ import collections import subprocess +import sys +import warnings +from . import futures from . import protocols from . import transports from .coroutines import coroutine @@ -11,26 +14,32 @@ class BaseSubprocessTransport(transports.SubprocessTransport): def __init__(self, loop, protocol, args, shell, stdin, stdout, stderr, bufsize, - extra=None, **kwargs): + waiter=None, extra=None, **kwargs): super().__init__(extra) + self._closed = False self._protocol = protocol self._loop = loop + self._proc = None self._pid = None - + self._returncode = None + self._exit_waiters = [] + self._pending_calls = collections.deque() self._pipes = {} + self._finished = False + if stdin == subprocess.PIPE: self._pipes[0] = None if stdout == subprocess.PIPE: self._pipes[1] = None if stderr == subprocess.PIPE: self._pipes[2] = None - self._pending_calls = collections.deque() - self._finished = False - self._returncode = None + + # Create the child process: set the _proc attribute self._start(args=args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, bufsize=bufsize, **kwargs) self._pid = self._proc.pid self._extra['subprocess'] = self._proc + if self._loop.get_debug(): if isinstance(args, (bytes, str)): program = args @@ -39,8 +48,13 @@ def __init__(self, loop, protocol, args, shell, logger.debug('process %r created: pid %s', program, self._pid) + self._loop.create_task(self._connect_pipes(waiter)) + def __repr__(self): - info = [self.__class__.__name__, 'pid=%s' % self._pid] + info = [self.__class__.__name__] + if self._closed: + info.append('closed') + info.append('pid=%s' % self._pid) if self._returncode is not None: info.append('returncode=%s' % self._returncode) @@ -70,12 +84,34 @@ def _make_read_subprocess_pipe_proto(self, fd): raise NotImplementedError def close(self): + if self._closed: + return + self._closed = True + for proto in self._pipes.values(): if proto is None: continue proto.pipe.close() - if self._returncode is None: - self.terminate() + + if self._proc is not None and self._returncode is None: + if self._loop.get_debug(): + logger.warning('Close running child process: kill %r', self) + + try: + self._proc.kill() + except ProcessLookupError: + pass + + # Don't clear the _proc reference yet: _post_init() may still run + + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if not self._closed: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self.close() def get_pid(self): return self._pid @@ -89,58 +125,40 @@ def get_pipe_transport(self, fd): else: return None + def _check_proc(self): + if self._proc is None: + raise ProcessLookupError() + def send_signal(self, signal): + self._check_proc() self._proc.send_signal(signal) def terminate(self): + self._check_proc() self._proc.terminate() def kill(self): + self._check_proc() self._proc.kill() - def _kill_wait(self): - """Close pipes, kill the subprocess and read its return status. - - Function called when an exception is raised during the creation - of a subprocess. - """ - if self._loop.get_debug(): - logger.warning('Exception during subprocess creation, ' - 'kill the subprocess %r', - self, - exc_info=True) - - proc = self._proc - if proc.stdout: - proc.stdout.close() - if proc.stderr: - proc.stderr.close() - if proc.stdin: - proc.stdin.close() - - try: - proc.kill() - except ProcessLookupError: - pass - self._returncode = proc.wait() - - self.close() - @coroutine - def _post_init(self): + def _connect_pipes(self, waiter): try: proc = self._proc loop = self._loop + if proc.stdin is not None: _, pipe = yield from loop.connect_write_pipe( lambda: WriteSubprocessPipeProto(self, 0), proc.stdin) self._pipes[0] = pipe + if proc.stdout is not None: _, pipe = yield from loop.connect_read_pipe( lambda: ReadSubprocessPipeProto(self, 1), proc.stdout) self._pipes[1] = pipe + if proc.stderr is not None: _, pipe = yield from loop.connect_read_pipe( lambda: ReadSubprocessPipeProto(self, 2), @@ -149,13 +167,16 @@ def _post_init(self): assert self._pending_calls is not None - self._loop.call_soon(self._protocol.connection_made, self) + loop.call_soon(self._protocol.connection_made, self) for callback, data in self._pending_calls: - self._loop.call_soon(callback, *data) + loop.call_soon(callback, *data) self._pending_calls = None - except: - self._kill_wait() - raise + except Exception as exc: + if waiter is not None and not waiter.cancelled(): + waiter.set_exception(exc) + else: + if waiter is not None and not waiter.cancelled(): + waiter.set_result(None) def _call(self, cb, *data): if self._pending_calls is not None: @@ -180,6 +201,23 @@ def _process_exited(self, returncode): self._call(self._protocol.process_exited) self._try_finish() + # wake up futures waiting for wait() + for waiter in self._exit_waiters: + if not waiter.cancelled(): + waiter.set_result(returncode) + self._exit_waiters = None + + def _wait(self): + """Wait until the process exit and return the process return code. + + This method is a coroutine.""" + if self._returncode is not None: + return self._returncode + + waiter = futures.Future(loop=self._loop) + self._exit_waiters.append(waiter) + return (yield from waiter) + def _try_finish(self): assert not self._finished if self._returncode is None: @@ -193,9 +231,9 @@ def _call_connection_lost(self, exc): try: self._protocol.connection_lost(exc) finally: + self._loop = None self._proc = None self._protocol = None - self._loop = None class WriteSubprocessPipeProto(protocols.BaseProtocol): diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index 19212a94b9f..2c741fd4226 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -195,9 +195,9 @@ def __repr__(self): info = self._repr_info() return '<%s %s>' % (self.__class__.__name__, ' '.join(info)) - # On Python 3.3 or older, objects with a destructor part of a reference - # cycle are never destroyed. It's not more the case on Python 3.4 thanks to - # the PEP 442. + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. if _PY34: def __del__(self): if not self._log_traceback: diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 0f533a5e590..65de926be8e 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -7,6 +7,8 @@ __all__ = ['BaseProactorEventLoop'] import socket +import sys +import warnings from . import base_events from . import constants @@ -74,6 +76,15 @@ def close(self): self._read_fut.cancel() self._read_fut = None + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if self._sock is not None: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self.close() + def _fatal_error(self, exc, message='Fatal error on pipe transport'): if isinstance(exc, (BrokenPipeError, ConnectionResetError)): if self._loop.get_debug(): diff --git a/Lib/asyncio/protocols.py b/Lib/asyncio/protocols.py index 52fc25c2ee9..80fcac9a82d 100644 --- a/Lib/asyncio/protocols.py +++ b/Lib/asyncio/protocols.py @@ -78,6 +78,11 @@ class Protocol(BaseProtocol): State machine of calls: start -> CM [-> DR*] [-> ER?] -> CL -> end + + * CM: connection_made() + * DR: data_received() + * ER: eof_received() + * CL: connection_lost() """ def data_received(self, data): diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index f4996293621..4bd6dc8d1cb 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -10,6 +10,8 @@ import errno import functools import socket +import sys +import warnings try: import ssl except ImportError: # pragma: no cover @@ -22,6 +24,7 @@ from . import selectors from . import transports from . import sslproto +from .coroutines import coroutine from .log import logger @@ -181,16 +184,47 @@ def _accept_connection(self, protocol_factory, sock, else: raise # The event loop will catch, log and ignore it. else: + extra = {'peername': addr} + accept = self._accept_connection2(protocol_factory, conn, extra, + sslcontext, server) + self.create_task(accept) + + @coroutine + def _accept_connection2(self, protocol_factory, conn, extra, + sslcontext=None, server=None): + protocol = None + transport = None + try: protocol = protocol_factory() + waiter = futures.Future(loop=self) if sslcontext: - self._make_ssl_transport( - conn, protocol, sslcontext, - server_side=True, extra={'peername': addr}, server=server) + transport = self._make_ssl_transport( + conn, protocol, sslcontext, waiter=waiter, + server_side=True, extra=extra, server=server) else: - self._make_socket_transport( - conn, protocol , extra={'peername': addr}, + transport = self._make_socket_transport( + conn, protocol, waiter=waiter, extra=extra, server=server) - # It's now up to the protocol to handle the connection. + + try: + yield from waiter + except: + transport.close() + raise + + # It's now up to the protocol to handle the connection. + except Exception as exc: + if self.get_debug(): + context = { + 'message': ('Error on transport creation ' + 'for incoming connection'), + 'exception': exc, + } + if protocol is not None: + context['protocol'] = protocol + if transport is not None: + context['transport'] = transport + self.call_exception_handler(context) def add_reader(self, fd, callback, *args): """Add a reader callback.""" @@ -467,7 +501,12 @@ class _SelectorTransport(transports._FlowControlMixin, _buffer_factory = bytearray # Constructs initial value for self._buffer. - def __init__(self, loop, sock, protocol, extra, server=None): + # Attribute used in the destructor: it must be set even if the constructor + # is not called (see _SelectorSslTransport which may start by raising an + # exception) + _sock = None + + def __init__(self, loop, sock, protocol, extra=None, server=None): super().__init__(extra, loop) self._extra['socket'] = sock self._extra['sockname'] = sock.getsockname() @@ -479,6 +518,7 @@ def __init__(self, loop, sock, protocol, extra, server=None): self._sock = sock self._sock_fd = sock.fileno() self._protocol = protocol + self._protocol_connected = True self._server = server self._buffer = self._buffer_factory() self._conn_lost = 0 # Set when call to connection_lost scheduled. @@ -526,6 +566,15 @@ def close(self): self._conn_lost += 1 self._loop.call_soon(self._call_connection_lost, None) + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if self._sock is not None: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self._sock.close() + def _fatal_error(self, exc, message='Fatal error on transport'): # Should be called from exception handler only. if isinstance(exc, (BrokenPipeError, @@ -555,7 +604,8 @@ def _force_close(self, exc): def _call_connection_lost(self, exc): try: - self._protocol.connection_lost(exc) + if self._protocol_connected: + self._protocol.connection_lost(exc) finally: self._sock.close() self._sock = None @@ -718,6 +768,8 @@ def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None, sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs) super().__init__(loop, sslsock, protocol, extra, server) + # the protocol connection is only made after the SSL handshake + self._protocol_connected = False self._server_hostname = server_hostname self._waiter = waiter @@ -797,6 +849,7 @@ def _on_handshake(self, start_time): self._read_wants_write = False self._write_wants_read = False self._loop.add_reader(self._sock_fd, self._read_ready) + self._protocol_connected = True self._loop.call_soon(self._protocol.connection_made, self) # only wake up the waiter when connection_made() has been called self._loop.call_soon(self._wakeup_waiter) @@ -928,8 +981,10 @@ def __init__(self, loop, sock, protocol, address=None, waiter=None, extra=None): super().__init__(loop, sock, protocol, extra) self._address = address - self._loop.add_reader(self._sock_fd, self._read_ready) self._loop.call_soon(self._protocol.connection_made, self) + # only start reading when connection_made() has been called + self._loop.call_soon(self._loop.add_reader, + self._sock_fd, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(waiter._set_result_unless_cancelled, None) diff --git a/Lib/asyncio/sslproto.py b/Lib/asyncio/sslproto.py index fc809b9831d..235855e21e1 100644 --- a/Lib/asyncio/sslproto.py +++ b/Lib/asyncio/sslproto.py @@ -1,4 +1,6 @@ import collections +import sys +import warnings try: import ssl except ImportError: # pragma: no cover @@ -295,6 +297,7 @@ def __init__(self, loop, ssl_protocol, app_protocol): self._loop = loop self._ssl_protocol = ssl_protocol self._app_protocol = app_protocol + self._closed = False def get_extra_info(self, name, default=None): """Get optional transport information.""" @@ -308,8 +311,18 @@ def close(self): protocol's connection_lost() method will (eventually) called with None as its argument. """ + self._closed = True self._ssl_protocol._start_shutdown() + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if not self._closed: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self.close() + def pause_reading(self): """Pause the receiving end. diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index c848a21a8f2..4600a9f417d 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -25,8 +25,6 @@ def __init__(self, limit, loop): super().__init__(loop=loop) self._limit = limit self.stdin = self.stdout = self.stderr = None - self.waiter = futures.Future(loop=loop) - self._waiters = collections.deque() self._transport = None def __repr__(self): @@ -61,9 +59,6 @@ def connection_made(self, transport): reader=None, loop=self._loop) - if not self.waiter.cancelled(): - self.waiter.set_result(None) - def pipe_data_received(self, fd, data): if fd == 1: reader = self.stdout @@ -94,16 +89,9 @@ def pipe_connection_lost(self, fd, exc): reader.set_exception(exc) def process_exited(self): - returncode = self._transport.get_returncode() self._transport.close() self._transport = None - # wake up futures waiting for wait() - while self._waiters: - waiter = self._waiters.popleft() - if not waiter.cancelled(): - waiter.set_result(returncode) - class Process: def __init__(self, transport, protocol, loop): @@ -124,30 +112,18 @@ def returncode(self): @coroutine def wait(self): - """Wait until the process exit and return the process return code.""" - returncode = self._transport.get_returncode() - if returncode is not None: - return returncode + """Wait until the process exit and return the process return code. - waiter = futures.Future(loop=self._loop) - self._protocol._waiters.append(waiter) - yield from waiter - return waiter.result() - - def _check_alive(self): - if self._transport.get_returncode() is not None: - raise ProcessLookupError() + This method is a coroutine.""" + return (yield from self._transport._wait()) def send_signal(self, signal): - self._check_alive() self._transport.send_signal(signal) def terminate(self): - self._check_alive() self._transport.terminate() def kill(self): - self._check_alive() self._transport.kill() @coroutine @@ -221,11 +197,6 @@ def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, protocol_factory, cmd, stdin=stdin, stdout=stdout, stderr=stderr, **kwds) - try: - yield from protocol.waiter - except: - transport._kill_wait() - raise return Process(transport, protocol, loop) @coroutine @@ -241,9 +212,4 @@ def create_subprocess_exec(program, *args, stdin=None, stdout=None, program, *args, stdin=stdin, stdout=stdout, stderr=stderr, **kwds) - try: - yield from protocol.waiter - except: - transport._kill_wait() - raise return Process(transport, protocol, loop) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 63412a97e62..4f19a252ff6 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -592,7 +592,7 @@ def _done_callback(i, fut): fut.exception() return - if fut._state == futures._CANCELLED: + if fut.cancelled(): res = futures.CancelledError() if not return_exceptions: outer.set_exception(res) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 67973f14f3f..1fc39abe09c 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -8,6 +8,7 @@ import subprocess import sys import threading +import warnings from . import base_events @@ -15,6 +16,7 @@ from . import constants from . import coroutines from . import events +from . import futures from . import selector_events from . import selectors from . import transports @@ -174,16 +176,20 @@ def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): with events.get_child_watcher() as watcher: + waiter = futures.Future(loop=self) transp = _UnixSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, - extra=extra, **kwargs) - try: - yield from transp._post_init() - except: - transp.close() - raise + waiter=waiter, extra=extra, + **kwargs) + watcher.add_child_handler(transp.get_pid(), self._child_watcher_callback, transp) + try: + yield from waiter + except: + transp.close() + yield from transp._wait() + raise return transp @@ -298,8 +304,10 @@ def __init__(self, loop, pipe, protocol, waiter=None, extra=None): _set_nonblocking(self._fileno) self._protocol = protocol self._closing = False - self._loop.add_reader(self._fileno, self._read_ready) self._loop.call_soon(self._protocol.connection_made, self) + # only start reading when connection_made() has been called + self._loop.call_soon(self._loop.add_reader, + self._fileno, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(waiter._set_result_unless_cancelled, None) @@ -351,6 +359,15 @@ def close(self): if not self._closing: self._close(None) + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if self._pipe is not None: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self._pipe.close() + def _fatal_error(self, exc, message='Fatal error on pipe transport'): # should be called by exception handler only if (isinstance(exc, OSError) and exc.errno == errno.EIO): @@ -401,13 +418,16 @@ def __init__(self, loop, pipe, protocol, waiter=None, extra=None): self._conn_lost = 0 self._closing = False # Set when close() or write_eof() called. - # On AIX, the reader trick only works for sockets. - # On other platforms it works for pipes and sockets. - # (Exception: OS X 10.4? Issue #19294.) - if is_socket or not sys.platform.startswith("aix"): - self._loop.add_reader(self._fileno, self._read_ready) - self._loop.call_soon(self._protocol.connection_made, self) + + # On AIX, the reader trick (to be notified when the read end of the + # socket is closed) only works for sockets. On other platforms it + # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) + if is_socket or not sys.platform.startswith("aix"): + # only start reading when connection_made() has been called + self._loop.call_soon(self._loop.add_reader, + self._fileno, self._read_ready) + if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(waiter._set_result_unless_cancelled, None) @@ -524,6 +544,15 @@ def close(self): # write_eof is all what we needed to close the write pipe self.write_eof() + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if self._pipe is not None: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self._pipe.close() + def abort(self): self._close(None) @@ -750,7 +779,7 @@ def __exit__(self, a, b, c): pass def add_child_handler(self, pid, callback, *args): - self._callbacks[pid] = callback, args + self._callbacks[pid] = (callback, args) # Prevent a race condition in case the child is already terminated. self._do_waitpid(pid) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 94aafb6f5ab..c4bffc47346 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -366,13 +366,16 @@ def loop_accept_pipe(f=None): def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): + waiter = futures.Future(loop=self) transp = _WindowsSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, - extra=extra, **kwargs) + waiter=waiter, extra=extra, + **kwargs) try: - yield from transp._post_init() + yield from waiter except: transp.close() + yield from transp._wait() raise return transp diff --git a/Lib/asyncio/windows_utils.py b/Lib/asyncio/windows_utils.py index 5f8327eba63..870cd13abe6 100644 --- a/Lib/asyncio/windows_utils.py +++ b/Lib/asyncio/windows_utils.py @@ -14,6 +14,7 @@ import socket import subprocess import tempfile +import warnings __all__ = ['socketpair', 'pipe', 'Popen', 'PIPE', 'PipeHandle'] @@ -156,7 +157,10 @@ def close(self, *, CloseHandle=_winapi.CloseHandle): CloseHandle(self._handle) self._handle = None - __del__ = close + def __del__(self): + if self._handle is not None: + warnings.warn("unclosed %r" % self, ResourceWarning) + self.close() def __enter__(self): return self diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index a38c90ebe8c..4b957d8f636 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -886,13 +886,18 @@ def test_create_server_ssl_verify_failed(self): if hasattr(sslcontext_client, 'check_hostname'): sslcontext_client.check_hostname = True + # no CA loaded f_c = self.loop.create_connection(MyProto, host, port, ssl=sslcontext_client) - with test_utils.disable_logger(): - with self.assertRaisesRegex(ssl.SSLError, - 'certificate verify failed '): - self.loop.run_until_complete(f_c) + with mock.patch.object(self.loop, 'call_exception_handler'): + with test_utils.disable_logger(): + with self.assertRaisesRegex(ssl.SSLError, + 'certificate verify failed '): + self.loop.run_until_complete(f_c) + + # execute the loop to log the connection error + test_utils.run_briefly(self.loop) # close connection self.assertIsNone(proto.transport) @@ -919,15 +924,20 @@ def test_create_unix_server_ssl_verify_failed(self): f_c = self.loop.create_unix_connection(MyProto, path, ssl=sslcontext_client, server_hostname='invalid') - with test_utils.disable_logger(): - with self.assertRaisesRegex(ssl.SSLError, - 'certificate verify failed '): - self.loop.run_until_complete(f_c) + with mock.patch.object(self.loop, 'call_exception_handler'): + with test_utils.disable_logger(): + with self.assertRaisesRegex(ssl.SSLError, + 'certificate verify failed '): + self.loop.run_until_complete(f_c) + + # execute the loop to log the connection error + test_utils.run_briefly(self.loop) # close connection self.assertIsNone(proto.transport) server.close() + def test_legacy_create_unix_server_ssl_verify_failed(self): with test_utils.force_legacy_ssl_support(): self.test_create_unix_server_ssl_verify_failed() @@ -949,11 +959,12 @@ def test_create_server_ssl_match_failed(self): # incorrect server_hostname f_c = self.loop.create_connection(MyProto, host, port, ssl=sslcontext_client) - with test_utils.disable_logger(): - with self.assertRaisesRegex( - ssl.CertificateError, - "hostname '127.0.0.1' doesn't match 'localhost'"): - self.loop.run_until_complete(f_c) + with mock.patch.object(self.loop, 'call_exception_handler'): + with test_utils.disable_logger(): + with self.assertRaisesRegex( + ssl.CertificateError, + "hostname '127.0.0.1' doesn't match 'localhost'"): + self.loop.run_until_complete(f_c) # close connection proto.transport.close() @@ -1540,9 +1551,10 @@ def test_subprocess_exec(self): stdin = transp.get_pipe_transport(0) stdin.write(b'Python The Winner') self.loop.run_until_complete(proto.got_data[1].wait()) - transp.close() + with test_utils.disable_logger(): + transp.close() self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) + self.check_killed(proto.returncode) self.assertEqual(b'Python The Winner', proto.data[1]) def test_subprocess_interactive(self): @@ -1556,21 +1568,20 @@ def test_subprocess_interactive(self): self.loop.run_until_complete(proto.connected) self.assertEqual('CONNECTED', proto.state) - try: - stdin = transp.get_pipe_transport(0) - stdin.write(b'Python ') - self.loop.run_until_complete(proto.got_data[1].wait()) - proto.got_data[1].clear() - self.assertEqual(b'Python ', proto.data[1]) + stdin = transp.get_pipe_transport(0) + stdin.write(b'Python ') + self.loop.run_until_complete(proto.got_data[1].wait()) + proto.got_data[1].clear() + self.assertEqual(b'Python ', proto.data[1]) - stdin.write(b'The Winner') - self.loop.run_until_complete(proto.got_data[1].wait()) - self.assertEqual(b'Python The Winner', proto.data[1]) - finally: + stdin.write(b'The Winner') + self.loop.run_until_complete(proto.got_data[1].wait()) + self.assertEqual(b'Python The Winner', proto.data[1]) + + with test_utils.disable_logger(): transp.close() - self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) + self.check_killed(proto.returncode) def test_subprocess_shell(self): connect = self.loop.subprocess_shell( @@ -1728,9 +1739,10 @@ def test_subprocess_close_client_stream(self): # GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using # WriteFile() we get ERROR_BROKEN_PIPE as expected.) self.assertEqual(b'ERR:OSError', proto.data[2]) - transp.close() + with test_utils.disable_logger(): + transp.close() self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) + self.check_killed(proto.returncode) def test_subprocess_wait_no_same_group(self): # start the new process in a new session diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index 33a8a671ec1..fcd9ab1e18f 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -499,8 +499,12 @@ def test_sock_accept(self): self.proactor.accept.assert_called_with(self.sock) def test_socketpair(self): + class EventLoop(BaseProactorEventLoop): + # override the destructor to not log a ResourceWarning + def __del__(self): + pass self.assertRaises( - NotImplementedError, BaseProactorEventLoop, self.proactor) + NotImplementedError, EventLoop, self.proactor) def test_make_socket_transport(self): tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol()) diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 51526163953..f64e40dafcd 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -1427,7 +1427,7 @@ def test_write_eof(self): self.assertFalse(tr.can_write_eof()) self.assertRaises(NotImplementedError, tr.write_eof) - def test_close(self): + def check_close(self): tr = self._make_one() tr.close() @@ -1439,6 +1439,19 @@ def test_close(self): self.assertEqual(tr._conn_lost, 1) self.assertEqual(1, self.loop.remove_reader_count[1]) + test_utils.run_briefly(self.loop) + + def test_close(self): + self.check_close() + self.assertTrue(self.protocol.connection_made.called) + self.assertTrue(self.protocol.connection_lost.called) + + def test_close_not_connected(self): + self.sslsock.do_handshake.side_effect = ssl.SSLWantReadError + self.check_close() + self.assertFalse(self.protocol.connection_made.called) + self.assertFalse(self.protocol.connection_lost.called) + @unittest.skipIf(ssl is None, 'No SSL support') def test_server_hostname(self): self.ssl_transport(server_hostname='localhost') diff --git a/Lib/test/test_asyncio/test_sslproto.py b/Lib/test/test_asyncio/test_sslproto.py index 148e30dffeb..a72967ea071 100644 --- a/Lib/test/test_asyncio/test_sslproto.py +++ b/Lib/test/test_asyncio/test_sslproto.py @@ -22,7 +22,9 @@ def setUp(self): def ssl_protocol(self, waiter=None): sslcontext = test_utils.dummy_ssl_context() app_proto = asyncio.Protocol() - return sslproto.SSLProtocol(self.loop, app_proto, sslcontext, waiter) + proto = sslproto.SSLProtocol(self.loop, app_proto, sslcontext, waiter) + self.addCleanup(proto._app_transport.close) + return proto def connection_made(self, ssl_proto, do_handshake=None): transport = mock.Mock() @@ -56,9 +58,6 @@ def do_handshake(callback): with test_utils.disable_logger(): self.loop.run_until_complete(handshake_fut) - # Close the transport - ssl_proto._app_transport.close() - def test_eof_received_waiter(self): waiter = asyncio.Future(loop=self.loop) ssl_proto = self.ssl_protocol(waiter) diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index ecc2c9d8a85..b467b04f535 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -4,6 +4,7 @@ from unittest import mock import asyncio +from asyncio import base_subprocess from asyncio import subprocess from asyncio import test_utils try: @@ -23,6 +24,56 @@ 'data = sys.stdin.buffer.read()', 'sys.stdout.buffer.write(data)'))] +class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport): + def _start(self, *args, **kwargs): + self._proc = mock.Mock() + self._proc.stdin = None + self._proc.stdout = None + self._proc.stderr = None + + +class SubprocessTransportTests(test_utils.TestCase): + def setUp(self): + self.loop = self.new_test_loop() + self.set_event_loop(self.loop) + + + def create_transport(self, waiter=None): + protocol = mock.Mock() + protocol.connection_made._is_coroutine = False + protocol.process_exited._is_coroutine = False + transport = TestSubprocessTransport( + self.loop, protocol, ['test'], False, + None, None, None, 0, waiter=waiter) + return (transport, protocol) + + def test_proc_exited(self): + waiter = asyncio.Future(loop=self.loop) + transport, protocol = self.create_transport(waiter) + transport._process_exited(6) + self.loop.run_until_complete(waiter) + + self.assertEqual(transport.get_returncode(), 6) + + self.assertTrue(protocol.connection_made.called) + self.assertTrue(protocol.process_exited.called) + self.assertTrue(protocol.connection_lost.called) + self.assertEqual(protocol.connection_lost.call_args[0], (None,)) + + self.assertFalse(transport._closed) + self.assertIsNone(transport._loop) + self.assertIsNone(transport._proc) + self.assertIsNone(transport._protocol) + + # methods must raise ProcessLookupError if the process exited + self.assertRaises(ProcessLookupError, + transport.send_signal, signal.SIGTERM) + self.assertRaises(ProcessLookupError, transport.terminate) + self.assertRaises(ProcessLookupError, transport.kill) + + transport.close() + + class SubprocessMixin: def test_stdin_stdout(self): diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 126196daac9..41249ff0248 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -350,16 +350,13 @@ def read_pipe_transport(self, waiter=None): return transport def test_ctor(self): - tr = self.read_pipe_transport() - self.loop.assert_reader(5, tr._read_ready) - test_utils.run_briefly(self.loop) - self.protocol.connection_made.assert_called_with(tr) + waiter = asyncio.Future(loop=self.loop) + tr = self.read_pipe_transport(waiter=waiter) + self.loop.run_until_complete(waiter) - def test_ctor_with_waiter(self): - fut = asyncio.Future(loop=self.loop) - tr = self.read_pipe_transport(waiter=fut) - test_utils.run_briefly(self.loop) - self.assertIsNone(fut.result()) + self.protocol.connection_made.assert_called_with(tr) + self.loop.assert_reader(5, tr._read_ready) + self.assertIsNone(waiter.result()) @mock.patch('os.read') def test__read_ready(self, m_read): @@ -502,17 +499,13 @@ def write_pipe_transport(self, waiter=None): return transport def test_ctor(self): - tr = self.write_pipe_transport() - self.loop.assert_reader(5, tr._read_ready) - test_utils.run_briefly(self.loop) - self.protocol.connection_made.assert_called_with(tr) + waiter = asyncio.Future(loop=self.loop) + tr = self.write_pipe_transport(waiter=waiter) + self.loop.run_until_complete(waiter) - def test_ctor_with_waiter(self): - fut = asyncio.Future(loop=self.loop) - tr = self.write_pipe_transport(waiter=fut) + self.protocol.connection_made.assert_called_with(tr) self.loop.assert_reader(5, tr._read_ready) - test_utils.run_briefly(self.loop) - self.assertEqual(None, fut.result()) + self.assertEqual(None, waiter.result()) def test_can_write_eof(self): tr = self.write_pipe_transport() diff --git a/Lib/test/test_memoryview.py b/Lib/test/test_memoryview.py index e7df8a762c6..e9bd9fdf82c 100644 --- a/Lib/test/test_memoryview.py +++ b/Lib/test/test_memoryview.py @@ -360,6 +360,27 @@ def test_reversed(self): self.assertEqual(list(reversed(m)), aslist) self.assertEqual(list(reversed(m)), list(m[::-1])) + def test_issue22668(self): + a = array.array('H', [256, 256, 256, 256]) + x = memoryview(a) + m = x.cast('B') + b = m.cast('H') + c = b[0:2] + d = memoryview(b) + + del b + + self.assertEqual(c[0], 256) + self.assertEqual(d[0], 256) + self.assertEqual(c.format, "H") + self.assertEqual(d.format, "H") + + _ = m.cast('I') + self.assertEqual(c[0], 256) + self.assertEqual(d[0], 256) + self.assertEqual(c.format, "H") + self.assertEqual(d.format, "H") + # Variations on source objects for the buffer: bytes-like objects, then arrays # with itemsize > 1. diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index bd2d2047715..edb9c10d7ac 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -965,7 +965,7 @@ def get_gen(): yield 1 check(int(PyLong_BASE**2-1), vsize('') + 2*self.longdigit) check(int(PyLong_BASE**2), vsize('') + 3*self.longdigit) # memoryview - check(memoryview(b''), size('Pnin 2P2n2i5P 3cPn')) + check(memoryview(b''), size('Pnin 2P2n2i5P Pn')) # module check(unittest, size('PnPPP')) # None diff --git a/Objects/memoryobject.c b/Objects/memoryobject.c index 935da04cd79..b611dc864ce 100644 --- a/Objects/memoryobject.c +++ b/Objects/memoryobject.c @@ -1132,6 +1132,51 @@ get_native_fmtchar(char *result, const char *fmt) return -1; } +Py_LOCAL_INLINE(char *) +get_native_fmtstr(const char *fmt) +{ + int at = 0; + + if (fmt[0] == '@') { + at = 1; + fmt++; + } + if (fmt[0] == '\0' || fmt[1] != '\0') { + return NULL; + } + +#define RETURN(s) do { return at ? "@" s : s; } while (0) + + switch (fmt[0]) { + case 'c': RETURN("c"); + case 'b': RETURN("b"); + case 'B': RETURN("B"); + case 'h': RETURN("h"); + case 'H': RETURN("H"); + case 'i': RETURN("i"); + case 'I': RETURN("I"); + case 'l': RETURN("l"); + case 'L': RETURN("L"); + #ifdef HAVE_LONG_LONG + case 'q': RETURN("q"); + case 'Q': RETURN("Q"); + #endif + case 'n': RETURN("n"); + case 'N': RETURN("N"); + case 'f': RETURN("f"); + case 'd': RETURN("d"); + #ifdef HAVE_C99_BOOL + case '?': RETURN("?"); + #else + case '?': RETURN("?"); + #endif + case 'P': RETURN("P"); + } + + return NULL; +} + + /* Cast a memoryview's data type to 'format'. The input array must be C-contiguous. At least one of input-format, output-format must have byte size. The output array is 1-D, with the same byte length as the @@ -1181,10 +1226,13 @@ cast_to_1D(PyMemoryViewObject *mv, PyObject *format) goto out; } - strncpy(mv->format, PyBytes_AS_STRING(asciifmt), - _Py_MEMORYVIEW_MAX_FORMAT); - mv->format[_Py_MEMORYVIEW_MAX_FORMAT-1] = '\0'; - view->format = mv->format; + view->format = get_native_fmtstr(PyBytes_AS_STRING(asciifmt)); + if (view->format == NULL) { + /* NOT_REACHED: get_native_fmtchar() already validates the format. */ + PyErr_SetString(PyExc_RuntimeError, + "memoryview: internal error"); + goto out; + } view->itemsize = itemsize; view->ndim = 1;