""" Performance-Optimized Hugging Face Spaces Entry Point FIXED VERSION: Preserves two-value return format (answer, footnotes) This version fixes the ValueError by ensuring the query wrapper returns the same format as the original RAG engine: (answer, footnotes) """ import os import sys from pathlib import Path from concurrent.futures import ThreadPoolExecutor # Add the current directory to Python path for Spaces environment sys.path.insert(0, str(Path(__file__).parent)) from openai import OpenAI from src.config import Config from src.vector_store import VectorStoreManager from src.rag_query import RAGQueryEngine from src.question_generator import QuestionGenerator from src.knowledge_graph import KnowledgeGraphGenerator from src.gradio_interface import GradioInterfaceBuilder # Import personalized learning if available try: from modules.personalized_learning import UserProfilingSystem, LearningPathGenerator, AdaptiveLearningEngine PERSONALIZED_LEARNING_AVAILABLE = True except ImportError: PERSONALIZED_LEARNING_AVAILABLE = False print("⚠️ Personalized learning modules not available") # Import proactive learning if available try: from modules.proactive_learning import ProactiveLearningEngine PROACTIVE_LEARNING_AVAILABLE = True except ImportError: PROACTIVE_LEARNING_AVAILABLE = False print("⚠️ Proactive learning modules not available") # Import scenario contextualization if available try: from modules.scenario_contextualization.database.scenario_database import ScenarioDatabase from modules.scenario_contextualization.integration.feature_extractor import ADASFeatureExtractor from modules.scenario_contextualization.retrieval.scenario_retriever import ScenarioRetriever from modules.scenario_contextualization.formatting.constructive_formatter import ConstructiveFormatter from modules.scenario_contextualization.integration.enhanced_rag_engine import EnhancedRAGEngine SCENARIO_CONTEXTUALIZATION_AVAILABLE = True except ImportError as e: SCENARIO_CONTEXTUALIZATION_AVAILABLE = False print(f"⚠️ Scenario contextualization modules not available: {e}") # Performance configuration ENABLE_CACHING = True # Enable query result caching MAX_WORKERS = 4 # Thread pool worker count QUERY_TIMEOUT = 30 # Query timeout in seconds # Global thread pool for asynchronous query processing executor = ThreadPoolExecutor(max_workers=MAX_WORKERS) # In-memory cache for query results # Format: {question: (answer, footnotes)} query_cache = {} def initialize_system(config: Config) -> dict: """ Initialize the RAG system components with performance optimization Args: config: Configuration object containing API keys and settings Returns: Dictionary containing all initialized system components Raises: ValueError: If OPENAI_API_KEY is not configured RuntimeError: If system initialization fails """ print("🔧 Initializing core components...") # Validate OpenAI API key if not config.openai_api_key: raise ValueError( "OPENAI_API_KEY not found! Please set it in Hugging Face Spaces Secrets. " "Go to Settings > Secrets and add OPENAI_API_KEY" ) client = OpenAI(api_key=config.openai_api_key) # Initialize vector store manager vector_store_manager = VectorStoreManager(client) # Get or create vector store vector_store_id = config.get_vector_store_id() if not vector_store_id: print("📦 Creating new vector store...") pdf_files = config.get_pdf_files() if not pdf_files: raise ValueError(f"No PDF files found in {config.car_manual_dir}") vector_store_details = vector_store_manager.create_vector_store(config.vector_store_name) if not vector_store_details: raise RuntimeError("Failed to create vector store") vector_store_id = vector_store_details["id"] config.save_vector_store_id(vector_store_id, config.vector_store_name) # Upload PDF files to vector store upload_stats = vector_store_manager.upload_pdf_files(pdf_files, vector_store_id) if upload_stats["successful_uploads"] == 0: raise RuntimeError("Failed to upload any files") else: print(f"✅ Using existing vector store: {vector_store_id}") # Initialize core RAG components print("🔧 Initializing RAG engine...") rag_engine = RAGQueryEngine(client, vector_store_id, config.model) print("🔧 Initializing question generator...") question_generator = QuestionGenerator(client, rag_engine) print("🔧 Initializing knowledge graph...") knowledge_graph = KnowledgeGraphGenerator(client, vector_store_id, str(config.output_dir)) # Initialize optional personalized learning modules user_profiling = None learning_path_generator = None adaptive_engine = None if PERSONALIZED_LEARNING_AVAILABLE: try: user_profiling = UserProfilingSystem() learning_path_generator = LearningPathGenerator(user_profiling, config.available_topics) adaptive_engine = AdaptiveLearningEngine(user_profiling, learning_path_generator) print("✅ Personalized Learning System initialized!") except Exception as e: print(f"⚠️ Error initializing Personalized Learning System: {e}") # Initialize optional proactive learning proactive_engine = None if PROACTIVE_LEARNING_AVAILABLE and user_profiling: try: proactive_engine = ProactiveLearningEngine( client, rag_engine, user_profiling, adaptive_engine, config.available_topics ) print("✅ Proactive Learning Assistance initialized!") except Exception as e: print(f"⚠️ Error initializing Proactive Learning Assistance: {e}") # Initialize optional scenario contextualization enhanced_rag_engine = None if SCENARIO_CONTEXTUALIZATION_AVAILABLE: try: scenario_database = ScenarioDatabase() feature_extractor = ADASFeatureExtractor(use_llm=False, client=client) scenario_retriever = ScenarioRetriever( scenario_database=scenario_database, scenario_vector_store_id=None, client=client ) formatter = ConstructiveFormatter() enhanced_rag_engine = EnhancedRAGEngine( base_rag_engine=rag_engine, scenario_retriever=scenario_retriever, feature_extractor=feature_extractor, formatter=formatter ) print("✅ Scenario Contextualization initialized!") except Exception as e: print(f"⚠️ Error initializing Scenario Contextualization: {e}") print("✅ Core system initialized!") return { "client": client, "vector_store_manager": vector_store_manager, "rag_engine": rag_engine, "question_generator": question_generator, "knowledge_graph": knowledge_graph, "user_profiling": user_profiling, "learning_path_generator": learning_path_generator, "adaptive_engine": adaptive_engine, "proactive_engine": proactive_engine, "enhanced_rag_engine": enhanced_rag_engine, "config": config } def create_optimized_query_wrapper(rag_engine): """ Create an optimized query wrapper with caching, timeout, and async processing CRITICAL: This wrapper preserves the original return format: (answer, footnotes) Args: rag_engine: The RAG query engine to wrap Returns: Optimized query function that returns (answer, footnotes) """ # Store reference to original query method original_query = rag_engine.query def query_with_optimization(question: str, use_cache: bool = True): """ Optimized query function with caching and timeout protection Args: question: User's question use_cache: Whether to use cached results (default: True) Returns: Tuple of (answer: str, footnotes: list) - answer: The response text - footnotes: List of source references """ # Validate input if not question or not question.strip(): return "Please enter a question.", [] # Normalize question for cache key cache_key = question.strip().lower() # Check cache for previous results if use_cache and ENABLE_CACHING and cache_key in query_cache: print(f"📋 Using cached result for: {question[:50]}...") return query_cache[cache_key] try: print(f"🔍 Processing query: {question[:50]}...") # Execute query in thread pool (non-blocking) future = executor.submit(original_query, question) # Wait for result with timeout protection result = future.result(timeout=QUERY_TIMEOUT) # Handle different return formats # Original RAG engine returns (answer, footnotes) if isinstance(result, tuple) and len(result) == 2: answer, footnotes = result else: # Fallback: if only single value returned answer = str(result) footnotes = [] # Cache the complete result (both answer and footnotes) if ENABLE_CACHING: query_cache[cache_key] = (answer, footnotes) # Limit cache size to prevent memory issues if len(query_cache) > 100: # Remove oldest entry (FIFO) query_cache.pop(next(iter(query_cache))) print(f"✅ Query completed successfully") return answer, footnotes except TimeoutError: error_msg = "⏱️ Query timeout. Please try a simpler question or try again later." print(error_msg) return error_msg, [] except Exception as e: error_msg = f"❌ Error processing query: {str(e)}" print(error_msg) return error_msg, [] return query_with_optimization def create_app(): """ Create and return the optimized Gradio app for Hugging Face Spaces Returns: Gradio Blocks interface """ print("=" * 60) print("🚗 CSRC Car Manual RAG System - Performance Optimized") print("=" * 60) # Load configuration config = Config() # Initialize system components try: components = initialize_system(config) except Exception as e: print(f"❌ Error initializing system: {e}") import traceback traceback.print_exc() import gradio as gr error_msg = f""" # ❌ Initialization Error **Error:** {str(e)} Please check the logs for more details. """ return gr.Interface( fn=lambda: error_msg, inputs=None, outputs=gr.Markdown(), title="CSRC Car Manual RAG System", ) # Create optimized query wrapper optimized_query = create_optimized_query_wrapper(components["rag_engine"]) # Replace RAG engine's query method with optimized version # This maintains the (answer, footnotes) return format components["rag_engine"].query = optimized_query # Build Gradio interface print("\n🌐 Building Gradio interface...") try: interface_builder = GradioInterfaceBuilder( rag_engine=components["rag_engine"], question_generator=components["question_generator"], knowledge_graph=components["knowledge_graph"], config=components["config"], user_profiling=components["user_profiling"], adaptive_engine=components["adaptive_engine"], proactive_engine=components["proactive_engine"] ) print("📦 Creating interface components...") demo = interface_builder.create_interface() # Enable queue system for better concurrent performance print("⚡ Enabling queue for better performance...") demo.queue( max_size=20, # Maximum queue size default_concurrency_limit=5 # Max concurrent requests ) print("✅ Gradio interface created successfully!") return demo except Exception as e: print(f"❌ Error building Gradio interface: {e}") import traceback traceback.print_exc() import gradio as gr error_msg = f""" # ❌ Interface Building Error **Error:** {str(e)} """ return gr.Interface( fn=lambda: error_msg, inputs=None, outputs=gr.Markdown(), title="CSRC Car Manual RAG System", ) # Singleton pattern to prevent multiple initializations _app_instance = None def get_app(): """ Get or create the app instance (singleton pattern) Returns: Gradio app instance """ global _app_instance if _app_instance is None: print("🔄 Creating new app instance...") _app_instance = create_app() print("✅ App instance created!") else: print("♻️ Reusing existing app instance") return _app_instance # For Hugging Face Spaces auto-detection if __name__ == "__main__": demo = get_app() demo.launch( server_name="0.0.0.0", server_port=7860, show_error=True, # Show detailed errors for debugging favicon_path=None, # Skip favicon for faster startup ) else: # Module-level variable for Spaces auto-detection demo = get_app()