diff --git a/.gitignore b/.gitignore index fa8e4fab..22bade01 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,7 @@ etc/ssh_host_dsa_key.pub etc/ssh_host_rsa_key etc/ssh_host_rsa_key.pub var/log/cowrie/* -cowrie.egg-info/ +*.egg-info dl/ dist/ docs/_build diff --git a/src/cowrie/commands/base.py b/src/cowrie/commands/base.py index ebb9527c..be8587cf 100644 --- a/src/cowrie/commands/base.py +++ b/src/cowrie/commands/base.py @@ -1043,70 +1043,63 @@ commands["sh"] = Command_sh class Command_php(HoneyPotCommand): - def start(self): - if not len(self.args): - pass - elif self.args[0] == "-v": - output = ("PHP 5.3.5 (cli)", "Copyright (c) 1997-2010 The PHP Group") - for line in output: - self.write(f"{line}\n") - self.exit() - elif self.args[0] == "-h": - output = [ - "Usage: php [options] [-f] [--] [args...]", - " php [options] -r [--] [args...]", - " php [options] [-B ] -R [-E ] [--] [args...]", - " php [options] [-B ] -F [-E ] [--] [args...]", - " php [options] -- [args...]", - " php [options] -a", - "", - " -a Run interactively", - " -c | Look for php.ini file in this directory", - " -n No php.ini file will be used", - " -d foo[=bar] Define INI entry foo with value 'bar'", - " -e Generate extended information for debugger/profiler", - " -f Parse and execute .", - " -h This help", - " -i PHP information", - " -l Syntax check only (lint)", - " -m Show compiled in modules", - " -r Run PHP without using script tags ", - " -B Run PHP before processing input lines", - " -R Run PHP for every input line", - " -F Parse and execute for every input line", - " -E Run PHP after processing all input lines", - " -H Hide any passed arguments from external tools.", - " -s Output HTML syntax highlighted source.", - " -v Version number", - " -w Output source with stripped comments and whitespace.", - " -z Load Zend extension .", - "", - " args... Arguments passed to script. Use -- args when first argument", - " starts with - or script is read from stdin", - "", - " --ini Show configuration file names", - "", - " --rf Show information about function .", - " --rc Show information about class .", - " --re Show information about extension .", - " --ri Show configuration for extension .", - "", - ] - for line in output: - self.write(f"{line}\n") - self.exit() - else: + HELP = "Usage: php [options] [-f] [--] [args...]\n" \ + " php [options] -r [--] [args...]\n" \ + " php [options] [-B ] -R [-E ] [--] [args...]\n" \ + " php [options] [-B ] -F [-E ] [--] [args...]\n" \ + " php [options] -- [args...]\n" \ + " php [options] -a\n" \ + "\n" \ + " -a Run interactively\n" \ + " -c | Look for php.ini file in this directory\n" \ + " -n No php.ini file will be used\n" \ + " -d foo[=bar] Define INI entry foo with value 'bar'\n" \ + " -e Generate extended information for debugger/profiler\n" \ + " -f Parse and execute .\n" \ + " -h This help\n" \ + " -i PHP information\n" \ + " -l Syntax check only (lint)\n" \ + " -m Show compiled in modules\n" \ + " -r Run PHP without using script tags \n" \ + " -B Run PHP before processing input lines\n" \ + " -R Run PHP for every input line\n" \ + " -F Parse and execute for every input line\n" \ + " -E Run PHP after processing all input lines\n" \ + " -H Hide any passed arguments from external tools.\n" \ + " -s Output HTML syntax highlighted source.\n" \ + " -v Version number\n" \ + " -w Output source with stripped comments and whitespace.\n" \ + " -z Load Zend extension .\n" \ + "\n" \ + " args... Arguments passed to script. Use -- args when first argument\n" \ + " starts with - or script is read from stdin\n" \ + "\n" \ + " --ini Show configuration file names\n" \ + "\n" \ + " --rf Show information about function .\n" \ + " --rc Show information about class .\n" \ + " --re Show information about extension .\n" \ + " --ri Show configuration for extension .\n" \ + "\n" + + VERSION = "PHP 5.3.5 (cli)\n" \ + "Copyright (c) 1997-2010 The PHP Group\n" + + def start(self) -> None: + if len(self.args): + if self.args[0] == "-v": + self.write(Command_php.VERSION) + elif self.args[0] == "-h": + self.write(Command_php.HELP) self.exit() - def lineReceived(self, line): - log.msg( - eventid="cowrie.command.success", - realm="php", - input=line, - format="INPUT (%(realm)s): %(input)s", - ) + def lineReceived(self, line: str) -> None: + log.msg(eventid="cowrie.command.success", + realm="php", + input=line, + format="INPUT (%(realm)s): %(input)s") - def handle_CTRL_D(self): + def handle_CTRL_D(self) -> None: self.exit() diff --git a/src/cowrie/test/fake_server.py b/src/cowrie/test/fake_server.py index e2f644be..ad2d0a9a 100644 --- a/src/cowrie/test/fake_server.py +++ b/src/cowrie/test/fake_server.py @@ -1,5 +1,3 @@ -# -*- test-case-name: Cowrie Test Cases -*- - # Copyright (c) 2016 Dave Germiquet # See LICENSE for details. @@ -9,7 +7,8 @@ from cowrie.shell import fs class FakeServer: - """ + """FakeServer class. + @ivar hostname Servers Host Name @ivar fs File System for cowrie to use """ @@ -23,13 +22,13 @@ class FakeServer: class FakeAvatar: - """ + """FakeAvatar class. + @var avatar itself @ivar server server configuration @var fs File System for cowrie to use @var environ for user @var uid for user - @var """ def __init__(self, server): diff --git a/src/cowrie/test/fake_transport.py b/src/cowrie/test/fake_transport.py index 037e9077..a1fc41c6 100644 --- a/src/cowrie/test/fake_transport.py +++ b/src/cowrie/test/fake_transport.py @@ -1,5 +1,3 @@ -# -*- test-case-name: Cowrie Test Cases -*- - # Copyright (c) 2016 Dave Germiquet # See LICENSE for details. @@ -12,12 +10,11 @@ from twisted.test import proto_helpers class Container: - """ - This class is placeholder for creating a fake interface + """This class is placeholder for creating a fake interface. + @var host Client fake information @var port Fake Port for connection @var otherVersionString version - @var """ otherVersionString = "1.0" @@ -32,24 +29,18 @@ class Container: factory: Container | None def getPeer(self): - """ - Fake function for mockup - """ + """Fake function for mockup.""" self.host = "1.1.1.1" self.port = 2222 return self def processEnded(self, reason): - """ - Fake function for mockup - """ + """Fake function for mockup.""" pass class FakeTransport(proto_helpers.StringTransport): - """ - Fake transport with abortConnection() method. - """ + """Fake transport with abortConnection() method.""" # Thanks to TerminalBuffer (some code was taken from twisted Terminal Buffer) @@ -132,8 +123,7 @@ class FakeTransport(proto_helpers.StringTransport): pass def setPrivateModes(self, modes): - """ - Enable the given modes. + """Enable the given modes. Track which modes have been enabled so that the implementations of other L{insults.ITerminalTransport} methods can be properly implemented diff --git a/src/cowrie/test/proxy_compare.py b/src/cowrie/test/proxy_compare.py index feb1fc38..760e629e 100644 --- a/src/cowrie/test/proxy_compare.py +++ b/src/cowrie/test/proxy_compare.py @@ -1,9 +1,11 @@ +#mypy: ignore # noqa from __future__ import annotations -from twisted.internet import defer from backend_pool.ssh_exec import execute_ssh from backend_pool.telnet_exec import execute_telnet +from twisted.internet import defer + class ProxyTestCommand: """ diff --git a/src/cowrie/test/test_awk.py b/src/cowrie/test/test_awk.py index f9bd1857..a4dd2bb1 100644 --- a/src/cowrie/test/test_awk.py +++ b/src/cowrie/test/test_awk.py @@ -1,71 +1,54 @@ -# -*- test-case-name: Cowrie Test Cases -*- - # Copyright (c) 2018 Michel Oosterhof # See LICENSE for details. - -""" -Tests for general shell interaction and echo command -""" from __future__ import annotations - import os +import unittest -from twisted.trial import unittest +from cowrie.shell.protocol import HoneyPotInteractiveProtocol +from cowrie.test.fake_server import FakeAvatar, FakeServer +from cowrie.test.fake_transport import FakeTransport -from cowrie.shell import protocol -from cowrie.test import fake_server, fake_transport - -os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data" +os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data" os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp" -os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle" +os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle" PROMPT = b"root@unitTest:~# " class ShellEchoCommandTests(unittest.TestCase): - def setUp(self): - self.proto = protocol.HoneyPotInteractiveProtocol( - fake_server.FakeAvatar(fake_server.FakeServer()) - ) - self.tr = fake_transport.FakeTransport("1.1.1.1", "1111") - self.proto.makeConnection(self.tr) + """Tests for cowrie/commands/awk.py.""" + + proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer())) + tr = FakeTransport("", "31337") + + @classmethod + def setUpClass(cls) -> None: + cls.proto.makeConnection(cls.tr) + + @classmethod + def tearDownClass(cls) -> None: + cls.proto.connectionLost("tearDown From Unit Test") + + def setUp(self) -> None: self.tr.clear() - def test_awk_command_001(self): - """ - Test $0, full input line contents - """ - self.proto.lineReceived(b'echo "test test" | awk "{ print $0 }"\n') + def test_awk_command_001(self) -> None: + self.proto.lineReceived(b"echo \"test test\" | awk \"{ print $0 }\"\n") self.assertEqual(self.tr.value(), b"test test\n" + PROMPT) - def test_awk_command_002(self): - """ - Test $1, first agument - """ - self.proto.lineReceived(b'echo "test" | awk "{ print $1 }"\n') + def test_awk_command_002(self) -> None: + self.proto.lineReceived(b"echo \"test\" | awk \"{ print $1 }\"\n") self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_awk_command_003(self): - """ - Test $1 $2 space separated - """ - self.proto.lineReceived(b'echo "test test" | awk "{ print $1 $2 }"\n') + def test_awk_command_003(self) -> None: + self.proto.lineReceived(b"echo \"test test\" | awk \"{ print $1 $2 }\"\n") self.assertEqual(self.tr.value(), b"test test\n" + PROMPT) - def test_awk_command_004(self): - """ - Test $1,$2 comma separated - """ - self.proto.lineReceived(b'echo "test test" | awk "{ print $1,$2 }"\n') + def test_awk_command_004(self) -> None: + self.proto.lineReceived(b"echo \"test test\" | awk \"{ print $1,$2 }\"\n") self.assertEqual(self.tr.value(), b"test test\n" + PROMPT) - def test_awk_command_005(self): - """ - Test $1$2 not separated - """ - self.proto.lineReceived(b'echo "test test" | awk "{ print $1$2 }"\n') + def test_awk_command_005(self) -> None: + self.proto.lineReceived(b"echo \"test test\" | awk \"{ print $1$2 }\"\n") self.assertEqual(self.tr.value(), b"testtest\n" + PROMPT) - - def tearDown(self): - self.proto.connectionLost("tearDown From Unit Test") diff --git a/src/cowrie/test/test_base64.py b/src/cowrie/test/test_base64.py index 260649da..8b46edfa 100644 --- a/src/cowrie/test/test_base64.py +++ b/src/cowrie/test/test_base64.py @@ -1,51 +1,43 @@ -# -*- test-case-name: Cowrie Test Cases -*- - # Copyright (c) 2020 Peter Sufliarsky # See LICENSE for details. - -""" -Tests for general shell interaction and base64 command -""" from __future__ import annotations - import os +import unittest -from twisted.trial import unittest +from cowrie.shell.protocol import HoneyPotInteractiveProtocol +from cowrie.test.fake_server import FakeAvatar, FakeServer +from cowrie.test.fake_transport import FakeTransport -from cowrie.shell import protocol -from cowrie.test import fake_server, fake_transport - -os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data" +os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data" os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp" -os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle" +os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle" TRY_CHMOD_HELP_MSG = b"Try 'base64 --help' for more information.\n" PROMPT = b"root@unitTest:~# " class ShellBase64CommandTests(unittest.TestCase): - def setUp(self): - self.proto = protocol.HoneyPotInteractiveProtocol( - fake_server.FakeAvatar(fake_server.FakeServer()) - ) - self.tr = fake_transport.FakeTransport("1.1.1.1", "1111") - self.proto.makeConnection(self.tr) + """Tests for cowrie/commands/base64.py""" + + proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer())) + tr = FakeTransport("", "31337") + + @classmethod + def setUpClass(cls) -> None: + cls.proto.makeConnection(cls.tr) + + @classmethod + def tearDownClass(cls) -> None: + cls.proto.connectionLost("tearDown From Unit Test") + + def setUp(self) -> None: self.tr.clear() - def test_base64_command_001(self): - """ - Missing operand - """ + def test_base64_command_001(self) -> None: self.proto.lineReceived(b"echo cowrie | base64") self.assertEqual(self.tr.value(), b"Y293cmllCg==\n" + PROMPT) - def test_base64_command_002(self): - """ - Missing operand - """ + def test_base64_command_002(self) -> None: self.proto.lineReceived(b"echo Y293cmllCg== | base64 -d") self.assertEqual(self.tr.value(), b"cowrie\n" + PROMPT) - - def tearDown(self): - self.proto.connectionLost("tearDown From Unit Test") diff --git a/src/cowrie/test/test_base_commands.py b/src/cowrie/test/test_base_commands.py index 915e791f..3596c06f 100644 --- a/src/cowrie/test/test_base_commands.py +++ b/src/cowrie/test/test_base_commands.py @@ -1,261 +1,297 @@ -# -*- test-case-name: Cowrie Test Cases -*- - # Copyright (c) 2016 Dave Germiquet # See LICENSE for details. - from __future__ import annotations - import os +import unittest -from twisted.trial import unittest +from cowrie.commands.base import Command_php +from cowrie.shell.protocol import HoneyPotInteractiveProtocol +from cowrie.test.fake_server import FakeAvatar, FakeServer +from cowrie.test.fake_transport import FakeTransport -from cowrie.shell import protocol -from cowrie.test import fake_server, fake_transport - -os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data" -os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle" +os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data" +os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle" PROMPT = b"root@unitTest:~# " +NONEXISTEN_FILE = "/path/to/the/file/that/does/not/exist" -class ShellBaseCommandsTests(unittest.TestCase): - def setUp(self): - self.proto = protocol.HoneyPotInteractiveProtocol( - fake_server.FakeAvatar(fake_server.FakeServer()) - ) - self.tr = fake_transport.FakeTransport("1.1.1.1", "1111") +class ShellBaseCommandsTests(unittest.TestCase): # TODO: ps, history + """Tests for basic commands from cowrie/commands/base.py.""" + + def setUp(self) -> None: + self.proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer())) + self.tr = FakeTransport("", "31337") self.proto.makeConnection(self.tr) self.tr.clear() - def test_whoami_command(self): + def tearDown(self) -> None: + self.proto.connectionLost("tearDown From Unit Test") + + def test_whoami_command(self) -> None: self.proto.lineReceived(b"whoami\n") self.assertEqual(self.tr.value(), b"root\n" + PROMPT) - def test_users_command(self): + def test_users_command(self) -> None: self.proto.lineReceived(b"users \n") self.assertEqual(self.tr.value(), b"root\n" + PROMPT) - # def test_exit_command(self): - # self.proto.lineReceived(b'exit \n') + def test_exit_command(self) -> None: + self.proto.lineReceived(b"exit\n") + self.assertEqual(self.tr.value(), b"") - # def test_logout_command(self): - # self.proto.lineReceived(b'logout \n') + def test_logout_command(self) -> None: + self.proto.lineReceived(b"logout\n") + self.assertEqual(self.tr.value(), b"") - # def test_clear_command(self): - # self.proto.lineReceived(b'clear \n') + def test_clear_command(self) -> None: + self.proto.lineReceived(b"clear\n") + self.assertEqual(self.tr.value(), PROMPT) - def test_hostname_command(self): + def test_hostname_command(self) -> None: self.proto.lineReceived(b"hostname unitChanged\n") self.assertEqual(self.tr.value(), b"root@unitChanged:~# ") - # def test_reset_command(self): - # self.proto.lineReceived(b'reset') + def test_reset_command(self) -> None: + self.proto.lineReceived(b"reset\n") + self.assertEqual(self.tr.value(), PROMPT) - # def test_ps_command(self): - # self.proto.lineReceived(b'ps\n') - # self.assertEqual(self.tr.value().decode('utf8'), "\n".join(self.data['results']['ps'])) - - def test_id_command(self): + def test_id_command(self) -> None: self.proto.lineReceived(b"id\n") - self.assertEqual( - self.tr.value(), b"uid=0(root) gid=0(root) groups=0(root)\n" + PROMPT - ) + self.assertEqual(self.tr.value(), b"uid=0(root) gid=0(root) groups=0(root)\n" + PROMPT) - def test_passwd_command(self): + def test_passwd_command(self) -> None: self.proto.lineReceived(b"passwd\n") self.proto.lineReceived(b"changeme\n") self.proto.lineReceived(b"changeme\n") - self.assertEqual( - self.tr.value(), - b"Enter new UNIX password: Retype new UNIX password: passwd: password updated successfully\n" - + PROMPT, - ) + self.assertEqual(self.tr.value(), + b"Enter new UNIX password: Retype new UNIX password: passwd: password updated successfully\n" + PROMPT) - # def test_shutdown_command(self): - # self.proto.lineReceived(b'shutdown\n') + def test_shutdown_command(self) -> None: + self.proto.lineReceived(b"shutdown\n") + self.assertEqual(self.tr.value(), b"Try `shutdown --help' for more information.\n" + PROMPT) # TODO: Is it right?.. - # def test_poweroff_command(self): - # self.proto.lineReceived(b'poweroff\n') + def test_poweroff_command(self) -> None: + self.proto.lineReceived(b"poweroff\n") + self.assertEqual(self.tr.value(), b"Try `shutdown --help' for more information.\n" + PROMPT) # TODO: Is it right?.. - # def test_history_command(self): - # self.proto.lineReceived(b"history\n") - # self.proto.lineReceived(b"history\n") - # print("THIS TEST IS INCOMPLETE") - - def test_date_command(self): + def test_date_command(self) -> None: self.proto.lineReceived(b"date\n") - self.assertRegex( - self.tr.value(), - b"[A-Za-z][A-Za-z][A-Za-z] [A-Za-z][A-Za-z][A-Za-z] [0-9][0-9] [0-9][0-9]:[0-9][0-9]:[0-9][0-9] UTC [0-9][0-9][0-9][0-9]\n" - + PROMPT, - ) + self.assertRegex(self.tr.value(), rb"[A-Za-z]{3} [A-Za-z]{3} \d{2} \d{2}:\d{2}:\d{2} UTC \d{4}\n" + PROMPT) - # def test_bash_command(self): - # self.proto.lineReceived(b'bash\n') - # print("THIS TEST IS INCOMPLETE") + def test_bash_command(self) -> None: + self.proto.lineReceived(b"bash\n") + self.assertEqual(self.tr.value(), PROMPT) - def test_sh_command(self): + def test_sh_command(self) -> None: self.proto.lineReceived(b"sh -c id\n") - self.assertEqual( - self.tr.value(), b"uid=0(root) gid=0(root) groups=0(root)\n" + PROMPT - ) + self.assertEqual(self.tr.value(), b"uid=0(root) gid=0(root) groups=0(root)\n" + PROMPT) - # def test_php_command(self): - # self.proto.lineReceived(b'php -h') - # print("THIS TEST IS INCOMPLETE") + def test_php_help_command(self) -> None: + self.proto.lineReceived(b"php -h\n") + self.assertEqual(self.tr.value(), Command_php.HELP.encode() + PROMPT) - def test_chattr_command(self): + def test_php_version_command(self) -> None: + self.proto.lineReceived(b"php -v\n") + self.assertEqual(self.tr.value(), Command_php.VERSION.encode() + PROMPT) + + def test_chattr_command(self) -> None: self.proto.lineReceived(b"chattr\n") self.assertEqual(self.tr.value(), PROMPT) - def test_umask_command(self): + def test_umask_command(self) -> None: self.proto.lineReceived(b"umask\n") self.assertEqual(self.tr.value(), PROMPT) - def test_set_command(self): + def test_set_command(self) -> None: self.proto.lineReceived(b"set\n") + self.assertEqual(self.tr.value(), + b"COLUMNS=80\nHOME=/root\nLINES=25\nLOGNAME=root\nPATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin\nTMOUT=1800\nUSER=root\n" + PROMPT) - self.assertEqual( - self.tr.value(), - b"COLUMNS=80\nHOME=/root\nLINES=25\nLOGNAME=root\nPATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin\nTMOUT=1800\nUSER=root\n" - + PROMPT, - ) - - def test_unset_command(self): + def test_unset_command(self) -> None: self.proto.lineReceived(b"unset\n") self.assertEqual(self.tr.value(), PROMPT) - def test_export_command(self): + def test_export_command(self) -> None: self.proto.lineReceived(b"export\n") self.assertEqual(self.tr.value(), PROMPT) - def test_alias_command(self): + def test_alias_command(self) -> None: self.proto.lineReceived(b"alias\n") self.assertEqual(self.tr.value(), PROMPT) - def test_jobs_command(self): + def test_jobs_command(self) -> None: self.proto.lineReceived(b"jobs\n") self.assertEqual(self.tr.value(), PROMPT) - def test_kill_command(self): + def test_kill_command(self) -> None: self.proto.lineReceived(b"/bin/kill\n") self.assertEqual(self.tr.value(), PROMPT) - def test_pkill_command(self): + def test_pkill_command(self) -> None: self.proto.lineReceived(b"/bin/pkill\n") self.assertEqual(self.tr.value(), PROMPT) - def test_killall_command(self): + def test_killall_command(self) -> None: self.proto.lineReceived(b"/bin/killall\n") self.assertEqual(self.tr.value(), PROMPT) - def test_killall5_command(self): + def test_killall5_command(self) -> None: self.proto.lineReceived(b"/bin/killall5\n") self.assertEqual(self.tr.value(), PROMPT) - def test_su_command(self): + def test_su_command(self) -> None: self.proto.lineReceived(b"su\n") self.assertEqual(self.tr.value(), PROMPT) - def test_chown_command(self): + def test_chown_command(self) -> None: self.proto.lineReceived(b"chown\n") self.assertEqual(self.tr.value(), PROMPT) - def test_chgrp_command(self): + def test_chgrp_command(self) -> None: self.proto.lineReceived(b"chgrp\n") self.assertEqual(self.tr.value(), PROMPT) - def tearDown(self): - self.proto.connectionLost("tearDown From Unit Test") + def test_cd_output(self) -> None: + path = "/usr/bin" + + self.proto.lineReceived(f"cd {path:s}".encode()) + self.assertEqual(self.tr.value(), PROMPT.replace(b"~", path.encode())) + self.assertEqual(self.proto.cwd, path) + + def test_cd_error_output(self) -> None: + self.proto.lineReceived(f"cd {NONEXISTEN_FILE:s}".encode()) + self.assertEqual(self.tr.value(), f"bash: cd: {NONEXISTEN_FILE:s}: No such file or directory\n".encode() + PROMPT) class ShellFileCommandsTests(unittest.TestCase): - def setUp(self): - self.proto = protocol.HoneyPotInteractiveProtocol( - fake_server.FakeAvatar(fake_server.FakeServer()) - ) - self.tr = fake_transport.FakeTransport("1.1.1.1", "1111") - self.proto.makeConnection(self.tr) + proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer())) + tr = FakeTransport("", "31337") + cpuinfo = proto.fs.file_contents("/proc/cpuinfo") - # def test_cat_output(self): - # self.proto.lineReceived(b'cat /proc/cpuinfo') - # print("THIS TEST IS INCOMPLETE") - # print(self.tr.value()) + @classmethod + def setUpClass(cls) -> None: + cls.proto.makeConnection(cls.tr) - # def test_grep_output(self): - # self.proto.lineReceived(b'grep cpu /proc/cpuinfo') - # print("THIS TEST IS INCOMPLETE") - # print(self.tr.value()) + @classmethod + def tearDownClass(cls) -> None: + cls.proto.connectionLost("tearDown From Unit Test") - # def test_tail_output(self): - # self.proto.lineReceived(b'tail -n 10 /proc/cpuinfo') - # print("THIS TEST IS INCOMPLETE") - # print(self.tr.value()) + def setUp(self) -> None: + self.tr.clear() - # def test_cd_output(self): - # self.proto.lineReceived(b'cd /usr/bin') - # print("THIS TEST IS INCOMPLETE") - # print(self.tr.value()) + def test_cat_output(self) -> None: + self.proto.lineReceived(b"cat /proc/cpuinfo\n") + self.assertEqual(self.tr.value(), self.cpuinfo + PROMPT) - # def test_rm_output(self): - # self.proto.lineReceived(b'rm /usr/bin/gcc') - # print("THIS TEST IS INCOMPLETE") - # print(self.tr.value()) + def test_grep_output(self) -> None: + lines = [line.strip() for line in self.cpuinfo.splitlines() if b"cpu" in line] + lines.append(b"") + self.proto.lineReceived(b"grep cpu /proc/cpuinfo\n") + self.assertEqual(self.tr.value(), b"\n".join(lines) + PROMPT) - # def test_cp_output(self): - # self.proto.lineReceived(b'cp /usr/bin/gcc /tmp') - # print("THIS TEST IS INCOMPLETE") - # print(self.tr.value()) + def test_tail_output(self) -> None: + lines = [line.strip() for line in self.cpuinfo.splitlines()][-10:] + lines.append(b"") + self.proto.lineReceived(b"tail -n 10 /proc/cpuinfo\n") + self.assertEqual(self.tr.value(), b"\n".join(lines) + PROMPT) - # def test_mv_output(self): - # self.proto.lineReceived(b'mv /usr/bin/gcc /tmp') - # print("THIS TEST IS INCOMPLETE") - # print(self.tr.value()) + def test_head_output(self) -> None: + lines = [line.strip() for line in self.cpuinfo.splitlines()][:10] + lines.append(b"") + self.proto.lineReceived(b"head -n 10 /proc/cpuinfo\n") + self.assertEqual(self.tr.value(), b"\n".join(lines) + PROMPT) - # def test_mkdir_output(self): - # self.proto.lineReceived(b'mkdir /tmp/hello') - # print("THIS TEST IS INCOMPLETE") - # print(self.tr.value()) + def test_rm_output(self) -> None: + self.proto.lineReceived(b"rm /usr/bin/gcc\n") + self.assertEqual(self.tr.value(), PROMPT) - # def test_rmdir_output(self): - # self.proto.lineReceived(b'mkdir /tmp/blah') - # self.proto.lineReceived(b'rmdir /tmp/blah') - # print("THIS TEST IS INCOMPLETE") - # print(self.tr.value()) + def test_rm_error_output(self) -> None: # TODO: quotes?.. + self.proto.lineReceived(f"rm {NONEXISTEN_FILE:s}\n".encode()) + self.assertEqual(self.tr.value(), + f"rm: cannot remove `{NONEXISTEN_FILE:s}': No such file or directory\n".encode() + PROMPT) - # def test_pwd_output(self): - # self.proto.lineReceived(b'pwd') - # print("THIS TEST IS INCOMPLETE") - # print(self.tr.value()) + def test_cp_output(self) -> None: + self.proto.lineReceived(b"cp /usr/bin/gcc /tmp\n") + self.assertEqual(self.tr.value(), PROMPT) - # def test_touch_output(self): - # self.proto.lineReceived(b'touch unittests.txt') - # print("THIS TEST IS INCOMPLETE") - # print(self.tr.value()) + def test_cp_error_output(self) -> None: # TODO: quotes?.. + self.proto.lineReceived(f"cp {NONEXISTEN_FILE:s} /tmp\n".encode()) + self.assertEqual(self.tr.value(), + f"cp: cannot stat `{NONEXISTEN_FILE:s}': No such file or directory\n".encode() + PROMPT) - def tearDown(self): - self.proto.connectionLost("tearDown From Unit Test") + def test_mv_output(self) -> None: + self.proto.lineReceived(b"mv /usr/bin/awk /tmp\n") + self.assertEqual(self.tr.value(), PROMPT) + + def test_mv_error_output(self) -> None: # TODO: quotes?.. + self.proto.lineReceived(f"mv {NONEXISTEN_FILE:s} /tmp\n".encode()) + self.assertEqual(self.tr.value(), + f"mv: cannot stat `{NONEXISTEN_FILE:s}': No such file or directory\n".encode() + PROMPT) + + def test_mkdir_output(self) -> None: + path = "/tmp/hello" + + self.proto.lineReceived(f"mkdir {path:s}\n".encode()) + self.assertEqual(self.tr.value(), PROMPT) + self.assertTrue(self.proto.fs.exists(path)) + self.assertTrue(self.proto.fs.isdir(path)) + + def test_mkdir_error_output(self) -> None: # TODO: quotes?.. + path = "/etc" + + self.proto.lineReceived(f"mkdir {path:s}\n".encode()) + self.assertEqual(self.tr.value(), f"mkdir: cannot create directory `{path:s}': File exists\n".encode() + PROMPT) + + def test_rmdir_output(self) -> None: + path = "/tmp/bye" + + self.proto.lineReceived(f"mkdir {path:s}\n".encode()) + self.tr.clear() + self.proto.lineReceived(f"rmdir {path:s}\n".encode()) + self.assertEqual(self.tr.value(), PROMPT) + self.assertFalse(self.proto.fs.exists(path)) + + def test_rmdir_error_output(self) -> None: # TODO: quotes?.. + self.proto.lineReceived(f"rmdir {NONEXISTEN_FILE:s}\n".encode()) + self.assertEqual(self.tr.value(), + f"rmdir: failed to remove `{NONEXISTEN_FILE:s}': No such file or directory\n".encode() + PROMPT) + + def test_pwd_output(self) -> None: + self.proto.lineReceived(b"pwd\n") + self.assertEqual(self.tr.value(), self.proto.cwd.encode() + b"\n" + PROMPT) + + def test_touch_output(self) -> None: + path = "/tmp/test.txt" + + self.proto.lineReceived(f"touch {path:s}\n".encode()) + self.assertEqual(self.tr.value(), PROMPT) + self.assertTrue(self.proto.fs.exists(path)) class ShellPipeCommandsTests(unittest.TestCase): - def setUp(self): - self.proto = protocol.HoneyPotInteractiveProtocol( - fake_server.FakeAvatar(fake_server.FakeServer()) - ) - self.tr = fake_transport.FakeTransport("1.1.1.1", "1111") - self.proto.makeConnection(self.tr) + proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer())) + tr = FakeTransport("", "31337") - def test_shell_pipe_with_cat_tail(self): + @classmethod + def setUpClass(cls) -> None: + cls.proto.makeConnection(cls.tr) + + @classmethod + def tearDownClass(cls) -> None: + cls.proto.connectionLost("tearDown From Unit Test") + + def setUp(self) -> None: + self.tr.clear() + + def test_shell_pipe_with_cat_tail(self) -> None: self.proto.lineReceived(b"echo test | tail -n 1\n") - self.assertEqual(self.tr.value(), PROMPT + b"test\n" + PROMPT) + self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_shell_pipe_with_cat_head(self): + def test_shell_pipe_with_cat_head(self) -> None: self.proto.lineReceived(b"echo test | head -n 1\n") - self.assertEqual(self.tr.value(), PROMPT + b"test\n" + PROMPT) + self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - # def test_shell_busybox_with_cat_and_sudo_grep(self): + # def test_shell_busybox_with_cat_and_sudo_grep(self) -> None: # self.proto.lineReceived(b'busybox cat /proc/cpuinfo | sudo grep cpu \n') - - def tearDown(self): - self.proto.connectionLost("tearDown From Unit Test") diff --git a/src/cowrie/test/test_cat.py b/src/cowrie/test/test_cat.py index 415650c7..bb6293fa 100644 --- a/src/cowrie/test/test_cat.py +++ b/src/cowrie/test/test_cat.py @@ -1,70 +1,51 @@ -# -*- test-case-name: Cowrie Test Cases -*- - # Copyright (c) 2018 Michel Oosterhof # See LICENSE for details. - -""" -Tests for general shell interaction and cat command -""" - from __future__ import annotations import os +import unittest -from twisted.trial import unittest +from cowrie.shell.protocol import HoneyPotInteractiveProtocol +from cowrie.test.fake_server import FakeAvatar, FakeServer +from cowrie.test.fake_transport import FakeTransport -from cowrie.shell import protocol -from cowrie.test import fake_server, fake_transport - -os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data" +os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data" os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp" -os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle" +os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle" PROMPT = b"root@unitTest:~# " class ShellCatCommandTests(unittest.TestCase): - def setUp(self): - self.proto = protocol.HoneyPotInteractiveProtocol( - fake_server.FakeAvatar(fake_server.FakeServer()) - ) - self.tr = fake_transport.FakeTransport("1.1.1.1", "1111") + """Test for cowrie/commands/cat.py.""" + + def setUp(self) -> None: + self.proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer())) + self.tr = FakeTransport("", "31337") self.proto.makeConnection(self.tr) self.tr.clear() - def test_cat_command_001(self): - """ - No such file - """ + def tearDown(self) -> None: + self.proto.connectionLost("tearDown From Unit Test") + + def test_cat_command_001(self) -> None: self.proto.lineReceived(b"cat nonExisting\n") self.assertEqual( self.tr.value(), b"cat: nonExisting: No such file or directory\n" + PROMPT ) - def test_cat_command_002(self): - """ - argument - (stdin) - """ + def test_cat_command_002(self) -> None: self.proto.lineReceived(b"echo test | cat -\n") self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_cat_command_003(self): - """ - test without arguments, read stdin only and quit - """ + def test_cat_command_003(self) -> None: self.proto.lineReceived(b"echo 1 | cat\n") self.proto.lineReceived(b"echo 2\n") self.proto.handle_CTRL_D() self.assertEqual(self.tr.value(), b"1\n" + PROMPT + b"2\n" + PROMPT) - def test_cat_command_004(self): - """ - test handle of CTRL_C - """ + def test_cat_command_004(self) -> None: self.proto.lineReceived(b"cat\n") self.proto.lineReceived(b"test\n") self.proto.handle_CTRL_C() self.assertEqual(self.tr.value(), b"test\n^C\n" + PROMPT) - - def tearDown(self): - self.proto.connectionLost("tearDown From Unit Test") diff --git a/src/cowrie/test/test_chmod.py b/src/cowrie/test/test_chmod.py index 7e5678e4..80ad5000 100644 --- a/src/cowrie/test/test_chmod.py +++ b/src/cowrie/test/test_chmod.py @@ -1,166 +1,114 @@ -# -*- test-case-name: Cowrie Test Cases -*- - # Copyright (c) 2020 Peter Sufliarsky # See LICENSE for details. - -""" -Tests for general shell interaction and chmod command -""" - from __future__ import annotations - import os +import unittest -from twisted.trial import unittest +from cowrie.shell.protocol import HoneyPotInteractiveProtocol +from cowrie.test.fake_server import FakeAvatar, FakeServer +from cowrie.test.fake_transport import FakeTransport -from cowrie.shell import protocol -from cowrie.test import fake_server, fake_transport - -os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data" +os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data" os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp" -os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle" +os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle" TRY_CHMOD_HELP_MSG = b"Try 'chmod --help' for more information.\n" PROMPT = b"root@unitTest:~# " class ShellChmodCommandTests(unittest.TestCase): - def setUp(self): - self.proto = protocol.HoneyPotInteractiveProtocol( - fake_server.FakeAvatar(fake_server.FakeServer()) - ) - self.tr = fake_transport.FakeTransport("1.1.1.1", "1111") - self.proto.makeConnection(self.tr) + """Test for cowrie/commands/chmod.py.""" + + proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer())) + tr = FakeTransport("", "31337") + + @classmethod + def setUpClass(cls) -> None: + cls.proto.makeConnection(cls.tr) + + @classmethod + def tearDownClass(cls) -> None: + cls.proto.connectionLost("tearDown From Unit Test") + + def setUp(self) -> None: self.tr.clear() - def test_chmod_command_001(self): - """ - Missing operand - """ + def test_chmod_command_001(self) -> None: self.proto.lineReceived(b"chmod") self.assertEqual( self.tr.value(), b"chmod: missing operand\n" + TRY_CHMOD_HELP_MSG + PROMPT ) - def test_chmod_command_002(self): - """ - Missing operand - """ + def test_chmod_command_002(self) -> None: self.proto.lineReceived(b"chmod -x") self.assertEqual( self.tr.value(), b"chmod: missing operand\n" + TRY_CHMOD_HELP_MSG + PROMPT ) - def test_chmod_command_003(self): - """ - Missing operand after ... - """ + def test_chmod_command_003(self) -> None: self.proto.lineReceived(b"chmod +x") self.assertEqual( self.tr.value(), - b"chmod: missing operand after \xe2\x80\x98+x\xe2\x80\x99\n" - + TRY_CHMOD_HELP_MSG - + PROMPT, + b"chmod: missing operand after \xe2\x80\x98+x\xe2\x80\x99\n" + TRY_CHMOD_HELP_MSG + PROMPT ) - def test_chmod_command_004(self): - """ - Invalid option - """ + def test_chmod_command_004(self) -> None: self.proto.lineReceived(b"chmod -A") self.assertEqual( self.tr.value(), - b"chmod: invalid option -- 'A'\n" + TRY_CHMOD_HELP_MSG + PROMPT, + b"chmod: invalid option -- 'A'\n" + TRY_CHMOD_HELP_MSG + PROMPT ) - def test_chmod_command_005(self): - """ - Unrecognized option - """ + def test_chmod_command_005(self) -> None: self.proto.lineReceived(b"chmod --A") self.assertEqual( self.tr.value(), - b"chmod: unrecognized option '--A'\n" + TRY_CHMOD_HELP_MSG + PROMPT, + b"chmod: unrecognized option '--A'\n" + TRY_CHMOD_HELP_MSG + PROMPT ) - def test_chmod_command_006(self): - """ - No such file or directory - """ + def test_chmod_command_006(self) -> None: self.proto.lineReceived(b"chmod -x abcd") self.assertEqual( self.tr.value(), - b"chmod: cannot access 'abcd': No such file or directory\n" + PROMPT, + b"chmod: cannot access 'abcd': No such file or directory\n" + PROMPT ) - def test_chmod_command_007(self): - """ - Invalid mode - """ + def test_chmod_command_007(self) -> None: self.proto.lineReceived(b"chmod abcd efgh") self.assertEqual( self.tr.value(), - b"chmod: invalid mode: \xe2\x80\x98abcd\xe2\x80\x99\n" - + TRY_CHMOD_HELP_MSG - + PROMPT, + b"chmod: invalid mode: \xe2\x80\x98abcd\xe2\x80\x99\n" + TRY_CHMOD_HELP_MSG + PROMPT ) - def test_chmod_command_008(self): - """ - Valid directory .ssh - """ + def test_chmod_command_008(self) -> None: self.proto.lineReceived(b"chmod +x .ssh") self.assertEqual(self.tr.value(), PROMPT) - def test_chmod_command_009(self): - """ - Valid directory .ssh recursive - """ + def test_chmod_command_009(self) -> None: self.proto.lineReceived(b"chmod -R +x .ssh") self.assertEqual(self.tr.value(), PROMPT) - def test_chmod_command_010(self): - """ - Valid directory /root/.ssh - """ + def test_chmod_command_010(self) -> None: self.proto.lineReceived(b"chmod +x /root/.ssh") self.assertEqual(self.tr.value(), PROMPT) - def test_chmod_command_011(self): - """ - Valid directory ~/.ssh - """ + def test_chmod_command_011(self) -> None: self.proto.lineReceived(b"chmod +x ~/.ssh") self.assertEqual(self.tr.value(), PROMPT) - def test_chmod_command_012(self): - """ - chmod a+x - """ + def test_chmod_command_012(self) -> None: self.proto.lineReceived(b"chmod a+x .ssh") self.assertEqual(self.tr.value(), PROMPT) - def test_chmod_command_013(self): - """ - chmod ug+x - """ + def test_chmod_command_013(self) -> None: self.proto.lineReceived(b"chmod ug+x .ssh") self.assertEqual(self.tr.value(), PROMPT) - def test_chmod_command_014(self): - """ - chmod 777 - """ + def test_chmod_command_014(self) -> None: self.proto.lineReceived(b"chmod 777 .ssh") self.assertEqual(self.tr.value(), PROMPT) - def test_chmod_command_015(self): - """ - chmod 0775 - """ + def test_chmod_command_015(self) -> None: self.proto.lineReceived(b"chmod 0755 .ssh") self.assertEqual(self.tr.value(), PROMPT) - - def tearDown(self): - self.proto.connectionLost("tearDown From Unit Test") diff --git a/src/cowrie/test/test_echo.py b/src/cowrie/test/test_echo.py index e595e7e5..2553d8ab 100644 --- a/src/cowrie/test/test_echo.py +++ b/src/cowrie/test/test_echo.py @@ -1,219 +1,134 @@ -# -*- test-case-name: Cowrie Test Cases -*- - # Copyright (c) 2018 Michel Oosterhof # See LICENSE for details. - -""" -Tests for general shell interaction and echo command -""" - from __future__ import annotations - import os +import unittest -from twisted.trial import unittest +from cowrie.shell.protocol import HoneyPotInteractiveProtocol +from cowrie.test.fake_server import FakeAvatar, FakeServer +from cowrie.test.fake_transport import FakeTransport -from cowrie.shell import protocol -from cowrie.test import fake_server, fake_transport - -os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data" +os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data" os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp" -os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle" +os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle" PROMPT = b"root@unitTest:~# " class ShellEchoCommandTests(unittest.TestCase): - def setUp(self): - self.proto = protocol.HoneyPotInteractiveProtocol( - fake_server.FakeAvatar(fake_server.FakeServer()) - ) - self.tr = fake_transport.FakeTransport("1.1.1.1", "1111") - self.proto.makeConnection(self.tr) + """Test for echo command from cowrie/commands/base.py.""" + + proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer())) + tr = FakeTransport("", "31337") + + @classmethod + def setUpClass(cls) -> None: + cls.proto.makeConnection(cls.tr) + + @classmethod + def tearDownClass(cls) -> None: + cls.proto.connectionLost("tearDown From Unit Test") + + def setUp(self) -> None: self.tr.clear() - def test_echo_command_001(self): - """ - Basic test - """ - self.proto.lineReceived(b'echo "test"\n') + def test_echo_command_001(self) -> None: + self.proto.lineReceived(b"echo \"test\"\n") self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_echo_command_002(self): - """ - argument splitting and recombining - """ + def test_echo_command_002(self) -> None: self.proto.lineReceived(b"echo test test\n") self.assertEqual(self.tr.value(), b"test test\n" + PROMPT) - def test_echo_command_003(self): - """ - echo -n - """ - self.proto.lineReceived(b'echo -n "test test"\n') + def test_echo_command_003(self) -> None: + self.proto.lineReceived(b"echo -n \"test test\"\n") self.assertEqual(self.tr.value(), b"test test" + PROMPT) - def test_echo_command_004(self): - """ - echo -n - """ - self.proto.lineReceived(b'echo -n "test test"\n') - self.assertEqual(self.tr.value(), b"test test" + PROMPT) - - def test_echo_command_005(self): - """ - echo test >> test; cat test - """ + def test_echo_command_005(self) -> None: self.proto.lineReceived(b"echo test > test5; cat test5") self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_echo_command_006(self): - """ - echo -n - """ - self.proto.lineReceived(b'echo "\\n"\n') + def test_echo_command_006(self) -> None: + self.proto.lineReceived(b"echo \"\\n\"\n") self.assertEqual(self.tr.value(), b"\\n\n" + PROMPT) - def test_echo_command_007(self): - """ - echo test >> test; cat test - """ + def test_echo_command_007(self) -> None: self.proto.lineReceived(b"echo test >> test7; cat test7") self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_echo_command_008(self): - """ - echo test > test; echo test >> test; cat test - """ + def test_echo_command_008(self) -> None: self.proto.lineReceived(b"echo test > test8; echo test >> test8; cat test8") self.assertEqual(self.tr.value(), b"test\ntest\n" + PROMPT) - def test_echo_command_009(self): - """ - echo test | grep test - """ + def test_echo_command_009(self) -> None: self.proto.lineReceived(b"echo test | grep test") self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_echo_command_010(self): - """ - echo test | grep test - """ + def test_echo_command_010(self) -> None: self.proto.lineReceived(b"echo test | grep test2") self.assertEqual(self.tr.value(), PROMPT) - def test_echo_command_011(self): - """ - echo test > test011; cat test011 | grep test - """ + def test_echo_command_011(self) -> None: self.proto.lineReceived(b"echo test > test011; cat test011 | grep test") self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_echo_command_012(self): - """ - echo test > test012; grep test test012 - """ + def test_echo_command_012(self) -> None: self.proto.lineReceived(b"echo test > test012; grep test test012") self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_echo_command_013(self): - """ - echo "ls""ls" - """ - self.proto.lineReceived(b'echo "ls""ls"') + def test_echo_command_013(self) -> None: + self.proto.lineReceived(b"echo \"ls\"\"ls\"") self.assertEqual(self.tr.value(), b"lsls\n" + PROMPT) - def test_echo_command_014(self): - """ - echo '"ls"' - """ + def test_echo_command_014(self) -> None: self.proto.lineReceived(b"echo '\"ls\"'") self.assertEqual(self.tr.value(), b'"ls"\n' + PROMPT) - def test_echo_command_015(self): - """ - echo "'ls'" - """ + def test_echo_command_015(self) -> None: self.proto.lineReceived(b"echo \"'ls'\"") self.assertEqual(self.tr.value(), b"'ls'\n" + PROMPT) - def test_echo_command_016(self): - """ - echo -e "\x6b\x61\x6d\x69" - """ - self.proto.lineReceived(b'echo -e "\x6b\x61\x6d\x69"') + def test_echo_command_016(self) -> None: + self.proto.lineReceived(b"echo -e \"\x6b\x61\x6d\x69\"") self.assertEqual(self.tr.value(), b"kami\n" + PROMPT) - def test_echo_command_017(self): - """ - echo -e "\x6b\x61\x6d\x69" - """ + def test_echo_command_017(self) -> None: self.proto.lineReceived(b"echo echo test | bash") self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_echo_command_018(self): - """ - echo $(echo test) - """ + def test_echo_command_018(self) -> None: self.proto.lineReceived(b"echo $(echo test)") self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_echo_command_019(self): - """ - echo $(echo $(echo test)) - """ + def test_echo_command_019(self) -> None: self.proto.lineReceived(b"echo $(echo $(echo test))") self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_echo_command_020(self): - """ - echo test_$(echo test)_test - """ + def test_echo_command_020(self) -> None: self.proto.lineReceived(b"echo test_$(echo test)_test") self.assertEqual(self.tr.value(), b"test_test_test\n" + PROMPT) - def test_echo_command_021(self): - """ - echo test_$(echo test)_test_$(echo test)_test - """ + def test_echo_command_021(self) -> None: self.proto.lineReceived(b"echo test_$(echo test)_test_$(echo test)_test") self.assertEqual(self.tr.value(), b"test_test_test_test_test\n" + PROMPT) - def test_echo_command_022(self): - """ - echo test; (echo test) - """ + def test_echo_command_022(self) -> None: self.proto.lineReceived(b"echo test; (echo test)") self.assertEqual(self.tr.value(), b"test\ntest\n" + PROMPT) - def test_echo_command_023(self): - """ - echo `echo test` - """ + def test_echo_command_023(self) -> None: self.proto.lineReceived(b"echo `echo test`") self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_echo_command_024(self): - """ - echo test_`echo test`_test - """ + def test_echo_command_024(self) -> None: self.proto.lineReceived(b"echo test_`echo test`_test") self.assertEqual(self.tr.value(), b"test_test_test\n" + PROMPT) - def test_echo_command_025(self): - """ - echo test_`echo test`_test_`echo test`_test - """ + def test_echo_command_025(self) -> None: self.proto.lineReceived(b"echo test_`echo test`_test_`echo test`_test") self.assertEqual(self.tr.value(), b"test_test_test_test_test\n" + PROMPT) - def test_echo_command_026(self): - """ - echo "TEST1: `echo test1`, TEST2: `echo test2`" - """ - self.proto.lineReceived(b'echo "TEST1: `echo test1`, TEST2: `echo test2`"') + def test_echo_command_026(self) -> None: + self.proto.lineReceived(b"echo \"TEST1: `echo test1`, TEST2: `echo test2`\"") self.assertEqual(self.tr.value(), b"TEST1: test1, TEST2: test2\n" + PROMPT) - - def tearDown(self): - self.proto.connectionLost("tearDown From Unit Test") diff --git a/src/cowrie/test/test_ftpget.py b/src/cowrie/test/test_ftpget.py index 93dfd400..1ef2d744 100644 --- a/src/cowrie/test/test_ftpget.py +++ b/src/cowrie/test/test_ftpget.py @@ -1,54 +1,50 @@ -# -*- test-case-name: Cowrie Test Cases -*- - # Copyright (c) 2018 Michel Oosterhof # See LICENSE for details. from __future__ import annotations - import os +import unittest -from twisted.trial import unittest +from cowrie.shell.protocol import HoneyPotInteractiveProtocol +from cowrie.test.fake_server import FakeAvatar, FakeServer +from cowrie.test.fake_transport import FakeTransport -from cowrie.shell import protocol -from cowrie.test import fake_server, fake_transport - -os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data" +os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data" os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp" -os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle" +os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle" PROMPT = b"root@unitTest:~# " -class ShellftpgetCommandTests(unittest.TestCase): - def setUp(self): - self.proto = protocol.HoneyPotInteractiveProtocol( - fake_server.FakeAvatar(fake_server.FakeServer()) - ) - self.tr = fake_transport.FakeTransport("1.1.1.1", "1111") - self.proto.makeConnection(self.tr) +class ShellFtpGetCommandTests(unittest.TestCase): + """Tests for cowrie/commands/ftpget.py.""" + + proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer())) + tr = FakeTransport("", "31337") + + @classmethod + def setUpClass(cls) -> None: + cls.proto.makeConnection(cls.tr) + + @classmethod + def tearDownClass(cls) -> None: + cls.proto.connectionLost("tearDown From Unit Test") + + def setUp(self) -> None: self.tr.clear() - def test_help_command(self): - """ - Basic test - """ + def test_help_command(self) -> None: + usage = b"BusyBox v1.20.2 (2016-06-22 15:12:53 EDT) multi-call binary.\n" \ + b"\n" \ + b"Usage: ftpget [OPTIONS] HOST [LOCAL_FILE] REMOTE_FILE\n" \ + b"\n" \ + b"Download a file via FTP\n" \ + b"\n" \ + b" -c Continue previous transfer\n" \ + b" -v Verbose\n" \ + b" -u USER Username\n" \ + b" -p PASS Password\n" \ + b" -P NUM Port\n\n" self.proto.lineReceived(b"ftpget\n") - self.assertEqual( - self.tr.value(), - b"""BusyBox v1.20.2 (2016-06-22 15:12:53 EDT) multi-call binary. - -Usage: ftpget [OPTIONS] HOST [LOCAL_FILE] REMOTE_FILE - -Download a file via FTP - - -c Continue previous transfer - -v Verbose - -u USER Username - -p PASS Password - -P NUM Port\n\n""" - + PROMPT, - ) - - def tearDown(self): - self.proto.connectionLost("tearDown From Unit Test") + self.assertEqual(self.tr.value(), usage + PROMPT) diff --git a/src/cowrie/test/test_proxy.py b/src/cowrie/test/test_proxy.py index 4357367b..55fcad53 100644 --- a/src/cowrie/test/test_proxy.py +++ b/src/cowrie/test/test_proxy.py @@ -1,21 +1,21 @@ # -*- test-case-name: Cowrie Proxy Test Cases -*- +#mypy: ignore # noqa # Copyright (c) 2019 Guilherme Borges # See LICENSE for details. from __future__ import annotations - import os - -from twisted.cred import portal -from twisted.internet import reactor # type: ignore -from twisted.trial import unittest +import unittest from cowrie.core.checkers import HoneypotPasswordChecker, HoneypotPublicKeyChecker from cowrie.core.realm import HoneyPotRealm from cowrie.ssh.factory import CowrieSSHFactory +from twisted.cred import portal +from twisted.internet import reactor # type: ignore + # from cowrie.test.proxy_compare import ProxyTestCommand os.environ["COWRIE_HONEYPOT_TTYLOG"] = "false" diff --git a/src/cowrie/test/test_tee.py b/src/cowrie/test/test_tee.py index bd3fcbcd..a5a998fc 100644 --- a/src/cowrie/test/test_tee.py +++ b/src/cowrie/test/test_tee.py @@ -1,78 +1,61 @@ -# -*- test-case-name: Cowrie Test Cases -*- - # Copyright (c) 2018 Michel Oosterhof # See LICENSE for details. - -""" -Tests for general shell interaction and tee command -""" from __future__ import annotations - import os +import unittest -from twisted.trial import unittest +from cowrie.shell.protocol import HoneyPotInteractiveProtocol +from cowrie.test.fake_server import FakeAvatar, FakeServer +from cowrie.test.fake_transport import FakeTransport -from cowrie.shell import protocol -from cowrie.test import fake_server, fake_transport - -os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data" +os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data" os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp" -os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle" +os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle" PROMPT = b"root@unitTest:~# " class ShellTeeCommandTests(unittest.TestCase): - def setUp(self): - self.proto = protocol.HoneyPotInteractiveProtocol( - fake_server.FakeAvatar(fake_server.FakeServer()) - ) - self.tr = fake_transport.FakeTransport("1.1.1.1", "1111") - self.proto.makeConnection(self.tr) + """Tests for cowrie/commands/tee.py.""" + + proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer())) + tr = FakeTransport("", "31337") + + @classmethod + def setUpClass(cls) -> None: + cls.proto.makeConnection(cls.tr) + + @classmethod + def tearDownClass(cls) -> None: + cls.proto.connectionLost("tearDown From Unit Test") + + def setUp(self) -> None: self.tr.clear() - def test_tee_command_001(self): - """ - No such file - """ + def test_tee_command_001(self) -> None: self.proto.lineReceived(b"tee /a/b/c/d\n") self.assertEqual(self.tr.value(), b"tee: /a/b/c/d: No such file or directory\n") + # tee still waiting input from stdin + self.proto.handle_CTRL_C() - def test_tee_command_002(self): - """ - argument - (stdin) - """ + def test_tee_command_002(self) -> None: self.proto.lineReceived(b"tee /a/b/c/d\n") self.proto.handle_CTRL_C() - self.assertEqual( - self.tr.value(), b"tee: /a/b/c/d: No such file or directory\n^C\n" + PROMPT - ) + self.assertEqual(self.tr.value(), b"tee: /a/b/c/d: No such file or directory\n^C\n" + PROMPT) - def test_tee_command_003(self): - """ - test ignore stdin when called without '-' - """ + def test_tee_command_003(self) -> None: self.proto.lineReceived(b"tee a\n") self.proto.lineReceived(b"test\n") self.proto.handle_CTRL_D() self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_tee_command_004(self): - """ - test handle of stdin - """ + def test_tee_command_004(self) -> None: self.proto.lineReceived(b"echo test | tee\n") self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_tee_command_005(self): - """ - test handle of CTRL_C - """ + def test_tee_command_005(self) -> None: self.proto.lineReceived(b"tee\n") self.proto.lineReceived(b"test\n") self.proto.handle_CTRL_D() self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - - def tearDown(self): - self.proto.connectionLost("tearDown From Unit Test") diff --git a/src/cowrie/test/test_tftp.py b/src/cowrie/test/test_tftp.py index 233987fa..ac163cce 100644 --- a/src/cowrie/test/test_tftp.py +++ b/src/cowrie/test/test_tftp.py @@ -1,47 +1,35 @@ -# -*- test-case-name: Cowrie Test Cases -*- - # Copyright (c) 2018 Michel Oosterhof # See LICENSE for details. - -""" -Tests for general shell interaction and echo command -""" from __future__ import annotations - import os +import unittest -from twisted.trial import unittest +from cowrie.shell.protocol import HoneyPotInteractiveProtocol +from cowrie.test.fake_server import FakeAvatar, FakeServer +from cowrie.test.fake_transport import FakeTransport -from cowrie.shell import protocol -from cowrie.test import fake_server, fake_transport - -os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data" +os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data" os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp" -os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle" +os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle" PROMPT = b"root@unitTest:~# " class ShellTftpCommandTests(unittest.TestCase): - def setUp(self): - self.proto = protocol.HoneyPotInteractiveProtocol( - fake_server.FakeAvatar(fake_server.FakeServer()) - ) - self.tr = fake_transport.FakeTransport("1.1.1.1", "1111") + """Tests for cowrie/commands/tftp.py.""" + + def setUp(self) -> None: + self.proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer())) + self.tr = FakeTransport("", "31337") self.proto.makeConnection(self.tr) self.tr.clear() - def test_echo_command_001(self): - """ - Basic test - """ + def tearDown(self) -> None: + self.proto.connectionLost("tearDown From Unit Test") + + def test_echo_command_001(self) -> None: self.proto.lineReceived(b"tftp\n") self.assertEqual( - self.tr.value(), - b"usage: tftp [-h] [-c C C] [-l L] [-g G] [-p P] [-r R] [hostname]\n" - + PROMPT, + self.tr.value(), b"usage: tftp [-h] [-c C C] [-l L] [-g G] [-p P] [-r R] [hostname]\n" + PROMPT ) - - def tearDown(self): - self.proto.connectionLost("tearDown From Unit Test") diff --git a/src/cowrie/test/test_uniq.py b/src/cowrie/test/test_uniq.py index 24efff81..7db0d650 100644 --- a/src/cowrie/test/test_uniq.py +++ b/src/cowrie/test/test_uniq.py @@ -1,61 +1,50 @@ -# -*- test-case-name: Cowrie Test Cases -*- - # Copyright (c) 2020 Peter Sufliarsky # See LICENSE for details. - -""" -Tests for uniq command -""" from __future__ import annotations - import os +import unittest -from twisted.trial import unittest +from cowrie.shell.protocol import HoneyPotInteractiveProtocol +from cowrie.test.fake_server import FakeAvatar, FakeServer +from cowrie.test.fake_transport import FakeTransport -from cowrie.shell import protocol -from cowrie.test import fake_server, fake_transport - -os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data" +os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data" os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp" -os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle" +os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle" PROMPT = b"root@unitTest:~# " class ShellUniqCommandTests(unittest.TestCase): - def setUp(self): - self.proto = protocol.HoneyPotInteractiveProtocol( - fake_server.FakeAvatar(fake_server.FakeServer()) - ) - self.tr = fake_transport.FakeTransport("1.1.1.1", "1111") - self.proto.makeConnection(self.tr) + """Tests for cowrie/commands/uniq.py.""" + + proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer())) + tr = FakeTransport("", "31337") + + @classmethod + def setUpClass(cls) -> None: + cls.proto.makeConnection(cls.tr) + + @classmethod + def tearDownClass(cls) -> None: + cls.proto.connectionLost("tearDown From Unit Test") + + def setUp(self) -> None: self.tr.clear() - def test_uniq_command_001(self): - """ - echo test | uniq - """ + def test_uniq_command_001(self) -> None: self.proto.lineReceived(b"echo test | uniq\n") self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_uniq_command_002(self): - """ - echo -e "test\ntest\ntest" | uniq - """ - self.proto.lineReceived(b'echo -e "test\ntest\ntest" | uniq\n') + def test_uniq_command_002(self) -> None: + self.proto.lineReceived(b"echo -e \"test\ntest\ntest\" | uniq\n") self.assertEqual(self.tr.value(), b"test\n" + PROMPT) - def test_uniq_command_003(self): - """ - test without arguments, read stdin and quit after Ctrl+D - """ + def test_uniq_command_003(self) -> None: self.proto.lineReceived(b"uniq\n") self.proto.lineReceived(b"test\n") self.proto.lineReceived(b"test\n") self.proto.lineReceived(b"test\n") self.proto.handle_CTRL_D() self.assertEqual(self.tr.value(), b"test\n\n" + PROMPT) - - def tearDown(self): - self.proto.connectionLost("tearDown From Unit Test") diff --git a/src/cowrie/test/test_utils.py b/src/cowrie/test/test_utils.py index 904d6800..76854d69 100644 --- a/src/cowrie/test/test_utils.py +++ b/src/cowrie/test/test_utils.py @@ -1,114 +1,85 @@ from __future__ import annotations + import configparser +import unittest from io import StringIO +from cowrie.core.utils import create_endpoint_services, durationHuman, get_endpoints_from_section + from twisted.application.service import MultiService -from twisted.internet import reactor # type: ignore from twisted.internet import protocol -from twisted.trial import unittest - -from cowrie.core.utils import ( - create_endpoint_services, - durationHuman, - get_endpoints_from_section, -) +from twisted.internet import reactor # type: ignore -def get_config(config_string): - config = configparser.ConfigParser() - config.read_file(StringIO(config_string)) - return config +def get_config(config_string: str) -> configparser.ConfigParser: + """Create ConfigParser from a config_string.""" + cfg = configparser.ConfigParser() + cfg.read_file(StringIO(config_string)) + return cfg class UtilsTestCase(unittest.TestCase): - """ - Tests for cowrie/core/utils.py - """ + """Tests for cowrie/core/utils.py.""" - def test_durationHuman(self): - """ - Test of cowrie.core.utils.durationHuman - """ + def test_durationHuman(self) -> None: minute = durationHuman(60) self.assertEqual(minute, "01:00") + hour = durationHuman(3600) self.assertEqual(hour, "01:00:00") + something = durationHuman(364020) self.assertEqual(something, "4.0 days 05:07:00") - def test_get_endpoints_from_section(self): + def test_get_endpoints_from_section(self) -> None: cfg = get_config( - """ - [ssh] - listen_addr = 1.1.1.1 - """ + "[ssh]\n" + "listen_addr = 1.1.1.1\n" + ) + self.assertEqual(["tcp:2223:interface=1.1.1.1"], get_endpoints_from_section(cfg, "ssh", 2223)) + + cfg = get_config( + "[ssh]\n" + "listen_addr = 1.1.1.1\n" + ) + self.assertEqual(["tcp:2224:interface=1.1.1.1"], get_endpoints_from_section(cfg, "ssh", 2224)) + + cfg = get_config( + "[ssh]\n" + "listen_addr = 1.1.1.1 2.2.2.2\n" ) self.assertEqual( - ["tcp:2223:interface=1.1.1.1"], get_endpoints_from_section(cfg, "ssh", 2223) + ["tcp:2223:interface=1.1.1.1", "tcp:2223:interface=2.2.2.2"], get_endpoints_from_section(cfg, "ssh", 2223) ) cfg = get_config( - """ - [ssh] - listen_addr = 1.1.1.1 - """ + "[ssh]\n" + "listen_addr = 1.1.1.1 2.2.2.2\n" + "listen_port = 23\n" ) self.assertEqual( - ["tcp:2224:interface=1.1.1.1"], get_endpoints_from_section(cfg, "ssh", 2224) + ["tcp:23:interface=1.1.1.1", "tcp:23:interface=2.2.2.2"], get_endpoints_from_section(cfg, "ssh", 2223) ) cfg = get_config( - """ - [ssh] - listen_addr = 1.1.1.1 2.2.2.2 - """ + "[ssh]\n" + "listen_endpoints = tcp:23:interface=1.1.1.1 tcp:2323:interface=1.1.1.1\n" ) self.assertEqual( - ["tcp:2223:interface=1.1.1.1", "tcp:2223:interface=2.2.2.2"], - get_endpoints_from_section(cfg, "ssh", 2223), + ["tcp:23:interface=1.1.1.1", "tcp:2323:interface=1.1.1.1"], get_endpoints_from_section(cfg, "ssh", 2223) ) - cfg = get_config( - """ - [ssh] - listen_addr = 1.1.1.1 2.2.2.2 - listen_port = 23 - """ - ) - self.assertEqual( - ["tcp:23:interface=1.1.1.1", "tcp:23:interface=2.2.2.2"], - get_endpoints_from_section(cfg, "ssh", 2223), - ) - - cfg = get_config( - """ - [ssh] - listen_endpoints = tcp:23:interface=1.1.1.1 tcp:2323:interface=1.1.1.1 - """ - ) - self.assertEqual( - ["tcp:23:interface=1.1.1.1", "tcp:2323:interface=1.1.1.1"], - get_endpoints_from_section(cfg, "ssh", 2223), - ) - - def test_create_endpoint_services(self): + def test_create_endpoint_services(self) -> None: parent = MultiService() - create_endpoint_services( - reactor, parent, ["tcp:23:interface=1.1.1.1"], protocol.Factory() - ) + create_endpoint_services(reactor, parent, ["tcp:23:interface=1.1.1.1"], protocol.Factory()) + self.assertEqual(len(parent.services), 1) + + parent = MultiService() + create_endpoint_services(reactor, parent, ["tcp:23:interface=1.1.1.1"], protocol.Factory()) self.assertEqual(len(parent.services), 1) parent = MultiService() create_endpoint_services( - reactor, parent, ["tcp:23:interface=1.1.1.1"], protocol.Factory() - ) - self.assertEqual(len(parent.services), 1) - - parent = MultiService() - create_endpoint_services( - reactor, - parent, - ["tcp:23:interface=1.1.1.1", "tcp:2323:interface=2.2.2.2"], - protocol.Factory(), + reactor, parent, ["tcp:23:interface=1.1.1.1", "tcp:2323:interface=2.2.2.2"], protocol.Factory() ) self.assertEqual(len(parent.services), 2) diff --git a/tox.ini b/tox.ini index 12ca6ed4..987c3557 100644 --- a/tox.ini +++ b/tox.ini @@ -20,7 +20,7 @@ deps = # libvirt-python==5.5.0 commands = - trial cowrie + python -m unittest discover src --verbose [testenv:lint]