proxy.py/examples/pubsub_eventing.py

103 lines
3.3 KiB
Python

# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import time
import logging
import multiprocessing
from typing import Any, Dict, Optional
from proxy.core.event import (
EventQueue, EventManager, EventSubscriber, eventNames,
)
from proxy.common.constants import DEFAULT_LOG_FORMAT
logging.basicConfig(level=logging.DEBUG, format=DEFAULT_LOG_FORMAT)
logger = logging.getLogger(__name__)
num_events_received = [0, 0]
# Execute within a separate thread context
def on_event(payload: Dict[str, Any]) -> None:
'''Subscriber callback.'''
global num_events_received
if payload['request_id'] == '1234':
num_events_received[0] += 1
else:
num_events_received[1] += 1
def publisher_process(
shutdown_event: multiprocessing.synchronize.Event,
dispatcher_queue: EventQueue,
) -> None:
logger.info('publisher started')
try:
while not shutdown_event.is_set():
dispatcher_queue.publish(
request_id='12345',
event_name=eventNames.WORK_STARTED,
event_payload={'time': time.time()},
publisher_id='eventing_pubsub_process',
)
except KeyboardInterrupt:
pass
logger.info('publisher shutdown')
if __name__ == '__main__':
start_time = time.time()
# Start eventing core
subscriber: Optional[EventSubscriber] = None
with EventManager() as event_manager:
assert event_manager.queue
# Create a subscriber.
# Internally, subscribe will start a separate thread
# to receive incoming published messages.
with EventSubscriber(event_manager.queue, callback=on_event) as subscriber:
# Start a publisher process to demonstrate safe exchange
# of messages between processes.
publisher_shutdown_event = multiprocessing.Event()
publisher = multiprocessing.Process(
target=publisher_process, args=(
publisher_shutdown_event, event_manager.queue, ),
)
publisher.start()
# Dispatch event from main process too
# to demonstrate safe exchange of messages
# between threads.
try:
while True:
event_manager.queue.publish(
request_id='1234',
event_name=eventNames.WORK_STARTED,
event_payload={'time': time.time()},
publisher_id='eventing_pubsub_main',
)
except KeyboardInterrupt:
logger.info('KBE!!!')
finally:
# Stop publisher process
publisher_shutdown_event.set()
publisher.join()
logger.info(
'Received {0} events from main thread, {1} events from another process, in {2} seconds'.format(
num_events_received[0], num_events_received[1], time.time(
) - start_time,
),
)
logger.info('Done!!!')