From 08538d327b3f3664ce5c1d5bbf4963f5699831f7 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Jun 2018 15:27:06 +0100 Subject: [PATCH] ansible: don't write failed job result after async timeout. The failed job result is likely to be "interrupted system call", and we don't want that to overwrite the SIGALRM handler's "the task timed out", so just discard it. --- ansible_mitogen/planner.py | 1 - ansible_mitogen/target.py | 191 +++++++++++++++++++++---------------- 2 files changed, 109 insertions(+), 83 deletions(-) diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index 8ea3886a..801950f9 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -51,7 +51,6 @@ except ImportError: # Ansible <2.4 from ansible.plugins import module_loader from ansible.plugins import module_utils_loader -import mitogen import ansible_mitogen.target diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 9d4d8d3a..9bf239d3 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -266,75 +266,67 @@ def _get_async_dir(): ) -def _write_job_status(job_id, dct): - """ - Update an async job status file. - """ - LOG.info('_write_job_status(%r, %r)', job_id, dct) - dct.setdefault('ansible_job_id', job_id) - dct.setdefault('data', '') +class AsyncRunner(object): + def __init__(self, job_id, timeout_secs, econtext, kwargs): + self.job_id = job_id + self.timeout_secs = timeout_secs + self.econtext = econtext + self.kwargs = kwargs + self._timed_out = False + self._init_path() - async_dir = _get_async_dir() - if not os.path.exists(async_dir): - os.makedirs(async_dir) + def _init_path(self): + async_dir = _get_async_dir() + if not os.path.exists(async_dir): + os.makedirs(async_dir) + self.path = os.path.join(async_dir, self.job_id) - path = os.path.join(async_dir, job_id) - with open(path + '.tmp', 'w') as fp: - fp.write(json.dumps(dct)) - os.rename(path + '.tmp', path) + def _update(self, dct): + """ + Update an async job status file. + """ + LOG.info('%r._update(%r, %r)', self, self.job_id, dct) + dct.setdefault('ansible_job_id', self.job_id) + dct.setdefault('data', '') + with open(self.path + '.tmp', 'w') as fp: + fp.write(json.dumps(dct)) + os.rename(self.path + '.tmp', self.path) -def _sigalrm(broker, timeout_secs, job_id): - """ - Respond to SIGALRM (job timeout) by updating the job file and killing the - process. - """ - msg = "Job reached maximum time limit of %d seconds." % (timeout_secs,) - _write_job_status(job_id, { - "failed": 1, - "finished": 1, - "msg": msg, - }) - broker.shutdown() + def _on_sigalrm(self, signum, frame): + """ + Respond to SIGALRM (job timeout) by updating the job file and killing + the process. + """ + msg = "Job reached maximum time limit of %d seconds." % ( + self.timeout_secs, + ) + self._update({ + "failed": 1, + "finished": 1, + "msg": msg, + }) + self._timed_out = True + self.econtext.broker.shutdown() + def _install_alarm(self): + signal.signal(signal.SIGALRM, self._on_sigalrm) + signal.alarm(self.timeout_secs) -def _install_alarm(broker, timeout_secs, job_id): - handler = lambda *_: _sigalrm(broker, timeout_secs, job_id) - signal.signal(signal.SIGALRM, handler) - signal.alarm(timeout_secs) + def _run_module(self): + kwargs = dict(self.kwargs, **{ + 'detach': True, + 'econtext': self.econtext, + 'emulate_tty': False, + }) + dct = run_module(kwargs) + if mitogen.core.PY3: + for key in 'stdout', 'stderr': + dct[key] = dct[key].decode('utf-8', 'surrogateescape') + return dct -def _run_module_async(kwargs, job_id, timeout_secs, econtext): - """ - 1. Immediately updates the status file to mark the job as started. - 2. Installs a timer/signal handler to implement the time limit. - 3. Runs as with run_module(), writing the result to the status file. - - :param dict kwargs: - Runner keyword arguments. - :param str job_id: - String job ID. - :param int timeout_secs: - If >0, limit the task's maximum run time. - """ - _write_job_status(job_id, { - 'started': 1, - 'finished': 0, - 'pid': os.getpid() - }) - - if timeout_secs > 0: - _install_alarm(econtext.broker, timeout_secs, job_id) - - kwargs['detach'] = True - kwargs['econtext'] = econtext - kwargs['emulate_tty'] = False - dct = run_module(kwargs) - if mitogen.core.PY3: - for key in 'stdout', 'stderr': - dct[key] = dct[key].decode('utf-8', 'surrogateescape') - - try: + def _parse_result(self, dct): filtered, warnings = ( ansible.module_utils.json_utils. _filter_non_json_lines(dct['stdout']) @@ -342,34 +334,69 @@ def _run_module_async(kwargs, job_id, timeout_secs, econtext): result = json.loads(filtered) result.setdefault('warnings', []).extend(warnings) result['stderr'] = dct['stderr'] - _write_job_status(job_id, result) - except Exception: - _write_job_status(job_id, { - "failed": 1, - "msg": traceback.format_exc(), - "data": dct['stdout'], # temporary notice only - "stderr": dct['stderr'] + self._update(result) + + def _run(self): + """ + 1. Immediately updates the status file to mark the job as started. + 2. Installs a timer/signal handler to implement the time limit. + 3. Runs as with run_module(), writing the result to the status file. + + :param dict kwargs: + Runner keyword arguments. + :param str job_id: + String job ID. + :param int timeout_secs: + If >0, limit the task's maximum run time. + """ + self._update({ + 'started': 1, + 'finished': 0, + 'pid': os.getpid() }) + if self.timeout_secs > 0: + self._install_alarm() + + dct = self._run_module() + if not self._timed_out: + # After SIGALRM fires, there is a window between broker responding + # to shutdown() by killing the process, and work continuing on the + # main thread. If main thread was asleep in at least + # basic.py/select.select(), an EINTR will be raised. We want to + # discard that exception. + try: + self._parse_result(dct) + except Exception: + self._update({ + "failed": 1, + "msg": traceback.format_exc(), + "data": dct['stdout'], # temporary notice only + "stderr": dct['stderr'] + }) + + def run(self): + try: + try: + self._run() + except Exception: + self._update({ + "failed": 1, + "msg": traceback.format_exc(), + }) + finally: + self.econtext.broker.shutdown() + @mitogen.core.takes_econtext def run_module_async(kwargs, job_id, timeout_secs, econtext): """ - Arrange for a module to be executed with its run status and result - serialized to a disk file. This function expects to run in a child forked - using :func:`create_fork_child`. + Execute a module with its run status and result written to a file, + terminating on the process on completion. This function must run in a child + forked using :func:`create_fork_child`. """ - try: - try: - _run_module_async(kwargs, job_id, timeout_secs, econtext) - except Exception: - # Catch any (ansible_mitogen) bugs and write them to the job file. - _write_job_status(job_id, { - "failed": 1, - "msg": traceback.format_exc(), - }) - finally: - econtext.broker.shutdown() + arunner = AsyncRunner(job_id, timeout_secs, econtext, kwargs) + arunner.run() def make_temp_directory(base_dir):