Merge remote-tracking branch 'origin/osx-ci-fixes'

* origin/osx-ci-fixes:
  issue #573: guard against a forked top-level Ansible process
  [linear2] simplify ClassicWorkerModel and fix repeat initialization
  issue #549 / [stream-refactor]: fix close/poller deregister crash on OSX
  issue #549: skip Docker tests if Docker is unavailable
  issue #549: remove Linux-specific assumptions from create_child_test
  issue #549: fix setrlimit() crash and hard-wire OS X default
This commit is contained in:
David Wilson 2019-08-03 18:58:53 +01:00
commit 7ca2d00fca
4 changed files with 146 additions and 87 deletions

View File

@ -246,11 +246,22 @@ def increase_open_file_limit():
limit is much higher.
"""
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
if soft < hard:
LOG.debug('raising soft open file limit from %d to %d', soft, hard)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
else:
LOG.debug('cannot increase open file limit; existing limit is %d', hard)
LOG.debug('inherited open file limits: soft=%d hard=%d', soft, hard)
if soft >= hard:
LOG.debug('max open files already set to hard limit: %d', hard)
return
# OS X is limited by kern.maxfilesperproc sysctl, rather than the
# advertised unlimited hard RLIMIT_NOFILE. Just hard-wire known defaults
# for that sysctl, to avoid the mess of querying it.
for value in (hard, 10240):
try:
resource.setrlimit(resource.RLIMIT_NOFILE, (value, hard))
LOG.debug('raised soft open file limit from %d to %d', soft, value)
break
except ValueError as e:
LOG.debug('could not raise soft open file limit from %d to %d: %s',
soft, value, e)
def common_setup(enable_affinity=True, _init_logging=True):
@ -388,6 +399,18 @@ class ClassicBinding(Binding):
class ClassicWorkerModel(WorkerModel):
#: In the top-level process, this references one end of a socketpair(),
#: whose other end child MuxProcesses block reading from to determine when
#: the master process dies. When the top-level exits abnormally, or
#: normally but where :func:`_on_process_exit` has been called, this socket
#: will be closed, causing all the children to wake.
parent_sock = None
#: In the mux process, this is the other end of :attr:`cls_parent_sock`.
#: The main thread blocks on a read from it until :attr:`cls_parent_sock`
#: is closed.
child_sock = None
#: mitogen.master.Router for this worker.
router = None
@ -403,8 +426,40 @@ class ClassicWorkerModel(WorkerModel):
parent = None
def __init__(self, _init_logging=True):
self._init_logging = _init_logging
self.initialized = False
"""
Arrange for classic model multiplexers to be started, if they are not
already running.
The parent process picks a UNIX socket path each child will use prior
to fork, creates a socketpair used essentially as a semaphore, then
blocks waiting for the child to indicate the UNIX socket is ready for
use.
:param bool _init_logging:
For testing, if :data:`False`, don't initialize logging.
"""
# #573: The process ID that installed the :mod:`atexit` handler. If
# some unknown Ansible plug-in forks the Ansible top-level process and
# later performs a graceful Python exit, it may try to wait for child
# PIDs it never owned, causing a crash. We want to avoid that.
self._pid = os.getpid()
common_setup(_init_logging=_init_logging)
self.parent_sock, self.child_sock = socket.socketpair()
mitogen.core.set_cloexec(self.parent_sock.fileno())
mitogen.core.set_cloexec(self.child_sock.fileno())
self._muxes = [
MuxProcess(self, index)
for index in range(get_cpu_count(default=1))
]
for mux in self._muxes:
mux.start()
atexit.register(self._on_process_exit)
self.child_sock.close()
self.child_sock = None
def _listener_for_name(self, name):
"""
@ -438,7 +493,7 @@ class ClassicWorkerModel(WorkerModel):
self.listener_path = path
def on_process_exit(self, sock):
def _on_process_exit(self):
"""
This is an :mod:`atexit` handler installed in the top-level process.
@ -452,15 +507,18 @@ class ClassicWorkerModel(WorkerModel):
MuxProcess, debug logs may appear on the user's terminal *after* the
prompt has been printed.
"""
try:
sock.shutdown(socket.SHUT_WR)
except socket.error:
# Already closed. This is possible when tests are running.
LOG.debug('on_process_exit: ignoring duplicate call')
if self._pid != os.getpid():
return
mitogen.core.io_op(sock.recv, 1)
sock.close()
try:
self.parent_sock.shutdown(socket.SHUT_WR)
except socket.error:
# Already closed. This is possible when tests are running.
LOG.debug('_on_process_exit: ignoring duplicate call')
return
mitogen.core.io_op(self.parent_sock.recv, 1)
self.parent_sock.close()
for mux in self._muxes:
_, status = os.waitpid(mux.pid, 0)
@ -468,46 +526,15 @@ class ClassicWorkerModel(WorkerModel):
LOG.debug('mux %d PID %d %s', mux.index, mux.pid,
mitogen.parent.returncode_to_str(status))
def _initialize(self):
"""
Arrange for classic model multiplexers to be started, if they are not
already running.
The parent process picks a UNIX socket path each child will use prior
to fork, creates a socketpair used essentially as a semaphore, then
blocks waiting for the child to indicate the UNIX socket is ready for
use.
:param bool _init_logging:
For testing, if :data:`False`, don't initialize logging.
"""
common_setup(_init_logging=self._init_logging)
MuxProcess.cls_parent_sock, \
MuxProcess.cls_child_sock = socket.socketpair()
mitogen.core.set_cloexec(MuxProcess.cls_parent_sock.fileno())
mitogen.core.set_cloexec(MuxProcess.cls_child_sock.fileno())
self._muxes = [
MuxProcess(index)
for index in range(get_cpu_count(default=1))
]
for mux in self._muxes:
mux.start()
atexit.register(self.on_process_exit, MuxProcess.cls_parent_sock)
MuxProcess.cls_child_sock.close()
MuxProcess.cls_child_sock = None
def _test_reset(self):
"""
Used to clean up in unit tests.
"""
# TODO: split this up a bit.
global _classic_worker_model
assert MuxProcess.cls_parent_sock is not None
MuxProcess.cls_parent_sock.close()
MuxProcess.cls_parent_sock = None
assert self.parent_sock is not None
self.parent_sock.close()
self.parent_sock = None
self.listener_path = None
self.router = None
self.parent = None
@ -525,9 +552,6 @@ class ClassicWorkerModel(WorkerModel):
"""
See WorkerModel.on_strategy_start().
"""
if not self.initialized:
self._initialize()
self.initialized = True
def on_strategy_complete(self):
"""
@ -556,7 +580,6 @@ class ClassicWorkerModel(WorkerModel):
self.router = None
self.broker = None
self.listener_path = None
self.initialized = False
# #420: Ansible executes "meta" actions in the top-level process,
# meaning "reset_connection" will cause :class:`mitogen.core.Latch` FDs
@ -587,25 +610,16 @@ class MuxProcess(object):
See https://bugs.python.org/issue6721 for a thorough description of the
class of problems this worker is intended to avoid.
"""
#: In the top-level process, this references one end of a socketpair(),
#: whose other end child MuxProcesses block reading from to determine when
#: the master process dies. When the top-level exits abnormally, or
#: normally but where :func:`on_process_exit` has been called, this socket
#: will be closed, causing all the children to wake.
cls_parent_sock = None
#: In the mux process, this is the other end of :attr:`cls_parent_sock`.
#: The main thread blocks on a read from it until :attr:`cls_parent_sock`
#: is closed.
cls_child_sock = None
#: A copy of :data:`os.environ` at the time the multiplexer process was
#: started. It's used by mitogen_local.py to find changes made to the
#: top-level environment (e.g. vars plugins -- issue #297) that must be
#: applied to locally executed commands and modules.
cls_original_env = None
def __init__(self, index):
def __init__(self, model, index):
#: :class:`ClassicWorkerModel` instance we were created by.
self.model = model
#: MuxProcess CPU index.
self.index = index
#: Individual path of this process.
self.path = mitogen.unix.make_socket_path()
@ -614,7 +628,7 @@ class MuxProcess(object):
self.pid = os.fork()
if self.pid:
# Wait for child to boot before continuing.
mitogen.core.io_op(MuxProcess.cls_parent_sock.recv, 1)
mitogen.core.io_op(self.model.parent_sock.recv, 1)
return
ansible_mitogen.logging.set_process_name('mux:' + str(self.index))
@ -624,8 +638,8 @@ class MuxProcess(object):
os.path.basename(self.path),
))
MuxProcess.cls_parent_sock.close()
MuxProcess.cls_parent_sock = None
self.model.parent_sock.close()
self.model.parent_sock = None
try:
try:
self.worker_main()
@ -649,9 +663,9 @@ class MuxProcess(object):
try:
# Let the parent know our listening socket is ready.
mitogen.core.io_op(self.cls_child_sock.send, b('1'))
mitogen.core.io_op(self.model.child_sock.send, b('1'))
# Block until the socket is closed, which happens on parent exit.
mitogen.core.io_op(self.cls_child_sock.recv, 1)
mitogen.core.io_op(self.model.child_sock.recv, 1)
finally:
self.broker.shutdown()
self.broker.join()

