diff --git a/dev_requirements.txt b/dev_requirements.txt index f48006e5..c536c154 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,4 +1,5 @@ -r docs/docs-requirements.txt +psutil==5.4.8 coverage==4.5.1 Django==1.6.11 # Last version supporting 2.6. mock==2.0.0 diff --git a/docs/ansible.rst b/docs/ansible.rst index c30fffc2..33c73d06 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -429,7 +429,7 @@ Temporary Files Temporary file handling in Ansible is tricky, and the precise behaviour varies across major versions. A variety of temporary files and directories are -created, depending on the operating mode: +created, depending on the operating mode. In the best case when pipelining is enabled and no temporary uploads are required, for each task Ansible will create one directory below a diff --git a/docs/api.rst b/docs/api.rst index 844bb900..bfba1f77 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -98,479 +98,442 @@ Router Class .. currentmodule:: mitogen.master -.. class:: Router (broker=None) +.. autoclass:: Router (broker=None) + :members: - Extend :class:`mitogen.core.Router` with functionality useful to - masters, and child contexts who later become masters. Currently when this - class is required, the target context's router is upgraded at runtime. - .. note:: +.. _context-factories: + +Connection Methods +================== - You may construct as many routers as desired, and use the same broker - for multiple routers, however usually only one broker and router need - exist. Multiple routers may be useful when dealing with separate trust - domains, for example, manipulating infrastructure belonging to separate - customers or projects. +.. currentmodule:: mitogen.parent +.. method:: Router.fork (on_fork=None, on_start=None, debug=False, profiling=False, via=None) - :param mitogen.master.Broker broker: - :class:`Broker` instance to use. If not specified, a private - :class:`Broker` is created. + Construct a context on the local machine by forking the current + process. The forked child receives a new identity, sets up a new broker + and router, and responds to function calls identically to children + created using other methods. - .. attribute:: profiling + For long-lived processes, :meth:`local` is always better as it + guarantees a pristine interpreter state that inherited little from the + parent. Forking should only be used in performance-sensitive scenarios + where short-lived children must be spawned to isolate potentially buggy + code, and only after accounting for all the bad things possible as a + result of, at a minimum: - When :data:`True`, cause the broker thread and any subsequent broker - and main threads existing in any child to write - ``/tmp/mitogen.stats...log`` containing a - :mod:`cProfile` dump on graceful exit. Must be set prior to - construction of any :class:`Broker`, e.g. via: + * Files open in the parent remaining open in the child, + causing the lifetime of the underlying object to be extended + indefinitely. - .. code:: + * From the perspective of external components, this is observable + in the form of pipes and sockets that are never closed, which may + break anything relying on closure to signal protocol termination. - mitogen.master.Router.profiling = True + * Descriptors that reference temporary files will not have their disk + space reclaimed until the child exits. - .. method:: enable_debug + * Third party package state, such as urllib3's HTTP connection pool, + attempting to write to file descriptors shared with the parent, + causing random failures in both parent and child. - Cause this context and any descendant child contexts to write debug - logs to /tmp/mitogen..log. + * UNIX signal handlers installed in the parent process remaining active + in the child, despite associated resources, such as service threads, + child processes, resource usage counters or process timers becoming + absent or reset in the child. + + * Library code that makes assumptions about the process ID remaining + unchanged, for example to implement inter-process locking, or to + generate file names. + + * Anonymous ``MAP_PRIVATE`` memory mappings whose storage requirement + doubles as either parent or child dirties their pages. + + * File-backed memory mappings that cannot have their space freed on + disk due to the mapping living on in the child. + + * Difficult to diagnose memory usage and latency spikes due to object + graphs becoming unreferenced in either parent or child, causing + immediate copy-on-write to large portions of the process heap. + + * Locks held in the parent causing random deadlocks in the child, such + as when another thread emits a log entry via the :mod:`logging` + package concurrent to another thread calling :meth:`fork`. + + * Objects existing in Thread-Local Storage of every non-:meth:`fork` + thread becoming permanently inaccessible, and never having their + object destructors called, including TLS usage by native extension + code, triggering many new variants of all the issues above. - .. method:: allocate_id + * Pseudo-Random Number Generator state that is easily observable by + network peers to be duplicate, violating requirements of + cryptographic protocols through one-time state reuse. In the worst + case, children continually reuse the same state due to repeatedly + forking from a static parent. - Arrange for a unique context ID to be allocated and associated with a - route leading to the active context. In masters, the ID is generated - directly, in children it is forwarded to the master via an - ``ALLOCATE_ID`` message that causes the master to emit matching - ``ADD_ROUTE`` messages prior to replying. - - .. _context-factories: - - **Context Factories** - - .. method:: fork (on_fork=None, on_start=None, debug=False, profiling=False, via=None) - - Construct a context on the local machine by forking the current - process. The forked child receives a new identity, sets up a new broker - and router, and responds to function calls identically to children - created using other methods. - - For long-lived processes, :meth:`local` is always better as it - guarantees a pristine interpreter state that inherited little from the - parent. Forking should only be used in performance-sensitive scenarios - where short-lived children must be spawned to isolate potentially buggy - code, and only after accounting for all the bad things possible as a - result of, at a minimum: - - * Files open in the parent remaining open in the child, - causing the lifetime of the underlying object to be extended - indefinitely. - - * From the perspective of external components, this is observable - in the form of pipes and sockets that are never closed, which may - break anything relying on closure to signal protocol termination. - - * Descriptors that reference temporary files will not have their disk - space reclaimed until the child exits. - - * Third party package state, such as urllib3's HTTP connection pool, - attempting to write to file descriptors shared with the parent, - causing random failures in both parent and child. - - * UNIX signal handlers installed in the parent process remaining active - in the child, despite associated resources, such as service threads, - child processes, resource usage counters or process timers becoming - absent or reset in the child. - - * Library code that makes assumptions about the process ID remaining - unchanged, for example to implement inter-process locking, or to - generate file names. - - * Anonymous ``MAP_PRIVATE`` memory mappings whose storage requirement - doubles as either parent or child dirties their pages. - - * File-backed memory mappings that cannot have their space freed on - disk due to the mapping living on in the child. - - * Difficult to diagnose memory usage and latency spikes due to object - graphs becoming unreferenced in either parent or child, causing - immediate copy-on-write to large portions of the process heap. - - * Locks held in the parent causing random deadlocks in the child, such - as when another thread emits a log entry via the :mod:`logging` - package concurrent to another thread calling :meth:`fork`. - - * Objects existing in Thread-Local Storage of every non-:meth:`fork` - thread becoming permanently inaccessible, and never having their - object destructors called, including TLS usage by native extension - code, triggering many new variants of all the issues above. - - * Pseudo-Random Number Generator state that is easily observable by - network peers to be duplicate, violating requirements of - cryptographic protocols through one-time state reuse. In the worst - case, children continually reuse the same state due to repeatedly - forking from a static parent. - - :meth:`fork` cleans up Mitogen-internal objects, in addition to - locks held by the :mod:`logging` package, reseeds - :func:`random.random`, and the OpenSSL PRNG via - :func:`ssl.RAND_add`, but only if the :mod:`ssl` module is - already loaded. You must arrange for your program's state, including - any third party packages in use, to be cleaned up by specifying an - `on_fork` function. - - The associated stream implementation is - :class:`mitogen.fork.Stream`. - - :param function on_fork: - Function invoked as `on_fork()` from within the child process. This - permits supplying a program-specific cleanup function to break - locks and close file descriptors belonging to the parent from - within the child. - - :param function on_start: - Invoked as `on_start(econtext)` from within the child process after - it has been set up, but before the function dispatch loop starts. - This permits supplying a custom child main function that inherits - rich data structures that cannot normally be passed via a - serialization. - - :param mitogen.core.Context via: - Same as the `via` parameter for :meth:`local`. - - :param bool debug: - Same as the `debug` parameter for :meth:`local`. - - :param bool profiling: - Same as the `profiling` parameter for :meth:`local`. - - .. method:: local (remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, via=None) - - Construct a context on the local machine as a subprocess of the current - process. The associated stream implementation is - :class:`mitogen.master.Stream`. - - :param str remote_name: - The ``argv[0]`` suffix for the new process. If `remote_name` is - ``test``, the new process ``argv[0]`` will be ``mitogen:test``. - - If unspecified, defaults to ``@:``. - - This variable cannot contain slash characters, as the resulting - ``argv[0]`` must be presented in such a way as to allow Python to - determine its installation prefix. This is required to support - virtualenv. - - :param str|list python_path: - String or list path to the Python interpreter to use for bootstrap. - Defaults to :data:`sys.executable` for local connections, and - ``python`` for remote connections. - - It is possible to pass a list to invoke Python wrapped using - another tool, such as ``["/usr/bin/env", "python"]``. - - :param bool debug: - If :data:`True`, arrange for debug logging (:meth:`enable_debug`) to - be enabled in the new context. Automatically :data:`True` when - :meth:`enable_debug` has been called, but may be used - selectively otherwise. - - :param bool unidirectional: - If :data:`True`, arrange for the child's router to be constructed - with :attr:`unidirectional routing - ` enabled. Automatically - :data:`True` when it was enabled for this router, but may still be - explicitly set to :data:`False`. - - :param float connect_timeout: - Fractional seconds to wait for the subprocess to indicate it is - healthy. Defaults to 30 seconds. - - :param bool profiling: - If :data:`True`, arrange for profiling (:data:`profiling`) to be - enabled in the new context. Automatically :data:`True` when - :data:`profiling` is :data:`True`, but may be used selectively - otherwise. - - :param mitogen.core.Context via: - If not :data:`None`, arrange for construction to occur via RPCs - made to the context `via`, and for :data:`ADD_ROUTE - ` messages to be generated as appropriate. - - .. code-block:: python - - # SSH to the remote machine. - remote_machine = router.ssh(hostname='mybox.com') - - # Use the SSH connection to create a sudo connection. - remote_root = router.sudo(username='root', via=remote_machine) - - .. method:: doas (username=None, password=None, doas_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) - - Construct a context on the local machine over a ``doas`` invocation. - The ``doas`` process is started in a newly allocated pseudo-terminal, - and supports typing interactive passwords. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str username: - Username to use, defaults to ``root``. - :param str password: - The account password to use if requested. - :param str doas_path: - Filename or complete path to the ``doas`` binary. ``PATH`` will be - searched if given as a filename. Defaults to ``doas``. - :param bytes password_prompt: - A string that indicates ``doas`` is requesting a password. Defaults - to ``Password:``. - :param list incorrect_prompts: - List of bytestrings indicating the password is incorrect. Defaults - to `(b"doas: authentication failed")`. - :raises mitogen.doas.PasswordError: - A password was requested but none was provided, the supplied - password was incorrect, or the target account did not exist. - - .. method:: docker (container=None, image=None, docker_path=None, \**kwargs) - - Construct a context on the local machine within an existing or - temporary new Docker container using the ``docker`` program. One of - `container` or `image` must be specified. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str container: - Existing container to connect to. Defaults to :data:`None`. - :param str username: - Username within the container to :func:`setuid` to. Defaults to - :data:`None`, which Docker interprets as ``root``. - :param str image: - Image tag to use to construct a temporary container. Defaults to - :data:`None`. - :param str docker_path: - Filename or complete path to the Docker binary. ``PATH`` will be - searched if given as a filename. Defaults to ``docker``. - - .. method:: jail (container, jexec_path=None, \**kwargs) - - Construct a context on the local machine within a FreeBSD jail using - the ``jexec`` program. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str container: - Existing container to connect to. Defaults to :data:`None`. - :param str username: - Username within the container to :func:`setuid` to. Defaults to - :data:`None`, which ``jexec`` interprets as ``root``. - :param str jexec_path: - Filename or complete path to the ``jexec`` binary. ``PATH`` will be - searched if given as a filename. Defaults to ``/usr/sbin/jexec``. - - .. method:: kubectl (pod, kubectl_path=None, kubectl_args=None, \**kwargs) - - Construct a context in a container via the Kubernetes ``kubectl`` - program. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str pod: - Kubernetes pod to connect to. - :param str kubectl_path: - Filename or complete path to the ``kubectl`` binary. ``PATH`` will - be searched if given as a filename. Defaults to ``kubectl``. - :param list kubectl_args: - Additional arguments to pass to the ``kubectl`` command. - - .. method:: lxc (container, lxc_attach_path=None, \**kwargs) - - Construct a context on the local machine within an LXC classic - container using the ``lxc-attach`` program. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str container: - Existing container to connect to. Defaults to :data:`None`. - :param str lxc_attach_path: - Filename or complete path to the ``lxc-attach`` binary. ``PATH`` - will be searched if given as a filename. Defaults to - ``lxc-attach``. - - .. method:: lxc (container, lxc_attach_path=None, \**kwargs) - - Construct a context on the local machine within a LXD container using - the ``lxc`` program. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str container: - Existing container to connect to. Defaults to :data:`None`. - :param str lxc_path: - Filename or complete path to the ``lxc`` binary. ``PATH`` will be - searched if given as a filename. Defaults to ``lxc``. - - .. method:: setns (container, kind, username=None, docker_path=None, lxc_info_path=None, machinectl_path=None, \**kwargs) - - Construct a context in the style of :meth:`local`, but change the - active Linux process namespaces via calls to `setns(1)` before - executing Python. - - The namespaces to use, and the active root file system are taken from - the root PID of a running Docker, LXC, LXD, or systemd-nspawn - container. - - A program is required only to find the root PID, after which management - of the child Python interpreter is handled directly. - - :param str container: - Container to connect to. - :param str kind: - One of ``docker``, ``lxc``, ``lxd`` or ``machinectl``. - :param str username: - Username within the container to :func:`setuid` to. Defaults to - ``root``. - :param str docker_path: - Filename or complete path to the Docker binary. ``PATH`` will be - searched if given as a filename. Defaults to ``docker``. - :param str lxc_path: - Filename or complete path to the LXD ``lxc`` binary. ``PATH`` will - be searched if given as a filename. Defaults to ``lxc``. - :param str lxc_info_path: - Filename or complete path to the LXC ``lxc-info`` binary. ``PATH`` - will be searched if given as a filename. Defaults to ``lxc-info``. - :param str machinectl_path: - Filename or complete path to the ``machinectl`` binary. ``PATH`` - will be searched if given as a filename. Defaults to - ``machinectl``. - - .. method:: su (username=None, password=None, su_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) - - Construct a context on the local machine over a ``su`` invocation. The - ``su`` process is started in a newly allocated pseudo-terminal, and - supports typing interactive passwords. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str username: - Username to pass to ``su``, defaults to ``root``. - :param str password: - The account password to use if requested. - :param str su_path: - Filename or complete path to the ``su`` binary. ``PATH`` will be - searched if given as a filename. Defaults to ``su``. - :param bytes password_prompt: - The string that indicates ``su`` is requesting a password. Defaults - to ``Password:``. - :param str incorrect_prompts: - Strings that signal the password is incorrect. Defaults to `("su: - sorry", "su: authentication failure")`. - - :raises mitogen.su.PasswordError: - A password was requested but none was provided, the supplied - password was incorrect, or (on BSD) the target account did not - exist. - - .. method:: sudo (username=None, sudo_path=None, password=None, \**kwargs) - - Construct a context on the local machine over a ``sudo`` invocation. - The ``sudo`` process is started in a newly allocated pseudo-terminal, - and supports typing interactive passwords. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str username: - Username to pass to sudo as the ``-u`` parameter, defaults to - ``root``. - :param str sudo_path: - Filename or complete path to the sudo binary. ``PATH`` will be - searched if given as a filename. Defaults to ``sudo``. - :param str password: - The password to use if/when sudo requests it. Depending on the sudo - configuration, this is either the current account password or the - target account password. :class:`mitogen.sudo.PasswordError` - will be raised if sudo requests a password but none is provided. - :param bool set_home: - If :data:`True`, request ``sudo`` set the ``HOME`` environment - variable to match the target UNIX account. - :param bool preserve_env: - If :data:`True`, request ``sudo`` to preserve the environment of - the parent process. - :param str selinux_type: - If not :data:`None`, the SELinux security context to use. - :param str selinux_role: - If not :data:`None`, the SELinux role to use. - :param list sudo_args: - Arguments in the style of :data:`sys.argv` that would normally - be passed to ``sudo``. The arguments are parsed in-process to set - equivalent parameters. Re-parsing ensures unsupported options cause - :class:`mitogen.core.StreamError` to be raised, and that - attributes of the stream match the actual behaviour of ``sudo``. - - .. method:: ssh (hostname, username=None, ssh_path=None, ssh_args=None, port=None, check_host_keys='enforce', password=None, identity_file=None, identities_only=True, compression=True, \**kwargs) - - Construct a remote context over an OpenSSH ``ssh`` invocation. - - The ``ssh`` process is started in a newly allocated pseudo-terminal to - support typing interactive passwords and responding to prompts, if a - password is specified, or `check_host_keys=accept`. In other scenarios, - ``BatchMode`` is enabled and no PTY is allocated. For many-target - configurations, both options should be avoided as most systems have a - conservative limit on the number of pseudo-terminals that may exist. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str username: - The SSH username; default is unspecified, which causes SSH to pick - the username to use. - :param str ssh_path: - Absolute or relative path to ``ssh``. Defaults to ``ssh``. - :param list ssh_args: - Additional arguments to pass to the SSH command. - :param int port: - Port number to connect to; default is unspecified, which causes SSH - to pick the port number. - :param str check_host_keys: - Specifies the SSH host key checking mode. Defaults to ``enforce``. - - * ``ignore``: no host key checking is performed. Connections never - fail due to an unknown or changed host key. - * ``accept``: known hosts keys are checked to ensure they match, - new host keys are automatically accepted and verified in future - connections. - * ``enforce``: known host keys are checked to ensure they match, - unknown hosts cause a connection failure. - :param str password: - Password to type if/when ``ssh`` requests it. If not specified and - a password is requested, :class:`mitogen.ssh.PasswordError` is - raised. - :param str identity_file: - Path to an SSH private key file to use for authentication. Default - is unspecified, which causes SSH to pick the identity file. - - When this option is specified, only `identity_file` will be used by - the SSH client to perform authenticaion; agent authentication is - automatically disabled, as is reading the default private key from - ``~/.ssh/id_rsa``, or ``~/.ssh/id_dsa``. - :param bool identities_only: - If :data:`True` and a password or explicit identity file is - specified, instruct the SSH client to disable any authentication - identities inherited from the surrounding environment, such as - those loaded in any running ``ssh-agent``, or default key files - present in ``~/.ssh``. This ensures authentication attempts only - occur using the supplied password or SSH key. - :param bool compression: - If :data:`True`, enable ``ssh`` compression support. Compression - has a minimal effect on the size of modules transmitted, as they - are already compressed, however it has a large effect on every - remaining message in the otherwise uncompressed stream protocol, - such as function call arguments and return values. - :param int ssh_debug_level: - Optional integer `0..3` indicating the SSH client debug level. - :raises mitogen.ssh.PasswordError: - A password was requested but none was specified, or the specified - password was incorrect. - - :raises mitogen.ssh.HostKeyError: - When `check_host_keys` is set to either ``accept``, indicates a - previously recorded key no longer matches the remote machine. When - set to ``enforce``, as above, but additionally indicates no - previously recorded key exists for the remote machine. + :meth:`fork` cleans up Mitogen-internal objects, in addition to + locks held by the :mod:`logging` package, reseeds + :func:`random.random`, and the OpenSSL PRNG via + :func:`ssl.RAND_add`, but only if the :mod:`ssl` module is + already loaded. You must arrange for your program's state, including + any third party packages in use, to be cleaned up by specifying an + `on_fork` function. + + The associated stream implementation is + :class:`mitogen.fork.Stream`. + + :param function on_fork: + Function invoked as `on_fork()` from within the child process. This + permits supplying a program-specific cleanup function to break + locks and close file descriptors belonging to the parent from + within the child. + + :param function on_start: + Invoked as `on_start(econtext)` from within the child process after + it has been set up, but before the function dispatch loop starts. + This permits supplying a custom child main function that inherits + rich data structures that cannot normally be passed via a + serialization. + + :param mitogen.core.Context via: + Same as the `via` parameter for :meth:`local`. + + :param bool debug: + Same as the `debug` parameter for :meth:`local`. + + :param bool profiling: + Same as the `profiling` parameter for :meth:`local`. + +.. method:: Router.local (remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, via=None) + + Construct a context on the local machine as a subprocess of the current + process. The associated stream implementation is + :class:`mitogen.master.Stream`. + + :param str remote_name: + The ``argv[0]`` suffix for the new process. If `remote_name` is + ``test``, the new process ``argv[0]`` will be ``mitogen:test``. + + If unspecified, defaults to ``@:``. + + This variable cannot contain slash characters, as the resulting + ``argv[0]`` must be presented in such a way as to allow Python to + determine its installation prefix. This is required to support + virtualenv. + + :param str|list python_path: + String or list path to the Python interpreter to use for bootstrap. + Defaults to :data:`sys.executable` for local connections, and + ``python`` for remote connections. + + It is possible to pass a list to invoke Python wrapped using + another tool, such as ``["/usr/bin/env", "python"]``. + + :param bool debug: + If :data:`True`, arrange for debug logging (:meth:`enable_debug`) to + be enabled in the new context. Automatically :data:`True` when + :meth:`enable_debug` has been called, but may be used + selectively otherwise. + + :param bool unidirectional: + If :data:`True`, arrange for the child's router to be constructed + with :attr:`unidirectional routing + ` enabled. Automatically + :data:`True` when it was enabled for this router, but may still be + explicitly set to :data:`False`. + + :param float connect_timeout: + Fractional seconds to wait for the subprocess to indicate it is + healthy. Defaults to 30 seconds. + + :param bool profiling: + If :data:`True`, arrange for profiling (:data:`profiling`) to be + enabled in the new context. Automatically :data:`True` when + :data:`profiling` is :data:`True`, but may be used selectively + otherwise. + + :param mitogen.core.Context via: + If not :data:`None`, arrange for construction to occur via RPCs + made to the context `via`, and for :data:`ADD_ROUTE + ` messages to be generated as appropriate. + + .. code-block:: python + + # SSH to the remote machine. + remote_machine = router.ssh(hostname='mybox.com') + + # Use the SSH connection to create a sudo connection. + remote_root = router.sudo(username='root', via=remote_machine) + +.. method:: Router.doas (username=None, password=None, doas_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) + + Construct a context on the local machine over a ``doas`` invocation. + The ``doas`` process is started in a newly allocated pseudo-terminal, + and supports typing interactive passwords. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str username: + Username to use, defaults to ``root``. + :param str password: + The account password to use if requested. + :param str doas_path: + Filename or complete path to the ``doas`` binary. ``PATH`` will be + searched if given as a filename. Defaults to ``doas``. + :param bytes password_prompt: + A string that indicates ``doas`` is requesting a password. Defaults + to ``Password:``. + :param list incorrect_prompts: + List of bytestrings indicating the password is incorrect. Defaults + to `(b"doas: authentication failed")`. + :raises mitogen.doas.PasswordError: + A password was requested but none was provided, the supplied + password was incorrect, or the target account did not exist. + +.. method:: Router.docker (container=None, image=None, docker_path=None, \**kwargs) + + Construct a context on the local machine within an existing or + temporary new Docker container using the ``docker`` program. One of + `container` or `image` must be specified. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str container: + Existing container to connect to. Defaults to :data:`None`. + :param str username: + Username within the container to :func:`setuid` to. Defaults to + :data:`None`, which Docker interprets as ``root``. + :param str image: + Image tag to use to construct a temporary container. Defaults to + :data:`None`. + :param str docker_path: + Filename or complete path to the Docker binary. ``PATH`` will be + searched if given as a filename. Defaults to ``docker``. + +.. method:: Router.jail (container, jexec_path=None, \**kwargs) + + Construct a context on the local machine within a FreeBSD jail using + the ``jexec`` program. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str container: + Existing container to connect to. Defaults to :data:`None`. + :param str username: + Username within the container to :func:`setuid` to. Defaults to + :data:`None`, which ``jexec`` interprets as ``root``. + :param str jexec_path: + Filename or complete path to the ``jexec`` binary. ``PATH`` will be + searched if given as a filename. Defaults to ``/usr/sbin/jexec``. + +.. method:: Router.kubectl (pod, kubectl_path=None, kubectl_args=None, \**kwargs) + + Construct a context in a container via the Kubernetes ``kubectl`` + program. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str pod: + Kubernetes pod to connect to. + :param str kubectl_path: + Filename or complete path to the ``kubectl`` binary. ``PATH`` will + be searched if given as a filename. Defaults to ``kubectl``. + :param list kubectl_args: + Additional arguments to pass to the ``kubectl`` command. + +.. method:: Router.lxc (container, lxc_attach_path=None, \**kwargs) + + Construct a context on the local machine within an LXC classic + container using the ``lxc-attach`` program. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str container: + Existing container to connect to. Defaults to :data:`None`. + :param str lxc_attach_path: + Filename or complete path to the ``lxc-attach`` binary. ``PATH`` + will be searched if given as a filename. Defaults to + ``lxc-attach``. + +.. method:: Router.lxc (container, lxc_attach_path=None, \**kwargs) + + Construct a context on the local machine within a LXD container using + the ``lxc`` program. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str container: + Existing container to connect to. Defaults to :data:`None`. + :param str lxc_path: + Filename or complete path to the ``lxc`` binary. ``PATH`` will be + searched if given as a filename. Defaults to ``lxc``. + +.. method:: Router.setns (container, kind, username=None, docker_path=None, lxc_info_path=None, machinectl_path=None, \**kwargs) + + Construct a context in the style of :meth:`local`, but change the + active Linux process namespaces via calls to `setns(1)` before + executing Python. + + The namespaces to use, and the active root file system are taken from + the root PID of a running Docker, LXC, LXD, or systemd-nspawn + container. + + A program is required only to find the root PID, after which management + of the child Python interpreter is handled directly. + + :param str container: + Container to connect to. + :param str kind: + One of ``docker``, ``lxc``, ``lxd`` or ``machinectl``. + :param str username: + Username within the container to :func:`setuid` to. Defaults to + ``root``. + :param str docker_path: + Filename or complete path to the Docker binary. ``PATH`` will be + searched if given as a filename. Defaults to ``docker``. + :param str lxc_path: + Filename or complete path to the LXD ``lxc`` binary. ``PATH`` will + be searched if given as a filename. Defaults to ``lxc``. + :param str lxc_info_path: + Filename or complete path to the LXC ``lxc-info`` binary. ``PATH`` + will be searched if given as a filename. Defaults to ``lxc-info``. + :param str machinectl_path: + Filename or complete path to the ``machinectl`` binary. ``PATH`` + will be searched if given as a filename. Defaults to + ``machinectl``. + +.. method:: Router.su (username=None, password=None, su_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) + + Construct a context on the local machine over a ``su`` invocation. The + ``su`` process is started in a newly allocated pseudo-terminal, and + supports typing interactive passwords. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str username: + Username to pass to ``su``, defaults to ``root``. + :param str password: + The account password to use if requested. + :param str su_path: + Filename or complete path to the ``su`` binary. ``PATH`` will be + searched if given as a filename. Defaults to ``su``. + :param bytes password_prompt: + The string that indicates ``su`` is requesting a password. Defaults + to ``Password:``. + :param str incorrect_prompts: + Strings that signal the password is incorrect. Defaults to `("su: + sorry", "su: authentication failure")`. + + :raises mitogen.su.PasswordError: + A password was requested but none was provided, the supplied + password was incorrect, or (on BSD) the target account did not + exist. + +.. method:: Router.sudo (username=None, sudo_path=None, password=None, \**kwargs) + + Construct a context on the local machine over a ``sudo`` invocation. + The ``sudo`` process is started in a newly allocated pseudo-terminal, + and supports typing interactive passwords. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str username: + Username to pass to sudo as the ``-u`` parameter, defaults to + ``root``. + :param str sudo_path: + Filename or complete path to the sudo binary. ``PATH`` will be + searched if given as a filename. Defaults to ``sudo``. + :param str password: + The password to use if/when sudo requests it. Depending on the sudo + configuration, this is either the current account password or the + target account password. :class:`mitogen.sudo.PasswordError` + will be raised if sudo requests a password but none is provided. + :param bool set_home: + If :data:`True`, request ``sudo`` set the ``HOME`` environment + variable to match the target UNIX account. + :param bool preserve_env: + If :data:`True`, request ``sudo`` to preserve the environment of + the parent process. + :param str selinux_type: + If not :data:`None`, the SELinux security context to use. + :param str selinux_role: + If not :data:`None`, the SELinux role to use. + :param list sudo_args: + Arguments in the style of :data:`sys.argv` that would normally + be passed to ``sudo``. The arguments are parsed in-process to set + equivalent parameters. Re-parsing ensures unsupported options cause + :class:`mitogen.core.StreamError` to be raised, and that + attributes of the stream match the actual behaviour of ``sudo``. + +.. method:: Router.ssh (hostname, username=None, ssh_path=None, ssh_args=None, port=None, check_host_keys='enforce', password=None, identity_file=None, identities_only=True, compression=True, \**kwargs) + + Construct a remote context over an OpenSSH ``ssh`` invocation. + + The ``ssh`` process is started in a newly allocated pseudo-terminal to + support typing interactive passwords and responding to prompts, if a + password is specified, or `check_host_keys=accept`. In other scenarios, + ``BatchMode`` is enabled and no PTY is allocated. For many-target + configurations, both options should be avoided as most systems have a + conservative limit on the number of pseudo-terminals that may exist. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str username: + The SSH username; default is unspecified, which causes SSH to pick + the username to use. + :param str ssh_path: + Absolute or relative path to ``ssh``. Defaults to ``ssh``. + :param list ssh_args: + Additional arguments to pass to the SSH command. + :param int port: + Port number to connect to; default is unspecified, which causes SSH + to pick the port number. + :param str check_host_keys: + Specifies the SSH host key checking mode. Defaults to ``enforce``. + + * ``ignore``: no host key checking is performed. Connections never + fail due to an unknown or changed host key. + * ``accept``: known hosts keys are checked to ensure they match, + new host keys are automatically accepted and verified in future + connections. + * ``enforce``: known host keys are checked to ensure they match, + unknown hosts cause a connection failure. + :param str password: + Password to type if/when ``ssh`` requests it. If not specified and + a password is requested, :class:`mitogen.ssh.PasswordError` is + raised. + :param str identity_file: + Path to an SSH private key file to use for authentication. Default + is unspecified, which causes SSH to pick the identity file. + + When this option is specified, only `identity_file` will be used by + the SSH client to perform authenticaion; agent authentication is + automatically disabled, as is reading the default private key from + ``~/.ssh/id_rsa``, or ``~/.ssh/id_dsa``. + :param bool identities_only: + If :data:`True` and a password or explicit identity file is + specified, instruct the SSH client to disable any authentication + identities inherited from the surrounding environment, such as + those loaded in any running ``ssh-agent``, or default key files + present in ``~/.ssh``. This ensures authentication attempts only + occur using the supplied password or SSH key. + :param bool compression: + If :data:`True`, enable ``ssh`` compression support. Compression + has a minimal effect on the size of modules transmitted, as they + are already compressed, however it has a large effect on every + remaining message in the otherwise uncompressed stream protocol, + such as function call arguments and return values. + :param int ssh_debug_level: + Optional integer `0..3` indicating the SSH client debug level. + :raises mitogen.ssh.PasswordError: + A password was requested but none was specified, or the specified + password was incorrect. + + :raises mitogen.ssh.HostKeyError: + When `check_host_keys` is set to either ``accept``, indicates a + previously recorded key no longer matches the remote machine. When + set to ``enforce``, as above, but additionally indicates no + previously recorded key exists for the remote machine. Context Class @@ -619,126 +582,22 @@ Channel Class ============= .. currentmodule:: mitogen.core +.. autoclass:: Channel + :members: -.. class:: Channel (router, context, dst_handle, handle=None) - - A channel inherits from :class:`mitogen.core.Sender` and - `mitogen.core.Receiver` to provide bidirectional functionality. - - Since all handles aren't known until after both ends are constructed, for - both ends to communicate through a channel, it is necessary for one end to - retrieve the handle allocated to the other and reconfigure its own channel - to match. Currently this is a manual task. Broker Class ============ .. currentmodule:: mitogen.core -.. class:: Broker - - Responsible for handling I/O multiplexing in a private thread. - - **Note:** This is the somewhat limited core version of the Broker class - used by child contexts. The master subclass is documented below. - - .. attribute:: shutdown_timeout = 3.0 - - Seconds grace to allow :class:`streams ` to shutdown - gracefully before force-disconnecting them during :meth:`shutdown`. - - .. method:: defer (func, \*args, \*kwargs) - - Arrange for `func(\*args, \**kwargs)` to be executed on the broker - thread, or immediately if the current thread is the broker thread. Safe - to call from any thread. - - .. method:: defer_sync (func) - - Arrange for `func()` to execute on the broker thread, blocking the - current thread until a result or exception is available. - - :returns: - Call result. - - .. method:: start_receive (stream) - - Mark the :attr:`receive_side ` on `stream` as - ready for reading. Safe to call from any thread. When the associated - file descriptor becomes ready for reading, - :meth:`BasicStream.on_receive` will be called. - - .. method:: stop_receive (stream) - - Mark the :attr:`receive_side ` on `stream` as - not ready for reading. Safe to call from any thread. - - .. method:: _start_transmit (stream) - - Mark the :attr:`transmit_side ` on `stream` as - ready for writing. Must only be called from the Broker thread. When the - associated file descriptor becomes ready for writing, - :meth:`BasicStream.on_transmit` will be called. - - .. method:: stop_receive (stream) - - Mark the :attr:`transmit_side ` on `stream` as - not ready for writing. Safe to call from any thread. - - .. method:: shutdown - - Request broker gracefully disconnect streams and stop. - - .. method:: join - - Wait for the broker to stop, expected to be called after - :meth:`shutdown`. - - .. method:: keep_alive - - Return :data:`True` if any reader's :attr:`Side.keep_alive` - attribute is :data:`True`, or any - :class:`Context ` is still - registered that is not the master. Used to delay shutdown while some - important work is in progress (e.g. log draining). - - **Internal Methods** - - .. method:: _broker_main - - Handle events until :meth:`shutdown`. On shutdown, invoke - :meth:`Stream.on_shutdown` for every active stream, then allow up to - :attr:`shutdown_timeout` seconds for the streams to unregister - themselves before forcefully calling - :meth:`Stream.on_disconnect`. +.. autoclass:: Broker + :members: .. currentmodule:: mitogen.master -.. class:: Broker (install_watcher=True) - - .. note:: - - You may construct as many brokers as desired, and use the same broker - for multiple routers, however usually only one broker need exist. - Multiple brokers may be useful when dealing with sets of children with - differing lifetimes. For example, a subscription service where - non-payment results in termination for one customer. - - :param bool install_watcher: - If :data:`True`, an additional thread is started to monitor the - lifetime of the main thread, triggering :meth:`shutdown` - automatically in case the user forgets to call it, or their code - crashed. - - You should not rely on this functionality in your program, it is only - intended as a fail-safe and to simplify the API for new users. In - particular, alternative Python implementations may not be able to - support watching the main thread. - - .. attribute:: shutdown_timeout = 5.0 - - Seconds grace to allow :class:`streams ` to shutdown - gracefully before force-disconnecting them during :meth:`shutdown`. +.. autoclass:: Broker + :members: Utility Functions diff --git a/docs/changelog.rst b/docs/changelog.rst index a7109a8d..9dd75d1b 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -126,7 +126,7 @@ Core Library v0.2.4 (2018-??-??) ------------------- +------------------- Mitogen for Ansible ~~~~~~~~~~~~~~~~~~~ @@ -137,15 +137,14 @@ Enhancements * `#76 `_, `#351 `_, `#352 `_: disconnect propagation - has improved, allowing Ansible to cancel waits for responses from targets - that where abruptly disconnected. This increases the chance a task will fail - gracefully, rather than hanging due to the connection being severed, for - example because of network failure or EC2 instance maintenance. + has improved, allowing Ansible to cancel waits for responses from abruptly + disconnected targets. This ensures a task will gracefully fail rather than + hang, for example on network failure or EC2 instance maintenance. * `#369 `_: :meth:`Connection.reset` is implemented, allowing `meta: reset_connection `_ to shut - down the remote interpreter as expected, and improving support for the + down the remote interpreter as documented, and improving support for the `reboot `_ module. @@ -156,26 +155,22 @@ Fixes * `#323 `_, `#333 `_: work around a Windows - Subsystem for Linux bug that would cause tracebacks to be rendered during - shutdown. + Subsystem for Linux bug that caused tracebacks to appear during shutdown. * `#334 `_: the SSH method - tilde-expands private key paths using Ansible's logic. Previously Mitogen - passed the path unmodified to SSH, which would expand it using - :func:`os.getpwent`. - - This differs from :func:`os.path.expanduser`, which prefers the ``HOME`` + tilde-expands private key paths using Ansible's logic. Previously the path + was passed unmodified to SSH, which expanded it using :func:`os.getpwent`. + This differs from :func:`os.path.expanduser`, which uses the ``HOME`` environment variable if it is set, causing behaviour to diverge when Ansible - was invoked using sudo without appropriate flags to cause the ``HOME`` - environment variable to be reset to match the target account. + was invoked across user accounts via ``sudo``. * `#370 `_: the Ansible `reboot `_ module is supported. * `#373 `_: the LXC and LXD methods - now print a useful hint when Python fails to start, as no useful error is - normally logged to the console by these tools. + print a useful hint on failure, as no useful error is normally logged to the + console by these tools. * `#400 `_: work around a threading bug in the AWX display callback when running with high verbosity setting. @@ -195,21 +190,33 @@ Fixes Core Library ~~~~~~~~~~~~ -* `#76 `_: routing maintains the set - of destination context ID ever received on each stream, and when - disconnection occurs, propagates ``DEL_ROUTE`` messages downwards towards - every stream that ever communicated with a disappearing peer, rather than - simply toward parents. +* `#76 `_: routing records the + destination context IDs ever received on each stream, and when disconnection + occurs, propagates :data:`mitogen.core.DEL_ROUTE` messages towards every + stream that ever communicated with the disappearing peer, rather than simply + towards parents. - Conversations between nodes in any level of the tree receive ``DEL_ROUTE`` - messages when a participant disconnects, allowing receivers to be woken with - :class:`mitogen.core.ChannelError` to signal the connection has broken, even - when one participant is not a parent of the other. + Conversations between nodes anywhere in the tree receive + :data:`mitogen.core.DEL_ROUTE` when either participant disconnects, allowing + receivers to wake with :class:`mitogen.core.ChannelError`, even when one + participant is not a parent of the other. -* `#405 `_: if a message is rejected - due to being too large, and it has a ``reply_to`` set, a dead message is - returned to the sender. This ensures function calls exceeding the configured - maximum size crash rather than hang. +* `#405 `_: if an oversized message + is rejected, and it has a ``reply_to`` set, a dead message is returned to the + sender. This ensures function calls exceeding the configured maximum size + crash rather than hang. + +* `#406 `_: + :class:`mitogen.core.Broker` did not call :meth:`mitogen.core.Poller.close` + during shutdown, leaking the underlying poller FD in masters and parents. + +* `#406 `_: connections could leak + FDs when a child process failed to start. + +* `#406 `_, + `#417 `_: connections could leave + FD wrapper objects that had not been closed lying around to be closed during + garbage collection, causing reused FD numbers to be closed at random moments. * `#411 `_: the SSH method typed "``y``" rather than the requisite "``yes``" when `check_host_keys="accept"` @@ -227,7 +234,7 @@ Thanks! ~~~~~~~ Mitogen would not be possible without the support of users. A huge thanks for -bug reports, features and fixes in this release contributed by +bug reports, testing, features and fixes in this release contributed by `Berend De Schouwer `_, `Brian Candler `_, `Duane Zamrok `_, @@ -441,7 +448,7 @@ Thanks! ~~~~~~~ Mitogen would not be possible without the support of users. A huge thanks for -bug reports, features and fixes in this release contributed by +bug reports, testing, features and fixes in this release contributed by `Alex Russu `_, `Alex Willmer `_, `atoom `_, diff --git a/docs/howitworks.rst b/docs/howitworks.rst index 5e2c10f5..65a6daee 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -16,17 +16,17 @@ The UNIX First Stage To allow delivery of the bootstrap compressed using :py:mod:`zlib`, it is necessary for something on the remote to be prepared to decompress the payload -and feed it to a Python interpreter. Since we would like to avoid writing an -error-prone shell fragment to implement this, and since we must avoid writing -to the remote machine's disk in case it is read-only, the Python process -started on the remote machine by Mitogen immediately forks in order to +and feed it to a Python interpreter [#f1]_. Since we would like to avoid +writing an error-prone shell fragment to implement this, and since we must +avoid writing to the remote machine's disk in case it is read-only, the Python +process started on the remote machine by Mitogen immediately forks in order to implement the decompression. Python Command Line ################### -The Python command line sent to the host is a :mod:`zlib`-compressed [#f1]_ and +The Python command line sent to the host is a :mod:`zlib`-compressed [#f2]_ and base64-encoded copy of the :py:meth:`mitogen.master.Stream._first_stage` function, which has been carefully optimized to reduce its size. Prior to compression and encoding, ``CONTEXT_NAME`` is replaced with the desired context @@ -65,10 +65,10 @@ allowing reading by the first stage of exactly the required bytes. Configuring argv[0] ################### -Forking provides us with an excellent opportunity for tidying up the eventual -Python interpreter, in particular, restarting it using a fresh command-line to -get rid of the large base64-encoded first stage parameter, and to replace -**argv[0]** with something descriptive. +Forking provides an excellent opportunity to tidy up the eventual Python +interpreter, in particular, restarting it using a fresh command-line to get rid +of the large base64-encoded first stage parameter, and to replace **argv[0]** +with something descriptive. After configuring its ``stdin`` to point to the read end of the pipe, the parent half of the fork re-executes Python, with **argv[0]** taken from the @@ -1018,7 +1018,13 @@ receive items in the order they are requested, as they become available. .. rubric:: Footnotes -.. [#f1] Compression may seem redundant, however it is basically free and reducing IO +.. [#f1] Although some connection methods such as SSH support compression, and + Mitogen enables SSH compression by default, there are circumstances where + disabling SSH compression is desirable, and many scenarios for future + connection methods where transport-layer compression is not supported at + all. + +.. [#f2] Compression may seem redundant, however it is basically free and reducing IO is always a good idea. The 33% / 200 byte saving may mean the presence or absence of an additional frame on the network, or in real world terms after accounting for SSH overhead, around a 2% reduced chance of a stall during diff --git a/docs/internals.rst b/docs/internals.rst index 9c533952..fc9d57ac 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -35,162 +35,33 @@ Side Class ========== .. currentmodule:: mitogen.core - -.. class:: Side (stream, fd, keep_alive=True) - - Represent a single side of a :py:class:`BasicStream`. This exists to allow - streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional - (e.g. UNIX socket) file descriptors to operate identically. - - :param mitogen.core.Stream stream: - The stream this side is associated with. - - :param int fd: - Underlying file descriptor. - - :param bool keep_alive: - Value for :py:attr:`keep_alive` - - During construction, the file descriptor has its :py:data:`os.O_NONBLOCK` - flag enabled using :py:func:`fcntl.fcntl`. - - .. attribute:: stream - - The :py:class:`Stream` for which this is a read or write side. - - .. attribute:: fd - - Integer file descriptor to perform IO on, or :data:`None` if - :py:meth:`close` has been called. - - .. attribute:: keep_alive - - If :data:`True`, causes presence of this side in :py:class:`Broker`'s - active reader set to defer shutdown until the side is disconnected. - - .. method:: fileno - - Return :py:attr:`fd` if it is not :data:`None`, otherwise raise - :py:class:`StreamError`. This method is implemented so that - :py:class:`Side` can be used directly by :py:func:`select.select`. - - .. method:: close - - Call :py:func:`os.close` on :py:attr:`fd` if it is not :data:`None`, - then set it to :data:`None`. - - .. method:: read (n=CHUNK_SIZE) - - Read up to `n` bytes from the file descriptor, wrapping the underlying - :py:func:`os.read` call with :py:func:`io_op` to trap common - disconnection conditions. - - :py:meth:`read` always behaves as if it is reading from a regular UNIX - file; socket, pipe, and TTY disconnection errors are masked and result - in a 0-sized read just like a regular file. - - :returns: - Bytes read, or the empty to string to indicate disconnection was - detected. - - .. method:: write (s) - - Write as much of the bytes from `s` as possible to the file descriptor, - wrapping the underlying :py:func:`os.write` call with :py:func:`io_op` - to trap common disconnection connditions. - - :returns: - Number of bytes written, or :data:`None` if disconnection was - detected. +.. autoclass:: Side + :members: Stream Classes ============== .. currentmodule:: mitogen.core - -.. class:: BasicStream - - .. attribute:: receive_side - - A :py:class:`Side` representing the stream's receive file descriptor. - - .. attribute:: transmit_side - - A :py:class:`Side` representing the stream's transmit file descriptor. - - .. method:: on_disconnect (broker) - - Called by :py:class:`Broker` to force disconnect the stream. The base - implementation simply closes :py:attr:`receive_side` and - :py:attr:`transmit_side` and unregisters the stream from the broker. - - .. method:: on_receive (broker) - - Called by :py:class:`Broker` when the stream's :py:attr:`receive_side` has - been marked readable using :py:meth:`Broker.start_receive` and the - broker has detected the associated file descriptor is ready for - reading. - - Subclasses must implement this method if - :py:meth:`Broker.start_receive` is ever called on them, and the method - must call :py:meth:`on_disconect` if reading produces an empty string. - - .. method:: on_transmit (broker) - - Called by :py:class:`Broker` when the stream's :py:attr:`transmit_side` - has been marked writeable using :py:meth:`Broker._start_transmit` and - the broker has detected the associated file descriptor is ready for - writing. - - Subclasses must implement this method if - :py:meth:`Broker._start_transmit` is ever called on them. - - .. method:: on_shutdown (broker) - - Called by :py:meth:`Broker.shutdown` to allow the stream time to - gracefully shutdown. The base implementation simply called - :py:meth:`on_disconnect`. +.. autoclass:: BasicStream + :members: .. autoclass:: Stream :members: - .. method:: pending_bytes () - - Returns the number of bytes queued for transmission on this stream. - This can be used to limit the amount of data buffered in RAM by an - otherwise unlimited consumer. - - For an accurate result, this method should be called from the Broker - thread, using a wrapper like: - - :: - - def get_pending_bytes(self, stream): - latch = mitogen.core.Latch() - self.broker.defer( - lambda: latch.put(stream.pending_bytes()) - ) - return latch.get() - - .. currentmodule:: mitogen.fork - .. autoclass:: Stream :members: .. currentmodule:: mitogen.parent - .. autoclass:: Stream :members: .. currentmodule:: mitogen.ssh - .. autoclass:: Stream :members: .. currentmodule:: mitogen.sudo - .. autoclass:: Stream :members: @@ -212,6 +83,7 @@ Poller Class .. currentmodule:: mitogen.core .. autoclass:: Poller + :members: .. currentmodule:: mitogen.parent .. autoclass:: KqueuePoller @@ -256,64 +128,16 @@ ExternalContext Class ===================== .. currentmodule:: mitogen.core +.. autoclass:: ExternalContext + :members: -.. class:: ExternalContext - - External context implementation. - - .. attribute:: broker - - The :py:class:`mitogen.core.Broker` instance. - - .. attribute:: context - - The :py:class:`mitogen.core.Context` instance. - - .. attribute:: channel - - The :py:class:`mitogen.core.Channel` over which - :py:data:`CALL_FUNCTION` requests are received. - - .. attribute:: stdout_log - - The :py:class:`mitogen.core.IoLogger` connected to ``stdout``. - - .. attribute:: importer - - The :py:class:`mitogen.core.Importer` instance. - - .. attribute:: stdout_log - - The :py:class:`IoLogger` connected to ``stdout``. - - .. attribute:: stderr_log - - The :py:class:`IoLogger` connected to ``stderr``. - - .. method:: _dispatch_calls - - Implementation for the main thread in every child context. mitogen.master ============== -.. currentmodule:: mitogen.master - -.. class:: ProcessMonitor - - Install a :py:data:`signal.SIGCHLD` handler that generates callbacks when a - specific child process has exitted. - - .. method:: add (pid, callback) - - Add a callback function to be notified of the exit status of a process. - - :param int pid: - Process ID to be notified of. - - :param callback: - Function invoked as `callback(status)`, where `status` is the raw - exit status of the child process. +.. currentmodule:: mitogen.parent +.. autoclass:: ProcessMonitor + :members: Blocking I/O Functions diff --git a/mitogen/core.py b/mitogen/core.py index 83880621..5b1d5298 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -100,7 +100,7 @@ CALL_SERVICE = 110 #: * a remote receiver is disconnected or explicitly closed. #: * a related message could not be delivered due to no route existing for it. #: * a router is being torn down, as a sentinel value to notify -#: :py:meth:`mitogen.core.Router.add_handler` callbacks to clean up. +#: :meth:`mitogen.core.Router.add_handler` callbacks to clean up. IS_DEAD = 999 try: @@ -187,7 +187,7 @@ class Error(Exception): class LatchError(Error): - """Raised when an attempt is made to use a :py:class:`mitogen.core.Latch` + """Raised when an attempt is made to use a :class:`mitogen.core.Latch` that has been marked closed.""" pass @@ -239,7 +239,7 @@ class Kwargs(dict): class CallError(Error): """Serializable :class:`Error` subclass raised when - :py:meth:`Context.call() ` fails. A copy of + :meth:`Context.call() ` fails. A copy of the traceback from the external context is appended to the exception message.""" def __init__(self, fmt=None, *args): @@ -872,6 +872,15 @@ class Receiver(object): class Channel(Sender, Receiver): + """ + A channel inherits from :class:`mitogen.core.Sender` and + `mitogen.core.Receiver` to provide bidirectional functionality. + + Since all handles aren't known until after both ends are constructed, for + both ends to communicate through a channel, it is necessary for one end to + retrieve the handle allocated to the other and reconfigure its own channel + to match. Currently this is a manual task. + """ def __init__(self, router, context, dst_handle, handle=None): Sender.__init__(self, context, dst_handle) Receiver.__init__(self, router, handle) @@ -1160,12 +1169,35 @@ class LogHandler(logging.Handler): class Side(object): + """ + Represent a single side of a :class:`BasicStream`. This exists to allow + streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional + (e.g. UNIX socket) file descriptors to operate identically. + + :param mitogen.core.Stream stream: + The stream this side is associated with. + + :param int fd: + Underlying file descriptor. + + :param bool keep_alive: + Value for :attr:`keep_alive` + + During construction, the file descriptor has its :data:`os.O_NONBLOCK` flag + enabled using :func:`fcntl.fcntl`. + """ _fork_refs = weakref.WeakValueDictionary() def __init__(self, stream, fd, cloexec=True, keep_alive=True, blocking=False): + #: The :class:`Stream` for which this is a read or write side. self.stream = stream + #: Integer file descriptor to perform IO on, or :data:`None` if + #: :meth:`close` has been called. self.fd = fd self.closed = False + #: If :data:`True`, causes presence of this side in + #: :class:`Broker`'s active reader set to defer shutdown until the + #: side is disconnected. self.keep_alive = keep_alive self._fork_refs[id(self)] = self if cloexec: @@ -1182,12 +1214,29 @@ class Side(object): side.close() def close(self): + """ + Call :func:`os.close` on :attr:`fd` if it is not :data:`None`, + then set it to :data:`None`. + """ if not self.closed: _vv and IOLOG.debug('%r.close()', self) self.closed = True os.close(self.fd) def read(self, n=CHUNK_SIZE): + """ + Read up to `n` bytes from the file descriptor, wrapping the underlying + :func:`os.read` call with :func:`io_op` to trap common disconnection + conditions. + + :meth:`read` always behaves as if it is reading from a regular UNIX + file; socket, pipe, and TTY disconnection errors are masked and result + in a 0-sized read like a regular file. + + :returns: + Bytes read, or the empty to string to indicate disconnection was + detected. + """ if self.closed: # Refuse to touch the handle after closed, it may have been reused # by another thread. TODO: synchronize read()/write()/close(). @@ -1198,6 +1247,15 @@ class Side(object): return s def write(self, s): + """ + Write as much of the bytes from `s` as possible to the file descriptor, + wrapping the underlying :func:`os.write` call with :func:`io_op` to + trap common disconnection connditions. + + :returns: + Number of bytes written, or :data:`None` if disconnection was + detected. + """ if self.closed or self.fd is None: # Refuse to touch the handle after closed, it may have been reused # by another thread. @@ -1210,10 +1268,52 @@ class Side(object): class BasicStream(object): + #: A :class:`Side` representing the stream's receive file descriptor. receive_side = None + + #: A :class:`Side` representing the stream's transmit file descriptor. transmit_side = None + def on_receive(self, broker): + """ + Called by :class:`Broker` when the stream's :attr:`receive_side` has + been marked readable using :meth:`Broker.start_receive` and the broker + has detected the associated file descriptor is ready for reading. + + Subclasses must implement this if :meth:`Broker.start_receive` is ever + called on them, and the method must call :meth:`on_disconect` if + reading produces an empty string. + """ + pass + + def on_transmit(self, broker): + """ + Called by :class:`Broker` when the stream's :attr:`transmit_side` + has been marked writeable using :meth:`Broker._start_transmit` and + the broker has detected the associated file descriptor is ready for + writing. + + Subclasses must implement this if :meth:`Broker._start_transmit` is + ever called on them. + """ + pass + + def on_shutdown(self, broker): + """ + Called by :meth:`Broker.shutdown` to allow the stream time to + gracefully shutdown. The base implementation simply called + :meth:`on_disconnect`. + """ + _v and LOG.debug('%r.on_shutdown()', self) + fire(self, 'shutdown') + self.on_disconnect(broker) + def on_disconnect(self, broker): + """ + Called by :class:`Broker` to force disconnect the stream. The base + implementation simply closes :attr:`receive_side` and + :attr:`transmit_side` and unregisters the stream from the broker. + """ LOG.debug('%r.on_disconnect()', self) if self.receive_side: broker.stop_receive(self) @@ -1223,19 +1323,14 @@ class BasicStream(object): self.transmit_side.close() fire(self, 'disconnect') - def on_shutdown(self, broker): - _v and LOG.debug('%r.on_shutdown()', self) - fire(self, 'shutdown') - self.on_disconnect(broker) - class Stream(BasicStream): """ - :py:class:`BasicStream` subclass implementing mitogen's :ref:`stream + :class:`BasicStream` subclass implementing mitogen's :ref:`stream protocol `. """ - #: If not :data:`None`, :py:class:`Router` stamps this into - #: :py:attr:`Message.auth_id` of every message received on this stream. + #: If not :data:`None`, :class:`Router` stamps this into + #: :attr:`Message.auth_id` of every message received on this stream. auth_id = None #: If not :data:`False`, indicates the stream has :attr:`auth_id` set and @@ -1272,7 +1367,7 @@ class Stream(BasicStream): def on_receive(self, broker): """Handle the next complete message on the stream. Raise - :py:class:`StreamError` on failure.""" + :class:`StreamError` on failure.""" _vv and IOLOG.debug('%r.on_receive()', self) buf = self.receive_side.read() @@ -1329,6 +1424,14 @@ class Stream(BasicStream): return True def pending_bytes(self): + """ + Return the number of bytes queued for transmission on this stream. This + can be used to limit the amount of data buffered in RAM by an otherwise + unlimited consumer. + + For an accurate result, this method should be called from the Broker + thread, for example by using :meth:`Broker.defer_sync`. + """ return self._output_buf_len def on_transmit(self, broker): @@ -1572,15 +1675,15 @@ class Poller(object): class Latch(object): """ - A latch is a :py:class:`Queue.Queue`-like object that supports mutation and - waiting from multiple threads, however unlike :py:class:`Queue.Queue`, + A latch is a :class:`Queue.Queue`-like object that supports mutation and + waiting from multiple threads, however unlike :class:`Queue.Queue`, waiting threads always remain interruptible, so CTRL+C always succeeds, and waits where a timeout is set experience no wake up latency. These properties are not possible in combination using the built-in threading primitives available in Python 2.x. Latches implement queues using the UNIX self-pipe trick, and a per-thread - :py:func:`socket.socketpair` that is lazily created the first time any + :func:`socket.socketpair` that is lazily created the first time any latch attempts to sleep on a thread, and dynamically associated with the waiting Latch only for duration of the wait. @@ -1626,7 +1729,7 @@ class Latch(object): def close(self): """ Mark the latch as closed, and cause every sleeping thread to be woken, - with :py:class:`mitogen.core.LatchError` raised in each thread. + with :class:`mitogen.core.LatchError` raised in each thread. """ self._lock.acquire() try: @@ -1640,17 +1743,17 @@ class Latch(object): def empty(self): """ - Return :py:data:`True` if calling :py:meth:`get` would block. + Return :data:`True` if calling :meth:`get` would block. - As with :py:class:`Queue.Queue`, :py:data:`True` may be returned even - though a subsequent call to :py:meth:`get` will succeed, since a - message may be posted at any moment between :py:meth:`empty` and - :py:meth:`get`. + As with :class:`Queue.Queue`, :data:`True` may be returned even + though a subsequent call to :meth:`get` will succeed, since a + message may be posted at any moment between :meth:`empty` and + :meth:`get`. - As with :py:class:`Queue.Queue`, :py:data:`False` may be returned even - though a subsequent call to :py:meth:`get` will block, since another - waiting thread may be woken at any moment between :py:meth:`empty` and - :py:meth:`get`. + As with :class:`Queue.Queue`, :data:`False` may be returned even + though a subsequent call to :meth:`get` will block, since another + waiting thread may be woken at any moment between :meth:`empty` and + :meth:`get`. """ return len(self._queue) == 0 @@ -1683,14 +1786,14 @@ class Latch(object): Return the next enqueued object, or sleep waiting for one. :param float timeout: - If not :py:data:`None`, specifies a timeout in seconds. + If not :data:`None`, specifies a timeout in seconds. :param bool block: - If :py:data:`False`, immediately raise - :py:class:`mitogen.core.TimeoutError` if the latch is empty. + If :data:`False`, immediately raise + :class:`mitogen.core.TimeoutError` if the latch is empty. :raises mitogen.core.LatchError: - :py:meth:`close` has been called, and the object is no longer valid. + :meth:`close` has been called, and the object is no longer valid. :raises mitogen.core.TimeoutError: Timeout was reached. @@ -1771,7 +1874,7 @@ class Latch(object): exists. :raises mitogen.core.LatchError: - :py:meth:`close` has been called, and the object is no longer valid. + :meth:`close` has been called, and the object is no longer valid. """ _vv and IOLOG.debug('%r.put(%r)', self, obj) self._lock.acquire() @@ -1807,7 +1910,7 @@ class Latch(object): class Waker(BasicStream): """ - :py:class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_. + :class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_. Used to wake the multiplexer when another thread needs to modify its state (via a cross-thread function call). @@ -1827,8 +1930,8 @@ class Waker(BasicStream): def __repr__(self): return 'Waker(%r rfd=%r, wfd=%r)' % ( self._broker, - self.receive_side.fd, - self.transmit_side.fd, + self.receive_side and self.receive_side.fd, + self.transmit_side and self.transmit_side.fd, ) @property @@ -1844,17 +1947,14 @@ class Waker(BasicStream): def on_receive(self, broker): """ - Drain the pipe and fire callbacks. Reading multiple bytes is safe since - new bytes corresponding to future .defer() calls are written only after - .defer() takes _lock: either a byte we read corresponds to something - already on the queue by the time we take _lock, or a byte remains - buffered, causing another wake up, because it was written after we - released _lock. + Drain the pipe and fire callbacks. Since :attr:`_deferred` is + synchronized, :meth:`defer` and :meth:`on_receive` can conspire to + ensure only one byte needs to be pending regardless of queue length. """ _vv and IOLOG.debug('%r.on_receive()', self) - self.receive_side.read(128) self._lock.acquire() try: + self.receive_side.read(1) deferred = self._deferred self._deferred = [] finally: @@ -1868,6 +1968,18 @@ class Waker(BasicStream): func, args, kwargs) self._broker.shutdown() + def _wake(self): + """ + Wake the multiplexer by writing a byte. If Broker is midway through + teardown, the FD may already be closed, so ignore EBADF. + """ + try: + self.transmit_side.write(b(' ')) + except OSError: + e = sys.exc_info()[1] + if e.args[0] != errno.EBADF: + raise + def defer(self, func, *args, **kwargs): if threading.currentThread().ident == self.broker_ident: _vv and IOLOG.debug('%r.defer() [immediate]', self) @@ -1876,25 +1988,17 @@ class Waker(BasicStream): _vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd) self._lock.acquire() try: + if not self._deferred: + self._wake() self._deferred.append((func, args, kwargs)) finally: self._lock.release() - # Wake the multiplexer by writing a byte. If the broker is in the midst - # of tearing itself down, the waker fd may already have been closed, so - # ignore EBADF here. - try: - self.transmit_side.write(b(' ')) - except OSError: - e = sys.exc_info()[1] - if e.args[0] != errno.EBADF: - raise - class IoLogger(BasicStream): """ - :py:class:`BasicStream` subclass that sets up redirection of a standard - UNIX file descriptor back into the Python :py:mod:`logging` package. + :class:`BasicStream` subclass that sets up redirection of a standard + UNIX file descriptor back into the Python :mod:`logging` package. """ _buf = '' @@ -2126,8 +2230,8 @@ class Router(object): return handle def on_shutdown(self, broker): - """Called during :py:meth:`Broker.shutdown`, informs callbacks - registered with :py:meth:`add_handle_cb` the connection is dead.""" + """Called during :meth:`Broker.shutdown`, informs callbacks registered + with :meth:`add_handle_cb` the connection is dead.""" _v and LOG.debug('%r.on_shutdown(%r)', self, broker) fire(self, 'shutdown') for handle, (persist, fn) in self._handle_map.iteritems(): @@ -2249,14 +2353,26 @@ class Router(object): class Broker(object): + """ + Responsible for handling I/O multiplexing in a private thread. + + **Note:** This is the somewhat limited core version of the Broker class + used by child contexts. The master subclass is documented below. + """ poller_class = Poller _waker = None _thread = None + + #: Seconds grace to allow :class:`streams ` to shutdown gracefully + #: before force-disconnecting them during :meth:`shutdown`. shutdown_timeout = 3.0 def __init__(self, poller_class=None): self._alive = True self._waker = Waker(self) + #: Arrange for `func(\*args, \**kwargs)` to be executed on the broker + #: thread, or immediately if the current thread is the broker thread. + #: Safe to call from any thread. self.defer = self._waker.defer self.poller = self.poller_class() self.poller.start_receive( @@ -2272,6 +2388,12 @@ class Broker(object): self._waker.broker_ident = self._thread.ident def start_receive(self, stream): + """ + Mark the :attr:`receive_side ` on `stream` as + ready for reading. Safe to call from any thread. When the associated + file descriptor becomes ready for reading, + :meth:`BasicStream.on_receive` will be called. + """ _vv and IOLOG.debug('%r.start_receive(%r)', self, stream) side = stream.receive_side assert side and side.fd is not None @@ -2279,26 +2401,47 @@ class Broker(object): side.fd, (side, stream.on_receive)) def stop_receive(self, stream): + """ + Mark the :attr:`receive_side ` on `stream` as not + ready for reading. Safe to call from any thread. + """ _vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) self.defer(self.poller.stop_receive, stream.receive_side.fd) def _start_transmit(self, stream): + """ + Mark the :attr:`transmit_side ` on `stream` as + ready for writing. Must only be called from the Broker thread. When the + associated file descriptor becomes ready for writing, + :meth:`BasicStream.on_transmit` will be called. + """ _vv and IOLOG.debug('%r._start_transmit(%r)', self, stream) side = stream.transmit_side assert side and side.fd is not None self.poller.start_transmit(side.fd, (side, stream.on_transmit)) def _stop_transmit(self, stream): + """ + Mark the :attr:`transmit_side ` on `stream` as not + ready for writing. + """ _vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream) self.poller.stop_transmit(stream.transmit_side.fd) def keep_alive(self): + """ + Return :data:`True` if any reader's :attr:`Side.keep_alive` attribute + is :data:`True`, or any :class:`Context` is still registered that is + not the master. Used to delay shutdown while some important work is in + progress (e.g. log draining). + """ it = (side.keep_alive for (_, (side, _)) in self.poller.readers) return sum(it, 0) def defer_sync(self, func): """ - Block the calling thread while `func` runs on a broker thread. + Arrange for `func()` to execute on the broker thread, blocking the + current thread until a result or exception is available. :returns: Return value of `func()`. @@ -2330,40 +2473,61 @@ class Broker(object): for (side, func) in self.poller.poll(timeout): self._call(side.stream, func) + def _broker_exit(self): + for _, (side, _) in self.poller.readers + self.poller.writers: + LOG.error('_broker_main() force disconnecting %r', side) + side.stream.on_disconnect(self) + + self.poller.close() + + def _broker_shutdown(self): + for _, (side, _) in self.poller.readers + self.poller.writers: + self._call(side.stream, side.stream.on_shutdown) + + deadline = time.time() + self.shutdown_timeout + while self.keep_alive() and time.time() < deadline: + self._loop_once(max(0, deadline - time.time())) + + if self.keep_alive(): + LOG.error('%r: some streams did not close gracefully. ' + 'The most likely cause for this is one or ' + 'more child processes still connected to ' + 'our stdout/stderr pipes.', self) + def _broker_main(self): + """ + Handle events until :meth:`shutdown`. On shutdown, invoke + :meth:`Stream.on_shutdown` for every active stream, then allow up to + :attr:`shutdown_timeout` seconds for the streams to unregister + themselves before forcefully calling :meth:`Stream.on_disconnect`. + """ try: while self._alive: self._loop_once() fire(self, 'shutdown') - for _, (side, _) in self.poller.readers + self.poller.writers: - self._call(side.stream, side.stream.on_shutdown) - - deadline = time.time() + self.shutdown_timeout - while self.keep_alive() and time.time() < deadline: - self._loop_once(max(0, deadline - time.time())) - - if self.keep_alive(): - LOG.error('%r: some streams did not close gracefully. ' - 'The most likely cause for this is one or ' - 'more child processes still connected to ' - 'our stdout/stderr pipes.', self) - - for _, (side, _) in self.poller.readers + self.poller.writers: - LOG.error('_broker_main() force disconnecting %r', side) - side.stream.on_disconnect(self) + self._broker_shutdown() except Exception: LOG.exception('_broker_main() crashed') + self._broker_exit() fire(self, 'exit') def shutdown(self): + """ + Request broker gracefully disconnect streams and stop. Safe to call + from any thread. + """ _v and LOG.debug('%r.shutdown()', self) def _shutdown(): self._alive = False self.defer(_shutdown) def join(self): + """ + Wait for the broker to stop, expected to be called after + :meth:`shutdown`. + """ self._thread.join() def __repr__(self): @@ -2435,6 +2599,34 @@ class Dispatcher(object): class ExternalContext(object): + """ + External context implementation. + + .. attribute:: broker + The :class:`mitogen.core.Broker` instance. + + .. attribute:: context + The :class:`mitogen.core.Context` instance. + + .. attribute:: channel + The :class:`mitogen.core.Channel` over which :data:`CALL_FUNCTION` + requests are received. + + .. attribute:: stdout_log + The :class:`mitogen.core.IoLogger` connected to ``stdout``. + + .. attribute:: importer + The :class:`mitogen.core.Importer` instance. + + .. attribute:: stdout_log + The :class:`IoLogger` connected to ``stdout``. + + .. attribute:: stderr_log + The :class:`IoLogger` connected to ``stderr``. + + .. method:: _dispatch_calls + Implementation for the main thread in every child context. + """ detached = False def __init__(self, config): diff --git a/mitogen/doas.py b/mitogen/doas.py index cdcee0b0..09b2be9e 100644 --- a/mitogen/doas.py +++ b/mitogen/doas.py @@ -45,10 +45,6 @@ class Stream(mitogen.parent.Stream): create_child = staticmethod(mitogen.parent.hybrid_tty_create_child) child_is_immediate_subprocess = False - #: Once connected, points to the corresponding DiagLogStream, allowing it - #: to be disconnected at the same time this stream is being torn down. - tty_stream = None - username = 'root' password = None doas_path = 'doas' @@ -75,10 +71,6 @@ class Stream(mitogen.parent.Stream): super(Stream, self).connect() self.name = u'doas.' + mitogen.core.to_text(self.username) - def on_disconnect(self, broker): - self.tty_stream.on_disconnect(broker) - super(Stream, self).on_disconnect(broker) - def get_boot_command(self): bits = [self.doas_path, '-u', self.username, '--'] bits = bits + super(Stream, self).get_boot_command() @@ -88,15 +80,13 @@ class Stream(mitogen.parent.Stream): password_incorrect_msg = 'doas password is incorrect' password_required_msg = 'doas password is required' - def _connect_bootstrap(self, extra_fd): - self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self) - - password_sent = False + def _connect_bootstrap(self): it = mitogen.parent.iter_read( - fds=[self.receive_side.fd, extra_fd], + fds=[self.receive_side.fd, self.diag_stream.receive_side.fd], deadline=self.connect_deadline, ) + password_sent = False for buf in it: LOG.debug('%r: received %r', self, buf) if buf.endswith(self.EC0_MARKER): @@ -111,7 +101,7 @@ class Stream(mitogen.parent.Stream): if password_sent: raise PasswordError(self.password_incorrect_msg) LOG.debug('sending password') - self.tty_stream.transmit_side.write( + self.diag_stream.transmit_side.write( mitogen.core.to_text(self.password + '\n').encode('utf-8') ) password_sent = True diff --git a/mitogen/fork.py b/mitogen/fork.py index cf769788..3e3a98a9 100644 --- a/mitogen/fork.py +++ b/mitogen/fork.py @@ -188,6 +188,6 @@ class Stream(mitogen.parent.Stream): # Don't trigger atexit handlers, they were copied from the parent. os._exit(0) - def _connect_bootstrap(self, extra_fd): + def _connect_bootstrap(self): # None required. pass diff --git a/mitogen/master.py b/mitogen/master.py index 73302910..65985b4d 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -754,6 +754,26 @@ class ModuleResponder(object): class Broker(mitogen.core.Broker): + """ + .. note:: + + You may construct as many brokers as desired, and use the same broker + for multiple routers, however usually only one broker need exist. + Multiple brokers may be useful when dealing with sets of children with + differing lifetimes. For example, a subscription service where + non-payment results in termination for one customer. + + :param bool install_watcher: + If :data:`True`, an additional thread is started to monitor the + lifetime of the main thread, triggering :meth:`shutdown` + automatically in case the user forgets to call it, or their code + crashed. + + You should not rely on this functionality in your program, it is only + intended as a fail-safe and to simplify the API for new users. In + particular, alternative Python implementations may not be able to + support watching the main thread. + """ shutdown_timeout = 5.0 _watcher = None poller_class = mitogen.parent.PREFERRED_POLLER @@ -773,7 +793,31 @@ class Broker(mitogen.core.Broker): class Router(mitogen.parent.Router): + """ + Extend :class:`mitogen.core.Router` with functionality useful to masters, + and child contexts who later become masters. Currently when this class is + required, the target context's router is upgraded at runtime. + + .. note:: + + You may construct as many routers as desired, and use the same broker + for multiple routers, however usually only one broker and router need + exist. Multiple routers may be useful when dealing with separate trust + domains, for example, manipulating infrastructure belonging to separate + customers or projects. + + :param mitogen.master.Broker broker: + Broker to use. If not specified, a private :class:`Broker` is created. + """ broker_class = Broker + + #: When :data:`True`, cause the broker thread and any subsequent broker and + #: main threads existing in any child to write + #: ``/tmp/mitogen.stats...log`` containing a + #: :mod:`cProfile` dump on graceful exit. Must be set prior to construction + #: of any :class:`Broker`, e.g. via:: + #: + #: mitogen.master.Router.profiling = True profiling = False def __init__(self, broker=None, max_message_size=None): @@ -796,6 +840,10 @@ class Router(mitogen.parent.Router): ) def enable_debug(self): + """ + Cause this context and any descendant child contexts to write debug + logs to ``/tmp/mitogen..log``. + """ mitogen.core.enable_debug_logging() self.debug = True @@ -830,6 +878,12 @@ class IdAllocator(object): BLOCK_SIZE = 1000 def allocate(self): + """ + Arrange for a unique context ID to be allocated and associated with a + route leading to the active context. In masters, the ID is generated + directly, in children it is forwarded to the master via a + :data:`mitogen.core.ALLOCATE_ID` message. + """ self.lock.acquire() try: id_ = self.next_id diff --git a/mitogen/parent.py b/mitogen/parent.py index 0fffdd67..c4e6f621 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -211,7 +211,7 @@ def create_socketpair(): return parentfp, childfp -def detach_popen(*args, **kwargs): +def detach_popen(**kwargs): """ Use :class:`subprocess.Popen` to construct a child process, then hack the Popen so that it forgets the child it created, allowing it to survive a @@ -223,6 +223,8 @@ def detach_popen(*args, **kwargs): delivered to this process, causing later 'legitimate' calls to fail with ECHILD. + :param list close_on_error: + Array of integer file descriptors to close on exception. :returns: Process ID of the new child. """ @@ -230,7 +232,7 @@ def detach_popen(*args, **kwargs): # handling, without tying the surrounding code into managing a Popen # object, which isn't possible for at least :mod:`mitogen.fork`. This # should be replaced by a swappable helper class in a future version. - proc = subprocess.Popen(*args, **kwargs) + proc = subprocess.Popen(**kwargs) proc._child_created = False return proc.pid @@ -271,14 +273,23 @@ def create_child(args, merge_stdio=False, stderr_pipe=False, preexec_fn=None): mitogen.core.set_cloexec(stderr_w) extra = {'stderr': stderr_w} - pid = detach_popen( - args=args, - stdin=childfp, - stdout=childfp, - close_fds=True, - preexec_fn=preexec_fn, - **extra - ) + try: + pid = detach_popen( + args=args, + stdin=childfp, + stdout=childfp, + close_fds=True, + preexec_fn=preexec_fn, + **extra + ) + except Exception: + childfp.close() + parentfp.close() + if stderr_pipe: + os.close(stderr_r) + os.close(stderr_w) + raise + if stderr_pipe: os.close(stderr_w) childfp.close() @@ -338,14 +349,19 @@ def tty_create_child(args): disable_echo(master_fd) disable_echo(slave_fd) - pid = detach_popen( - args=args, - stdin=slave_fd, - stdout=slave_fd, - stderr=slave_fd, - preexec_fn=_acquire_controlling_tty, - close_fds=True, - ) + try: + pid = detach_popen( + args=args, + stdin=slave_fd, + stdout=slave_fd, + stderr=slave_fd, + preexec_fn=_acquire_controlling_tty, + close_fds=True, + ) + except Exception: + os.close(master_fd) + os.close(slave_fd) + raise os.close(slave_fd) LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s', @@ -372,14 +388,22 @@ def hybrid_tty_create_child(args): mitogen.core.set_block(childfp) disable_echo(master_fd) disable_echo(slave_fd) - pid = detach_popen( - args=args, - stdin=childfp, - stdout=childfp, - stderr=slave_fd, - preexec_fn=_acquire_controlling_tty, - close_fds=True, - ) + + try: + pid = detach_popen( + args=args, + stdin=childfp, + stdout=childfp, + stderr=slave_fd, + preexec_fn=_acquire_controlling_tty, + close_fds=True, + ) + except Exception: + os.close(master_fd) + os.close(slave_fd) + parentfp.close() + childfp.close() + raise os.close(slave_fd) childfp.close() @@ -915,6 +939,33 @@ class Stream(mitogen.core.Stream): #: ExternalContext.main(). max_message_size = None + #: If :attr:`create_child` supplied a diag_fd, references the corresponding + #: :class:`DiagLogStream`, allowing it to be disconnected when this stream + #: is disconnected. Set to :data:`None` if no `diag_fd` was present. + diag_stream = None + + #: Function with the semantics of :func:`create_child` used to create the + #: child process. + create_child = staticmethod(create_child) + + #: Dictionary of extra kwargs passed to :attr:`create_child`. + create_child_args = {} + + #: :data:`True` if the remote has indicated that it intends to detach, and + #: should not be killed on disconnect. + detached = False + + #: If :data:`True`, indicates the child should not be killed during + #: graceful detachment, as it the actual process implementing the child + #: context. In all other cases, the subprocess is SSH, sudo, or a similar + #: tool that should be reminded to quit during disconnection. + child_is_immediate_subprocess = True + + #: Prefix given to default names generated by :meth:`connect`. + name_prefix = u'local' + + _reaped = False + def __init__(self, *args, **kwargs): super(Stream, self).__init__(*args, **kwargs) self.sent_modules = set(['mitogen', 'mitogen.core']) @@ -952,15 +1003,6 @@ class Stream(mitogen.core.Stream): ) ) - #: If :data:`True`, indicates the subprocess managed by us should not be - #: killed during graceful detachment, as it the actual process implementing - #: the child context. In all other cases, the subprocess is SSH, sudo, or a - #: similar tool that should be reminded to quit during disconnection. - child_is_immediate_subprocess = True - - detached = False - _reaped = False - def _reap_child(self): """ Reap the child process during disconnection. @@ -1000,8 +1042,10 @@ class Stream(mitogen.core.Stream): raise def on_disconnect(self, broker): - self._reap_child() super(Stream, self).on_disconnect(broker) + if self.diag_stream is not None: + self.diag_stream.on_disconnect(broker) + self._reap_child() # Minimised, gzipped, base64'd and passed to 'python -c'. It forks, dups # file descriptor 0 as 100, creates a pipe, then execs a new interpreter @@ -1105,10 +1149,6 @@ class Stream(mitogen.core.Stream): ) return zlib.compress(source.encode('utf-8'), 9) - create_child = staticmethod(create_child) - create_child_args = {} - name_prefix = u'local' - def start_child(self): args = self.get_boot_command() try: @@ -1130,20 +1170,28 @@ class Stream(mitogen.core.Stream): def connect(self): LOG.debug('%r.connect()', self) - self.pid, fd, extra_fd = self.start_child() + self.pid, fd, diag_fd = self.start_child() self.name = u'%s.%s' % (self.name_prefix, self.pid) self.receive_side = mitogen.core.Side(self, fd) self.transmit_side = mitogen.core.Side(self, os.dup(fd)) - LOG.debug('%r.connect(): child process stdin/stdout=%r', - self, self.receive_side.fd) + if diag_fd is not None: + self.diag_stream = DiagLogStream(diag_fd, self) + else: + self.diag_stream = None + + LOG.debug('%r.connect(): stdin=%r, stdout=%r, diag=%r', + self, self.receive_side.fd, self.transmit_side.fd, + self.diag_stream and self.diag_stream.receive_side.fd) try: - self._connect_bootstrap(extra_fd) + self._connect_bootstrap() except EofError: + self.on_disconnect(self._router.broker) e = sys.exc_info()[1] self._adorn_eof_error(e) raise except Exception: + self.on_disconnect(self._router.broker) self._reap_child() raise @@ -1158,8 +1206,10 @@ class Stream(mitogen.core.Stream): write_all(self.transmit_side.fd, self.get_preamble()) discard_until(self.receive_side.fd, self.EC1_MARKER, self.connect_deadline) + if self.diag_stream: + self._router.broker.start_receive(self.diag_stream) - def _connect_bootstrap(self, extra_fd): + def _connect_bootstrap(self): discard_until(self.receive_side.fd, self.EC0_MARKER, self.connect_deadline) self._ec0_received() @@ -1822,6 +1872,10 @@ class Router(mitogen.core.Router): class ProcessMonitor(object): + """ + Install a :data:`signal.SIGCHLD` handler that generates callbacks when a + specific child process has exitted. This class is obsolete, do not use. + """ def __init__(self): # pid -> callback() self.callback_by_pid = {} @@ -1835,6 +1889,16 @@ class ProcessMonitor(object): del self.callback_by_pid[pid] def add(self, pid, callback): + """ + Add a callback function to be notified of the exit status of a process. + + :param int pid: + Process ID to be notified of. + + :param callback: + Function invoked as `callback(status)`, where `status` is the raw + exit status of the child process. + """ self.callback_by_pid[pid] = callback _instance = None diff --git a/mitogen/ssh.py b/mitogen/ssh.py index fba6e8f2..e3891f9c 100644 --- a/mitogen/ssh.py +++ b/mitogen/ssh.py @@ -127,10 +127,6 @@ class Stream(mitogen.parent.Stream): #: Number of -v invocations to pass on command line. ssh_debug_level = 0 - #: If batch_mode=False, points to the corresponding DiagLogStream, allowing - #: it to be disconnected at the same time this stream is being torn down. - tty_stream = None - #: The path to the SSH binary. ssh_path = 'ssh' @@ -195,11 +191,6 @@ class Stream(mitogen.parent.Stream): 'stderr_pipe': True, } - def on_disconnect(self, broker): - if self.tty_stream is not None: - self.tty_stream.on_disconnect(broker) - super(Stream, self).on_disconnect(broker) - def get_boot_command(self): bits = [self.ssh_path] if self.ssh_debug_level: @@ -265,7 +256,7 @@ class Stream(mitogen.parent.Stream): def _host_key_prompt(self): if self.check_host_keys == 'accept': LOG.debug('%r: accepting host key', self) - self.tty_stream.transmit_side.write(b('yes\n')) + self.diag_stream.transmit_side.write(b('yes\n')) return # _host_key_prompt() should never be reached with ignore or enforce @@ -273,16 +264,10 @@ class Stream(mitogen.parent.Stream): # with ours. raise HostKeyError(self.hostkey_config_msg) - def _ec0_received(self): - if self.tty_stream is not None: - self._router.broker.start_receive(self.tty_stream) - return super(Stream, self)._ec0_received() - - def _connect_bootstrap(self, extra_fd): + def _connect_bootstrap(self): fds = [self.receive_side.fd] - if extra_fd is not None: - self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self) - fds.append(extra_fd) + if self.diag_stream is not None: + fds.append(self.diag_stream.receive_side.fd) it = mitogen.parent.iter_read(fds=fds, deadline=self.connect_deadline) @@ -311,7 +296,7 @@ class Stream(mitogen.parent.Stream): if self.password is None: raise PasswordError(self.password_required_msg) LOG.debug('%r: sending password', self) - self.tty_stream.transmit_side.write( + self.diag_stream.transmit_side.write( (self.password + '\n').encode() ) password_sent = True diff --git a/mitogen/su.py b/mitogen/su.py index 7e2e5f08..9b0172c8 100644 --- a/mitogen/su.py +++ b/mitogen/su.py @@ -80,9 +80,6 @@ class Stream(mitogen.parent.Stream): super(Stream, self).connect() self.name = u'su.' + mitogen.core.to_text(self.username) - def on_disconnect(self, broker): - super(Stream, self).on_disconnect(broker) - def get_boot_command(self): argv = mitogen.parent.Argv(super(Stream, self).get_boot_command()) return [self.su_path, self.username, '-c', str(argv)] @@ -90,7 +87,7 @@ class Stream(mitogen.parent.Stream): password_incorrect_msg = 'su password is incorrect' password_required_msg = 'su password is required' - def _connect_bootstrap(self, extra_fd): + def _connect_bootstrap(self): password_sent = False it = mitogen.parent.iter_read( fds=[self.receive_side.fd], diff --git a/mitogen/sudo.py b/mitogen/sudo.py index 84b81ddc..b2eaabce 100644 --- a/mitogen/sudo.py +++ b/mitogen/sudo.py @@ -150,10 +150,6 @@ class Stream(mitogen.parent.Stream): super(Stream, self).connect() self.name = u'sudo.' + mitogen.core.to_text(self.username) - def on_disconnect(self, broker): - self.tty_stream.on_disconnect(broker) - super(Stream, self).on_disconnect(broker) - def get_boot_command(self): # Note: sudo did not introduce long-format option processing until July # 2013, so even though we parse long-format options, supply short-form @@ -177,12 +173,14 @@ class Stream(mitogen.parent.Stream): password_incorrect_msg = 'sudo password is incorrect' password_required_msg = 'sudo password is required' - def _connect_bootstrap(self, extra_fd): - self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self) + def _connect_bootstrap(self): + fds = [self.receive_side.fd] + if self.diag_stream is not None: + fds.append(self.diag_stream.receive_side.fd) password_sent = False it = mitogen.parent.iter_read( - fds=[self.receive_side.fd, extra_fd], + fds=fds, deadline=self.connect_deadline, ) diff --git a/mitogen/unix.py b/mitogen/unix.py index 417842bc..12182a28 100644 --- a/mitogen/unix.py +++ b/mitogen/unix.py @@ -49,10 +49,13 @@ from mitogen.core import LOG def is_path_dead(path): s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: - s.connect(path) - except socket.error: - e = sys.exc_info()[1] - return e.args[0] in (errno.ECONNREFUSED, errno.ENOENT) + try: + s.connect(path) + except socket.error: + e = sys.exc_info()[1] + return e.args[0] in (errno.ECONNREFUSED, errno.ENOENT) + finally: + s.close() return False @@ -78,6 +81,11 @@ class Listener(mitogen.core.BasicStream): self.receive_side = mitogen.core.Side(self, self._sock.fileno()) router.broker.start_receive(self) + def on_shutdown(self, broker): + broker.stop_receive(self) + self._sock.close() + self.receive_side.closed = True + def _accept_client(self, sock): sock.setblocking(True) try: diff --git a/tests/ansible/integration/async/result_shell_echo_hi.yml b/tests/ansible/integration/async/result_shell_echo_hi.yml index 8858037a..dbf40bde 100644 --- a/tests/ansible/integration/async/result_shell_echo_hi.yml +++ b/tests/ansible/integration/async/result_shell_echo_hi.yml @@ -24,7 +24,7 @@ that: - async_out.changed == True - async_out.cmd == "echo hi" - - 'async_out.delta.startswith("0:00:00")' + - 'async_out.delta.startswith("0:00:")' - async_out.end.startswith("20") - async_out.invocation.module_args._raw_params == "echo hi" - async_out.invocation.module_args._uses_shell == True diff --git a/tests/broker_test.py b/tests/broker_test.py index 7d070e3d..7890b0f3 100644 --- a/tests/broker_test.py +++ b/tests/broker_test.py @@ -1,6 +1,7 @@ import threading +import mock import unittest2 import testlib @@ -8,6 +9,19 @@ import testlib import mitogen.core +class ShutdownTest(testlib.TestCase): + klass = mitogen.core.Broker + + def test_poller_closed(self): + broker = self.klass() + actual_close = broker.poller.close + broker.poller.close = mock.Mock() + broker.shutdown() + broker.join() + self.assertEquals(1, len(broker.poller.close.mock_calls)) + actual_close() + + class DeferSyncTest(testlib.TestCase): klass = mitogen.core.Broker @@ -18,6 +32,7 @@ class DeferSyncTest(testlib.TestCase): self.assertEquals(th, broker._thread) finally: broker.shutdown() + broker.join() def test_exception(self): broker = self.klass() @@ -26,6 +41,7 @@ class DeferSyncTest(testlib.TestCase): broker.defer_sync, lambda: int('dave')) finally: broker.shutdown() + broker.join() if __name__ == '__main__': diff --git a/tests/call_error_test.py b/tests/call_error_test.py index 447a80a9..1480a743 100644 --- a/tests/call_error_test.py +++ b/tests/call_error_test.py @@ -10,7 +10,7 @@ import testlib import plain_old_module -class ConstructorTest(unittest2.TestCase): +class ConstructorTest(testlib.TestCase): klass = mitogen.core.CallError def test_string_noargs(self): @@ -44,7 +44,7 @@ class ConstructorTest(unittest2.TestCase): self.assertTrue('test_from_exc_tb' in e.args[0]) -class PickleTest(unittest2.TestCase): +class PickleTest(testlib.TestCase): klass = mitogen.core.CallError def test_string_noargs(self): diff --git a/tests/data/stubs/stub-doas.py b/tests/data/stubs/stub-doas.py index 08caf044..ca929bc0 100755 --- a/tests/data/stubs/stub-doas.py +++ b/tests/data/stubs/stub-doas.py @@ -2,8 +2,13 @@ import json import os +import subprocess import sys os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv) os.environ['THIS_IS_STUB_DOAS'] = '1' -os.execv(sys.executable, sys.argv[sys.argv.index('--') + 1:]) + +# This must be a child process and not exec() since Mitogen replaces its stderr +# descriptor, causing the last user of the slave PTY to close it, resulting in +# the master side indicating EIO. +subprocess.check_call(sys.argv[sys.argv.index('--') + 1:]) diff --git a/tests/data/stubs/stub-sudo.py b/tests/data/stubs/stub-sudo.py index ff88cd8e..a7f2704f 100755 --- a/tests/data/stubs/stub-sudo.py +++ b/tests/data/stubs/stub-sudo.py @@ -2,8 +2,13 @@ import json import os +import subprocess import sys os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv) os.environ['THIS_IS_STUB_SUDO'] = '1' -os.execv(sys.executable, sys.argv[sys.argv.index('--') + 1:]) + +# This must be a child process and not exec() since Mitogen replaces its stderr +# descriptor, causing the last user of the slave PTY to close it, resulting in +# the master side indicating EIO. +subprocess.check_call(sys.argv[sys.argv.index('--') + 1:]) diff --git a/tests/docker_test.py b/tests/docker_test.py index 2d45609a..49c742ee 100644 --- a/tests/docker_test.py +++ b/tests/docker_test.py @@ -7,7 +7,7 @@ import unittest2 import testlib -class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): +class ConstructorTest(testlib.RouterMixin, testlib.TestCase): def test_okay(self): docker_path = testlib.data_path('stubs/stub-docker.py') context = self.router.docker( diff --git a/tests/fakessh_test.py b/tests/fakessh_test.py index c584acfe..63c70058 100644 --- a/tests/fakessh_test.py +++ b/tests/fakessh_test.py @@ -10,7 +10,7 @@ import mitogen.fakessh import testlib -class RsyncTest(testlib.DockerMixin, unittest2.TestCase): +class RsyncTest(testlib.DockerMixin, testlib.TestCase): @timeoutcontext.timeout(5) @unittest2.skip('broken') def test_rsync_from_master(self): diff --git a/tests/fork_test.py b/tests/fork_test.py index 8b396bbf..dd214bd1 100644 --- a/tests/fork_test.py +++ b/tests/fork_test.py @@ -1,4 +1,5 @@ +import _ssl import ctypes import os import random @@ -13,21 +14,29 @@ import testlib import plain_old_module -IS_64BIT = struct.calcsize('P') == 8 -PLATFORM_TO_PATH = { - ('darwin', False): '/usr/lib/libssl.dylib', - ('darwin', True): '/usr/lib/libssl.dylib', - ('linux2', False): '/usr/lib/libssl.so', - ('linux2', True): '/usr/lib/x86_64-linux-gnu/libssl.so', - # Python 2.6 - ('linux3', False): '/usr/lib/libssl.so', - ('linux3', True): '/usr/lib/x86_64-linux-gnu/libssl.so', - # Python 3 - ('linux', False): '/usr/lib/libssl.so', - ('linux', True): '/usr/lib/x86_64-linux-gnu/libssl.so', -} +def _find_ssl_linux(): + s = testlib.subprocess__check_output(['ldd', _ssl.__file__]) + for line in s.splitlines(): + bits = line.split() + if bits[0].startswith('libssl'): + return bits[2] -c_ssl = ctypes.CDLL(PLATFORM_TO_PATH[sys.platform, IS_64BIT]) +def _find_ssl_darwin(): + s = testlib.subprocess__check_output(['otool', '-l', _ssl.__file__]) + for line in s.splitlines(): + bits = line.split() + if bits[0] == 'name' and 'libssl' in bits[1]: + return bits[1] + + +if sys.platform.startswith('linux'): + LIBSSL_PATH = _find_ssl_linux() +elif sys.platform == 'darwin': + LIBSSL_PATH = _find_ssl_darwin() +else: + assert 0, "Don't know how to find libssl on this platform" + +c_ssl = ctypes.CDLL(LIBSSL_PATH) c_ssl.RAND_pseudo_bytes.argtypes = [ctypes.c_char_p, ctypes.c_int] c_ssl.RAND_pseudo_bytes.restype = ctypes.c_int @@ -55,7 +64,7 @@ def exercise_importer(n): return simple_pkg.a.subtract_one_add_two(n) -class ForkTest(testlib.RouterMixin, unittest2.TestCase): +class ForkTest(testlib.RouterMixin, testlib.TestCase): def test_okay(self): context = self.router.fork() self.assertNotEqual(context.call(os.getpid), os.getpid()) @@ -84,7 +93,8 @@ class ForkTest(testlib.RouterMixin, unittest2.TestCase): context = self.router.fork(on_start=on_start) self.assertEquals(123, recv.get().unpickle()) -class DoubleChildTest(testlib.RouterMixin, unittest2.TestCase): + +class DoubleChildTest(testlib.RouterMixin, testlib.TestCase): def test_okay(self): # When forking from the master process, Mitogen had nothing to do with # setting up stdio -- that was inherited wherever the Master is running diff --git a/tests/local_test.py b/tests/local_test.py index fbf5c1c8..5a620e52 100644 --- a/tests/local_test.py +++ b/tests/local_test.py @@ -20,7 +20,7 @@ def get_os_environ(): return dict(os.environ) -class LocalTest(testlib.RouterMixin, unittest2.TestCase): +class LocalTest(testlib.RouterMixin, testlib.TestCase): stream_class = mitogen.ssh.Stream def test_stream_name(self): @@ -29,7 +29,7 @@ class LocalTest(testlib.RouterMixin, unittest2.TestCase): self.assertEquals('local.%d' % (pid,), context.name) -class PythonPathTest(testlib.RouterMixin, unittest2.TestCase): +class PythonPathTest(testlib.RouterMixin, testlib.TestCase): stream_class = mitogen.ssh.Stream def test_inherited(self): diff --git a/tests/master_test.py b/tests/master_test.py index 19a9b414..31d11013 100644 --- a/tests/master_test.py +++ b/tests/master_test.py @@ -6,7 +6,7 @@ import testlib import mitogen.master -class ScanCodeImportsTest(unittest2.TestCase): +class ScanCodeImportsTest(testlib.TestCase): func = staticmethod(mitogen.master.scan_code_imports) if mitogen.core.PY3: diff --git a/tests/minify_test.py b/tests/minify_test.py index 98307059..d1161c90 100644 --- a/tests/minify_test.py +++ b/tests/minify_test.py @@ -16,7 +16,7 @@ def read_sample(fname): return sample -class MinimizeSourceTest(unittest2.TestCase): +class MinimizeSourceTest(testlib.TestCase): func = staticmethod(mitogen.minify.minimize_source) def test_class(self): @@ -55,7 +55,7 @@ class MinimizeSourceTest(unittest2.TestCase): self.assertEqual(expected, self.func(original)) -class MitogenCoreTest(unittest2.TestCase): +class MitogenCoreTest(testlib.TestCase): # Verify minimize_source() succeeds for all built-in modules. func = staticmethod(mitogen.minify.minimize_source) @@ -95,7 +95,11 @@ class MitogenCoreTest(unittest2.TestCase): def test_minify_all(self): for name in glob.glob('mitogen/*.py'): original = self.read_source(name) - minified = self.func(original) + try: + minified = self.func(original) + except Exception: + print('file was: ' + name) + raise self._test_syntax_valid(minified, name) self._test_line_counts_match(original, minified) diff --git a/tests/parent_test.py b/tests/parent_test.py index 9d540ccc..e83d6f1a 100644 --- a/tests/parent_test.py +++ b/tests/parent_test.py @@ -153,7 +153,7 @@ class StreamErrorTest(testlib.RouterMixin, testlib.TestCase): self.assertTrue(s in e.args[0]) -class ContextTest(testlib.RouterMixin, unittest2.TestCase): +class ContextTest(testlib.RouterMixin, testlib.TestCase): def test_context_shutdown(self): local = self.router.local() pid = local.call(os.getpid) @@ -181,7 +181,7 @@ class OpenPtyTest(testlib.TestCase): self.assertEquals(e.args[0], msg) -class TtyCreateChildTest(unittest2.TestCase): +class TtyCreateChildTest(testlib.TestCase): func = staticmethod(mitogen.parent.tty_create_child) def test_dev_tty_open_succeeds(self): @@ -207,11 +207,12 @@ class TtyCreateChildTest(unittest2.TestCase): self.assertEquals(pid, waited_pid) self.assertEquals(0, status) self.assertEquals(mitogen.core.b(''), tf.read()) + os.close(fd) finally: tf.close() -class IterReadTest(unittest2.TestCase): +class IterReadTest(testlib.TestCase): func = staticmethod(mitogen.parent.iter_read) def make_proc(self): @@ -230,6 +231,7 @@ class IterReadTest(unittest2.TestCase): break finally: proc.terminate() + proc.stdout.close() def test_deadline_exceeded_before_call(self): proc = self.make_proc() @@ -244,6 +246,7 @@ class IterReadTest(unittest2.TestCase): self.assertEqual(len(got), 0) finally: proc.terminate() + proc.stdout.close() def test_deadline_exceeded_during_call(self): proc = self.make_proc() @@ -261,9 +264,10 @@ class IterReadTest(unittest2.TestCase): self.assertLess(len(got), 5) finally: proc.terminate() + proc.stdout.close() -class WriteAllTest(unittest2.TestCase): +class WriteAllTest(testlib.TestCase): func = staticmethod(mitogen.parent.write_all) def make_proc(self): @@ -280,6 +284,7 @@ class WriteAllTest(unittest2.TestCase): self.func(proc.stdin.fileno(), self.ten_ms_chunk) finally: proc.terminate() + proc.stdin.close() def test_deadline_exceeded_before_call(self): proc = self.make_proc() @@ -289,6 +294,7 @@ class WriteAllTest(unittest2.TestCase): )) finally: proc.terminate() + proc.stdin.close() def test_deadline_exceeded_during_call(self): proc = self.make_proc() @@ -301,6 +307,7 @@ class WriteAllTest(unittest2.TestCase): )) finally: proc.terminate() + proc.stdin.close() class DisconnectTest(testlib.RouterMixin, testlib.TestCase): diff --git a/tests/responder_test.py b/tests/responder_test.py index 46400fce..888302c0 100644 --- a/tests/responder_test.py +++ b/tests/responder_test.py @@ -13,7 +13,7 @@ import plain_old_module import simple_pkg.a -class NeutralizeMainTest(testlib.RouterMixin, unittest2.TestCase): +class NeutralizeMainTest(testlib.RouterMixin, testlib.TestCase): klass = mitogen.master.ModuleResponder def call(self, *args, **kwargs): @@ -67,7 +67,7 @@ class NeutralizeMainTest(testlib.RouterMixin, unittest2.TestCase): -class GoodModulesTest(testlib.RouterMixin, unittest2.TestCase): +class GoodModulesTest(testlib.RouterMixin, testlib.TestCase): def test_plain_old_module(self): # The simplest case: a top-level module with no interesting imports or # package machinery damage. @@ -89,7 +89,7 @@ class GoodModulesTest(testlib.RouterMixin, unittest2.TestCase): self.assertEquals(output, "['__main__', 50]\n") -class BrokenModulesTest(unittest2.TestCase): +class BrokenModulesTest(testlib.TestCase): def test_obviously_missing(self): # Ensure we don't crash in the case of a module legitimately being # unavailable. Should never happen in the real world. @@ -144,7 +144,7 @@ class BrokenModulesTest(unittest2.TestCase): self.assertIsInstance(msg.unpickle(), tuple) -class BlacklistTest(unittest2.TestCase): +class BlacklistTest(testlib.TestCase): @unittest2.skip('implement me') def test_whitelist_no_blacklist(self): assert 0 diff --git a/tests/router_test.py b/tests/router_test.py index 7b7e2896..b0add6d3 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -36,7 +36,7 @@ def send_n_sized_reply(sender, n): return 123 -class SourceVerifyTest(testlib.RouterMixin, unittest2.TestCase): +class SourceVerifyTest(testlib.RouterMixin, testlib.TestCase): def setUp(self): super(SourceVerifyTest, self).setUp() # Create some children, ping them, and store what their messages look @@ -149,7 +149,7 @@ class PolicyTest(testlib.RouterMixin, testlib.TestCase): self.assertEquals(e.args[0], self.router.refused_msg) -class CrashTest(testlib.BrokerMixin, unittest2.TestCase): +class CrashTest(testlib.BrokerMixin, testlib.TestCase): # This is testing both Broker's ability to crash nicely, and Router's # ability to respond to the crash event. klass = mitogen.master.Router @@ -178,8 +178,7 @@ class CrashTest(testlib.BrokerMixin, unittest2.TestCase): self.assertTrue(expect in log.stop()) - -class AddHandlerTest(unittest2.TestCase): +class AddHandlerTest(testlib.TestCase): klass = mitogen.master.Router def test_invoked_at_shutdown(self): diff --git a/tests/serialization_test.py b/tests/serialization_test.py index f108ff37..d8c54c59 100644 --- a/tests/serialization_test.py +++ b/tests/serialization_test.py @@ -20,7 +20,7 @@ def roundtrip(v): return mitogen.core.Message(data=msg.data).unpickle() -class BlobTest(unittest2.TestCase): +class BlobTest(testlib.TestCase): klass = mitogen.core.Blob # Python 3 pickle protocol 2 does weird stuff depending on whether an empty @@ -36,7 +36,7 @@ class BlobTest(unittest2.TestCase): self.assertEquals(b(''), roundtrip(v)) -class ContextTest(testlib.RouterMixin, unittest2.TestCase): +class ContextTest(testlib.RouterMixin, testlib.TestCase): klass = mitogen.core.Context # Ensure Context can be round-tripped by regular pickle in addition to diff --git a/tests/ssh_test.py b/tests/ssh_test.py index 36359a66..661ff5ed 100644 --- a/tests/ssh_test.py +++ b/tests/ssh_test.py @@ -29,7 +29,7 @@ class StubSshMixin(testlib.RouterMixin): del os.environ['STUBSSH_MODE'] -class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): +class ConstructorTest(testlib.RouterMixin, testlib.TestCase): def test_okay(self): context = self.router.ssh( hostname='hostname', @@ -165,7 +165,7 @@ class SshTest(testlib.DockerMixin, testlib.TestCase): fp.close() -class BannerTest(testlib.DockerMixin, unittest2.TestCase): +class BannerTest(testlib.DockerMixin, testlib.TestCase): # Verify the ability to disambiguate random spam appearing in the SSHd's # login banner from a legitimate password prompt. stream_class = mitogen.ssh.Stream diff --git a/tests/testlib.py b/tests/testlib.py index 8f11337d..2f3c2b2e 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -6,11 +6,14 @@ import re import socket import subprocess import sys +import threading import time +import traceback import unittest2 import mitogen.core +import mitogen.fork import mitogen.master import mitogen.utils @@ -41,6 +44,14 @@ if faulthandler is not None: faulthandler.enable() +def get_fd_count(): + """ + Return the number of FDs open by this process. + """ + import psutil + return psutil.Process().num_fds() + + def data_path(suffix): path = os.path.join(DATA_DIR, suffix) if path.endswith('.key'): @@ -166,6 +177,53 @@ def sync_with_broker(broker, timeout=10.0): sem.get(timeout=10.0) +def log_fd_calls(): + mypid = os.getpid() + l = threading.Lock() + real_pipe = os.pipe + def pipe(): + with l: + rv = real_pipe() + if mypid == os.getpid(): + sys.stdout.write('\n%s\n' % (rv,)) + traceback.print_stack(limit=3) + sys.stdout.write('\n') + return rv + os.pipe = pipe + + real_socketpair = socket.socketpair + def socketpair(*args): + with l: + rv = real_socketpair(*args) + if mypid == os.getpid(): + sys.stdout.write('\n%s -> %s\n' % (args, rv)) + traceback.print_stack(limit=3) + sys.stdout.write('\n') + return rv + socket.socketpair = socketpair + + real_dup2 = os.dup2 + def dup2(*args): + with l: + real_dup2(*args) + if mypid == os.getpid(): + sys.stdout.write('\n%s\n' % (args,)) + traceback.print_stack(limit=3) + sys.stdout.write('\n') + os.dup2 = dup2 + + real_dup = os.dup + def dup(*args): + with l: + rv = real_dup(*args) + if mypid == os.getpid(): + sys.stdout.write('\n%s -> %s\n' % (args, rv)) + traceback.print_stack(limit=3) + sys.stdout.write('\n') + return rv + os.dup = dup + + class CaptureStreamHandler(logging.StreamHandler): def __init__(self, *args, **kwargs): logging.StreamHandler.__init__(self, *args, **kwargs) @@ -211,6 +269,46 @@ class LogCapturer(object): class TestCase(unittest2.TestCase): + @classmethod + def setUpClass(cls): + # This is done in setUpClass() so we have a chance to run before any + # Broker() instantiations in setUp() etc. + mitogen.fork.on_fork() + cls._fd_count_before = get_fd_count() + super(TestCase, cls).setUpClass() + + ALLOWED_THREADS = set([ + 'MainThread', + 'mitogen.master.join_thread_async' + ]) + + @classmethod + def _teardown_check_threads(cls): + counts = {} + for thread in threading.enumerate(): + assert thread.name in cls.ALLOWED_THREADS, \ + 'Found thread %r still running after tests.' % (thread.name,) + counts[thread.name] = counts.get(thread.name, 0) + 1 + + for name in counts: + assert counts[name] == 1, \ + 'Found %d copies of thread %r running after tests.' % (name,) + + @classmethod + def _teardown_check_fds(cls): + mitogen.core.Latch._on_fork() + if get_fd_count() != cls._fd_count_before: + import os; os.system('lsof -p %s' % (os.getpid(),)) + assert 0, "%s leaked FDs. Count before: %s, after: %s" % ( + cls, cls._fd_count_before, get_fd_count(), + ) + + @classmethod + def tearDownClass(cls): + super(TestCase, cls).tearDownClass() + cls._teardown_check_threads() + cls._teardown_check_fds() + def assertRaises(self, exc, func, *args, **kwargs): """Like regular assertRaises, except return the exception that was raised. Can't use context manager because tests must run on Python2.4""" diff --git a/tests/types_test.py b/tests/types_test.py index 4f80e076..f929c098 100644 --- a/tests/types_test.py +++ b/tests/types_test.py @@ -11,8 +11,10 @@ import unittest2 import mitogen.core from mitogen.core import b +import testlib -class BlobTest(unittest2.TestCase): + +class BlobTest(testlib.TestCase): klass = mitogen.core.Blob def make(self): @@ -43,7 +45,7 @@ class BlobTest(unittest2.TestCase): mitogen.core.BytesType(blob2)) -class SecretTest(unittest2.TestCase): +class SecretTest(testlib.TestCase): klass = mitogen.core.Secret def make(self): diff --git a/tests/unix_test.py b/tests/unix_test.py index 67265c81..ee9499ba 100644 --- a/tests/unix_test.py +++ b/tests/unix_test.py @@ -30,7 +30,7 @@ class MyService(mitogen.service.Service): } -class IsPathDeadTest(unittest2.TestCase): +class IsPathDeadTest(testlib.TestCase): func = staticmethod(mitogen.unix.is_path_dead) path = '/tmp/stale-socket' @@ -57,7 +57,7 @@ class IsPathDeadTest(unittest2.TestCase): os.unlink(self.path) -class ListenerTest(testlib.RouterMixin, unittest2.TestCase): +class ListenerTest(testlib.RouterMixin, testlib.TestCase): klass = mitogen.unix.Listener def test_constructor_basic(self): @@ -66,7 +66,7 @@ class ListenerTest(testlib.RouterMixin, unittest2.TestCase): os.unlink(listener.path) -class ClientTest(unittest2.TestCase): +class ClientTest(testlib.TestCase): klass = mitogen.unix.Listener def _try_connect(self, path): @@ -87,6 +87,8 @@ class ClientTest(unittest2.TestCase): resp = context.call_service(service_name=MyService, method_name='ping') self.assertEquals(mitogen.context_id, resp['src_id']) self.assertEquals(0, resp['auth_id']) + router.broker.shutdown() + router.broker.join() def _test_simple_server(self, path): router = mitogen.master.Router() @@ -102,7 +104,9 @@ class ClientTest(unittest2.TestCase): time.sleep(0.1) finally: pool.shutdown() + pool.join() router.broker.shutdown() + router.broker.join() finally: os._exit(0) diff --git a/tests/utils_test.py b/tests/utils_test.py index b2e0aa9e..5b81289e 100644 --- a/tests/utils_test.py +++ b/tests/utils_test.py @@ -6,6 +6,8 @@ import mitogen.core import mitogen.master import mitogen.utils +import testlib + def func0(router): return router @@ -16,7 +18,7 @@ def func(router): return router -class RunWithRouterTest(unittest2.TestCase): +class RunWithRouterTest(testlib.TestCase): # test_shutdown_on_exception # test_shutdown_on_success @@ -26,7 +28,7 @@ class RunWithRouterTest(unittest2.TestCase): self.assertFalse(router.broker._thread.isAlive()) -class WithRouterTest(unittest2.TestCase): +class WithRouterTest(testlib.TestCase): def test_with_broker(self): router = func() self.assertIsInstance(router, mitogen.master.Router) @@ -40,7 +42,7 @@ class Unicode(mitogen.core.UnicodeType): pass class Bytes(mitogen.core.BytesType): pass -class CastTest(unittest2.TestCase): +class CastTest(testlib.TestCase): def test_dict(self): self.assertEqual(type(mitogen.utils.cast({})), dict) self.assertEqual(type(mitogen.utils.cast(Dict())), dict)