diff --git a/docs/services.rst b/docs/services.rst index 4c3f0ab1..49108e80 100644 --- a/docs/services.rst +++ b/docs/services.rst @@ -17,8 +17,9 @@ Overview Service * User-supplied class with explicitly exposed methods. -* Identified in calls by its canonical name (e.g. mypkg.mymod.MyClass). * May be auto-imported/constructed in a child from a parent simply by calling it +* Identified in calls by its canonical name (e.g. mypkg.mymod.MyClass) by + default, but may use any naming scheme the configured activator understands. * Children receive refusals if the class is not already activated by a aprent * Has an associated Select instance which may be dynamically loaded with receivers over time, on_message_received() invoked if any receiver becomes @@ -28,9 +29,12 @@ Invoker * Abstracts mechanism for calling a service method and verifying permissions. * Built-in 'service.Invoker': concurrent execution of all methods on the thread pool. +* Built-in 'service.SerializedInvoker': serialization of all calls on a single + thread borrowed from the pool while any request is pending. * Built-in 'service.DeduplicatingInvoker': requests are aggregated by distinct - (method, kwargs) key, only one such method executes, return value is cached - and broadcast to all requesters. + (method, kwargs) key, only one such method ever executes, return value is + cached and broadcast to all request waiters. Waiters do not block additional + pool threads. Activator diff --git a/mitogen/service.py b/mitogen/service.py index d41240c2..33757836 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -195,12 +195,12 @@ class Activator(object): def activate(self, pool, service_name, msg): mod_name, _, class_name = service_name.rpartition('.') - if not self.is_permitted(mod_name, class_name, msg): + if msg and not self.is_permitted(mod_name, class_name, msg): raise mitogen.core.CallError(self.not_active_msg, service_name) module = mitogen.core.import_module(mod_name) klass = getattr(module, class_name) - service = klass(pool.router) + service = klass(router=pool.router) pool.add(service) return service @@ -261,6 +261,50 @@ class Invoker(object): msg.reply(response) +class SerializedInvoker(Invoker): + def __init__(self, **kwargs): + super(SerializedInvoker, self).__init__(**kwargs) + self._lock = threading.Lock() + self._queue = [] + self._running = False + + def _pop(self): + self._lock.acquire() + try: + try: + return self._queue.pop(0) + except IndexError: + self._running = False + finally: + self._lock.release() + + def _run(self): + while True: + tup = self._pop() + if tup is None: + return + method_name, kwargs, msg = tup + try: + super(SerializedInvoker, self).invoke(method_name, kwargs, msg) + except Exception: + LOG.exception('%r: while invoking %r of %r', + self, method_name, self.service) + msg.reply(mitogen.core.Message.dead()) + + def invoke(self, method_name, kwargs, msg): + self._lock.acquire() + try: + self._queue.append((method_name, kwargs, msg)) + first = not self._running + self._running = True + finally: + self._lock.release() + + if first: + self._run() + return Service.NO_REPLY + + class DeduplicatingInvoker(Invoker): """ A service that deduplicates and caches expensive responses. Requests are @@ -419,7 +463,7 @@ class Pool(object): if name in self._invoker_by_name: raise Error('service named %r already registered' % (name,)) assert service.select not in self._func_by_recv - invoker = service.invoker_class(service) + invoker = service.invoker_class(service=service) self._invoker_by_name[name] = invoker self._func_by_recv[service.select] = service.on_message @@ -439,13 +483,17 @@ class Pool(object): invoker = self._invoker_by_name.get(name) if not invoker: service = self._activator.activate(self, name, msg) - invoker = service.invoker_class(service) + invoker = service.invoker_class(service=service) self._invoker_by_name[name] = invoker finally: self._lock.release() return invoker + def get_service(self, name): + invoker = self.get_invoker(name, None) + return invoker.service + def _validate(self, msg): tup = msg.unpickle(throw=False) if not (isinstance(tup, tuple) and @@ -466,7 +514,8 @@ class Pool(object): LOG.warning('%r: call error: %s: %s', self, msg, e) msg.reply(e) except Exception: - LOG.exception('While invoking %r._invoke()', self) + LOG.exception('%r: while invoking %r of %r', + self, method_name, service_name) e = sys.exc_info()[1] msg.reply(mitogen.core.CallError(e)) @@ -513,6 +562,111 @@ class FileStreamState(object): self.lock = threading.Lock() +class PushFileService(Service): + """ + Push-based file service. Files are delivered and cached in RAM, sent + recursively from parent to child. A child that requests a file via + :meth:`get` will block until it has ben delivered by a parent. + + This service will eventually be merged into FileService. + """ + invoker_class = SerializedInvoker + + def __init__(self, **kwargs): + super(PushFileService, self).__init__(**kwargs) + self._lock = threading.Lock() + self._cache = {} + self._waiters = {} + self._sent_by_stream = {} + + def get(self, path): + self._lock.acquire() + try: + if path in self._cache: + return self._cache[path] + waiters = self._waiters.setdefault(path, []) + latch = mitogen.core.Latch() + waiters.append(lambda: latch.put(None)) + finally: + self._lock.release() + + latch.get() + LOG.debug('%r.get(%r) -> %r', self, path, self._cache[path]) + return self._cache[path] + + def _forward(self, context, path): + stream = self.router.stream_by_id(context.context_id) + child = mitogen.core.Context(self.router, stream.remote_id) + sent = self._sent_by_stream.setdefault(stream, set()) + if path in sent and child.context_id != context.context_id: + child.call_service_async( + service_name=self.name(), + method_name='forward', + path=path, + context=context + ).close() + else: + child.call_service_async( + service_name=self.name(), + method_name='store_and_forward', + path=path, + data=self._cache[path], + context=context + ).close() + + @expose(policy=AllowParents()) + @arg_spec({ + 'context': mitogen.core.Context, + 'path': basestring, + }) + def propagate_to(self, context, path): + LOG.debug('%r.propagate_to(%r, %r)', self, context, path) + if path not in self._cache: + fp = open(path, 'rb') + try: + self._cache[path] = mitogen.core.Blob(fp.read()) + finally: + fp.close() + self._forward(context, path) + + def _store(self, path, data): + self._lock.acquire() + try: + self._cache[path] = data + return self._waiters.pop(path, []) + finally: + self._lock.release() + + @expose(policy=AllowParents()) + @no_reply() + @arg_spec({ + 'path': basestring, + 'data': mitogen.core.Blob, + 'context': mitogen.core.Context, + }) + def store_and_forward(self, path, data, context): + LOG.debug('%r.store_and_forward(%r, %r, %r)', + self, path, data, context) + waiters = self._store(path, data) + if context.context_id != mitogen.context_id: + self._forward(path, context) + for callback in waiters: + callback() + + @expose(policy=AllowParents()) + @no_reply() + @arg_spec({ + 'path': basestring, + 'context': mitogen.core.Context, + }) + def forward(self, path, context): + LOG.debug('%r.forward(%r, %r)', self, path, context) + if path not in self._cache: + LOG.error('%r: %r is not in local cache', self, path) + return + self._forward(path, context) + + class FileService(Service): """ Streaming file server, used to serve small files and huge files alike.