Spaces:
Runtime error
Runtime error
| """ | |
| Stream Processor Module | |
| Real-time data stream processing. | |
| Part of the complete blueprint implementation. | |
| """ | |
| import asyncio | |
| from typing import Dict, List, Optional, Callable, Any | |
| from datetime import datetime | |
| from collections import deque | |
| import logging | |
| import json | |
| logger = logging.getLogger(__name__) | |
| class StreamProcessor: | |
| """ | |
| Processes real-time data streams. | |
| Features: | |
| - Event buffering | |
| - Windowed aggregation | |
| - Real-time triggers | |
| - State management | |
| """ | |
| def __init__( | |
| self, | |
| buffer_size: int = 1000, | |
| window_seconds: int = 60 | |
| ): | |
| self.buffer_size = buffer_size | |
| self.window_seconds = window_seconds | |
| self.buffers: Dict[str, deque] = {} | |
| self.handlers: Dict[str, List[Callable]] = {} | |
| self.state: Dict[str, Any] = {} | |
| self.is_running = False | |
| def register_handler( | |
| self, | |
| event_type: str, | |
| handler: Callable | |
| ): | |
| """Register an event handler.""" | |
| if event_type not in self.handlers: | |
| self.handlers[event_type] = [] | |
| self.handlers[event_type].append(handler) | |
| logger.info(f"Registered handler for: {event_type}") | |
| def process_event( | |
| self, | |
| event_type: str, | |
| data: Dict | |
| ): | |
| """Process a single event.""" | |
| # Add timestamp | |
| data['_timestamp'] = datetime.now().isoformat() | |
| data['_event_type'] = event_type | |
| # Buffer event | |
| if event_type not in self.buffers: | |
| self.buffers[event_type] = deque(maxlen=self.buffer_size) | |
| self.buffers[event_type].append(data) | |
| # Trigger handlers | |
| if event_type in self.handlers: | |
| for handler in self.handlers[event_type]: | |
| try: | |
| handler(data, self.state) | |
| except Exception as e: | |
| logger.error(f"Handler error: {e}") | |
| def get_window( | |
| self, | |
| event_type: str, | |
| seconds: int = None | |
| ) -> List[Dict]: | |
| """Get events in time window.""" | |
| seconds = seconds or self.window_seconds | |
| if event_type not in self.buffers: | |
| return [] | |
| cutoff = datetime.now().timestamp() - seconds | |
| return [ | |
| e for e in self.buffers[event_type] | |
| if datetime.fromisoformat(e['_timestamp']).timestamp() >= cutoff | |
| ] | |
| def aggregate_window( | |
| self, | |
| event_type: str, | |
| field: str, | |
| func: str = 'mean', | |
| seconds: int = None | |
| ) -> Optional[float]: | |
| """Aggregate field values in window.""" | |
| events = self.get_window(event_type, seconds) | |
| if not events: | |
| return None | |
| values = [e.get(field) for e in events if field in e and e[field] is not None] | |
| if not values: | |
| return None | |
| if func == 'mean': | |
| return sum(values) / len(values) | |
| elif func == 'sum': | |
| return sum(values) | |
| elif func == 'max': | |
| return max(values) | |
| elif func == 'min': | |
| return min(values) | |
| elif func == 'count': | |
| return len(values) | |
| return None | |
| def set_state(self, key: str, value: Any): | |
| """Set processor state.""" | |
| self.state[key] = value | |
| def get_state(self, key: str, default: Any = None) -> Any: | |
| """Get processor state.""" | |
| return self.state.get(key, default) | |
| def clear_buffer(self, event_type: str = None): | |
| """Clear event buffer.""" | |
| if event_type: | |
| if event_type in self.buffers: | |
| self.buffers[event_type].clear() | |
| else: | |
| self.buffers.clear() | |
| class LiveMatchProcessor(StreamProcessor): | |
| """Specialized processor for live match events.""" | |
| def __init__(self): | |
| super().__init__(buffer_size=500, window_seconds=300) | |
| self._setup_handlers() | |
| def _setup_handlers(self): | |
| """Setup default handlers.""" | |
| self.register_handler('goal', self._handle_goal) | |
| self.register_handler('card', self._handle_card) | |
| self.register_handler('odds_update', self._handle_odds) | |
| def _handle_goal(self, event: Dict, state: Dict): | |
| """Handle goal event.""" | |
| match_id = event.get('match_id') | |
| if match_id: | |
| key = f"goals_{match_id}" | |
| current = state.get(key, {'home': 0, 'away': 0}) | |
| if event.get('team') == 'home': | |
| current['home'] += 1 | |
| else: | |
| current['away'] += 1 | |
| state[key] = current | |
| logger.info(f"Goal scored: {match_id} -> {current}") | |
| def _handle_card(self, event: Dict, state: Dict): | |
| """Handle card event.""" | |
| match_id = event.get('match_id') | |
| if match_id: | |
| key = f"cards_{match_id}" | |
| cards = state.get(key, []) | |
| cards.append({ | |
| 'player': event.get('player'), | |
| 'card_type': event.get('card_type'), | |
| 'minute': event.get('minute') | |
| }) | |
| state[key] = cards | |
| def _handle_odds(self, event: Dict, state: Dict): | |
| """Handle odds update.""" | |
| match_id = event.get('match_id') | |
| if match_id: | |
| key = f"odds_{match_id}" | |
| state[key] = event.get('odds', {}) | |
| def get_match_state(self, match_id: str) -> Dict: | |
| """Get current state for a match.""" | |
| return { | |
| 'goals': self.get_state(f"goals_{match_id}", {'home': 0, 'away': 0}), | |
| 'cards': self.get_state(f"cards_{match_id}", []), | |
| 'odds': self.get_state(f"odds_{match_id}", {}), | |
| 'recent_events': self.get_window(match_id, seconds=60) | |
| } | |
| _processor: Optional[StreamProcessor] = None | |
| _match_processor: Optional[LiveMatchProcessor] = None | |
| def get_processor() -> StreamProcessor: | |
| global _processor | |
| if _processor is None: | |
| _processor = StreamProcessor() | |
| return _processor | |
| def get_match_processor() -> LiveMatchProcessor: | |
| global _match_processor | |
| if _match_processor is None: | |
| _match_processor = LiveMatchProcessor() | |
| return _match_processor | |