File size: 7,067 Bytes
0861a59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
"""

Event system module for enabling parallel processing across components.

Implements a publisher-subscriber pattern to decouple components.

"""
import logging
import threading
import queue
import time
from typing import Dict, List, Callable, Any, Optional, Set
import concurrent.futures

logger = logging.getLogger(__name__)

class Event:
    """A single message travelling through the event system.

    Instances carry four plain attributes:

    * ``event_type`` - the string key the event is published under.
    * ``data`` - arbitrary payload supplied by the publisher (``None`` if omitted).
    * ``source`` - optional label of whoever emitted it (``None`` if omitted).
    * ``timestamp`` - ``time.time()`` captured when the object was constructed.
    """

    def __init__(self, event_type: str, data: Any = None, source: str = None):
        # Caller-supplied fields are stored untouched, in declaration order.
        self.event_type, self.data, self.source = event_type, data, source
        # Creation time in seconds since the epoch.
        self.timestamp = time.time()

    def __repr__(self) -> str:
        # The payload is intentionally left out of the representation.
        kind, origin, created = self.event_type, self.source, self.timestamp
        return f"Event({kind}, source={origin}, timestamp={created})"

class EventSystem:
    """Thread-based publish/subscribe event bus.

    Published events are placed on ``event_queue``. A single daemon
    dispatcher thread takes them off the queue and, for every callback
    subscribed to the event's type (plus every callback subscribed to the
    wildcard type ``"*"``), submits one task to a thread pool. Callbacks
    therefore run on pool worker threads, not on the publisher's thread.

    The dispatcher is started lazily by ``publish()`` and can be stopped with
    ``stop()``. Publishing again after ``stop()`` restarts it with a fresh
    thread pool.
    """

    # Private marker used by stop() to wake the dispatcher. A dedicated object
    # (rather than None) means a stray ``publish(None)`` cannot silently kill
    # the dispatcher thread.
    _STOP = object()

    def __init__(self, max_workers: int = 4):
        """Create an idle event system.

        Args:
            max_workers: Size of the thread pool that runs subscriber
                callbacks.
        """
        self.subscribers: Dict[str, List[Callable[[Event], None]]] = {}
        self.lock = threading.RLock()  # Guards subscribers and start/stop state
        self.event_queue = queue.Queue()
        self.running = False
        self.dispatcher_thread = None
        self.max_workers = max_workers
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        # Futures of callbacks that were submitted and have not finished yet.
        self.futures: Set[concurrent.futures.Future] = set()
        # The futures set is touched by the dispatcher, by pool workers (done
        # callbacks) and by waiters, so it gets its own lock.
        self._futures_lock = threading.Lock()
        # Number of published events the dispatcher has not finished handling,
        # plus the condition used to wait for that number to reach zero.
        self._pending_events = 0
        self._idle = threading.Condition()
        # True once stop() has shut the current thread pool down.
        self._pool_shut_down = False
        logger.info(f"Initialized EventSystem with {max_workers} workers")

    def subscribe(self, event_type: str, callback: Callable[[Event], None]) -> None:
        """Register ``callback`` for events of ``event_type``.

        Use ``"*"`` as ``event_type`` to receive every event. Registering the
        same callback twice makes it run twice per event.
        """
        with self.lock:
            callbacks = self.subscribers.setdefault(event_type, [])
            callbacks.append(callback)
            logger.debug(f"Added subscriber to {event_type}, total: {len(callbacks)}")

    def unsubscribe(self, event_type: str, callback: Callable[[Event], None]) -> None:
        """Remove one registration of ``callback`` for ``event_type``.

        Does nothing if the callback is not currently subscribed.
        """
        with self.lock:
            callbacks = self.subscribers.get(event_type)
            if callbacks and callback in callbacks:
                callbacks.remove(callback)
                logger.debug(f"Removed subscriber from {event_type}, remaining: {len(callbacks)}")

    def publish(self, event: Event) -> None:
        """Queue ``event`` for asynchronous delivery and ensure the dispatcher runs."""
        # Count the event before it becomes visible to the dispatcher so the
        # counter can never be decremented ahead of the increment.
        with self._idle:
            self._pending_events += 1
        self.event_queue.put(event)
        logger.debug(f"Published event: {event}")

        # start() is a no-op when the dispatcher is already running.
        self.start()

    def publish_from_dict(self, event_type: str, data: Dict[str, Any], source: Optional[str] = None) -> None:
        """Build an ``Event`` from ``event_type``/``data``/``source`` and publish it."""
        self.publish(Event(event_type, data, source))

    def start(self) -> None:
        """Start the dispatcher thread if it is not already running.

        If a previous ``stop()`` shut the thread pool down, a new pool with
        the same ``max_workers`` is created first; otherwise every callback
        submission after a restart would fail.
        """
        with self.lock:
            if self.running:
                return
            if self._pool_shut_down:
                self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
                self._pool_shut_down = False
            self.running = True
            self.dispatcher_thread = threading.Thread(target=self._dispatch_events)
            self.dispatcher_thread.daemon = True
            self.dispatcher_thread.start()
            logger.info("Event dispatcher thread started")

    def stop(self) -> None:
        """Stop the dispatcher thread and shut the thread pool down.

        Waits up to two seconds for the dispatcher to exit. Events still in
        the queue are left there and are handled if the system is restarted.
        Callbacks that are already running are not interrupted
        (``shutdown(wait=False)``).
        """
        with self.lock:
            was_running = self.running
            thread = self.dispatcher_thread
            if was_running:
                self.running = False
                self.event_queue.put(self._STOP)  # Wake the dispatcher immediately

        if was_running:
            # Join outside the lock: the dispatcher needs self.lock inside
            # _process_event, so joining while holding it could stall until
            # the join timeout expires.
            if (
                thread is not None
                and thread is not threading.current_thread()
                and thread.is_alive()
            ):
                thread.join(timeout=2.0)
            logger.info("Event dispatcher thread stopped")

        # Shut down thread pool
        with self.lock:
            self.thread_pool.shutdown(wait=False)
            self._pool_shut_down = True

    def _dispatch_events(self) -> None:
        """Dispatcher loop: take events off the queue and fan them out."""
        while self.running:
            try:
                # Timeout lets the loop re-check the running flag periodically.
                item = self.event_queue.get(timeout=0.5)
            except queue.Empty:
                continue

            is_sentinel = item is self._STOP
            try:
                if is_sentinel:
                    if self.running:
                        # Left over from an earlier stop() whose dispatcher
                        # exited via the running flag; ignore it.
                        continue
                    break
                self._process_event(item)
            except Exception as e:
                logger.error(f"Error in event dispatcher: {e}", exc_info=True)
            finally:
                # Acknowledge every item taken from the queue - including the
                # sentinel and events whose processing failed - so neither
                # event_queue.join() nor wait_for_all_events() can hang.
                self.event_queue.task_done()
                if not is_sentinel:
                    self._event_finished()

        logger.info("Event dispatcher thread exiting")

    def _event_finished(self) -> None:
        """Record that one published event was handled and wake idle waiters."""
        with self._idle:
            if self._pending_events > 0:
                self._pending_events -= 1
            if self._pending_events == 0:
                self._idle.notify_all()

    def _forget_future(self, future: concurrent.futures.Future) -> None:
        """Done-callback: drop a finished future from the in-flight set."""
        with self._futures_lock:
            self.futures.discard(future)

    def _process_event(self, event: Event) -> None:
        """Submit one thread-pool task per subscriber of ``event``."""
        with self.lock:
            # Copy under the lock so subscribe/unsubscribe cannot mutate the
            # lists while they are iterated below.
            targets = list(self.subscribers.get(event.event_type, ()))
            if event.event_type != "*":
                # Wildcard subscribers receive every event; the check avoids
                # delivering twice when the event's own type is "*".
                targets.extend(self.subscribers.get("*", ()))
            pool = self.thread_pool

        if not targets:
            logger.debug(f"No subscribers for event {event.event_type}")
            return

        logger.debug(f"Dispatching event {event.event_type} to {len(targets)} subscribers")

        for callback in targets:
            future = pool.submit(self._safe_callback, callback, event)
            with self._futures_lock:
                self.futures.add(future)
            # Registered outside the lock: if the future is already done the
            # callback runs right here and takes the same non-reentrant lock.
            future.add_done_callback(self._forget_future)

    def _safe_callback(self, callback: Callable[[Event], None], event: Event) -> None:
        """Run ``callback(event)``, logging instead of propagating exceptions."""
        try:
            callback(event)
        except Exception as e:
            logger.error(f"Error in subscriber callback: {e}")

    def wait_for_all_events(self, timeout: Optional[float] = None) -> bool:
        """Block until every published event and its callbacks have finished.

        Args:
            timeout: Maximum total number of seconds to wait, covering both
                the queued events and the running callbacks. ``None`` waits
                indefinitely.

        Returns:
            ``True`` if everything completed in time, ``False`` on timeout or
            if waiting itself raised an exception.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        try:
            # Phase 1: the dispatcher has handled every published event. The
            # futures for an event are registered before it is marked
            # handled, so the snapshot below cannot miss any of them.
            with self._idle:
                if not self._idle.wait_for(lambda: self._pending_events == 0, timeout):
                    return False

            # Phase 2: the callbacks that were submitted have finished. Work
            # on a snapshot because done-callbacks shrink the live set.
            with self._futures_lock:
                in_flight = set(self.futures)

            remaining = None if deadline is None else max(0.0, deadline - time.monotonic())
            _done, not_done = concurrent.futures.wait(
                in_flight,
                timeout=remaining,
                return_when=concurrent.futures.ALL_COMPLETED,
            )
            return not not_done
        except Exception as e:
            logger.error(f"Error waiting for events: {e}")
            return False

# Common event types
# Module-level string constants offered as shared `event_type` values, i.e. the
# key given to EventSystem.subscribe()/unsubscribe() and stored on Event.
# The classes visible in this module do not reference them, so the notes below
# are inferred from the names only - confirm against the actual publishers.
# The wildcard type "*" (handled in EventSystem._process_event) has no constant.
EVENT_USER_INPUT = "user_input"  # presumably: input received from the user
EVENT_MODEL_REQUEST = "model_request"  # presumably: a request sent to a model
EVENT_MODEL_RESPONSE = "model_response"  # presumably: the model's reply
EVENT_STDP_REQUEST = "stdp_request"  # NOTE(review): meaning of "stdp" is not shown here
EVENT_STDP_RESPONSE = "stdp_response"  # NOTE(review): counterpart of stdp_request - confirm
EVENT_TOKEN_GENERATED = "token_generated"  # presumably: one token produced during generation
EVENT_RESPONSE_COMPLETE = "response_complete"  # presumably: a response has finished
EVENT_ERROR = "error"  # presumably: an error notification