View File

@ -1628,12 +1628,17 @@ class Protocol(object):
self.stream.on_disconnect(broker)
def on_disconnect(self, broker):
# Normally both sides an FD, so it is important that tranmit_side is
# deregistered from Poller before closing the receive side, as pollers
# like epoll and kqueue unregister all events on FD close, causing
# subsequent attempt to unregister the transmit side to fail.
LOG.debug('%r: disconnecting', self)
if self.stream.receive_side:
broker.stop_receive(self.stream)
self.stream.receive_side.close()
broker.stop_receive(self.stream)
if self.stream.transmit_side:
broker._stop_transmit(self.stream)
self.stream.receive_side.close()
if self.stream.transmit_side:
self.stream.transmit_side.close()

View File

@ -15,7 +15,46 @@ from mitogen.core import b
import testlib
def _osx_mode(n):
"""
fstat(2) on UNIX sockets on OSX return different mode bits depending on
which side is being inspected, so zero those bits for comparison.
"""
if sys.platform == 'darwin':
n &= ~int('0777', 8)
return n
def run_fd_check(func, fd, mode, on_start=None):
"""
Run ``tests/data/fd_check.py`` using `func`. The subprocess writes
information about the `fd` it received to a temporary file.
:param func:
Function like `create_child()` used to start child.
:param fd:
FD child should read/write from, and report information about.
:param mode:
"read" or "write", depending on whether the FD is readable or writeable
from the perspective of the child. If "read", `on_start()` should write
"TEST" to it and the child reads "TEST" from it, otherwise `on_start()`
should read "TEST" from it and the child writes "TEST" to it.
:param on_start:
Function invoked as `on_start(proc)`
:returns:
Tuple of `(proc, info, on_start_result)`, where:
* `proc`: the :class:`mitogen.parent.Process` returned by `func`.
* `info`: dict containing information returned by the child:
* `buf`: "TEST" that was read in "read" mode
* `flags`: :attr:`fcntl.F_GETFL` flags for `fd`
* `st_mode`: st_mode field from :func:`os.fstat`
* `st_dev`: st_dev field from :func:`os.fstat`
* `st_ino`: st_ino field from :func:`os.fstat`
* `ttyname`: :func:`os.ttyname` invoked on `fd`.
* `controlling_tty`: :func:os.ttyname` invoked on ``/dev/tty``
from within the child.
"""
tf = tempfile.NamedTemporaryFile()
args = [
sys.executable,
@ -61,7 +100,7 @@ class StdinSockMixin(object):
st = os.fstat(proc.stdin.fileno())
self.assertTrue(stat.S_ISSOCK(st.st_mode))
self.assertEquals(st.st_dev, info['st_dev'])
self.assertEquals(st.st_mode, info['st_mode'])
self.assertEquals(st.st_mode, _osx_mode(info['st_mode']))
flags = fcntl.fcntl(proc.stdin.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['buf'], 'TEST')
@ -75,7 +114,7 @@ class StdoutSockMixin(object):
st = os.fstat(proc.stdout.fileno())
self.assertTrue(stat.S_ISSOCK(st.st_mode))
self.assertEquals(st.st_dev, info['st_dev'])
self.assertEquals(st.st_mode, info['st_mode'])
self.assertEquals(st.st_mode, _osx_mode(info['st_mode']))
flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')
@ -105,8 +144,6 @@ class CreateChildMergedTest(StdinSockMixin, StdoutSockMixin,
self.assertEquals(None, proc.stderr)
st = os.fstat(proc.stdout.fileno())
self.assertTrue(stat.S_ISSOCK(st.st_mode))
self.assertEquals(st.st_dev, info['st_dev'])
self.assertEquals(st.st_mode, info['st_mode'])
flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')
@ -145,13 +182,11 @@ class TtyCreateChildTest(testlib.TestCase):
self.assertTrue(isinstance(info['ttyname'],
mitogen.core.UnicodeType))
os.ttyname(proc.stdin.fileno()) # crashes if not TTY
self.assertTrue(os.isatty(proc.stdin.fileno()))
flags = fcntl.fcntl(proc.stdin.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_RDWR)
self.assertNotEquals(st.st_dev, info['st_dev'])
self.assertTrue(info['buf'], 'TEST')
def test_stdout(self):
@ -164,17 +199,18 @@ class TtyCreateChildTest(testlib.TestCase):
self.assertTrue(isinstance(info['ttyname'],
mitogen.core.UnicodeType))
os.ttyname(proc.stdout.fileno()) # crashes if wrong
self.assertTrue(os.isatty(proc.stdout.fileno()))
flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_RDWR)
self.assertNotEquals(st.st_dev, info['st_dev'])
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')
def test_stderr(self):
# proc.stderr is None in the parent since there is no separate stderr
# stream. In the child, FD 2/stderr is connected to the TTY.
proc, info, buf = run_fd_check(self.func, 2, 'write',
lambda proc: wait_read(proc.stdout, 4))
@ -184,13 +220,12 @@ class TtyCreateChildTest(testlib.TestCase):
self.assertTrue(isinstance(info['ttyname'],
mitogen.core.UnicodeType))
os.ttyname(proc.stdin.fileno()) # crashes if not TTY
self.assertTrue(os.isatty(proc.stdout.fileno()))
flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_RDWR)
self.assertNotEquals(st.st_dev, info['st_dev'])
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')
@ -222,6 +257,7 @@ class TtyCreateChildTest(testlib.TestCase):
class StderrDiagTtyMixin(object):
def test_stderr(self):
# proc.stderr is the PTY master, FD 2 in the child is the PTY slave
proc, info, buf = run_fd_check(self.func, 2, 'write',
lambda proc: wait_read(proc.stderr, 4))
@ -231,13 +267,12 @@ class StderrDiagTtyMixin(object):
self.assertTrue(isinstance(info['ttyname'],
mitogen.core.UnicodeType))
os.ttyname(proc.stderr.fileno()) # crashes if wrong
self.assertTrue(os.isatty(proc.stderr.fileno()))
flags = fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_RDWR)
self.assertNotEquals(st.st_dev, info['st_dev'])
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')

View File

@ -427,6 +427,11 @@ class DockerizedSshDaemon(object):
raise ValueError('could not find SSH port in: %r' % (s,))
def start_container(self):
try:
subprocess__check_output(['docker'])
except Exception:
raise unittest2.SkipTest('Docker binary is unavailable')
self.container_name = 'mitogen-test-%08x' % (random.getrandbits(64),)
args = [
'docker',