Spaces:
Running
Running
| """ | |
| Data Ingestion Module for F1 Commentary Robot. | |
| This module connects to the OpenF1 API, polls endpoints for race data, | |
| parses JSON responses into structured events, and emits them to the event queue. | |
| Validates: Requirements 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 2.1-2.8 | |
| """ | |
| import logging | |
| import time | |
| import threading | |
| from typing import Optional, List, Dict, Any | |
| from datetime import datetime, timedelta | |
| import requests | |
| from requests.adapters import HTTPAdapter | |
| from urllib3.util.retry import Retry | |
| from reachy_f1_commentator.src.models import ( | |
| RaceEvent, EventType, OvertakeEvent, PitStopEvent, LeadChangeEvent, | |
| FastestLapEvent, IncidentEvent, SafetyCarEvent, FlagEvent, PositionUpdateEvent | |
| ) | |
| from reachy_f1_commentator.src.config import Config | |
| from reachy_f1_commentator.src.event_queue import PriorityEventQueue | |
| from reachy_f1_commentator.src.replay_mode import HistoricalDataLoader, ReplayController | |
| logger = logging.getLogger(__name__) | |
class OpenF1Client:
    """
    Client for OpenF1 API with retry logic and connection management.

    Note: OpenF1 API does NOT require authentication for historical data.
    Real-time data requires a paid account, but historical data is freely accessible.

    Handles HTTP connections, retry with exponential backoff,
    and connection loss detection/reconnection.

    Validates: Requirements 1.1, 1.2, 1.4, 1.5
    """

    def __init__(self, api_key: Optional[str] = None, base_url: str = "https://api.openf1.org/v1"):
        """
        Initialize OpenF1 API client.

        Args:
            api_key: OpenF1 API authentication key (only needed for real-time
                data, optional for historical)
            base_url: Base URL for OpenF1 API (any trailing slash is stripped)
        """
        self.api_key = api_key
        self.base_url = base_url.rstrip('/')
        self.session = None  # requests.Session, created lazily in authenticate()
        self._authenticated = False
        self._max_retries = 10
        self._retry_delay = 5  # base delay in seconds (doubled per failed attempt)
        self._max_retry_delay = 60  # cap so a single backoff sleep never exceeds one minute

    def _backoff_delay(self, attempt: int) -> float:
        """
        Compute the exponential-backoff sleep for a failed attempt.

        Args:
            attempt: 1-based number of the attempt that just failed

        Returns:
            base * 2**(attempt - 1) seconds, capped at self._max_retry_delay
        """
        return min(self._retry_delay * (2 ** (attempt - 1)), self._max_retry_delay)

    def authenticate(self) -> bool:
        """
        Set up HTTP session with retry logic.

        Note: OpenF1 API does NOT require authentication for historical data.
        This method sets up the session without authentication headers.

        Returns:
            True if session setup successful, False otherwise

        Validates: Requirements 1.1, 1.2
        """
        try:
            # Create session with transport-level retry strategy
            self.session = requests.Session()
            # urllib3 Retry handles retries for the listed status codes with
            # its own exponential backoff (backoff_factor=1)
            retry_strategy = Retry(
                total=3,
                backoff_factor=1,
                status_forcelist=[500, 502, 503, 504],  # Removed 429 - not a rate limit issue
                allowed_methods=["GET", "POST"]
            )
            adapter = HTTPAdapter(max_retries=retry_strategy)
            self.session.mount("http://", adapter)
            self.session.mount("https://", adapter)
            # OpenF1 API does NOT require authentication for historical data.
            # Only note the key if provided (for real-time data); no headers are set.
            if self.api_key:
                logger.info("API key provided - will be used for real-time data access")
                # Note: Real-time data requires paid account and different access method
                # For now, we only support historical data which needs no auth
            # Test connection with a simple request (no auth needed)
            test_url = f"{self.base_url}/sessions"
            response = self.session.get(test_url, timeout=5)  # 5 second timeout per requirement 10.5
            response.raise_for_status()
            self._authenticated = True
            logger.info("Successfully connected to OpenF1 API (no authentication required for historical data)")
            return True
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to connect to OpenF1 API: {e}")
            self._authenticated = False
            return False

    def poll_endpoint(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Optional[List[Dict]]:
        """
        Poll a single OpenF1 API endpoint with retry logic.

        Implements exponential backoff retry for failed requests (max 10 attempts).

        Args:
            endpoint: API endpoint path (e.g., '/position', '/laps')
            params: Optional query parameters

        Returns:
            List of data dictionaries, or None if request fails

        Validates: Requirements 1.4, 1.5
        """
        if not self._authenticated or not self.session:
            logger.warning("Not authenticated, attempting to authenticate")
            if not self.authenticate():
                return None
        url = f"{self.base_url}{endpoint}"
        attempt = 0
        while attempt < self._max_retries:
            try:
                response = self.session.get(url, params=params, timeout=5)  # 5 second timeout per requirement 10.5
                response.raise_for_status()
                data = response.json()
                # Normalize: the API may return a single object or a list
                if isinstance(data, dict):
                    return [data]
                elif isinstance(data, list):
                    return data
                else:
                    logger.warning(f"Unexpected data type from {endpoint}: {type(data)}")
                    return None
            except requests.exceptions.Timeout:
                attempt += 1
                logger.warning(f"Timeout polling {endpoint}, attempt {attempt}/{self._max_retries}")
                if attempt < self._max_retries:
                    time.sleep(self._backoff_delay(attempt))
            except requests.exceptions.ConnectionError as e:
                attempt += 1
                logger.error(f"Connection error polling {endpoint}: {e}, attempt {attempt}/{self._max_retries}")
                if attempt < self._max_retries:
                    time.sleep(self._backoff_delay(attempt))
                    # Try to re-authenticate (the session may be stale after a drop)
                    self.authenticate()
            except requests.exceptions.HTTPError as e:
                # e.response can be None for some transport-level failures, so
                # read the status code defensively before deciding to retry.
                status_code = e.response.status_code if e.response is not None else None
                if status_code in (429, 500, 502, 503, 504):
                    attempt += 1
                    logger.warning(f"HTTP error {status_code} polling {endpoint}, attempt {attempt}/{self._max_retries}")
                    if attempt < self._max_retries:
                        time.sleep(self._backoff_delay(attempt))
                else:
                    # Non-retryable HTTP error (e.g. 4xx) - give up immediately
                    logger.error(f"HTTP error polling {endpoint}: {e}")
                    return None
            except Exception as e:
                logger.error(f"Unexpected error polling {endpoint}: {e}")
                return None
        logger.error(f"Failed to poll {endpoint} after {self._max_retries} attempts")
        return None

    def close(self) -> None:
        """Close the HTTP session and mark the client as disconnected."""
        if self.session:
            self.session.close()
            self._authenticated = False
            logger.info("Closed OpenF1 API connection")
class EventParser:
    """
    Parses OpenF1 API responses into structured race events.

    Detects overtakes, pit stops, lead changes, fastest laps, incidents,
    flags, and safety car deployments from raw API data.

    The parser is stateful: it remembers the previous position snapshot,
    current leader, fastest lap so far, and whether the starting grid /
    race start have been announced. Feed it data in chronological order.

    Validates: Requirements 2.1-2.8
    """
    def __init__(self):
        """Initialize event parser with state tracking."""
        self._last_positions: Dict[str, int] = {}  # driver -> position
        self._last_position_time: Dict[str, datetime] = {}  # driver -> timestamp
        self._last_leader: Optional[str] = None
        self._fastest_lap_time: Optional[float] = None
        self._overtake_threshold = timedelta(seconds=0.5)  # False overtake filter
        self._starting_grid_announced = False  # Track if we've announced the grid
        self._driver_names: Dict[str, str] = {}  # driver_number -> full_name mapping
        self._race_started = False  # Track if race has started
        self._seen_green_flag = False  # Track if we've seen a green flag
        self._initial_positions: Dict[str, int] = {}  # Collect initial positions for grid
        self._position_events_seen = 0  # Count position events before grid announcement

    def _get_driver_name(self, driver_number: str) -> str:
        """
        Get driver name from driver number.

        Args:
            driver_number: Driver number as string

        Returns:
            Driver full name if available, otherwise the driver number itself
            (graceful fallback when parse_drivers_data hasn't run yet)
        """
        return self._driver_names.get(str(driver_number), str(driver_number))

    def parse_position_data(self, data: List[Dict]) -> List[RaceEvent]:
        """
        Parse position data to detect overtakes and lead changes.

        Filters out false overtakes (position swaps within 0.5 seconds).
        Also extracts starting grid from first position snapshot if starting_grid endpoint was empty.

        Args:
            data: List of position data dictionaries

        Returns:
            List of detected events (OvertakeEvent, LeadChangeEvent, PositionUpdateEvent)

        Validates: Requirements 2.1, 2.3, 2.8
        """
        events = []
        if not data:
            return events
        try:
            # If we haven't announced the grid yet, collect initial positions
            if not self._starting_grid_announced:
                self._position_events_seen += 1
                for entry in data:
                    driver_number = entry.get('driver_number') or entry.get('driver')
                    position = entry.get('position')
                    if driver_number and position:
                        self._initial_positions[str(driver_number)] = int(position)
                # Announce grid when we have 20 drivers, or after 25 position events with at least 18 drivers
                # (to handle cases where some drivers didn't start)
                should_announce = (
                    len(self._initial_positions) >= 20 or  # Full grid
                    (len(self._initial_positions) >= 18 and self._position_events_seen >= 25)  # Partial grid after timeout
                )
                if should_announce:
                    grid = []
                    for driver_number, position in self._initial_positions.items():
                        driver_name = self._get_driver_name(driver_number)
                        grid.append({
                            'position': position,
                            'driver_number': str(driver_number),
                            'full_name': driver_name
                        })
                    # Sort by position
                    grid.sort(key=lambda x: x['position'])
                    # Create starting grid announcement event
                    event = RaceEvent(
                        event_type=EventType.POSITION_UPDATE,
                        timestamp=datetime.now(),
                        data={
                            'starting_grid': grid,
                            'is_starting_grid': True
                        }
                    )
                    events.append(event)
                    self._starting_grid_announced = True
                    logger.info(f"Starting grid announced with {len(grid)} drivers from first position snapshot")
            # Build current position map
            current_positions: Dict[str, int] = {}
            current_time = datetime.now()
            lap_number = 1
            for entry in data:
                driver = entry.get('driver_number') or entry.get('driver')
                position = entry.get('position')
                if driver and position:
                    current_positions[str(driver)] = int(position)
                # Extract lap number if available
                # NOTE(review): keeps the LAST entry's lap_number for every
                # event in this batch — confirm entries share one lap.
                if 'lap_number' in entry:
                    lap_number = entry['lap_number']
            # Detect overtakes and lead changes by diffing against the
            # previous snapshot (skipped on the very first snapshot).
            if self._last_positions:
                for driver, new_pos in current_positions.items():
                    old_pos = self._last_positions.get(driver)
                    if old_pos is not None and old_pos > new_pos:
                        # Driver moved up in position
                        # Check for false overtake (rapid position swap).
                        # Defaulting last_time to current_time makes time_diff
                        # zero for a first sighting, which also filters it out.
                        last_time = self._last_position_time.get(driver, current_time)
                        time_diff = current_time - last_time
                        if time_diff > self._overtake_threshold:
                            # Find who was overtaken: the driver whose old/new
                            # positions are the exact mirror of this driver's swap
                            overtaken_driver = None
                            for other_driver, other_new_pos in current_positions.items():
                                if other_driver != driver:
                                    other_old_pos = self._last_positions.get(other_driver)
                                    if other_old_pos == new_pos and other_new_pos == old_pos:
                                        overtaken_driver = other_driver
                                        break
                            if overtaken_driver:
                                event = RaceEvent(
                                    event_type=EventType.OVERTAKE,
                                    timestamp=current_time,
                                    data={
                                        'overtaking_driver': driver,
                                        'overtaken_driver': overtaken_driver,
                                        'new_position': new_pos,
                                        'lap_number': lap_number
                                    }
                                )
                                events.append(event)
                                logger.info(f"Detected overtake: {driver} overtakes {overtaken_driver} for P{new_pos}")
            # Check for lead change (whoever currently holds P1)
            current_leader = None
            for driver, pos in current_positions.items():
                if pos == 1:
                    current_leader = driver
                    break
            if current_leader and self._last_leader and current_leader != self._last_leader:
                event = RaceEvent(
                    event_type=EventType.LEAD_CHANGE,
                    timestamp=current_time,
                    data={
                        'new_leader': current_leader,
                        'old_leader': self._last_leader,
                        'lap_number': lap_number
                    }
                )
                events.append(event)
                logger.info(f"Detected lead change: {current_leader} takes lead from {self._last_leader}")
            self._last_leader = current_leader
            # Update state for the next diff
            self._last_positions = current_positions
            for driver in current_positions:
                self._last_position_time[driver] = current_time
            # Always emit position update (unless we're still collecting initial grid)
            if current_positions and self._starting_grid_announced:
                event = RaceEvent(
                    event_type=EventType.POSITION_UPDATE,
                    timestamp=current_time,
                    data={
                        'positions': current_positions,
                        'lap_number': lap_number
                    }
                )
                events.append(event)
        except Exception as e:
            logger.error(f"[DataIngestion] Error parsing position data: {e}", exc_info=True)
        return events

    def parse_pit_data(self, data: List[Dict]) -> List[RaceEvent]:
        """
        Parse pit stop data to detect pit stops.

        Each entry with a driver number becomes one PIT_STOP event; missing
        duration/compound fields fall back to 0.0 / 'unknown'.

        Args:
            data: List of pit stop data dictionaries

        Returns:
            List of PitStopEvent events

        Validates: Requirement 2.2
        """
        events = []
        if not data:
            return events
        try:
            for entry in data:
                driver_number = entry.get('driver_number') or entry.get('driver')
                pit_duration = entry.get('pit_duration', 0.0)
                lap_number = entry.get('lap_number', 1)
                if driver_number:
                    # Get driver name
                    driver_name = self._get_driver_name(driver_number)
                    event = RaceEvent(
                        event_type=EventType.PIT_STOP,
                        timestamp=datetime.now(),
                        data={
                            'driver': driver_name,
                            'driver_number': str(driver_number),
                            'pit_duration': float(pit_duration),
                            'lap_number': lap_number,
                            'tire_compound': entry.get('tire_compound', 'unknown')
                        }
                    )
                    events.append(event)
                    logger.info(f"Detected pit stop: {driver_name} (duration: {pit_duration}s)")
        except Exception as e:
            logger.error(f"[DataIngestion] Error parsing pit data: {e}", exc_info=True)
        return events

    def parse_lap_data(self, data: List[Dict]) -> List[RaceEvent]:
        """
        Parse lap data to detect fastest laps and race start.

        Emits a synthetic green-flag event on the first lap-1 entry seen after
        the grid announcement (fallback when race control never reports a
        green flag). Fastest laps are only tracked once the race has started.

        Args:
            data: List of lap data dictionaries

        Returns:
            List of FastestLapEvent events and race start event

        Validates: Requirement 2.4
        """
        events = []
        if not data:
            return events
        try:
            for entry in data:
                driver_number = entry.get('driver_number') or entry.get('driver')
                lap_time = entry.get('lap_duration') or entry.get('lap_time')
                lap_number = entry.get('lap_number', 1)
                # Detect race start from first lap 1 event
                if lap_number == 1 and not self._race_started and self._starting_grid_announced:
                    self._race_started = True
                    race_start_event = RaceEvent(
                        event_type=EventType.FLAG,
                        timestamp=datetime.now(),
                        data={
                            'flag_type': 'green',
                            'sector': None,
                            'lap_number': 1,
                            'message': 'Race Start',
                            'is_race_start': True
                        }
                    )
                    events.append(race_start_event)
                    logger.info("Detected race start from first lap data!")
                if driver_number and lap_time:
                    lap_time = float(lap_time)
                    # Only track fastest lap after race has started
                    if self._race_started:
                        # Check if this is a new fastest lap
                        if self._fastest_lap_time is None or lap_time < self._fastest_lap_time:
                            self._fastest_lap_time = lap_time
                            # Get driver name
                            driver_name = self._get_driver_name(driver_number)
                            event = RaceEvent(
                                event_type=EventType.FASTEST_LAP,
                                timestamp=datetime.now(),
                                data={
                                    'driver': driver_name,
                                    'driver_number': str(driver_number),
                                    'lap_time': lap_time,
                                    'lap_number': lap_number
                                }
                            )
                            events.append(event)
                            logger.info(f"Detected fastest lap: {driver_name} ({lap_time}s)")
        except Exception as e:
            logger.error(f"[DataIngestion] Error parsing lap data: {e}", exc_info=True)
        return events

    def parse_race_control_data(self, data: List[Dict]) -> List[RaceEvent]:
        """
        Parse race control data to detect flags, safety car, and incidents.

        Filters out boring race control messages and only keeps important ones like:
        - Race start
        - Safety car deployment/withdrawal
        - Red flags
        - Chequered flag
        - Major incidents

        Args:
            data: List of race control message dictionaries

        Returns:
            List of events (FlagEvent, SafetyCarEvent, IncidentEvent)

        Validates: Requirements 2.5, 2.6, 2.7
        """
        events = []
        if not data:
            return events
        try:
            for entry in data:
                message = entry.get('message', '').lower()
                category = entry.get('category', '').lower()
                lap_number = entry.get('lap_number', 1)
                # Filter out boring messages - only keep important race control events
                boring_keywords = [
                    'track limits',
                    'deleted',
                    'time',
                    'under investigation',
                    'noted',
                    'reported',
                    'car stopped',
                    'drs enabled',
                    'drs disabled',
                    'permission',
                    'allowed',
                    'document',
                    'stewards',
                    'penalty'
                ]
                # Skip boring messages unless they're about important events
                # (importance wins: a "safety car" message containing "time"
                # still gets through)
                is_boring = any(keyword in message for keyword in boring_keywords)
                is_important = (
                    'safety car' in message or
                    'red flag' in message or
                    'chequered' in message or 'checkered' in message or
                    'session started' in message or
                    'green flag' in message or
                    'incident' in message or
                    'crash' in message or
                    'collision' in message
                )
                if is_boring and not is_important:
                    continue  # Skip this boring message
                # Detect flags (only important ones)
                if 'flag' in message or 'flag' in category:
                    flag_type = 'yellow'  # default; refined below
                    if 'red' in message:
                        flag_type = 'red'
                    elif 'green' in message:
                        flag_type = 'green'
                    elif 'blue' in message:
                        continue  # Skip blue flags (not interesting for commentary)
                    elif 'chequered' in message or 'checkered' in message:
                        flag_type = 'chequered'
                    elif 'yellow' not in message:
                        continue  # Skip other flag types
                    # Check if this is the race start (first green flag after grid)
                    is_race_start = False
                    if flag_type == 'green' and not self._race_started and self._starting_grid_announced:
                        # This is the race start!
                        self._race_started = True
                        is_race_start = True
                        logger.info("Detected race start!")
                    event = RaceEvent(
                        event_type=EventType.FLAG,
                        timestamp=datetime.now(),
                        data={
                            'flag_type': flag_type,
                            'sector': entry.get('sector'),
                            'lap_number': lap_number,
                            'message': entry.get('message', ''),
                            'is_race_start': is_race_start
                        }
                    )
                    events.append(event)
                    logger.info(f"Detected flag: {flag_type}")
                # Detect race start from "SESSION STARTED" message
                elif 'session started' in message and not self._race_started and self._starting_grid_announced:
                    self._race_started = True
                    event = RaceEvent(
                        event_type=EventType.FLAG,
                        timestamp=datetime.now(),
                        data={
                            'flag_type': 'green',
                            'sector': None,
                            'lap_number': lap_number,
                            'message': entry.get('message', ''),
                            'is_race_start': True
                        }
                    )
                    events.append(event)
                    logger.info("Detected race start from SESSION STARTED message!")
                # Detect safety car
                # NOTE(review): 'sc' in category is a substring test — it also
                # matches e.g. 'miscellaneous'; consider an exact category match.
                elif 'safety car' in message or 'sc' in category:
                    status = 'deployed'
                    # NOTE(review): substring 'in' also matches words like
                    # 'window' or 'racing' — confirm against real SC messages.
                    if 'in' in message:
                        status = 'in'
                    elif 'ending' in message or 'end' in message:
                        status = 'ending'
                    event = RaceEvent(
                        event_type=EventType.SAFETY_CAR,
                        timestamp=datetime.now(),
                        data={
                            'status': status,
                            'reason': entry.get('message', ''),
                            'lap_number': lap_number
                        }
                    )
                    events.append(event)
                    logger.info(f"Detected safety car: {status}")
                # Skip incidents for now - they flood the queue at race start
                # TODO: Re-enable incidents with better filtering later
                # elif 'incident' in message or 'crash' in message or 'collision' in message:
                #     event = RaceEvent(
                #         event_type=EventType.INCIDENT,
                #         timestamp=datetime.now(),
                #         data={
                #             'description': entry.get('message', ''),
                #             'drivers_involved': [],  # Would need more parsing
                #             'lap_number': lap_number
                #         }
                #     )
                #     events.append(event)
                #     logger.info(f"Detected incident: {entry.get('message', '')}")
        except Exception as e:
            logger.error(f"[DataIngestion] Error parsing race control data: {e}", exc_info=True)
        return events

    def parse_drivers_data(self, data: List[Dict]) -> List[RaceEvent]:
        """
        Parse drivers data to populate driver name lookup table.

        This endpoint provides driver information (names, teams, etc.) but NOT grid positions.
        Grid positions come from starting_grid or position endpoints.

        Args:
            data: List of driver data dictionaries

        Returns:
            Empty list (no events generated, just populates lookup table)
        """
        events = []
        if not data:
            return events
        try:
            # Populate driver name lookup table used by _get_driver_name()
            for entry in data:
                driver_number = entry.get('driver_number')
                full_name = entry.get('full_name', 'Unknown')
                if driver_number:
                    # Store driver name for lookup
                    self._driver_names[str(driver_number)] = full_name
            logger.info(f"Loaded {len(self._driver_names)} driver names for lookup")
        except Exception as e:
            logger.error(f"[DataIngestion] Error parsing drivers data: {e}", exc_info=True)
        return events

    def parse_overtakes_data(self, data: List[Dict]) -> List[RaceEvent]:
        """
        Parse overtakes data from OpenF1 API.

        Uses the official overtakes endpoint instead of detecting from position changes.
        This is more accurate as it's based on official timing data.

        Args:
            data: List of overtake data dictionaries

        Returns:
            List of OvertakeEvent events
        """
        events = []
        logger.debug(f"[EventParser] parse_overtakes_data called with {len(data) if data else 0} records")
        if not data:
            logger.debug("[EventParser] No overtake data to parse")
            return events
        logger.info(f"[EventParser] Parsing {len(data)} overtake records")
        try:
            for entry in data:
                overtaking_driver_num = entry.get('overtaking_driver_number')
                overtaken_driver_num = entry.get('overtaken_driver_number')
                lap_number = entry.get('lap_number', 1)
                position = entry.get('position')  # New position after overtake
                logger.debug(f"[EventParser] Processing overtake: {overtaking_driver_num} -> {overtaken_driver_num}")
                if overtaking_driver_num and overtaken_driver_num:
                    # Get driver names (falls back to the raw numbers)
                    overtaking_driver = self._get_driver_name(overtaking_driver_num)
                    overtaken_driver = self._get_driver_name(overtaken_driver_num)
                    event = RaceEvent(
                        event_type=EventType.OVERTAKE,
                        timestamp=datetime.now(),
                        data={
                            'overtaking_driver': overtaking_driver,
                            'overtaken_driver': overtaken_driver,
                            'overtaking_driver_number': str(overtaking_driver_num),
                            'overtaken_driver_number': str(overtaken_driver_num),
                            'new_position': position,  # Add the position
                            'lap_number': lap_number
                        }
                    )
                    events.append(event)
                    logger.debug(f"Parsed overtake: {overtaking_driver} overtakes {overtaken_driver} for P{position} on lap {lap_number}")
                else:
                    logger.warning(f"[EventParser] Skipping overtake with missing driver numbers: {entry}")
        except Exception as e:
            logger.error(f"[DataIngestion] Error parsing overtakes data: {e}", exc_info=True)
        logger.info(f"[EventParser] Created {len(events)} overtake events")
        return events

    def parse_starting_grid_data(self, data: List[Dict]) -> List[RaceEvent]:
        """
        Parse starting_grid data to get the actual grid positions.

        This endpoint provides the official starting grid with correct positions.

        Note: If this endpoint is empty, the starting grid will be extracted from
        the first position data snapshot instead (see parse_position_data).

        Args:
            data: List of starting grid data dictionaries

        Returns:
            List of events (one STARTING_GRID event with properly ordered drivers);
            empty if the grid was already announced
        """
        events = []
        if not data or self._starting_grid_announced:
            return events
        try:
            # Sort by position to ensure correct order (missing positions sort last)
            sorted_grid = sorted(data, key=lambda x: x.get('position', 999))
            # Build grid with driver names
            grid = []
            for entry in sorted_grid:
                driver_number = entry.get('driver_number')
                position = entry.get('position')
                if driver_number and position:
                    # Get driver name from lookup
                    driver_name = self._get_driver_name(driver_number)
                    grid.append({
                        'position': position,
                        'driver_number': str(driver_number),
                        'full_name': driver_name
                    })
            if grid:
                # Create starting grid announcement event
                event = RaceEvent(
                    event_type=EventType.POSITION_UPDATE,
                    timestamp=datetime.now(),
                    data={
                        'starting_grid': grid,
                        'is_starting_grid': True
                    }
                )
                events.append(event)
                self._starting_grid_announced = True
                logger.info(f"Starting grid announced with {len(grid)} drivers from starting_grid endpoint")
        except Exception as e:
            logger.error(f"[DataIngestion] Error parsing starting_grid data: {e}", exc_info=True)
        return events
class DataIngestionModule:
    """
    Main orchestrator for data ingestion from OpenF1 API.

    Manages polling threads for multiple endpoints, coordinates event parsing,
    and emits events to the event queue. Supports both live mode and replay mode.

    Validates: Requirements 1.6, 9.3
    """
    def __init__(self, config: Config, event_queue: PriorityEventQueue):
        """
        Initialize data ingestion module.

        Args:
            config: System configuration
            event_queue: Event queue for emitting parsed events
        """
        self.config = config
        self.event_queue = event_queue
        self.client = OpenF1Client(config.openf1_api_key, config.openf1_base_url)
        self.parser = EventParser()
        self._running = False
        self._threads: List[threading.Thread] = []  # live-mode polling threads
        # Replay mode components (created lazily in _start_replay_mode)
        self._replay_controller: Optional[ReplayController] = None
        self._historical_loader: Optional[HistoricalDataLoader] = None

    def start(self) -> bool:
        """
        Start polling all configured endpoints (live mode) or replay (replay mode).

        Launches separate threads for each endpoint with configured intervals in live mode,
        or starts replay controller in replay mode.

        Returns:
            True if started successfully, False otherwise

        Validates: Requirements 1.6, 9.3
        """
        if self._running:
            logger.warning("Data ingestion already running")
            return False
        # Check if we're in replay mode
        if self.config.replay_mode:
            return self._start_replay_mode()
        else:
            return self._start_live_mode()

    def _start_live_mode(self) -> bool:
        """
        Start live mode data ingestion.

        Spawns one daemon polling thread per endpoint; each thread runs
        _poll_loop until stop() clears self._running.

        Returns:
            True if started successfully, False otherwise
        """
        # Authenticate first
        if not self.client.authenticate():
            logger.error("Failed to authenticate with OpenF1 API")
            return False
        self._running = True
        # Start polling threads for each endpoint:
        # (endpoint path, poll interval in seconds, parser callback)
        endpoints = [
            ('/position', self.config.position_poll_interval, self.parser.parse_position_data),
            ('/pit', self.config.pit_poll_interval, self.parser.parse_pit_data),
            ('/laps', self.config.laps_poll_interval, self.parser.parse_lap_data),
            ('/race_control', self.config.race_control_poll_interval, self.parser.parse_race_control_data),
        ]
        for endpoint, interval, parser_func in endpoints:
            thread = threading.Thread(
                target=self._poll_loop,
                args=(endpoint, interval, parser_func),
                daemon=True
            )
            thread.start()
            self._threads.append(thread)
            logger.info(f"Started polling thread for {endpoint} (interval: {interval}s)")
        logger.info("Data ingestion module started in LIVE mode")
        return True

    def _start_replay_mode(self) -> bool:
        """
        Start replay mode data ingestion.

        NOTE: unlike live mode, this method BLOCKS the caller until the replay
        completes or stop() is called (the replay controller runs in its own
        thread; this thread just waits on it).

        Returns:
            True if started successfully, False otherwise

        Validates: Requirement 9.3
        """
        if not self.config.replay_race_id:
            logger.error("replay_race_id not configured for replay mode")
            return False
        logger.info(f"Starting replay mode for race: {self.config.replay_race_id}")
        # Initialize historical data loader
        self._historical_loader = HistoricalDataLoader(
            api_key=self.config.openf1_api_key,
            base_url=self.config.openf1_base_url
        )
        # Load race data
        race_data = self._historical_loader.load_race(self.config.replay_race_id)
        if not race_data:
            logger.error(f"Failed to load race data for {self.config.replay_race_id}")
            return False
        # Initialize replay controller
        self._replay_controller = ReplayController(
            race_data=race_data,
            playback_speed=self.config.replay_speed,
            skip_large_gaps=self.config.replay_skip_large_gaps
        )
        # Start replay with callback to process events
        self._replay_controller.start(self._replay_event_callback)
        self._running = True
        logger.info(f"Data ingestion module started in REPLAY mode at {self.config.replay_speed}x speed")
        # Wait for replay to complete (keep thread alive)
        # The replay controller runs in its own thread, so we need to wait for it
        while self._running and self._replay_controller and not self._replay_controller.is_stopped():
            time.sleep(0.1)
        logger.info("Replay mode completed")
        return True

    def _replay_event_callback(self, endpoint: str, data: Dict) -> None:
        """
        Callback for replay controller to process historical events.

        Parses the event using the same parser as live mode and emits to queue.

        Args:
            endpoint: Endpoint name ('position', 'pit', 'laps', 'race_control')
            data: Event data dictionary (a single record; wrapped in a list
                  because the parsers expect batches)

        Validates: Requirement 9.3
        """
        try:
            # Map endpoint to parser function
            parser_map = {
                'drivers': self.parser.parse_drivers_data,
                'starting_grid': self.parser.parse_starting_grid_data,
                'position': self.parser.parse_position_data,
                'pit': self.parser.parse_pit_data,
                'laps': self.parser.parse_lap_data,
                'race_control': self.parser.parse_race_control_data,
                'overtakes': self.parser.parse_overtakes_data
            }
            parser_func = parser_map.get(endpoint)
            if not parser_func:
                logger.warning(f"Unknown endpoint in replay: {endpoint}")
                return
            # Debug: log endpoint being processed
            logger.debug(f"[DataIngestion] Processing {endpoint} event")
            # Parse events (parser expects a list)
            events = parser_func([data])
            # Debug: log how many events were generated
            if events:
                logger.debug(f"[DataIngestion] Generated {len(events)} events from {endpoint}")
            # Emit events to queue
            for event in events:
                self.event_queue.enqueue(event)
        except Exception as e:
            logger.error(f"[DataIngestion] Error processing replay event from {endpoint}: {e}", exc_info=True)

    def stop(self) -> None:
        """
        Stop polling and gracefully shutdown all threads (live mode) or replay (replay mode).

        Clearing self._running lets each _poll_loop (and the replay wait loop)
        exit on its next iteration; threads are then joined with a timeout.

        Validates: Requirements 1.6, 9.3
        """
        if not self._running:
            return
        logger.info("Stopping data ingestion module...")
        self._running = False
        # Stop replay controller if in replay mode
        if self._replay_controller:
            self._replay_controller.stop()
            self._replay_controller = None
        # Wait for threads to finish (with timeout so shutdown never hangs)
        for thread in self._threads:
            thread.join(timeout=5.0)
        self._threads.clear()
        self.client.close()
        logger.info("Data ingestion module stopped")

    def _poll_loop(self, endpoint: str, interval: float, parser_func) -> None:
        """
        Polling loop for a single endpoint.

        Runs in its own daemon thread until self._running goes False.
        Sleeps only for the remainder of the interval after poll+parse time.

        Args:
            endpoint: API endpoint path
            interval: Polling interval in seconds
            parser_func: Function to parse endpoint data
        """
        while self._running:
            try:
                start_time = time.time()
                # Poll endpoint
                data = self.client.poll_endpoint(endpoint)
                if data:
                    # Parse events
                    parse_start = time.time()
                    events = parser_func(data)
                    parse_duration = time.time() - parse_start
                    # Log parsing latency (Requirement 1.3)
                    if parse_duration > 0.5:
                        logger.warning(f"Parsing {endpoint} took {parse_duration:.3f}s (exceeds 500ms target)")
                    # Emit events to queue
                    for event in events:
                        self.event_queue.enqueue(event)
                # Sleep for remaining interval time
                elapsed = time.time() - start_time
                sleep_time = max(0, interval - elapsed)
                if sleep_time > 0:
                    time.sleep(sleep_time)
            except Exception as e:
                logger.error(f"[DataIngestion] Error in polling loop for {endpoint}: {e}", exc_info=True)
                time.sleep(interval)  # back off a full interval after an unexpected failure

    def pause_replay(self) -> None:
        """
        Pause replay playback (replay mode only).

        Validates: Requirement 9.4
        """
        if self._replay_controller:
            self._replay_controller.pause()
        else:
            logger.warning("Not in replay mode, cannot pause")

    def resume_replay(self) -> None:
        """
        Resume replay playback (replay mode only).

        Validates: Requirement 9.4
        """
        if self._replay_controller:
            self._replay_controller.resume()
        else:
            logger.warning("Not in replay mode, cannot resume")

    def seek_replay_to_lap(self, lap_number: int) -> None:
        """
        Seek to specific lap in replay (replay mode only).

        Args:
            lap_number: Lap number to seek to

        Validates: Requirement 9.5
        """
        if self._replay_controller:
            self._replay_controller.seek_to_lap(lap_number)
        else:
            logger.warning("Not in replay mode, cannot seek")

    def set_replay_speed(self, speed: float) -> None:
        """
        Set replay playback speed (replay mode only).

        Args:
            speed: Playback speed multiplier (1.0 = real-time)

        Validates: Requirement 9.2
        """
        if self._replay_controller:
            self._replay_controller.set_playback_speed(speed)
        else:
            logger.warning("Not in replay mode, cannot set speed")

    def get_replay_progress(self) -> float:
        """
        Get replay progress (replay mode only).

        Returns:
            Progress from 0.0 to 1.0, or 0.0 if not in replay mode
        """
        if self._replay_controller:
            return self._replay_controller.get_progress()
        return 0.0

    def is_replay_paused(self) -> bool:
        """
        Check if replay is paused (replay mode only).

        Returns:
            True if paused, False otherwise
        """
        if self._replay_controller:
            return self._replay_controller.is_paused()
        return False