ConnectionPool can't be used after .resize(..., reset=True) (resolves #2018) (#2024)

* ConnectionPool can't be used after .resize(..., reset=True) (resolves #2018)

* Update kombu/resource.py

---------

Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>
This commit is contained in:
FrankK-1234 2024-06-11 21:43:13 +02:00 committed by GitHub
parent e32f8fc39b
commit 1217865d40
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 102 additions and 6 deletions

View File

@ -1065,7 +1065,8 @@ class ConnectionPool(Resource):
def setup(self):
if self.limit:
q = self._resource.queue
while len(q) < self.limit:
# Keep in mind dirty/used resources
while len(q) < self.limit - len(self._dirty):
self._resource.put_nowait(lazy(self.new))
def prepare(self, resource):
@ -1091,7 +1092,8 @@ class ChannelPool(Resource):
channel = self.new()
if self.limit:
q = self._resource.queue
while len(q) < self.limit:
# Keep in mind dirty/used resources
while len(q) < self.limit - len(self._dirty):
self._resource.put_nowait(lazy(channel))
def prepare(self, channel):

View File

@ -144,15 +144,20 @@ class Resource:
def collect_resource(self, resource):
pass
def force_close_all(self):
def force_close_all(self, close_pool=True):
"""Close and remove all resources in the pool (also those in use).
Used to close resources from parent processes after fork
(e.g. sockets/connections).
Arguments:
---------
close_pool (bool): If True (default) then the pool is marked
as closed. In case of False the pool can be reused.
"""
if self._closed:
return
self._closed = True
self._closed = close_pool
dirty = self._dirty
resource = self._resource
while 1: # - acquired
@ -188,7 +193,7 @@ class Resource:
self._limit = limit
if reset:
try:
self.force_close_all()
self.force_close_all(close_pool=False)
except Exception:
pass
self.setup()
@ -211,7 +216,9 @@ class Resource:
resource = self._resource
# Items to the left are last recently used, so we remove those first.
with getattr(resource, 'mutex', Noop()):
while len(resource.queue) > self.limit:
# keep in mind the dirty resources are not shrinking
while len(resource.queue) and \
(len(resource.queue) + len(self._dirty)) > self.limit:
R = resource.queue.popleft()
if collect:
self.collect_resource(R)

View File

@ -811,6 +811,93 @@ class ResourceCase:
P = self.create_resource(None)
P.acquire().release()
def test_acquire_resize_in_use(self):
P = self.create_resource(5)
self.assert_state(P, 5, 0)
chans = [P.acquire() for _ in range(5)]
self.assert_state(P, 0, 5)
with pytest.raises(RuntimeError):
P.resize(4)
[chan.release() for chan in chans]
self.assert_state(P, 5, 0)
def test_acquire_resize_ignore_err_no_shrink(self):
P = self.create_resource(5)
self.assert_state(P, 5, 0)
chans = [P.acquire() for _ in range(5)]
self.assert_state(P, 0, 5)
P.resize(4, ignore_errors=True)
self.assert_state(P, 0, 5)
[chan.release() for chan in chans]
self.assert_state(P, 5, 0)
def test_acquire_resize_ignore_err_shrink(self):
P = self.create_resource(5)
self.assert_state(P, 5, 0)
chans = [P.acquire() for _ in range(4)]
self.assert_state(P, 1, 4)
P.resize(4, ignore_errors=True)
self.assert_state(P, 0, 4)
[chan.release() for chan in chans]
self.assert_state(P, 4, 0)
def test_acquire_resize_larger(self):
P = self.create_resource(1)
self.assert_state(P, 1, 0)
c1 = P.acquire()
self.assert_state(P, 0, 1)
with pytest.raises(P.LimitExceeded):
P.acquire()
P.resize(2)
self.assert_state(P, 1, 1)
c2 = P.acquire()
self.assert_state(P, 0, 2)
c1.release()
c2.release()
self.assert_state(P, 2, 0)
def test_acquire_resize_force_smaller(self):
P = self.create_resource(2)
self.assert_state(P, 2, 0)
c1 = P.acquire()
c2 = P.acquire()
self.assert_state(P, 0, 2)
with pytest.raises(P.LimitExceeded):
P.acquire()
P.resize(1, force=True) # acts like reset
del c1
del c2
self.assert_state(P, 1, 0)
c1 = P.acquire()
self.assert_state(P, 0, 1)
with pytest.raises(P.LimitExceeded):
P.acquire()
c1.release()
self.assert_state(P, 1, 0)
def test_acquire_resize_reset(self):
P = self.create_resource(2)
self.assert_state(P, 2, 0)
c1 = P.acquire()
c2 = P.acquire()
self.assert_state(P, 0, 2)
with pytest.raises(P.LimitExceeded):
P.acquire()
P.resize(3, reset=True)
del c1
del c2
self.assert_state(P, 3, 0)
c1 = P.acquire()
c2 = P.acquire()
c3 = P.acquire()
self.assert_state(P, 0, 3)
with pytest.raises(P.LimitExceeded):
P.acquire()
c1.release()
c2.release()
c3.release()
self.assert_state(P, 3, 0)
def test_replace_when_limit(self):
P = self.create_resource(10)
r = P.acquire()