moving from twisted.trial.unittest to unittest (#1672)

* replace twisted.trial.unittest with unittest
This commit is contained in:
CLO 2022-01-29 06:38:46 +03:00 committed by GitHub
parent 6140b11f19
commit 9c528fdc0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 572 additions and 806 deletions

2
.gitignore vendored
View File

@ -5,7 +5,7 @@ etc/ssh_host_dsa_key.pub
etc/ssh_host_rsa_key
etc/ssh_host_rsa_key.pub
var/log/cowrie/*
cowrie.egg-info/
*.egg-info
dl/
dist/
docs/_build

View File

@ -1043,70 +1043,63 @@ commands["sh"] = Command_sh
class Command_php(HoneyPotCommand):
def start(self):
if not len(self.args):
pass
elif self.args[0] == "-v":
output = ("PHP 5.3.5 (cli)", "Copyright (c) 1997-2010 The PHP Group")
for line in output:
self.write(f"{line}\n")
self.exit()
elif self.args[0] == "-h":
output = [
"Usage: php [options] [-f] <file> [--] [args...]",
" php [options] -r <code> [--] [args...]",
" php [options] [-B <begin_code>] -R <code> [-E <end_code>] [--] [args...]",
" php [options] [-B <begin_code>] -F <file> [-E <end_code>] [--] [args...]",
" php [options] -- [args...]",
" php [options] -a",
"",
" -a Run interactively",
" -c <path>|<file> Look for php.ini file in this directory",
" -n No php.ini file will be used",
" -d foo[=bar] Define INI entry foo with value 'bar'",
" -e Generate extended information for debugger/profiler",
" -f <file> Parse and execute <file>.",
" -h This help",
" -i PHP information",
" -l Syntax check only (lint)",
" -m Show compiled in modules",
" -r <code> Run PHP <code> without using script tags <?..?>",
" -B <begin_code> Run PHP <begin_code> before processing input lines",
" -R <code> Run PHP <code> for every input line",
" -F <file> Parse and execute <file> for every input line",
" -E <end_code> Run PHP <end_code> after processing all input lines",
" -H Hide any passed arguments from external tools.",
" -s Output HTML syntax highlighted source.",
" -v Version number",
" -w Output source with stripped comments and whitespace.",
" -z <file> Load Zend extension <file>.",
"",
" args... Arguments passed to script. Use -- args when first argument",
" starts with - or script is read from stdin",
"",
" --ini Show configuration file names",
"",
" --rf <name> Show information about function <name>.",
" --rc <name> Show information about class <name>.",
" --re <name> Show information about extension <name>.",
" --ri <name> Show configuration for extension <name>.",
"",
]
for line in output:
self.write(f"{line}\n")
self.exit()
else:
HELP = "Usage: php [options] [-f] <file> [--] [args...]\n" \
" php [options] -r <code> [--] [args...]\n" \
" php [options] [-B <begin_code>] -R <code> [-E <end_code>] [--] [args...]\n" \
" php [options] [-B <begin_code>] -F <file> [-E <end_code>] [--] [args...]\n" \
" php [options] -- [args...]\n" \
" php [options] -a\n" \
"\n" \
" -a Run interactively\n" \
" -c <path>|<file> Look for php.ini file in this directory\n" \
" -n No php.ini file will be used\n" \
" -d foo[=bar] Define INI entry foo with value 'bar'\n" \
" -e Generate extended information for debugger/profiler\n" \
" -f <file> Parse and execute <file>.\n" \
" -h This help\n" \
" -i PHP information\n" \
" -l Syntax check only (lint)\n" \
" -m Show compiled in modules\n" \
" -r <code> Run PHP <code> without using script tags <?..?>\n" \
" -B <begin_code> Run PHP <begin_code> before processing input lines\n" \
" -R <code> Run PHP <code> for every input line\n" \
" -F <file> Parse and execute <file> for every input line\n" \
" -E <end_code> Run PHP <end_code> after processing all input lines\n" \
" -H Hide any passed arguments from external tools.\n" \
" -s Output HTML syntax highlighted source.\n" \
" -v Version number\n" \
" -w Output source with stripped comments and whitespace.\n" \
" -z <file> Load Zend extension <file>.\n" \
"\n" \
" args... Arguments passed to script. Use -- args when first argument\n" \
" starts with - or script is read from stdin\n" \
"\n" \
" --ini Show configuration file names\n" \
"\n" \
" --rf <name> Show information about function <name>.\n" \
" --rc <name> Show information about class <name>.\n" \
" --re <name> Show information about extension <name>.\n" \
" --ri <name> Show configuration for extension <name>.\n" \
"\n"
VERSION = "PHP 5.3.5 (cli)\n" \
"Copyright (c) 1997-2010 The PHP Group\n"
def start(self) -> None:
if len(self.args):
if self.args[0] == "-v":
self.write(Command_php.VERSION)
elif self.args[0] == "-h":
self.write(Command_php.HELP)
self.exit()
def lineReceived(self, line):
log.msg(
eventid="cowrie.command.success",
realm="php",
input=line,
format="INPUT (%(realm)s): %(input)s",
)
def lineReceived(self, line: str) -> None:
log.msg(eventid="cowrie.command.success",
realm="php",
input=line,
format="INPUT (%(realm)s): %(input)s")
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
self.exit()

View File

@ -1,5 +1,3 @@
# -*- test-case-name: Cowrie Test Cases -*-
# Copyright (c) 2016 Dave Germiquet
# See LICENSE for details.
@ -9,7 +7,8 @@ from cowrie.shell import fs
class FakeServer:
"""
"""FakeServer class.
@ivar hostname Servers Host Name
@ivar fs File System for cowrie to use
"""
@ -23,13 +22,13 @@ class FakeServer:
class FakeAvatar:
"""
"""FakeAvatar class.
@var avatar itself
@ivar server server configuration
@var fs File System for cowrie to use
@var environ for user
@var uid for user
@var
"""
def __init__(self, server):

View File

@ -1,5 +1,3 @@
# -*- test-case-name: Cowrie Test Cases -*-
# Copyright (c) 2016 Dave Germiquet
# See LICENSE for details.
@ -12,12 +10,11 @@ from twisted.test import proto_helpers
class Container:
"""
This class is placeholder for creating a fake interface
"""This class is placeholder for creating a fake interface.
@var host Client fake information
@var port Fake Port for connection
@var otherVersionString version
@var
"""
otherVersionString = "1.0"
@ -32,24 +29,18 @@ class Container:
factory: Container | None
def getPeer(self):
"""
Fake function for mockup
"""
"""Fake function for mockup."""
self.host = "1.1.1.1"
self.port = 2222
return self
def processEnded(self, reason):
"""
Fake function for mockup
"""
"""Fake function for mockup."""
pass
class FakeTransport(proto_helpers.StringTransport):
"""
Fake transport with abortConnection() method.
"""
"""Fake transport with abortConnection() method."""
# Thanks to TerminalBuffer (some code was taken from twisted Terminal Buffer)
@ -132,8 +123,7 @@ class FakeTransport(proto_helpers.StringTransport):
pass
def setPrivateModes(self, modes):
"""
Enable the given modes.
"""Enable the given modes.
Track which modes have been enabled so that the implementations of
other L{insults.ITerminalTransport} methods can be properly implemented

View File

@ -1,9 +1,11 @@
#mypy: ignore # noqa
from __future__ import annotations
from twisted.internet import defer
from backend_pool.ssh_exec import execute_ssh
from backend_pool.telnet_exec import execute_telnet
from twisted.internet import defer
class ProxyTestCommand:
"""

View File

@ -1,71 +1,54 @@
# -*- test-case-name: Cowrie Test Cases -*-
# Copyright (c) 2018 Michel Oosterhof
# See LICENSE for details.
"""
Tests for general shell interaction and echo command
"""
from __future__ import annotations
import os
import unittest
from twisted.trial import unittest
from cowrie.shell.protocol import HoneyPotInteractiveProtocol
from cowrie.test.fake_server import FakeAvatar, FakeServer
from cowrie.test.fake_transport import FakeTransport
from cowrie.shell import protocol
from cowrie.test import fake_server, fake_transport
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data"
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data"
os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle"
PROMPT = b"root@unitTest:~# "
class ShellEchoCommandTests(unittest.TestCase):
def setUp(self):
self.proto = protocol.HoneyPotInteractiveProtocol(
fake_server.FakeAvatar(fake_server.FakeServer())
)
self.tr = fake_transport.FakeTransport("1.1.1.1", "1111")
self.proto.makeConnection(self.tr)
"""Tests for cowrie/commands/awk.py."""
proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer()))
tr = FakeTransport("", "31337")
@classmethod
def setUpClass(cls) -> None:
cls.proto.makeConnection(cls.tr)
@classmethod
def tearDownClass(cls) -> None:
cls.proto.connectionLost("tearDown From Unit Test")
def setUp(self) -> None:
self.tr.clear()
def test_awk_command_001(self):
"""
Test $0, full input line contents
"""
self.proto.lineReceived(b'echo "test test" | awk "{ print $0 }"\n')
def test_awk_command_001(self) -> None:
self.proto.lineReceived(b"echo \"test test\" | awk \"{ print $0 }\"\n")
self.assertEqual(self.tr.value(), b"test test\n" + PROMPT)
def test_awk_command_002(self):
"""
Test $1, first agument
"""
self.proto.lineReceived(b'echo "test" | awk "{ print $1 }"\n')
def test_awk_command_002(self) -> None:
self.proto.lineReceived(b"echo \"test\" | awk \"{ print $1 }\"\n")
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_awk_command_003(self):
"""
Test $1 $2 space separated
"""
self.proto.lineReceived(b'echo "test test" | awk "{ print $1 $2 }"\n')
def test_awk_command_003(self) -> None:
self.proto.lineReceived(b"echo \"test test\" | awk \"{ print $1 $2 }\"\n")
self.assertEqual(self.tr.value(), b"test test\n" + PROMPT)
def test_awk_command_004(self):
"""
Test $1,$2 comma separated
"""
self.proto.lineReceived(b'echo "test test" | awk "{ print $1,$2 }"\n')
def test_awk_command_004(self) -> None:
self.proto.lineReceived(b"echo \"test test\" | awk \"{ print $1,$2 }\"\n")
self.assertEqual(self.tr.value(), b"test test\n" + PROMPT)
def test_awk_command_005(self):
"""
Test $1$2 not separated
"""
self.proto.lineReceived(b'echo "test test" | awk "{ print $1$2 }"\n')
def test_awk_command_005(self) -> None:
self.proto.lineReceived(b"echo \"test test\" | awk \"{ print $1$2 }\"\n")
self.assertEqual(self.tr.value(), b"testtest\n" + PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")

View File

@ -1,51 +1,43 @@
# -*- test-case-name: Cowrie Test Cases -*-
# Copyright (c) 2020 Peter Sufliarsky
# See LICENSE for details.
"""
Tests for general shell interaction and base64 command
"""
from __future__ import annotations
import os
import unittest
from twisted.trial import unittest
from cowrie.shell.protocol import HoneyPotInteractiveProtocol
from cowrie.test.fake_server import FakeAvatar, FakeServer
from cowrie.test.fake_transport import FakeTransport
from cowrie.shell import protocol
from cowrie.test import fake_server, fake_transport
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data"
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data"
os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle"
TRY_CHMOD_HELP_MSG = b"Try 'base64 --help' for more information.\n"
PROMPT = b"root@unitTest:~# "
class ShellBase64CommandTests(unittest.TestCase):
def setUp(self):
self.proto = protocol.HoneyPotInteractiveProtocol(
fake_server.FakeAvatar(fake_server.FakeServer())
)
self.tr = fake_transport.FakeTransport("1.1.1.1", "1111")
self.proto.makeConnection(self.tr)
"""Tests for cowrie/commands/base64.py"""
proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer()))
tr = FakeTransport("", "31337")
@classmethod
def setUpClass(cls) -> None:
cls.proto.makeConnection(cls.tr)
@classmethod
def tearDownClass(cls) -> None:
cls.proto.connectionLost("tearDown From Unit Test")
def setUp(self) -> None:
self.tr.clear()
def test_base64_command_001(self):
"""
Missing operand
"""
def test_base64_command_001(self) -> None:
self.proto.lineReceived(b"echo cowrie | base64")
self.assertEqual(self.tr.value(), b"Y293cmllCg==\n" + PROMPT)
def test_base64_command_002(self):
"""
Missing operand
"""
def test_base64_command_002(self) -> None:
self.proto.lineReceived(b"echo Y293cmllCg== | base64 -d")
self.assertEqual(self.tr.value(), b"cowrie\n" + PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")

View File

@ -1,261 +1,297 @@
# -*- test-case-name: Cowrie Test Cases -*-
# Copyright (c) 2016 Dave Germiquet
# See LICENSE for details.
from __future__ import annotations
import os
import unittest
from twisted.trial import unittest
from cowrie.commands.base import Command_php
from cowrie.shell.protocol import HoneyPotInteractiveProtocol
from cowrie.test.fake_server import FakeAvatar, FakeServer
from cowrie.test.fake_transport import FakeTransport
from cowrie.shell import protocol
from cowrie.test import fake_server, fake_transport
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle"
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle"
PROMPT = b"root@unitTest:~# "
NONEXISTEN_FILE = "/path/to/the/file/that/does/not/exist"
class ShellBaseCommandsTests(unittest.TestCase):
def setUp(self):
self.proto = protocol.HoneyPotInteractiveProtocol(
fake_server.FakeAvatar(fake_server.FakeServer())
)
self.tr = fake_transport.FakeTransport("1.1.1.1", "1111")
class ShellBaseCommandsTests(unittest.TestCase): # TODO: ps, history
"""Tests for basic commands from cowrie/commands/base.py."""
def setUp(self) -> None:
self.proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer()))
self.tr = FakeTransport("", "31337")
self.proto.makeConnection(self.tr)
self.tr.clear()
def test_whoami_command(self):
def tearDown(self) -> None:
self.proto.connectionLost("tearDown From Unit Test")
def test_whoami_command(self) -> None:
self.proto.lineReceived(b"whoami\n")
self.assertEqual(self.tr.value(), b"root\n" + PROMPT)
def test_users_command(self):
def test_users_command(self) -> None:
self.proto.lineReceived(b"users \n")
self.assertEqual(self.tr.value(), b"root\n" + PROMPT)
# def test_exit_command(self):
# self.proto.lineReceived(b'exit \n')
def test_exit_command(self) -> None:
self.proto.lineReceived(b"exit\n")
self.assertEqual(self.tr.value(), b"")
# def test_logout_command(self):
# self.proto.lineReceived(b'logout \n')
def test_logout_command(self) -> None:
self.proto.lineReceived(b"logout\n")
self.assertEqual(self.tr.value(), b"")
# def test_clear_command(self):
# self.proto.lineReceived(b'clear \n')
def test_clear_command(self) -> None:
self.proto.lineReceived(b"clear\n")
self.assertEqual(self.tr.value(), PROMPT)
def test_hostname_command(self):
def test_hostname_command(self) -> None:
self.proto.lineReceived(b"hostname unitChanged\n")
self.assertEqual(self.tr.value(), b"root@unitChanged:~# ")
# def test_reset_command(self):
# self.proto.lineReceived(b'reset')
def test_reset_command(self) -> None:
self.proto.lineReceived(b"reset\n")
self.assertEqual(self.tr.value(), PROMPT)
# def test_ps_command(self):
# self.proto.lineReceived(b'ps\n')
# self.assertEqual(self.tr.value().decode('utf8'), "\n".join(self.data['results']['ps']))
def test_id_command(self):
def test_id_command(self) -> None:
self.proto.lineReceived(b"id\n")
self.assertEqual(
self.tr.value(), b"uid=0(root) gid=0(root) groups=0(root)\n" + PROMPT
)
self.assertEqual(self.tr.value(), b"uid=0(root) gid=0(root) groups=0(root)\n" + PROMPT)
def test_passwd_command(self):
def test_passwd_command(self) -> None:
self.proto.lineReceived(b"passwd\n")
self.proto.lineReceived(b"changeme\n")
self.proto.lineReceived(b"changeme\n")
self.assertEqual(
self.tr.value(),
b"Enter new UNIX password: Retype new UNIX password: passwd: password updated successfully\n"
+ PROMPT,
)
self.assertEqual(self.tr.value(),
b"Enter new UNIX password: Retype new UNIX password: passwd: password updated successfully\n" + PROMPT)
# def test_shutdown_command(self):
# self.proto.lineReceived(b'shutdown\n')
def test_shutdown_command(self) -> None:
self.proto.lineReceived(b"shutdown\n")
self.assertEqual(self.tr.value(), b"Try `shutdown --help' for more information.\n" + PROMPT) # TODO: Is it right?..
# def test_poweroff_command(self):
# self.proto.lineReceived(b'poweroff\n')
def test_poweroff_command(self) -> None:
self.proto.lineReceived(b"poweroff\n")
self.assertEqual(self.tr.value(), b"Try `shutdown --help' for more information.\n" + PROMPT) # TODO: Is it right?..
# def test_history_command(self):
# self.proto.lineReceived(b"history\n")
# self.proto.lineReceived(b"history\n")
# print("THIS TEST IS INCOMPLETE")
def test_date_command(self):
def test_date_command(self) -> None:
self.proto.lineReceived(b"date\n")
self.assertRegex(
self.tr.value(),
b"[A-Za-z][A-Za-z][A-Za-z] [A-Za-z][A-Za-z][A-Za-z] [0-9][0-9] [0-9][0-9]:[0-9][0-9]:[0-9][0-9] UTC [0-9][0-9][0-9][0-9]\n"
+ PROMPT,
)
self.assertRegex(self.tr.value(), rb"[A-Za-z]{3} [A-Za-z]{3} \d{2} \d{2}:\d{2}:\d{2} UTC \d{4}\n" + PROMPT)
# def test_bash_command(self):
# self.proto.lineReceived(b'bash\n')
# print("THIS TEST IS INCOMPLETE")
def test_bash_command(self) -> None:
self.proto.lineReceived(b"bash\n")
self.assertEqual(self.tr.value(), PROMPT)
def test_sh_command(self):
def test_sh_command(self) -> None:
self.proto.lineReceived(b"sh -c id\n")
self.assertEqual(
self.tr.value(), b"uid=0(root) gid=0(root) groups=0(root)\n" + PROMPT
)
self.assertEqual(self.tr.value(), b"uid=0(root) gid=0(root) groups=0(root)\n" + PROMPT)
# def test_php_command(self):
# self.proto.lineReceived(b'php -h')
# print("THIS TEST IS INCOMPLETE")
def test_php_help_command(self) -> None:
self.proto.lineReceived(b"php -h\n")
self.assertEqual(self.tr.value(), Command_php.HELP.encode() + PROMPT)
def test_chattr_command(self):
def test_php_version_command(self) -> None:
self.proto.lineReceived(b"php -v\n")
self.assertEqual(self.tr.value(), Command_php.VERSION.encode() + PROMPT)
def test_chattr_command(self) -> None:
self.proto.lineReceived(b"chattr\n")
self.assertEqual(self.tr.value(), PROMPT)
def test_umask_command(self):
def test_umask_command(self) -> None:
self.proto.lineReceived(b"umask\n")
self.assertEqual(self.tr.value(), PROMPT)
def test_set_command(self):
def test_set_command(self) -> None:
self.proto.lineReceived(b"set\n")
self.assertEqual(self.tr.value(),
b"COLUMNS=80\nHOME=/root\nLINES=25\nLOGNAME=root\nPATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin\nTMOUT=1800\nUSER=root\n" + PROMPT)
self.assertEqual(
self.tr.value(),
b"COLUMNS=80\nHOME=/root\nLINES=25\nLOGNAME=root\nPATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin\nTMOUT=1800\nUSER=root\n"
+ PROMPT,
)
def test_unset_command(self):
def test_unset_command(self) -> None:
self.proto.lineReceived(b"unset\n")
self.assertEqual(self.tr.value(), PROMPT)
def test_export_command(self):
def test_export_command(self) -> None:
self.proto.lineReceived(b"export\n")
self.assertEqual(self.tr.value(), PROMPT)
def test_alias_command(self):
def test_alias_command(self) -> None:
self.proto.lineReceived(b"alias\n")
self.assertEqual(self.tr.value(), PROMPT)
def test_jobs_command(self):
def test_jobs_command(self) -> None:
self.proto.lineReceived(b"jobs\n")
self.assertEqual(self.tr.value(), PROMPT)
def test_kill_command(self):
def test_kill_command(self) -> None:
self.proto.lineReceived(b"/bin/kill\n")
self.assertEqual(self.tr.value(), PROMPT)
def test_pkill_command(self):
def test_pkill_command(self) -> None:
self.proto.lineReceived(b"/bin/pkill\n")
self.assertEqual(self.tr.value(), PROMPT)
def test_killall_command(self):
def test_killall_command(self) -> None:
self.proto.lineReceived(b"/bin/killall\n")
self.assertEqual(self.tr.value(), PROMPT)
def test_killall5_command(self):
def test_killall5_command(self) -> None:
self.proto.lineReceived(b"/bin/killall5\n")
self.assertEqual(self.tr.value(), PROMPT)
def test_su_command(self):
def test_su_command(self) -> None:
self.proto.lineReceived(b"su\n")
self.assertEqual(self.tr.value(), PROMPT)
def test_chown_command(self):
def test_chown_command(self) -> None:
self.proto.lineReceived(b"chown\n")
self.assertEqual(self.tr.value(), PROMPT)
def test_chgrp_command(self):
def test_chgrp_command(self) -> None:
self.proto.lineReceived(b"chgrp\n")
self.assertEqual(self.tr.value(), PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")
def test_cd_output(self) -> None:
path = "/usr/bin"
self.proto.lineReceived(f"cd {path:s}".encode())
self.assertEqual(self.tr.value(), PROMPT.replace(b"~", path.encode()))
self.assertEqual(self.proto.cwd, path)
def test_cd_error_output(self) -> None:
self.proto.lineReceived(f"cd {NONEXISTEN_FILE:s}".encode())
self.assertEqual(self.tr.value(), f"bash: cd: {NONEXISTEN_FILE:s}: No such file or directory\n".encode() + PROMPT)
class ShellFileCommandsTests(unittest.TestCase):
def setUp(self):
self.proto = protocol.HoneyPotInteractiveProtocol(
fake_server.FakeAvatar(fake_server.FakeServer())
)
self.tr = fake_transport.FakeTransport("1.1.1.1", "1111")
self.proto.makeConnection(self.tr)
proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer()))
tr = FakeTransport("", "31337")
cpuinfo = proto.fs.file_contents("/proc/cpuinfo")
# def test_cat_output(self):
# self.proto.lineReceived(b'cat /proc/cpuinfo')
# print("THIS TEST IS INCOMPLETE")
# print(self.tr.value())
@classmethod
def setUpClass(cls) -> None:
cls.proto.makeConnection(cls.tr)
# def test_grep_output(self):
# self.proto.lineReceived(b'grep cpu /proc/cpuinfo')
# print("THIS TEST IS INCOMPLETE")
# print(self.tr.value())
@classmethod
def tearDownClass(cls) -> None:
cls.proto.connectionLost("tearDown From Unit Test")
# def test_tail_output(self):
# self.proto.lineReceived(b'tail -n 10 /proc/cpuinfo')
# print("THIS TEST IS INCOMPLETE")
# print(self.tr.value())
def setUp(self) -> None:
self.tr.clear()
# def test_cd_output(self):
# self.proto.lineReceived(b'cd /usr/bin')
# print("THIS TEST IS INCOMPLETE")
# print(self.tr.value())
def test_cat_output(self) -> None:
self.proto.lineReceived(b"cat /proc/cpuinfo\n")
self.assertEqual(self.tr.value(), self.cpuinfo + PROMPT)
# def test_rm_output(self):
# self.proto.lineReceived(b'rm /usr/bin/gcc')
# print("THIS TEST IS INCOMPLETE")
# print(self.tr.value())
def test_grep_output(self) -> None:
lines = [line.strip() for line in self.cpuinfo.splitlines() if b"cpu" in line]
lines.append(b"")
self.proto.lineReceived(b"grep cpu /proc/cpuinfo\n")
self.assertEqual(self.tr.value(), b"\n".join(lines) + PROMPT)
# def test_cp_output(self):
# self.proto.lineReceived(b'cp /usr/bin/gcc /tmp')
# print("THIS TEST IS INCOMPLETE")
# print(self.tr.value())
def test_tail_output(self) -> None:
lines = [line.strip() for line in self.cpuinfo.splitlines()][-10:]
lines.append(b"")
self.proto.lineReceived(b"tail -n 10 /proc/cpuinfo\n")
self.assertEqual(self.tr.value(), b"\n".join(lines) + PROMPT)
# def test_mv_output(self):
# self.proto.lineReceived(b'mv /usr/bin/gcc /tmp')
# print("THIS TEST IS INCOMPLETE")
# print(self.tr.value())
def test_head_output(self) -> None:
lines = [line.strip() for line in self.cpuinfo.splitlines()][:10]
lines.append(b"")
self.proto.lineReceived(b"head -n 10 /proc/cpuinfo\n")
self.assertEqual(self.tr.value(), b"\n".join(lines) + PROMPT)
# def test_mkdir_output(self):
# self.proto.lineReceived(b'mkdir /tmp/hello')
# print("THIS TEST IS INCOMPLETE")
# print(self.tr.value())
def test_rm_output(self) -> None:
self.proto.lineReceived(b"rm /usr/bin/gcc\n")
self.assertEqual(self.tr.value(), PROMPT)
# def test_rmdir_output(self):
# self.proto.lineReceived(b'mkdir /tmp/blah')
# self.proto.lineReceived(b'rmdir /tmp/blah')
# print("THIS TEST IS INCOMPLETE")
# print(self.tr.value())
def test_rm_error_output(self) -> None: # TODO: quotes?..
self.proto.lineReceived(f"rm {NONEXISTEN_FILE:s}\n".encode())
self.assertEqual(self.tr.value(),
f"rm: cannot remove `{NONEXISTEN_FILE:s}': No such file or directory\n".encode() + PROMPT)
# def test_pwd_output(self):
# self.proto.lineReceived(b'pwd')
# print("THIS TEST IS INCOMPLETE")
# print(self.tr.value())
def test_cp_output(self) -> None:
self.proto.lineReceived(b"cp /usr/bin/gcc /tmp\n")
self.assertEqual(self.tr.value(), PROMPT)
# def test_touch_output(self):
# self.proto.lineReceived(b'touch unittests.txt')
# print("THIS TEST IS INCOMPLETE")
# print(self.tr.value())
def test_cp_error_output(self) -> None: # TODO: quotes?..
self.proto.lineReceived(f"cp {NONEXISTEN_FILE:s} /tmp\n".encode())
self.assertEqual(self.tr.value(),
f"cp: cannot stat `{NONEXISTEN_FILE:s}': No such file or directory\n".encode() + PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")
def test_mv_output(self) -> None:
self.proto.lineReceived(b"mv /usr/bin/awk /tmp\n")
self.assertEqual(self.tr.value(), PROMPT)
def test_mv_error_output(self) -> None: # TODO: quotes?..
self.proto.lineReceived(f"mv {NONEXISTEN_FILE:s} /tmp\n".encode())
self.assertEqual(self.tr.value(),
f"mv: cannot stat `{NONEXISTEN_FILE:s}': No such file or directory\n".encode() + PROMPT)
def test_mkdir_output(self) -> None:
path = "/tmp/hello"
self.proto.lineReceived(f"mkdir {path:s}\n".encode())
self.assertEqual(self.tr.value(), PROMPT)
self.assertTrue(self.proto.fs.exists(path))
self.assertTrue(self.proto.fs.isdir(path))
def test_mkdir_error_output(self) -> None: # TODO: quotes?..
path = "/etc"
self.proto.lineReceived(f"mkdir {path:s}\n".encode())
self.assertEqual(self.tr.value(), f"mkdir: cannot create directory `{path:s}': File exists\n".encode() + PROMPT)
def test_rmdir_output(self) -> None:
path = "/tmp/bye"
self.proto.lineReceived(f"mkdir {path:s}\n".encode())
self.tr.clear()
self.proto.lineReceived(f"rmdir {path:s}\n".encode())
self.assertEqual(self.tr.value(), PROMPT)
self.assertFalse(self.proto.fs.exists(path))
def test_rmdir_error_output(self) -> None: # TODO: quotes?..
self.proto.lineReceived(f"rmdir {NONEXISTEN_FILE:s}\n".encode())
self.assertEqual(self.tr.value(),
f"rmdir: failed to remove `{NONEXISTEN_FILE:s}': No such file or directory\n".encode() + PROMPT)
def test_pwd_output(self) -> None:
self.proto.lineReceived(b"pwd\n")
self.assertEqual(self.tr.value(), self.proto.cwd.encode() + b"\n" + PROMPT)
def test_touch_output(self) -> None:
path = "/tmp/test.txt"
self.proto.lineReceived(f"touch {path:s}\n".encode())
self.assertEqual(self.tr.value(), PROMPT)
self.assertTrue(self.proto.fs.exists(path))
class ShellPipeCommandsTests(unittest.TestCase):
def setUp(self):
self.proto = protocol.HoneyPotInteractiveProtocol(
fake_server.FakeAvatar(fake_server.FakeServer())
)
self.tr = fake_transport.FakeTransport("1.1.1.1", "1111")
self.proto.makeConnection(self.tr)
proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer()))
tr = FakeTransport("", "31337")
def test_shell_pipe_with_cat_tail(self):
@classmethod
def setUpClass(cls) -> None:
cls.proto.makeConnection(cls.tr)
@classmethod
def tearDownClass(cls) -> None:
cls.proto.connectionLost("tearDown From Unit Test")
def setUp(self) -> None:
self.tr.clear()
def test_shell_pipe_with_cat_tail(self) -> None:
self.proto.lineReceived(b"echo test | tail -n 1\n")
self.assertEqual(self.tr.value(), PROMPT + b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_shell_pipe_with_cat_head(self):
def test_shell_pipe_with_cat_head(self) -> None:
self.proto.lineReceived(b"echo test | head -n 1\n")
self.assertEqual(self.tr.value(), PROMPT + b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
# def test_shell_busybox_with_cat_and_sudo_grep(self):
# def test_shell_busybox_with_cat_and_sudo_grep(self) -> None:
# self.proto.lineReceived(b'busybox cat /proc/cpuinfo | sudo grep cpu \n')
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")

View File

@ -1,70 +1,51 @@
# -*- test-case-name: Cowrie Test Cases -*-
# Copyright (c) 2018 Michel Oosterhof
# See LICENSE for details.
"""
Tests for general shell interaction and cat command
"""
from __future__ import annotations
import os
import unittest
from twisted.trial import unittest
from cowrie.shell.protocol import HoneyPotInteractiveProtocol
from cowrie.test.fake_server import FakeAvatar, FakeServer
from cowrie.test.fake_transport import FakeTransport
from cowrie.shell import protocol
from cowrie.test import fake_server, fake_transport
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data"
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data"
os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle"
PROMPT = b"root@unitTest:~# "
class ShellCatCommandTests(unittest.TestCase):
def setUp(self):
self.proto = protocol.HoneyPotInteractiveProtocol(
fake_server.FakeAvatar(fake_server.FakeServer())
)
self.tr = fake_transport.FakeTransport("1.1.1.1", "1111")
"""Test for cowrie/commands/cat.py."""
def setUp(self) -> None:
self.proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer()))
self.tr = FakeTransport("", "31337")
self.proto.makeConnection(self.tr)
self.tr.clear()
def test_cat_command_001(self):
"""
No such file
"""
def tearDown(self) -> None:
self.proto.connectionLost("tearDown From Unit Test")
def test_cat_command_001(self) -> None:
self.proto.lineReceived(b"cat nonExisting\n")
self.assertEqual(
self.tr.value(), b"cat: nonExisting: No such file or directory\n" + PROMPT
)
def test_cat_command_002(self):
"""
argument - (stdin)
"""
def test_cat_command_002(self) -> None:
self.proto.lineReceived(b"echo test | cat -\n")
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_cat_command_003(self):
"""
test without arguments, read stdin only and quit
"""
def test_cat_command_003(self) -> None:
self.proto.lineReceived(b"echo 1 | cat\n")
self.proto.lineReceived(b"echo 2\n")
self.proto.handle_CTRL_D()
self.assertEqual(self.tr.value(), b"1\n" + PROMPT + b"2\n" + PROMPT)
def test_cat_command_004(self):
"""
test handle of CTRL_C
"""
def test_cat_command_004(self) -> None:
self.proto.lineReceived(b"cat\n")
self.proto.lineReceived(b"test\n")
self.proto.handle_CTRL_C()
self.assertEqual(self.tr.value(), b"test\n^C\n" + PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")

View File

@ -1,166 +1,114 @@
# -*- test-case-name: Cowrie Test Cases -*-
# Copyright (c) 2020 Peter Sufliarsky
# See LICENSE for details.
"""
Tests for general shell interaction and chmod command
"""
from __future__ import annotations
import os
import unittest
from twisted.trial import unittest
from cowrie.shell.protocol import HoneyPotInteractiveProtocol
from cowrie.test.fake_server import FakeAvatar, FakeServer
from cowrie.test.fake_transport import FakeTransport
from cowrie.shell import protocol
from cowrie.test import fake_server, fake_transport
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data"
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data"
os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle"
TRY_CHMOD_HELP_MSG = b"Try 'chmod --help' for more information.\n"
PROMPT = b"root@unitTest:~# "
class ShellChmodCommandTests(unittest.TestCase):
def setUp(self):
self.proto = protocol.HoneyPotInteractiveProtocol(
fake_server.FakeAvatar(fake_server.FakeServer())
)
self.tr = fake_transport.FakeTransport("1.1.1.1", "1111")
self.proto.makeConnection(self.tr)
"""Test for cowrie/commands/chmod.py."""
proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer()))
tr = FakeTransport("", "31337")
@classmethod
def setUpClass(cls) -> None:
cls.proto.makeConnection(cls.tr)
@classmethod
def tearDownClass(cls) -> None:
cls.proto.connectionLost("tearDown From Unit Test")
def setUp(self) -> None:
self.tr.clear()
def test_chmod_command_001(self):
"""
Missing operand
"""
def test_chmod_command_001(self) -> None:
self.proto.lineReceived(b"chmod")
self.assertEqual(
self.tr.value(), b"chmod: missing operand\n" + TRY_CHMOD_HELP_MSG + PROMPT
)
def test_chmod_command_002(self):
"""
Missing operand
"""
def test_chmod_command_002(self) -> None:
self.proto.lineReceived(b"chmod -x")
self.assertEqual(
self.tr.value(), b"chmod: missing operand\n" + TRY_CHMOD_HELP_MSG + PROMPT
)
def test_chmod_command_003(self):
"""
Missing operand after ...
"""
def test_chmod_command_003(self) -> None:
self.proto.lineReceived(b"chmod +x")
self.assertEqual(
self.tr.value(),
b"chmod: missing operand after \xe2\x80\x98+x\xe2\x80\x99\n"
+ TRY_CHMOD_HELP_MSG
+ PROMPT,
b"chmod: missing operand after \xe2\x80\x98+x\xe2\x80\x99\n" + TRY_CHMOD_HELP_MSG + PROMPT
)
def test_chmod_command_004(self):
"""
Invalid option
"""
def test_chmod_command_004(self) -> None:
self.proto.lineReceived(b"chmod -A")
self.assertEqual(
self.tr.value(),
b"chmod: invalid option -- 'A'\n" + TRY_CHMOD_HELP_MSG + PROMPT,
b"chmod: invalid option -- 'A'\n" + TRY_CHMOD_HELP_MSG + PROMPT
)
def test_chmod_command_005(self):
"""
Unrecognized option
"""
def test_chmod_command_005(self) -> None:
self.proto.lineReceived(b"chmod --A")
self.assertEqual(
self.tr.value(),
b"chmod: unrecognized option '--A'\n" + TRY_CHMOD_HELP_MSG + PROMPT,
b"chmod: unrecognized option '--A'\n" + TRY_CHMOD_HELP_MSG + PROMPT
)
def test_chmod_command_006(self):
"""
No such file or directory
"""
def test_chmod_command_006(self) -> None:
self.proto.lineReceived(b"chmod -x abcd")
self.assertEqual(
self.tr.value(),
b"chmod: cannot access 'abcd': No such file or directory\n" + PROMPT,
b"chmod: cannot access 'abcd': No such file or directory\n" + PROMPT
)
def test_chmod_command_007(self):
"""
Invalid mode
"""
def test_chmod_command_007(self) -> None:
self.proto.lineReceived(b"chmod abcd efgh")
self.assertEqual(
self.tr.value(),
b"chmod: invalid mode: \xe2\x80\x98abcd\xe2\x80\x99\n"
+ TRY_CHMOD_HELP_MSG
+ PROMPT,
b"chmod: invalid mode: \xe2\x80\x98abcd\xe2\x80\x99\n" + TRY_CHMOD_HELP_MSG + PROMPT
)
def test_chmod_command_008(self):
"""
Valid directory .ssh
"""
def test_chmod_command_008(self) -> None:
self.proto.lineReceived(b"chmod +x .ssh")
self.assertEqual(self.tr.value(), PROMPT)
def test_chmod_command_009(self):
"""
Valid directory .ssh recursive
"""
def test_chmod_command_009(self) -> None:
self.proto.lineReceived(b"chmod -R +x .ssh")
self.assertEqual(self.tr.value(), PROMPT)
def test_chmod_command_010(self):
"""
Valid directory /root/.ssh
"""
def test_chmod_command_010(self) -> None:
self.proto.lineReceived(b"chmod +x /root/.ssh")
self.assertEqual(self.tr.value(), PROMPT)
def test_chmod_command_011(self):
"""
Valid directory ~/.ssh
"""
def test_chmod_command_011(self) -> None:
self.proto.lineReceived(b"chmod +x ~/.ssh")
self.assertEqual(self.tr.value(), PROMPT)
def test_chmod_command_012(self):
"""
chmod a+x
"""
def test_chmod_command_012(self) -> None:
self.proto.lineReceived(b"chmod a+x .ssh")
self.assertEqual(self.tr.value(), PROMPT)
def test_chmod_command_013(self):
"""
chmod ug+x
"""
def test_chmod_command_013(self) -> None:
self.proto.lineReceived(b"chmod ug+x .ssh")
self.assertEqual(self.tr.value(), PROMPT)
def test_chmod_command_014(self):
"""
chmod 777
"""
def test_chmod_command_014(self) -> None:
self.proto.lineReceived(b"chmod 777 .ssh")
self.assertEqual(self.tr.value(), PROMPT)
def test_chmod_command_015(self):
"""
chmod 0775
"""
def test_chmod_command_015(self) -> None:
self.proto.lineReceived(b"chmod 0755 .ssh")
self.assertEqual(self.tr.value(), PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")

View File

@ -1,219 +1,134 @@
# -*- test-case-name: Cowrie Test Cases -*-
# Copyright (c) 2018 Michel Oosterhof
# See LICENSE for details.
"""
Tests for general shell interaction and echo command
"""
from __future__ import annotations
import os
import unittest
from twisted.trial import unittest
from cowrie.shell.protocol import HoneyPotInteractiveProtocol
from cowrie.test.fake_server import FakeAvatar, FakeServer
from cowrie.test.fake_transport import FakeTransport
from cowrie.shell import protocol
from cowrie.test import fake_server, fake_transport
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data"
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data"
os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle"
PROMPT = b"root@unitTest:~# "
class ShellEchoCommandTests(unittest.TestCase):
def setUp(self):
self.proto = protocol.HoneyPotInteractiveProtocol(
fake_server.FakeAvatar(fake_server.FakeServer())
)
self.tr = fake_transport.FakeTransport("1.1.1.1", "1111")
self.proto.makeConnection(self.tr)
"""Test for echo command from cowrie/commands/base.py."""
proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer()))
tr = FakeTransport("", "31337")
@classmethod
def setUpClass(cls) -> None:
cls.proto.makeConnection(cls.tr)
@classmethod
def tearDownClass(cls) -> None:
cls.proto.connectionLost("tearDown From Unit Test")
def setUp(self) -> None:
self.tr.clear()
def test_echo_command_001(self):
"""
Basic test
"""
self.proto.lineReceived(b'echo "test"\n')
def test_echo_command_001(self) -> None:
self.proto.lineReceived(b"echo \"test\"\n")
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_002(self):
"""
argument splitting and recombining
"""
def test_echo_command_002(self) -> None:
self.proto.lineReceived(b"echo test test\n")
self.assertEqual(self.tr.value(), b"test test\n" + PROMPT)
def test_echo_command_003(self):
"""
echo -n
"""
self.proto.lineReceived(b'echo -n "test test"\n')
def test_echo_command_003(self) -> None:
self.proto.lineReceived(b"echo -n \"test test\"\n")
self.assertEqual(self.tr.value(), b"test test" + PROMPT)
def test_echo_command_004(self):
"""
echo -n
"""
self.proto.lineReceived(b'echo -n "test test"\n')
self.assertEqual(self.tr.value(), b"test test" + PROMPT)
def test_echo_command_005(self):
"""
echo test >> test; cat test
"""
def test_echo_command_005(self) -> None:
self.proto.lineReceived(b"echo test > test5; cat test5")
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_006(self):
"""
echo -n
"""
self.proto.lineReceived(b'echo "\\n"\n')
def test_echo_command_006(self) -> None:
self.proto.lineReceived(b"echo \"\\n\"\n")
self.assertEqual(self.tr.value(), b"\\n\n" + PROMPT)
def test_echo_command_007(self):
"""
echo test >> test; cat test
"""
def test_echo_command_007(self) -> None:
self.proto.lineReceived(b"echo test >> test7; cat test7")
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_008(self):
"""
echo test > test; echo test >> test; cat test
"""
def test_echo_command_008(self) -> None:
self.proto.lineReceived(b"echo test > test8; echo test >> test8; cat test8")
self.assertEqual(self.tr.value(), b"test\ntest\n" + PROMPT)
def test_echo_command_009(self):
"""
echo test | grep test
"""
def test_echo_command_009(self) -> None:
self.proto.lineReceived(b"echo test | grep test")
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_010(self):
"""
echo test | grep test
"""
def test_echo_command_010(self) -> None:
self.proto.lineReceived(b"echo test | grep test2")
self.assertEqual(self.tr.value(), PROMPT)
def test_echo_command_011(self):
"""
echo test > test011; cat test011 | grep test
"""
def test_echo_command_011(self) -> None:
self.proto.lineReceived(b"echo test > test011; cat test011 | grep test")
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_012(self):
"""
echo test > test012; grep test test012
"""
def test_echo_command_012(self) -> None:
self.proto.lineReceived(b"echo test > test012; grep test test012")
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_013(self):
"""
echo "ls""ls"
"""
self.proto.lineReceived(b'echo "ls""ls"')
def test_echo_command_013(self) -> None:
self.proto.lineReceived(b"echo \"ls\"\"ls\"")
self.assertEqual(self.tr.value(), b"lsls\n" + PROMPT)
def test_echo_command_014(self):
"""
echo '"ls"'
"""
def test_echo_command_014(self) -> None:
self.proto.lineReceived(b"echo '\"ls\"'")
self.assertEqual(self.tr.value(), b'"ls"\n' + PROMPT)
def test_echo_command_015(self):
"""
echo "'ls'"
"""
def test_echo_command_015(self) -> None:
self.proto.lineReceived(b"echo \"'ls'\"")
self.assertEqual(self.tr.value(), b"'ls'\n" + PROMPT)
def test_echo_command_016(self):
"""
echo -e "\x6b\x61\x6d\x69"
"""
self.proto.lineReceived(b'echo -e "\x6b\x61\x6d\x69"')
def test_echo_command_016(self) -> None:
self.proto.lineReceived(b"echo -e \"\x6b\x61\x6d\x69\"")
self.assertEqual(self.tr.value(), b"kami\n" + PROMPT)
def test_echo_command_017(self):
"""
echo -e "\x6b\x61\x6d\x69"
"""
def test_echo_command_017(self) -> None:
self.proto.lineReceived(b"echo echo test | bash")
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_018(self):
"""
echo $(echo test)
"""
def test_echo_command_018(self) -> None:
self.proto.lineReceived(b"echo $(echo test)")
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_019(self):
"""
echo $(echo $(echo test))
"""
def test_echo_command_019(self) -> None:
self.proto.lineReceived(b"echo $(echo $(echo test))")
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_020(self):
"""
echo test_$(echo test)_test
"""
def test_echo_command_020(self) -> None:
self.proto.lineReceived(b"echo test_$(echo test)_test")
self.assertEqual(self.tr.value(), b"test_test_test\n" + PROMPT)
def test_echo_command_021(self):
"""
echo test_$(echo test)_test_$(echo test)_test
"""
def test_echo_command_021(self) -> None:
self.proto.lineReceived(b"echo test_$(echo test)_test_$(echo test)_test")
self.assertEqual(self.tr.value(), b"test_test_test_test_test\n" + PROMPT)
def test_echo_command_022(self):
"""
echo test; (echo test)
"""
def test_echo_command_022(self) -> None:
self.proto.lineReceived(b"echo test; (echo test)")
self.assertEqual(self.tr.value(), b"test\ntest\n" + PROMPT)
def test_echo_command_023(self):
"""
echo `echo test`
"""
def test_echo_command_023(self) -> None:
self.proto.lineReceived(b"echo `echo test`")
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_024(self):
"""
echo test_`echo test`_test
"""
def test_echo_command_024(self) -> None:
self.proto.lineReceived(b"echo test_`echo test`_test")
self.assertEqual(self.tr.value(), b"test_test_test\n" + PROMPT)
def test_echo_command_025(self):
"""
echo test_`echo test`_test_`echo test`_test
"""
def test_echo_command_025(self) -> None:
self.proto.lineReceived(b"echo test_`echo test`_test_`echo test`_test")
self.assertEqual(self.tr.value(), b"test_test_test_test_test\n" + PROMPT)
def test_echo_command_026(self):
"""
echo "TEST1: `echo test1`, TEST2: `echo test2`"
"""
self.proto.lineReceived(b'echo "TEST1: `echo test1`, TEST2: `echo test2`"')
def test_echo_command_026(self) -> None:
self.proto.lineReceived(b"echo \"TEST1: `echo test1`, TEST2: `echo test2`\"")
self.assertEqual(self.tr.value(), b"TEST1: test1, TEST2: test2\n" + PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")

View File

@ -1,54 +1,50 @@
# -*- test-case-name: Cowrie Test Cases -*-
# Copyright (c) 2018 Michel Oosterhof
# See LICENSE for details.
from __future__ import annotations
import os
import unittest
from twisted.trial import unittest
from cowrie.shell.protocol import HoneyPotInteractiveProtocol
from cowrie.test.fake_server import FakeAvatar, FakeServer
from cowrie.test.fake_transport import FakeTransport
from cowrie.shell import protocol
from cowrie.test import fake_server, fake_transport
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data"
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data"
os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle"
PROMPT = b"root@unitTest:~# "
class ShellftpgetCommandTests(unittest.TestCase):
def setUp(self):
self.proto = protocol.HoneyPotInteractiveProtocol(
fake_server.FakeAvatar(fake_server.FakeServer())
)
self.tr = fake_transport.FakeTransport("1.1.1.1", "1111")
self.proto.makeConnection(self.tr)
class ShellFtpGetCommandTests(unittest.TestCase):
"""Tests for cowrie/commands/ftpget.py."""
proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer()))
tr = FakeTransport("", "31337")
@classmethod
def setUpClass(cls) -> None:
cls.proto.makeConnection(cls.tr)
@classmethod
def tearDownClass(cls) -> None:
cls.proto.connectionLost("tearDown From Unit Test")
def setUp(self) -> None:
self.tr.clear()
def test_help_command(self):
"""
Basic test
"""
def test_help_command(self) -> None:
usage = b"BusyBox v1.20.2 (2016-06-22 15:12:53 EDT) multi-call binary.\n" \
b"\n" \
b"Usage: ftpget [OPTIONS] HOST [LOCAL_FILE] REMOTE_FILE\n" \
b"\n" \
b"Download a file via FTP\n" \
b"\n" \
b" -c Continue previous transfer\n" \
b" -v Verbose\n" \
b" -u USER Username\n" \
b" -p PASS Password\n" \
b" -P NUM Port\n\n"
self.proto.lineReceived(b"ftpget\n")
self.assertEqual(
self.tr.value(),
b"""BusyBox v1.20.2 (2016-06-22 15:12:53 EDT) multi-call binary.
Usage: ftpget [OPTIONS] HOST [LOCAL_FILE] REMOTE_FILE
Download a file via FTP
-c Continue previous transfer
-v Verbose
-u USER Username
-p PASS Password
-P NUM Port\n\n"""
+ PROMPT,
)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")
self.assertEqual(self.tr.value(), usage + PROMPT)

View File

@ -1,21 +1,21 @@
# -*- test-case-name: Cowrie Proxy Test Cases -*-
#mypy: ignore # noqa
# Copyright (c) 2019 Guilherme Borges
# See LICENSE for details.
from __future__ import annotations
import os
from twisted.cred import portal
from twisted.internet import reactor # type: ignore
from twisted.trial import unittest
import unittest
from cowrie.core.checkers import HoneypotPasswordChecker, HoneypotPublicKeyChecker
from cowrie.core.realm import HoneyPotRealm
from cowrie.ssh.factory import CowrieSSHFactory
from twisted.cred import portal
from twisted.internet import reactor # type: ignore
# from cowrie.test.proxy_compare import ProxyTestCommand
os.environ["COWRIE_HONEYPOT_TTYLOG"] = "false"

View File

@ -1,78 +1,61 @@
# -*- test-case-name: Cowrie Test Cases -*-
# Copyright (c) 2018 Michel Oosterhof
# See LICENSE for details.
"""
Tests for general shell interaction and tee command
"""
from __future__ import annotations
import os
import unittest
from twisted.trial import unittest
from cowrie.shell.protocol import HoneyPotInteractiveProtocol
from cowrie.test.fake_server import FakeAvatar, FakeServer
from cowrie.test.fake_transport import FakeTransport
from cowrie.shell import protocol
from cowrie.test import fake_server, fake_transport
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data"
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data"
os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle"
PROMPT = b"root@unitTest:~# "
class ShellTeeCommandTests(unittest.TestCase):
def setUp(self):
self.proto = protocol.HoneyPotInteractiveProtocol(
fake_server.FakeAvatar(fake_server.FakeServer())
)
self.tr = fake_transport.FakeTransport("1.1.1.1", "1111")
self.proto.makeConnection(self.tr)
"""Tests for cowrie/commands/tee.py."""
proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer()))
tr = FakeTransport("", "31337")
@classmethod
def setUpClass(cls) -> None:
cls.proto.makeConnection(cls.tr)
@classmethod
def tearDownClass(cls) -> None:
cls.proto.connectionLost("tearDown From Unit Test")
def setUp(self) -> None:
self.tr.clear()
def test_tee_command_001(self):
"""
No such file
"""
def test_tee_command_001(self) -> None:
self.proto.lineReceived(b"tee /a/b/c/d\n")
self.assertEqual(self.tr.value(), b"tee: /a/b/c/d: No such file or directory\n")
# tee still waiting input from stdin
self.proto.handle_CTRL_C()
def test_tee_command_002(self):
"""
argument - (stdin)
"""
def test_tee_command_002(self) -> None:
self.proto.lineReceived(b"tee /a/b/c/d\n")
self.proto.handle_CTRL_C()
self.assertEqual(
self.tr.value(), b"tee: /a/b/c/d: No such file or directory\n^C\n" + PROMPT
)
self.assertEqual(self.tr.value(), b"tee: /a/b/c/d: No such file or directory\n^C\n" + PROMPT)
def test_tee_command_003(self):
"""
test ignore stdin when called without '-'
"""
def test_tee_command_003(self) -> None:
self.proto.lineReceived(b"tee a\n")
self.proto.lineReceived(b"test\n")
self.proto.handle_CTRL_D()
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_tee_command_004(self):
"""
test handle of stdin
"""
def test_tee_command_004(self) -> None:
self.proto.lineReceived(b"echo test | tee\n")
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_tee_command_005(self):
"""
test handle of CTRL_C
"""
def test_tee_command_005(self) -> None:
self.proto.lineReceived(b"tee\n")
self.proto.lineReceived(b"test\n")
self.proto.handle_CTRL_D()
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")

View File

@ -1,47 +1,35 @@
# -*- test-case-name: Cowrie Test Cases -*-
# Copyright (c) 2018 Michel Oosterhof
# See LICENSE for details.
"""
Tests for general shell interaction and echo command
"""
from __future__ import annotations
import os
import unittest
from twisted.trial import unittest
from cowrie.shell.protocol import HoneyPotInteractiveProtocol
from cowrie.test.fake_server import FakeAvatar, FakeServer
from cowrie.test.fake_transport import FakeTransport
from cowrie.shell import protocol
from cowrie.test import fake_server, fake_transport
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data"
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data"
os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle"
PROMPT = b"root@unitTest:~# "
class ShellTftpCommandTests(unittest.TestCase):
def setUp(self):
self.proto = protocol.HoneyPotInteractiveProtocol(
fake_server.FakeAvatar(fake_server.FakeServer())
)
self.tr = fake_transport.FakeTransport("1.1.1.1", "1111")
"""Tests for cowrie/commands/tftp.py."""
def setUp(self) -> None:
self.proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer()))
self.tr = FakeTransport("", "31337")
self.proto.makeConnection(self.tr)
self.tr.clear()
def test_echo_command_001(self):
"""
Basic test
"""
def tearDown(self) -> None:
self.proto.connectionLost("tearDown From Unit Test")
def test_echo_command_001(self) -> None:
self.proto.lineReceived(b"tftp\n")
self.assertEqual(
self.tr.value(),
b"usage: tftp [-h] [-c C C] [-l L] [-g G] [-p P] [-r R] [hostname]\n"
+ PROMPT,
self.tr.value(), b"usage: tftp [-h] [-c C C] [-l L] [-g G] [-p P] [-r R] [hostname]\n" + PROMPT
)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")

View File

@ -1,61 +1,50 @@
# -*- test-case-name: Cowrie Test Cases -*-
# Copyright (c) 2020 Peter Sufliarsky
# See LICENSE for details.
"""
Tests for uniq command
"""
from __future__ import annotations
import os
import unittest
from twisted.trial import unittest
from cowrie.shell.protocol import HoneyPotInteractiveProtocol
from cowrie.test.fake_server import FakeAvatar, FakeServer
from cowrie.test.fake_transport import FakeTransport
from cowrie.shell import protocol
from cowrie.test import fake_server, fake_transport
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data"
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "data"
os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle"
os.environ["COWRIE_SHELL_FILESYSTEM"] = "share/cowrie/fs.pickle"
PROMPT = b"root@unitTest:~# "
class ShellUniqCommandTests(unittest.TestCase):
def setUp(self):
self.proto = protocol.HoneyPotInteractiveProtocol(
fake_server.FakeAvatar(fake_server.FakeServer())
)
self.tr = fake_transport.FakeTransport("1.1.1.1", "1111")
self.proto.makeConnection(self.tr)
"""Tests for cowrie/commands/uniq.py."""
proto = HoneyPotInteractiveProtocol(FakeAvatar(FakeServer()))
tr = FakeTransport("", "31337")
@classmethod
def setUpClass(cls) -> None:
cls.proto.makeConnection(cls.tr)
@classmethod
def tearDownClass(cls) -> None:
cls.proto.connectionLost("tearDown From Unit Test")
def setUp(self) -> None:
self.tr.clear()
def test_uniq_command_001(self):
"""
echo test | uniq
"""
def test_uniq_command_001(self) -> None:
self.proto.lineReceived(b"echo test | uniq\n")
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_uniq_command_002(self):
"""
echo -e "test\ntest\ntest" | uniq
"""
self.proto.lineReceived(b'echo -e "test\ntest\ntest" | uniq\n')
def test_uniq_command_002(self) -> None:
self.proto.lineReceived(b"echo -e \"test\ntest\ntest\" | uniq\n")
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_uniq_command_003(self):
"""
test without arguments, read stdin and quit after Ctrl+D
"""
def test_uniq_command_003(self) -> None:
self.proto.lineReceived(b"uniq\n")
self.proto.lineReceived(b"test\n")
self.proto.lineReceived(b"test\n")
self.proto.lineReceived(b"test\n")
self.proto.handle_CTRL_D()
self.assertEqual(self.tr.value(), b"test\n\n" + PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")

View File

@ -1,114 +1,85 @@
from __future__ import annotations
import configparser
import unittest
from io import StringIO
from cowrie.core.utils import create_endpoint_services, durationHuman, get_endpoints_from_section
from twisted.application.service import MultiService
from twisted.internet import reactor # type: ignore
from twisted.internet import protocol
from twisted.trial import unittest
from cowrie.core.utils import (
create_endpoint_services,
durationHuman,
get_endpoints_from_section,
)
from twisted.internet import reactor # type: ignore
def get_config(config_string):
config = configparser.ConfigParser()
config.read_file(StringIO(config_string))
return config
def get_config(config_string: str) -> configparser.ConfigParser:
"""Create ConfigParser from a config_string."""
cfg = configparser.ConfigParser()
cfg.read_file(StringIO(config_string))
return cfg
class UtilsTestCase(unittest.TestCase):
"""
Tests for cowrie/core/utils.py
"""
"""Tests for cowrie/core/utils.py."""
def test_durationHuman(self):
"""
Test of cowrie.core.utils.durationHuman
"""
def test_durationHuman(self) -> None:
minute = durationHuman(60)
self.assertEqual(minute, "01:00")
hour = durationHuman(3600)
self.assertEqual(hour, "01:00:00")
something = durationHuman(364020)
self.assertEqual(something, "4.0 days 05:07:00")
def test_get_endpoints_from_section(self):
def test_get_endpoints_from_section(self) -> None:
cfg = get_config(
"""
[ssh]
listen_addr = 1.1.1.1
"""
"[ssh]\n"
"listen_addr = 1.1.1.1\n"
)
self.assertEqual(["tcp:2223:interface=1.1.1.1"], get_endpoints_from_section(cfg, "ssh", 2223))
cfg = get_config(
"[ssh]\n"
"listen_addr = 1.1.1.1\n"
)
self.assertEqual(["tcp:2224:interface=1.1.1.1"], get_endpoints_from_section(cfg, "ssh", 2224))
cfg = get_config(
"[ssh]\n"
"listen_addr = 1.1.1.1 2.2.2.2\n"
)
self.assertEqual(
["tcp:2223:interface=1.1.1.1"], get_endpoints_from_section(cfg, "ssh", 2223)
["tcp:2223:interface=1.1.1.1", "tcp:2223:interface=2.2.2.2"], get_endpoints_from_section(cfg, "ssh", 2223)
)
cfg = get_config(
"""
[ssh]
listen_addr = 1.1.1.1
"""
"[ssh]\n"
"listen_addr = 1.1.1.1 2.2.2.2\n"
"listen_port = 23\n"
)
self.assertEqual(
["tcp:2224:interface=1.1.1.1"], get_endpoints_from_section(cfg, "ssh", 2224)
["tcp:23:interface=1.1.1.1", "tcp:23:interface=2.2.2.2"], get_endpoints_from_section(cfg, "ssh", 2223)
)
cfg = get_config(
"""
[ssh]
listen_addr = 1.1.1.1 2.2.2.2
"""
"[ssh]\n"
"listen_endpoints = tcp:23:interface=1.1.1.1 tcp:2323:interface=1.1.1.1\n"
)
self.assertEqual(
["tcp:2223:interface=1.1.1.1", "tcp:2223:interface=2.2.2.2"],
get_endpoints_from_section(cfg, "ssh", 2223),
["tcp:23:interface=1.1.1.1", "tcp:2323:interface=1.1.1.1"], get_endpoints_from_section(cfg, "ssh", 2223)
)
cfg = get_config(
"""
[ssh]
listen_addr = 1.1.1.1 2.2.2.2
listen_port = 23
"""
)
self.assertEqual(
["tcp:23:interface=1.1.1.1", "tcp:23:interface=2.2.2.2"],
get_endpoints_from_section(cfg, "ssh", 2223),
)
cfg = get_config(
"""
[ssh]
listen_endpoints = tcp:23:interface=1.1.1.1 tcp:2323:interface=1.1.1.1
"""
)
self.assertEqual(
["tcp:23:interface=1.1.1.1", "tcp:2323:interface=1.1.1.1"],
get_endpoints_from_section(cfg, "ssh", 2223),
)
def test_create_endpoint_services(self):
def test_create_endpoint_services(self) -> None:
parent = MultiService()
create_endpoint_services(
reactor, parent, ["tcp:23:interface=1.1.1.1"], protocol.Factory()
)
create_endpoint_services(reactor, parent, ["tcp:23:interface=1.1.1.1"], protocol.Factory())
self.assertEqual(len(parent.services), 1)
parent = MultiService()
create_endpoint_services(reactor, parent, ["tcp:23:interface=1.1.1.1"], protocol.Factory())
self.assertEqual(len(parent.services), 1)
parent = MultiService()
create_endpoint_services(
reactor, parent, ["tcp:23:interface=1.1.1.1"], protocol.Factory()
)
self.assertEqual(len(parent.services), 1)
parent = MultiService()
create_endpoint_services(
reactor,
parent,
["tcp:23:interface=1.1.1.1", "tcp:2323:interface=2.2.2.2"],
protocol.Factory(),
reactor, parent, ["tcp:23:interface=1.1.1.1", "tcp:2323:interface=2.2.2.2"], protocol.Factory()
)
self.assertEqual(len(parent.services), 2)

View File

@ -20,7 +20,7 @@ deps =
# libvirt-python==5.5.0
commands =
trial cowrie
python -m unittest discover src --verbose
[testenv:lint]