* shell var expansion fix + test scripts
* add `ruff` testing
* deprecation warning explanation
* updated prereqs
This commit is contained in:
Michel Oosterhof 2023-01-03 16:59:24 +08:00 committed by GitHub
parent b56e257c29
commit f77bd0b8f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
63 changed files with 435 additions and 313 deletions

6
.github/ISSUE_TEMPLATE/config.yml vendored Normal file
View File

@ -0,0 +1,6 @@
---
blank_issues_enabled: false
contact_links:
- name: Ask a Question
url: https://www.cowrie.org/slack
about: If you have a question, or are looking for advice, please join our Slack

View File

@ -71,15 +71,15 @@ Next you need to create your virtual environment::
$ pwd
/home/cowrie/cowrie
$ virtualenv --python=python3 cowrie-env
$ python -m venv cowrie-env
New python executable in ./cowrie/cowrie-env/bin/python
Installing setuptools, pip, wheel...done.
Activate the virtual environment and install packages::
$ source cowrie-env/bin/activate
(cowrie-env) $ pip install --upgrade pip
(cowrie-env) $ pip install --upgrade -r requirements.txt
(cowrie-env) $ python -m pip install --upgrade pip
(cowrie-env) $ python -m pip install --upgrade -r requirements.txt
Step 5: Install configuration file
**********************************
@ -183,7 +183,7 @@ and their Python interface. In Debian/Ubuntu::
Then install the Python API to run the backend pool::
(cowrie-env) $ pip install libvirt-python==6.4.0
(cowrie-env) $ python -m pip install libvirt-python==6.4.0
To allow QEMU to use disk images and snapshots, set it to run with the user and group of the user running the pool
(usually called 'cowrie' too::
@ -232,6 +232,20 @@ See ~/cowrie/docs/[Output Plugin]/README.rst for details.
Troubleshooting
***************
CryptographyDeprecationWarning: Blowfish has been deprecated
============================================================
The following warnings may occur, these can be safely ignored, and
are not the reason your Cowrie installation is not working::
CryptographyDeprecationWarning: Blowfish has been deprecated b"blowfish-cbc": (algorithms.Blowfish, 16, modes.CBC),
CryptographyDeprecationWarning: CAST5 has been deprecated b"cast128-cbc": (algorithms.CAST5, 16, modes.CBC),
CryptographyDeprecationWarning: Blowfish has been deprecated b"blowfish-ctr": (algorithms.Blowfish, 16, modes.CTR),
CryptographyDeprecationWarning: CAST5 has been deprecated b"cast128-ctr": (algorithms.CAST5, 16, modes.CTR),
The algorithms are used in Cowrie to support old attackers that use
these deprecated algorithms.
twistd: unknown command: cowrie
===============================
@ -258,11 +272,11 @@ First stop your honeypot. Then pull updates from GitHub, and upgrade your Python
$ bin/cowrie stop
$ git pull
$ pip install --upgrade -r requirements.txt
$ python -m pip install --upgrade -r requirements.txt
If you use output plugins like SQL, Splunk, or ELK, remember to also upgrade your dependencies for these too::
$ pip install --upgrade -r requirements-output.txt
$ python -m pip install --upgrade -r requirements-output.txt
And finally, restart Cowrie after finishing all updates::

View File

@ -20,7 +20,7 @@ test:
.PHONY: build
build:
python setup.py build sdist bdist
python -m build
.PHONY: docs
docs: ## Create documentation
@ -42,11 +42,11 @@ pre-commit: ## Run pre-commit checks
.PHONY: pip-upgrade
pip-upgrade: ## Upgrade environment from requirements.txt
pip install --upgrade -r requirements.txt
python -m pip install --upgrade -r requirements.txt
.PHONY: pip-check
pip-check: ## Verify python packages
pip check
python -m pip check
# This assumes two remotes, one is `origin`, your fork. The second is `cowrie` the main project
.PHONY: git-remote

View File

@ -1,5 +1,17 @@
[tool.mypy]
[project]
name = "cowrie"
version = "2.4.0"
[tool.setuptools]
package-dir = {"" = "src"}
[tool.setuptools.packages.find]
where = ["src"]
[tool.mypy]
namespace_packages = true
plugins = [ "mypy_zope:plugin" ]
@ -33,5 +45,25 @@ disallow_any_expr = false
disallow_any_generics = false
disallow_untyped_calls = false
[tool.pylint."MESSAGES CONTROL"]
disable = [ "R0912", "C0103", "C0114", "C0115", "C0116", "C0301" ]
disable = ["R0901", "R0902", "R0903", "R0904", "R0912", "R0913", "R0914", "R0915", "C0103", "C0114", "C0115", "C0116", "C0301"]
[tool.isort]
profile = "black"
known_zope = "zope"
known_twisted = "twisted"
known_first_party = ["cowrie","backend_pool"]
sections=["FUTURE","STDLIB","THIRDPARTY","ZOPE","TWISTED","FIRSTPARTY","LOCALFOLDER"]
[tool.ruff]
line-length = 88
# Enable Pyflakes `E` and `F` codes by default.
select = ["E", "F"]
ignore = ["E501"]
# Assume Python 3.10.
target-version = "py310"

View File

@ -1,23 +1,26 @@
Sphinx==5.3.0
build==0.9.0
coverage==7.0.0
flake8-bugbear==22.12.6
flake8==6.0.0
mypy-extensions==0.4.3; platform_python_implementation=='CPython' and python_version>'3.8'
mypy-zope==0.3.11; platform_python_implementation=='CPython' and python_version>'3.8'
mypy==0.981; platform_python_implementation=='CPython' and python_version>'3.8'
pathspec==0.10.1
pipdeptree==2.2.1
pathspec==0.10.3
pipdeptree==2.3.3
pre-commit==2.20.0
pur==7.0.0
pyre-check==0.9.15; python_version>'3.8'
pytype==2022.8.30; platform_python_implementation=='CPython' and python_version<'3.11'
pyupgrade==2.37.3
pyre-check==0.9.17; python_version>'3.8'
pytype==2022.12.15; platform_python_implementation=='CPython' and python_version<'3.11'
pyupgrade==3.3.1
pyyaml==6.0
readthedocs-sphinx-search==0.1.2
setuptools==65.5.1
sphinx-copybutton==0.5.0
sphinx_rtd_theme==1.0.0
tox==3.25.1
twistedchecker==0.7.4
types-python-dateutil==2.8.19; python_version>'3.8'
types-redis==4.3.20; python_version>'3.8'
types-requests==2.28.9; python_version>'3.8'
yamllint==1.27.1
ruff==0.0.189
setuptools==65.6.3
sphinx-copybutton==0.5.1
sphinx_rtd_theme==1.1.1
tox==4.0.16
types-python-dateutil==2.8.19.5; python_version>'3.8'
types-redis==4.3.21.6; python_version>'3.8'
types-requests==2.28.11.6; python_version>'3.8'
yamllint==1.28.0

View File

@ -6,7 +6,7 @@ geoip2
requests==2.28.1
# elasticsearch
elasticsearch==8.5.2
elasticsearch==8.5.3
# hpfeeds
hpfeeds3==0.9.10
@ -21,7 +21,7 @@ pymongo==4.3.3
rethinkdb==2.4.9
# s3
botocore==1.29.20
botocore==1.29.34
# slack
slackclient==2.9.4
@ -33,7 +33,7 @@ influxdb==5.3.1
wokkel==18.0.0
# misp
pymisp==2.4.165.1
pymisp==2.4.166
# redis
redis==4.3.4
redis==4.4.0

View File

@ -1,9 +1,9 @@
appdirs==1.4.4
attrs==22.1.0
attrs==22.2.0
bcrypt==4.0.1
configparser==5.3.0
cryptography==38.0.4
packaging==21.3
packaging==22.0
pyasn1_modules==0.2.8
pyopenssl==22.1.0
pyparsing==3.0.9
@ -11,4 +11,4 @@ python-dateutil==2.8.2
service_identity==21.1.0
tftpy==0.8.2
treq==22.2.0
twisted==22.10.0
twisted==22.10.0

View File

@ -2,13 +2,6 @@
inputs = src/cowrie
keep_going = True
[isort]
profile = black
known_zope = zope
known_twisted = twisted
known_first_party = cowrie,backend_pool
sections=FUTURE,STDLIB,THIRDPARTY,ZOPE,TWISTED,FIRSTPARTY,LOCALFOLDER
[flake8]
ignore = E203,E501,W503
count = True

View File

@ -17,7 +17,17 @@ def create_disk_snapshot(source_img, destination_img):
# could use `capture_output=True` instead of `stdout` and `stderr` args in Python 3.7
out = subprocess.run(
["qemu-img", "create", "-f", "qcow2", "-F", "qcow2", "-b", source_img, destination_img],
[
"qemu-img",
"create",
"-f",
"qcow2",
"-F",
"qcow2",
"-b",
source_img,
destination_img,
],
capture_output=True,
)
return out.returncode == 0

View File

@ -38,14 +38,14 @@ class TelnetClient(StatefulTelnetProtocol):
when we detect the shell prompt.
TODO: Need to handle authentication failure
"""
if self.factory.prompt.strip() == br"#":
self.re_prompt = re.compile(br"#")
if self.factory.prompt.strip() == rb"#":
self.re_prompt = re.compile(rb"#")
else:
self.re_prompt = re.compile(self.factory.prompt.encode())
if re.search(br"([Ll]ogin:\s+$)", data):
if re.search(rb"([Ll]ogin:\s+$)", data):
self.sendLine(self.factory.username.encode())
elif re.search(br"([Pp]assword:\s+$)", data):
elif re.search(rb"([Pp]assword:\s+$)", data):
self.sendLine(self.factory.password.encode())
elif self.re_prompt.search(data):
self.setLineMode()

View File

@ -42,7 +42,7 @@ class Command_awk(HoneyPotCommand):
self.exit()
return
for o, a in optlist:
for o, _a in optlist:
if o in "--help":
self.help()
self.exit()

View File

@ -39,7 +39,7 @@ class Command_cat(HoneyPotCommand):
self.exit()
return
for o, a in optlist:
for o, _a in optlist:
if o in ("--help"):
self.help()
self.exit()

View File

@ -34,7 +34,7 @@ class Command_free(HoneyPotCommand):
return
# Parse options
for o, a in opts:
for o, _a in opts:
if o in ("-h"):
self.do_free(fmt="human")
return

View File

@ -74,7 +74,7 @@ class Command_grep(HoneyPotCommand):
self.exit()
return
for opt, arg in optlist:
for opt, _arg in optlist:
if opt == "-h":
self.help()
@ -342,7 +342,7 @@ or available locally via: info '(coreutils) rm invocation'\n"""
self.exit()
return
for o, a in optlist:
for o, _a in optlist:
if o in ("--recursive", "-r", "-R"):
recursive = True
elif o in ("--force", "-f"):

View File

@ -202,7 +202,7 @@ gcc version {} (Debian {}-5)""".format(
# Data contains random garbage from an actual file, so when
# catting the file, you'll see some 'real' compiled data
for i in range(random.randint(3, 15)):
for _i in range(random.randint(3, 15)):
if random.randint(1, 3) == 1:
data = data + Command_gcc.RANDOM_DATA[::-1]
else:

View File

@ -229,9 +229,9 @@ class Command_iptables(HoneyPotCommand):
)
# Get the tables
self.tables: dict[str, dict[str, list[Any]]] = getattr(
self.protocol.user.server, "iptables"
)
self.tables: dict[
str, dict[str, list[Any]]
] = self.protocol.user.server.iptables
# Verify selected table
if not self.is_valid_table(table):

View File

@ -49,7 +49,7 @@ class Command_ls(HoneyPotCommand):
self.write("Try 'ls --help' for more information.\n")
return
for x, a in opts:
for x, _a in opts:
if x in ("-l"):
func = self.do_ls_l
if x in ("-a"):

View File

@ -84,7 +84,7 @@ class Command_perl(HoneyPotCommand):
self.exit()
# Parse options
for o, a in opts:
for o, _a in opts:
if o in ("-v"):
self.version()
self.exit()

View File

@ -86,7 +86,7 @@ class Command_python(HoneyPotCommand):
return
# Parse options
for o, a in opts:
for o, _a in opts:
if o in "-V":
self.version()
self.exit()

View File

@ -103,7 +103,7 @@ class Command_service(HoneyPotCommand):
self.help()
return
for o, a in opts:
for o, _a in opts:
if o in ("--help") or o in ("-h"):
self.help()
return

View File

@ -107,7 +107,7 @@ class Command_sudo(HoneyPotCommand):
self.short_help()
return
for o, a in optlist:
for o, _a in optlist:
if o in ("-V"):
self.version()
return

View File

@ -41,7 +41,7 @@ class Command_tee(HoneyPotCommand):
self.exit()
return
for o, a in optlist:
for o, _a in optlist:
if o in ("--help"):
self.help()
self.exit()

View File

@ -151,7 +151,7 @@ class Command_uname(HoneyPotCommand):
# Set all opts for -a/--all, single opt otherwise:
if target_opt == "__ALL__":
for key, value in opts.items():
for key in opts.keys():
opts[key] = True
else:
opts[target_opt] = True

View File

@ -59,17 +59,13 @@ class Command_unzip(HoneyPotCommand):
path = self.fs.resolve_path(filename, self.protocol.cwd)
if not path:
self.write(
"unzip: cannot find or open {0}, {0}.zip or {0}.ZIP.\n".format(
filename
)
f"unzip: cannot find or open {filename}, {filename}.zip or {filename}.ZIP.\n"
)
return
if not self.protocol.fs.exists(path):
if not self.protocol.fs.exists(path + ".zip"):
self.write(
"unzip: cannot find or open {0}, {0}.zip or {0}.ZIP.\n".format(
filename
)
f"unzip: cannot find or open {filename}, {filename}.zip or {filename}.ZIP.\n"
)
return
else:
@ -85,9 +81,7 @@ class Command_unzip(HoneyPotCommand):
)
self.write(output)
self.write(
"unzip: cannot find or open {0}, {0}.zip or {0}.ZIP.\n".format(
filename
)
f"unzip: cannot find or open {filename}, {filename}.zip or {filename}.ZIP.\n"
)
return
@ -102,9 +96,7 @@ class Command_unzip(HoneyPotCommand):
)
self.write(output)
self.write(
"unzip: cannot find zipfile directory in one of {0}, {0}.zip or {0}.ZIP.\n".format(
filename
)
f"unzip: cannot find or open {filename}, {filename}.zip or {filename}.ZIP.\n"
)
return
self.write(f"Archive: {filename}\n")

View File

@ -69,7 +69,7 @@ class Command_wc(HoneyPotCommand):
self.errorWrite(f"wc: {filename}: No such file or directory\n")
def wc_application(self, contents: bytes, optlist: list[Tuple[str, str]]) -> None:
for opt, arg in optlist:
for opt, _arg in optlist:
if opt == "-l":
contentsplit = contents.split(b"\n")
self.write(f"{len(contentsplit) - 1}\n")

View File

@ -19,7 +19,8 @@ from twisted.cred.error import UnauthorizedLogin, UnhandledCredentials
from twisted.internet import defer
from twisted.python import failure, log
from cowrie.core import auth, credentials
from cowrie.core import auth
from cowrie.core import credentials as conchcredentials
from cowrie.core.config import CowrieConfig
@ -51,7 +52,7 @@ class HoneypotNoneChecker:
Checker that does no authentication check
"""
credentialInterfaces = (credentials.IUsername,)
credentialInterfaces = (conchcredentials.IUsername,)
def requestAvatarId(self, credentials):
return defer.succeed(credentials.username)
@ -64,8 +65,8 @@ class HoneypotPasswordChecker:
"""
credentialInterfaces = (
credentials.IUsernamePasswordIP,
credentials.IPluggableAuthenticationModulesIP,
conchcredentials.IUsernamePasswordIP,
conchcredentials.IPluggableAuthenticationModulesIP,
)
def requestAvatarId(self, credentials):
@ -75,7 +76,7 @@ class HoneypotPasswordChecker:
):
return defer.succeed(credentials.username)
return defer.fail(UnauthorizedLogin())
elif hasattr(credentials, "pamConversion"):
if hasattr(credentials, "pamConversion"):
return self.checkPamUser(
credentials.username, credentials.pamConversion, credentials.ip
)

View File

@ -50,7 +50,7 @@ class HoneyPotRealm:
serv = shellserver.CowrieServer(self)
user = shellavatar.CowrieUser(avatarId, serv)
return interfaces[0], user, user.logout
elif ITelnetProtocol in interfaces:
if ITelnetProtocol in interfaces:
serv = shellserver.CowrieServer(self)
user = session.HoneyPotTelnetSession(avatarId, serv)
return interfaces[0], user, user.logout

View File

@ -28,7 +28,7 @@ class LoggingServerProtocol(insults.ServerProtocol):
"honeypot", "download_limit_size", fallback=0
)
def __init__(self, prot=None, *a, **kw):
def __init__(self, protocolFactory=None, *a, **kw):
self.type: str
self.ttylogFile: str
self.ttylogSize: int = 0
@ -42,9 +42,9 @@ class LoggingServerProtocol(insults.ServerProtocol):
self.startTime: float
self.stdinlogFile: str
insults.ServerProtocol.__init__(self, prot, *a, **kw)
insults.ServerProtocol.__init__(self, protocolFactory, *a, **kw)
if prot is protocol.HoneyPotExecProtocol:
if protocolFactory is protocol.HoneyPotExecProtocol:
self.type = "e" # Execcmd
else:
self.type = "i" # Interactive

View File

@ -21,12 +21,18 @@ from cowrie.core.config import CowrieConfig
class Output(cowrie.core.output.Output):
def start(self) -> None:
self.url = CowrieConfig.get("output_datadog", "url").encode("utf8")
self.api_key = CowrieConfig.get("output_datadog", "api_key", fallback="").encode("utf8")
self.api_key = CowrieConfig.get(
"output_datadog", "api_key", fallback=""
).encode("utf8")
if len(self.api_key) == 0:
log.msg("Datadog output module: API key is not defined.")
self.ddsource = CowrieConfig.get("output_datadog", "ddsource", fallback="cowrie")
self.ddsource = CowrieConfig.get(
"output_datadog", "ddsource", fallback="cowrie"
)
self.ddtags = CowrieConfig.get("output_datadog", "ddtags", fallback="env:dev")
self.service = CowrieConfig.get("output_datadog", "service", fallback="honeypot")
self.service = CowrieConfig.get(
"output_datadog", "service", fallback="honeypot"
)
contextFactory = WebClientContextFactory()
self.agent = client.Agent(reactor, contextFactory)
@ -44,7 +50,7 @@ class Output(cowrie.core.output.Output):
"ddtags": self.ddtags,
"hostname": platform.node(),
"message": json.dumps(logentry),
"service": self.service
"service": self.service,
}
]
self.postentry(message)
@ -53,7 +59,7 @@ class Output(cowrie.core.output.Output):
base_headers = {
b"Accept": [b"application/json"],
b"Content-Type": [b"application/json"],
b"DD-API-KEY": [self.api_key]
b"DD-API-KEY": [self.api_key],
}
headers = http_headers.Headers(base_headers)
body = FileBodyProducer(BytesIO(json.dumps(entry).encode("utf8")))

View File

@ -26,6 +26,10 @@ class Output(cowrie.core.output.Output):
"""
dshield output
"""
debug: bool = False
userid: str
batch_size: int
batch: list
def start(self):
self.auth_key = CowrieConfig.get("output_dshield", "auth_key")
@ -45,7 +49,7 @@ class Output(cowrie.core.output.Output):
date = dateutil.parser.parse(entry["timestamp"])
self.batch.append(
{
"date": date.date().__str__(),
"date": str(date.date()),
"time": date.time().strftime("%H:%M:%S"),
"timezone": time.strftime("%z"),
"source_ip": entry["src_ip"],
@ -120,7 +124,7 @@ class Output(cowrie.core.output.Output):
log.msg(f"dshield: status code {resp.status_code}")
log.msg(f"dshield: response {resp.content}")
if resp.status_code == requests.codes.ok:
if resp.ok:
sha1_regex = re.compile(r"<sha1checksum>([^<]+)<\/sha1checksum>")
sha1_match = sha1_regex.search(response)
sha1_local = hashlib.sha1()
@ -156,9 +160,7 @@ class Output(cowrie.core.output.Output):
failed = True
log.msg(
"dshield: SUCCESS: Sent {} bytes worth of data to secure.dshield.org".format(
len(log_output)
)
f"dshield: SUCCESS: Sent {log_output} bytes worth of data to secure.dshield.org"
)
else:
log.msg(f"dshield ERROR: error {resp.status_code}.")

View File

@ -33,11 +33,11 @@ class Output(cowrie.core.output.Output):
del logentry[i]
gelf_message = {
'version': '1.1',
'host': logentry["sensor"],
'timestamp': time.time(),
'short_message': json.dumps(logentry),
'level': 1,
"version": "1.1",
"host": logentry["sensor"],
"timestamp": time.time(),
"short_message": json.dumps(logentry),
"level": 1,
}
self.postentry(gelf_message)

View File

@ -106,7 +106,7 @@ class Output(cowrie.core.output.Output):
elif entry["eventid"] == "cowrie.log.closed":
# entry["ttylog"]
with open(entry["ttylog"], 'rb') as ttylog:
with open(entry["ttylog"], "rb") as ttylog:
self.meta[session]["ttylog"] = ttylog.read().hex()
elif entry["eventid"] == "cowrie.session.closed":

View File

@ -164,7 +164,7 @@ class Output(cowrie.core.output.Output):
elif entry["eventid"] == "cowrie.session.params":
self.simpleQuery(
"INSERT INTO `params` (`session`, `arch`) " "VALUES (%s, %s)",
"INSERT INTO `params` (`session`, `arch`) VALUES (%s, %s)",
(entry["session"], entry["arch"]),
)
@ -224,7 +224,7 @@ class Output(cowrie.core.output.Output):
elif entry["eventid"] == "cowrie.client.version":
r = yield self.db.runQuery(
"SELECT `id` FROM `clients` " "WHERE `version` = %s",
"SELECT `id` FROM `clients` WHERE `version` = %s",
(entry["version"],),
)
@ -232,20 +232,20 @@ class Output(cowrie.core.output.Output):
id = int(r[0][0])
else:
yield self.db.runQuery(
"INSERT INTO `clients` (`version`) " "VALUES (%s)",
"INSERT INTO `clients` (`version`) VALUES (%s)",
(entry["version"],),
)
r = yield self.db.runQuery("SELECT LAST_INSERT_ID()")
id = int(r[0][0])
self.simpleQuery(
"UPDATE `sessions` " "SET `client` = %s " "WHERE `id` = %s",
"UPDATE `sessions` SET `client` = %s WHERE `id` = %s",
(id, entry["session"]),
)
elif entry["eventid"] == "cowrie.client.size":
self.simpleQuery(
"UPDATE `sessions` " "SET `termsize` = %s " "WHERE `id` = %s",
"UPDATE `sessions` SET `termsize` = %s WHERE `id` = %s",
("{}x{}".format(entry["width"], entry["height"]), entry["session"]),
)

View File

@ -14,8 +14,8 @@ class Output(cowrie.core.output.Output):
"""
def start(self):
self.bot_token = CowrieConfig.get('output_telegram', 'bot_token')
self.chat_id = CowrieConfig.get('output_telegram', 'chat_id')
self.bot_token = CowrieConfig.get("output_telegram", "bot_token")
self.chat_id = CowrieConfig.get("output_telegram", "chat_id")
def stop(self):
pass
@ -23,29 +23,29 @@ class Output(cowrie.core.output.Output):
def write(self, logentry):
for i in list(logentry.keys()):
# remove twisted 15 legacy keys
if i.startswith('log_'):
if i.startswith("log_"):
del logentry[i]
logon_type = ""
# Prepare logon type
if 'HoneyPotSSHTransport' in (logentry['system'].split(','))[0]:
logon_type = 'SSH'
elif 'CowrieTelnetTransport' in (logentry['system'].split(','))[0]:
logon_type = 'Telnet'
if "HoneyPotSSHTransport" in (logentry["system"].split(","))[0]:
logon_type = "SSH"
elif "CowrieTelnetTransport" in (logentry["system"].split(","))[0]:
logon_type = "Telnet"
# Prepare base message
msgtxt = "<strong>[Cowrie " + logentry['sensor'] + "]</strong>"
msgtxt = "<strong>[Cowrie " + logentry["sensor"] + "]</strong>"
msgtxt += "\nEvent: " + logentry["eventid"]
msgtxt += "\nLogon type: " + logon_type
msgtxt += "\nSource: <code>" + logentry['src_ip'] + "</code>"
msgtxt += "\nSession: <code>" + logentry['session'] + "</code>"
msgtxt += "\nSource: <code>" + logentry["src_ip"] + "</code>"
msgtxt += "\nSession: <code>" + logentry["session"] + "</code>"
if logentry['eventid'] == "cowrie.login.success":
msgtxt += "\nUsername: <code>" + logentry['username'] + "</code>"
msgtxt += "\nPassword: <code>" + logentry['password'] + "</code>"
if logentry["eventid"] == "cowrie.login.success":
msgtxt += "\nUsername: <code>" + logentry["username"] + "</code>"
msgtxt += "\nPassword: <code>" + logentry["password"] + "</code>"
self.send_message(msgtxt)
elif logentry["eventid"] in ["cowrie.command.failed", "cowrie.command.input"]:
msgtxt += "\nCommand: <pre>" + logentry['input'] + "</pre>"
msgtxt += "\nCommand: <pre>" + logentry["input"] + "</pre>"
self.send_message(msgtxt)
elif logentry["eventid"] == "cowrie.session.file_download":
msgtxt += "\nUrl: " + logentry.get("url", "")
@ -54,7 +54,13 @@ class Output(cowrie.core.output.Output):
def send_message(self, message):
log.msg("Telegram plugin will try to call TelegramBot")
try:
treq.get('https://api.telegram.org/bot' + self.bot_token + '/sendMessage',
params=[('chat_id', str(self.chat_id)), ('parse_mode', 'HTML'), ('text', message)])
treq.get(
"https://api.telegram.org/bot" + self.bot_token + "/sendMessage",
params=[
("chat_id", str(self.chat_id)),
("parse_mode", "HTML"),
("text", message),
],
)
except Exception:
log.msg("Telegram plugin request error")

View File

@ -41,7 +41,9 @@ class Output(cowrie.core.output.Output):
def start(self):
self.format = CowrieConfig.get("output_textlog", "format")
self.outfile = open(CowrieConfig.get("output_textlog", "logfile"), "a")
self.outfile = open(
CowrieConfig.get("output_textlog", "logfile"), "a", encoding="utf-8"
)
def stop(self):
pass

View File

@ -22,7 +22,7 @@ __author__ = "Diego Parrilla Santamaria"
__version__ = "0.1.0"
import datetime
from typing import Generator, List, Set
from typing import Generator, List, Optional, Set
from treq import post
@ -73,7 +73,11 @@ class HTTPClient:
self.api_url = api_url
def report(
self, ip_set: Set[str], category: str, ttl: int = 0, tags: List[str] = []
self,
ip_set: Set[str],
category: str,
ttl: int = 0,
tags: Optional[List[str]] = None,
) -> None:
payload: dict = {
"addresses": list(ip_set),

View File

@ -100,7 +100,6 @@ class Output(cowrie.core.output.Output):
"""
Stop output plugin
"""
pass
def write(self, entry: dict[str, Any]) -> None:
if entry["eventid"] == "cowrie.session.file_download":

View File

@ -32,8 +32,8 @@ class CowrieDailyLogFile(logfile.DailyLogFile):
def logger():
dir = CowrieConfig.get("honeypot", "log_path", fallback="log")
logfile = CowrieDailyLogFile("cowrie.log", dir)
directory = CowrieConfig.get("honeypot", "log_path", fallback="var/log/cowrie")
logfile = CowrieDailyLogFile("cowrie.log", directory)
# use Z for UTC (Zulu) time, it's shorter.
if "TZ" in environ and environ["TZ"] == "UTC":

View File

@ -33,7 +33,7 @@ class HoneyPotCommand:
self.args = list(args)
self.environ = self.protocol.cmdstack[0].environ
self.fs = self.protocol.fs
self.data: bytes = b'' # output data
self.data: bytes = b"" # output data
self.input_data: Optional[
bytes
] = None # used to store STDIN data passed via PIPE

View File

@ -108,12 +108,6 @@ class CowrieSFTPDirectory:
def __iter__(self):
return self
def next(self):
"""
Py2 compatibility
"""
return self.__next__()
def __next__(self):
try:
f = self.files.pop(0)

View File

@ -12,6 +12,7 @@ import os
from pathlib import Path
import pickle
import re
import sys
import stat
import time
from typing import Any, Optional
@ -74,24 +75,12 @@ class TooManyLevels(Exception):
raise OSError(errno.ELOOP, os.strerror(errno.ENOENT))
"""
pass
class IsADirectoryError(Exception):
"""
Something is a directory where the user was expecting a file
"""
pass
class FileNotFound(Exception):
"""
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
"""
pass
class PermissionDenied(Exception):
"""
@ -112,8 +101,6 @@ class PermissionDenied(Exception):
touch: cannot touch '/sys/fs/fuse/connections/.nippon': Permission denied
"""
pass
class HoneyPotFilesystem:
def __init__(self, arch: str, home: str) -> None:
@ -128,7 +115,7 @@ class HoneyPotFilesystem:
self.fs = pickle.load(f, encoding="utf8")
except Exception as e:
log.err(e, "ERROR: Failed to load filesystem")
exit(2)
sys.exit(2)
# Keep track of arch so we can return appropriate binary
self.arch: str = arch
@ -151,7 +138,7 @@ class HoneyPotFilesystem:
the virtual filesystem.
"""
for path, directories, filenames in os.walk(honeyfs_path):
for path, _directories, filenames in os.walk(honeyfs_path):
for filename in filenames:
realfile_path: str = os.path.join(path, filename)
virtual_path: str = "/" + os.path.relpath(realfile_path, honeyfs_path)
@ -182,11 +169,11 @@ class HoneyPotFilesystem:
cwdpieces = [x for x in cwd.split("/") if len(x) and x is not None]
while 1:
if not len(pieces):
if not pieces:
break
piece = pieces.pop(0)
if piece == "..":
if len(cwdpieces):
if cwdpieces:
cwdpieces.pop()
continue
if piece in (".", ""):
@ -209,7 +196,7 @@ class HoneyPotFilesystem:
found: list[str] = []
def foo(p, cwd):
if not len(p):
if not p:
found.append("/{}".format("/".join(cwd)))
elif p[0] == ".":
foo(p[1:], cwd)
@ -230,7 +217,7 @@ class HoneyPotFilesystem:
"""
cwd: list[Any] = self.fs
for part in path.split("/"):
if not len(part):
if not part:
continue
ok = False
for c in cwd[A_CONTENTS]:
@ -333,20 +320,19 @@ class HoneyPotFilesystem:
f: Any = self.getfile(path)
if f[A_TYPE] == T_DIR:
raise IsADirectoryError
elif f[A_TYPE] == T_FILE and f[A_REALFILE]:
if f[A_TYPE] == T_FILE and f[A_REALFILE]:
return Path(f[A_REALFILE]).read_bytes()
elif f[A_TYPE] == T_FILE and f[A_SIZE] == 0:
if f[A_TYPE] == T_FILE and f[A_SIZE] == 0:
# Zero-byte file lacking A_REALFILE backing: probably empty.
# (The exceptions to this are some system files in /proc and /sys,
# but it's likely better to return nothing than suspiciously fail.)
return b""
elif f[A_TYPE] == T_FILE and f[A_MODE] & stat.S_IXUSR:
if f[A_TYPE] == T_FILE and f[A_MODE] & stat.S_IXUSR:
return open(
CowrieConfig.get("honeypot", "share_path") + "/arch/" + self.arch,
"rb",
).read()
else:
return b""
return b""
def mkfile(
self,
@ -387,13 +373,13 @@ class HoneyPotFilesystem:
raise OSError(errno.EDQUOT, os.strerror(errno.EDQUOT), path)
if ctime is None:
ctime = time.time()
if not len(path.strip("/")):
if not path.strip("/"):
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), path)
try:
dir = self.get_path(os.path.dirname(path.strip("/")))
directory = self.get_path(os.path.dirname(path.strip("/")))
except IndexError:
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), path)
dir.append(
directory.append(
[os.path.basename(path), T_DIR, uid, gid, size, mode, ctime, [], None, None]
)
self.newcount += 1
@ -411,8 +397,7 @@ class HoneyPotFilesystem:
return False
if f[A_TYPE] == T_FILE:
return True
else:
return False
return False
def islink(self, path: str) -> bool:
"""
@ -428,8 +413,7 @@ class HoneyPotFilesystem:
return False
if f[A_TYPE] == T_LINK:
return True
else:
return False
return False
def isdir(self, path: str) -> bool:
"""
@ -439,15 +423,14 @@ class HoneyPotFilesystem:
if path == "/":
return True
try:
dir = self.getfile(path)
directory = self.getfile(path)
except Exception:
dir = None
if dir is None:
directory = None
if directory is None:
return False
if dir[A_TYPE] == T_DIR:
if directory[A_TYPE] == T_DIR:
return True
else:
return False
return False
"""
Below additions for SFTP support, try to keep functions here similar to os.*
@ -487,7 +470,7 @@ class HoneyPotFilesystem:
return fd
# TODO: throw exception
elif openFlags & os.O_RDONLY == os.O_RDONLY:
if openFlags & os.O_RDONLY == os.O_RDONLY:
return None
# TODO: throw exception
@ -504,9 +487,8 @@ class HoneyPotFilesystem:
if not fd:
return
if self.tempfiles[fd] is not None:
shasum: str = hashlib.sha256(
open(self.tempfiles[fd], "rb").read()
).hexdigest()
with open(self.tempfiles[fd], "rb") as f:
shasum: str = hashlib.sha256(f.read()).hexdigest()
shasumfile: str = (
CowrieConfig.get("honeypot", "download_path") + "/" + shasum
)
@ -535,8 +517,8 @@ class HoneyPotFilesystem:
"""
FIXME mkdir() name conflicts with existing mkdir
"""
dir: Optional[list[Any]] = self.getfile(path)
if dir:
directory: Optional[list[Any]] = self.getfile(path)
if directory:
raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path)
self.mkdir(path, 0, 0, 4096, 16877)
@ -544,10 +526,10 @@ class HoneyPotFilesystem:
p: str = path.rstrip("/")
name: str = os.path.basename(p)
parent: str = os.path.dirname(p)
dir: Any = self.getfile(p, follow_symlinks=False)
if not dir:
directory: Any = self.getfile(p, follow_symlinks=False)
if not directory:
raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), p)
if dir[A_TYPE] != T_DIR:
if directory[A_TYPE] != T_DIR:
raise OSError(errno.ENOTDIR, os.strerror(errno.ENOTDIR), p)
if len(self.get_path(p)) > 0:
raise OSError(errno.ENOTEMPTY, os.strerror(errno.ENOTEMPTY), p)
@ -558,7 +540,7 @@ class HoneyPotFilesystem:
return True
return False
def utime(self, path: str, atime: float, mtime: float) -> None:
def utime(self, path: str, _atime: float, mtime: float) -> None:
p: Optional[list[Any]] = self.getfile(path)
if not p:
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
@ -589,7 +571,7 @@ class HoneyPotFilesystem:
p: Optional[list[Any]] = self.getfile(path, follow_symlinks=False)
if not p:
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
if not (p[A_MODE] & stat.S_IFLNK):
if not p[A_MODE] & stat.S_IFLNK:
raise OSError
return p[A_TARGET] # type: ignore

