| """
|
| Simple event bus for direct component-to-component communication.
|
| Provides a lightweight alternative to the full EventSystem.
|
| """
|
| import logging
|
| from typing import Dict, List, Callable, Any
|
|
|
# Module-level logger named after this module, so the application's logging
# configuration can filter or route event-bus messages by module name.
logger: logging.Logger = logging.getLogger(__name__)
|
|
|
class EventBus:
    """Simple synchronous event bus for direct event routing.

    Callbacks are registered per event type and are called directly from
    :meth:`publish`, in subscription order, as ``callback(event_type, data)``.

    Attributes:
        subscribers: Maps each event type to the list of callbacks
            currently registered for it.
    """

    def __init__(self):
        """Initialize an empty event bus."""
        self.subscribers: Dict[str, List[Callable[[str, Any], None]]] = {}
        logger.info("Initialized EventBus")

    def subscribe(self, event_type: str, callback: Callable[[str, Any], None]) -> None:
        """Subscribe a callback to an event type.

        Subscribing the same callback more than once registers it more than
        once; it is then invoked once per registration on each publish.

        Args:
            event_type: Name of the event to listen for.
            callback: Callable invoked as ``callback(event_type, data)``.
        """
        self.subscribers.setdefault(event_type, []).append(callback)
        logger.debug("Added subscriber to %s", event_type)

    def unsubscribe(self, event_type: str, callback: Callable[[str, Any], None]) -> None:
        """Unsubscribe a callback from an event type.

        Removes the first registration of ``callback`` for ``event_type``.
        Does nothing if the event type is unknown or the callback is not
        registered for it.

        Args:
            event_type: Name of the event the callback was subscribed to.
            callback: The callable previously passed to :meth:`subscribe`.
        """
        callbacks = self.subscribers.get(event_type)
        if callbacks is None or callback not in callbacks:
            return
        callbacks.remove(callback)
        logger.debug("Removed subscriber from %s", event_type)

    def publish(self, event_type: str, data: Any = None) -> None:
        """Synchronously publish an event to all subscribers.

        Each subscriber is called in subscription order. An exception raised
        by one subscriber is logged (with traceback) and does not prevent
        delivery to the remaining subscribers.

        Args:
            event_type: Name of the event being published.
            data: Optional payload passed through to every callback.
        """
        # Dispatch over a snapshot so that callbacks which subscribe or
        # unsubscribe during delivery do not disturb this iteration.
        subscribers = list(self.subscribers.get(event_type, ()))

        if not subscribers:
            logger.debug("No subscribers for event %s", event_type)
            return

        logger.debug(
            "Dispatching event %s to %d subscribers", event_type, len(subscribers)
        )

        for callback in subscribers:
            try:
                callback(event_type, data)
            except Exception as e:
                # Deliberate best-effort boundary: keep delivering to the other
                # subscribers, but record the full traceback for diagnosis.
                logger.exception("Error in subscriber callback: %s", e)
|
|
|
|
|
# Shared module-level bus instance, created once when this module is imported.
event_bus: EventBus = EventBus()
|
|
|