""" This inline script allows conditional TLS Interception based on a user-defined strategy. Example: > mitmdump -s tls_passthrough.py 1. curl --proxy http://localhost:8080 https://example.com --insecure // works - we'll also see the contents in mitmproxy 2. curl --proxy http://localhost:8080 https://example.com --insecure // still works - we'll also see the contents in mitmproxy 3. curl --proxy http://localhost:8080 https://example.com // fails with a certificate error, which we will also see in mitmproxy 4. curl --proxy http://localhost:8080 https://example.com // works again, but mitmproxy does not intercept and we do *not* see the contents Authors: Maximilian Hils, Matthew Tuusberg """ from __future__ import absolute_import, print_function, division import collections import random import sys from enum import Enum import mitmproxy from mitmproxy.exceptions import TlsProtocolException from mitmproxy.protocol import TlsLayer, RawTCPLayer class InterceptionResult(Enum): success = True failure = False skipped = None class _TlsStrategy(): """ Abstract base class for interception strategies. """ def __init__(self): # A server_address -> interception results mapping self.history = collections.defaultdict(lambda: collections.deque(maxlen=200)) def should_intercept(self, server_address): """ Returns: True, if we should attempt to intercept the connection. False, if we want to employ pass-through instead. """ raise NotImplementedError() def record_success(self, server_address): self.history[server_address].append(InterceptionResult.success) def record_failure(self, server_address): self.history[server_address].append(InterceptionResult.failure) def record_skipped(self, server_address): self.history[server_address].append(InterceptionResult.skipped) class ConservativeStrategy(_TlsStrategy): """ Conservative Interception Strategy - only intercept if there haven't been any failed attempts in the history. """ def should_intercept(self, server_address): if InterceptionResult.failure in self.history[server_address]: return False return True class ProbabilisticStrategy(_TlsStrategy): """ Fixed probability that we intercept a given connection. """ def __init__(self, p): self.p = p super(ProbabilisticStrategy, self).__init__() def should_intercept(self, server_address): return random.uniform(0, 1) < self.p class TlsFeedback(TlsLayer): """ Monkey-patch _establish_tls_with_client to get feedback if TLS could be established successfully on the client connection (which may fail due to cert pinning). """ def _establish_tls_with_client(self): server_address = self.server_conn.address try: super(TlsFeedback, self)._establish_tls_with_client() except TlsProtocolException as e: tls_strategy.record_failure(server_address) raise e else: tls_strategy.record_success(server_address) # inline script hooks below. tls_strategy = None def start(): global tls_strategy if len(sys.argv) == 2: tls_strategy = ProbabilisticStrategy(float(sys.argv[1])) else: tls_strategy = ConservativeStrategy() def next_layer(next_layer): """ This hook does the actual magic - if the next layer is planned to be a TLS layer, we check if we want to enter pass-through mode instead. """ if isinstance(next_layer, TlsLayer) and next_layer._client_tls: server_address = next_layer.server_conn.address if tls_strategy.should_intercept(server_address): # We try to intercept. # Monkey-Patch the layer to get feedback from the TLSLayer if interception worked. next_layer.__class__ = TlsFeedback else: # We don't intercept - reply with a pass-through layer and add a "skipped" entry. mitmproxy.ctx.log("TLS passthrough for %s" % repr(next_layer.server_conn.address), "info") next_layer_replacement = RawTCPLayer(next_layer.ctx, ignore=True) next_layer.reply.send(next_layer_replacement) tls_strategy.record_skipped(server_address)