WildnerveAI's picture
Upload 20 files
0861a59 verified
"""
Simple event bus for direct component-to-component communication.
Provides a lightweight alternative to the full EventSystem.
"""
import logging
from typing import Dict, List, Callable, Any
logger = logging.getLogger(__name__)
class EventBus:
"""Simple synchronous event bus for direct event routing."""
def __init__(self):
"""Initialize an empty event bus."""
self.subscribers = {}
logger.info("Initialized EventBus")
def subscribe(self, event_type: str, callback: Callable[[str, Any], None]) -> None:
"""Subscribe a callback to an event type."""
if event_type not in self.subscribers:
self.subscribers[event_type] = []
self.subscribers[event_type].append(callback)
logger.debug(f"Added subscriber to {event_type}")
def unsubscribe(self, event_type: str, callback: Callable[[str, Any], None]) -> None:
"""Unsubscribe a callback from an event type."""
if event_type in self.subscribers and callback in self.subscribers[event_type]:
self.subscribers[event_type].remove(callback)
logger.debug(f"Removed subscriber from {event_type}")
def publish(self, event_type: str, data: Any = None) -> None:
"""Synchronously publish an event to all subscribers."""
subscribers = self.subscribers.get(event_type, []).copy()
if not subscribers:
logger.debug(f"No subscribers for event {event_type}")
return
logger.debug(f"Dispatching event {event_type} to {len(subscribers)} subscribers")
for callback in subscribers:
try:
callback(event_type, data)
except Exception as e:
logger.error(f"Error in subscriber callback: {e}")
# Create a global instance for convenience
event_bus = EventBus()