diff --git a/.gitignore b/.gitignore index 716dc865..404007f4 100644 --- a/.gitignore +++ b/.gitignore @@ -18,7 +18,7 @@ proxy.py.iml *.crt *.key -venv +venv* cover htmlcov dist diff --git a/proxy/core/event.py b/proxy/core/event.py index a77ca804..cca9ce3f 100644 --- a/proxy/core/event.py +++ b/proxy/core/event.py @@ -37,13 +37,29 @@ eventNames = EventNames(1, 2, 3, 4, 5, 6, 7, 8) class EventQueue: - """Global event queue.""" + """Global event queue. - MANAGER: multiprocessing.managers.SyncManager = multiprocessing.Manager() + Each event contains: + + 1. Request ID - Globally unique + 2. Process ID - Process ID of event publisher. + This will be process id of acceptor workers. + 3. Thread ID - Thread ID of event publisher. + When --threadless is enabled, this value will + be same for all the requests + received by a single acceptor worker. + When --threadless is disabled, this value will be + Thread ID of the thread handling the client request. + 4. Event Timestamp - Time when this event occur + 5. Event Name - One of the defined or custom event name + 6. Event Payload - Optional data associated with the event + 7. Publisher ID (optional) - Optionally, publishing entity unique name / ID + """ def __init__(self) -> None: super().__init__() - self.queue = EventQueue.MANAGER.Queue() + self.manager = multiprocessing.Manager() + self.queue = self.manager.Queue() def publish( self, @@ -52,21 +68,6 @@ class EventQueue: event_payload: Dict[str, Any], publisher_id: Optional[str] = None ) -> None: - """Publish an event into the queue. - - 1. Request ID - Globally unique - 2. Process ID - Process ID of event publisher. - This will be process id of acceptor workers. - 3. Thread ID - Thread ID of event publisher. - When --threadless is enabled, this value will be same for all the requests - received by a single acceptor worker. - When --threadless is disabled, this value will be - Thread ID of the thread handling the client request. - 4. Event Timestamp - Time when this event occur - 5. Event Name - One of the defined or custom event name - 6. Event Payload - Optional data associated with the event - 7. Publisher ID (optional) - Optionally, publishing entity unique name / ID - """ self.queue.put({ 'request_id': request_id, 'process_id': os.getpid(), @@ -169,9 +170,8 @@ class EventDispatcher: class EventSubscriber: """Core event subscriber.""" - MANAGER: multiprocessing.managers.SyncManager = multiprocessing.Manager() - def __init__(self, event_queue: EventQueue) -> None: + self.manager = multiprocessing.Manager() self.event_queue = event_queue self.relay_thread: Optional[threading.Thread] = None self.relay_shutdown: Optional[threading.Event] = None @@ -180,7 +180,7 @@ class EventSubscriber: def subscribe(self, callback: Callable[[Dict[str, Any]], None]) -> None: self.relay_shutdown = threading.Event() - self.relay_channel = EventSubscriber.MANAGER.Queue() + self.relay_channel = self.manager.Queue() self.relay_thread = threading.Thread( target=self.relay, args=(self.relay_shutdown, self.relay_channel, callback))