"""Main application orchestrator for F1 Commentary Robot. This module provides the CommentarySystem class that coordinates all system components, handles initialization, manages the main event processing loop, and ensures graceful shutdown. Validates: Requirements 17.1, 17.2, 17.3, 17.4, 17.5, 17.6, 17.7 """ import logging import signal import sys import time import threading from typing import Optional from reachy_f1_commentator.src.config import Config, load_config from reachy_f1_commentator.src.logging_config import setup_logging from reachy_f1_commentator.src.models import EventType from reachy_f1_commentator.src.data_ingestion import DataIngestionModule from reachy_f1_commentator.src.race_state_tracker import RaceStateTracker from reachy_f1_commentator.src.event_queue import PriorityEventQueue from reachy_f1_commentator.src.commentary_generator import CommentaryGenerator from reachy_f1_commentator.src.enhanced_commentary_generator import EnhancedCommentaryGenerator from reachy_f1_commentator.src.speech_synthesizer import SpeechSynthesizer from reachy_f1_commentator.src.motion_controller import MotionController from reachy_f1_commentator.src.qa_manager import QAManager from reachy_f1_commentator.src.resource_monitor import ResourceMonitor logger = logging.getLogger(__name__) class CommentarySystem: """Main orchestrator for the F1 Commentary Robot system. Coordinates all system components, manages initialization in dependency order, verifies API connectivity, and handles graceful shutdown. Validates: Requirements 17.1, 17.2, 17.3, 17.4, 17.5, 17.6, 17.7 """ def __init__(self, config_path: str = "config/config.json"): """Initialize commentary system with configuration. Args: config_path: Path to configuration file Validates: Requirement 17.1 """ # Load configuration self.config = load_config(config_path) # Setup logging setup_logging(self.config.log_level, self.config.log_file) logger.info("=" * 80) logger.info("F1 Commentary Robot - System Initialization") logger.info("=" * 80) # Initialize components (will be set during initialize()) self.race_state_tracker: Optional[RaceStateTracker] = None self.event_queue: Optional[PriorityEventQueue] = None self.motion_controller: Optional[MotionController] = None self.speech_synthesizer: Optional[SpeechSynthesizer] = None self.commentary_generator: Optional[EnhancedCommentaryGenerator] = None self.data_ingestion: Optional[DataIngestionModule] = None self.qa_manager: Optional[QAManager] = None self.resource_monitor: Optional[ResourceMonitor] = None # System state self._initialized = False self._running = False self._shutdown_requested = False self._event_processing_thread: Optional[threading.Thread] = None # Register signal handlers for graceful shutdown signal.signal(signal.SIGTERM, self._signal_handler) signal.signal(signal.SIGINT, self._signal_handler) logger.info(f"Configuration loaded: replay_mode={self.config.replay_mode}") def initialize(self) -> bool: """Initialize all system modules in dependency order. Initialization order: 1. Race State Tracker (no dependencies) 2. Event Queue (no dependencies) 3. Motion Controller (no dependencies) 4. Speech Synthesizer (depends on Motion Controller) 5. Commentary Generator (depends on Race State Tracker) 6. Data Ingestion Module (depends on Event Queue) 7. Q&A Manager (depends on Race State Tracker, Event Queue) 8. Resource Monitor (no dependencies) Returns: True if initialization successful, False otherwise Validates: Requirements 17.1, 17.2, 17.3 """ if self._initialized: logger.warning("System already initialized") return True try: logger.info("Starting system initialization...") # 1. Initialize Race State Tracker logger.info("Initializing Race State Tracker...") self.race_state_tracker = RaceStateTracker() logger.info("✓ Race State Tracker initialized") # 2. Initialize Event Queue logger.info("Initializing Event Queue...") self.event_queue = PriorityEventQueue(max_size=self.config.max_queue_size) logger.info("✓ Event Queue initialized") # 3. Initialize Motion Controller logger.info("Initializing Motion Controller...") self.motion_controller = MotionController(self.config) # Move robot head to neutral position during initialization if self.config.enable_movements: logger.info("Moving robot head to neutral position...") self.motion_controller.return_to_neutral() time.sleep(1.0) # Wait for movement to complete logger.info("✓ Motion Controller initialized") # 4. Initialize Speech Synthesizer logger.info("Initializing Speech Synthesizer...") self.speech_synthesizer = SpeechSynthesizer( config=self.config, motion_controller=self.motion_controller ) # Connect Reachy SDK to speech synthesizer if motion controller has it if self.motion_controller.reachy.is_connected(): self.speech_synthesizer.set_reachy(self.motion_controller.reachy.reachy) logger.info("✓ Speech Synthesizer initialized") # 5. Initialize Commentary Generator logger.info("Initializing Commentary Generator...") # Use EnhancedCommentaryGenerator which maintains backward compatibility # and supports both enhanced and basic modes (Requirement 19.1, 19.8) # Note: OpenF1 client will be set after data ingestion module is initialized self.commentary_generator = EnhancedCommentaryGenerator( config=self.config, state_tracker=self.race_state_tracker, openf1_client=None # Will be set after data ingestion initialization ) # Log which mode is active at startup (Requirement 19.8) if self.commentary_generator.is_enhanced_mode(): logger.info("✓ Commentary Generator initialized in ENHANCED mode") else: logger.info("✓ Commentary Generator initialized in BASIC mode") # Load static data if in enhanced mode if self.commentary_generator.is_enhanced_mode(): logger.info("Loading static data for enhanced commentary...") session_key = self.config.replay_race_id if self.config.replay_mode else None if self.commentary_generator.load_static_data(session_key): logger.info("✓ Static data loaded successfully") else: logger.warning("⚠ Failed to load static data - enhanced features may be limited") # 6. Initialize Data Ingestion Module logger.info("Initializing Data Ingestion Module...") self.data_ingestion = DataIngestionModule( config=self.config, event_queue=self.event_queue ) logger.info("✓ Data Ingestion Module initialized") # Connect OpenF1 client to enhanced commentary generator (Requirement 19.4) if self.commentary_generator.is_enhanced_mode(): logger.info("Connecting OpenF1 client to enhanced commentary generator...") self.commentary_generator.openf1_client = self.data_ingestion.client # Re-initialize enhanced components now that we have the client self.commentary_generator._initialize_enhanced_components() logger.info("✓ OpenF1 client connected to commentary generator") # 7. Initialize Q&A Manager logger.info("Initializing Q&A Manager...") self.qa_manager = QAManager( state_tracker=self.race_state_tracker, event_queue=self.event_queue ) logger.info("✓ Q&A Manager initialized") # 8. Initialize Resource Monitor logger.info("Initializing Resource Monitor...") self.resource_monitor = ResourceMonitor() self.resource_monitor.start() logger.info("✓ Resource Monitor initialized") # Verify API connectivity before entering active mode if not self.config.replay_mode: logger.info("Verifying API connectivity...") # Test OpenF1 API connection if not self._verify_openf1_connectivity(): logger.error("Failed to verify OpenF1 API connectivity") return False # Test ElevenLabs API connection if not self._verify_elevenlabs_connectivity(): logger.error("Failed to verify ElevenLabs API connectivity") logger.warning("System will continue in TEXT_ONLY mode") logger.info("✓ API connectivity verified") else: logger.info("Replay mode enabled - skipping API connectivity checks") self._initialized = True logger.info("=" * 80) logger.info("System initialization complete!") logger.info("=" * 80) return True except Exception as e: logger.error(f"[CommentarySystem] System initialization failed: {e}", exc_info=True) return False def _verify_openf1_connectivity(self) -> bool: """Verify connectivity to OpenF1 API. Returns: True if connection successful, False otherwise Validates: Requirement 17.3 """ try: # Try to authenticate with OpenF1 API if self.data_ingestion.client.authenticate(): logger.info("✓ OpenF1 API connection verified") return True else: logger.error("✗ OpenF1 API authentication failed") return False except Exception as e: logger.error(f"[CommentarySystem] OpenF1 API verification failed: {e}", exc_info=True) return False def _verify_elevenlabs_connectivity(self) -> bool: """Verify connectivity to ElevenLabs API. Returns: True if connection successful, False otherwise Validates: Requirement 17.3 """ try: # Try a simple TTS request test_text = "System check" audio_bytes = self.speech_synthesizer.elevenlabs_client.text_to_speech(test_text) if audio_bytes: logger.info("✓ ElevenLabs API connection verified") return True else: logger.error("✗ ElevenLabs API test request failed") return False except Exception as e: logger.error(f"[CommentarySystem] ElevenLabs API verification failed: {e}", exc_info=True) return False def start(self) -> bool: """Start the commentary system. Starts data ingestion and event processing loop. Returns: True if started successfully, False otherwise """ if not self._initialized: logger.error("Cannot start system: not initialized") return False if self._running: logger.warning("System already running") return True try: logger.info("Starting commentary system...") # Start data ingestion if not self.data_ingestion.start(): logger.error("Failed to start data ingestion") return False # Start event processing loop self._running = True self._event_processing_thread = threading.Thread( target=self._event_processing_loop, daemon=True, name="EventProcessingThread" ) self._event_processing_thread.start() logger.info("=" * 80) logger.info("F1 Commentary Robot is now ACTIVE!") logger.info("=" * 80) return True except Exception as e: logger.error(f"[CommentarySystem] Failed to start system: {e}", exc_info=True) return False def _event_processing_loop(self) -> None: """Main event processing loop. Continuously dequeues events, generates commentary, and plays audio. """ logger.info("Event processing loop started") while self._running and not self._shutdown_requested: try: # Dequeue next event event = self.event_queue.dequeue() if event is None: # No events available, sleep briefly time.sleep(0.1) continue # Update race state self.race_state_tracker.update(event) # Skip position updates for commentary (too frequent) if event.event_type == EventType.POSITION_UPDATE: continue # Generate commentary logger.info(f"Processing event: {event.event_type.value}") commentary_text = self.commentary_generator.generate(event) # Synthesize and play audio self.speech_synthesizer.synthesize_and_play(commentary_text) # Execute gesture based on event type if self.config.enable_movements: gesture = self.motion_controller.gesture_library.get_gesture_for_event(event.event_type) self.motion_controller.execute_gesture(gesture) except Exception as e: logger.error(f"[CommentarySystem] Error in event processing loop: {e}", exc_info=True) time.sleep(0.5) # Brief pause before continuing logger.info("Event processing loop stopped") def shutdown(self) -> None: """Gracefully shutdown the commentary system. Completes current commentary, closes API connections, and returns robot to neutral position. Validates: Requirements 17.4, 17.5, 17.6, 17.7 """ if self._shutdown_requested: logger.warning("Shutdown already in progress") return self._shutdown_requested = True logger.info("=" * 80) logger.info("Initiating graceful shutdown...") logger.info("=" * 80) try: # Complete current commentary before stopping if self.speech_synthesizer and self.speech_synthesizer.is_speaking(): logger.info("Waiting for current commentary to complete...") timeout = 10.0 # Maximum 10 seconds to wait start_time = time.time() while self.speech_synthesizer.is_speaking() and (time.time() - start_time) < timeout: time.sleep(0.5) if self.speech_synthesizer.is_speaking(): logger.warning("Commentary did not complete within timeout, proceeding with shutdown") # Stop event processing loop logger.info("Stopping event processing...") self._running = False if self._event_processing_thread and self._event_processing_thread.is_alive(): self._event_processing_thread.join(timeout=5.0) # Stop data ingestion if self.data_ingestion: logger.info("Stopping data ingestion...") self.data_ingestion.stop() # Stop speech synthesizer if self.speech_synthesizer: logger.info("Stopping speech synthesizer...") self.speech_synthesizer.stop() # Return robot head to neutral position if self.motion_controller and self.config.enable_movements: logger.info("Returning robot head to neutral position...") self.motion_controller.return_to_neutral() time.sleep(1.0) # Wait for movement to complete # Stop motion controller if self.motion_controller: logger.info("Stopping motion controller...") self.motion_controller.stop() # Stop resource monitor if self.resource_monitor: logger.info("Stopping resource monitor...") self.resource_monitor.stop() # Close all API connections gracefully logger.info("Closing API connections...") if self.data_ingestion and self.data_ingestion.client: self.data_ingestion.client.close() logger.info("=" * 80) logger.info("Shutdown complete. Goodbye!") logger.info("=" * 80) except Exception as e: logger.error(f"[CommentarySystem] Error during shutdown: {e}", exc_info=True) def _signal_handler(self, signum, frame): """Handle SIGTERM and SIGINT signals for graceful shutdown. Args: signum: Signal number frame: Current stack frame Validates: Requirement 17.7 """ signal_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT" logger.info(f"Received {signal_name} signal, initiating graceful shutdown...") self.shutdown() sys.exit(0) def process_question(self, question: str) -> None: """Process a user question (Q&A functionality). Args: question: User's question text """ if not self._initialized or not self._running: logger.warning("Cannot process question: system not running") return try: logger.info(f"Processing question: {question}") # Process question and get response response = self.qa_manager.process_question(question) # Synthesize and play response self.speech_synthesizer.synthesize_and_play(response) # Wait for response to complete while self.speech_synthesizer.is_speaking(): time.sleep(0.5) # Resume event queue self.qa_manager.resume_event_queue() logger.info("Question processed successfully") except Exception as e: logger.error(f"[CommentarySystem] Error processing question: {e}", exc_info=True) # Ensure event queue is resumed even on error if self.qa_manager: self.qa_manager.resume_event_queue() def is_running(self) -> bool: """Check if system is running. Returns: True if system is running, False otherwise """ return self._running def is_initialized(self) -> bool: """Check if system is initialized. Returns: True if system is initialized, False otherwise """ return self._initialized