From 7c82418e0baca311487230074655f5f106bcdd2b Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Tue, 4 Mar 2014 14:12:58 +1300 Subject: [PATCH] Beef up CertStore, add DH params. --- netlib/certutils.py | 151 ++++++++++++++++++++++------------------- test/test_certutils.py | 39 ++++------- 2 files changed, 96 insertions(+), 94 deletions(-) diff --git a/netlib/certutils.py b/netlib/certutils.py index 0b29d52f3..b9c291d09 100644 --- a/netlib/certutils.py +++ b/netlib/certutils.py @@ -5,23 +5,27 @@ from pyasn1.error import PyAsn1Error import OpenSSL import tcp -default_exp = 62208000 # =24 * 60 * 60 * 720 -default_o = "mitmproxy" -default_cn = "mitmproxy" +DEFAULT_EXP = 62208000 # =24 * 60 * 60 * 720 +# Generated with "openssl dhparam". It's too slow to generate this on startup. +DEFAULT_DHPARAM = """-----BEGIN DH PARAMETERS----- +MIGHAoGBAOdPzMbYgoYfO3YBYauCLRlE8X1XypTiAjoeCFD0qWRx8YUsZ6Sj20W5 +zsfQxlZfKovo3f2MftjkDkbI/C/tDgxoe0ZPbjy5CjdOhkzxn0oTbKTs16Rw8DyK +1LjTR65sQJkJEdgsX8TSi/cicCftJZl9CaZEaObF2bdgSgGK+PezAgEC +-----END DH PARAMETERS-----""" -def create_ca(o=default_o, cn=default_cn, exp=default_exp): +def create_ca(o, cn, exp): key = OpenSSL.crypto.PKey() key.generate_key(OpenSSL.crypto.TYPE_RSA, 1024) - ca = OpenSSL.crypto.X509() - ca.set_serial_number(int(time.time()*10000)) - ca.set_version(2) - ca.get_subject().CN = cn - ca.get_subject().O = o - ca.gmtime_adj_notBefore(0) - ca.gmtime_adj_notAfter(exp) - ca.set_issuer(ca.get_subject()) - ca.set_pubkey(key) - ca.add_extensions([ + cert = OpenSSL.crypto.X509() + cert.set_serial_number(int(time.time()*10000)) + cert.set_version(2) + cert.get_subject().CN = cn + cert.get_subject().O = o + cert.gmtime_adj_notBefore(0) + cert.gmtime_adj_notAfter(exp) + cert.set_issuer(cert.get_subject()) + cert.set_pubkey(key) + cert.add_extensions([ OpenSSL.crypto.X509Extension("basicConstraints", True, "CA:TRUE"), OpenSSL.crypto.X509Extension("nsCertType", True, @@ -32,80 +36,39 @@ def create_ca(o=default_o, cn=default_cn, exp=default_exp): OpenSSL.crypto.X509Extension("keyUsage", False, "keyCertSign, cRLSign"), OpenSSL.crypto.X509Extension("subjectKeyIdentifier", False, "hash", - subject=ca), + subject=cert), ]) - ca.sign(key, "sha1") - return key, ca + cert.sign(key, "sha1") + return key, cert -def dummy_ca(path, o=default_o, cn=default_cn, exp=default_exp): - dirname = os.path.dirname(path) - if not os.path.exists(dirname): - os.makedirs(dirname) - if path.endswith(".pem"): - basename, _ = os.path.splitext(path) - basename = os.path.basename(basename) - else: - basename = os.path.basename(path) - - key, ca = create_ca(o=o, cn=cn, exp=exp) - - # Dump the CA plus private key - f = open(path, "wb") - f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)) - f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca)) - f.close() - - # Dump the certificate in PEM format - f = open(os.path.join(dirname, basename + "-cert.pem"), "wb") - f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca)) - f.close() - - # Create a .cer file with the same contents for Android - f = open(os.path.join(dirname, basename + "-cert.cer"), "wb") - f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca)) - f.close() - - # Dump the certificate in PKCS12 format for Windows devices - f = open(os.path.join(dirname, basename + "-cert.p12"), "wb") - p12 = OpenSSL.crypto.PKCS12() - p12.set_certificate(ca) - p12.set_privatekey(key) - f.write(p12.export()) - f.close() - return True - - -def dummy_cert(ca, commonname, sans): +def dummy_cert(pkey, cacert, commonname, sans): """ - Generates and writes a certificate to fp. + Generates a dummy certificate. - ca: Path to the certificate authority file, or None. + pkey: CA private key + cacert: CA certificate commonname: Common name for the generated certificate. sans: A list of Subject Alternate Names. - Returns cert path if operation succeeded, None if not. + Returns cert if operation succeeded, None if not. """ ss = [] for i in sans: ss.append("DNS: %s"%i) ss = ", ".join(ss) - raw = file(ca, "rb").read() - ca = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, raw) - key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, raw) - cert = OpenSSL.crypto.X509() cert.gmtime_adj_notBefore(-3600*48) cert.gmtime_adj_notAfter(60 * 60 * 24 * 30) - cert.set_issuer(ca.get_subject()) + cert.set_issuer(cacert.get_subject()) cert.get_subject().CN = commonname cert.set_serial_number(int(time.time()*10000)) if ss: cert.set_version(2) cert.add_extensions([OpenSSL.crypto.X509Extension("subjectAltName", True, ss)]) - cert.set_pubkey(ca.get_pubkey()) - cert.sign(key, "sha1") + cert.set_pubkey(cacert.get_pubkey()) + cert.sign(pkey, "sha1") return SSLCert(cert) @@ -113,9 +76,59 @@ class CertStore: """ Implements an in-memory certificate store. """ - def __init__(self, cacert): + def __init__(self, pkey, cert): + self.pkey, self.cert = pkey, cert self.certs = {} - self.cacert = cacert + + @classmethod + def from_store(klass, path, basename): + p = os.path.join(path, basename + "-ca.pem") + if not os.path.exists(p): + key, ca = klass.create_store(path, basename) + else: + p = os.path.join(path, basename + "-ca.pem") + raw = file(p, "rb").read() + ca = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, raw) + key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, raw) + return klass(key, ca) + + @classmethod + def create_store(klass, path, basename, o=None, cn=None, expiry=DEFAULT_EXP): + if not os.path.exists(path): + os.makedirs(path) + + o = o or basename + cn = cn or basename + + key, ca = create_ca(o=o, cn=cn, exp=expiry) + # Dump the CA plus private key + f = open(os.path.join(path, basename + "-ca.pem"), "wb") + f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)) + f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca)) + f.close() + + # Dump the certificate in PEM format + f = open(os.path.join(path, basename + "-cert.pem"), "wb") + f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca)) + f.close() + + # Create a .cer file with the same contents for Android + f = open(os.path.join(path, basename + "-cert.cer"), "wb") + f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca)) + f.close() + + # Dump the certificate in PKCS12 format for Windows devices + f = open(os.path.join(path, basename + "-cert.p12"), "wb") + p12 = OpenSSL.crypto.PKCS12() + p12.set_certificate(ca) + p12.set_privatekey(key) + f.write(p12.export()) + f.close() + + f = open(os.path.join(path, basename + "-dhparam.pem"), "wb") + f.write(DEFAULT_DHPARAM) + f.close() + return key, ca def get_cert(self, commonname, sans): """ @@ -130,7 +143,7 @@ class CertStore: """ if commonname in self.certs: return self.certs[commonname] - c = dummy_cert(self.cacert, commonname, sans) + c = dummy_cert(self.pkey, self.cert, commonname, sans) self.certs[commonname] = c return c diff --git a/test/test_certutils.py b/test/test_certutils.py index 4fab69e6f..f741bdece 100644 --- a/test/test_certutils.py +++ b/test/test_certutils.py @@ -3,43 +3,32 @@ from netlib import certutils import tutils -def test_dummy_ca(): - with tutils.tmpdir() as d: - path = os.path.join(d, "foo/cert.cnf") - assert certutils.dummy_ca(path) - assert os.path.exists(path) - - path = os.path.join(d, "foo/cert2.pem") - assert certutils.dummy_ca(path) - assert os.path.exists(path) - assert os.path.exists(os.path.join(d, "foo/cert2-cert.pem")) - assert os.path.exists(os.path.join(d, "foo/cert2-cert.p12")) - - class TestCertStore: def test_create_explicit(self): with tutils.tmpdir() as d: - ca = os.path.join(d, "ca") - assert certutils.dummy_ca(ca) - c = certutils.CertStore(ca) + ca = certutils.CertStore.from_store(d, "test") + assert ca.get_cert("foo", []) + + ca2 = certutils.CertStore.from_store(d, "test") + assert ca2.get_cert("foo", []) + + assert ca.cert.get_serial_number() == ca2.cert.get_serial_number() def test_create_tmp(self): with tutils.tmpdir() as d: - ca = os.path.join(d, "ca") - assert certutils.dummy_ca(ca) - c = certutils.CertStore(ca) - assert c.get_cert("foo.com", []) - assert c.get_cert("foo.com", []) - assert c.get_cert("*.foo.com", []) + ca = certutils.CertStore.from_store(d, "test") + assert ca.get_cert("foo.com", []) + assert ca.get_cert("foo.com", []) + assert ca.get_cert("*.foo.com", []) class TestDummyCert: def test_with_ca(self): with tutils.tmpdir() as d: - cacert = os.path.join(d, "cacert") - assert certutils.dummy_ca(cacert) + ca = certutils.CertStore.from_store(d, "test") r = certutils.dummy_cert( - cacert, + ca.pkey, + ca.cert, "foo.com", ["one.com", "two.com", "*.three.com"] )