From 1eae594e3239707ee5f5e69adde9574426a810c0 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 29 Oct 2018 20:01:27 +0000 Subject: [PATCH] ssh: fix check_host_keys="accept" and test; closes #411 Add real accept/enforce tests. --- docs/changelog.rst | 5 +++ mitogen/ssh.py | 2 +- tests/data/stubs/ssh.py | 5 +-- tests/ssh_test.py | 94 +++++++++++++++++++++++++++++------------ 4 files changed, 76 insertions(+), 30 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 3d7cc21c..a1bbbdf0 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -51,6 +51,11 @@ Core Library signal the connection has broken, even when one participant is not a parent of the other. +* `#411 `_: the SSH method typed + "``y``" rather than the requisite "``yes``" when `check_host_keys="accept"` + was configured. This would lead to connection timeouts due to the hung + response. + * `16ca111e `_: handle OpenSSH 7.5 permission denied prompts when ``~/.ssh/config`` rewrites are present. diff --git a/mitogen/ssh.py b/mitogen/ssh.py index 1a545964..fba6e8f2 100644 --- a/mitogen/ssh.py +++ b/mitogen/ssh.py @@ -265,7 +265,7 @@ class Stream(mitogen.parent.Stream): def _host_key_prompt(self): if self.check_host_keys == 'accept': LOG.debug('%r: accepting host key', self) - self.tty_stream.transmit_side.write(b('y\n')) + self.tty_stream.transmit_side.write(b('yes\n')) return # _host_key_prompt() should never be reached with ignore or enforce diff --git a/tests/data/stubs/ssh.py b/tests/data/stubs/ssh.py index 63397479..80c02835 100755 --- a/tests/data/stubs/ssh.py +++ b/tests/data/stubs/ssh.py @@ -19,7 +19,6 @@ PERMDENIED_CLASSIC_MSG = 'Permission denied (publickey,password)\n' PERMDENIED_75_MSG = 'chicken@nandos.com: permission denied (publickey,password)\n' - def tty(msg): fp = open('/dev/tty', 'wb', 0) fp.write(msg.encode()) @@ -41,10 +40,10 @@ def confirm(msg): fp.close() -mode = os.getenv('FAKESSH_MODE') +mode = os.getenv('STUBSSH_MODE') if mode == 'ask': - assert 'y\n' == confirm(HOST_KEY_ASK_MSG) + assert 'yes\n' == confirm(HOST_KEY_ASK_MSG) elif mode == 'strict': stderr(HOST_KEY_STRICT_MSG) diff --git a/tests/ssh_test.py b/tests/ssh_test.py index ca614fa2..bdee30dd 100644 --- a/tests/ssh_test.py +++ b/tests/ssh_test.py @@ -1,5 +1,6 @@ import os import sys +import tempfile import mitogen import mitogen.ssh @@ -11,6 +12,23 @@ import testlib import plain_old_module +class StubSshMixin(testlib.RouterMixin): + """ + Mix-in that provides :meth:`stub_ssh` executing the stub 'ssh.py'. + """ + def stub_ssh(self, STUBSSH_MODE=None, **kwargs): + os.environ['STUBSSH_MODE'] = str(STUBSSH_MODE) + try: + return self.router.ssh( + hostname='hostname', + username='mitogen__has_sudo', + ssh_path=testlib.data_path('stubs/ssh.py'), + **kwargs + ) + finally: + del os.environ['STUBSSH_MODE'] + + class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): def test_okay(self): context = self.router.ssh( @@ -23,7 +41,7 @@ class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): self.assertEquals(3, context.call(plain_old_module.add, 1, 2)) -class SshTest(testlib.DockerMixin, unittest2.TestCase): +class SshTest(testlib.DockerMixin, testlib.TestCase): stream_class = mitogen.ssh.Stream def test_stream_name(self): @@ -105,6 +123,47 @@ class SshTest(testlib.DockerMixin, unittest2.TestCase): context.call(plain_old_module.get_sentinel_value), ) + def test_enforce_unknown_host_key(self): + fp = tempfile.NamedTemporaryFile() + try: + e = self.assertRaises(mitogen.ssh.HostKeyError, + lambda: self.docker_ssh( + username='mitogen__has_sudo_pubkey', + password='has_sudo_password', + ssh_args=['-o', 'UserKnownHostsFile ' + fp.name], + check_host_keys='enforce', + ) + ) + self.assertEquals(e.args[0], mitogen.ssh.Stream.hostkey_failed_msg) + finally: + fp.close() + + def test_accept_enforce_host_keys(self): + fp = tempfile.NamedTemporaryFile() + try: + context = self.docker_ssh( + username='mitogen__has_sudo', + password='has_sudo_password', + ssh_args=['-o', 'UserKnownHostsFile ' + fp.name], + check_host_keys='accept', + ) + context.shutdown(wait=True) + + fp.seek(0) + # Lame test, but we're about to use enforce mode anyway, which + # verifies the file contents. + self.assertTrue(len(fp.read()) > 0) + + context = self.docker_ssh( + username='mitogen__has_sudo', + password='has_sudo_password', + ssh_args=['-o', 'UserKnownHostsFile ' + fp.name], + check_host_keys='enforce', + ) + context.shutdown(wait=True) + finally: + fp.close() + class BannerTest(testlib.DockerMixin, unittest2.TestCase): # Verify the ability to disambiguate random spam appearing in the SSHd's @@ -124,54 +183,37 @@ class BannerTest(testlib.DockerMixin, unittest2.TestCase): self.assertEquals(name, context.name) -class FakeSshMixin(testlib.RouterMixin): - """ - Mix-in that provides :meth:`fake_ssh` executing the stub 'ssh.py'. - """ - def fake_ssh(self, FAKESSH_MODE=None, **kwargs): - os.environ['FAKESSH_MODE'] = str(FAKESSH_MODE) - try: - return self.router.ssh( - hostname='hostname', - username='mitogen__has_sudo', - ssh_path=testlib.data_path('stubs/ssh.py'), - **kwargs - ) - finally: - del os.environ['FAKESSH_MODE'] - - -class PermissionDeniedTest(FakeSshMixin, testlib.TestCase): +class StubPermissionDeniedTest(StubSshMixin, testlib.TestCase): def test_classic_prompt(self): self.assertRaises(mitogen.ssh.PasswordError, - lambda: self.fake_ssh(FAKESSH_MODE='permdenied_classic')) + lambda: self.stub_ssh(STUBSSH_MODE='permdenied_classic')) def test_openssh_75_prompt(self): self.assertRaises(mitogen.ssh.PasswordError, - lambda: self.fake_ssh(FAKESSH_MODE='permdenied_75')) + lambda: self.stub_ssh(STUBSSH_MODE='permdenied_75')) -class RequirePtyTest(FakeSshMixin, testlib.TestCase): +class StubCheckHostKeysTest(StubSshMixin, testlib.TestCase): stream_class = mitogen.ssh.Stream def test_check_host_keys_accept(self): # required=true, host_key_checking=accept - context = self.fake_ssh(FAKESSH_MODE='ask', check_host_keys='accept') + context = self.stub_ssh(STUBSSH_MODE='ask', check_host_keys='accept') self.assertEquals('1', context.call(os.getenv, 'STDERR_WAS_TTY')) def test_check_host_keys_enforce(self): # required=false, host_key_checking=enforce - context = self.fake_ssh(check_host_keys='enforce') + context = self.stub_ssh(check_host_keys='enforce') self.assertEquals(None, context.call(os.getenv, 'STDERR_WAS_TTY')) def test_check_host_keys_ignore(self): # required=false, host_key_checking=ignore - context = self.fake_ssh(check_host_keys='ignore') + context = self.stub_ssh(check_host_keys='ignore') self.assertEquals(None, context.call(os.getenv, 'STDERR_WAS_TTY')) def test_password_present(self): # required=true, password is not None - context = self.fake_ssh(check_host_keys='ignore', password='willick') + context = self.stub_ssh(check_host_keys='ignore', password='willick') self.assertEquals('1', context.call(os.getenv, 'STDERR_WAS_TTY'))