From f6b5d9f2f6965cb2afb1948eefb182683aea8932 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 23 Mar 2018 09:29:39 +0545 Subject: [PATCH] issue #162: implement mitogen.service.DeduplicatingService This abstracts the pattern found in parent.ModuleForwarder and to a lesser degree master.ModuleResponser. We can probably use it in those contexts later. --- mitogen/service.py | 105 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 103 insertions(+), 2 deletions(-) diff --git a/mitogen/service.py b/mitogen/service.py index 63992d20..5663fbb0 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -26,6 +26,7 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +import pprint import sys import threading @@ -35,11 +36,20 @@ from mitogen.core import LOG class Service(object): + #: Sentinel object to suppress reply generation, since returning ``None`` + #: will trigger a response message containing the pickled ``None``. + NO_REPLY = object() + #: If ``None``, a handle is dynamically allocated, otherwise the fixed #: integer handle to use. handle = None max_message_size = 0 + #: Mapping from required key names to their required corresponding types, + #: used by the default :py:meth:`validate_args` implementation to validate + #: requests. + required_args = {} + def __init__(self, router): self.router = router self.recv = mitogen.core.Receiver(router, self.handle) @@ -48,7 +58,14 @@ class Service(object): self.running = True def validate_args(self, args): - return True + return ( + isinstance(args, dict) and + all(isinstance(args.get(k), t) + for k, t in self.required_args.iteritems()) + ) + + def dispatch(self, args, msg): + raise NotImplementedError() def dispatch_one(self, msg): if len(msg.data) > self.max_message_size: @@ -64,7 +81,9 @@ class Service(object): return try: - msg.reply(self.dispatch(args, msg)) + response = self.dispatch(args, msg) + if response is not self.NO_REPLY: + msg.reply(response) except Exception, e: LOG.exception('While invoking %r.dispatch()', self) msg.reply(mitogen.core.CallError(e)) @@ -85,8 +104,90 @@ class Service(object): self.run_once() +class DeduplicatingService(Service): + """ + A service that deduplicates and caches expensive responses. Requests are + deduplicated according to a customizable key, and the single expensive + response is broadcast to all requestors. + + A side effect of this class is that processing of the single response is + always serialized according to the result of :py:meth:`key_from_request`. + + Only one pool thread is blocked during generation of the response, + regardless of the number of requestors. + """ + def __init__(self, router): + super(DeduplicatingService, self).__init__(router) + self._responses = {} + self._waiters = {} + self._lock = threading.Lock() + + def key_from_request(self, args): + """ + Generate a deduplication key from the request. The default + implementation returns a string based on a stable representation of the + input dictionary generated by :py:func:`pprint.pformat`. + """ + return pprint.pformat(args) + + def get_response(self, args): + raise NotImplementedError() + + def _produce_response(self, key, response): + self._lock.acquire() + try: + assert key not in self._responses + assert key in self._waiters + self._responses[key] = response + for msg in self._waiters.pop(key): + msg.reply(response) + finally: + self._lock.release() + + def dispatch(self, args, msg): + key = self.key_from_request(args) + + self._lock.acquire() + try: + if key in self._responses: + return self._responses[key] + + if key in self._waiters: + self._waiters[key].append(msg) + return self.NO_REPLY + + self._waiters[key] = [msg] + finally: + self._lock.release() + + # I'm the unlucky thread that must generate the response. + try: + self._produce_response(key, self.get_response(args)) + except Exception, e: + self._produce_response(key, mitogen.core.CallError(e)) + + return self.NO_REPLY + + class Pool(object): + """ + Manage a pool of at least one thread that will be used to process messages + for a collection of services. + + Internally this is implemented by subscribing every :py:class:`Service`'s + :py:class:`mitogen.core.Receiver` using a single + :py:class:`mitogen.master.Select`, then arranging for every thread to + consume messages delivered to that select. + + In this way the threads are fairly shared by all available services, and no + resources are dedicated to a single idle service. + + There is no penalty for exposing large numbers of services; the list of + exposed services could even be generated dynamically in response to your + program's configuration or its input data. + """ def __init__(self, router, services, size=1): + assert size > 0 self.router = router self.services = list(services) self.size = size