From e010667230e7fd67e5a0c0ce743968c43599af69 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Wed, 13 Feb 2019 23:10:30 +0000 Subject: [PATCH 1/4] Bump version for release. --- docs/changelog.rst | 4 ++-- mitogen/__init__.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 180fe9e0..3bdc248f 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -125,7 +125,7 @@ Core Library series. -v0.2.5 (2019-02-1?) +v0.2.5 (2019-02-13) ------------------- Fixes @@ -179,7 +179,7 @@ Mitogen would not be possible without the support of users. A huge thanks for bug reports, testing, features and fixes in this release contributed by `Carl George `_, `Guy Knights `_, and -`Josh Smift `_, +`Josh Smift `_. v0.2.4 (2019-02-10) diff --git a/mitogen/__init__.py b/mitogen/__init__.py index 060430c6..08f875d4 100644 --- a/mitogen/__init__.py +++ b/mitogen/__init__.py @@ -35,7 +35,7 @@ be expected. On the slave, it is built dynamically during startup. #: Library version as a tuple. -__version__ = (0, 2, 4) +__version__ = (0, 2, 5) #: This is :data:`False` in slave contexts. Previously it was used to prevent From 8f9c67daf1eed865b55b4119fd65bb8df89ca49f Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 14 Feb 2019 00:35:16 +0000 Subject: [PATCH 2/4] ansible: refactor affinity class and add abstract tests. --- ansible_mitogen/affinity.py | 52 ++++++--- tests/ansible/tests/affinity_test.py | 157 ++++++++++++++++++++++++++- 2 files changed, 190 insertions(+), 19 deletions(-) diff --git a/ansible_mitogen/affinity.py b/ansible_mitogen/affinity.py index bcab89af..57926516 100644 --- a/ansible_mitogen/affinity.py +++ b/ansible_mitogen/affinity.py @@ -156,11 +156,11 @@ class Policy(object): Assign the helper subprocess policy to this process. """ - -class LinuxPolicy(Policy): +class FixedPolicy(Policy): """ - :class:`Policy` for Linux machines. The scheme here was tested on an - otherwise idle 16 thread machine. + :class:`Policy` for machines where the only control method available is + fixed CPU placement. The scheme here was tested on an otherwise idle 16 + thread machine. - The connection multiplexer is pinned to CPU 0. - The Ansible top-level (strategy) is pinned to CPU 1. @@ -180,26 +180,35 @@ class LinuxPolicy(Policy): CPU-intensive children like SSH are not forced to share the same core as the (otherwise potentially very busy) parent. """ - def __init__(self): + def __init__(self, cpu_count=None): + #: For tests. + self.cpu_count = cpu_count or multiprocessing.cpu_count() self.mem = mmap.mmap(-1, 4096) self.state = State.from_buffer(self.mem) self.state.lock.init() - if self._cpu_count() < 4: - self._reserve_mask = 3 - self._reserve_shift = 2 - self._reserve_controller = True - else: + + if self.cpu_count < 2: + # uniprocessor + self._reserve_mux = False + self._reserve_controller = False + self._reserve_mask = 0 + self._reserve_shift = 0 + elif self.cpu_count < 4: + # small SMP + self._reserve_mux = True + self._reserve_controller = False self._reserve_mask = 1 self._reserve_shift = 1 - self._reserve_controller = False + else: + # big SMP + self._reserve_mux = True + self._reserve_controller = True + self._reserve_mask = 3 + self._reserve_shift = 2 def _set_affinity(self, mask): mitogen.parent._preexec_hook = self._clear - s = struct.pack('L', mask) - _sched_setaffinity(os.getpid(), len(s), s) - - def _cpu_count(self): - return multiprocessing.cpu_count() + self._set_cpu_mask(mask) def _balance(self): self.state.lock.acquire() @@ -210,14 +219,15 @@ class LinuxPolicy(Policy): self.state.lock.release() self._set_cpu(self._reserve_shift + ( - (n % max(1, (self._cpu_count() - self._reserve_shift))) + (n % (self.cpu_count - self._reserve_shift)) )) def _set_cpu(self, cpu): self._set_affinity(1 << cpu) def _clear(self): - self._set_affinity(0xffffffff & ~self._reserve_mask) + all_cpus = (1 << self.cpu_count) - 1 + self._set_affinity(all_cpus & ~self._reserve_mask) def assign_controller(self): if self._reserve_controller: @@ -235,6 +245,12 @@ class LinuxPolicy(Policy): self._clear() +class LinuxPolicy(FixedPolicy): + def _set_cpu_mask(self, mask): + s = struct.pack('L', mask) + _sched_setaffinity(os.getpid(), len(s), s) + + if _sched_setaffinity is not None: policy = LinuxPolicy() else: diff --git a/tests/ansible/tests/affinity_test.py b/tests/ansible/tests/affinity_test.py index d898c782..ceb53513 100644 --- a/tests/ansible/tests/affinity_test.py +++ b/tests/ansible/tests/affinity_test.py @@ -11,11 +11,156 @@ import mitogen.parent import ansible_mitogen.affinity + +class NullFixedPolicy(ansible_mitogen.affinity.FixedPolicy): + def _set_cpu_mask(self, mask): + self.mask = mask + + +class FixedPolicyTest(testlib.TestCase): + klass = NullFixedPolicy + + def test_assign_controller_1core(self): + # Uniprocessor . + policy = self.klass(cpu_count=1) + policy.assign_controller() + self.assertEquals(0x1, policy.mask) + + def test_assign_controller_2core(self): + # Small SMP gets 1.. % cpu_count + policy = self.klass(cpu_count=2) + policy.assign_controller() + self.assertEquals(0x2, policy.mask) + policy.assign_controller() + self.assertEquals(0x2, policy.mask) + policy.assign_controller() + + def test_assign_controller_3core(self): + # Small SMP gets 1.. % cpu_count + policy = self.klass(cpu_count=3) + policy.assign_controller() + self.assertEquals(0x2, policy.mask) + policy.assign_controller() + self.assertEquals(0x4, policy.mask) + policy.assign_controller() + self.assertEquals(0x2, policy.mask) + policy.assign_controller() + self.assertEquals(0x4, policy.mask) + policy.assign_controller() + + def test_assign_controller_4core(self): + # Big SMP gets a dedicated core. + policy = self.klass(cpu_count=4) + policy.assign_controller() + self.assertEquals(0x2, policy.mask) + policy.assign_controller() + self.assertEquals(0x2, policy.mask) + + def test_assign_muxprocess_1core(self): + # Uniprocessor . + policy = self.klass(cpu_count=1) + policy.assign_muxprocess() + self.assertEquals(0x1, policy.mask) + + def test_assign_muxprocess_2core(self): + # Small SMP gets dedicated core. + policy = self.klass(cpu_count=2) + policy.assign_muxprocess() + self.assertEquals(0x1, policy.mask) + policy.assign_muxprocess() + self.assertEquals(0x1, policy.mask) + policy.assign_muxprocess() + + def test_assign_muxprocess_3core(self): + # Small SMP gets a dedicated core. + policy = self.klass(cpu_count=3) + policy.assign_muxprocess() + self.assertEquals(0x1, policy.mask) + policy.assign_muxprocess() + self.assertEquals(0x1, policy.mask) + + def test_assign_muxprocess_4core(self): + # Big SMP gets a dedicated core. + policy = self.klass(cpu_count=4) + policy.assign_muxprocess() + self.assertEquals(0x1, policy.mask) + policy.assign_muxprocess() + self.assertEquals(0x1, policy.mask) + + def test_assign_worker_1core(self): + # Balance n % 1 + policy = self.klass(cpu_count=1) + policy.assign_worker() + self.assertEquals(0x1, policy.mask) + policy.assign_worker() + self.assertEquals(0x1, policy.mask) + + def test_assign_worker_2core(self): + # Balance n % 1 + policy = self.klass(cpu_count=2) + policy.assign_worker() + self.assertEquals(0x2, policy.mask) + policy.assign_worker() + self.assertEquals(0x2, policy.mask) + + def test_assign_worker_3core(self): + # Balance n % 1 + policy = self.klass(cpu_count=3) + policy.assign_worker() + self.assertEquals(0x2, policy.mask) + policy.assign_worker() + self.assertEquals(0x4, policy.mask) + policy.assign_worker() + self.assertEquals(0x2, policy.mask) + + def test_assign_worker_4core(self): + # Balance n % 1 + policy = self.klass(cpu_count=4) + policy.assign_worker() + self.assertEquals(4, policy.mask) + policy.assign_worker() + self.assertEquals(8, policy.mask) + policy.assign_worker() + self.assertEquals(4, policy.mask) + + def test_assign_subprocess_1core(self): + # allow all except reserved. + policy = self.klass(cpu_count=1) + policy.assign_subprocess() + self.assertEquals(0x1, policy.mask) + policy.assign_subprocess() + self.assertEquals(0x1, policy.mask) + + def test_assign_subprocess_2core(self): + # allow all except reserved. + policy = self.klass(cpu_count=2) + policy.assign_subprocess() + self.assertEquals(0x2, policy.mask) + policy.assign_subprocess() + self.assertEquals(0x2, policy.mask) + + def test_assign_subprocess_3core(self): + # allow all except reserved. + policy = self.klass(cpu_count=3) + policy.assign_subprocess() + self.assertEquals(0x2 + 0x4, policy.mask) + policy.assign_subprocess() + self.assertEquals(0x2 + 0x4, policy.mask) + + def test_assign_subprocess_4core(self): + # allow all except reserved. + policy = self.klass(cpu_count=4) + policy.assign_subprocess() + self.assertEquals(0x4 + 0x8, policy.mask) + policy.assign_subprocess() + self.assertEquals(0x4 + 0x8, policy.mask) + + @unittest2.skipIf( reason='Linux/SMP only', condition=(not ( os.uname()[0] == 'Linux' and - multiprocessing.cpu_count() >= 4 + multiprocessing.cpu_count() > 1 )) ) class LinuxPolicyTest(testlib.TestCase): @@ -33,6 +178,16 @@ class LinuxPolicyTest(testlib.TestCase): finally: fp.close() + def test_set_cpu_mask(self): + self.policy._set_cpu_mask(0x1) + self.assertEquals(0x1, self._get_cpus()) + + self.policy._set_cpu_mask(0x2) + self.assertEquals(0x2, self._get_cpus()) + + self.policy._set_cpu_mask(0x3) + self.assertEquals(0x3, self._get_cpus()) + def test_set_clear(self): before = self._get_cpus() self.policy._set_cpu(3) From 45f915f3923280b54947eeebeca8149fcd2baa97 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 14 Feb 2019 00:37:53 +0000 Subject: [PATCH 3/4] docs: update Changelog; closes #537. --- docs/changelog.rst | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 3bdc248f..f7ecadbd 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -125,7 +125,7 @@ Core Library series. -v0.2.5 (2019-02-13) +v0.2.5 (2019-02-14) ------------------- Fixes @@ -145,6 +145,10 @@ Fixes ``simplejson`` from a controller that also loaded an incompatible newer version of ``simplejson``. +* `#537 `_: a swapped operator in the + CPU affinity logic meant 2 cores were reserved on 1`_: the source distribution includes a ``LICENSE`` file. From 5ed445c4aaa79a860c8dae3b6f189d42be200859 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 14 Feb 2019 00:57:49 +0000 Subject: [PATCH 4/4] issue #537: disable just the trivial LinuxPolicyTest on Travis. --- tests/ansible/tests/affinity_test.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/tests/ansible/tests/affinity_test.py b/tests/ansible/tests/affinity_test.py index ceb53513..8fa8cdb6 100644 --- a/tests/ansible/tests/affinity_test.py +++ b/tests/ansible/tests/affinity_test.py @@ -160,7 +160,7 @@ class FixedPolicyTest(testlib.TestCase): reason='Linux/SMP only', condition=(not ( os.uname()[0] == 'Linux' and - multiprocessing.cpu_count() > 1 + multiprocessing.cpu_count() > 2 )) ) class LinuxPolicyTest(testlib.TestCase): @@ -188,13 +188,6 @@ class LinuxPolicyTest(testlib.TestCase): self.policy._set_cpu_mask(0x3) self.assertEquals(0x3, self._get_cpus()) - def test_set_clear(self): - before = self._get_cpus() - self.policy._set_cpu(3) - self.assertEquals(self._get_cpus(), 1 << 3) - self.policy._clear() - self.assertEquals(self._get_cpus(), before) - def test_clear_on_popen(self): tf = tempfile.NamedTemporaryFile() try: