Spaces:
Running
Running
File size: 20,363 Bytes
c95ad37 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 | """Main application orchestrator for F1 Commentary Robot.
This module provides the CommentarySystem class that coordinates all system
components, handles initialization, manages the main event processing loop,
and ensures graceful shutdown.
Validates: Requirements 17.1, 17.2, 17.3, 17.4, 17.5, 17.6, 17.7
"""
import logging
import signal
import sys
import time
import threading
from typing import Optional
from reachy_f1_commentator.src.config import Config, load_config
from reachy_f1_commentator.src.logging_config import setup_logging
from reachy_f1_commentator.src.models import EventType
from reachy_f1_commentator.src.data_ingestion import DataIngestionModule
from reachy_f1_commentator.src.race_state_tracker import RaceStateTracker
from reachy_f1_commentator.src.event_queue import PriorityEventQueue
from reachy_f1_commentator.src.commentary_generator import CommentaryGenerator
from reachy_f1_commentator.src.enhanced_commentary_generator import EnhancedCommentaryGenerator
from reachy_f1_commentator.src.speech_synthesizer import SpeechSynthesizer
from reachy_f1_commentator.src.motion_controller import MotionController
from reachy_f1_commentator.src.qa_manager import QAManager
from reachy_f1_commentator.src.resource_monitor import ResourceMonitor
logger = logging.getLogger(__name__)
class CommentarySystem:
"""Main orchestrator for the F1 Commentary Robot system.
Coordinates all system components, manages initialization in dependency
order, verifies API connectivity, and handles graceful shutdown.
Validates: Requirements 17.1, 17.2, 17.3, 17.4, 17.5, 17.6, 17.7
"""
def __init__(self, config_path: str = "config/config.json"):
"""Initialize commentary system with configuration.
Args:
config_path: Path to configuration file
Validates: Requirement 17.1
"""
# Load configuration
self.config = load_config(config_path)
# Setup logging
setup_logging(self.config.log_level, self.config.log_file)
logger.info("=" * 80)
logger.info("F1 Commentary Robot - System Initialization")
logger.info("=" * 80)
# Initialize components (will be set during initialize())
self.race_state_tracker: Optional[RaceStateTracker] = None
self.event_queue: Optional[PriorityEventQueue] = None
self.motion_controller: Optional[MotionController] = None
self.speech_synthesizer: Optional[SpeechSynthesizer] = None
self.commentary_generator: Optional[EnhancedCommentaryGenerator] = None
self.data_ingestion: Optional[DataIngestionModule] = None
self.qa_manager: Optional[QAManager] = None
self.resource_monitor: Optional[ResourceMonitor] = None
# System state
self._initialized = False
self._running = False
self._shutdown_requested = False
self._event_processing_thread: Optional[threading.Thread] = None
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGTERM, self._signal_handler)
signal.signal(signal.SIGINT, self._signal_handler)
logger.info(f"Configuration loaded: replay_mode={self.config.replay_mode}")
def initialize(self) -> bool:
"""Initialize all system modules in dependency order.
Initialization order:
1. Race State Tracker (no dependencies)
2. Event Queue (no dependencies)
3. Motion Controller (no dependencies)
4. Speech Synthesizer (depends on Motion Controller)
5. Commentary Generator (depends on Race State Tracker)
6. Data Ingestion Module (depends on Event Queue)
7. Q&A Manager (depends on Race State Tracker, Event Queue)
8. Resource Monitor (no dependencies)
Returns:
True if initialization successful, False otherwise
Validates: Requirements 17.1, 17.2, 17.3
"""
if self._initialized:
logger.warning("System already initialized")
return True
try:
logger.info("Starting system initialization...")
# 1. Initialize Race State Tracker
logger.info("Initializing Race State Tracker...")
self.race_state_tracker = RaceStateTracker()
logger.info("β Race State Tracker initialized")
# 2. Initialize Event Queue
logger.info("Initializing Event Queue...")
self.event_queue = PriorityEventQueue(max_size=self.config.max_queue_size)
logger.info("β Event Queue initialized")
# 3. Initialize Motion Controller
logger.info("Initializing Motion Controller...")
self.motion_controller = MotionController(self.config)
# Move robot head to neutral position during initialization
if self.config.enable_movements:
logger.info("Moving robot head to neutral position...")
self.motion_controller.return_to_neutral()
time.sleep(1.0) # Wait for movement to complete
logger.info("β Motion Controller initialized")
# 4. Initialize Speech Synthesizer
logger.info("Initializing Speech Synthesizer...")
self.speech_synthesizer = SpeechSynthesizer(
config=self.config,
motion_controller=self.motion_controller
)
# Connect Reachy SDK to speech synthesizer if motion controller has it
if self.motion_controller.reachy.is_connected():
self.speech_synthesizer.set_reachy(self.motion_controller.reachy.reachy)
logger.info("β Speech Synthesizer initialized")
# 5. Initialize Commentary Generator
logger.info("Initializing Commentary Generator...")
# Use EnhancedCommentaryGenerator which maintains backward compatibility
# and supports both enhanced and basic modes (Requirement 19.1, 19.8)
# Note: OpenF1 client will be set after data ingestion module is initialized
self.commentary_generator = EnhancedCommentaryGenerator(
config=self.config,
state_tracker=self.race_state_tracker,
openf1_client=None # Will be set after data ingestion initialization
)
# Log which mode is active at startup (Requirement 19.8)
if self.commentary_generator.is_enhanced_mode():
logger.info("β Commentary Generator initialized in ENHANCED mode")
else:
logger.info("β Commentary Generator initialized in BASIC mode")
# Load static data if in enhanced mode
if self.commentary_generator.is_enhanced_mode():
logger.info("Loading static data for enhanced commentary...")
session_key = self.config.replay_race_id if self.config.replay_mode else None
if self.commentary_generator.load_static_data(session_key):
logger.info("β Static data loaded successfully")
else:
logger.warning("β Failed to load static data - enhanced features may be limited")
# 6. Initialize Data Ingestion Module
logger.info("Initializing Data Ingestion Module...")
self.data_ingestion = DataIngestionModule(
config=self.config,
event_queue=self.event_queue
)
logger.info("β Data Ingestion Module initialized")
# Connect OpenF1 client to enhanced commentary generator (Requirement 19.4)
if self.commentary_generator.is_enhanced_mode():
logger.info("Connecting OpenF1 client to enhanced commentary generator...")
self.commentary_generator.openf1_client = self.data_ingestion.client
# Re-initialize enhanced components now that we have the client
self.commentary_generator._initialize_enhanced_components()
logger.info("β OpenF1 client connected to commentary generator")
# 7. Initialize Q&A Manager
logger.info("Initializing Q&A Manager...")
self.qa_manager = QAManager(
state_tracker=self.race_state_tracker,
event_queue=self.event_queue
)
logger.info("β Q&A Manager initialized")
# 8. Initialize Resource Monitor
logger.info("Initializing Resource Monitor...")
self.resource_monitor = ResourceMonitor()
self.resource_monitor.start()
logger.info("β Resource Monitor initialized")
# Verify API connectivity before entering active mode
if not self.config.replay_mode:
logger.info("Verifying API connectivity...")
# Test OpenF1 API connection
if not self._verify_openf1_connectivity():
logger.error("Failed to verify OpenF1 API connectivity")
return False
# Test ElevenLabs API connection
if not self._verify_elevenlabs_connectivity():
logger.error("Failed to verify ElevenLabs API connectivity")
logger.warning("System will continue in TEXT_ONLY mode")
logger.info("β API connectivity verified")
else:
logger.info("Replay mode enabled - skipping API connectivity checks")
self._initialized = True
logger.info("=" * 80)
logger.info("System initialization complete!")
logger.info("=" * 80)
return True
except Exception as e:
logger.error(f"[CommentarySystem] System initialization failed: {e}", exc_info=True)
return False
def _verify_openf1_connectivity(self) -> bool:
"""Verify connectivity to OpenF1 API.
Returns:
True if connection successful, False otherwise
Validates: Requirement 17.3
"""
try:
# Try to authenticate with OpenF1 API
if self.data_ingestion.client.authenticate():
logger.info("β OpenF1 API connection verified")
return True
else:
logger.error("β OpenF1 API authentication failed")
return False
except Exception as e:
logger.error(f"[CommentarySystem] OpenF1 API verification failed: {e}", exc_info=True)
return False
def _verify_elevenlabs_connectivity(self) -> bool:
"""Verify connectivity to ElevenLabs API.
Returns:
True if connection successful, False otherwise
Validates: Requirement 17.3
"""
try:
# Try a simple TTS request
test_text = "System check"
audio_bytes = self.speech_synthesizer.elevenlabs_client.text_to_speech(test_text)
if audio_bytes:
logger.info("β ElevenLabs API connection verified")
return True
else:
logger.error("β ElevenLabs API test request failed")
return False
except Exception as e:
logger.error(f"[CommentarySystem] ElevenLabs API verification failed: {e}", exc_info=True)
return False
def start(self) -> bool:
"""Start the commentary system.
Starts data ingestion and event processing loop.
Returns:
True if started successfully, False otherwise
"""
if not self._initialized:
logger.error("Cannot start system: not initialized")
return False
if self._running:
logger.warning("System already running")
return True
try:
logger.info("Starting commentary system...")
# Start data ingestion
if not self.data_ingestion.start():
logger.error("Failed to start data ingestion")
return False
# Start event processing loop
self._running = True
self._event_processing_thread = threading.Thread(
target=self._event_processing_loop,
daemon=True,
name="EventProcessingThread"
)
self._event_processing_thread.start()
logger.info("=" * 80)
logger.info("F1 Commentary Robot is now ACTIVE!")
logger.info("=" * 80)
return True
except Exception as e:
logger.error(f"[CommentarySystem] Failed to start system: {e}", exc_info=True)
return False
def _event_processing_loop(self) -> None:
"""Main event processing loop.
Continuously dequeues events, generates commentary, and plays audio.
"""
logger.info("Event processing loop started")
while self._running and not self._shutdown_requested:
try:
# Dequeue next event
event = self.event_queue.dequeue()
if event is None:
# No events available, sleep briefly
time.sleep(0.1)
continue
# Update race state
self.race_state_tracker.update(event)
# Skip position updates for commentary (too frequent)
if event.event_type == EventType.POSITION_UPDATE:
continue
# Generate commentary
logger.info(f"Processing event: {event.event_type.value}")
commentary_text = self.commentary_generator.generate(event)
# Synthesize and play audio
self.speech_synthesizer.synthesize_and_play(commentary_text)
# Execute gesture based on event type
if self.config.enable_movements:
gesture = self.motion_controller.gesture_library.get_gesture_for_event(event.event_type)
self.motion_controller.execute_gesture(gesture)
except Exception as e:
logger.error(f"[CommentarySystem] Error in event processing loop: {e}", exc_info=True)
time.sleep(0.5) # Brief pause before continuing
logger.info("Event processing loop stopped")
def shutdown(self) -> None:
"""Gracefully shutdown the commentary system.
Completes current commentary, closes API connections, and returns
robot to neutral position.
Validates: Requirements 17.4, 17.5, 17.6, 17.7
"""
if self._shutdown_requested:
logger.warning("Shutdown already in progress")
return
self._shutdown_requested = True
logger.info("=" * 80)
logger.info("Initiating graceful shutdown...")
logger.info("=" * 80)
try:
# Complete current commentary before stopping
if self.speech_synthesizer and self.speech_synthesizer.is_speaking():
logger.info("Waiting for current commentary to complete...")
timeout = 10.0 # Maximum 10 seconds to wait
start_time = time.time()
while self.speech_synthesizer.is_speaking() and (time.time() - start_time) < timeout:
time.sleep(0.5)
if self.speech_synthesizer.is_speaking():
logger.warning("Commentary did not complete within timeout, proceeding with shutdown")
# Stop event processing loop
logger.info("Stopping event processing...")
self._running = False
if self._event_processing_thread and self._event_processing_thread.is_alive():
self._event_processing_thread.join(timeout=5.0)
# Stop data ingestion
if self.data_ingestion:
logger.info("Stopping data ingestion...")
self.data_ingestion.stop()
# Stop speech synthesizer
if self.speech_synthesizer:
logger.info("Stopping speech synthesizer...")
self.speech_synthesizer.stop()
# Return robot head to neutral position
if self.motion_controller and self.config.enable_movements:
logger.info("Returning robot head to neutral position...")
self.motion_controller.return_to_neutral()
time.sleep(1.0) # Wait for movement to complete
# Stop motion controller
if self.motion_controller:
logger.info("Stopping motion controller...")
self.motion_controller.stop()
# Stop resource monitor
if self.resource_monitor:
logger.info("Stopping resource monitor...")
self.resource_monitor.stop()
# Close all API connections gracefully
logger.info("Closing API connections...")
if self.data_ingestion and self.data_ingestion.client:
self.data_ingestion.client.close()
logger.info("=" * 80)
logger.info("Shutdown complete. Goodbye!")
logger.info("=" * 80)
except Exception as e:
logger.error(f"[CommentarySystem] Error during shutdown: {e}", exc_info=True)
def _signal_handler(self, signum, frame):
"""Handle SIGTERM and SIGINT signals for graceful shutdown.
Args:
signum: Signal number
frame: Current stack frame
Validates: Requirement 17.7
"""
signal_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
logger.info(f"Received {signal_name} signal, initiating graceful shutdown...")
self.shutdown()
sys.exit(0)
def process_question(self, question: str) -> None:
"""Process a user question (Q&A functionality).
Args:
question: User's question text
"""
if not self._initialized or not self._running:
logger.warning("Cannot process question: system not running")
return
try:
logger.info(f"Processing question: {question}")
# Process question and get response
response = self.qa_manager.process_question(question)
# Synthesize and play response
self.speech_synthesizer.synthesize_and_play(response)
# Wait for response to complete
while self.speech_synthesizer.is_speaking():
time.sleep(0.5)
# Resume event queue
self.qa_manager.resume_event_queue()
logger.info("Question processed successfully")
except Exception as e:
logger.error(f"[CommentarySystem] Error processing question: {e}", exc_info=True)
# Ensure event queue is resumed even on error
if self.qa_manager:
self.qa_manager.resume_event_queue()
def is_running(self) -> bool:
"""Check if system is running.
Returns:
True if system is running, False otherwise
"""
return self._running
def is_initialized(self) -> bool:
"""Check if system is initialized.
Returns:
True if system is initialized, False otherwise
"""
return self._initialized
|