2018-04-17 16:40:45 +00:00
|
|
|
import sys
|
|
|
|
|
2017-09-15 06:24:41 +00:00
|
|
|
import mitogen
|
|
|
|
import mitogen.ssh
|
|
|
|
import mitogen.utils
|
|
|
|
|
2017-11-12 12:42:27 +00:00
|
|
|
import unittest2
|
2017-11-11 22:59:38 +00:00
|
|
|
|
2017-09-15 06:24:41 +00:00
|
|
|
import testlib
|
2017-09-17 15:39:03 +00:00
|
|
|
import plain_old_module
|
2017-09-15 06:24:41 +00:00
|
|
|
|
|
|
|
|
2017-11-12 12:42:27 +00:00
|
|
|
class FakeSshTest(testlib.RouterMixin, unittest2.TestCase):
    # Exercises router.ssh() with the 'fakessh.py' script from the test data
    # directory passed as ssh_path.

    def test_okay(self):
        # Open a context through router.ssh() and verify that a function
        # call dispatched to it returns the expected result.
        context = self.router.ssh(
            hostname='hostname',
            username='mitogen__has_sudo',
            ssh_path=testlib.data_path('fakessh.py'),
        )
        # Debugging aids, intentionally left disabled:
        #context.call(mitogen.utils.log_to_file, '/tmp/log')
        #context.call(mitogen.utils.disable_site_packages)

        # assertEqual rather than the deprecated assertEquals alias.
        self.assertEqual(3, context.call(plain_old_module.add, 1, 2))
|
2017-09-17 12:41:48 +00:00
|
|
|
|
|
|
|
|
2017-11-12 12:42:27 +00:00
|
|
|
class SshTest(testlib.DockerMixin, unittest2.TestCase):
    # Tests for SSH connections established via self.docker_ssh(), covering
    # stream naming, password authentication and public key authentication.

    # Class whose password_required_msg / password_incorrect_msg attributes
    # the authentication-failure tests compare against.
    stream_class = mitogen.ssh.Stream

    def _get_password_error(self, **kwargs):
        # Call self.docker_ssh(**kwargs), expecting it to raise
        # mitogen.ssh.PasswordError, and return the exception instance so the
        # caller can inspect its message. The test fails if nothing is raised.
        try:
            self.docker_ssh(**kwargs)
        except mitogen.ssh.PasswordError:
            # sys.exc_info() is kept from the original code in place of
            # "except ... as e" -- presumably for compatibility with older
            # interpreters; confirm before modernising.
            return sys.exc_info()[1]
        # self.fail() instead of a bare "assert 0": assert statements are
        # removed when Python runs with -O, which would hide the failure.
        self.fail('exception not thrown')

    def test_stream_name(self):
        # The context name should be 'ssh.<host>:<port>'.
        context = self.docker_ssh(
            username='mitogen__has_sudo',
            password='has_sudo_password',
        )
        name = 'ssh.%s:%s' % (
            self.dockerized_ssh.get_host(),
            self.dockerized_ssh.port,
        )
        self.assertEqual(name, context.name)

    def test_via_stream_name(self):
        # A sudo context created via an SSH context should be named
        # 'ssh.<host>:<port>.sudo.root'.
        context = self.docker_ssh(
            username='mitogen__has_sudo_nopw',
            password='has_sudo_nopw_password',
        )
        sudo = self.router.sudo(via=context)

        name = 'ssh.%s:%s.sudo.root' % (
            # get_host() is used here for consistency with the other
            # stream-name tests in this file.
            self.dockerized_ssh.get_host(),
            self.dockerized_ssh.port,
        )
        self.assertEqual(name, sudo.name)

    def test_password_required(self):
        # Connecting without a password must raise PasswordError carrying
        # the stream class's password_required_msg.
        e = self._get_password_error(
            username='mitogen__has_sudo',
        )
        self.assertEqual(e.args[0], self.stream_class.password_required_msg)

    def test_password_incorrect(self):
        # Connecting with a wrong password must raise PasswordError carrying
        # the stream class's password_incorrect_msg.
        e = self._get_password_error(
            username='mitogen__has_sudo',
            password='badpw',
        )
        self.assertEqual(e.args[0], self.stream_class.password_incorrect_msg)

    def test_password_specified(self):
        # With the correct password, a call made in the context should
        # return the sentinel value.
        context = self.docker_ssh(
            username='mitogen__has_sudo',
            password='has_sudo_password',
        )

        self.assertEqual(
            'i-am-mitogen-test-docker-image\n',
            context.call(plain_old_module.get_sentinel_value),
        )

    def test_pubkey_required(self):
        # Connecting as the pubkey account with neither identity file nor
        # password must raise PasswordError carrying password_required_msg.
        e = self._get_password_error(
            username='mitogen__has_sudo_pubkey',
        )
        self.assertEqual(e.args[0], self.stream_class.password_required_msg)

    def test_pubkey_specified(self):
        # With the identity file supplied, a call made in the context should
        # return the sentinel value.
        context = self.docker_ssh(
            username='mitogen__has_sudo_pubkey',
            identity_file=testlib.data_path('docker/mitogen__has_sudo_pubkey.key'),
        )
        self.assertEqual(
            'i-am-mitogen-test-docker-image\n',
            context.call(plain_old_module.get_sentinel_value),
        )
|
2017-09-29 10:34:09 +00:00
|
|
|
|
|
|
|
|
2018-07-17 19:41:00 +00:00
|
|
|
class BannerTest(testlib.DockerMixin, unittest2.TestCase):
    # Verify the ability to disambiguate random spam appearing in the SSHd's
    # login banner from a legitimate password prompt.
    stream_class = mitogen.ssh.Stream

    def test_verbose_enabled(self):
        # Connect with ssh_debug_level=3 and check that the connection still
        # succeeds and the context is named 'ssh.<host>:<port>'.
        context = self.docker_ssh(
            username='mitogen__has_sudo',
            password='has_sudo_password',
            ssh_debug_level=3,
        )
        name = 'ssh.%s:%s' % (
            self.dockerized_ssh.get_host(),
            self.dockerized_ssh.port,
        )
        # assertEqual rather than the deprecated assertEquals alias.
        self.assertEqual(name, context.name)
|
|
|
|
|
|
|
|
|
|
|
|
|
2017-09-29 10:34:09 +00:00
|
|
|
# Run the test cases in this module when the file is executed directly as a
# script rather than imported.
if __name__ == '__main__':
    unittest2.main()
|