View File

@ -81,7 +81,7 @@ class HoneyPotShell:
elif "$(" in tok or "`" in tok:
tok = self.do_command_substitution(tok)
elif tok.startswith("${"):
envRex = re.compile(r"^\$([_a-zA-Z0-9]+)$")
envRex = re.compile(r"^\${([_a-zA-Z0-9]+)}$")
envSearch = envRex.search(tok)
if envSearch is not None:
envMatch = envSearch.group(1)
@ -90,7 +90,7 @@ class HoneyPotShell:
else:
continue
elif tok.startswith("$"):
envRex = re.compile(r"^\${([_a-zA-Z0-9]+)}$")
envRex = re.compile(r"^\$([_a-zA-Z0-9]+)$")
envSearch = envRex.search(tok)
if envSearch is not None:
envMatch = envSearch.group(1)
@ -264,7 +264,7 @@ class HoneyPotShell:
# Gather all arguments with pipes
for index, pipe_indice in enumerate(pipe_indices):
for _index, pipe_indice in enumerate(pipe_indices):
multipleCmdArgs.append(cmdAndArgs[start:pipe_indice])
start = pipe_indice + 1
@ -274,7 +274,7 @@ class HoneyPotShell:
cmd_array.append(cmd)
cmd = {}
for index, value in enumerate(multipleCmdArgs):
for value in multipleCmdArgs:
cmd["command"] = value.pop(0)
cmd["rargs"] = parse_arguments(value)
cmd_array.append(cmd)
@ -424,7 +424,7 @@ class HoneyPotShell:
return
# Clear early so we can call showPrompt if needed
for i in range(self.protocol.lineBufferIndex):
for _i in range(self.protocol.lineBufferIndex):
self.protocol.terminal.cursorBackward()
self.protocol.terminal.deleteCharacter()
@ -544,7 +544,7 @@ class StdOutStdErrEmulationProtocol:
pass
def processExited(self, reason):
log.msg("processExited for %s, status %d" % (self.cmd, reason.value.exitCode))
log.msg(f"processExited for {self.cmd}, status {reason.value.exitCode}")
def processEnded(self, reason):
log.msg("processEnded for %s, status %d" % (self.cmd, reason.value.exitCode))
log.msg(f"processEnded for {self.cmd}, status {reason.value.exitCode}")

