Merge pull request #1601 from cortesi/certcap
certutils: cap the cert store size at 100 by default
This commit is contained in:
commit
8e7ec6117a
|
@ -169,6 +169,7 @@ class CertStore(object):
|
|||
"""
|
||||
Implements an in-memory certificate store.
|
||||
"""
|
||||
STORE_CAP = 100
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -181,6 +182,15 @@ class CertStore(object):
|
|||
self.default_chain_file = default_chain_file
|
||||
self.dhparams = dhparams
|
||||
self.certs = dict()
|
||||
self.expire_queue = []
|
||||
|
||||
def expire(self, entry):
|
||||
self.expire_queue.append(entry)
|
||||
if len(self.expire_queue) > self.STORE_CAP:
|
||||
d = self.expire_queue.pop(0)
|
||||
for k, v in list(self.certs.items()):
|
||||
if v == d:
|
||||
del self.certs[k]
|
||||
|
||||
@staticmethod
|
||||
def load_dhparam(path):
|
||||
|
@ -342,6 +352,7 @@ class CertStore(object):
|
|||
privatekey=self.default_privatekey,
|
||||
chain_file=self.default_chain_file)
|
||||
self.certs[(commonname, tuple(sans))] = entry
|
||||
self.expire(entry)
|
||||
|
||||
return entry.cert, entry.privatekey, entry.chain_file
|
||||
|
||||
|
|
|
@ -91,7 +91,7 @@ def dump_info(signal=None, frame=None, file=sys.stdout, testing=False): # pragm
|
|||
itms = list(d.items())
|
||||
itms.sort(key=lambda x: x[1])
|
||||
for i in itms[-20:]:
|
||||
print(i[1], i[0])
|
||||
print(i[1], i[0], file=file)
|
||||
print("****************************************************", file=file)
|
||||
|
||||
if not testing:
|
||||
|
|
|
@ -74,6 +74,31 @@ class TestCertStore:
|
|||
cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"])
|
||||
assert b"*.baz.com" in cert.altnames
|
||||
|
||||
def test_expire(self):
|
||||
with tutils.tmpdir() as d:
|
||||
ca = certutils.CertStore.from_store(d, "test")
|
||||
ca.STORE_CAP = 3
|
||||
ca.get_cert(b"one.com", [])
|
||||
ca.get_cert(b"two.com", [])
|
||||
ca.get_cert(b"three.com", [])
|
||||
|
||||
assert (b"one.com", ()) in ca.certs
|
||||
assert (b"two.com", ()) in ca.certs
|
||||
assert (b"three.com", ()) in ca.certs
|
||||
|
||||
ca.get_cert(b"one.com", [])
|
||||
|
||||
assert (b"one.com", ()) in ca.certs
|
||||
assert (b"two.com", ()) in ca.certs
|
||||
assert (b"three.com", ()) in ca.certs
|
||||
|
||||
ca.get_cert(b"four.com", [])
|
||||
|
||||
assert (b"one.com", ()) not in ca.certs
|
||||
assert (b"two.com", ()) in ca.certs
|
||||
assert (b"three.com", ()) in ca.certs
|
||||
assert (b"four.com", ()) in ca.certs
|
||||
|
||||
def test_overrides(self):
|
||||
with tutils.tmpdir() as d:
|
||||
ca1 = certutils.CertStore.from_store(os.path.join(d, "ca1"), "test")
|
||||
|
|
Loading…
Reference in New Issue