diff --git a/kombu/connection.py b/kombu/connection.py index 13b1e9e5..beec4c73 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -1065,7 +1065,8 @@ class ConnectionPool(Resource): def setup(self): if self.limit: q = self._resource.queue - while len(q) < self.limit: + # Keep in mind dirty/used resources + while len(q) < self.limit - len(self._dirty): self._resource.put_nowait(lazy(self.new)) def prepare(self, resource): @@ -1091,7 +1092,8 @@ class ChannelPool(Resource): channel = self.new() if self.limit: q = self._resource.queue - while len(q) < self.limit: + # Keep in mind dirty/used resources + while len(q) < self.limit - len(self._dirty): self._resource.put_nowait(lazy(channel)) def prepare(self, channel): diff --git a/kombu/resource.py b/kombu/resource.py index 9f75eb06..d911587a 100644 --- a/kombu/resource.py +++ b/kombu/resource.py @@ -144,15 +144,20 @@ class Resource: def collect_resource(self, resource): pass - def force_close_all(self): + def force_close_all(self, close_pool=True): """Close and remove all resources in the pool (also those in use). Used to close resources from parent processes after fork (e.g. sockets/connections). + + Arguments: + --------- + close_pool (bool): If True (default) then the pool is marked + as closed. In case of False the pool can be reused. """ if self._closed: return - self._closed = True + self._closed = close_pool dirty = self._dirty resource = self._resource while 1: # - acquired @@ -188,7 +193,7 @@ class Resource: self._limit = limit if reset: try: - self.force_close_all() + self.force_close_all(close_pool=False) except Exception: pass self.setup() @@ -211,7 +216,9 @@ class Resource: resource = self._resource # Items to the left are last recently used, so we remove those first. with getattr(resource, 'mutex', Noop()): - while len(resource.queue) > self.limit: + # keep in mind the dirty resources are not shrinking + while len(resource.queue) and \ + (len(resource.queue) + len(self._dirty)) > self.limit: R = resource.queue.popleft() if collect: self.collect_resource(R) diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py index e57f4db0..4f1b71d8 100644 --- a/t/unit/test_connection.py +++ b/t/unit/test_connection.py @@ -811,6 +811,93 @@ class ResourceCase: P = self.create_resource(None) P.acquire().release() + def test_acquire_resize_in_use(self): + P = self.create_resource(5) + self.assert_state(P, 5, 0) + chans = [P.acquire() for _ in range(5)] + self.assert_state(P, 0, 5) + with pytest.raises(RuntimeError): + P.resize(4) + [chan.release() for chan in chans] + self.assert_state(P, 5, 0) + + def test_acquire_resize_ignore_err_no_shrink(self): + P = self.create_resource(5) + self.assert_state(P, 5, 0) + chans = [P.acquire() for _ in range(5)] + self.assert_state(P, 0, 5) + P.resize(4, ignore_errors=True) + self.assert_state(P, 0, 5) + [chan.release() for chan in chans] + self.assert_state(P, 5, 0) + + def test_acquire_resize_ignore_err_shrink(self): + P = self.create_resource(5) + self.assert_state(P, 5, 0) + chans = [P.acquire() for _ in range(4)] + self.assert_state(P, 1, 4) + P.resize(4, ignore_errors=True) + self.assert_state(P, 0, 4) + [chan.release() for chan in chans] + self.assert_state(P, 4, 0) + + def test_acquire_resize_larger(self): + P = self.create_resource(1) + self.assert_state(P, 1, 0) + c1 = P.acquire() + self.assert_state(P, 0, 1) + with pytest.raises(P.LimitExceeded): + P.acquire() + P.resize(2) + self.assert_state(P, 1, 1) + c2 = P.acquire() + self.assert_state(P, 0, 2) + c1.release() + c2.release() + self.assert_state(P, 2, 0) + + def test_acquire_resize_force_smaller(self): + P = self.create_resource(2) + self.assert_state(P, 2, 0) + c1 = P.acquire() + c2 = P.acquire() + self.assert_state(P, 0, 2) + with pytest.raises(P.LimitExceeded): + P.acquire() + P.resize(1, force=True) # acts like reset + del c1 + del c2 + self.assert_state(P, 1, 0) + c1 = P.acquire() + self.assert_state(P, 0, 1) + with pytest.raises(P.LimitExceeded): + P.acquire() + c1.release() + self.assert_state(P, 1, 0) + + def test_acquire_resize_reset(self): + P = self.create_resource(2) + self.assert_state(P, 2, 0) + c1 = P.acquire() + c2 = P.acquire() + self.assert_state(P, 0, 2) + with pytest.raises(P.LimitExceeded): + P.acquire() + P.resize(3, reset=True) + del c1 + del c2 + self.assert_state(P, 3, 0) + c1 = P.acquire() + c2 = P.acquire() + c3 = P.acquire() + self.assert_state(P, 0, 3) + with pytest.raises(P.LimitExceeded): + P.acquire() + c1.release() + c2.release() + c3.release() + self.assert_state(P, 3, 0) + def test_replace_when_limit(self): P = self.create_resource(10) r = P.acquire()