diff --git a/netlib/certutils.py b/netlib/certutils.py index 9eb41d036..bdc2b77e4 100644 --- a/netlib/certutils.py +++ b/netlib/certutils.py @@ -169,6 +169,7 @@ class CertStore(object): """ Implements an in-memory certificate store. """ + STORE_CAP = 100 def __init__( self, @@ -181,6 +182,15 @@ class CertStore(object): self.default_chain_file = default_chain_file self.dhparams = dhparams self.certs = dict() + self.expire_queue = [] + + def expire(self, entry): + self.expire_queue.append(entry) + if len(self.expire_queue) > self.STORE_CAP: + d = self.expire_queue.pop(0) + for k, v in list(self.certs.items()): + if v == d: + del self.certs[k] @staticmethod def load_dhparam(path): @@ -342,6 +352,7 @@ class CertStore(object): privatekey=self.default_privatekey, chain_file=self.default_chain_file) self.certs[(commonname, tuple(sans))] = entry + self.expire(entry) return entry.cert, entry.privatekey, entry.chain_file diff --git a/netlib/debug.py b/netlib/debug.py index 657d44657..450b058a2 100644 --- a/netlib/debug.py +++ b/netlib/debug.py @@ -91,7 +91,7 @@ def dump_info(signal=None, frame=None, file=sys.stdout, testing=False): # pragm itms = list(d.items()) itms.sort(key=lambda x: x[1]) for i in itms[-20:]: - print(i[1], i[0]) + print(i[1], i[0], file=file) print("****************************************************", file=file) if not testing: diff --git a/test/netlib/test_certutils.py b/test/netlib/test_certutils.py index 027dcc93a..cf9a671b1 100644 --- a/test/netlib/test_certutils.py +++ b/test/netlib/test_certutils.py @@ -74,6 +74,31 @@ class TestCertStore: cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"]) assert b"*.baz.com" in cert.altnames + def test_expire(self): + with tutils.tmpdir() as d: + ca = certutils.CertStore.from_store(d, "test") + ca.STORE_CAP = 3 + ca.get_cert(b"one.com", []) + ca.get_cert(b"two.com", []) + ca.get_cert(b"three.com", []) + + assert (b"one.com", ()) in ca.certs + assert (b"two.com", ()) in ca.certs + assert (b"three.com", ()) in ca.certs + + ca.get_cert(b"one.com", []) + + assert (b"one.com", ()) in ca.certs + assert (b"two.com", ()) in ca.certs + assert (b"three.com", ()) in ca.certs + + ca.get_cert(b"four.com", []) + + assert (b"one.com", ()) not in ca.certs + assert (b"two.com", ()) in ca.certs + assert (b"three.com", ()) in ca.certs + assert (b"four.com", ()) in ca.certs + def test_overrides(self): with tutils.tmpdir() as d: ca1 = certutils.CertStore.from_store(os.path.join(d, "ca1"), "test")