From e63ef3a520075407f1a9d0584fb228d318834244 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Tue, 8 Sep 2015 21:35:15 +0200 Subject: [PATCH] add inline script for conditional tls passthrough fix #646 --- examples/README | 4 ++ examples/tls_passthrough.py | 136 ++++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 examples/tls_passthrough.py diff --git a/examples/README b/examples/README index adfcd0f25..b4dec8e56 100644 --- a/examples/README +++ b/examples/README @@ -1,3 +1,7 @@ +Some inline scripts may require additional dependencies, which can be installed using +`pip install mitmproxy[examples]`. + + # inline script examples add_header.py Simple script that just adds a header to every request. change_upstream_proxy.py Dynamically change the upstream proxy diff --git a/examples/tls_passthrough.py b/examples/tls_passthrough.py new file mode 100644 index 000000000..7b4dec622 --- /dev/null +++ b/examples/tls_passthrough.py @@ -0,0 +1,136 @@ +""" +This inline script allows conditional TLS Interception based +on a user-defined strategy. + +Example: + + > mitmdump -s tls_passthrough.py + + 1. curl --proxy http://localhost:8080 https://example.com --insecure + // works - we'll also see the contents in mitmproxy + + 2. curl --proxy http://localhost:8080 https://example.com --insecure + // still works - we'll also see the contents in mitmproxy + + 3. curl --proxy http://localhost:8080 https://example.com + // fails with a certificate error, which we will also see in mitmproxy + + 4. curl --proxy http://localhost:8080 https://example.com + // works again, but mitmproxy does not intercept and we do *not* see the contents + +Authors: Maximilian Hils, Matthew Tuusberg +""" +from __future__ import (absolute_import, print_function, division) +import collections +import random + +from enum import Enum + +from libmproxy.exceptions import TlsException +from libmproxy.protocol import TlsLayer, RawTCPLayer + + +class InterceptionResult(Enum): + success = True + failure = False + skipped = None + + +class _TlsStrategy(object): + """ + Abstract base class for interception strategies. + """ + def __init__(self): + # A server_address -> interception results mapping + self.history = collections.defaultdict(lambda: collections.deque(maxlen=200)) + + def should_intercept(self, server_address): + """ + Returns: + True, if we should attempt to intercept the connection. + False, if we want to employ pass-through instead. + """ + raise NotImplementedError() + + def record_success(self, server_address): + self.history[server_address].append(InterceptionResult.success) + + def record_failure(self, server_address): + self.history[server_address].append(InterceptionResult.failure) + + def record_skipped(self, server_address): + self.history[server_address].append(InterceptionResult.skipped) + + +class ConservativeStrategy(_TlsStrategy): + """ + Conservative Interception Strategy - only intercept if there haven't been any failed attempts + in the history. + """ + + def should_intercept(self, server_address): + if InterceptionResult.failure in self.history[server_address]: + return False + return True + + +class ProbabilisticStrategy(_TlsStrategy): + """ + Fixed probability that we intercept a given connection. + """ + def __init__(self, p): + self.p = p + super(ProbabilisticStrategy, self).__init__() + + def should_intercept(self, server_address): + return random.uniform(0, 1) < self.p + + +class TlsFeedback(TlsLayer): + """ + Monkey-patch _establish_tls_with_client to get feedback if TLS could be established + successfully on the client connection (which may fail due to cert pinning). + """ + + def _establish_tls_with_client(self): + server_address = self.server_conn.address + tls_strategy = self.script_context.tls_strategy + + try: + super(TlsFeedback, self)._establish_tls_with_client() + except TlsException as e: + tls_strategy.record_failure(server_address) + raise e + else: + tls_strategy.record_success(server_address) + + +# inline script hooks below. + + +def start(context, argv): + if len(argv) == 2: + context.tls_strategy = ProbabilisticStrategy(float(argv[1])) + else: + context.tls_strategy = ConservativeStrategy() + + +def next_layer(context, next_layer): + """ + This hook does the actual magic - if the next layer is planned to be a TLS layer, + we check if we want to enter pass-through mode instead. + """ + if isinstance(next_layer, TlsLayer) and next_layer._client_tls: + server_address = next_layer.server_conn.address + + if context.tls_strategy.should_intercept(server_address): + # We try to intercept. + # Monkey-Patch the layer to get feedback from the TLSLayer if interception worked. + next_layer.__class__ = TlsFeedback + next_layer.script_context = context + else: + # We don't intercept - reply with a pass-through layer and add a "skipped" entry. + context.log("TLS passthrough for %s" % repr(next_layer.server_conn.address), "info") + next_layer_replacement = RawTCPLayer(next_layer.ctx, logging=False) + next_layer.reply(next_layer_replacement) + context.tls_strategy.record_skipped(server_address)