View File

@ -33,7 +33,7 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
f"cowrie.commands.{c}", globals(), locals(), ["commands"]
)
commands.update(module.commands)
except Exception as e:
except ImportError as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
log.err(
"Failed to import command {}: {}: {}".format(
@ -45,11 +45,11 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
)
)
def __init__(self, user):
self.user = user
self.environ = user.environ
self.hostname: str = user.server.hostname
self.fs = user.server.fs
def __init__(self, avatar):
self.user = avatar
self.environ = avatar.environ
self.hostname: str = self.user.server.hostname
self.fs = self.user.server.fs
self.pp = None
self.logintime: float
self.realClientIP: str
@ -59,8 +59,8 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
self.sessionno: int
self.factory = None
if self.fs.exists(user.avatar.home):
self.cwd = user.avatar.home
if self.fs.exists(self.user.avatar.home):
self.cwd = self.user.avatar.home
else:
self.cwd = "/"
self.data = None
@ -82,7 +82,7 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
args["sessionno"] = self.sessionno
self.factory.logDispatch(**args)
def connectionMade(self):
def connectionMade(self) -> None:
pt = self.getProtoTransport()
self.factory = pt.factory
@ -112,7 +112,7 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
except Exception:
self.kippoIP = "192.168.0.1"
def timeoutConnection(self):
def timeoutConnection(self) -> None:
"""
this logs out when connection times out
"""
@ -134,7 +134,7 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
self.user = None
self.environ = None
def txtcmd(self, txt):
def txtcmd(self, txt: str) -> object:
class Command_txtcmd(command.HoneyPotCommand):
def call(self):
log.msg(f'Reading txtcmd from "{txt}"')
@ -177,18 +177,18 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
log.msg(f"Can't find command {cmd}")
return None
def lineReceived(self, line):
def lineReceived(self, line: bytes) -> None:
"""
IMPORTANT
Before this, all data is 'bytes'. Here it converts to 'string' and
commands work with string rather than bytes.
"""
line = line.decode("utf8")
string = line.decode("utf8")
if self.cmdstack:
self.cmdstack[-1].lineReceived(line)
self.cmdstack[-1].lineReceived(string)
else:
log.msg(f"discarding input {line}")
log.msg(f"discarding input {string}")
def call_command(self, pp, cmd, *args):
self.pp = pp
@ -208,7 +208,7 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
r = time.time() - pt.factory.starttime
return r
def eofReceived(self):
def eofReceived(self) -> None:
# Shell received EOF, nicely exit
"""
TODO: this should probably not go through transport, but use processprotocol to close stdin
@ -234,7 +234,7 @@ class HoneyPotExecProtocol(HoneyPotBaseProtocol):
HoneyPotBaseProtocol.__init__(self, avatar)
def connectionMade(self):
def connectionMade(self) -> None:
HoneyPotBaseProtocol.connectionMade(self)
self.setTimeout(60)
self.cmdstack = [honeypot.HoneyPotShell(self, interactive=False)]
@ -251,7 +251,7 @@ class HoneyPotInteractiveProtocol(HoneyPotBaseProtocol, recvline.HistoricRecvLin
recvline.HistoricRecvLine.__init__(self)
HoneyPotBaseProtocol.__init__(self, avatar)
def connectionMade(self):
def connectionMade(self) -> None:
self.displayMOTD()
HoneyPotBaseProtocol.connectionMade(self)
@ -279,13 +279,13 @@ class HoneyPotInteractiveProtocol(HoneyPotBaseProtocol, recvline.HistoricRecvLin
}
)
def displayMOTD(self):
def displayMOTD(self) -> None:
try:
self.terminal.write(self.fs.file_contents("/etc/motd"))
except Exception:
pass
def timeoutConnection(self):
def timeoutConnection(self) -> None:
"""
this logs out when connection times out
"""
@ -297,7 +297,7 @@ class HoneyPotInteractiveProtocol(HoneyPotBaseProtocol, recvline.HistoricRecvLin
recvline.HistoricRecvLine.connectionLost(self, reason)
self.keyHandlers = {}
def initializeScreen(self):
def initializeScreen(self) -> None:
"""
Overriding super to prevent terminal.reset()
"""
@ -327,23 +327,23 @@ class HoneyPotInteractiveProtocol(HoneyPotBaseProtocol, recvline.HistoricRecvLin
self.historyPosition = len(self.historyLines)
return recvline.RecvLine.handle_RETURN(self)
def handle_CTRL_C(self):
def handle_CTRL_C(self) -> None:
if self.cmdstack:
self.cmdstack[-1].handle_CTRL_C()
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
if self.cmdstack:
self.cmdstack[-1].handle_CTRL_D()
def handle_TAB(self):
def handle_TAB(self) -> None:
if self.cmdstack:
self.cmdstack[-1].handle_TAB()
def handle_CTRL_K(self):
def handle_CTRL_K(self) -> None:
self.terminal.eraseToLineEnd()
self.lineBuffer = self.lineBuffer[0 : self.lineBufferIndex]
def handle_CTRL_L(self):
def handle_CTRL_L(self) -> None:
"""
Handle a 'form feed' byte - generally used to request a screen
refresh/redraw.
@ -352,17 +352,17 @@ class HoneyPotInteractiveProtocol(HoneyPotBaseProtocol, recvline.HistoricRecvLin
self.terminal.cursorHome()
self.drawInputLine()
def handle_CTRL_U(self):
def handle_CTRL_U(self) -> None:
for _ in range(self.lineBufferIndex):
self.terminal.cursorBackward()
self.terminal.deleteCharacter()
self.lineBuffer = self.lineBuffer[self.lineBufferIndex :]
self.lineBufferIndex = 0
def handle_CTRL_V(self):
def handle_CTRL_V(self) -> None:
pass
def handle_ESC(self):
def handle_ESC(self) -> None:
pass
@ -373,7 +373,6 @@ class HoneyPotInteractiveTelnetProtocol(HoneyPotInteractiveProtocol):
"""
def __init__(self, avatar):
recvline.HistoricRecvLine.__init__(self)
HoneyPotInteractiveProtocol.__init__(self, avatar)
def getProtoTransport(self):

View File

@ -76,7 +76,7 @@ class SSHSessionForCowrieUser:
self.protocol.makeConnection(processprotocol)
processprotocol.makeConnection(session.wrapProtocol(self.protocol))
def closed(self):
def closed(self) -> None:
"""
this is reliably called on both logout and disconnect
we notify the protocol here we lost the connection
@ -85,7 +85,7 @@ class SSHSessionForCowrieUser:
self.protocol.connectionLost("disconnected")
self.protocol = None
def eofReceived(self):
def eofReceived(self) -> None:
if self.protocol:
self.protocol.eofReceived()

View File

@ -38,7 +38,7 @@ class CowrieSSHFactory(factory.SSHFactory):
portal: Optional[tp.Portal] = None # gets set by plugin
ourVersionString: bytes = CowrieConfig.get(
"ssh", "version", fallback="SSH-2.0-OpenSSH_6.0p1 Debian-4+deb7u2"
).encode('ascii')
).encode("ascii")
def __init__(self, backend, pool_handler):
self.pool_handler = pool_handler
@ -101,7 +101,7 @@ class CowrieSSHFactory(factory.SSHFactory):
# this can come from backend in the future, check HonSSH's slim client
self.ourVersionString = CowrieConfig.get(
"ssh", "version", fallback="SSH-2.0-OpenSSH_6.0p1 Debian-4+deb7u2"
).encode('ascii')
).encode("ascii")
factory.SSHFactory.startFactory(self)
log.msg("Ready to accept SSH connections")

