WildnerveAI's picture
Upload 20 files
0861a59 verified
"""
Event system module for enabling parallel processing across components.
Implements a publisher-subscriber pattern to decouple components.
"""
import logging
import threading
import queue
import time
from typing import Dict, List, Callable, Any, Optional, Set
import concurrent.futures
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class Event:
    """A single message passed through the event system.

    Holds the event type, an optional payload, an optional source label and
    the creation time as returned by ``time.time()``.
    """

    def __init__(self, event_type: str, data: Any = None, source: str = None):
        """Store the type, payload and origin, then stamp the creation time."""
        self.event_type, self.data, self.source = event_type, data, source
        self.timestamp = time.time()

    def __repr__(self) -> str:
        """Return a short description; the payload is deliberately left out."""
        return f"Event({self.event_type}, source={self.source}, timestamp={self.timestamp})"
class EventSystem:
"""Event system for parallel processing of prompts and responses."""
def __init__(self, max_workers: int = 4):
"""Initialize the event system."""
self.subscribers: Dict[str, List[Callable[[Event], None]]] = {}
self.lock = threading.RLock() # Reentrant lock for thread safety
self.event_queue = queue.Queue()
self.running = False
self.dispatcher_thread = None
self.max_workers = max_workers
self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
self.futures = set()
logger.info(f"Initialized EventSystem with {max_workers} workers")
def subscribe(self, event_type: str, callback: Callable[[Event], None]) -> None:
"""Subscribe a callback to a specific event type."""
with self.lock:
if event_type not in self.subscribers:
self.subscribers[event_type] = []
self.subscribers[event_type].append(callback)
logger.debug(f"Added subscriber to {event_type}, total: {len(self.subscribers[event_type])}")
def unsubscribe(self, event_type: str, callback: Callable[[Event], None]) -> None:
"""Unsubscribe a callback from a specific event type."""
with self.lock:
if event_type in self.subscribers and callback in self.subscribers[event_type]:
self.subscribers[event_type].remove(callback)
logger.debug(f"Removed subscriber from {event_type}, remaining: {len(self.subscribers[event_type])}")
def publish(self, event: Event) -> None:
"""Publish an event to all subscribers."""
self.event_queue.put(event)
logger.debug(f"Published event: {event}")
# Start dispatcher if not running
with self.lock:
if not self.running:
self.start()
def publish_from_dict(self, event_type: str, data: Dict[str, Any], source: str = None) -> None:
"""Convenient method to publish an event from a dictionary."""
event = Event(event_type, data, source)
self.publish(event)
def start(self) -> None:
"""Start the event dispatcher thread."""
with self.lock:
if not self.running:
self.running = True
self.dispatcher_thread = threading.Thread(target=self._dispatch_events)
self.dispatcher_thread.daemon = True
self.dispatcher_thread.start()
logger.info("Event dispatcher thread started")
def stop(self) -> None:
"""Stop the event dispatcher thread."""
with self.lock:
if self.running:
self.running = False
self.event_queue.put(None) # Sentinel to stop the thread
if self.dispatcher_thread and self.dispatcher_thread.is_alive():
self.dispatcher_thread.join(timeout=2.0)
logger.info("Event dispatcher thread stopped")
# Shut down thread pool
self.thread_pool.shutdown(wait=False)
def _dispatch_events(self) -> None:
"""Dispatcher thread that processes events from the queue."""
while self.running:
try:
# Get next event with timeout to allow checking running flag
event = self.event_queue.get(timeout=0.5)
# Handle sentinel value
if event is None:
break
# Process the event
self._process_event(event)
# Mark task as done
self.event_queue.task_done()
except queue.Empty:
continue
except Exception as e:
logger.error(f"Error in event dispatcher: {e}")
logger.info("Event dispatcher thread exiting")
def _process_event(self, event: Event) -> None:
"""Process a single event by notifying subscribers."""
with self.lock:
# Get subscribers for this event type
subscribers = self.subscribers.get(event.event_type, []).copy()
# Also check for wildcard subscribers
wildcard_subscribers = self.subscribers.get("*", []).copy()
all_subscribers = subscribers + wildcard_subscribers
if not all_subscribers:
logger.debug(f"No subscribers for event {event.event_type}")
return
logger.debug(f"Dispatching event {event.event_type} to {len(all_subscribers)} subscribers")
# Submit a task to the thread pool for each subscriber
for callback in all_subscribers:
future = self.thread_pool.submit(self._safe_callback, callback, event)
self.futures.add(future)
future.add_done_callback(lambda f: self.futures.remove(f))
def _safe_callback(self, callback: Callable[[Event], None], event: Event) -> None:
"""Execute a callback safely, catching exceptions."""
try:
callback(event)
except Exception as e:
logger.error(f"Error in subscriber callback: {e}")
def wait_for_all_events(self, timeout: Optional[float] = None) -> bool:
"""Wait for all pending events to be processed."""
try:
self.event_queue.join()
# Also wait for all futures to complete
done, not_done = concurrent.futures.wait(
self.futures,
timeout=timeout,
return_when=concurrent.futures.ALL_COMPLETED
)
return len(not_done) == 0
except Exception as e:
logger.error(f"Error waiting for events: {e}")
return False
# Common event types
# String identifiers for an Event's `event_type`; presumably also the keys
# passed to EventSystem.subscribe(). No use is visible in this module, so
# confirm against callers.
EVENT_USER_INPUT = "user_input"
EVENT_MODEL_REQUEST = "model_request"
EVENT_MODEL_RESPONSE = "model_response"
EVENT_STDP_REQUEST = "stdp_request"
EVENT_STDP_RESPONSE = "stdp_response"
EVENT_TOKEN_GENERATED = "token_generated"
EVENT_RESPONSE_COMPLETE = "response_complete"
EVENT_ERROR = "error"