"""
Event system module for enabling parallel processing across components.
Implements a publisher-subscriber pattern to decouple components.
"""

import logging
import threading
import queue
import time
from typing import Dict, List, Callable, Any, Optional, Set
import concurrent.futures

logger = logging.getLogger(__name__)


class Event:
    """Event class containing event data and metadata.

    Attributes:
        event_type: Name used to route the event to subscribers.
        data: Arbitrary payload handed to subscribers unchanged.
        source: Optional label identifying the publisher.
        timestamp: ``time.time()`` value captured at construction.
    """

    def __init__(self, event_type: str, data: Any = None, source: Optional[str] = None):
        self.event_type = event_type
        self.data = data
        self.source = source
        self.timestamp = time.time()

    def __repr__(self) -> str:
        return f"Event({self.event_type}, source={self.source}, timestamp={self.timestamp})"


class EventSystem:
    """Event system for parallel processing of prompts and responses.

    Published events are placed on a queue. A single daemon dispatcher thread
    takes them off the queue and submits one thread-pool task per subscriber
    callback. Callbacks registered under the event type ``"*"`` receive every
    event.
    """

    def __init__(self, max_workers: int = 4):
        """Initialize the event system.

        Args:
            max_workers: Size of the thread pool that runs subscriber callbacks.
        """
        self.subscribers: Dict[str, List[Callable[[Event], None]]] = {}
        self.lock = threading.RLock()  # Reentrant lock for thread safety
        self.event_queue = queue.Queue()
        self.running = False
        self.dispatcher_thread = None
        self.max_workers = max_workers
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        self.futures: Set[concurrent.futures.Future] = set()
        # True once stop() has shut the pool down; start() then builds a new one.
        self._pool_shut_down = False
        # Count of published events the dispatcher has not finished handling.
        # Kept separately from the queue so that waiting can honour a timeout
        # (queue.Queue.join() takes none).
        self._pending_events = 0
        self._queue_drained = threading.Condition()
        logger.info("Initialized EventSystem with %s workers", max_workers)

    def subscribe(self, event_type: str, callback: Callable[[Event], None]) -> None:
        """Subscribe a callback to a specific event type.

        Args:
            event_type: Event type to listen for, or ``"*"`` for all events.
            callback: Callable invoked with the ``Event`` on a pool thread.
        """
        with self.lock:
            callbacks = self.subscribers.setdefault(event_type, [])
            callbacks.append(callback)
            logger.debug("Added subscriber to %s, total: %s", event_type, len(callbacks))

    def unsubscribe(self, event_type: str, callback: Callable[[Event], None]) -> None:
        """Unsubscribe a callback from a specific event type.

        Does nothing when the callback is not registered for ``event_type``.
        """
        with self.lock:
            callbacks = self.subscribers.get(event_type)
            if callbacks is None or callback not in callbacks:
                return
            callbacks.remove(callback)
            logger.debug(
                "Removed subscriber from %s, remaining: %s", event_type, len(callbacks)
            )

    def publish(self, event: Event) -> None:
        """Publish an event to all subscribers.

        The event is queued and the dispatcher thread is started if it is not
        already running.
        """
        # Count the event before it becomes visible to the dispatcher so the
        # counter can never be decremented ahead of its increment.
        with self._queue_drained:
            self._pending_events += 1
        self.event_queue.put(event)
        logger.debug("Published event: %s", event)

        # Start dispatcher if not running
        with self.lock:
            if not self.running:
                self.start()

    def publish_from_dict(self, event_type: str, data: Dict[str, Any], source: str = None) -> None:
        """Convenient method to publish an event from a dictionary."""
        self.publish(Event(event_type, data, source))

    def start(self) -> None:
        """Start the event dispatcher thread.

        Calling this while the dispatcher is already running is a no-op. If a
        previous ``stop()`` shut the thread pool down, a fresh pool is created
        so the system can be restarted.
        """
        with self.lock:
            if self.running:
                return
            if self._pool_shut_down:
                # A shut-down executor rejects every submit(); replace it.
                self.thread_pool = concurrent.futures.ThreadPoolExecutor(
                    max_workers=self.max_workers
                )
                self._pool_shut_down = False
            self.running = True
            dispatcher = threading.Thread(target=self._dispatch_events)
            dispatcher.daemon = True
            self.dispatcher_thread = dispatcher
            dispatcher.start()
            logger.info("Event dispatcher thread started")

    def stop(self) -> None:
        """Stop the event dispatcher thread and shut down the thread pool.

        Waits up to two seconds for the dispatcher to exit. Callbacks that
        are already running are not waited for.
        """
        with self.lock:
            was_running = self.running
            dispatcher = self.dispatcher_thread
            pool = self.thread_pool
            if was_running:
                self.running = False
                self.event_queue.put(None)  # Sentinel to wake the dispatcher
            self._pool_shut_down = True

        # Join outside the lock: the dispatcher needs self.lock to finish the
        # event it is handling, so joining while holding it would stall.
        if was_running:
            if dispatcher is not None and dispatcher.is_alive():
                dispatcher.join(timeout=2.0)
            logger.info("Event dispatcher thread stopped")

        # Shut down thread pool
        pool.shutdown(wait=False)

    def _dispatch_events(self) -> None:
        """Dispatcher thread that processes events from the queue."""
        while self.running:
            try:
                # Get next event with timeout to allow checking running flag
                event = self.event_queue.get(timeout=0.5)
            except queue.Empty:
                continue

            if event is None:
                # The sentinel only wakes the loop; the running flag decides
                # whether to exit. A sentinel left over from an earlier stop()
                # therefore cannot kill a restarted dispatcher.
                self.event_queue.task_done()
                continue

            try:
                self._process_event(event)
            except Exception as e:
                logger.error("Error in event dispatcher: %s", e)
            finally:
                # Always account for the event, even on failure, so that
                # waiters are never left blocked.
                self.event_queue.task_done()
                self._mark_event_finished()
        logger.info("Event dispatcher thread exiting")

    def _mark_event_finished(self) -> None:
        """Record that the dispatcher finished one event and wake waiters."""
        with self._queue_drained:
            # Clamp at zero in case something was put on the queue directly.
            self._pending_events = max(0, self._pending_events - 1)
            if self._pending_events == 0:
                self._queue_drained.notify_all()

    def _process_event(self, event: Event) -> None:
        """Process a single event by notifying subscribers."""
        with self.lock:
            # Copy so callbacks may subscribe/unsubscribe while we dispatch.
            targets = list(self.subscribers.get(event.event_type, []))
            # Wildcard subscribers get every event. Skip this when the event
            # type is itself "*", otherwise they would be notified twice.
            if event.event_type != "*":
                targets.extend(self.subscribers.get("*", []))
            pool = self.thread_pool

        if not targets:
            logger.debug("No subscribers for event %s", event.event_type)
            return

        logger.debug(
            "Dispatching event %s to %s subscribers", event.event_type, len(targets)
        )

        # Submit a task to the thread pool for each subscriber
        for callback in targets:
            future = pool.submit(self._safe_callback, callback, event)
            with self.lock:
                self.futures.add(future)
            # Registered after the add, so the discard always follows it.
            future.add_done_callback(self._forget_future)

    def _forget_future(self, future: concurrent.futures.Future) -> None:
        """Drop a completed future from the in-flight set."""
        with self.lock:
            self.futures.discard(future)

    def _safe_callback(self, callback: Callable[[Event], None], event: Event) -> None:
        """Execute a callback safely, catching exceptions."""
        try:
            callback(event)
        except Exception as e:
            logger.error("Error in subscriber callback: %s", e)

    def wait_for_all_events(self, timeout: Optional[float] = None) -> bool:
        """Wait for all pending events to be processed.

        Args:
            timeout: Maximum number of seconds to wait in total, or ``None``
                to wait indefinitely.

        Returns:
            ``True`` if every published event was dispatched and every
            subscriber callback finished within the timeout, else ``False``.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        try:
            with self._queue_drained:
                drained = self._queue_drained.wait_for(
                    lambda: self._pending_events == 0, timeout=timeout
                )
            if not drained:
                return False

            # Snapshot under the lock; worker threads mutate the live set.
            with self.lock:
                in_flight = set(self.futures)
            remaining = None if deadline is None else max(0.0, deadline - time.monotonic())
            _, not_done = concurrent.futures.wait(
                in_flight,
                timeout=remaining,
                return_when=concurrent.futures.ALL_COMPLETED,
            )
            return not not_done
        except Exception as e:
            logger.error("Error waiting for events: %s", e)
            return False


# Common event types
EVENT_USER_INPUT = "user_input"
EVENT_MODEL_REQUEST = "model_request"
EVENT_MODEL_RESPONSE = "model_response"
EVENT_STDP_REQUEST = "stdp_request"
EVENT_STDP_RESPONSE = "stdp_response"
EVENT_TOKEN_GENERATED = "token_generated"
EVENT_RESPONSE_COMPLETE = "response_complete"
EVENT_ERROR = "error"