"""
Event system module for enabling parallel processing across components.
Implements a publisher-subscriber pattern to decouple components.
"""
import logging
import threading
import queue
import time
from typing import Dict, List, Callable, Any, Optional, Set
import concurrent.futures
logger = logging.getLogger(__name__)
class Event:
    """Event object carrying a payload plus dispatch metadata.

    Attributes:
        event_type: Name of the event (see the module-level EVENT_* constants).
        data: Arbitrary payload attached to the event.
        source: Optional identifier of the component that emitted the event.
        timestamp: Wall-clock creation time (seconds since the epoch).
    """

    def __init__(self, event_type: str, data: Any = None,
                 source: Optional[str] = None):
        self.event_type = event_type
        self.data = data
        self.source = source
        # Captured once at construction so consumers see the publish time,
        # not the (possibly later) processing time.
        self.timestamp = time.time()

    def __repr__(self) -> str:
        return f"Event({self.event_type}, source={self.source}, timestamp={self.timestamp})"
class EventSystem:
    """Event system for parallel processing of prompts and responses.

    Publisher-subscriber dispatcher: publish() enqueues events, a daemon
    dispatcher thread drains the queue, and each subscriber callback runs
    on a shared thread pool so one slow subscriber does not block others.
    """

    def __init__(self, max_workers: int = 4):
        """Initialize the event system.

        Args:
            max_workers: Size of the thread pool that runs subscriber
                callbacks concurrently.
        """
        self.subscribers: Dict[str, List[Callable[[Event], None]]] = {}
        self.lock = threading.RLock()  # Reentrant lock for thread safety
        self.event_queue = queue.Queue()
        self.running = False
        self.dispatcher_thread = None
        self.max_workers = max_workers
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        # Futures of in-flight callbacks. Guarded by self.lock: the set is
        # mutated from pool threads (done-callbacks) and read by
        # wait_for_all_events(); unsynchronized access could raise
        # "set changed size during iteration".
        self.futures = set()
        logger.info(f"Initialized EventSystem with {max_workers} workers")

    def subscribe(self, event_type: str, callback: Callable[[Event], None]) -> None:
        """Subscribe a callback to a specific event type ("*" matches all)."""
        with self.lock:
            if event_type not in self.subscribers:
                self.subscribers[event_type] = []
            self.subscribers[event_type].append(callback)
            logger.debug(f"Added subscriber to {event_type}, total: {len(self.subscribers[event_type])}")

    def unsubscribe(self, event_type: str, callback: Callable[[Event], None]) -> None:
        """Unsubscribe a callback from a specific event type (no-op if absent)."""
        with self.lock:
            if event_type in self.subscribers and callback in self.subscribers[event_type]:
                self.subscribers[event_type].remove(callback)
                logger.debug(f"Removed subscriber from {event_type}, remaining: {len(self.subscribers[event_type])}")

    def publish(self, event: Event) -> None:
        """Publish an event; subscribers are notified asynchronously."""
        self.event_queue.put(event)
        logger.debug(f"Published event: {event}")
        # Lazily start the dispatcher on first publish.
        with self.lock:
            if not self.running:
                self.start()

    def publish_from_dict(self, event_type: str, data: Dict[str, Any],
                          source: Optional[str] = None) -> None:
        """Convenient method to publish an event from a dictionary."""
        self.publish(Event(event_type, data, source))

    def start(self) -> None:
        """Start the event dispatcher thread (idempotent)."""
        with self.lock:
            if not self.running:
                self.running = True
                self.dispatcher_thread = threading.Thread(target=self._dispatch_events)
                self.dispatcher_thread.daemon = True  # don't block interpreter exit
                self.dispatcher_thread.start()
                logger.info("Event dispatcher thread started")

    def stop(self) -> None:
        """Stop the event dispatcher thread and shut down the thread pool."""
        with self.lock:
            if not self.running:
                return
            self.running = False
            self.event_queue.put(None)  # Sentinel to stop the thread
        # Join OUTSIDE the lock: the dispatcher acquires self.lock in
        # _process_event, so joining while holding it could stall until
        # the join timeout expires.
        if self.dispatcher_thread and self.dispatcher_thread.is_alive():
            self.dispatcher_thread.join(timeout=2.0)
            logger.info("Event dispatcher thread stopped")
        # Shut down thread pool without waiting for in-flight callbacks.
        self.thread_pool.shutdown(wait=False)

    def _dispatch_events(self) -> None:
        """Dispatcher thread body: drain the queue until stopped."""
        while self.running:
            try:
                # Timeout lets us periodically re-check the running flag.
                event = self.event_queue.get(timeout=0.5)
            except queue.Empty:
                continue
            try:
                # Handle sentinel value pushed by stop().
                if event is None:
                    break
                self._process_event(event)
            except Exception as e:
                logger.error(f"Error in event dispatcher: {e}")
            finally:
                # Balance every successful get() with task_done() -- even for
                # the sentinel or on error -- otherwise event_queue.join()
                # (used by wait_for_all_events) would hang forever.
                self.event_queue.task_done()
        logger.info("Event dispatcher thread exiting")

    def _process_event(self, event: Event) -> None:
        """Fan a single event out to its subscribers via the thread pool."""
        with self.lock:
            # Copy under the lock so callbacks may (un)subscribe without
            # mutating the lists we iterate below.
            subscribers = self.subscribers.get(event.event_type, []).copy()
            wildcard_subscribers = self.subscribers.get("*", []).copy()
        all_subscribers = subscribers + wildcard_subscribers
        if not all_subscribers:
            logger.debug(f"No subscribers for event {event.event_type}")
            return
        logger.debug(f"Dispatching event {event.event_type} to {len(all_subscribers)} subscribers")
        # Submit a task to the thread pool for each subscriber.
        for callback in all_subscribers:
            future = self.thread_pool.submit(self._safe_callback, callback, event)
            with self.lock:
                self.futures.add(future)
            # If the future already finished, the done-callback runs
            # immediately on this thread; the add above happened first,
            # so the future is always removed again.
            future.add_done_callback(self._discard_future)

    def _discard_future(self, future) -> None:
        """Done-callback: drop a finished future from the pending set.

        Uses discard (not remove) so a duplicate removal cannot raise
        KeyError; the lock keeps the set consistent across threads.
        """
        with self.lock:
            self.futures.discard(future)

    def _safe_callback(self, callback: Callable[[Event], None], event: Event) -> None:
        """Execute a callback safely, logging (not propagating) exceptions."""
        try:
            callback(event)
        except Exception as e:
            logger.error(f"Error in subscriber callback: {e}")

    def wait_for_all_events(self, timeout: Optional[float] = None) -> bool:
        """Wait for all pending events and their callbacks to finish.

        Args:
            timeout: Max seconds to wait for callbacks (None = forever).
                Note the initial queue drain itself has no timeout.

        Returns:
            True if every callback completed, False on timeout or error.
        """
        try:
            # First wait for the dispatcher to drain the queue...
            self.event_queue.join()
            # ...then for the submitted callbacks. Snapshot under the lock
            # so done-callbacks mutating the set cannot break iteration.
            with self.lock:
                pending = list(self.futures)
            done, not_done = concurrent.futures.wait(
                pending,
                timeout=timeout,
                return_when=concurrent.futures.ALL_COMPLETED
            )
            return len(not_done) == 0
        except Exception as e:
            logger.error(f"Error waiting for events: {e}")
            return False
# Common event types published on the EventSystem. Subscribe to one of
# these names, or to "*" to receive every event type.
EVENT_USER_INPUT = "user_input"  # raw input received from the user
EVENT_MODEL_REQUEST = "model_request"  # request dispatched to the model
EVENT_MODEL_RESPONSE = "model_response"  # response produced by the model
EVENT_STDP_REQUEST = "stdp_request"  # request to the STDP component (defined elsewhere)
EVENT_STDP_RESPONSE = "stdp_response"  # response from the STDP component
EVENT_TOKEN_GENERATED = "token_generated"  # presumably one token of a streamed response -- confirm at publishers
EVENT_RESPONSE_COMPLETE = "response_complete"  # full response finished
EVENT_ERROR = "error"  # error condition raised by any component