| """ | |
| Performance-Optimized Hugging Face Spaces Entry Point | |
| FIXED VERSION: Preserves two-value return format (answer, footnotes) | |
| This version fixes the ValueError by ensuring the query wrapper | |
| returns the same format as the original RAG engine: (answer, footnotes) | |
| """ | |
import os
import sys
from pathlib import Path
# TimeoutError is imported explicitly: before Python 3.11,
# concurrent.futures.TimeoutError is NOT the builtin TimeoutError,
# so catching the bare builtin would miss future.result() timeouts.
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError

# Add the current directory to the Python path for the Spaces environment
sys.path.insert(0, str(Path(__file__).parent))
from openai import OpenAI

from src.config import Config
from src.vector_store import VectorStoreManager
from src.rag_query import RAGQueryEngine
from src.question_generator import QuestionGenerator
from src.knowledge_graph import KnowledgeGraphGenerator
from src.gradio_interface import GradioInterfaceBuilder
# Import personalized learning if available
try:
    from modules.personalized_learning import (
        UserProfilingSystem,
        LearningPathGenerator,
        AdaptiveLearningEngine,
    )
    PERSONALIZED_LEARNING_AVAILABLE = True
except ImportError:
    PERSONALIZED_LEARNING_AVAILABLE = False
    print("⚠️ Personalized learning modules not available")

# Import proactive learning if available
try:
    from modules.proactive_learning import ProactiveLearningEngine
    PROACTIVE_LEARNING_AVAILABLE = True
except ImportError:
    PROACTIVE_LEARNING_AVAILABLE = False
    print("⚠️ Proactive learning modules not available")

# Import scenario contextualization if available
try:
    from modules.scenario_contextualization.database.scenario_database import ScenarioDatabase
    from modules.scenario_contextualization.integration.feature_extractor import ADASFeatureExtractor
    from modules.scenario_contextualization.retrieval.scenario_retriever import ScenarioRetriever
    from modules.scenario_contextualization.formatting.constructive_formatter import ConstructiveFormatter
    from modules.scenario_contextualization.integration.enhanced_rag_engine import EnhancedRAGEngine
    SCENARIO_CONTEXTUALIZATION_AVAILABLE = True
except ImportError as e:
    SCENARIO_CONTEXTUALIZATION_AVAILABLE = False
    print(f"⚠️ Scenario contextualization modules not available: {e}")

# Performance configuration
ENABLE_CACHING = True  # Enable query result caching
MAX_WORKERS = 4        # Thread pool worker count
QUERY_TIMEOUT = 30     # Query timeout in seconds

# Global thread pool for asynchronous query processing
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

# In-memory cache for query results
# Format: {question: (answer, footnotes)}
query_cache = {}
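
# The wrapper below relies on the standard future-with-timeout pattern.
# A minimal sketch (illustrative only; `slow_call` is a hypothetical function):
#
#     future = executor.submit(slow_call, arg)
#     try:
#         result = future.result(timeout=QUERY_TIMEOUT)
#     except FuturesTimeoutError:
#         ...  # only the wait is abandoned; the worker thread keeps running
#
# Note that ThreadPoolExecutor cannot interrupt a running task, so a timed-out
# query still occupies one of the MAX_WORKERS threads until it finishes.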


def initialize_system(config: Config) -> dict:
    """
    Initialize the RAG system components with performance optimization.

    Args:
        config: Configuration object containing API keys and settings

    Returns:
        Dictionary containing all initialized system components

    Raises:
        ValueError: If OPENAI_API_KEY is not configured
        RuntimeError: If system initialization fails
    """
    print("🔧 Initializing core components...")

    # Validate OpenAI API key
    if not config.openai_api_key:
        raise ValueError(
            "OPENAI_API_KEY not found! Please set it in Hugging Face Spaces Secrets. "
            "Go to Settings > Secrets and add OPENAI_API_KEY"
        )
    client = OpenAI(api_key=config.openai_api_key)

    # Initialize vector store manager
    vector_store_manager = VectorStoreManager(client)

    # Get or create vector store
    vector_store_id = config.get_vector_store_id()
    if not vector_store_id:
        print("📦 Creating new vector store...")
        pdf_files = config.get_pdf_files()
        if not pdf_files:
            raise ValueError(f"No PDF files found in {config.car_manual_dir}")

        vector_store_details = vector_store_manager.create_vector_store(config.vector_store_name)
        if not vector_store_details:
            raise RuntimeError("Failed to create vector store")

        vector_store_id = vector_store_details["id"]
        config.save_vector_store_id(vector_store_id, config.vector_store_name)

        # Upload PDF files to vector store
        upload_stats = vector_store_manager.upload_pdf_files(pdf_files, vector_store_id)
        if upload_stats["successful_uploads"] == 0:
            raise RuntimeError("Failed to upload any files")
    else:
        print(f"✅ Using existing vector store: {vector_store_id}")
    # Initialize core RAG components
    print("🔧 Initializing RAG engine...")
    rag_engine = RAGQueryEngine(client, vector_store_id, config.model)

    print("🔧 Initializing question generator...")
    question_generator = QuestionGenerator(client, rag_engine)

    print("🔧 Initializing knowledge graph...")
    knowledge_graph = KnowledgeGraphGenerator(client, vector_store_id, str(config.output_dir))

    # Initialize optional personalized learning modules
    user_profiling = None
    learning_path_generator = None
    adaptive_engine = None
    if PERSONALIZED_LEARNING_AVAILABLE:
        try:
            user_profiling = UserProfilingSystem()
            learning_path_generator = LearningPathGenerator(user_profiling, config.available_topics)
            adaptive_engine = AdaptiveLearningEngine(user_profiling, learning_path_generator)
            print("✅ Personalized Learning System initialized!")
        except Exception as e:
            print(f"⚠️ Error initializing Personalized Learning System: {e}")

    # Initialize optional proactive learning
    proactive_engine = None
    if PROACTIVE_LEARNING_AVAILABLE and user_profiling:
        try:
            proactive_engine = ProactiveLearningEngine(
                client, rag_engine, user_profiling, adaptive_engine, config.available_topics
            )
            print("✅ Proactive Learning Assistance initialized!")
        except Exception as e:
            print(f"⚠️ Error initializing Proactive Learning Assistance: {e}")

    # Initialize optional scenario contextualization
    enhanced_rag_engine = None
    if SCENARIO_CONTEXTUALIZATION_AVAILABLE:
        try:
            scenario_database = ScenarioDatabase()
            feature_extractor = ADASFeatureExtractor(use_llm=False, client=client)
            scenario_retriever = ScenarioRetriever(
                scenario_database=scenario_database,
                scenario_vector_store_id=None,
                client=client
            )
            formatter = ConstructiveFormatter()
            enhanced_rag_engine = EnhancedRAGEngine(
                base_rag_engine=rag_engine,
                scenario_retriever=scenario_retriever,
                feature_extractor=feature_extractor,
                formatter=formatter
            )
            print("✅ Scenario Contextualization initialized!")
        except Exception as e:
            print(f"⚠️ Error initializing Scenario Contextualization: {e}")

    print("✅ Core system initialized!")

    return {
        "client": client,
        "vector_store_manager": vector_store_manager,
        "rag_engine": rag_engine,
        "question_generator": question_generator,
        "knowledge_graph": knowledge_graph,
        "user_profiling": user_profiling,
        "learning_path_generator": learning_path_generator,
        "adaptive_engine": adaptive_engine,
        "proactive_engine": proactive_engine,
        "enhanced_rag_engine": enhanced_rag_engine,
        "config": config,
    }
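
# Illustrative usage (assumes a valid OPENAI_API_KEY; this mirrors how
# create_app() consumes the returned dictionary; the question is an example):
#
#     components = initialize_system(Config())
#     answer, footnotes = components["rag_engine"].query("What does ACC do?")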


def create_optimized_query_wrapper(rag_engine):
    """
    Create an optimized query wrapper with caching, timeout, and async processing.

    CRITICAL: This wrapper preserves the original return format: (answer, footnotes)

    Args:
        rag_engine: The RAG query engine to wrap

    Returns:
        Optimized query function that returns (answer, footnotes)
    """
    # Store a reference to the original query method
    original_query = rag_engine.query

    def query_with_optimization(question: str, use_cache: bool = True):
        """
        Optimized query function with caching and timeout protection.

        Args:
            question: User's question
            use_cache: Whether to use cached results (default: True)

        Returns:
            Tuple of (answer: str, footnotes: list)
            - answer: The response text
            - footnotes: List of source references
        """
        # Validate input
        if not question or not question.strip():
            return "Please enter a question.", []

        # Normalize the question for use as a cache key
        cache_key = question.strip().lower()

        # Check cache for previous results
        if use_cache and ENABLE_CACHING and cache_key in query_cache:
            print(f"📋 Using cached result for: {question[:50]}...")
            return query_cache[cache_key]
        try:
            print(f"🔍 Processing query: {question[:50]}...")

            # Execute the query in the thread pool (non-blocking)
            future = executor.submit(original_query, question)

            # Wait for the result with timeout protection
            result = future.result(timeout=QUERY_TIMEOUT)

            # Handle different return formats.
            # The original RAG engine returns (answer, footnotes).
            if isinstance(result, tuple) and len(result) == 2:
                answer, footnotes = result
            else:
                # Fallback: only a single value was returned
                answer = str(result)
                footnotes = []

            # Cache the complete result (both answer and footnotes)
            if ENABLE_CACHING:
                query_cache[cache_key] = (answer, footnotes)
                # Limit cache size to prevent memory issues. Dicts preserve
                # insertion order (Python 3.7+), so the first key is the
                # oldest entry (FIFO eviction).
                if len(query_cache) > 100:
                    query_cache.pop(next(iter(query_cache)))

            print("✅ Query completed successfully")
            return answer, footnotes
        except FuturesTimeoutError:
            error_msg = "⏱️ Query timeout. Please try a simpler question or try again later."
            print(error_msg)
            return error_msg, []
        except Exception as e:
            error_msg = f"❌ Error processing query: {str(e)}"
            print(error_msg)
            return error_msg, []

    return query_with_optimization
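
# Illustrative usage: the wrapper is a drop-in replacement for rag_engine.query,
# which is exactly how create_app() wires it up below (example question only):
#
#     rag_engine.query = create_optimized_query_wrapper(rag_engine)
#     answer, footnotes = rag_engine.query("How do I enable lane keeping assist?")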


def create_app():
    """
    Create and return the optimized Gradio app for Hugging Face Spaces.

    Returns:
        Gradio Blocks interface
    """
    print("=" * 60)
    print("🚀 CSRC Car Manual RAG System - Performance Optimized")
    print("=" * 60)

    # Load configuration
    config = Config()

    # Initialize system components
    try:
        components = initialize_system(config)
    except Exception as e:
        print(f"❌ Error initializing system: {e}")
        import traceback
        traceback.print_exc()

        import gradio as gr
        error_msg = f"""
# ❌ Initialization Error

**Error:** {str(e)}

Please check the logs for more details.
"""
        return gr.Interface(
            fn=lambda: error_msg,
            inputs=None,
            outputs=gr.Markdown(),
            title="CSRC Car Manual RAG System",
        )

    # Create the optimized query wrapper
    optimized_query = create_optimized_query_wrapper(components["rag_engine"])

    # Replace the RAG engine's query method with the optimized version.
    # This maintains the (answer, footnotes) return format.
    components["rag_engine"].query = optimized_query
    # Build the Gradio interface
    print("\n🌐 Building Gradio interface...")
    try:
        interface_builder = GradioInterfaceBuilder(
            rag_engine=components["rag_engine"],
            question_generator=components["question_generator"],
            knowledge_graph=components["knowledge_graph"],
            config=components["config"],
            user_profiling=components["user_profiling"],
            adaptive_engine=components["adaptive_engine"],
            proactive_engine=components["proactive_engine"]
        )

        print("📦 Creating interface components...")
        demo = interface_builder.create_interface()

        # Enable the queue system for better concurrent performance
        print("⚡ Enabling queue for better performance...")
        demo.queue(
            max_size=20,                 # Maximum queue size
            default_concurrency_limit=5  # Max concurrent requests
        )
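
        # Note on queue semantics (assumption based on Gradio's documented
        # behavior): up to default_concurrency_limit requests run at once,
        # further requests wait in the queue, and new ones are rejected once
        # max_size waiting events is reached.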
| print("β Gradio interface created successfully!") | |
| return demo | |
    except Exception as e:
        print(f"❌ Error building Gradio interface: {e}")
        import traceback
        traceback.print_exc()

        import gradio as gr
        error_msg = f"""
# ❌ Interface Building Error

**Error:** {str(e)}
"""
        return gr.Interface(
            fn=lambda: error_msg,
            inputs=None,
            outputs=gr.Markdown(),
            title="CSRC Car Manual RAG System",
        )


# Singleton pattern to prevent multiple initializations
_app_instance = None


def get_app():
    """
    Get or create the app instance (singleton pattern).

    Returns:
        Gradio app instance
    """
    global _app_instance
    if _app_instance is None:
        print("🆕 Creating new app instance...")
        _app_instance = create_app()
        print("✅ App instance created!")
    else:
        print("♻️ Reusing existing app instance")
    return _app_instance
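
# Design note: the singleton above is not lock-protected; this assumes Spaces
# imports the module once from a single thread. A threading.Lock around the
# None check would be needed if get_app() could ever be called concurrently.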

# For Hugging Face Spaces auto-detection
if __name__ == "__main__":
    demo = get_app()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,    # Show detailed errors for debugging
        favicon_path=None,  # Skip favicon for faster startup
    )
else:
    # Module-level variable for Spaces auto-detection
    demo = get_app()