Move manager initialization outside of top level scope. Fixes #233 (#236)

This commit is contained in:
Abhinav Singh 2019-12-20 14:01:26 -08:00 committed by GitHub
parent fdf0cc557f
commit 625dc4d3e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 22 deletions

2
.gitignore vendored
View File

@ -18,7 +18,7 @@ proxy.py.iml
*.crt *.crt
*.key *.key
venv venv*
cover cover
htmlcov htmlcov
dist dist

View File

@ -37,13 +37,29 @@ eventNames = EventNames(1, 2, 3, 4, 5, 6, 7, 8)
class EventQueue: class EventQueue:
"""Global event queue.""" """Global event queue.
MANAGER: multiprocessing.managers.SyncManager = multiprocessing.Manager() Each event contains:
1. Request ID - Globally unique
2. Process ID - Process ID of event publisher.
This will be process id of acceptor workers.
3. Thread ID - Thread ID of event publisher.
When --threadless is enabled, this value will
be same for all the requests
received by a single acceptor worker.
When --threadless is disabled, this value will be
Thread ID of the thread handling the client request.
4. Event Timestamp - Time when this event occur
5. Event Name - One of the defined or custom event name
6. Event Payload - Optional data associated with the event
7. Publisher ID (optional) - Optionally, publishing entity unique name / ID
"""
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
self.queue = EventQueue.MANAGER.Queue() self.manager = multiprocessing.Manager()
self.queue = self.manager.Queue()
def publish( def publish(
self, self,
@ -52,21 +68,6 @@ class EventQueue:
event_payload: Dict[str, Any], event_payload: Dict[str, Any],
publisher_id: Optional[str] = None publisher_id: Optional[str] = None
) -> None: ) -> None:
"""Publish an event into the queue.
1. Request ID - Globally unique
2. Process ID - Process ID of event publisher.
This will be process id of acceptor workers.
3. Thread ID - Thread ID of event publisher.
When --threadless is enabled, this value will be same for all the requests
received by a single acceptor worker.
When --threadless is disabled, this value will be
Thread ID of the thread handling the client request.
4. Event Timestamp - Time when this event occur
5. Event Name - One of the defined or custom event name
6. Event Payload - Optional data associated with the event
7. Publisher ID (optional) - Optionally, publishing entity unique name / ID
"""
self.queue.put({ self.queue.put({
'request_id': request_id, 'request_id': request_id,
'process_id': os.getpid(), 'process_id': os.getpid(),
@ -169,9 +170,8 @@ class EventDispatcher:
class EventSubscriber: class EventSubscriber:
"""Core event subscriber.""" """Core event subscriber."""
MANAGER: multiprocessing.managers.SyncManager = multiprocessing.Manager()
def __init__(self, event_queue: EventQueue) -> None: def __init__(self, event_queue: EventQueue) -> None:
self.manager = multiprocessing.Manager()
self.event_queue = event_queue self.event_queue = event_queue
self.relay_thread: Optional[threading.Thread] = None self.relay_thread: Optional[threading.Thread] = None
self.relay_shutdown: Optional[threading.Event] = None self.relay_shutdown: Optional[threading.Event] = None
@ -180,7 +180,7 @@ class EventSubscriber:
def subscribe(self, callback: Callable[[Dict[str, Any]], None]) -> None: def subscribe(self, callback: Callable[[Dict[str, Any]], None]) -> None:
self.relay_shutdown = threading.Event() self.relay_shutdown = threading.Event()
self.relay_channel = EventSubscriber.MANAGER.Queue() self.relay_channel = self.manager.Queue()
self.relay_thread = threading.Thread( self.relay_thread = threading.Thread(
target=self.relay, target=self.relay,
args=(self.relay_shutdown, self.relay_channel, callback)) args=(self.relay_shutdown, self.relay_channel, callback))