mirror of https://github.com/cowrie/cowrie.git
* Command substitution with backticks
This commit is contained in:
parent
fefd994a2c
commit
322e304993
|
@ -40,7 +40,7 @@ class HoneyPotShell(object):
|
|||
log.msg(eventid='cowrie.command.input', input=line, format='CMD: %(input)s')
|
||||
self.lexer = shlex.shlex(instream=line, punctuation_chars=True, posix=True)
|
||||
# Add these special characters that are not in the default lexer
|
||||
self.lexer.wordchars += '@%{}=$:+^,()'
|
||||
self.lexer.wordchars += '@%{}=$:+^,()`'
|
||||
|
||||
tokens = []
|
||||
|
||||
|
@ -79,7 +79,7 @@ class HoneyPotShell(object):
|
|||
cmd = self.do_command_substitution(tok)
|
||||
tokens = cmd.split()
|
||||
continue
|
||||
elif '$(' in tok:
|
||||
elif '$(' in tok or '`' in tok:
|
||||
tok = self.do_command_substitution(tok)
|
||||
elif tok.startswith('${'):
|
||||
envRex = re.compile(r'^\$([_a-zA-Z0-9]+)$')
|
||||
|
@ -118,62 +118,74 @@ class HoneyPotShell(object):
|
|||
def do_command_substitution(self, start_tok):
|
||||
if start_tok[0] == '(':
|
||||
# start parsing the (...) expression
|
||||
dollar_expr = start_tok
|
||||
cmd_expr = start_tok
|
||||
pos = 1
|
||||
else:
|
||||
elif '$(' in start_tok:
|
||||
# split the first token to prefix and $(... part
|
||||
dollar_pos = start_tok.index('$(')
|
||||
result = start_tok[:dollar_pos]
|
||||
dollar_expr = start_tok[dollar_pos:]
|
||||
cmd_expr = start_tok[dollar_pos:]
|
||||
pos = 2
|
||||
elif '`' in start_tok:
|
||||
# split the first token to prefix and `... part
|
||||
backtick_pos = start_tok.index('`')
|
||||
result = start_tok[:backtick_pos]
|
||||
cmd_expr = start_tok[backtick_pos:]
|
||||
pos = 1
|
||||
opening_count = 1
|
||||
closing_count = 0
|
||||
|
||||
# parse the remaining tokens and execute $(...) parts when found
|
||||
# parse the remaining tokens and execute subshells
|
||||
while opening_count > closing_count:
|
||||
if dollar_expr[pos:pos + 2] == '$(':
|
||||
opening_count += 1
|
||||
pos += 2
|
||||
elif dollar_expr[pos] == ')':
|
||||
if cmd_expr[pos] in (')', '`'):
|
||||
# found an end of $(...) or `...`
|
||||
closing_count += 1
|
||||
if opening_count == closing_count:
|
||||
|
||||
if dollar_expr[0] == '(':
|
||||
if cmd_expr[0] == '(':
|
||||
# return the command in () without executing it
|
||||
result = dollar_expr[1:pos]
|
||||
result = cmd_expr[1:pos]
|
||||
else:
|
||||
# execute the command in $() and retrieve the output
|
||||
cmd = dollar_expr[2:pos]
|
||||
# instantiate new shell with redirect output
|
||||
self.protocol.cmdstack.append(HoneyPotShell(self.protocol, interactive=False, redirect=True))
|
||||
# call lineReceived method that indicates that we have some commands to parse
|
||||
self.protocol.cmdstack[-1].lineReceived(cmd)
|
||||
# remove the shell
|
||||
res = self.protocol.cmdstack.pop()
|
||||
result += res.protocol.pp.redirected_data.decode()[:-1]
|
||||
# execute the command in $() or `` or () and return the output
|
||||
result += self.run_subshell_command(cmd_expr[:pos+1])
|
||||
|
||||
if pos < len(dollar_expr) - 1:
|
||||
dollar_expr = dollar_expr[pos + 1:]
|
||||
if '$(' in dollar_expr:
|
||||
dollar_pos = dollar_expr.index('$(')
|
||||
result += dollar_expr[:dollar_pos]
|
||||
dollar_expr = dollar_expr[dollar_pos:]
|
||||
opening_count = 1
|
||||
closing_count = 0
|
||||
pos = 1
|
||||
# check whether there are more command substitutions remaining
|
||||
if pos < len(cmd_expr) - 1:
|
||||
remainder = cmd_expr[pos+1:]
|
||||
if '$(' in remainder or '`' in remainder:
|
||||
result = self.do_command_substitution(result + remainder)
|
||||
else:
|
||||
result += dollar_expr
|
||||
pos += 1
|
||||
result += remainder
|
||||
else:
|
||||
pos += 1
|
||||
elif cmd_expr[pos:pos+2] == '$(':
|
||||
# found a new $(...) expression
|
||||
opening_count += 1
|
||||
pos += 2
|
||||
else:
|
||||
if opening_count > closing_count and pos == len(dollar_expr) - 1:
|
||||
if opening_count > closing_count and pos == len(cmd_expr) - 1:
|
||||
tok = self.lexer.get_token()
|
||||
dollar_expr = dollar_expr + ' ' + tok
|
||||
cmd_expr = cmd_expr + ' ' + tok
|
||||
elif opening_count == closing_count:
|
||||
result += dollar_expr[pos]
|
||||
result += cmd_expr[pos]
|
||||
pos += 1
|
||||
|
||||
return result
|
||||
|
||||
def run_subshell_command(self, cmd_expr):
|
||||
# extract the command from $(...) or `...` or (...) expression
|
||||
if cmd_expr.startswith("$("):
|
||||
cmd = cmd_expr[2:-1]
|
||||
else:
|
||||
cmd = cmd_expr[1:-1]
|
||||
|
||||
# instantiate new shell with redirect output
|
||||
self.protocol.cmdstack.append(HoneyPotShell(self.protocol, interactive=False, redirect=True))
|
||||
# call lineReceived method that indicates that we have some commands to parse
|
||||
self.protocol.cmdstack[-1].lineReceived(cmd)
|
||||
# remove the shell
|
||||
res = self.protocol.cmdstack.pop()
|
||||
return res.protocol.pp.redirected_data.decode()[:-1]
|
||||
|
||||
def runCommand(self):
|
||||
pp = None
|
||||
|
||||
|
|
|
@ -180,10 +180,38 @@ class ShellEchoCommandTests(unittest.TestCase):
|
|||
|
||||
def test_echo_command_022(self):
|
||||
"""
|
||||
(echo test)
|
||||
echo test; (echo test)
|
||||
"""
|
||||
self.proto.lineReceived(b'(echo test)')
|
||||
self.proto.lineReceived(b'echo test; (echo test)')
|
||||
self.assertEquals(self.tr.value(), b'test\ntest\n' + PROMPT)
|
||||
|
||||
def test_echo_command_023(self):
|
||||
"""
|
||||
echo `echo test`
|
||||
"""
|
||||
self.proto.lineReceived(b'echo `echo test`')
|
||||
self.assertEquals(self.tr.value(), b'test\n' + PROMPT)
|
||||
|
||||
def test_echo_command_024(self):
|
||||
"""
|
||||
echo test_`echo test`_test
|
||||
"""
|
||||
self.proto.lineReceived(b'echo test_`echo test`_test')
|
||||
self.assertEquals(self.tr.value(), b'test_test_test\n' + PROMPT)
|
||||
|
||||
def test_echo_command_025(self):
|
||||
"""
|
||||
echo test_`echo test`_test_`echo test`_test
|
||||
"""
|
||||
self.proto.lineReceived(b'echo test_`echo test`_test_`echo test`_test')
|
||||
self.assertEquals(self.tr.value(), b'test_test_test_test_test\n' + PROMPT)
|
||||
|
||||
def test_echo_command_026(self):
|
||||
"""
|
||||
echo "TEST1: `echo test1`, TEST2: `echo test2`"
|
||||
"""
|
||||
self.proto.lineReceived(b'echo "TEST1: `echo test1`, TEST2: `echo test2`"')
|
||||
self.assertEquals(self.tr.value(), b'TEST1: test1, TEST2: test2\n' + PROMPT)
|
||||
|
||||
def tearDown(self):
|
||||
self.proto.connectionLost("tearDown From Unit Test")
|
||||
|
|
Loading…
Reference in New Issue