View File

@ -120,7 +120,7 @@ class HoneyPotSSHTransport(transport.SSHServerTransport, TimeoutMixin):
),
format="Remote SSH version: %(version)s",
)
m = re.match(br"SSH-(\d+.\d+)-(.*)", self.otherVersionString)
m = re.match(rb"SSH-(\d+.\d+)-(.*)", self.otherVersionString)
if m is None:
log.msg(
"Bad protocol version identification: {}".format(

View File

@ -27,6 +27,7 @@ class HoneyPotSSHUserAuthServer(userauth.SSHUserAuthServer):
* Keyboard-interactive authentication (PAM)
* IP based authentication
"""
user: str
_pamDeferred: defer.Deferred | None

View File

@ -70,12 +70,12 @@ class ExecTerm(base_protocol.BaseProtocol):
)
ttylog.ttylog_open(self.ttylogFile, self.startTime)
def parse_packet(self, parent: str, payload: bytes) -> None:
def parse_packet(self, parent: str, data: bytes) -> None:
if self.ttylogEnabled:
ttylog.ttylog_write(
self.ttylogFile, len(payload), ttylog.TYPE_OUTPUT, time.time(), payload
self.ttylogFile, len(data), ttylog.TYPE_OUTPUT, time.time(), data
)
self.ttylogSize += len(payload)
self.ttylogSize += len(data)
def channel_closed(self):
if self.ttylogEnabled:

View File

@ -54,6 +54,7 @@ class FrontendSSHTransport(transport.SSHServerTransport, TimeoutMixin):
at the same time, perform the userauth service via ProxySSHAuthServer (built-in Cowrie's mechanism).
After both sides are authenticated, forward all things from one side to another.
"""
buf: bytes
ourVersionString: bytes
gotVersion: bool
@ -219,7 +220,7 @@ class FrontendSSHTransport(transport.SSHServerTransport, TimeoutMixin):
),
format="Remote SSH version: %(version)s",
)
m = re.match(br"SSH-(\d+.\d+)-(.*)", self.otherVersionString)
m = re.match(rb"SSH-(\d+.\d+)-(.*)", self.otherVersionString)
if m is None:
log.msg(
"Bad protocol version identification: {}".format(

View File

@ -110,8 +110,8 @@ class TelnetSessionProcessProtocol(protocol.ProcessProtocol):
def outReceived(self, data: bytes) -> None:
self.session.write(data)
def errReceived(self, err: bytes) -> None:
log.msg(f"Error received: {err.decode()}")
def errReceived(self, data: bytes) -> None:
log.msg(f"Error received: {data.decode()}")
# EXTENDED_DATA_STDERR is from ssh, no equivalent in telnet?
# self.session.writeExtended(connection.EXTENDED_DATA_STDERR, err)

View File

@ -19,10 +19,17 @@ from cowrie.core.config import CowrieConfig
class CowrieTelnetTransport(TelnetTransport, TimeoutMixin):
def connectionMade(self):
self.transportId = uuid.uuid4().hex[:12]
sessionno = self.transport.sessionno
"""
CowrieTelnetTransport
"""
def __init__(self):
TelnetTransport.__init__()
TimeoutMixin.__init__()
self.transportId: str = uuid.uuid4().hex[:12]
def connectionMade(self):
sessionno = self.transport.sessionno
self.startTime = time.time()
self.setTimeout(
CowrieConfig.getint("honeypot", "authentication_timeout", fallback=120)

View File

@ -1,4 +1,3 @@
#mypy: ignore # noqa
from __future__ import annotations
from backend_pool.ssh_exec import execute_ssh

View File

@ -34,21 +34,21 @@ class ShellEchoCommandTests(unittest.TestCase):
self.tr.clear()
def test_awk_command_001(self) -> None:
self.proto.lineReceived(b"echo \"test test\" | awk \"{ print $0 }\"\n")
self.proto.lineReceived(b'echo "test test" | awk "{ print $0 }"\n')
self.assertEqual(self.tr.value(), b"test test\n" + PROMPT)
def test_awk_command_002(self) -> None:
self.proto.lineReceived(b"echo \"test\" | awk \"{ print $1 }\"\n")
self.proto.lineReceived(b'echo "test" | awk "{ print $1 }"\n')
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_awk_command_003(self) -> None:
self.proto.lineReceived(b"echo \"test test\" | awk \"{ print $1 $2 }\"\n")
self.proto.lineReceived(b'echo "test test" | awk "{ print $1 $2 }"\n')
self.assertEqual(self.tr.value(), b"test test\n" + PROMPT)
def test_awk_command_004(self) -> None:
self.proto.lineReceived(b"echo \"test test\" | awk \"{ print $1,$2 }\"\n")
self.proto.lineReceived(b'echo "test test" | awk "{ print $1,$2 }"\n')
self.assertEqual(self.tr.value(), b"test test\n" + PROMPT)
def test_awk_command_005(self) -> None:
self.proto.lineReceived(b"echo \"test test\" | awk \"{ print $1$2 }\"\n")
self.proto.lineReceived(b'echo "test test" | awk "{ print $1$2 }"\n')
self.assertEqual(self.tr.value(), b"testtest\n" + PROMPT)

View File

@ -59,26 +59,38 @@ class ShellBaseCommandsTests(unittest.TestCase): # TODO: ps, history
def test_id_command(self) -> None:
self.proto.lineReceived(b"id\n")
self.assertEqual(self.tr.value(), b"uid=0(root) gid=0(root) groups=0(root)\n" + PROMPT)
self.assertEqual(
self.tr.value(), b"uid=0(root) gid=0(root) groups=0(root)\n" + PROMPT
)
def test_passwd_command(self) -> None:
self.proto.lineReceived(b"passwd\n")
self.proto.lineReceived(b"changeme\n")
self.proto.lineReceived(b"changeme\n")
self.assertEqual(self.tr.value(),
b"Enter new UNIX password: Retype new UNIX password: passwd: password updated successfully\n" + PROMPT)
self.assertEqual(
self.tr.value(),
b"Enter new UNIX password: Retype new UNIX password: passwd: password updated successfully\n"
+ PROMPT,
)
def test_shutdown_command(self) -> None:
self.proto.lineReceived(b"shutdown\n")
self.assertEqual(self.tr.value(), b"Try `shutdown --help' for more information.\n" + PROMPT) # TODO: Is it right?..
self.assertEqual(
self.tr.value(), b"Try `shutdown --help' for more information.\n" + PROMPT
) # TODO: Is it right?..
def test_poweroff_command(self) -> None:
self.proto.lineReceived(b"poweroff\n")
self.assertEqual(self.tr.value(), b"Try `shutdown --help' for more information.\n" + PROMPT) # TODO: Is it right?..
self.assertEqual(
self.tr.value(), b"Try `shutdown --help' for more information.\n" + PROMPT
) # TODO: Is it right?..
def test_date_command(self) -> None:
self.proto.lineReceived(b"date\n")
self.assertRegex(self.tr.value(), rb"[A-Za-z]{3} [A-Za-z]{3} \d{2} \d{2}:\d{2}:\d{2} UTC \d{4}\n" + PROMPT)
self.assertRegex(
self.tr.value(),
rb"[A-Za-z]{3} [A-Za-z]{3} \d{2} \d{2}:\d{2}:\d{2} UTC \d{4}\n" + PROMPT,
)
def test_bash_command(self) -> None:
self.proto.lineReceived(b"bash\n")
@ -86,7 +98,9 @@ class ShellBaseCommandsTests(unittest.TestCase): # TODO: ps, history
def test_sh_command(self) -> None:
self.proto.lineReceived(b"sh -c id\n")
self.assertEqual(self.tr.value(), b"uid=0(root) gid=0(root) groups=0(root)\n" + PROMPT)
self.assertEqual(
self.tr.value(), b"uid=0(root) gid=0(root) groups=0(root)\n" + PROMPT
)
def test_php_help_command(self) -> None:
self.proto.lineReceived(b"php -h\n")
@ -106,8 +120,11 @@ class ShellBaseCommandsTests(unittest.TestCase): # TODO: ps, history
def test_set_command(self) -> None:
self.proto.lineReceived(b"set\n")
self.assertEqual(self.tr.value(),
b"COLUMNS=80\nHOME=/root\nLINES=25\nLOGNAME=root\nPATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin\nTMOUT=1800\nUSER=root\n" + PROMPT)
self.assertEqual(
self.tr.value(),
b"COLUMNS=80\nHOME=/root\nLINES=25\nLOGNAME=root\nPATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin\nTMOUT=1800\nUSER=root\n"
+ PROMPT,
)
def test_unset_command(self) -> None:
self.proto.lineReceived(b"unset\n")
@ -162,7 +179,11 @@ class ShellBaseCommandsTests(unittest.TestCase): # TODO: ps, history
def test_cd_error_output(self) -> None:
self.proto.lineReceived(f"cd {NONEXISTEN_FILE:s}".encode())
self.assertEqual(self.tr.value(), f"bash: cd: {NONEXISTEN_FILE:s}: No such file or directory\n".encode() + PROMPT)
self.assertEqual(
self.tr.value(),
f"bash: cd: {NONEXISTEN_FILE:s}: No such file or directory\n".encode()
+ PROMPT,
)
class ShellFileCommandsTests(unittest.TestCase):
@ -209,8 +230,11 @@ class ShellFileCommandsTests(unittest.TestCase):
def test_rm_error_output(self) -> None: # TODO: quotes?..
self.proto.lineReceived(f"rm {NONEXISTEN_FILE:s}\n".encode())
self.assertEqual(self.tr.value(),
f"rm: cannot remove `{NONEXISTEN_FILE:s}': No such file or directory\n".encode() + PROMPT)
self.assertEqual(
self.tr.value(),
f"rm: cannot remove `{NONEXISTEN_FILE:s}': No such file or directory\n".encode()
+ PROMPT,
)
def test_cp_output(self) -> None:
self.proto.lineReceived(b"cp /usr/bin/gcc /tmp\n")
@ -218,8 +242,11 @@ class ShellFileCommandsTests(unittest.TestCase):
def test_cp_error_output(self) -> None: # TODO: quotes?..
self.proto.lineReceived(f"cp {NONEXISTEN_FILE:s} /tmp\n".encode())
self.assertEqual(self.tr.value(),
f"cp: cannot stat `{NONEXISTEN_FILE:s}': No such file or directory\n".encode() + PROMPT)
self.assertEqual(
self.tr.value(),
f"cp: cannot stat `{NONEXISTEN_FILE:s}': No such file or directory\n".encode()
+ PROMPT,
)
def test_mv_output(self) -> None:
self.proto.lineReceived(b"mv /usr/bin/awk /tmp\n")
@ -227,8 +254,11 @@ class ShellFileCommandsTests(unittest.TestCase):
def test_mv_error_output(self) -> None: # TODO: quotes?..
self.proto.lineReceived(f"mv {NONEXISTEN_FILE:s} /tmp\n".encode())
self.assertEqual(self.tr.value(),
f"mv: cannot stat `{NONEXISTEN_FILE:s}': No such file or directory\n".encode() + PROMPT)
self.assertEqual(
self.tr.value(),
f"mv: cannot stat `{NONEXISTEN_FILE:s}': No such file or directory\n".encode()
+ PROMPT,
)
def test_mkdir_output(self) -> None:
path = "/tmp/hello"
@ -242,7 +272,11 @@ class ShellFileCommandsTests(unittest.TestCase):
path = "/etc"
self.proto.lineReceived(f"mkdir {path:s}\n".encode())
self.assertEqual(self.tr.value(), f"mkdir: cannot create directory `{path:s}': File exists\n".encode() + PROMPT)
self.assertEqual(
self.tr.value(),
f"mkdir: cannot create directory `{path:s}': File exists\n".encode()
+ PROMPT,
)
def test_rmdir_output(self) -> None:
path = "/tmp/bye"
@ -255,8 +289,11 @@ class ShellFileCommandsTests(unittest.TestCase):
def test_rmdir_error_output(self) -> None: # TODO: quotes?..
self.proto.lineReceived(f"rmdir {NONEXISTEN_FILE:s}\n".encode())
self.assertEqual(self.tr.value(),
f"rmdir: failed to remove `{NONEXISTEN_FILE:s}': No such file or directory\n".encode() + PROMPT)
self.assertEqual(
self.tr.value(),
f"rmdir: failed to remove `{NONEXISTEN_FILE:s}': No such file or directory\n".encode()
+ PROMPT,
)
def test_pwd_output(self) -> None:
self.proto.lineReceived(b"pwd\n")

View File

@ -50,35 +50,39 @@ class ShellChmodCommandTests(unittest.TestCase):
self.proto.lineReceived(b"chmod +x")
self.assertEqual(
self.tr.value(),
b"chmod: missing operand after \xe2\x80\x98+x\xe2\x80\x99\n" + TRY_CHMOD_HELP_MSG + PROMPT
b"chmod: missing operand after \xe2\x80\x98+x\xe2\x80\x99\n"
+ TRY_CHMOD_HELP_MSG
+ PROMPT,
)
def test_chmod_command_004(self) -> None:
self.proto.lineReceived(b"chmod -A")
self.assertEqual(
self.tr.value(),
b"chmod: invalid option -- 'A'\n" + TRY_CHMOD_HELP_MSG + PROMPT
b"chmod: invalid option -- 'A'\n" + TRY_CHMOD_HELP_MSG + PROMPT,
)
def test_chmod_command_005(self) -> None:
self.proto.lineReceived(b"chmod --A")
self.assertEqual(
self.tr.value(),
b"chmod: unrecognized option '--A'\n" + TRY_CHMOD_HELP_MSG + PROMPT
b"chmod: unrecognized option '--A'\n" + TRY_CHMOD_HELP_MSG + PROMPT,
)
def test_chmod_command_006(self) -> None:
self.proto.lineReceived(b"chmod -x abcd")
self.assertEqual(
self.tr.value(),
b"chmod: cannot access 'abcd': No such file or directory\n" + PROMPT
b"chmod: cannot access 'abcd': No such file or directory\n" + PROMPT,
)
def test_chmod_command_007(self) -> None:
self.proto.lineReceived(b"chmod abcd efgh")
self.assertEqual(
self.tr.value(),
b"chmod: invalid mode: \xe2\x80\x98abcd\xe2\x80\x99\n" + TRY_CHMOD_HELP_MSG + PROMPT
b"chmod: invalid mode: \xe2\x80\x98abcd\xe2\x80\x99\n"
+ TRY_CHMOD_HELP_MSG
+ PROMPT,
)
def test_chmod_command_008(self) -> None:

View File

@ -34,7 +34,7 @@ class ShellEchoCommandTests(unittest.TestCase):
self.tr.clear()
def test_echo_command_001(self) -> None:
self.proto.lineReceived(b"echo \"test\"\n")
self.proto.lineReceived(b'echo "test"\n')
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_002(self) -> None:
@ -42,7 +42,7 @@ class ShellEchoCommandTests(unittest.TestCase):
self.assertEqual(self.tr.value(), b"test test\n" + PROMPT)
def test_echo_command_003(self) -> None:
self.proto.lineReceived(b"echo -n \"test test\"\n")
self.proto.lineReceived(b'echo -n "test test"\n')
self.assertEqual(self.tr.value(), b"test test" + PROMPT)
def test_echo_command_005(self) -> None:
@ -50,7 +50,7 @@ class ShellEchoCommandTests(unittest.TestCase):
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_006(self) -> None:
self.proto.lineReceived(b"echo \"\\n\"\n")
self.proto.lineReceived(b'echo "\\n"\n')
self.assertEqual(self.tr.value(), b"\\n\n" + PROMPT)
def test_echo_command_007(self) -> None:
@ -78,7 +78,7 @@ class ShellEchoCommandTests(unittest.TestCase):
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_013(self) -> None:
self.proto.lineReceived(b"echo \"ls\"\"ls\"")
self.proto.lineReceived(b'echo "ls""ls"')
self.assertEqual(self.tr.value(), b"lsls\n" + PROMPT)
def test_echo_command_014(self) -> None:
@ -90,7 +90,7 @@ class ShellEchoCommandTests(unittest.TestCase):
self.assertEqual(self.tr.value(), b"'ls'\n" + PROMPT)
def test_echo_command_016(self) -> None:
self.proto.lineReceived(b"echo -e \"\x6b\x61\x6d\x69\"")
self.proto.lineReceived(b'echo -e "\x6b\x61\x6d\x69"')
self.assertEqual(self.tr.value(), b"kami\n" + PROMPT)
def test_echo_command_017(self) -> None:
@ -130,5 +130,14 @@ class ShellEchoCommandTests(unittest.TestCase):
self.assertEqual(self.tr.value(), b"test_test_test_test_test\n" + PROMPT)
def test_echo_command_026(self) -> None:
self.proto.lineReceived(b"echo \"TEST1: `echo test1`, TEST2: `echo test2`\"")
self.proto.lineReceived(b'echo "TEST1: `echo test1`, TEST2: `echo test2`"')
self.assertEqual(self.tr.value(), b"TEST1: test1, TEST2: test2\n" + PROMPT)
def test_echo_command_027(self) -> None:
self.proto.lineReceived(b"echo $LOGNAME")
self.assertEqual(self.tr.value(), b"root\n" + PROMPT)
def test_echo_command_028(self) -> None:
self.proto.lineReceived(b"echo ${LOGNAME}")
self.assertEqual(self.tr.value(), b"root\n" + PROMPT)

View File

@ -35,16 +35,18 @@ class ShellFtpGetCommandTests(unittest.TestCase):
self.tr.clear()
def test_help_command(self) -> None:
usage = b"BusyBox v1.20.2 (2016-06-22 15:12:53 EDT) multi-call binary.\n" \
b"\n" \
b"Usage: ftpget [OPTIONS] HOST [LOCAL_FILE] REMOTE_FILE\n" \
b"\n" \
b"Download a file via FTP\n" \
b"\n" \
b" -c Continue previous transfer\n" \
b" -v Verbose\n" \
b" -u USER Username\n" \
b" -p PASS Password\n" \
b" -P NUM Port\n\n"
usage = (
b"BusyBox v1.20.2 (2016-06-22 15:12:53 EDT) multi-call binary.\n"
b"\n"
b"Usage: ftpget [OPTIONS] HOST [LOCAL_FILE] REMOTE_FILE\n"
b"\n"
b"Download a file via FTP\n"
b"\n"
b" -c Continue previous transfer\n"
b" -v Verbose\n"
b" -u USER Username\n"
b" -p PASS Password\n"
b" -P NUM Port\n\n"
)
self.proto.lineReceived(b"ftpget\n")
self.assertEqual(self.tr.value(), usage + PROMPT)

View File

@ -1,5 +1,4 @@
# -*- test-case-name: Cowrie Proxy Test Cases -*-
#mypy: ignore # noqa
# Copyright (c) 2019 Guilherme Borges
# See LICENSE for details.

View File

@ -42,7 +42,9 @@ class ShellTeeCommandTests(unittest.TestCase):
def test_tee_command_002(self) -> None:
self.proto.lineReceived(b"tee /a/b/c/d\n")
self.proto.handle_CTRL_C()
self.assertEqual(self.tr.value(), b"tee: /a/b/c/d: No such file or directory\n^C\n" + PROMPT)
self.assertEqual(
self.tr.value(), b"tee: /a/b/c/d: No such file or directory\n^C\n" + PROMPT
)
def test_tee_command_003(self) -> None:
self.proto.lineReceived(b"tee a\n")

View File

@ -31,5 +31,7 @@ class ShellTftpCommandTests(unittest.TestCase):
def test_echo_command_001(self) -> None:
self.proto.lineReceived(b"tftp\n")
self.assertEqual(
self.tr.value(), b"usage: tftp [-h] [-c C C] [-l L] [-g G] [-p P] [-r R] [hostname]\n" + PROMPT
self.tr.value(),
b"usage: tftp [-h] [-c C C] [-l L] [-g G] [-p P] [-r R] [hostname]\n"
+ PROMPT,
)

View File

@ -38,7 +38,7 @@ class ShellUniqCommandTests(unittest.TestCase):
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_uniq_command_002(self) -> None:
self.proto.lineReceived(b"echo -e \"test\ntest\ntest\" | uniq\n")
self.proto.lineReceived(b'echo -e "test\ntest\ntest" | uniq\n')
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_uniq_command_003(self) -> None:

View File

@ -4,7 +4,11 @@ import configparser
import unittest
from io import StringIO
from cowrie.core.utils import create_endpoint_services, durationHuman, get_endpoints_from_section
from cowrie.core.utils import (
create_endpoint_services,
durationHuman,
get_endpoints_from_section,
)
from twisted.application.service import MultiService
from twisted.internet import protocol
@ -32,33 +36,28 @@ class UtilsTestCase(unittest.TestCase):
self.assertEqual(something, "4.0 days 05:07:00")
def test_get_endpoints_from_section(self) -> None:
cfg = get_config(
"[ssh]\n"
"listen_addr = 1.1.1.1\n"
cfg = get_config("[ssh]\n" "listen_addr = 1.1.1.1\n")
self.assertEqual(
["tcp:2223:interface=1.1.1.1"], get_endpoints_from_section(cfg, "ssh", 2223)
)
cfg = get_config("[ssh]\n" "listen_addr = 1.1.1.1\n")
self.assertEqual(
["tcp:2224:interface=1.1.1.1"], get_endpoints_from_section(cfg, "ssh", 2224)
)
cfg = get_config("[ssh]\n" "listen_addr = 1.1.1.1 2.2.2.2\n")
self.assertEqual(
["tcp:2223:interface=1.1.1.1", "tcp:2223:interface=2.2.2.2"],
get_endpoints_from_section(cfg, "ssh", 2223),
)
self.assertEqual(["tcp:2223:interface=1.1.1.1"], get_endpoints_from_section(cfg, "ssh", 2223))
cfg = get_config(
"[ssh]\n"
"listen_addr = 1.1.1.1\n"
)
self.assertEqual(["tcp:2224:interface=1.1.1.1"], get_endpoints_from_section(cfg, "ssh", 2224))
cfg = get_config(
"[ssh]\n"
"listen_addr = 1.1.1.1 2.2.2.2\n"
"[ssh]\n" "listen_addr = 1.1.1.1 2.2.2.2\n" "listen_port = 23\n"
)
self.assertEqual(
["tcp:2223:interface=1.1.1.1", "tcp:2223:interface=2.2.2.2"], get_endpoints_from_section(cfg, "ssh", 2223)
)
cfg = get_config(
"[ssh]\n"
"listen_addr = 1.1.1.1 2.2.2.2\n"
"listen_port = 23\n"
)
self.assertEqual(
["tcp:23:interface=1.1.1.1", "tcp:23:interface=2.2.2.2"], get_endpoints_from_section(cfg, "ssh", 2223)
["tcp:23:interface=1.1.1.1", "tcp:23:interface=2.2.2.2"],
get_endpoints_from_section(cfg, "ssh", 2223),
)
cfg = get_config(
@ -66,20 +65,28 @@ class UtilsTestCase(unittest.TestCase):
"listen_endpoints = tcp:23:interface=1.1.1.1 tcp:2323:interface=1.1.1.1\n"
)
self.assertEqual(
["tcp:23:interface=1.1.1.1", "tcp:2323:interface=1.1.1.1"], get_endpoints_from_section(cfg, "ssh", 2223)
["tcp:23:interface=1.1.1.1", "tcp:2323:interface=1.1.1.1"],
get_endpoints_from_section(cfg, "ssh", 2223),
)
def test_create_endpoint_services(self) -> None:
parent = MultiService()
create_endpoint_services(reactor, parent, ["tcp:23:interface=1.1.1.1"], protocol.Factory())
self.assertEqual(len(parent.services), 1)
parent = MultiService()
create_endpoint_services(reactor, parent, ["tcp:23:interface=1.1.1.1"], protocol.Factory())
create_endpoint_services(
reactor, parent, ["tcp:23:interface=1.1.1.1"], protocol.Factory()
)
self.assertEqual(len(parent.services), 1)
parent = MultiService()
create_endpoint_services(
reactor, parent, ["tcp:23:interface=1.1.1.1", "tcp:2323:interface=2.2.2.2"], protocol.Factory()
reactor, parent, ["tcp:23:interface=1.1.1.1"], protocol.Factory()
)
self.assertEqual(len(parent.services), 1)
parent = MultiService()
create_endpoint_services(
reactor,
parent,
["tcp:23:interface=1.1.1.1", "tcp:2323:interface=2.2.2.2"],
protocol.Factory(),
)
self.assertEqual(len(parent.services), 2)

View File

@ -67,3 +67,10 @@ commands =
- pytype --keep-going --jobs auto
- pyre --noninteractive analyze
basepython = python3.10
[testenv:coverage-report]
deps = coverage
skip_install = true
commands =
coverage combine
coverage report