issue #307: require partial line when matching interactive prompt.

This is a best-effort attempt to avoid SSHd banner spam from breaking
our password entry loop.

Closes #307.
This commit is contained in:
David Wilson 2018-07-17 20:41:00 +01:00
parent 54a93f3c46
commit 830a133ad6
2 changed files with 33 additions and 3 deletions

View File

@ -61,7 +61,18 @@ def filter_debug(stream, it):
This contains the mess of dealing with both line-oriented input, and partial
lines such as the password prompt.
Yields `(line, partial)` tuples, where `line` is the line, `partial` is
:data:`True` if no terminating newline character was present and no more
data exists in the read buffer. Consuming code can use this to unreliably
detect the presence of an interactive prompt.
"""
# The `partial` test is unreliable, but is only problematic when verbosity
# is enabled: it's possible for a combination of SSH banner, password
# prompt, verbose output, timing and OS buffering specifics to create a
# situation where an otherwise newline-terminated line appears to not be
# terminated, due to a partial read(). If something is broken when
# ssh_debug_level>0, this is the first place to look.
state = 'start_of_line'
buf = b('')
for chunk in it:
@ -86,7 +97,7 @@ def filter_debug(stream, it):
state = 'start_of_line'
elif state == 'in_plain':
line, nl, buf = buf.partition(b('\n'))
yield line + nl
yield line + nl, not (nl or buf)
if nl:
state = 'start_of_line'
@ -237,7 +248,7 @@ class Stream(mitogen.parent.Stream):
deadline=self.connect_deadline
)
for buf in filter_debug(self, it):
for buf, partial in filter_debug(self, it):
LOG.debug('%r: received %r', self, buf)
if buf.endswith(self.EC0_MARKER):
self._router.broker.start_receive(self.tty_stream)
@ -255,7 +266,7 @@ class Stream(mitogen.parent.Stream):
raise PasswordError(self.password_incorrect_msg)
else:
raise PasswordError(self.auth_incorrect_msg)
elif PASSWORD_PROMPT in buf.lower():
elif partial and PASSWORD_PROMPT in buf.lower():
if self.password is None:
raise PasswordError(self.password_required_msg)
LOG.debug('%r: sending password', self)

View File

@ -105,5 +105,24 @@ class SshTest(testlib.DockerMixin, unittest2.TestCase):
)
class BannerTest(testlib.DockerMixin, unittest2.TestCase):
# Verify the ability to disambiguate random spam appearing in the SSHd's
# login banner from a legitimate password prompt.
stream_class = mitogen.ssh.Stream
def test_verbose_enabled(self):
context = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
ssh_debug_level=3,
)
name = 'ssh.%s:%s' % (
self.dockerized_ssh.get_host(),
self.dockerized_ssh.port,
)
self.assertEquals(name, context.name)
if __name__ == '__main__':
unittest2.main()