upgrade to python3.6+ syntax using `pyupgrade --py36-plus` (#1502)

This commit is contained in:
Michel Oosterhof 2021-03-01 11:01:03 +08:00 committed by GitHub
parent 04487d3310
commit 63bdbdd520
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
140 changed files with 444 additions and 597 deletions

View File

@ -48,7 +48,7 @@ def create_guest(connection, mac_address, guest_unique_id):
snapshot_path = os.getcwd()
# create a disk snapshot to be used by the guest
disk_img = os.path.join(snapshot_path, 'snapshot-{0}-{1}.qcow2'.format(version_tag, guest_unique_id))
disk_img = os.path.join(snapshot_path, f'snapshot-{version_tag}-{guest_unique_id}.qcow2')
if not backend_pool.libvirt.snapshot_handler.create_disk_snapshot(base_image, disk_img):
log.msg(eventid='cowrie.backend_pool.guest_handler',

View File

@ -47,7 +47,7 @@ class PoolServer(Protocol):
recv = struct.unpack('!H', data[1:3])
ip_len = recv[0]
recv = struct.unpack('!{0}s'.format(ip_len), data[3:])
recv = struct.unpack(f'!{ip_len}s', data[3:])
attacker_ip = recv[0].decode()
log.msg(eventid='cowrie.backend_pool.server',
@ -69,11 +69,11 @@ class PoolServer(Protocol):
nat_ssh_port, nat_telnet_port = self.factory.nat.request_binding(guest_id, guest_ip,
ssh_port, telnet_port)
fmt = '!cIIH{0}sHHH{1}s'.format(len(self.nat_public_ip), len(guest_snapshot))
fmt = '!cIIH{}sHHH{}s'.format(len(self.nat_public_ip), len(guest_snapshot))
response = struct.pack(fmt, b'r', 0, guest_id, len(self.nat_public_ip), self.nat_public_ip.encode(),
nat_ssh_port, nat_telnet_port, len(guest_snapshot), guest_snapshot.encode())
else:
fmt = '!cIIH{0}sHHH{1}s'.format(len(guest_ip), len(guest_snapshot))
fmt = '!cIIH{}sHHH{}s'.format(len(guest_ip), len(guest_snapshot))
response = struct.pack(fmt, b'r', 0, guest_id, len(guest_ip), guest_ip.encode(),
ssh_port, telnet_port, len(guest_snapshot), guest_snapshot.encode())
except NoAvailableVMs:

View File

@ -3,20 +3,20 @@ from twisted.internet import defer, protocol, reactor
# object is added for Python 2.7 compatibility (#1198) - as is super with args
class PasswordAuth(userauth.SSHUserAuthClient, object):
class PasswordAuth(userauth.SSHUserAuthClient):
def __init__(self, user, password, conn):
super(PasswordAuth, self).__init__(user, conn)
super().__init__(user, conn)
self.password = password
def getPassword(self, prompt=None):
return defer.succeed(self.password)
class CommandChannel(channel.SSHChannel, object):
class CommandChannel(channel.SSHChannel):
name = 'session'
def __init__(self, command, done_deferred, callback, *args, **kwargs):
super(CommandChannel, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
self.command = command
self.done_deferred = done_deferred
self.callback = callback
@ -41,9 +41,9 @@ class CommandChannel(channel.SSHChannel, object):
self.callback(self.data)
class ClientConnection(connection.SSHConnection, object):
class ClientConnection(connection.SSHConnection):
def __init__(self, cmd, done_deferred, callback):
super(ClientConnection, self).__init__()
super().__init__()
self.command = cmd
self.done_deferred = done_deferred
self.callback = callback

View File

@ -20,7 +20,7 @@ def nmap_port(guest_ip, port):
def read_file(file_name):
with open(file_name, 'r') as file:
with open(file_name) as file:
return file.read()

View File

@ -1,7 +1,6 @@
# Copyright (c) 2010 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import random

View File

@ -1,7 +1,6 @@
# Copyright (c) 2009 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import random
import re
@ -14,12 +13,12 @@ from cowrie.shell.command import HoneyPotCommand
commands = {}
class command_faked_package_class_factory(object):
class command_faked_package_class_factory:
@staticmethod
def getCommand(name):
class command_faked_installation(HoneyPotCommand):
def call(self):
self.write("{}: Segmentation fault\n".format(name))
self.write(f"{name}: Segmentation fault\n")
return command_faked_installation
@ -122,7 +121,7 @@ pages for more information and options.
packages = {}
for y in [re.sub('[^A-Za-z0-9]', '', x) for x in self.args[1:]]:
packages[y] = {
'version': '{0}.{1}-{2}'.format(random.choice([0, 1]), random.randint(1, 40), random.randint(1, 10)),
'version': '{}.{}-{}'.format(random.choice([0, 1]), random.randint(1, 40), random.randint(1, 10)),
'size': random.randint(100, 900)
}
totalsize = sum([packages[x]['size'] for x in packages])
@ -134,7 +133,7 @@ pages for more information and options.
self.write(' %s ' % ' '.join(packages) + '\n')
self.write('0 upgraded, %d newly installed, 0 to remove and 259 not upgraded.\n' % len(packages))
self.write('Need to get %s.2kB of archives.\n' % (totalsize))
self.write('After this operation, %skB of additional disk space will be used.\n' % (totalsize * 2.2,))
self.write('After this operation, {}kB of additional disk space will be used.\n'.format(totalsize * 2.2))
i = 1
for p in packages:
self.write('Get:%d http://ftp.debian.org stable/main %s %s [%s.2kB]\n' %
@ -148,12 +147,12 @@ pages for more information and options.
self.write('(Reading database ... 177887 files and directories currently installed.)\n')
yield self.sleep(1, 2)
for p in packages:
self.write('Unpacking %s (from .../archives/%s_%s_i386.deb) ...\n' % (p, p, packages[p]['version']))
self.write('Unpacking {} (from .../archives/{}_{}_i386.deb) ...\n'.format(p, p, packages[p]['version']))
yield self.sleep(1, 2)
self.write('Processing triggers for man-db ...\n')
yield self.sleep(2)
for p in packages:
self.write('Setting up %s (%s) ...\n' % (p, packages[p]['version']))
self.write('Setting up {} ({}) ...\n'.format(p, packages[p]['version']))
self.fs.mkfile('/usr/bin/%s' % p, 0, 0, random.randint(10000, 90000), 33188)
self.protocol.commands['/usr/bin/%s' % p] = \
command_faked_package_class_factory.getCommand(p)

View File

@ -8,7 +8,6 @@ awk command
limited implementation that only supports `print` command.
"""
from __future__ import absolute_import, division
import getopt
import re
@ -72,7 +71,7 @@ class command_awk(HoneyPotCommand):
pname = self.fs.resolve_path(arg, self.protocol.cwd)
if self.fs.isdir(pname):
self.errorWrite("awk: {}: Is a directory\n".format(arg))
self.errorWrite(f"awk: {arg}: Is a directory\n")
continue
try:
@ -82,7 +81,7 @@ class command_awk(HoneyPotCommand):
else:
raise FileNotFound
except FileNotFound:
self.errorWrite("awk: {}: No such file or directory\n".format(arg))
self.errorWrite(f"awk: {arg}: No such file or directory\n")
else:
self.output(self.input_data)

View File

@ -3,7 +3,6 @@
# coding=utf-8
from __future__ import absolute_import, division
import codecs
import datetime
@ -25,7 +24,7 @@ commands = {}
class command_whoami(HoneyPotCommand):
def call(self):
self.write('{0}\n'.format(self.protocol.user.username))
self.write(f'{self.protocol.user.username}\n')
commands['/usr/bin/whoami'] = command_whoami
@ -228,7 +227,7 @@ class command_hostname(HoneyPotCommand):
else:
self.write("hostname: you must be root to change the host name\n")
else:
self.write('{0}\n'.format(self.protocol.hostname))
self.write(f'{self.protocol.hostname}\n')
commands['/bin/hostname'] = command_hostname
@ -460,7 +459,7 @@ class command_ps(HoneyPotCommand):
s = ''.join([output[i][x] for x in line])
if 'w' not in args:
s = s[:(int(self.environ['COLUMNS']) if 'COLUMNS' in self.environ else 80)]
self.write('{0}\n'.format(s))
self.write(f'{s}\n')
commands['/bin/ps'] = command_ps
@ -532,19 +531,19 @@ class command_shutdown(HoneyPotCommand):
"** the \"time\" argument is mandatory! (try \"now\") **",
)
for line in output:
self.write('{0}\n'.format(line))
self.write(f'{line}\n')
self.exit()
elif len(self.args) > 1 and self.args[0].strip().count('-h') \
and self.args[1].strip().count('now'):
self.write('\n')
self.write('Broadcast message from root@{} (pts/0) ({}):\n'.format(self.protocol.hostname, time.ctime()))
self.write(f'Broadcast message from root@{self.protocol.hostname} (pts/0) ({time.ctime()}):\n')
self.write('\n')
self.write('The system is going down for maintenance NOW!\n')
reactor.callLater(3, self.finish)
elif len(self.args) > 1 and self.args[0].strip().count('-r') \
and self.args[1].strip().count('now'):
self.write('\n')
self.write('Broadcast message from root@{} (pts/0) ({}):\n'.format(self.protocol.hostname, time.ctime()))
self.write(f'Broadcast message from root@{self.protocol.hostname} (pts/0) ({time.ctime()}):\n')
self.write('\n')
self.write('The system is going down for reboot NOW!\n')
reactor.callLater(3, self.finish)
@ -569,7 +568,7 @@ class command_reboot(HoneyPotCommand):
def start(self):
self.write('\n')
self.write('Broadcast message from root@{} (pts/0) ({}):\n\n'.format(self.protocol.hostname, time.ctime()))
self.write(f'Broadcast message from root@{self.protocol.hostname} (pts/0) ({time.ctime()}):\n\n')
self.write('The system is going down for reboot NOW!\n')
reactor.callLater(3, self.finish)
@ -592,7 +591,7 @@ class command_history(HoneyPotCommand):
return
count = 1
for line in self.protocol.historyLines:
self.write(' %s %s\n' % (str(count).rjust(4), line))
self.write(' {} {}\n'.format(str(count).rjust(4), line))
count += 1
except Exception:
# Non-interactive shell, do nothing
@ -606,7 +605,7 @@ class command_date(HoneyPotCommand):
def call(self):
time = datetime.datetime.utcnow()
self.write('{0}\n'.format(time.strftime("%a %b %d %H:%M:%S UTC %Y")))
self.write('{}\n'.format(time.strftime("%a %b %d %H:%M:%S UTC %Y")))
commands['/bin/date'] = command_date
@ -620,7 +619,7 @@ class command_yes(HoneyPotCommand):
def y(self):
if len(self.args):
self.write("{0}\n".format(' '.join(self.args, '\n')))
self.write("{}\n".format(' '.join(self.args, '\n')))
else:
self.write('y\n')
self.scheduled = reactor.callLater(0.01, self.y)
@ -681,7 +680,7 @@ class command_php(HoneyPotCommand):
'Copyright (c) 1997-2010 The PHP Group'
)
for line in output:
self.write('{0}\n'.format(line))
self.write(f'{line}\n')
self.exit()
elif self.args[0] == '-h':
output = (
@ -725,7 +724,7 @@ class command_php(HoneyPotCommand):
''
)
for line in output:
self.write('{0}\n'.format(line))
self.write(f'{line}\n')
self.exit()
else:
self.exit()
@ -767,7 +766,7 @@ class command_set(HoneyPotCommand):
# With enhancements it should work like env when -o posix is used
def call(self):
for i in sorted(list(self.environ.keys())):
self.write('{0}={1}\n'.format(i, self.environ[i]))
self.write('{}={}\n'.format(i, self.environ[i]))
commands['set'] = command_set

View File

@ -1,5 +1,3 @@
from __future__ import absolute_import, division
import getopt
from twisted.python import log

View File

@ -1,5 +1,3 @@
from __future__ import absolute_import, division
from twisted.python import log
from cowrie.shell.command import HoneyPotCommand
@ -57,7 +55,7 @@ class command_busybox(HoneyPotCommand):
def help(self):
for ln in busybox_help:
self.errorWrite('{0}\n'.format(ln))
self.errorWrite(f'{ln}\n')
def call(self):
if len(self.args) == 0:
@ -85,7 +83,7 @@ class command_busybox(HoneyPotCommand):
if self.input_data:
self.write(self.input_data)
else:
self.write('{}: applet not found\n'.format(cmd))
self.write(f'{cmd}: applet not found\n')
commands['/bin/busybox'] = command_busybox

View File

@ -6,7 +6,6 @@ cat command
"""
from __future__ import absolute_import, division
import getopt
@ -30,7 +29,7 @@ class command_cat(HoneyPotCommand):
try:
optlist, args = getopt.gnu_getopt(self.args, 'AbeEnstTuv', ['help', 'number', 'version'])
except getopt.GetoptError as err:
self.errorWrite("cat: invalid option -- '{}'\nTry 'cat --help' for more information.\n".format(err.opt))
self.errorWrite(f"cat: invalid option -- '{err.opt}'\nTry 'cat --help' for more information.\n")
self.exit()
return
@ -51,7 +50,7 @@ class command_cat(HoneyPotCommand):
pname = self.fs.resolve_path(arg, self.protocol.cwd)
if self.fs.isdir(pname):
self.errorWrite('cat: {}: Is a directory\n'.format(arg))
self.errorWrite(f'cat: {arg}: Is a directory\n')
continue
try:
@ -61,7 +60,7 @@ class command_cat(HoneyPotCommand):
else:
raise FileNotFound
except FileNotFound:
self.errorWrite('cat: {}: No such file or directory\n'.format(arg))
self.errorWrite(f'cat: {arg}: No such file or directory\n')
self.exit()
elif self.input_data is not None:
self.output(self.input_data)
@ -86,7 +85,7 @@ class command_cat(HoneyPotCommand):
lines.pop()
for line in lines:
if self.number:
self.write('{:>6} '.format(self.linenumber))
self.write(f'{self.linenumber:>6} ')
self.linenumber = self.linenumber + 1
self.writeBytes(line + b'\n')

View File

@ -1,7 +1,6 @@
# Copyright (c) 2020 Peter Sufliarsky <sufliarskyp@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import getopt
import re
@ -69,12 +68,12 @@ class command_chmod(HoneyPotCommand):
self.write('chmod: missing operand\n' + TRY_CHMOD_HELP_MSG)
return
if mode and not files:
self.write('chmod: missing operand after {}\n'.format(mode) + TRY_CHMOD_HELP_MSG)
self.write(f'chmod: missing operand after {mode}\n' + TRY_CHMOD_HELP_MSG)
return
# mode has to match the regex
if not re.fullmatch(MODE_REGEX, mode):
self.write('chmod: invalid mode: {}\n'.format(mode) + TRY_CHMOD_HELP_MSG)
self.write(f'chmod: invalid mode: {mode}\n' + TRY_CHMOD_HELP_MSG)
return
# go through the list of files and check whether they exist
@ -87,7 +86,7 @@ class command_chmod(HoneyPotCommand):
else:
path = self.fs.resolve_path(file, self.protocol.cwd)
if not self.fs.exists(path):
self.write('chmod: cannot access \'{}\': No such file or directory\n'.format(file))
self.write(f'chmod: cannot access \'{file}\': No such file or directory\n')
def parse_args(self):
mode = None
@ -109,9 +108,9 @@ class command_chmod(HoneyPotCommand):
except getopt.GetoptError as err:
failed_opt = err.msg.split(' ')[1]
if failed_opt.startswith("--"):
self.errorWrite("chmod: unrecognized option '--{}'\n".format(err.opt) + TRY_CHMOD_HELP_MSG)
self.errorWrite(f"chmod: unrecognized option '--{err.opt}'\n" + TRY_CHMOD_HELP_MSG)
else:
self.errorWrite("chmod: invalid option -- '{}'\n".format(err.opt) + TRY_CHMOD_HELP_MSG)
self.errorWrite(f"chmod: invalid option -- '{err.opt}'\n" + TRY_CHMOD_HELP_MSG)
return [], None, [], True
# if mode was not found before, use the first arg as mode

View File

@ -6,7 +6,6 @@
This module contains the chpasswd commnad
"""
from __future__ import absolute_import, division
import getopt
@ -43,7 +42,7 @@ class command_chpasswd(HoneyPotCommand):
if len(line):
u, p = line.split(b':')
if not len(p):
self.write('chpasswd: line {}: missing new password\n'.format(c))
self.write(f'chpasswd: line {c}: missing new password\n')
else:
"""
TODO:
@ -54,7 +53,7 @@ class command_chpasswd(HoneyPotCommand):
pass
c += 1
except Exception:
self.write('chpasswd: line {}: missing new password\n'.format(c))
self.write(f'chpasswd: line {c}: missing new password\n')
def start(self):
try:
@ -73,7 +72,7 @@ class command_chpasswd(HoneyPotCommand):
return
elif o in "-c":
if args not in ["NONE", "DES", "MD5", "SHA256", "SHA512"]:
self.errorWrite("chpasswd: unsupported crypt method: {}\n".format(a))
self.errorWrite(f"chpasswd: unsupported crypt method: {a}\n")
self.help()
self.exit()

View File

@ -6,7 +6,6 @@
This module contains the crontab commnad
"""
from __future__ import absolute_import, division
import getopt
@ -36,7 +35,7 @@ class command_crontab(HoneyPotCommand):
try:
opts, args = getopt.getopt(self.args, 'u:elri')
except getopt.GetoptError as err:
self.write("crontab: invalid option -- \'{0}\'\n".format(err.opt))
self.write(f"crontab: invalid option -- \'{err.opt}\'\n")
self.write("crontab: usage error: unrecognized option\n")
self.help()
self.exit()
@ -52,11 +51,11 @@ class command_crontab(HoneyPotCommand):
opt = o
if opt == "-e":
self.write("must be privileged to use {0}\n".format(opt))
self.write(f"must be privileged to use {opt}\n")
self.exit()
return
elif opt in ["-l", "-r", "-i"]:
self.write("no crontab for {0}\n".format(user))
self.write(f"no crontab for {user}\n")
self.exit()
return

View File

@ -1,7 +1,6 @@
# Copyright (c) 2009 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import getopt
import os
@ -186,7 +185,7 @@ class command_curl(HoneyPotCommand):
optlist, args = getopt.getopt(self.args, 'sho:O', ['help', 'manual', 'silent'])
except getopt.GetoptError as err:
# TODO: should be 'unknown' instead of 'not recognized'
self.write("curl: {}\n".format(err))
self.write(f"curl: {err}\n")
self.write("curl: try 'curl --help' or 'curl --manual' for more information\n")
self.exit()
return
@ -252,7 +251,7 @@ class command_curl(HoneyPotCommand):
if scheme != b'http' and scheme != b'https':
raise NotImplementedError
except Exception:
self.errorWrite('curl: (1) Protocol "{}" not supported or disabled in libcurl\n'.format(scheme))
self.errorWrite(f'curl: (1) Protocol "{scheme}" not supported or disabled in libcurl\n')
self.exit()
return None

View File

@ -5,7 +5,6 @@
dd commands
"""
from __future__ import absolute_import, division
import re
@ -30,11 +29,11 @@ class command_dd(HoneyPotCommand):
for arg in self.args:
if arg.find('=') == -1:
self.write('unknown operand: {}'.format(arg))
self.write(f'unknown operand: {arg}')
HoneyPotCommand.exit(self)
operand, value = arg.split('=')
if operand not in ('if', 'bs', 'of', 'count'):
self.write('unknown operand: {}'.format(operand))
self.write(f'unknown operand: {operand}')
self.exit(success=False)
self.ddargs[operand] = value
@ -48,21 +47,21 @@ class command_dd(HoneyPotCommand):
iname = self.ddargs['if']
pname = self.fs.resolve_path(iname, self.protocol.cwd)
if self.fs.isdir(pname):
self.errorWrite('dd: {}: Is a directory\n'.format(iname))
self.errorWrite(f'dd: {iname}: Is a directory\n')
bSuccess = False
if bSuccess:
if 'bs' in self.ddargs:
block = parse_size(self.ddargs['bs'])
if block <= 0:
self.errorWrite('dd: invalid number \'{}\'\n'.format(block))
self.errorWrite(f'dd: invalid number \'{block}\'\n')
bSuccess = False
if bSuccess:
if 'count' in self.ddargs:
c = int(self.ddargs['count'])
if c < 0:
self.errorWrite('dd: invalid number \'{}\'\n'.format(c))
self.errorWrite(f'dd: invalid number \'{c}\'\n')
bSuccess = False
if bSuccess:
@ -78,7 +77,7 @@ class command_dd(HoneyPotCommand):
else:
self.writeBytes(data)
except FileNotFound:
self.errorWrite('dd: {}: No such file or directory\n'.format(iname))
self.errorWrite(f'dd: {iname}: No such file or directory\n')
bSuccess = False
self.exit(success=bSuccess)

View File

@ -1,8 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2018 Danilo Vargas <danilo.vargas@csiete.org>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import os
@ -112,7 +110,7 @@ or available locally via: info '(coreutils) du invocation'\n"""
files = (self.protocol.fs.getfile(path)[:],)
except Exception:
self.write(
'ls: cannot access %s: No such file or directory\n' % (path,))
f'ls: cannot access {path}: No such file or directory\n')
return
filenames = [x[A_NAME] for x in files]
@ -122,10 +120,10 @@ or available locally via: info '(coreutils) du invocation'\n"""
if all:
isdir = self.protocol.fs.isdir(os.path.join(path, filename))
if isdir:
filename = "4 ./{0}\n".format(filename)
filename = f"4 ./{filename}\n"
self.write(filename)
else:
filename = "4 {0}\n".format(filename)
filename = f"4 {filename}\n"
self.write(filename)
if all:
self.write("36 .\n")

View File

@ -1,5 +1,3 @@
from __future__ import absolute_import, division
from cowrie.shell.command import HoneyPotCommand
commands = {}
@ -31,7 +29,7 @@ class command_env(HoneyPotCommand):
def call(self):
# This only show environ vars, not the shell vars. Need just to mimic real systems
for i in list(self.protocol.environ.keys()):
self.write('{0}={1}\n'.format(i, self.protocol.environ[i]))
self.write('{}={}\n'.format(i, self.protocol.environ[i]))
commands['/usr/bin/env'] = command_env

View File

@ -1,8 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2014 Peter Reuterås <peter@reuteras.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
from cowrie.shell.command import HoneyPotCommand

View File

@ -5,7 +5,6 @@
This module ...
"""
from __future__ import absolute_import, division
import getopt
@ -81,7 +80,7 @@ class command_free(HoneyPotCommand):
"""
needed_keys = ["Buffers", "Cached", "MemTotal", "MemFree", "SwapTotal", "SwapFree", "Shmem", "MemAvailable"]
mem_info_map = {}
with open('/proc/meminfo', 'r') as proc_file:
with open('/proc/meminfo') as proc_file:
for line in proc_file:
tokens = line.split(':')

View File

@ -6,7 +6,6 @@
Filesystem related commands
"""
from __future__ import absolute_import, division
import copy
import getopt
@ -31,7 +30,7 @@ class command_grep(HoneyPotCommand):
contents = self.fs.file_contents(filename)
self.grep_application(contents, match)
except Exception:
self.errorWrite("grep: {}: No such file or directory\n".format(filename))
self.errorWrite(f"grep: {filename}: No such file or directory\n")
def grep_application(self, contents, match):
match = os.path.basename(match).replace('\"', '').encode('utf8')
@ -60,7 +59,7 @@ class command_grep(HoneyPotCommand):
try:
optlist, args = getopt.getopt(self.args, 'abcDEFGHhIiJLlmnOoPqRSsUVvwxZA:B:C:e:f:')
except getopt.GetoptError as err:
self.errorWrite("grep: invalid option -- {}\n".format(err.opt))
self.errorWrite(f"grep: invalid option -- {err.opt}\n")
self.help()
self.exit()
return
@ -104,7 +103,7 @@ class command_tail(HoneyPotCommand):
contents = self.fs.file_contents(filename)
self.tail_application(contents)
except Exception:
self.errorWrite("tail: cannot open `{}' for reading: No such file or directory\n".format(filename))
self.errorWrite(f"tail: cannot open `{filename}' for reading: No such file or directory\n")
def tail_application(self, contents):
contentsplit = contents.split(b'\n')
@ -126,7 +125,7 @@ class command_tail(HoneyPotCommand):
try:
optlist, args = getopt.getopt(self.args, 'n:')
except getopt.GetoptError as err:
self.errorWrite("tail: invalid option -- '{}'\n".format(err.opt))
self.errorWrite(f"tail: invalid option -- '{err.opt}'\n")
self.exit()
return
@ -178,7 +177,7 @@ class command_head(HoneyPotCommand):
contents = self.fs.file_contents(filename)
self.head_application(contents)
except Exception:
self.errorWrite("head: cannot open `{}' for reading: No such file or directory\n".format(filename))
self.errorWrite(f"head: cannot open `{filename}' for reading: No such file or directory\n")
def start(self):
self.n = 10
@ -188,7 +187,7 @@ class command_head(HoneyPotCommand):
try:
optlist, args = getopt.getopt(self.args, 'n:')
except getopt.GetoptError as err:
self.errorWrite("head: invalid option -- '{}'\n".format(err.opt))
self.errorWrite(f"head: invalid option -- '{err.opt}'\n")
self.exit()
return
@ -239,10 +238,10 @@ class command_cd(HoneyPotCommand):
self.errorWrite('bash: cd: OLDPWD not set\n')
return
if inode is None or inode is False:
self.errorWrite('bash: cd: {}: No such file or directory\n'.format(pname))
self.errorWrite(f'bash: cd: {pname}: No such file or directory\n')
return
if inode[fs.A_TYPE] != fs.T_DIR:
self.errorWrite('bash: cd: {}: Not a directory\n'.format(pname))
self.errorWrite(f'bash: cd: {pname}: Not a directory\n')
return
self.protocol.cwd = newpath
@ -310,7 +309,7 @@ or available locally via: info '(coreutils) rm invocation'\n"""
try:
optlist, args = getopt.gnu_getopt(self.args, 'rTfvh', ['help', 'recursive', 'force', 'verbose'])
except getopt.GetoptError as err:
self.errorWrite("rm: invalid option -- '{}'\n".format(err.opt))
self.errorWrite(f"rm: invalid option -- '{err.opt}'\n")
self.paramError()
self.exit()
return
@ -336,7 +335,7 @@ or available locally via: info '(coreutils) rm invocation'\n"""
except (IndexError, fs.FileNotFound):
if not force:
self.errorWrite(
'rm: cannot remove `{}\': No such file or directory\n'.format(f))
f'rm: cannot remove `{f}\': No such file or directory\n')
continue
basename = pname.split('/')[-1]
for i in dir[:]:
@ -386,13 +385,13 @@ class command_cp(HoneyPotCommand):
return
sources, dest = args[:-1], args[-1]
if len(sources) > 1 and not self.fs.isdir(resolv(dest)):
self.errorWrite("cp: target `{}' is not a directory\n".format(dest))
self.errorWrite(f"cp: target `{dest}' is not a directory\n")
return
if dest[-1] == '/' and not self.fs.exists(resolv(dest)) and \
not recursive:
self.errorWrite(
"cp: cannot create regular file `{}': Is a directory\n".format(dest))
f"cp: cannot create regular file `{dest}': Is a directory\n")
return
if self.fs.isdir(resolv(dest)):
@ -401,16 +400,16 @@ class command_cp(HoneyPotCommand):
isdir = False
parent = os.path.dirname(resolv(dest))
if not self.fs.exists(parent):
self.errorWrite("cp: cannot create regular file " + "`{}': No such file or directory\n".format(dest))
self.errorWrite("cp: cannot create regular file " + f"`{dest}': No such file or directory\n")
return
for src in sources:
if not self.fs.exists(resolv(src)):
self.errorWrite(
"cp: cannot stat `{}': No such file or directory\n".format(src))
f"cp: cannot stat `{src}': No such file or directory\n")
continue
if not recursive and self.fs.isdir(resolv(src)):
self.errorWrite("cp: omitting directory `{}'\n".format(src))
self.errorWrite(f"cp: omitting directory `{src}'\n")
continue
s = copy.deepcopy(self.fs.getfile(resolv(src)))
if isdir:
@ -455,12 +454,12 @@ class command_mv(HoneyPotCommand):
return
sources, dest = args[:-1], args[-1]
if len(sources) > 1 and not self.fs.isdir(resolv(dest)):
self.errorWrite("mv: target `{}' is not a directory\n".format(dest))
self.errorWrite(f"mv: target `{dest}' is not a directory\n")
return
if dest[-1] == '/' and not self.fs.exists(resolv(dest)) and len(sources) != 1:
self.errorWrite(
"mv: cannot create regular file `{}': Is a directory\n".format(dest))
f"mv: cannot create regular file `{dest}': Is a directory\n")
return
if self.fs.isdir(resolv(dest)):
@ -469,13 +468,13 @@ class command_mv(HoneyPotCommand):
isdir = False
parent = os.path.dirname(resolv(dest))
if not self.fs.exists(parent):
self.errorWrite("mv: cannot create regular file " + "`{}': No such file or directory\n".format(dest))
self.errorWrite("mv: cannot create regular file " + f"`{dest}': No such file or directory\n")
return
for src in sources:
if not self.fs.exists(resolv(src)):
self.errorWrite(
"mv: cannot stat `{}': No such file or directory\n".format(src))
f"mv: cannot stat `{src}': No such file or directory\n")
continue
s = self.fs.getfile(resolv(src))
if isdir:
@ -507,12 +506,12 @@ class command_mkdir(HoneyPotCommand):
pname = self.fs.resolve_path(f, self.protocol.cwd)
if self.fs.exists(pname):
self.errorWrite(
'mkdir: cannot create directory `{}\': File exists\n'.format(f))
f'mkdir: cannot create directory `{f}\': File exists\n')
return
try:
self.fs.mkdir(pname, 0, 0, 4096, 16877)
except (fs.FileNotFound):
self.errorWrite('mkdir: cannot create directory `{}\': No such file or directory\n'.format(f))
self.errorWrite(f'mkdir: cannot create directory `{f}\': No such file or directory\n')
return
@ -531,7 +530,7 @@ class command_rmdir(HoneyPotCommand):
try:
if len(self.fs.get_path(pname)):
self.errorWrite(
'rmdir: failed to remove `{}\': Directory not empty\n'.format(f))
f'rmdir: failed to remove `{f}\': Directory not empty\n')
continue
dir = self.fs.get_path('/'.join(pname.split('/')[:-1]))
except (IndexError, fs.FileNotFound):
@ -539,12 +538,12 @@ class command_rmdir(HoneyPotCommand):
fname = os.path.basename(f)
if not dir or fname not in [x[fs.A_NAME] for x in dir]:
self.errorWrite(
'rmdir: failed to remove `{}\': No such file or directory\n'.format(f))
f'rmdir: failed to remove `{f}\': No such file or directory\n')
continue
for i in dir[:]:
if i[fs.A_NAME] == fname:
if i[fs.A_TYPE] != fs.T_DIR:
self.errorWrite("rmdir: failed to remove '{}': Not a directory\n".format(f))
self.errorWrite(f"rmdir: failed to remove '{f}': Not a directory\n")
return
dir.remove(i)
break
@ -581,7 +580,7 @@ class command_touch(HoneyPotCommand):
pname = self.fs.resolve_path(f, self.protocol.cwd)
if not self.fs.exists(os.path.dirname(pname)):
self.errorWrite(
'touch: cannot touch `{}`: No such file or directory\n'.format(pname))
f'touch: cannot touch `{pname}`: No such file or directory\n')
return
if self.fs.exists(pname):
# FIXME: modify the timestamp here
@ -589,7 +588,7 @@ class command_touch(HoneyPotCommand):
# can't touch in special directories
if any([pname.startswith(_p) for _p in fs.SPECIAL_PATHS]):
self.errorWrite(
'touch: cannot touch `{}`: Permission denied\n'.format(pname))
f'touch: cannot touch `{pname}`: Permission denied\n')
return
self.fs.mkfile(pname, 0, 0, 0, 33188)

View File

@ -1,7 +1,5 @@
# -*- coding: utf-8 -*-
# Author: Claud Xiao
from __future__ import absolute_import, division
import ftplib
import getopt
@ -148,14 +146,14 @@ Download a file via FTP
self.url_log = 'ftp://'
if self.username:
self.url_log = '{}{}'.format(self.url_log, self.username)
self.url_log = f'{self.url_log}{self.username}'
if self.password:
self.url_log = '{}:{}'.format(self.url_log, self.password)
self.url_log = '{}@'.format(self.url_log)
self.url_log = '{}{}'.format(self.url_log, self.host)
self.url_log = f'{self.url_log}:{self.password}'
self.url_log = f'{self.url_log}@'
self.url_log = f'{self.url_log}{self.host}'
if self.port != 21:
self.url_log = '{}:{}'.format(self.url_log, self.port)
self.url_log = '{}/{}'.format(self.url_log, self.remote_path)
self.url_log = f'{self.url_log}:{self.port}'
self.url_log = f'{self.url_log}/{self.remote_path}'
self.artifactFile = Artifact(self.local_file)
@ -208,7 +206,7 @@ Download a file via FTP
try:
ftp.connect(host=self.host, port=self.port, timeout=30)
except Exception as e:
log.msg('FTP connect failed: host=%s, port=%s, err=%s' % (self.host, self.port, str(e)))
log.msg('FTP connect failed: host={}, port={}, err={}'.format(self.host, self.port, str(e)))
self.write('ftpget: can\'t connect to remote host: Connection refused\n')
return False
@ -227,7 +225,7 @@ Download a file via FTP
try:
ftp.login(user=self.username, passwd=self.password)
except Exception as e:
log.msg('FTP login failed: user=%s, passwd=%s, err=%s' % (self.username, self.password, str(e)))
log.msg('FTP login failed: user={}, passwd={}, err={}'.format(self.username, self.password, str(e)))
self.write('ftpget: unexpected server response to USER: %s\n' % str(e))
try:
ftp.quit()

View File

@ -1,6 +1,5 @@
# Copyright (c) 2013 Bas Stottelaar <basstottelaar [AT] gmail [DOT] com>
from __future__ import absolute_import, division
import getopt
import os
@ -115,7 +114,7 @@ class command_gcc(HoneyPotCommand):
if self.fs.exists(sourcefile):
input_files = input_files + 1
else:
self.write("%s: %s: No such file or directory\n" % (command_gcc.APP_NAME, value))
self.write(f"{command_gcc.APP_NAME}: {value}: No such file or directory\n")
complete = False
# To generate, or not
@ -168,7 +167,7 @@ Thread model: posix
gcc version {} (Debian {}-5)""".format(version, version_short, version_short, version_short, version, version)) # noqa: E501
# Write
self.write('{0}\n'.format(data))
self.write(f'{data}\n')
self.exit()
def generate_file(self, outfile):
@ -216,7 +215,7 @@ gcc version {} (Debian {}-5)""".format(version, version_short, version_short, ve
"""
Print missing argument message, and exit
"""
self.write("%s: argument to '%s' is missing\n" % (command_gcc.APP_NAME, arg))
self.write(f"{command_gcc.APP_NAME}: argument to '{arg}' is missing\n")
self.exit()
def help(self):

View File

@ -1,17 +1,15 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2014 Peter Reuterås <peter@reuteras.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
from random import randint, randrange
from cowrie.shell.command import HoneyPotCommand
HWaddr = "%02x:%02x:%02x:%02x:%02x:%02x" % (
HWaddr = "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}".format(
randint(0, 255), randint(0, 255), randint(0, 255), randint(0, 255), randint(0, 255), randint(0, 255))
inet6 = "fe%02x::%02x:%02xff:fe%02x:%02x01/64" % (
inet6 = "fe{:02x}::{:02x}:{:02x}ff:fe{:02x}:{:02x}01/64".format(
randint(0, 255), randrange(111, 888), randint(0, 255), randint(0, 255), randint(0, 255))
commands = {}
@ -26,7 +24,7 @@ class command_ifconfig(HoneyPotCommand):
@staticmethod
def convert_bytes_to_mx(bytes_eth0):
mb = float(bytes_eth0) / 1000 / 1000
return "{0:.1f}".format(mb)
return f"{mb:.1f}"
def calculate_rx(self):
rx_bytes = randrange(111111111, 555555555)
@ -68,7 +66,7 @@ lo Link encap:Local Loopback
self.protocol.kippoIP.rsplit('.', 1)[0], inet6, rx_packets,
tx_packets, rx_bytes_eth0, rx_mb_eth0, tx_bytes_eth0, tx_mb_eth0,
lo_bytes, lo_mb, lo_bytes, lo_mb)
self.write('{0}\n'.format(result))
self.write(f'{result}\n')
commands['/sbin/ifconfig'] = command_ifconfig

View File

@ -1,6 +1,5 @@
# Copyright (c) 2013 Bas Stottelaar <basstottelaar [AT] gmail [DOT] com>
from __future__ import absolute_import, division
import optparse
@ -204,8 +203,8 @@ class command_iptables(HoneyPotCommand):
if self.user_is_root():
# Verify table existence
if table not in list(self.tables.keys()):
self.write("""%s: can\'t initialize iptables table \'%s\': Table does not exist (do you need to insmod?)
Perhaps iptables or your kernel needs to be upgraded.\n""" % (command_iptables.APP_NAME, table))
self.write("""{}: can\'t initialize iptables table \'{}\': Table does not exist (do you need to insmod?)
Perhaps iptables or your kernel needs to be upgraded.\n""".format(command_iptables.APP_NAME, table))
self.exit()
else:
# Exists
@ -230,7 +229,7 @@ Perhaps iptables or your kernel needs to be upgraded.\n""" % (command_iptables.A
"""
Show version and exit
"""
self.write('%s %s\n' % (command_iptables.APP_NAME, command_iptables.APP_VERSION))
self.write(f'{command_iptables.APP_NAME} {command_iptables.APP_VERSION}\n')
self.exit()
def show_help(self):
@ -238,7 +237,7 @@ Perhaps iptables or your kernel needs to be upgraded.\n""" % (command_iptables.A
Show help and exit
"""
self.write("""%s %s'
self.write("""{} {}'
Usage: iptables -[AD] chain rule-specification [options]
iptables -I chain [rulenum] rule-specification [options]
@ -300,7 +299,7 @@ Options:
[!] --fragment -f match second or further fragments only
--modprobe=<command> try to insert modules using this command
--set-counters PKTS BYTES set the counter during insert/append
[!] --version -V print package version.\n""" % (command_iptables.APP_NAME, command_iptables.APP_VERSION))
[!] --version -V print package version.\n""".format(command_iptables.APP_NAME, command_iptables.APP_VERSION))
self.exit()
def list_rules(self, chain):
@ -326,7 +325,7 @@ Options:
output.append("-P %s ACCEPT" % chain)
# Done
self.write('{0}\n'.format('\n'.join(output)))
self.write('{}\n'.format('\n'.join(output)))
self.exit()
else:
self.no_permission()
@ -365,7 +364,7 @@ Options:
output.append("\n".join(chain_output))
# Done
self.write("{0}\n".format('\n\n'.join(output)))
self.write("{}\n".format('\n\n'.join(output)))
self.exit()
else:
self.no_permission()
@ -394,8 +393,10 @@ Options:
self.no_permission()
def no_permission(self):
self.write("""%s %s: can\'t initialize iptables table \'filter\': Permission denied (you must be root)
Perhaps iptables or your kernel needs to be upgraded.\n""" % (command_iptables.APP_NAME, command_iptables.APP_VERSION))
self.write("{} {}: ".format(command_iptables.APP_NAME, command_iptables.APP_VERSION) +
"can\'t initialize iptables table \'filter\': " +
"Permission denied (you must be root)\n" +
"Perhaps iptables or your kernel needs to be upgraded.\n")
self.exit()
def no_command(self):

View File

@ -1,7 +1,6 @@
# Copyright (c) 2009 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import time

View File

@ -1,7 +1,6 @@
# Copyright (c) 2009 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import getopt
import os.path
@ -41,7 +40,7 @@ class command_ls(HoneyPotCommand):
opts, args = getopt.gnu_getopt(self.args, '1@ABCFGHLOPRSTUWabcdefghiklmnopqrstuvwx',
['help', 'version', 'param'])
except getopt.GetoptError as err:
self.write("ls: {}\n".format(err))
self.write(f"ls: {err}\n")
self.write("Try 'ls --help' for more information.\n")
return
@ -82,7 +81,7 @@ class command_ls(HoneyPotCommand):
files = (self.protocol.fs.getfile(path)[:],)
except Exception:
self.write(
'ls: cannot access %s: No such file or directory\n' % (path,))
f'ls: cannot access {path}: No such file or directory\n')
return
return files
@ -172,7 +171,7 @@ class command_ls(HoneyPotCommand):
perms[0] = 'd'
elif file[fs.A_TYPE] == fs.T_LINK:
perms[0] = 'l'
linktarget = ' -> %s' % (file[fs.A_TARGET],)
linktarget = ' -> {}'.format(file[fs.A_TARGET])
perms = ''.join(perms)
ctime = time.localtime(file[fs.A_CTIME])
@ -186,7 +185,7 @@ class command_ls(HoneyPotCommand):
file[fs.A_NAME],
linktarget)
self.write('{0}\n'.format(line))
self.write(f'{line}\n')
commands['/bin/ls'] = command_ls

View File

@ -1,16 +1,12 @@
from __future__ import absolute_import, division
import getopt
import re
import socket
import struct
import sys
from cowrie.core.config import CowrieConfig
from cowrie.shell.command import HoneyPotCommand
if sys.version_info > (3,):
long = int
long = int
commands = {}
@ -74,7 +70,7 @@ usage: nc [-46bCDdhjklnrStUuvZz] [-I length] [-i interval] [-O length]
port = args[1]
if not re.match(r'^\d+$', port):
self.errorWrite('nc: port number invalid: {}\n'.format(port))
self.errorWrite(f'nc: port number invalid: {port}\n')
self.exit()
return

View File

@ -1,6 +1,5 @@
# Based on work by Peter Reuteras (https://bitbucket.org/reuteras/kippo/)
from __future__ import absolute_import, division
import socket
@ -69,13 +68,13 @@ Destination Gateway Genmask Flags MSS Window irtt Iface\n
destination = self.protocol.kippoIP.rsplit('.', 1)[0] + ".0"
gateway = self.protocol.kippoIP.rsplit('.', 1)[0] + ".1"
l1 = "%s%s0.0.0.0 UG 0 0 0 eth0" % \
('{:<16}'.format(default),
'{:<16}'.format(gateway))
(f'{default:<16}',
f'{gateway:<16}')
l2 = "%s%s255.255.255.0 U 0 0 0 eth0" % \
('{:<16}'.format(destination),
'{:<16}'.format(lgateway))
self.write('{0}\n'.format(l1))
self.write('{0}\n'.format(l2))
(f'{destination:<16}',
f'{lgateway:<16}')
self.write(f'{l1}\n')
self.write(f'{l2}\n')
def do_netstat_normal(self):
self.write("""Active Internet connections (w/o servers)
@ -99,7 +98,7 @@ Proto Recv-Q Send-Q Local Address Foreign Address State\n""")
(s_name, s_port, " " * (24 - len(s_name + s_port) - 1),
c_name, c_port, " " * (24 - len(c_name + c_port) - 1),
"ESTABLISHED")
self.write('{0}\n'.format(line))
self.write(f'{line}\n')
if self.show_listen or self.show_all:
self.write("tcp6 0 0 [::]:ssh [::]:* LISTEN\n")
self.write("""Active UNIX domain sockets (only servers)

View File

@ -1,8 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2014 Peter Reuterås <peter@reuteras.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
from cowrie.shell.command import HoneyPotCommand

View File

@ -5,7 +5,6 @@
This module contains the perl command
"""
from __future__ import absolute_import, division
import getopt

View File

@ -1,7 +1,6 @@
# Copyright (c) 2009 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import getopt
import hashlib
@ -33,7 +32,7 @@ class command_ping(HoneyPotCommand):
try:
optlist, args = getopt.gnu_getopt(self.args, "c:")
except getopt.GetoptError as err:
self.write('ping: %s\n' % (err,))
self.write(f'ping: {err}\n')
self.exit()
return
@ -55,7 +54,7 @@ class command_ping(HoneyPotCommand):
' [-M mtu discovery hint] [-S sndbuf]',
' [ -T timestamp option ] [ -Q tos ] [hop1 ...] destination',
):
self.write('{0}\n'.format(line))
self.write(f'{line}\n')
self.exit()
return
self.host = args[0].strip()
@ -64,14 +63,14 @@ class command_ping(HoneyPotCommand):
if self.valid_ip(self.host):
self.ip = self.host
else:
self.write('ping: unknown host %s\n' % (self.host,))
self.write(f'ping: unknown host {self.host}\n')
self.exit()
else:
s = hashlib.md5((self.host).encode("utf-8")).hexdigest()
self.ip = '.'.join([str(int(x, 16)) for x in (s[0:2], s[2:4], s[4:6], s[6:8])])
self.running = True
self.write('PING %s (%s) 56(84) bytes of data.\n' % (self.host, self.ip))
self.write(f'PING {self.host} ({self.ip}) 56(84) bytes of data.\n')
self.scheduled = reactor.callLater(0.2, self.showreply)
self.count = 0
@ -89,7 +88,7 @@ class command_ping(HoneyPotCommand):
self.scheduled = reactor.callLater(1, self.showreply)
def printstatistics(self):
self.write('--- %s ping statistics ---\n' % (self.host,))
self.write(f'--- {self.host} ping statistics ---\n')
self.write('%d packets transmitted, %d received, 0%% packet loss, time 907ms\n' % (self.count, self.count))
self.write('rtt min/avg/max/mdev = 48.264/50.352/52.441/2.100 ms\n')

View File

@ -5,7 +5,6 @@
This module contains the python commnad
"""
from __future__ import absolute_import, division
import getopt
@ -76,7 +75,7 @@ class command_python(HoneyPotCommand):
try:
opts, args = getopt.gnu_getopt(self.args, 'BdEhiORsStuvVx3c:m:Q:W:', ['help', 'version'])
except getopt.GetoptError as err:
self.write("Unknown option: -{0}\n".format(err.opt))
self.write(f"Unknown option: -{err.opt}\n")
self.write("usage: python [option] ... [-c cmd | -m mod | file | -] [arg] ... \n")
self.write("Try `python -h' for more information.\n")
self.exit()

View File

@ -26,7 +26,6 @@
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
from __future__ import absolute_import, division
import getopt
import hashlib
@ -75,7 +74,7 @@ class command_scp(HoneyPotCommand):
outdir = self.fs.resolve_path(self.out_dir, self.protocol.cwd)
if not self.fs.exists(outdir):
self.errorWrite('-scp: {}: No such file or directory\n'.format(self.out_dir))
self.errorWrite(f'-scp: {self.out_dir}: No such file or directory\n')
self.exit()
self.write('\x00')
@ -175,7 +174,7 @@ class command_scp(HoneyPotCommand):
self.fs.mkfile(outfile, 0, 0, r.group(2), r.group(1))
except fs.FileNotFound:
# The outfile locates at a non-existing directory.
self.errorWrite('-scp: {}: No such file or directory\n'.format(outfile))
self.errorWrite(f'-scp: {outfile}: No such file or directory\n')
self.safeoutfile = None
return ''

View File

@ -5,7 +5,6 @@
This module contains the service commnad
"""
from __future__ import absolute_import, division
import getopt

View File

@ -5,7 +5,6 @@
This module contains the sleep command
"""
from __future__ import absolute_import, division
import re

View File

@ -1,7 +1,6 @@
# Copyright (c) 2009 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import getopt
import hashlib
@ -53,7 +52,7 @@ class command_ssh(HoneyPotCommand):
return
if not len(args):
for line in OUTPUT:
self.write('{0}\n'.format(line))
self.write(f'{line}\n')
self.exit()
return
user, host = 'root', args[0]
@ -67,8 +66,8 @@ class command_ssh(HoneyPotCommand):
if self.valid_ip(host):
self.ip = host
else:
self.write('ssh: Could not resolve hostname %s: \
Name or service not known\n' % (host,))
self.write('ssh: Could not resolve hostname {}: \
Name or service not known\n'.format(host))
self.exit()
else:
s = hashlib.md5(host.encode()).hexdigest()
@ -78,8 +77,8 @@ class command_ssh(HoneyPotCommand):
self.host = host
self.user = user
self.write('The authenticity of host \'%s (%s)\' \
can\'t be established.\n' % (self.host, self.ip))
self.write('The authenticity of host \'{} ({})\' \
can\'t be established.\n'.format(self.host, self.ip))
self.write('RSA key fingerprint is \
9d:30:97:8a:9e:48:0d:de:04:8d:76:3a:7b:4b:30:f8.\n')
self.write('Are you sure you want to continue connecting (yes/no)? ')
@ -88,7 +87,7 @@ class command_ssh(HoneyPotCommand):
def yesno(self, line):
self.write('Warning: Permanently added \'{}\' (RSA) to the \
list of known hosts.\n'.format(self.host))
self.write('%s@%s\'s password: ' % (self.user, self.host))
self.write(f'{self.user}@{self.host}\'s password: ')
self.protocol.password_input = True
def wait(self, line):

View File

@ -1,5 +1,3 @@
from __future__ import absolute_import, division
import getopt
from cowrie.shell.command import HoneyPotCommand
@ -57,12 +55,12 @@ class command_sudo(HoneyPotCommand):
def short_help(self):
for ln in sudo_shorthelp:
self.errorWrite('{0}\n'.format(ln))
self.errorWrite(f'{ln}\n')
self.exit()
def long_help(self):
for ln in sudo_longhelp:
self.errorWrite('{0}\n'.format(ln))
self.errorWrite(f'{ln}\n')
self.exit()
def version(self):

View File

@ -1,7 +1,6 @@
# Copyright (c) 2009 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import os
import tarfile
@ -64,7 +63,7 @@ class command_tar(HoneyPotCommand):
for f in t:
dest = self.fs.resolve_path(f.name.strip('/'), self.protocol.cwd)
if verbose:
self.write('{0}\n'.format(f.name))
self.write(f'{f.name}\n')
if not extract or not len(dest):
continue
if f.isdir():
@ -73,7 +72,7 @@ class command_tar(HoneyPotCommand):
self.mkfullpath(os.path.dirname(dest), f)
self.fs.mkfile(dest, 0, 0, f.size, f.mode, f.mtime)
else:
log.msg("tar: skipping [{}]".format(f.name))
log.msg(f"tar: skipping [{f.name}]")
commands['/bin/tar'] = command_tar

View File

@ -5,7 +5,6 @@ tee command
"""
from __future__ import absolute_import, division
import getopt
import os
@ -32,7 +31,7 @@ class command_tee(HoneyPotCommand):
try:
optlist, args = getopt.gnu_getopt(self.args, 'aip', ['help', 'append', 'version'])
except getopt.GetoptError as err:
self.errorWrite("tee: invalid option -- '{}'\nTry 'tee --help' for more information.\n".format(err.opt))
self.errorWrite(f"tee: invalid option -- '{err.opt}'\nTry 'tee --help' for more information.\n")
self.exit()
return
@ -50,7 +49,7 @@ class command_tee(HoneyPotCommand):
pname = self.fs.resolve_path(arg, self.protocol.cwd)
if self.fs.isdir(pname):
self.errorWrite('tee: {}: Is a directory\n'.format(arg))
self.errorWrite(f'tee: {arg}: Is a directory\n')
continue
try:
@ -65,7 +64,7 @@ class command_tee(HoneyPotCommand):
self.fs.mkfile(pname, 0, 0, 0, 0o644)
except FileNotFound:
self.errorWrite('tee: {}: No such file or directory\n'.format(arg))
self.errorWrite(f'tee: {arg}: No such file or directory\n')
if self.input_data:
self.output(self.input_data)

View File

@ -1,5 +1,3 @@
from __future__ import absolute_import, division
import tftpy
try:
@ -17,7 +15,7 @@ from cowrie.shell.customparser import CustomParser
commands = {}
class Progress(object):
class Progress:
def __init__(self, protocol):
self.progress = 0
@ -52,7 +50,7 @@ class command_tftp(HoneyPotCommand):
# so we have to convert unicode type to str type
tclient.download(str(self.file_to_get), self.artifactFile, progresshook)
url = 'tftp://%s/%s' % (self.hostname, self.file_to_get.strip('/'))
url = 'tftp://{}/{}'.format(self.hostname, self.file_to_get.strip('/'))
self.file_to_get = self.fs.resolve_path(self.file_to_get, self.protocol.cwd)

View File

@ -5,7 +5,6 @@
This module ...
"""
from __future__ import absolute_import, division
import getopt
@ -26,7 +25,7 @@ class command_ulimit(HoneyPotCommand):
try:
opts, args = getopt.getopt(self.args, 'SHacdfilmnpqstuvx')
except getopt.GetoptError as err:
self.errorWrite("-bash: ulimit: {}\n".format(err))
self.errorWrite(f"-bash: ulimit: {err}\n")
self.write("ulimit: usage: ulimit [-SHacdfilmnpqstuvx] [limit]\n")
return

View File

@ -5,7 +5,6 @@
uname command
"""
from __future__ import absolute_import, division
from cowrie.core.config import CowrieConfig
from cowrie.shell.command import HoneyPotCommand
@ -80,7 +79,7 @@ class command_uname(HoneyPotCommand):
}
if not self.args:
# IF no params output default
self.write('{}\n'.format(kernel_name()))
self.write(f'{kernel_name()}\n')
else:
# I have parameter to parse
for a in self.args:
@ -107,17 +106,17 @@ class command_uname(HoneyPotCommand):
I have all the option set
'''
if opts['name']:
self.write('{} '.format(kernel_name()))
self.write(f'{kernel_name()} ')
if opts['node']:
self.write('{} '.format(self.protocol.hostname))
self.write(f'{self.protocol.hostname} ')
if opts['release']:
self.write('{} '.format(kernel_version()))
self.write(f'{kernel_version()} ')
if opts['version']:
self.write('{} '.format(kernel_build_string()))
self.write(f'{kernel_build_string()} ')
if opts['machine']:
self.write('{} '.format(hardware_platform()))
self.write(f'{hardware_platform()} ')
if opts['os']:
self.write('{} '.format(operating_system()))
self.write(f'{operating_system()} ')
self.write('\n')

View File

@ -5,7 +5,6 @@
uniq command
"""
from __future__ import absolute_import, division
from twisted.python import log

View File

@ -2,7 +2,6 @@
# Based on code made by Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import os
import zipfile
@ -90,11 +89,11 @@ class command_unzip(HoneyPotCommand):
self.write(
'unzip: cannot find zipfile directory in one of {0}, {0}.zip or {0}.ZIP.\n'.format(filename))
return
self.write('Archive: {}\n'.format(filename))
self.write(f'Archive: {filename}\n')
for f in t:
dest = self.fs.resolve_path(
f.filename.strip('/'), self.protocol.cwd)
self.write(' inflating: {0}\n'.format(f.filename))
self.write(f' inflating: {f.filename}\n')
if not len(dest):
continue
if f.is_dir():
@ -103,7 +102,7 @@ class command_unzip(HoneyPotCommand):
self.mkfullpath(os.path.dirname(dest), f)
self.fs.mkfile(dest, 0, 0, f.file_size, 33188)
else:
log.msg(" skipping: {}\n".format(f.name))
log.msg(f" skipping: {f.name}\n")
commands['/bin/unzip'] = command_unzip

View File

@ -1,7 +1,6 @@
# Copyright (c) 2009 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import time

View File

@ -6,7 +6,6 @@
This module contains the wc commnad
"""
from __future__ import absolute_import, division
import getopt
import re
@ -54,7 +53,7 @@ class command_wc(HoneyPotCommand):
contents = self.fs.file_contents(filename)
self.wc_application(contents, optlist)
except Exception:
self.errorWrite("wc: {}: No such file or directory\n".format(filename))
self.errorWrite(f"wc: {filename}: No such file or directory\n")
def wc_application(self, contents, optlist):
for opt, arg in optlist:
@ -82,7 +81,7 @@ class command_wc(HoneyPotCommand):
try:
optlist, args = getopt.getopt(self.args, 'cmlLwhv')
except getopt.GetoptError as err:
self.errorWrite("wc: invalid option -- {}\n".format(err.opt))
self.errorWrite(f"wc: invalid option -- {err.opt}\n")
self.help()
self.exit()
return

View File

@ -1,7 +1,6 @@
# Copyright (c) 2009 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import getopt
import os
@ -134,7 +133,7 @@ class command_wget(HoneyPotCommand):
if not host:
return None
except Exception:
self.errorWrite('%s: Unsupported scheme.\n' % (url,))
self.errorWrite(f'{url}: Unsupported scheme.\n')
return None
# File in host's fs that will hold content of the downloaded file
@ -142,7 +141,7 @@ class command_wget(HoneyPotCommand):
self.artifactFile = Artifact(self.outfile)
if not self.quiet:
self.errorWrite('--%s-- %s\n' % (time.strftime('%Y-%m-%d %H:%M:%S'), url.decode('utf8')))
self.errorWrite('--{}-- {}\n'.format(time.strftime('%Y-%m-%d %H:%M:%S'), url.decode('utf8')))
self.errorWrite('Connecting to %s:%d... connected.\n' % (host, port))
self.errorWrite('HTTP request sent, awaiting response... ')
@ -198,9 +197,9 @@ class command_wget(HoneyPotCommand):
def error(self, error, url):
# we need to handle 301 redirects separately
if hasattr(error, 'webStatus') and error.webStatus.decode() == '301':
self.errorWrite('{} {}\n'.format(error.webStatus.decode(), error.webMessage.decode()))
self.errorWrite(f'{error.webStatus.decode()} {error.webMessage.decode()}\n')
https_url = error.getErrorMessage().replace('301 Moved Permanently to ', '')
self.errorWrite('Location {} [following]\n'.format(https_url))
self.errorWrite(f'Location {https_url} [following]\n')
# do the download again with the https URL
self.deferred = self.download(https_url.encode('utf8'), self.outfile)
@ -276,15 +275,15 @@ class HTTPProgressDownloader(client.HTTPDownloader):
self.contenttype))
else:
if not self.quiet:
self.wget.errorWrite('Length: unspecified [{}]\n'.format(self.contenttype))
self.wget.errorWrite(f'Length: unspecified [{self.contenttype}]\n')
if 0 < self.wget.limit_size < self.totallength:
log.msg('Not saving URL ({}) due to file size limit'.format(self.wget.url))
log.msg(f'Not saving URL ({self.wget.url}) due to file size limit')
self.nomore = True
if not self.quiet:
if self.fakeoutfile == '-':
self.wget.errorWrite('Saving to: `STDOUT\'\n\n')
else:
self.wget.errorWrite('Saving to: `{}\'\n\n'.format(self.fakeoutfile))
self.wget.errorWrite(f'Saving to: `{self.fakeoutfile}\'\n\n')
return client.HTTPDownloader.gotHeaders(self, headers)
@ -300,7 +299,7 @@ class HTTPProgressDownloader(client.HTTPDownloader):
return client.HTTPDownloader.pagePart(self, data)
if self.totallength:
percent = int(self.currentlength / self.totallength * 100)
spercent = "{}%".format(percent)
spercent = f"{percent}%"
else:
spercent = '%dK' % (self.currentlength / 1000)
percent = 0

View File

@ -1,6 +1,5 @@
# Copyright (c) 2013 Bas Stottelaar <basstottelaar [AT] gmail [DOT] com>
from __future__ import absolute_import, division
from cowrie.shell.command import HoneyPotCommand
@ -26,7 +25,7 @@ class command_which(HoneyPotCommand):
resolved = self.fs.resolve_path(f, path)
if self.fs.exists(resolved):
self.write("%s/%s\n" % (path, f))
self.write(f"{path}/{f}\n")
commands['which'] = command_which

View File

@ -4,7 +4,6 @@
# Modified by Fabiola Buschendorf, https://github.com/FabiolaBusch
from __future__ import absolute_import, division
import hashlib
import random
@ -20,7 +19,7 @@ arch = 'x86_64'
commands = {}
class command_faked_package_class_factory(object):
class command_faked_package_class_factory:
@staticmethod
def getCommand(name):
class command_faked_installation(HoneyPotCommand):
@ -63,8 +62,8 @@ class command_yum(HoneyPotCommand):
randhash = hashlib.sha1(b'{}'.format(randnum)).hexdigest()
randhash2 = hashlib.sha1(b'{}'.format(randnum2)).hexdigest()
yield self.sleep(1, 2)
self.write('Installed: 7/{0} {1}:{2}\n'.format(arch, random.randint(500, 800), randhash))
self.write('Group-Installed: yum 13:{}\n'.format(randhash2))
self.write('Installed: 7/{} {}:{}\n'.format(arch, random.randint(500, 800), randhash))
self.write(f'Group-Installed: yum 13:{randhash2}\n')
self.write('version\n')
self.exit()
@ -193,9 +192,9 @@ Options:
packages = {}
for y in [re.sub('[^A-Za-z0-9]', '', x) for x in self.args[1:]]:
packages[y] = {
'version': '{0}.{1}-{2}'.format(random.choice([0, 1]), random.randint(1, 40), random.randint(1, 10)),
'version': '{}.{}-{}'.format(random.choice([0, 1]), random.randint(1, 40), random.randint(1, 10)),
'size': random.randint(100, 900),
'release': '{0}.el7'.format(random.randint(1, 15))
'release': '{}.el7'.format(random.randint(1, 15))
}
totalsize = sum([packages[x]['size'] for x in packages])
repository = 'base'
@ -209,8 +208,8 @@ Options:
self.write('Resolving Dependencies\n')
self.write('--> Running transaction check\n')
for p in packages:
self.write('---> Package {0}.{1} {2}.{3} will be installed\n'.format(p, packages[p]['version'], arch,
packages[p]['release']))
self.write('---> Package {}.{} {}.{} will be installed\n'.format(p, packages[p]['version'], arch,
packages[p]['release']))
self.write('--> Finished Dependency Resolution\n')
self.write('Beginning Kernel Module Plugin\n')
self.write('Finished Kernel Module Plugin\n\n')
@ -225,16 +224,15 @@ Options:
self.write('{}\n'.format('=' * 176))
self.write('Installing:\n')
for p in packages:
self.write(' {0}\t\t\t\t{1}\t\t\t{2}-{3}\t\t\t{4}\t\t\t\t{5} k\n'.format(p, arch, packages[p]['version'],
packages[p]['release'], repository,
packages[p]['size']))
self.write(' {}\t\t\t\t{}\t\t\t{}-{}\t\t\t{}\t\t\t\t{} k\n'.format(p, arch, packages[p]['version'],
packages[p]['release'], repository, packages[p]['size']))
self.write('\n')
self.write('Transaction Summary\n')
self.write('{}\n'.format('=' * 176))
self.write('Install {0} Packages\n\n'.format(len(packages)))
self.write('Install {} Packages\n\n'.format(len(packages)))
self.write('Total download size: {0} k\n'.format(totalsize))
self.write('Installed size: {:.1f} M\n'.format((totalsize * 0.0032)))
self.write(f'Total download size: {totalsize} k\n')
self.write('Installed size: {:.1f} M\n'.format(totalsize * 0.0032))
self.write('Is this ok [y/d/N]: ')
# Assume 'yes'
@ -251,20 +249,20 @@ Options:
self.write('Running transaction\n')
i = 1
for p in packages:
self.write(' Installing : {0}-{1}-{2}.{3} \t\t\t\t {4}/{5} \n'.format
self.write(' Installing : {}-{}-{}.{} \t\t\t\t {}/{} \n'.format
(p, packages[p]['version'], packages[p]['release'], arch, i, len(packages)))
yield self.sleep(0.5, 1)
i += 1
i = 1
for p in packages:
self.write(' Verifying : {0}-{1}-{2}.{3} \t\t\t\t {4}/{5} \n'.format
self.write(' Verifying : {}-{}-{}.{} \t\t\t\t {}/{} \n'.format
(p, packages[p]['version'], packages[p]['release'], arch, i, len(packages)))
yield self.sleep(0.5, 1)
i += 1
self.write('\n')
self.write('Installed:\n')
for p in packages:
self.write(' {0}.{1} {2}:{3}-{4} \t\t'.format
self.write(' {}.{} {}:{}-{} \t\t'.format
(p, arch, random.randint(0, 2), packages[p]['version'], packages[p]['release']))
self.write('\n')
self.write('Complete!\n')

View File

@ -20,7 +20,6 @@ or:
"""
from __future__ import absolute_import, division
import hashlib
import os

View File

@ -5,7 +5,6 @@
This module contains authentication code
"""
from __future__ import absolute_import, division
import json
import re
@ -27,7 +26,7 @@ _USERDB_DEFAULTS = [
]
class UserDB(object):
class UserDB:
"""
By Walter de Jong <walter@sara.nl>
"""
@ -42,9 +41,9 @@ class UserDB(object):
"""
try:
with open('{}/userdb.txt'.format(CowrieConfig().get('honeypot', 'etc_path')), 'r') as db:
with open('{}/userdb.txt'.format(CowrieConfig().get('honeypot', 'etc_path'))) as db:
userdb = db.readlines()
except IOError:
except OSError:
log.msg("Could not read etc/userdb.txt, default database activated")
userdb = _USERDB_DEFAULTS
@ -108,7 +107,7 @@ class UserDB(object):
self.userdb[(login, passwd)] = policy
class AuthRandom(object):
class AuthRandom:
"""
Alternative class that defines the checklogin() method.
Users will be authenticated after a random number of attempts.
@ -129,7 +128,7 @@ class AuthRandom(object):
if self.maxtry < self.mintry:
self.maxtry = self.mintry + 1
log.msg("maxtry < mintry, adjusting maxtry to: {}".format(self.maxtry))
log.msg(f"maxtry < mintry, adjusting maxtry to: {self.maxtry}")
self.uservar = {}
self.uservar_file = '{}/auth_random.json'.format(CowrieConfig().get('honeypot', 'state_path'))
self.loadvars()
@ -139,7 +138,7 @@ class AuthRandom(object):
Load user vars from json file
"""
if path.isfile(self.uservar_file):
with open(self.uservar_file, 'r') as fp:
with open(self.uservar_file) as fp:
try:
self.uservar = json.load(fp)
except Exception:
@ -178,7 +177,7 @@ class AuthRandom(object):
ipinfo = self.uservar[src_ip]
ipinfo['try'] = 0
if userpass in cache:
log.msg("first time for {}, found cached: {}".format(src_ip, userpass))
log.msg(f"first time for {src_ip}, found cached: {userpass}")
ipinfo['max'] = 1
ipinfo['user'] = str(thelogin)
ipinfo['pw'] = str(thepasswd)
@ -191,7 +190,7 @@ class AuthRandom(object):
else:
if userpass in cache:
ipinfo = self.uservar[src_ip]
log.msg("Found cached: {}".format(userpass))
log.msg(f"Found cached: {userpass}")
ipinfo['max'] = 1
ipinfo['user'] = str(thelogin)
ipinfo['pw'] = str(thepasswd)
@ -218,7 +217,7 @@ class AuthRandom(object):
ipinfo['try'] += 1
attempts = ipinfo['try']
need = ipinfo['max']
log.msg("login attempt: {}".format(attempts))
log.msg(f"login attempt: {attempts}")
# Check if enough login attempts are tried
if attempts < need:

View File

@ -26,8 +26,6 @@
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
from __future__ import absolute_import, division
# cowrie.client.fingerprint
# cowrie.client.size
@ -45,7 +43,6 @@ from __future__ import absolute_import, division
# cowrie.session.file_download
# cowrie.session.file_upload
def formatCef(logentry):
"""
Take logentry and turn into CEF string
@ -93,7 +90,7 @@ def formatCef(logentry):
cefList = []
for key in list(cefExtensions.keys()):
value = str(cefExtensions[key])
cefList.append('{}={}'.format(key, value))
cefList.append(f'{key}={value}')
cefExtension = ' '.join(cefList)

View File

@ -5,7 +5,6 @@
This module contains ...
"""
from __future__ import absolute_import, division
from sys import modules
@ -25,7 +24,7 @@ from cowrie.core.config import CowrieConfig
@implementer(ICredentialsChecker)
class HoneypotPublicKeyChecker(object):
class HoneypotPublicKeyChecker:
"""
Checker that accepts, logs and denies public key authentication attempts
"""
@ -45,7 +44,7 @@ class HoneypotPublicKeyChecker(object):
@implementer(ICredentialsChecker)
class HoneypotNoneChecker(object):
class HoneypotNoneChecker:
"""
Checker that does no authentication check
"""
@ -57,7 +56,7 @@ class HoneypotNoneChecker(object):
@implementer(ICredentialsChecker)
class HoneypotPasswordChecker(object):
class HoneypotPasswordChecker:
"""
Checker that accepts "keyboard-interactive" and "password"
"""
@ -99,7 +98,7 @@ class HoneypotPasswordChecker(object):
if hasattr(modules[authmodule], authclass):
authname = getattr(modules[authmodule], authclass)
else:
log.msg('auth_class: %s not found in %s' % (authclass, authmodule))
log.msg(f'auth_class: {authclass} not found in {authmodule}')
theauth = authname()

View File

@ -5,7 +5,6 @@
This module contains code to deal with Cowrie's configuration
"""
from __future__ import absolute_import, division
import configparser
from os import environ
@ -16,7 +15,7 @@ def to_environ_key(key):
return key.upper()
class CowrieConfig(object):
class CowrieConfig:
"""
Singleton class for configuration data
"""
@ -38,13 +37,13 @@ class EnvironmentConfigParser(configparser.ConfigParser):
def has_option(self, section, option):
if to_environ_key('_'.join(("cowrie", section, option))) in environ:
return True
return super(EnvironmentConfigParser, self).has_option(section, option)
return super().has_option(section, option)
def get(self, section, option, raw=False, **kwargs):
key = to_environ_key('_'.join(("cowrie", section, option)))
if key in environ:
return environ[key]
return super(EnvironmentConfigParser, self).get(section, option, raw=raw, **kwargs)
return super().get(section, option, raw=raw, **kwargs)
def readConfigFile(cfgfile):

View File

@ -26,7 +26,6 @@
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
from __future__ import absolute_import, division
from twisted.cred.credentials import ICredentials, IUsernamePassword
@ -64,7 +63,7 @@ class IPluggableAuthenticationModulesIP(ICredentials):
@implementer(IPluggableAuthenticationModulesIP)
class PluggableAuthenticationModulesIP(object):
class PluggableAuthenticationModulesIP:
"""
Twisted removed IPAM in 15, adding in Cowrie now
"""
@ -76,14 +75,14 @@ class PluggableAuthenticationModulesIP(object):
@implementer(IUsername)
class Username(object):
class Username:
def __init__(self, username):
self.username = username
@implementer(IUsernamePasswordIP)
class UsernamePasswordIP(object):
class UsernamePasswordIP:
"""
This credential interface also provides an IP address
"""

View File

@ -26,7 +26,6 @@
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
from __future__ import absolute_import, division
import abc
import re
@ -78,7 +77,7 @@ def convert(input):
return input
class Output(object):
class Output:
"""
This is the abstract base class intended to be inherited by
cowrie output plugins. Plugins require the mandatory

View File

@ -26,7 +26,6 @@
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
from __future__ import absolute_import, division
from twisted.conch import interfaces as conchinterfaces
from twisted.conch.telnet import ITelnetProtocol
@ -40,7 +39,7 @@ from cowrie.telnet import session
@implementer(IRealm)
class HoneyPotRealm(object):
class HoneyPotRealm:
def __init__(self):
pass

View File

@ -7,7 +7,6 @@
Should be compatible with user mode linux
"""
from __future__ import absolute_import, division
import hashlib
import struct

View File

@ -2,7 +2,6 @@
# Copyright (c) 2010-2014 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import sys
@ -27,16 +26,16 @@ def durationHuman(seconds):
duration = []
if years > 0:
duration.append('{0} year{1} '.format(syears, 's' * (years != 1)))
duration.append('{} year{} '.format(syears, 's' * (years != 1)))
else:
if days > 0:
duration.append('{0} day{1} '.format(days, 's' * (days != 1)))
duration.append('{} day{} '.format(days, 's' * (days != 1)))
if hours > 0:
duration.append('{0}:'.format(shours))
duration.append(f'{shours}:')
if minutes >= 0:
duration.append('{0}:'.format(sminutes))
duration.append(f'{sminutes}:')
if seconds >= 0:
duration.append('{0}'.format(sseconds))
duration.append(f'{sseconds}')
return ''.join(duration)
@ -85,7 +84,7 @@ def uptime(total_seconds):
if days > 0:
s += str(days) + " " + (days == 1 and "day" or "days") + ", "
if len(s) > 0 or hours > 0:
s += '%s:%s' % (str(hours).rjust(2), str(minutes).rjust(2, '0'))
s += '{}:{}'.format(str(hours).rjust(2), str(minutes).rjust(2, '0'))
else:
s += '{} min'.format(str(minutes))
return s
@ -107,7 +106,7 @@ def get_endpoints_from_section(cfg, section, default_port):
listen_endpoints = []
for i in listen_addr.split():
listen_endpoints.append('tcp:{}:interface={}'.format(listen_port, i))
listen_endpoints.append(f'tcp:{listen_port}:interface={i}')
return listen_endpoints

View File

@ -1,7 +1,6 @@
# Copyright (c) 2009-2014 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import hashlib
import os
@ -135,7 +134,7 @@ class LoggingServerProtocol(insults.ServerProtocol):
outfile=shasumfile,
shasum=shasum,
destfile='')
except IOError:
except OSError:
pass
finally:
self.stdinlogOpen = False
@ -173,7 +172,7 @@ class LoggingServerProtocol(insults.ServerProtocol):
outfile=shasumfile,
shasum=shasum,
destfile=url)
except IOError:
except OSError:
pass
self.redirFiles.clear()

View File

@ -128,7 +128,7 @@ class Output(output.Output):
log.msg(
eventid='cowrie.abuseipdb.started',
format='AbuseIPDB Plugin version {} started. Currently in beta.'.format(__version__),
format=f'AbuseIPDB Plugin version {__version__} started. Currently in beta.',
)
def stop(self):

View File

@ -6,7 +6,6 @@ It has its own emit() function and does not use cowrie eventid's
to avoid circular calls
"""
from __future__ import absolute_import, division
import json
@ -19,7 +18,7 @@ import cowrie.core.output
from cowrie._version import __version__
from cowrie.core.config import CowrieConfig
COWRIE_USER_AGENT = 'Cowrie Honeypot {}'.format(__version__).encode('ascii')
COWRIE_USER_AGENT = f'Cowrie Honeypot {__version__}'.encode('ascii')
COWRIE_URL = 'https://api.cowrie.org/v1/crash'

View File

@ -1,5 +1,3 @@
from __future__ import absolute_import, division
import os
from datetime import datetime

View File

@ -30,7 +30,6 @@
Send downloaded/uplaoded files to Cuckoo
"""
from __future__ import absolute_import, division
import os
@ -93,11 +92,11 @@ class Output(cowrie.core.output.Output):
"""
res = None
try:
print("Looking for tasks for: {}".format(sha256))
print(f"Looking for tasks for: {sha256}")
res = requests.get(
urljoin(
self.url_base,
"/files/view/sha256/{}".format(sha256)
f"/files/view/sha256/{sha256}"
),
verify=False,
auth=HTTPBasicAuth(
@ -135,9 +134,9 @@ class Output(cowrie.core.output.Output):
if res and res.ok:
print("Cuckoo Request: {}, Task created with ID: {}".format(res.status_code, res.json()["task_id"]))
else:
print("Cuckoo Request failed: {}".format(res.status_code))
print(f"Cuckoo Request failed: {res.status_code}")
except Exception as e:
print("Cuckoo Request failed: {}".format(e))
print(f"Cuckoo Request failed: {e}")
def posturl(self, scanUrl):
"""
@ -160,6 +159,6 @@ class Output(cowrie.core.output.Output):
if res and res.ok:
print("Cuckoo Request: {}, Task created with ID: {}".format(res.status_code, res.json()["task_id"]))
else:
print("Cuckoo Request failed: {}".format(res.status_code))
print(f"Cuckoo Request failed: {res.status_code}")
except Exception as e:
print("Cuckoo Request failed: {}".format(e))
print(f"Cuckoo Request failed: {e}")

View File

@ -3,7 +3,6 @@ Send SSH logins to SANS DShield.
See https://isc.sans.edu/ssh.html
"""
from __future__ import absolute_import, division
import base64
import hashlib
@ -70,9 +69,9 @@ class Output(cowrie.core.output.Output):
# fixed nonce to mix up the limited userid.
_nonceb64 = 'ElWO1arph+Jifqme6eXD8Uj+QTAmijAWxX1msbJzXDM='
log_output = u''
log_output = ''
for attempt in self.batch:
log_output += u'{0}\t{1}\t{2}\t{3}\t{4}\t{5}\n'.format(
log_output += '{}\t{}\t{}\t{}\t{}\t{}\n'.format(
attempt['date'],
attempt['time'],
attempt['timezone'],
@ -88,7 +87,7 @@ class Output(cowrie.core.output.Output):
base64.b64decode(self.auth_key),
hashlib.sha256).digest()
)
auth_header = 'credentials={0} nonce={1} userid={2}'.format(digest.decode('ascii'), _nonceb64, self.userid)
auth_header = 'credentials={} nonce={} userid={}'.format(digest.decode('ascii'), _nonceb64, self.userid)
headers = {
'X-ISC-Authorization': auth_header,
'Content-Type': 'text/plain'
@ -96,7 +95,7 @@ class Output(cowrie.core.output.Output):
if self.debug:
log.msg('dshield: posting: {}'.format(repr(headers)))
log.msg('dshield: posting: {}'.format(log_output))
log.msg(f'dshield: posting: {log_output}')
req = threads.deferToThread(
requests.request,
@ -112,20 +111,20 @@ class Output(cowrie.core.output.Output):
response = resp.content.decode('utf8')
if self.debug:
log.msg("dshield: status code {}".format(resp.status_code))
log.msg("dshield: response {}".format(resp.content))
log.msg(f"dshield: status code {resp.status_code}")
log.msg(f"dshield: response {resp.content}")
if resp.status_code == requests.codes.ok:
sha1_regex = re.compile(r'<sha1checksum>([^<]+)<\/sha1checksum>')
sha1_match = sha1_regex.search(response)
if sha1_match is None:
log.msg('dshield: ERROR: Could not find sha1checksum in response: {0}'.format(repr(response)))
log.msg('dshield: ERROR: Could not find sha1checksum in response: {}'.format(repr(response)))
failed = True
sha1_local = hashlib.sha1()
sha1_local.update(log_output.encode('utf8'))
if sha1_match.group(1) != sha1_local.hexdigest():
log.msg(
'dshield: ERROR: SHA1 Mismatch {0} {1} .'.format(sha1_match.group(1), sha1_local.hexdigest()))
'dshield: ERROR: SHA1 Mismatch {} {} .'.format(sha1_match.group(1), sha1_local.hexdigest()))
failed = True
md5_regex = re.compile(r'<md5checksum>([^<]+)<\/md5checksum>')
md5_match = md5_regex.search(response)
@ -135,12 +134,12 @@ class Output(cowrie.core.output.Output):
md5_local = hashlib.md5()
md5_local.update(log_output.encode('utf8'))
if md5_match.group(1) != md5_local.hexdigest():
log.msg('dshield: ERROR: MD5 Mismatch {0} {1} .'.format(md5_match.group(1), md5_local.hexdigest()))
log.msg('dshield: ERROR: MD5 Mismatch {} {} .'.format(md5_match.group(1), md5_local.hexdigest()))
failed = True
log.msg('dshield: SUCCESS: Sent {0} bytes worth of data to secure.dshield.org'.format(len(log_output)))
log.msg('dshield: SUCCESS: Sent {} bytes worth of data to secure.dshield.org'.format(len(log_output)))
else:
log.msg('dshield ERROR: error {0}.'.format(resp.status_code))
log.msg('dshield response was {0}'.format(response))
log.msg(f'dshield ERROR: error {resp.status_code}.')
log.msg(f'dshield response was {response}')
failed = True
if failed:

View File

@ -1,6 +1,5 @@
# Simple elasticsearch logger
from __future__ import absolute_import, division
from elasticsearch import Elasticsearch, NotFoundError
@ -49,7 +48,7 @@ class Output(cowrie.core.output.Output):
options["ca_certs"] = self.ca_certs
# connect
self.es = Elasticsearch("{0}:{1}".format(self.host, self.port), **options)
self.es = Elasticsearch(f"{self.host}:{self.port}", **options)
# self.es = Elasticsearch('{0}:{1}'.format(self.host, self.port))
self.check_index()

View File

@ -2,7 +2,6 @@
Send attackers IP to GreyNoise
"""
from __future__ import absolute_import, division
import treq
@ -55,7 +54,7 @@ class Output(cowrie.core.output.Output):
meta=query['metadata']
)
gnUrl = '{0}query/ip'.format(GNAPI_URL).encode('utf8')
gnUrl = f'{GNAPI_URL}query/ip'.encode('utf8')
headers = ({'User-Agent': [COWRIE_USER_AGENT]})
fields = {'key': self.apiKey, 'ip': entry['src_ip']}
@ -71,7 +70,7 @@ class Output(cowrie.core.output.Output):
if response.code != 200:
rsp = yield response.text()
log.error("greynoise: got error {}".format(rsp))
log.error(f"greynoise: got error {rsp}")
return
j = yield response.json()
@ -87,4 +86,4 @@ class Output(cowrie.core.output.Output):
for query in j['records']:
message(query)
else:
log.msg("greynoise: no results for for IP {0}".format(entry['src_ip']))
log.msg("greynoise: no results for for IP {}".format(entry['src_ip']))

View File

@ -2,7 +2,6 @@
Output plugin for HPFeeds
"""
from __future__ import absolute_import, division
import hashlib
import json
@ -88,7 +87,7 @@ def msgauth(rand, ident, secret):
return msghdr(OP_AUTH, strpack8(ident) + hash)
class FeedUnpack(object):
class FeedUnpack:
def __init__(self):
self.buf = bytearray()
@ -120,9 +119,9 @@ class FeedUnpack(object):
return opcode, data
class hpclient(object):
class hpclient:
def __init__(self, server, port, ident, secret, debug):
log.msg('hpfeeds client init broker {0}:{1}, identifier {2}'.format(server, port, ident))
log.msg(f'hpfeeds client init broker {server}:{port}, identifier {ident}')
self.server, self.port = server, int(port)
self.ident, self.secret = ident.encode('latin1'), secret.encode('latin1')
self.debug = debug
@ -186,16 +185,16 @@ class hpclient(object):
try:
for opcode, data in self.unpacker:
if self.debug:
log.msg('hpfeeds: msg opcode {0:x} data {1}'.format(
log.msg('hpfeeds: msg opcode {:x} data {}'.format(
opcode,
''.join('{:02x}'.format(x) for x in data))
''.join(f'{x:02x}' for x in data))
)
if opcode == OP_INFO:
name, rand = strunpack8(data)
if self.debug:
log.msg('hpfeeds: server name {0} rand {1}'.format(
log.msg('hpfeeds: server name {} rand {}'.format(
name,
''.join('{:02x}'.format(x) for x in rand))
''.join(f'{x:02x}' for x in rand))
)
self.send(msgauth(rand, self.ident, self.secret))
self.state = 'GOTINFO'
@ -205,12 +204,12 @@ class hpclient(object):
chan, data = strunpack8(data)
if self.debug:
log.msg(
'hpfeeds: publish to {0} by {1}: {2}'.format(
chan, ident, ''.join('{:02x}'.format(x) for x in data)))
'hpfeeds: publish to {} by {}: {}'.format(
chan, ident, ''.join(f'{x:02x}' for x in data)))
elif opcode == OP_ERROR:
log.msg('hpfeeds: errormessage from server: {0}'.format(''.join('{:02x}'.format(x) for x in data)))
log.msg('hpfeeds: errormessage from server: {}'.format(''.join(f'{x:02x}' for x in data)))
else:
log.msg('hpfeeds: unknown opcode message: {0:x}'.format(opcode))
log.msg(f'hpfeeds: unknown opcode message: {opcode:x}')
except BadClient:
log.msg('hpfeeds: unpacker error, disconnecting.')
self.close()
@ -219,7 +218,7 @@ class hpclient(object):
try:
self.send(msgpublish(self.ident, channel, json.dumps(kwargs, default=set2json).encode('latin1')))
except Exception as e:
log.msg('hpfeeds: connection to hpfriends lost: {0}, reconnecting'.format(e))
log.msg(f'hpfeeds: connection to hpfriends lost: {e}, reconnecting')
self.connect()
self.send(msgpublish(self.ident, channel, json.dumps(kwargs, default=set2json).encode('latin1')))

View File

@ -2,7 +2,6 @@
Output plugin for HPFeeds
"""
from __future__ import absolute_import, division
import json
import logging

View File

@ -22,7 +22,7 @@ class Output(cowrie.core.output.Output):
try:
self.client = InfluxDBClient(host=host, port=port, ssl=ssl, verify_ssl=ssl)
except InfluxDBClientError as e:
log.msg("output_influx: I/O error({0}): '{1}'".format(
log.msg("output_influx: I/O error({}): '{}'".format(
e.code, e.message))
return

View File

@ -26,7 +26,6 @@
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
from __future__ import absolute_import, division
import json
import os

View File

@ -30,7 +30,6 @@
Work in progress Kafka output. Not functional yet
"""
from __future__ import absolute_import, division
import json
import logging

View File

@ -26,7 +26,6 @@
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
from __future__ import absolute_import, division
import syslog

View File

@ -31,7 +31,6 @@ Send files to https://malshare.com/
More info https://malshare.com/doc.php
"""
from __future__ import absolute_import, division
import os
@ -94,6 +93,6 @@ class Output(cowrie.core.output.Output):
if res and res.ok:
log.msg("Submitted to MalShare")
else:
log.msg("MalShare Request failed: {}".format(res.status_code))
log.msg(f"MalShare Request failed: {res.status_code}")
except Exception as e:
log.msg("MalShare Request failed: {}".format(e))
log.msg(f"MalShare Request failed: {e}")

View File

@ -1,7 +1,3 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division
import pymongo
from twisted.python import log
@ -19,14 +15,14 @@ class Output(cowrie.core.output.Output):
object_id = collection.insert_one(event).inserted_id
return object_id
except Exception as e:
log.msg('mongo error - {0}'.format(e))
log.msg(f'mongo error - {e}')
def update_one(self, collection, session, doc):
try:
object_id = collection.update({'session': session}, doc)
return object_id
except Exception as e:
log.msg('mongo error - {0}'.format(e))
log.msg(f'mongo error - {e}')
def start(self):
db_addr = CowrieConfig().get('output_mongodb', 'connection_string')
@ -99,7 +95,7 @@ class Output(cowrie.core.output.Output):
elif eventid == 'cowrie.client.size':
doc = self.col_sessions.find_one({'session': entry['session']})
if doc:
doc['termsize'] = '{0}x{1}'.format(entry['width'], entry['height'])
doc['termsize'] = '{}x{}'.format(entry['width'], entry['height'])
self.update_one(self.col_sessions, entry['session'], doc)
else:
pass

View File

@ -2,7 +2,6 @@
MySQL output connector. Writes audit logs to MySQL database
"""
from __future__ import absolute_import, division
import MySQLdb
@ -34,7 +33,7 @@ class ReconnectingConnectionPool(adbapi.ConnectionPool):
except (MySQLdb.OperationalError, MySQLdb._exceptions.OperationalError) as e:
if e.args[0] not in (2003, 2006, 2013):
raise e
log.msg("RCP: got error {0}, retrying operation".format(e))
log.msg(f"RCP: got error {e}, retrying operation")
conn = self.connections.get(self.threadID())
self.disconnect(conn)
# Try the interaction again
@ -78,10 +77,10 @@ class Output(cowrie.core.output.Output):
1406, "Data too long for column '...' at row ..."
"""
if error.value[0] in (1146, 1406):
log.msg("output_mysql: MySQL Error: {}".format(error.value))
log.msg(f"output_mysql: MySQL Error: {error.value}")
log.msg("MySQL schema maybe misconfigured, doublecheck database!")
else:
log.err("output_mysql: MySQL Error: {}".format(error.value))
log.err(f"output_mysql: MySQL Error: {error.value}")
def simpleQuery(self, sql, args):
"""
@ -188,7 +187,7 @@ class Output(cowrie.core.output.Output):
'UPDATE `sessions` '
'SET `termsize` = %s '
'WHERE `id` = %s',
('%sx%s' % (entry['width'], entry['height']), entry["session"]))
('{}x{}'.format(entry['width'], entry['height']), entry["session"]))
elif entry["eventid"] == 'cowrie.session.closed':
self.simpleQuery(

View File

@ -1,5 +1,3 @@
from __future__ import absolute_import, division
import json
from configparser import NoOptionError

View File

@ -1,5 +1,3 @@
from __future__ import absolute_import, division
import time
from datetime import datetime

View File

@ -1,5 +1,3 @@
from __future__ import absolute_import, division
# `ipaddress` system library only on Python3.4+
import ipaddress

View File

@ -2,7 +2,6 @@
Send downloaded/uplaoded files to S3 (or compatible)
"""
from __future__ import absolute_import, division
from configparser import NoOptionError
@ -71,16 +70,16 @@ class Output(cowrie.core.output.Output):
@defer.inlineCallbacks
def upload(self, shasum, filename):
if shasum in self.seen:
print("Already uploaded file with sha {} to S3".format(shasum))
print(f"Already uploaded file with sha {shasum} to S3")
return
exists = yield self._object_exists_remote(shasum)
if exists:
print("Somebody else already uploaded file with sha {} to S3".format(shasum))
print(f"Somebody else already uploaded file with sha {shasum} to S3")
self.seen.add(shasum)
return
print("Uploading file with sha {} ({}) to S3".format(shasum, filename))
print(f"Uploading file with sha {shasum} ({filename}) to S3")
with open(filename, 'rb') as fp:
yield threads.deferToThread(
self.client.put_object,

View File

@ -26,7 +26,6 @@
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
from __future__ import absolute_import, division
import json
import time
@ -59,5 +58,5 @@ class Output(cowrie.core.output.Output):
self.sc.api_call(
"chat.postMessage",
channel=self.slack_channel,
text="%s %s" % (time.strftime('%Y-%m-%d %H:%M:%S'), json.dumps(logentry, indent=4, sort_keys=True))
text="{} {}".format(time.strftime('%Y-%m-%d %H:%M:%S'), json.dumps(logentry, indent=4, sort_keys=True))
)

View File

@ -1,7 +1,3 @@
# encoding: utf-8
from __future__ import absolute_import, division
import json
import socket
@ -37,7 +33,7 @@ class Output(cowrie.core.output.Output):
try:
self.sock.sendall(message.encode())
except socket.error as ex:
except OSError as ex:
if ex.errno == 32: # Broken pipe
self.start()
self.sock.sendall(message.encode())

View File

@ -6,7 +6,6 @@ Not ready for production use.
JSON log file is still recommended way to go
"""
from __future__ import absolute_import, division
import json
@ -90,7 +89,7 @@ class Output(cowrie.core.output.Output):
if response.code == 200:
return
else:
log.msg("SplunkHEC response: {} {}".format(response.code, response.phrase))
log.msg(f"SplunkHEC response: {response.code} {response.phrase}")
d = client.readBody(response)
d.addCallback(cbBody)
d.addErrback(cbPartial)

View File

@ -1,5 +1,3 @@
from __future__ import absolute_import, division
import sqlite3
from twisted.enterprise import adbapi
@ -148,7 +146,7 @@ class Output(cowrie.core.output.Output):
'UPDATE `sessions` '
'SET `termsize` = ? '
'WHERE `id` = ?',
('%sx%s' % (entry['width'], entry['height']), entry["session"]))
('{}x{}'.format(entry['width'], entry['height']), entry["session"]))
elif entry["eventid"] == 'cowrie.session.closed':
self.simpleQuery(

View File

@ -26,7 +26,6 @@
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
from __future__ import absolute_import, division
import cowrie.core.cef
import cowrie.core.output
@ -47,10 +46,10 @@ class Output(cowrie.core.output.Output):
def write(self, logentry):
if self.format == 'cef':
self.outfile.write('{0} '.format(logentry['timestamp']))
self.outfile.write('{0}\n'.format(cowrie.core.cef.formatCef(logentry)))
self.outfile.write('{} '.format(logentry['timestamp']))
self.outfile.write('{}\n'.format(cowrie.core.cef.formatCef(logentry)))
else:
self.outfile.write('{0} '.format(logentry['timestamp']))
self.outfile.write('{0} '.format(logentry['session']))
self.outfile.write('{0}\n'.format(logentry['message']))
self.outfile.write('{} '.format(logentry['timestamp']))
self.outfile.write('{} '.format(logentry['session']))
self.outfile.write('{}\n'.format(logentry['message']))
self.outfile.flush()

View File

@ -30,7 +30,6 @@
Send SSH logins to Virustotal
"""
from __future__ import absolute_import, division
import datetime
import json
@ -110,7 +109,7 @@ class Output(cowrie.core.output.Output):
# If the file was first downloaded more than a "period of time" (e.g 1 min) ago -
# it has been apparently scanned before in VT and therefore is not going to be checked again
if file_modification_time < datetime.datetime.now()-TIME_SINCE_FIRST_DOWNLOAD:
log.msg("File with shasum '%s' was downloaded before" % (shasum, ))
log.msg(f"File with shasum '{shasum}' was downloaded before")
return False
return True
@ -119,7 +118,7 @@ class Output(cowrie.core.output.Output):
Check file scan report for a hash
Argument is full event so we can access full file later on
"""
vtUrl = '{0}file/report'.format(VTAPI_URL).encode('utf8')
vtUrl = f'{VTAPI_URL}file/report'.encode('utf8')
headers = http_headers.Headers({'User-Agent': [COWRIE_USER_AGENT]})
fields = {'apikey': self.apiKey, 'resource': entry['shasum'], 'allinfo': 1}
body = StringProducer(urlencode(fields).encode("utf-8"))
@ -134,7 +133,7 @@ class Output(cowrie.core.output.Output):
d.addCallback(cbBody)
return d
else:
log.msg("VT Request failed: {} {}".format(response.code, response.phrase))
log.msg(f"VT Request failed: {response.code} {response.phrase}")
def cbBody(body):
"""
@ -157,7 +156,7 @@ class Output(cowrie.core.output.Output):
Extract the information we need from the body
"""
if self.debug:
log.msg("VT scanfile result: {}".format(result))
log.msg(f"VT scanfile result: {result}")
result = result.decode('utf8')
j = json.loads(result)
log.msg("VT: {}".format(j['verbose_msg']))
@ -216,11 +215,11 @@ class Output(cowrie.core.output.Output):
"""
Send a file to VirusTotal
"""
vtUrl = '{0}file/scan'.format(VTAPI_URL).encode('utf8')
vtUrl = f'{VTAPI_URL}file/scan'.encode('utf8')
fields = {('apikey', self.apiKey)}
files = {('file', fileName, open(artifact, 'rb'))}
if self.debug:
log.msg("submitting to VT: {0}".format(repr(files)))
log.msg("submitting to VT: {}".format(repr(files)))
contentType, body = encode_multipart_formdata(fields, files)
producer = StringProducer(body)
headers = http_headers.Headers({
@ -247,14 +246,14 @@ class Output(cowrie.core.output.Output):
d.addErrback(cbPartial)
return d
else:
log.msg("VT Request failed: {} {}".format(response.code, response.phrase))
log.msg(f"VT Request failed: {response.code} {response.phrase}")
def cbError(failure):
failure.printTraceback()
def processResult(result):
if self.debug:
log.msg("VT postfile result: {}".format(result))
log.msg(f"VT postfile result: {result}")
result = result.decode('utf8')
j = json.loads(result)
# This is always a new resource, since we did the scan before
@ -273,7 +272,7 @@ class Output(cowrie.core.output.Output):
"""
Check url scan report for a hash
"""
vtUrl = '{0}url/report'.format(VTAPI_URL).encode('utf8')
vtUrl = f'{VTAPI_URL}url/report'.encode('utf8')
headers = http_headers.Headers({'User-Agent': [COWRIE_USER_AGENT]})
fields = {'apikey': self.apiKey, 'resource': entry['url'], 'scan': 1, 'allinfo': 1}
body = StringProducer(urlencode(fields).encode("utf-8"))
@ -288,7 +287,7 @@ class Output(cowrie.core.output.Output):
d.addCallback(cbBody)
return d
else:
log.msg("VT Request failed: {} {}".format(response.code, response.phrase))
log.msg(f"VT Request failed: {response.code} {response.phrase}")
def cbBody(body):
"""
@ -311,7 +310,7 @@ class Output(cowrie.core.output.Output):
Extract the information we need from the body
"""
if self.debug:
log.msg("VT scanurl result: {}".format(result))
log.msg(f"VT scanurl result: {result}")
result = result.decode('utf8')
j = json.loads(result)
log.msg("VT: {}".format(j['verbose_msg']))
@ -361,7 +360,7 @@ class Output(cowrie.core.output.Output):
"""
Send a comment to VirusTotal with Twisted
"""
vtUrl = '{0}comments/put'.format(VTAPI_URL).encode('utf8')
vtUrl = f'{VTAPI_URL}comments/put'.encode('utf8')
parameters = {
"resource": resource,
"comment": self.commenttext,
@ -387,14 +386,14 @@ class Output(cowrie.core.output.Output):
d.addErrback(cbPartial)
return d
else:
log.msg("VT Request failed: {} {}".format(response.code, response.phrase))
log.msg(f"VT Request failed: {response.code} {response.phrase}")
def cbError(failure):
failure.printTraceback()
def processResult(result):
if self.debug:
log.msg("VT postcomment result: {}".format(result))
log.msg(f"VT postcomment result: {result}")
result = result.decode('utf8')
j = json.loads(result)
return j['response_code']
@ -411,7 +410,7 @@ class WebClientContextFactory(ClientContextFactory):
@implementer(IBodyProducer)
class StringProducer(object):
class StringProducer:
def __init__(self, body):
self.body = body

View File

@ -1,5 +1,3 @@
from __future__ import absolute_import, division
import json
import string
from random import choice
@ -37,7 +35,7 @@ class XMPPLoggerProtocol(muc.MUCClient):
self.join(self.jrooms, self.nick)
def joinedRoom(self, room):
log.msg('Joined room {}'.format(room.name))
log.msg(f'Joined room {room.name}')
def connectionMade(self):
log.msg('Connected!')

View File

@ -36,7 +36,7 @@ class PoolClient(Protocol):
self.transport.write(buf)
def send_vm_request(self, src_ip):
fmt = '!cH{0}s'.format(len(src_ip))
fmt = '!cH{}s'.format(len(src_ip))
buf = struct.pack(fmt, b'r', len(src_ip), src_ip.encode())
self.transport.write(buf)
@ -84,7 +84,7 @@ class PoolClient(Protocol):
ip_len = recv[0]
data = data[2:]
recv = struct.unpack('!{0}s'.format(ip_len), data[:ip_len])
recv = struct.unpack(f'!{ip_len}s', data[:ip_len])
honey_ip = recv[0]
data = data[ip_len:]
@ -99,7 +99,7 @@ class PoolClient(Protocol):
snaphsot_len = recv[0]
data = data[2:]
recv = struct.unpack('!{0}s'.format(snaphsot_len), data[:snaphsot_len])
recv = struct.unpack(f'!{snaphsot_len}s', data[:snaphsot_len])
snapshot = recv[0]
data = data[snaphsot_len:]

View File

@ -45,7 +45,7 @@ class PoolHandler:
client.send_initialisation()
def initial_pool_connection_error(self, reason):
log.err('Could not connect to VM pool: {0}'.format(reason.value))
log.err(f'Could not connect to VM pool: {reason.value}')
os._exit(1)
def initialisation_response(self, res_code):

View File

@ -2,7 +2,6 @@
# Copyright (c) 2017 Michel Oosterhof <michel@oosterhof.net>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
from os import environ

View File

@ -1,7 +1,6 @@
# Copyright (c) 2009-2014 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
from twisted.conch import avatar
from twisted.conch.error import ConchError
@ -50,7 +49,7 @@ class CowrieUser(avatar.ConchUser):
self.channelLookup[b'direct-tcpip'] = forwarding.cowrieOpenConnectForwardingClient
def logout(self):
log.msg("avatar {} logging out".format(self.username))
log.msg(f"avatar {self.username} logging out")
def lookupChannel(self, channelType, windowSize, maxPacket, data):
"""
@ -58,7 +57,7 @@ class CowrieUser(avatar.ConchUser):
"""
klass = self.channelLookup.get(channelType, None)
if not klass:
raise ConchError(OPEN_UNKNOWN_CHANNEL_TYPE, "unknown channel: {}".format(channelType))
raise ConchError(OPEN_UNKNOWN_CHANNEL_TYPE, f"unknown channel: {channelType}")
else:
return klass(remoteWindow=windowSize,
remoteMaxPacket=maxPacket,

View File

@ -5,7 +5,6 @@
This module contains code to run a command
"""
from __future__ import absolute_import, division
import os
import re
@ -26,7 +25,7 @@ else:
from cowrie.shell import shlex
class HoneyPotCommand(object):
class HoneyPotCommand:
"""
This is the super class for all commands in cowrie/commands
"""
@ -109,7 +108,7 @@ class HoneyPotCommand(object):
for arg in args:
path = self.fs.resolve_path(arg, self.protocol.cwd)
if self.fs.isdir(path):
self.errorWrite("{}: error reading `{}': Is a directory\n".format(application, arg))
self.errorWrite(f"{application}: error reading `{arg}': Is a directory\n")
continue
files.append(path)
return files
@ -162,7 +161,7 @@ class HoneyPotCommand(object):
self.exit()
def lineReceived(self, line):
log.msg('QUEUED INPUT: {}'.format(line))
log.msg(f'QUEUED INPUT: {line}')
# FIXME: naive command parsing, see lineReceived below
# line = "".join(line)
self.protocol.cmdstack[0].cmdpending.append(shlex.split(line, posix=True))

View File

@ -1,5 +1,3 @@
from __future__ import absolute_import, division
import argparse
@ -35,23 +33,23 @@ class CustomParser(argparse.ArgumentParser):
conflict_handler='error',
add_help=True):
self.protocol = protocol
super(CustomParser, self).__init__(prog=prog,
usage=usage,
description=description,
epilog=epilog,
parents=parents,
formatter_class=formatter_class,
prefix_chars=prefix_chars,
fromfile_prefix_chars=fromfile_prefix_chars,
argument_default=argument_default,
conflict_handler=conflict_handler,
add_help=add_help)
super().__init__(prog=prog,
usage=usage,
description=description,
epilog=epilog,
parents=parents,
formatter_class=formatter_class,
prefix_chars=prefix_chars,
fromfile_prefix_chars=fromfile_prefix_chars,
argument_default=argument_default,
conflict_handler=conflict_handler,
add_help=add_help)
def exit(self, status=0, message=None):
raise ExitException("Exiting...")
def _print_message(self, message, file=None):
super(CustomParser, self)._print_message(message, self.protocol)
super()._print_message(message, self.protocol)
def error(self, message):
self.print_usage(self.protocol)

View File

@ -5,7 +5,6 @@
This module contains ...
"""
from __future__ import absolute_import, division
import os
@ -24,7 +23,7 @@ from cowrie.core.config import CowrieConfig
@implementer(ISFTPFile)
class CowrieSFTPFile(object):
class CowrieSFTPFile:
"""
SFTPTFile
"""
@ -88,7 +87,7 @@ class CowrieSFTPFile(object):
raise NotImplementedError
class CowrieSFTPDirectory(object):
class CowrieSFTPDirectory:
def __init__(self, server, directory):
self.server = server
@ -143,7 +142,7 @@ class CowrieSFTPDirectory(object):
@implementer(ISFTPServer)
class SFTPServerForCowrieUser(object):
class SFTPServerForCowrieUser:
def __init__(self, avatar):
self.avatar = avatar
@ -176,33 +175,33 @@ class SFTPServerForCowrieUser(object):
return {}
def openFile(self, filename, flags, attrs):
log.msg("SFTP openFile: {}".format(filename))
log.msg(f"SFTP openFile: {filename}")
return CowrieSFTPFile(self, self._absPath(filename), flags, attrs)
def removeFile(self, filename):
log.msg("SFTP removeFile: {}".format(filename))
log.msg(f"SFTP removeFile: {filename}")
return self.fs.remove(self._absPath(filename))
def renameFile(self, oldpath, newpath):
log.msg("SFTP renameFile: {} {}".format(oldpath, newpath))
log.msg(f"SFTP renameFile: {oldpath} {newpath}")
return self.fs.rename(self._absPath(oldpath), self._absPath(newpath))
def makeDirectory(self, path, attrs):
log.msg("SFTP makeDirectory: {}".format(path))
log.msg(f"SFTP makeDirectory: {path}")
path = self._absPath(path)
self.fs.mkdir2(path)
self._setAttrs(path, attrs)
def removeDirectory(self, path):
log.msg("SFTP removeDirectory: {}".format(path))
log.msg(f"SFTP removeDirectory: {path}")
return self.fs.rmdir(self._absPath(path))
def openDirectory(self, path):
log.msg("SFTP OpenDirectory: {}".format(path))
log.msg(f"SFTP OpenDirectory: {path}")
return CowrieSFTPDirectory(self, self._absPath(path))
def getAttrs(self, path, followLinks):
log.msg("SFTP getAttrs: {}".format(path))
log.msg(f"SFTP getAttrs: {path}")
path = self._absPath(path)
if followLinks:
s = self.fs.stat(path)
@ -211,17 +210,17 @@ class SFTPServerForCowrieUser(object):
return self._getAttrs(s)
def setAttrs(self, path, attrs):
log.msg("SFTP setAttrs: {}".format(path))
log.msg(f"SFTP setAttrs: {path}")
path = self._absPath(path)
return self._setAttrs(path, attrs)
def readLink(self, path):
log.msg("SFTP readLink: {}".format(path))
log.msg(f"SFTP readLink: {path}")
path = self._absPath(path)
return self.fs.readlink(path)
def makeLink(self, linkPath, targetPath):
log.msg("SFTP makeLink: {} {}".format(linkPath, targetPath))
log.msg(f"SFTP makeLink: {linkPath} {targetPath}")
linkPath = self._absPath(linkPath)
targetPath = self._absPath(targetPath)
return self.fs.symlink(targetPath, linkPath)

View File

@ -1,7 +1,6 @@
# Copyright (c) 2009-2014 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
try:
import cPickle as pickle
@ -70,7 +69,7 @@ class PermissionDenied(Exception):
pass
class HoneyPotFilesystem(object):
class HoneyPotFilesystem:
def __init__(self, fs, arch, home):
@ -144,7 +143,7 @@ class HoneyPotFilesystem(object):
continue
cwd.append(piece)
return '/%s' % ('/'.join(cwd),)
return '/{}'.format('/'.join(cwd))
def resolve_path_wc(self, path, cwd):
"""
@ -160,7 +159,7 @@ class HoneyPotFilesystem(object):
def foo(p, cwd):
if not len(p):
found.append('/%s' % ('/'.join(cwd),))
found.append('/{}'.format('/'.join(cwd)))
elif p[0] == '.':
foo(p[1:], cwd)
elif p[0] == '..':
@ -374,7 +373,7 @@ class HoneyPotFilesystem(object):
if openFlags & os.O_WRONLY == os.O_WRONLY or openFlags & os.O_RDWR == os.O_RDWR:
# strip executable bit
hostmode = mode & ~(111)
hostfile = '%s/%s_sftp_%s' % (
hostfile = '{}/{}_sftp_{}'.format(
CowrieConfig().get('honeypot', 'download_path'),
time.strftime('%Y%m%d-%H%M%S'),
re.sub('[^A-Za-z0-9]', '_', filename)
@ -537,7 +536,7 @@ class HoneyPotFilesystem(object):
f[A_SIZE] = size
class _statobj(object):
class _statobj:
"""
Transform a tuple into a stat object
"""

View File

@ -1,7 +1,6 @@
# Copyright (c) 2009-2014 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import copy
import os
@ -22,7 +21,7 @@ else:
from cowrie.shell import shlex
class HoneyPotShell(object):
class HoneyPotShell:
def __init__(self, protocol, interactive=True, redirect=False):
self.protocol = protocol
@ -51,27 +50,27 @@ class HoneyPotShell(object):
if tok == self.lexer.eof:
if tokens:
self.cmdpending.append((tokens))
self.cmdpending.append(tokens)
break
# For now, treat && and || same as ;, just execute without checking return code
if tok == '&&' or tok == '||':
if tokens:
self.cmdpending.append((tokens))
self.cmdpending.append(tokens)
tokens = []
continue
else:
self.protocol.terminal.write(
'-bash: syntax error near unexpected token `{}\'\n'.format(tok).encode('utf8'))
f'-bash: syntax error near unexpected token `{tok}\'\n'.encode('utf8'))
break
elif tok == ';':
if tokens:
self.cmdpending.append((tokens))
self.cmdpending.append(tokens)
tokens = []
continue
else:
self.protocol.terminal.write(
'-bash: syntax error near unexpected token `{}\'\n'.format(tok).encode('utf8'))
f'-bash: syntax error near unexpected token `{tok}\'\n'.encode('utf8'))
break
elif tok == '$?':
tok = "0"
@ -105,7 +104,7 @@ class HoneyPotShell(object):
self.protocol.terminal.write(
b'-bash: syntax error: unexpected end of file\n')
# Could run runCommand here, but i'll just clear the list instead
log.msg("exception: {}".format(e))
log.msg(f"exception: {e}")
self.cmdpending = []
self.showPrompt()
return
@ -324,7 +323,7 @@ class HoneyPotShell(object):
# Example: [root@svr03 ~]# (More of a "CentOS" feel)
# Example: root@svr03:~# (More of a "Debian" feel)
prompt = '{0}@{1}:{2}'.format(self.protocol.user.username, self.protocol.hostname, cwd)
prompt = f'{self.protocol.user.username}@{self.protocol.hostname}:{cwd}'
if not self.protocol.user.uid:
prompt += '# ' # "Root" user
else:
@ -402,7 +401,7 @@ class HoneyPotShell(object):
newbuf = ''
if len(files) == 1:
newbuf = ' '.join(line.decode('utf8').split()[:-1] + ['%s%s' % (basedir, files[0][fs.A_NAME])])
newbuf = ' '.join(line.decode('utf8').split()[:-1] + ['{}{}'.format(basedir, files[0][fs.A_NAME])])
if files[0][fs.A_TYPE] == fs.T_DIR:
newbuf += '/'
else:
@ -414,7 +413,7 @@ class HoneyPotShell(object):
else:
prefix = ''
first = line.decode('utf8').split(' ')[:-1]
newbuf = ' '.join(first + ['%s%s' % (basedir, prefix)])
newbuf = ' '.join(first + [f'{basedir}{prefix}'])
newbuf = newbuf.encode('utf8')
if newbuf == b''.join(self.protocol.lineBuffer):
self.protocol.terminal.write(b'\n')
@ -435,7 +434,7 @@ class HoneyPotShell(object):
self.protocol.terminal.write(newbuf)
class StdOutStdErrEmulationProtocol(object):
class StdOutStdErrEmulationProtocol:
"""
Pipe support written by Dave Germiquet
Support for commands chaining added by Ivan Korolev (@fe7ch)

View File

@ -2,7 +2,6 @@
# Copyright (c) 2009-2014 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from __future__ import absolute_import, division
import os
import socket
@ -29,7 +28,7 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
commands = {}
for c in cowrie.commands.__all__:
try:
module = __import__('cowrie.commands.%s' % (c,),
module = __import__(f'cowrie.commands.{c}',
globals(), locals(), ['commands'])
commands.update(module.commands)
except Exception as e:
@ -131,8 +130,8 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
def txtcmd(self, txt):
class command_txtcmd(command.HoneyPotCommand):
def call(self):
log.msg('Reading txtcmd from "{}"'.format(txt))
with open(txt, 'r') as f:
log.msg(f'Reading txtcmd from "{txt}"')
with open(txt) as f:
self.write(f.read())
return command_txtcmd
@ -155,7 +154,7 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
return None
else:
for i in [
'%s/%s' % (self.fs.resolve_path(x, self.cwd), cmd)
'{}/{}'.format(self.fs.resolve_path(x, self.cwd), cmd)
for x in paths
]:
if self.fs.exists(i):
@ -169,7 +168,7 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
if path in self.commands:
return self.commands[path]
log.msg("Can't find command {}".format(cmd))
log.msg(f"Can't find command {cmd}")
return None
def lineReceived(self, line):
@ -183,7 +182,7 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
if len(self.cmdstack):
self.cmdstack[-1].lineReceived(line)
else:
log.msg("discarding input {}".format(line))
log.msg(f"discarding input {line}")
def call_command(self, pp, cmd, *args):
self.pp = pp

View File

@ -26,7 +26,6 @@
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
from __future__ import absolute_import, division
from binascii import crc32
from random import randint, seed
@ -36,13 +35,13 @@ from twisted.python import log
from cowrie.core.config import CowrieConfig
class Passwd(object):
class Passwd:
"""
This class contains code to handle the users and their properties in
/etc/passwd. Note that contrary to the name, it does not handle any
passwords.
"""
passwd_file = '%s/etc/passwd' % (CowrieConfig().get('honeypot', 'contents_path'),)
passwd_file = '{}/etc/passwd'.format(CowrieConfig().get('honeypot', 'contents_path'))
def __init__(self):
self.load()
@ -52,7 +51,7 @@ class Passwd(object):
Load /etc/passwd
"""
self.passwd = []
with open(self.passwd_file, 'r') as f:
with open(self.passwd_file) as f:
while True:
rawline = f.readline()
if not rawline:
@ -138,12 +137,12 @@ class Passwd(object):
return e
class Group(object):
class Group:
"""
This class contains code to handle the groups and their properties in
/etc/group.
"""
group_file = '%s/etc/group' % (CowrieConfig().get('honeypot', 'contents_path'),)
group_file = '{}/etc/group'.format(CowrieConfig().get('honeypot', 'contents_path'))
def __init__(self):
self.load()
@ -153,7 +152,7 @@ class Group(object):
Load /etc/group
"""
self.group = []
with open(self.group_file, 'r') as f:
with open(self.group_file) as f:
while True:
rawline = f.readline()
if not rawline:

View File

@ -26,7 +26,6 @@
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
from __future__ import absolute_import, division
import json
import random
@ -38,7 +37,7 @@ from cowrie.core.config import CowrieConfig
from cowrie.shell import fs
class CowrieServer(object):
class CowrieServer:
"""
In traditional Kippo each connection gets its own simulated machine.
This is not always ideal, sometimes two connections come from the same
@ -59,7 +58,7 @@ class CowrieServer(object):
except NoOptionError:
self.arch = 'linux-x64-lsb'
log.msg("Initialized emulated server as architecture: {}".format(self.arch))
log.msg(f"Initialized emulated server as architecture: {self.arch}")
def getCommandOutput(self, file):
"""

Some files were not shown because too many files have changed in this diff Show More