diff --git a/kombu/connection.py b/kombu/connection.py index 0762650d..ff447a68 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -464,6 +464,9 @@ class Resource(object): def prepare(self, resource): return resource + def close_resource(self, resource): + resource.close() + def release(self, resource): """Release resource so it can be used by another thread. @@ -475,6 +478,33 @@ class Resource(object): self._dirty.discard(resource) self._resource.put_nowait(resource) + def force_close_all(self): + """Closes and removes all resources in the pool (also those in use). + + Can be used to close resources from parent processes after fork (e.g. + sockets). + + """ + dirty = self._dirty + resource = self._resource + while 1: + try: + dres = dirty.pop() + except KeyError: + break + self.close_resource(dres) + + resource.mutex.acquire() + try: + while 1: + try: + res = resource.queue.pop() + except IndexError: + break + self.close_resource(res) + finally: + resource.mutex.release() + class ConnectionPool(Resource): LimitExceeded = exceptions.ConnectionLimitExceeded