|
|
"""
|
|
|
Event system module for enabling parallel processing across components.
|
|
|
Implements a publisher-subscriber pattern to decouple components.
|
|
|
"""
|
|
|
import logging
|
|
|
import threading
|
|
|
import queue
|
|
|
import time
|
|
|
from typing import Dict, List, Callable, Any, Optional, Set
|
|
|
import concurrent.futures
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Event:
    """Event class containing event data and metadata.

    Attributes:
        event_type: String identifier used for subscriber routing.
        data: Arbitrary payload attached to the event (may be None).
        source: Optional identifier of the component that emitted the event.
        timestamp: Creation time as a Unix epoch float (``time.time()``).
    """

    def __init__(self, event_type: str, data: Any = None, source: Optional[str] = None):
        """Create an event and stamp it with the current time.

        Args:
            event_type: Routing key for subscribers.
            data: Optional payload.
            source: Optional name of the publishing component.
        """
        self.event_type = event_type
        self.data = data
        self.source = source
        # Captured once at construction so the event carries its publish time.
        self.timestamp = time.time()

    def __repr__(self) -> str:
        return f"Event({self.event_type}, source={self.source}, timestamp={self.timestamp})"
|
|
|
|
|
|
class EventSystem:
    """Event system for parallel processing of prompts and responses.

    Thread-safe publisher-subscriber hub: ``publish()`` enqueues events, a
    single daemon dispatcher thread drains the queue, and each subscriber
    callback runs on a shared thread pool. Subscribing to ``"*"`` receives
    every event type.
    """

    def __init__(self, max_workers: int = 4):
        """Initialize the event system.

        Args:
            max_workers: Number of pool threads used to run subscriber
                callbacks concurrently.
        """
        # event_type -> list of callbacks; guarded by self.lock.
        self.subscribers: Dict[str, List[Callable[[Event], None]]] = {}
        # RLock because publish() may call start() while holding the lock.
        self.lock = threading.RLock()
        self.event_queue: queue.Queue = queue.Queue()
        self.running = False
        self.dispatcher_thread: Optional[threading.Thread] = None
        self.max_workers = max_workers
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        # In-flight callback futures; consumed by wait_for_all_events().
        self.futures: Set[concurrent.futures.Future] = set()
        logger.info(f"Initialized EventSystem with {max_workers} workers")

    def subscribe(self, event_type: str, callback: Callable[[Event], None]) -> None:
        """Subscribe a callback to a specific event type ("*" matches all)."""
        with self.lock:
            self.subscribers.setdefault(event_type, []).append(callback)
            logger.debug(f"Added subscriber to {event_type}, total: {len(self.subscribers[event_type])}")

    def unsubscribe(self, event_type: str, callback: Callable[[Event], None]) -> None:
        """Unsubscribe a callback from a specific event type (no-op if absent)."""
        with self.lock:
            if event_type in self.subscribers and callback in self.subscribers[event_type]:
                self.subscribers[event_type].remove(callback)
                logger.debug(f"Removed subscriber from {event_type}, remaining: {len(self.subscribers[event_type])}")

    def publish(self, event: Event) -> None:
        """Publish an event, lazily starting the dispatcher on first use."""
        self.event_queue.put(event)
        logger.debug(f"Published event: {event}")
        with self.lock:
            if not self.running:
                self.start()

    def publish_from_dict(self, event_type: str, data: Dict[str, Any], source: Optional[str] = None) -> None:
        """Convenient method to publish an event from a dictionary."""
        self.publish(Event(event_type, data, source))

    def start(self) -> None:
        """Start the event dispatcher thread (idempotent)."""
        with self.lock:
            if not self.running:
                self.running = True
                self.dispatcher_thread = threading.Thread(target=self._dispatch_events)
                # Daemon so a forgotten stop() does not block interpreter exit.
                self.dispatcher_thread.daemon = True
                self.dispatcher_thread.start()
                logger.info("Event dispatcher thread started")

    def stop(self) -> None:
        """Stop the dispatcher thread and shut down the worker pool."""
        with self.lock:
            if self.running:
                self.running = False
                # Sentinel wakes the dispatcher immediately instead of
                # waiting out the 0.5s get() timeout.
                self.event_queue.put(None)
                if self.dispatcher_thread and self.dispatcher_thread.is_alive():
                    self.dispatcher_thread.join(timeout=2.0)
                logger.info("Event dispatcher thread stopped")
        # wait=False: in-flight callbacks finish on their own; we don't block.
        self.thread_pool.shutdown(wait=False)

    def _dispatch_events(self) -> None:
        """Dispatcher loop: drain the queue and fan events out to the pool."""
        while self.running:
            try:
                event = self.event_queue.get(timeout=0.5)
            except queue.Empty:
                # Timeout lets the loop re-check self.running periodically.
                continue
            try:
                if event is None:
                    # Sentinel from stop().
                    break
                self._process_event(event)
            except Exception as e:
                logger.error(f"Error in event dispatcher: {e}")
            finally:
                # BUGFIX: always balance get() with task_done() — the original
                # skipped it for the sentinel and on dispatch errors, leaving
                # event_queue.join() (wait_for_all_events) blocked forever.
                self.event_queue.task_done()
        logger.info("Event dispatcher thread exiting")

    def _process_event(self, event: Event) -> None:
        """Process a single event by submitting subscriber callbacks to the pool."""
        with self.lock:
            # Copy under the lock so callbacks may (un)subscribe during
            # this dispatch without affecting the current fan-out.
            subscribers = self.subscribers.get(event.event_type, []).copy()
            wildcard_subscribers = self.subscribers.get("*", []).copy()

        all_subscribers = subscribers + wildcard_subscribers
        if not all_subscribers:
            logger.debug(f"No subscribers for event {event.event_type}")
            return

        logger.debug(f"Dispatching event {event.event_type} to {len(all_subscribers)} subscribers")
        for callback in all_subscribers:
            future = self.thread_pool.submit(self._safe_callback, callback, event)
            self.futures.add(future)
            # BUGFIX: discard instead of remove — remove() raises KeyError if
            # the future is no longer tracked; discard() is always safe.
            future.add_done_callback(self.futures.discard)

    def _safe_callback(self, callback: Callable[[Event], None], event: Event) -> None:
        """Execute a callback safely, logging (not propagating) exceptions."""
        try:
            callback(event)
        except Exception as e:
            logger.error(f"Error in subscriber callback: {e}")

    def wait_for_all_events(self, timeout: Optional[float] = None) -> bool:
        """Wait for all queued events and in-flight callbacks to finish.

        Args:
            timeout: Maximum seconds to wait for callback futures
                (the queue drain itself is unbounded, as before).

        Returns:
            True if everything completed within the timeout, False otherwise.
        """
        try:
            # Block until the dispatcher has consumed every queued event.
            self.event_queue.join()

            # BUGFIX: snapshot the futures set — done-callbacks mutate it
            # concurrently, and iterating a mutating set can raise
            # "set changed size during iteration".
            pending = list(self.futures)
            done, not_done = concurrent.futures.wait(
                pending,
                timeout=timeout,
                return_when=concurrent.futures.ALL_COMPLETED,
            )
            return len(not_done) == 0
        except Exception as e:
            logger.error(f"Error waiting for events: {e}")
            return False
|
|
|
|
|
|
|
|
|
# Well-known event type identifiers used with EventSystem.publish/subscribe.
# Components should reference these constants rather than raw strings so
# publishers and subscribers stay in sync on routing keys.
EVENT_USER_INPUT = "user_input"            # raw input received from the user
EVENT_MODEL_REQUEST = "model_request"      # request dispatched to the model
EVENT_MODEL_RESPONSE = "model_response"    # response produced by the model
EVENT_STDP_REQUEST = "stdp_request"        # request to the STDP component
EVENT_STDP_RESPONSE = "stdp_response"      # response from the STDP component
EVENT_TOKEN_GENERATED = "token_generated"  # single token emitted during generation
EVENT_RESPONSE_COMPLETE = "response_complete"  # full response finished
EVENT_ERROR = "error"                      # error reported by any component
|
|
|
|