mirror of https://github.com/celery/kombu.git
Added Resource.force_close_all
This commit is contained in:
parent
5ec28c9115
commit
6a4862b288
|
@ -464,6 +464,9 @@ class Resource(object):
|
||||||
def prepare(self, resource):
|
def prepare(self, resource):
|
||||||
return resource
|
return resource
|
||||||
|
|
||||||
|
def close_resource(self, resource):
|
||||||
|
resource.close()
|
||||||
|
|
||||||
def release(self, resource):
|
def release(self, resource):
|
||||||
"""Release resource so it can be used by another thread.
|
"""Release resource so it can be used by another thread.
|
||||||
|
|
||||||
|
@ -475,6 +478,33 @@ class Resource(object):
|
||||||
self._dirty.discard(resource)
|
self._dirty.discard(resource)
|
||||||
self._resource.put_nowait(resource)
|
self._resource.put_nowait(resource)
|
||||||
|
|
||||||
|
def force_close_all(self):
|
||||||
|
"""Closes and removes all resources in the pool (also those in use).
|
||||||
|
|
||||||
|
Can be used to close resources from parent processes after fork (e.g.
|
||||||
|
sockets).
|
||||||
|
|
||||||
|
"""
|
||||||
|
dirty = self._dirty
|
||||||
|
resource = self._resource
|
||||||
|
while 1:
|
||||||
|
try:
|
||||||
|
dres = dirty.pop()
|
||||||
|
except KeyError:
|
||||||
|
break
|
||||||
|
self.close_resource(dres)
|
||||||
|
|
||||||
|
resource.mutex.acquire()
|
||||||
|
try:
|
||||||
|
while 1:
|
||||||
|
try:
|
||||||
|
res = resource.queue.pop()
|
||||||
|
except IndexError:
|
||||||
|
break
|
||||||
|
self.close_resource(res)
|
||||||
|
finally:
|
||||||
|
resource.mutex.release()
|
||||||
|
|
||||||
|
|
||||||
class ConnectionPool(Resource):
|
class ConnectionPool(Resource):
|
||||||
LimitExceeded = exceptions.ConnectionLimitExceeded
|
LimitExceeded = exceptions.ConnectionLimitExceeded
|
||||||
|
|
Loading…
Reference in New Issue