diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index e9f40130..9d94ed18 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -347,27 +347,81 @@ class ContextService(mitogen.service.Service): class FileService(mitogen.service.Service): """ - Primitive latency-inducing file server for old-style incantations of the - module runner. This is to be replaced later with a scheme that forwards - files known to be missing without the target having to ask for them, - avoiding a corresponding roundtrip per file. + Streaming file server, used to serve both small files like Ansible module + sources, and huge files like ISO images. Paths must be explicitly added to + the service by a trusted context before they will be served to an untrusted + context. - Paths must be explicitly added to the service by a trusted context before - they will be served to an untrusted context. + The file service nominally lives on the mitogen.service.Pool() threads + shared with ContextService above, however for simplicity it also maintains + a dedicated thread from where file chunks are scheduled. + + The scheduler thread is responsible for dividing transfer requests up among + the physical streams that connect to those contexts, and ensuring each stream + never has an excessive amount of data buffered in RAM at any time. + + Transfers proceed one-at-a-time per stream. When multiple contexts exist + reachable over the same stream (e.g. one is the SSH account, another is a + sudo account, and a third is a proxied SSH connection), each request is + satisfied in turn before chunks for subsequent requests start flowing. This + ensures when a connection is contended, that preference is given to + completing individual transfers, rather than potentially aborting many + partially complete transfers, causing all the bandwidth used to be wasted. + + Theory of operation: + 1. Trusted context (i.e. 
a WorkerProcess) calls register(), making a + file available to any untrusted context. + 2. Untrusted context creates a mitogen.core.Receiver() to receive + file chunks. It then calls fetch(path, recv.to_sender()), which sets + up the transfer. The fetch() method returns the final file size and + notifies the dedicated thread of the transfer request. + 3. The dedicated thread wakes from perpetual sleep, looks up the stream + used to communicate with the untrusted context, and begins pumping + 128KiB-sized chunks until that stream's output queue reaches a + limit (1MiB). + 4. The thread sleeps for 10ms, wakes, and pumps new chunks as necessary + to refill any drained output queues, which are being asynchronously + drained by the Stream implementation running on the Broker thread. + 5. Once the last chunk has been pumped for a single transfer, + Sender.close() is called causing the receive loop in + target.py::_get_file() to exit, and allows that code to compare the + transferred size with the total file size indicated by the return + value of the fetch() method. + 6. If the sizes mismatch, the caller is informed, which will discard + the result and log an error. + 7. Once all chunks have been pumped for all transfers, the dedicated + thread stops waking at 10ms intervals and resumes perpetual sleep. + + Shutdown: + 1. process.py calls service.Pool.shutdown(), which arranges for all the + service pool threads to exit and be joined, guaranteeing no new + requests can arrive, before calling Service.on_shutdown() for each + registered service. + 2. FileService.on_shutdown() marks the dedicated thread's queue as + closed, causing the dedicated thread to wake immediately. It will + throw an exception that begins shutdown of the main loop. + 3. The main loop calls Sender.close() prematurely for every pending + transfer, causing any Receiver loops in the target contexts to exit + early. 
The file size check fails, and the partially downloaded file + is discarded, and an error is logged. + 4. Control exits the file transfer function in every target, and + graceful target shutdown can proceed normally, without the + associated thread needing to be forcefully killed. """ handle = 501 max_message_size = 1000 unregistered_msg = 'Path is not registered with FileService.' #: Maximum size of any stream's output queue before we temporarily stop - #: pumping more file chunks. The queue may overspill by up to - #: mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1). + #: pumping more file chunks on that stream. The queue may overspill by up + #: to mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1). max_queue_size = 1048576 - #: Time spent by the scheduler thread asleep when it has no more queues to - #: pump. With max_queue_size=1MiB and a sleep of 10ms, maximum throughput - #: on any single stream is 100MiB/sec, which is 5x what SSH can handle on - #: my laptop. + #: Time spent by the scheduler thread asleep when it has no more data to + #: pump, but while at least one transfer remains active. With + #: max_queue_size=1MiB and a sleep of 10ms, maximum throughput on any + #: single stream is 100MiB/sec, which is 5x what SSH can handle on my + #: laptop. sleep_delay_ms = 0.01 def __init__(self, router): @@ -385,7 +439,7 @@ class FileService(mitogen.service.Service): """ Respond to shutdown of the service pool by marking our queue closed. This causes :meth:`_sleep_on_queue` to wake immediately and return - :data:`False`, causing the scheduler thread main function to exit. + :data:`False`, causing the scheduler main thread to exit. """ self._queue.close()