Add key_size option to define rsa key size (#3657)
This commit is contained in:
parent
7ef91f46a3
commit
93f9e30728
|
@ -36,9 +36,9 @@ rD693XKIHUCWOjMh1if6omGXKHH40QuME2gNa50+YPn1iYDl88uDbbMCAQI=
|
|||
"""
|
||||
|
||||
|
||||
def create_ca(organization, cn, exp):
|
||||
def create_ca(organization, cn, exp, key_size):
|
||||
key = OpenSSL.crypto.PKey()
|
||||
key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
|
||||
key.generate_key(OpenSSL.crypto.TYPE_RSA, key_size)
|
||||
cert = OpenSSL.crypto.X509()
|
||||
cert.set_serial_number(int(time.time() * 10000))
|
||||
cert.set_version(2)
|
||||
|
@ -182,10 +182,10 @@ class CertStore:
|
|||
return dh
|
||||
|
||||
@classmethod
|
||||
def from_store(cls, path, basename):
|
||||
def from_store(cls, path, basename, key_size):
|
||||
ca_path = os.path.join(path, basename + "-ca.pem")
|
||||
if not os.path.exists(ca_path):
|
||||
key, ca = cls.create_store(path, basename)
|
||||
key, ca = cls.create_store(path, basename, key_size)
|
||||
else:
|
||||
with open(ca_path, "rb") as f:
|
||||
raw = f.read()
|
||||
|
@ -215,14 +215,14 @@ class CertStore:
|
|||
os.umask(original_umask)
|
||||
|
||||
@staticmethod
|
||||
def create_store(path, basename, organization=None, cn=None, expiry=DEFAULT_EXP):
|
||||
def create_store(path, basename, key_size, organization=None, cn=None, expiry=DEFAULT_EXP):
|
||||
if not os.path.exists(path):
|
||||
os.makedirs(path)
|
||||
|
||||
organization = organization or basename
|
||||
cn = cn or basename
|
||||
|
||||
key, ca = create_ca(organization=organization, cn=cn, exp=expiry)
|
||||
key, ca = create_ca(organization=organization, cn=cn, exp=expiry, key_size=key_size)
|
||||
# Dump the CA plus private key
|
||||
with CertStore.umask_secret(), open(os.path.join(path, basename + "-ca.pem"), "wb") as f:
|
||||
f.write(
|
||||
|
|
|
@ -7,6 +7,7 @@ from mitmproxy.net import tls
|
|||
CONF_DIR = "~/.mitmproxy"
|
||||
LISTEN_PORT = 8080
|
||||
CONTENT_VIEW_LINES_CUTOFF = 512
|
||||
KEY_SIZE = 2048
|
||||
|
||||
|
||||
class Options(optmanager.OptManager):
|
||||
|
@ -173,5 +174,11 @@ class Options(optmanager.OptManager):
|
|||
speedup flows browsing.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"key_size", int, KEY_SIZE,
|
||||
"""
|
||||
TLS key size for certificates and CA.
|
||||
"""
|
||||
)
|
||||
|
||||
self.update(**kwargs)
|
||||
|
|
|
@ -64,9 +64,11 @@ class ProxyConfig:
|
|||
"Certificate Authority parent directory does not exist: %s" %
|
||||
os.path.dirname(certstore_path)
|
||||
)
|
||||
key_size = options.key_size
|
||||
self.certstore = certs.CertStore.from_store(
|
||||
certstore_path,
|
||||
CONF_BASENAME
|
||||
CONF_BASENAME,
|
||||
key_size
|
||||
)
|
||||
|
||||
for c in options.certs:
|
||||
|
|
|
@ -68,6 +68,7 @@ def common_options(parser, opts):
|
|||
group = parser.add_argument_group("SSL")
|
||||
opts.make_parser(group, "certs", metavar="SPEC")
|
||||
opts.make_parser(group, "ssl_insecure", short="k")
|
||||
opts.make_parser(group, "key_size", metavar="KEY_SIZE")
|
||||
|
||||
# Client replay
|
||||
group = parser.add_argument_group("Client Replay")
|
||||
|
|
|
@ -21,6 +21,7 @@ CONFDIR = "~/.mitmproxy"
|
|||
CERTSTORE_BASENAME = "mitmproxy"
|
||||
CA_CERT_NAME = "mitmproxy-ca.pem"
|
||||
DEFAULT_CRAFT_ANCHOR = "/p/"
|
||||
KEY_SIZE = 2048
|
||||
|
||||
logger = logging.getLogger('pathod')
|
||||
|
||||
|
@ -54,7 +55,8 @@ class SSLOptions:
|
|||
self.alpn_select = alpn_select
|
||||
self.certstore = mcerts.CertStore.from_store(
|
||||
os.path.expanduser(confdir),
|
||||
CERTSTORE_BASENAME
|
||||
CERTSTORE_BASENAME,
|
||||
KEY_SIZE
|
||||
)
|
||||
for i in certs or []:
|
||||
self.certstore.add_cert_file(*i)
|
||||
|
|
|
@ -35,20 +35,20 @@ from ..conftest import skip_windows
|
|||
class TestCertStore:
|
||||
|
||||
def test_create_explicit(self, tmpdir):
|
||||
ca = certs.CertStore.from_store(str(tmpdir), "test")
|
||||
ca = certs.CertStore.from_store(str(tmpdir), "test", 2048)
|
||||
assert ca.get_cert(b"foo", [])
|
||||
|
||||
ca2 = certs.CertStore.from_store(str(tmpdir), "test")
|
||||
ca2 = certs.CertStore.from_store(str(tmpdir), "test", 2048)
|
||||
assert ca2.get_cert(b"foo", [])
|
||||
|
||||
assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number()
|
||||
|
||||
def test_create_no_common_name(self, tmpdir):
|
||||
ca = certs.CertStore.from_store(str(tmpdir), "test")
|
||||
ca = certs.CertStore.from_store(str(tmpdir), "test", 2048)
|
||||
assert ca.get_cert(None, [])[0].cn is None
|
||||
|
||||
def test_create_tmp(self, tmpdir):
|
||||
ca = certs.CertStore.from_store(str(tmpdir), "test")
|
||||
ca = certs.CertStore.from_store(str(tmpdir), "test", 2048)
|
||||
assert ca.get_cert(b"foo.com", [])
|
||||
assert ca.get_cert(b"foo.com", [])
|
||||
assert ca.get_cert(b"*.foo.com", [])
|
||||
|
@ -57,7 +57,7 @@ class TestCertStore:
|
|||
assert r[1] == ca.default_privatekey
|
||||
|
||||
def test_sans(self, tmpdir):
|
||||
ca = certs.CertStore.from_store(str(tmpdir), "test")
|
||||
ca = certs.CertStore.from_store(str(tmpdir), "test", 2048)
|
||||
c1 = ca.get_cert(b"foo.com", [b"*.bar.com"])
|
||||
ca.get_cert(b"foo.bar.com", [])
|
||||
# assert c1 == c2
|
||||
|
@ -65,13 +65,13 @@ class TestCertStore:
|
|||
assert not c1 == c3
|
||||
|
||||
def test_sans_change(self, tmpdir):
|
||||
ca = certs.CertStore.from_store(str(tmpdir), "test")
|
||||
ca = certs.CertStore.from_store(str(tmpdir), "test", 2048)
|
||||
ca.get_cert(b"foo.com", [b"*.bar.com"])
|
||||
cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"])
|
||||
assert b"*.baz.com" in cert.altnames
|
||||
|
||||
def test_expire(self, tmpdir):
|
||||
ca = certs.CertStore.from_store(str(tmpdir), "test")
|
||||
ca = certs.CertStore.from_store(str(tmpdir), "test", 2048)
|
||||
ca.STORE_CAP = 3
|
||||
ca.get_cert(b"one.com", [])
|
||||
ca.get_cert(b"two.com", [])
|
||||
|
@ -95,8 +95,8 @@ class TestCertStore:
|
|||
assert (b"four.com", ()) in ca.certs
|
||||
|
||||
def test_overrides(self, tmpdir):
|
||||
ca1 = certs.CertStore.from_store(str(tmpdir.join("ca1")), "test")
|
||||
ca2 = certs.CertStore.from_store(str(tmpdir.join("ca2")), "test")
|
||||
ca1 = certs.CertStore.from_store(str(tmpdir.join("ca1")), "test", 2048)
|
||||
ca2 = certs.CertStore.from_store(str(tmpdir.join("ca2")), "test", 2048)
|
||||
assert not ca1.default_ca.get_serial_number() == ca2.default_ca.get_serial_number()
|
||||
|
||||
dc = ca2.get_cert(b"foo.com", [b"sans.example.com"])
|
||||
|
@ -124,7 +124,7 @@ class TestCertStore:
|
|||
class TestDummyCert:
|
||||
|
||||
def test_with_ca(self, tmpdir):
|
||||
ca = certs.CertStore.from_store(str(tmpdir), "test")
|
||||
ca = certs.CertStore.from_store(str(tmpdir), "test", 2048)
|
||||
r = certs.dummy_cert(
|
||||
ca.default_privatekey,
|
||||
ca.default_ca,
|
||||
|
|
Loading…
Reference in New Issue