footypredict-pro / src /live /stream_processor.py
nananie143's picture
feat: Complete blueprint implementation with 66+ modules
90bacf7 verified
"""
Stream Processor Module
Real-time data stream processing.
Part of the complete blueprint implementation.
"""
import asyncio
from typing import Dict, List, Optional, Callable, Any
from datetime import datetime
from collections import deque
import logging
import json
logger = logging.getLogger(__name__)
class StreamProcessor:
"""
Processes real-time data streams.
Features:
- Event buffering
- Windowed aggregation
- Real-time triggers
- State management
"""
def __init__(
self,
buffer_size: int = 1000,
window_seconds: int = 60
):
self.buffer_size = buffer_size
self.window_seconds = window_seconds
self.buffers: Dict[str, deque] = {}
self.handlers: Dict[str, List[Callable]] = {}
self.state: Dict[str, Any] = {}
self.is_running = False
def register_handler(
self,
event_type: str,
handler: Callable
):
"""Register an event handler."""
if event_type not in self.handlers:
self.handlers[event_type] = []
self.handlers[event_type].append(handler)
logger.info(f"Registered handler for: {event_type}")
def process_event(
self,
event_type: str,
data: Dict
):
"""Process a single event."""
# Add timestamp
data['_timestamp'] = datetime.now().isoformat()
data['_event_type'] = event_type
# Buffer event
if event_type not in self.buffers:
self.buffers[event_type] = deque(maxlen=self.buffer_size)
self.buffers[event_type].append(data)
# Trigger handlers
if event_type in self.handlers:
for handler in self.handlers[event_type]:
try:
handler(data, self.state)
except Exception as e:
logger.error(f"Handler error: {e}")
def get_window(
self,
event_type: str,
seconds: int = None
) -> List[Dict]:
"""Get events in time window."""
seconds = seconds or self.window_seconds
if event_type not in self.buffers:
return []
cutoff = datetime.now().timestamp() - seconds
return [
e for e in self.buffers[event_type]
if datetime.fromisoformat(e['_timestamp']).timestamp() >= cutoff
]
def aggregate_window(
self,
event_type: str,
field: str,
func: str = 'mean',
seconds: int = None
) -> Optional[float]:
"""Aggregate field values in window."""
events = self.get_window(event_type, seconds)
if not events:
return None
values = [e.get(field) for e in events if field in e and e[field] is not None]
if not values:
return None
if func == 'mean':
return sum(values) / len(values)
elif func == 'sum':
return sum(values)
elif func == 'max':
return max(values)
elif func == 'min':
return min(values)
elif func == 'count':
return len(values)
return None
def set_state(self, key: str, value: Any):
"""Set processor state."""
self.state[key] = value
def get_state(self, key: str, default: Any = None) -> Any:
"""Get processor state."""
return self.state.get(key, default)
def clear_buffer(self, event_type: str = None):
"""Clear event buffer."""
if event_type:
if event_type in self.buffers:
self.buffers[event_type].clear()
else:
self.buffers.clear()
class LiveMatchProcessor(StreamProcessor):
"""Specialized processor for live match events."""
def __init__(self):
super().__init__(buffer_size=500, window_seconds=300)
self._setup_handlers()
def _setup_handlers(self):
"""Setup default handlers."""
self.register_handler('goal', self._handle_goal)
self.register_handler('card', self._handle_card)
self.register_handler('odds_update', self._handle_odds)
def _handle_goal(self, event: Dict, state: Dict):
"""Handle goal event."""
match_id = event.get('match_id')
if match_id:
key = f"goals_{match_id}"
current = state.get(key, {'home': 0, 'away': 0})
if event.get('team') == 'home':
current['home'] += 1
else:
current['away'] += 1
state[key] = current
logger.info(f"Goal scored: {match_id} -> {current}")
def _handle_card(self, event: Dict, state: Dict):
"""Handle card event."""
match_id = event.get('match_id')
if match_id:
key = f"cards_{match_id}"
cards = state.get(key, [])
cards.append({
'player': event.get('player'),
'card_type': event.get('card_type'),
'minute': event.get('minute')
})
state[key] = cards
def _handle_odds(self, event: Dict, state: Dict):
"""Handle odds update."""
match_id = event.get('match_id')
if match_id:
key = f"odds_{match_id}"
state[key] = event.get('odds', {})
def get_match_state(self, match_id: str) -> Dict:
"""Get current state for a match."""
return {
'goals': self.get_state(f"goals_{match_id}", {'home': 0, 'away': 0}),
'cards': self.get_state(f"cards_{match_id}", []),
'odds': self.get_state(f"odds_{match_id}", {}),
'recent_events': self.get_window(match_id, seconds=60)
}
_processor: Optional[StreamProcessor] = None
_match_processor: Optional[LiveMatchProcessor] = None
def get_processor() -> StreamProcessor:
global _processor
if _processor is None:
_processor = StreamProcessor()
return _processor
def get_match_processor() -> LiveMatchProcessor:
global _match_processor
if _match_processor is None:
_match_processor = LiveMatchProcessor()
return _match_processor