tests: move affinity_test to Ansible tests.

This commit is contained in:
David Wilson 2019-02-08 16:14:10 +00:00
parent 5ae7464011
commit 8026e54b9b
2 changed files with 59 additions and 47 deletions

View File

@ -0,0 +1,59 @@
import tempfile
import os
import mock
import unittest2
import testlib
import mitogen.parent
import ansible_mitogen.affinity
@unittest2.skipIf(
reason='Linux only',
condition=os.uname()[0] != 'Linux'
)
class LinuxPolicyTest(testlib.TestCase):
klass = ansible_mitogen.affinity.LinuxPolicy
def setUp(self):
self.policy = self.klass()
def _get_cpus(self, path='/proc/self/status'):
fp = open(path)
try:
for line in fp:
if line.startswith('Cpus_allowed'):
return int(line.split()[1], 16)
finally:
fp.close()
def test_set_clear(self):
before = self._get_cpus()
self.policy._set_cpu(3)
self.assertEquals(self._get_cpus(), 1 << 3)
self.policy._clear()
self.assertEquals(self._get_cpus(), before)
def test_clear_on_popen(self):
tf = tempfile.NamedTemporaryFile()
try:
before = self._get_cpus()
self.policy._set_cpu(3)
my_cpu = self._get_cpus()
pid = mitogen.parent.detach_popen(
args=['cp', '/proc/self/status', tf.name]
)
os.waitpid(pid, 0)
his_cpu = self._get_cpus(tf.name)
self.assertNotEquals(my_cpu, his_cpu)
self.policy._clear()
finally:
tf.close()
if __name__ == '__main__':
unittest2.main()

View File

@ -24,53 +24,6 @@ def func(router):
return router return router
class ResetAffinityTest(testlib.TestCase):
func = staticmethod(mitogen.utils.reset_affinity)
def _get_cpus(self, path='/proc/self/status'):
fp = open(path)
try:
for line in fp:
if line.startswith('Cpus_allowed'):
return int(line.split()[1], 16)
finally:
fp.close()
@mock.patch('random.randint')
def test_set_reset(self, randint):
randint.return_value = 3
before = self._get_cpus()
self.func()
self.assertEquals(self._get_cpus(), 1 << 3)
self.func(clear=True)
self.assertEquals(self._get_cpus(), before)
@mock.patch('random.randint')
def test_clear_on_popen(self, randint):
randint.return_value = 3
tf = tempfile.NamedTemporaryFile()
try:
before = self._get_cpus()
self.func()
my_cpu = self._get_cpus()
pid = mitogen.parent.detach_popen(
args=['cp', '/proc/self/status', tf.name]
)
os.waitpid(pid, 0)
his_cpu = self._get_cpus(tf.name)
self.assertNotEquals(my_cpu, his_cpu)
self.func(clear=True)
finally:
tf.close()
ResetAffinityTest = unittest2.skipIf(
reason='Linux only',
condition=os.uname()[0] != 'Linux'
)(ResetAffinityTest)
class RunWithRouterTest(testlib.TestCase): class RunWithRouterTest(testlib.TestCase):
# test_shutdown_on_exception # test_shutdown_on_exception
# test_shutdown_on_success # test_shutdown_on_success