"""
Performance-Optimized Hugging Face Spaces Entry Point
FIXED VERSION: Preserves two-value return format (answer, footnotes)
This version fixes the ValueError by ensuring the query wrapper
returns the same format as the original RAG engine: (answer, footnotes)
"""
import os
import sys
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
# Add the current directory to Python path for Spaces environment
sys.path.insert(0, str(Path(__file__).parent))
from openai import OpenAI
from src.config import Config
from src.vector_store import VectorStoreManager
from src.rag_query import RAGQueryEngine
from src.question_generator import QuestionGenerator
from src.knowledge_graph import KnowledgeGraphGenerator
from src.gradio_interface import GradioInterfaceBuilder
# Import personalized learning if available
try:
    from modules.personalized_learning import UserProfilingSystem, LearningPathGenerator, AdaptiveLearningEngine
    PERSONALIZED_LEARNING_AVAILABLE = True
except ImportError:
    PERSONALIZED_LEARNING_AVAILABLE = False
    print("⚠️ Personalized learning modules not available")

# Import proactive learning if available
try:
    from modules.proactive_learning import ProactiveLearningEngine
    PROACTIVE_LEARNING_AVAILABLE = True
except ImportError:
    PROACTIVE_LEARNING_AVAILABLE = False
    print("⚠️ Proactive learning modules not available")

# Import scenario contextualization if available
try:
    from modules.scenario_contextualization.database.scenario_database import ScenarioDatabase
    from modules.scenario_contextualization.integration.feature_extractor import ADASFeatureExtractor
    from modules.scenario_contextualization.retrieval.scenario_retriever import ScenarioRetriever
    from modules.scenario_contextualization.formatting.constructive_formatter import ConstructiveFormatter
    from modules.scenario_contextualization.integration.enhanced_rag_engine import EnhancedRAGEngine
    SCENARIO_CONTEXTUALIZATION_AVAILABLE = True
except ImportError as e:
    SCENARIO_CONTEXTUALIZATION_AVAILABLE = False
    print(f"⚠️ Scenario contextualization modules not available: {e}")

# Performance configuration
ENABLE_CACHING = True # Enable query result caching
MAX_WORKERS = 4 # Thread pool worker count
QUERY_TIMEOUT = 30 # Query timeout in seconds
# Global thread pool for asynchronous query processing
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
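# Note: executor.submit(...) plus future.result(timeout=...) is what enforces
# QUERY_TIMEOUT around the blocking OpenAI call (see query_with_optimization below).
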
# In-memory cache for query results
# Format: {question: (answer, footnotes)}
query_cache = {}
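# Hypothetical entry, for illustration only:
#   query_cache["how do i adjust the mirrors?"] = ("Use the switch on the driver's door...", ["Owner's Manual, p. 42"])
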
def initialize_system(config: Config) -> dict:
    """
    Initialize the RAG system components with performance optimization

    Args:
        config: Configuration object containing API keys and settings

    Returns:
        Dictionary containing all initialized system components

    Raises:
        ValueError: If OPENAI_API_KEY is not configured
        RuntimeError: If system initialization fails
    """
    print("🔧 Initializing core components...")

    # Validate OpenAI API key
    if not config.openai_api_key:
        raise ValueError(
            "OPENAI_API_KEY not found! Please set it in Hugging Face Spaces Secrets. "
            "Go to Settings > Secrets and add OPENAI_API_KEY"
        )

    client = OpenAI(api_key=config.openai_api_key)

    # Initialize vector store manager
    vector_store_manager = VectorStoreManager(client)

    # Get or create vector store
    vector_store_id = config.get_vector_store_id()
    if not vector_store_id:
        print("📦 Creating new vector store...")
        pdf_files = config.get_pdf_files()
        if not pdf_files:
            raise ValueError(f"No PDF files found in {config.car_manual_dir}")
        vector_store_details = vector_store_manager.create_vector_store(config.vector_store_name)
        if not vector_store_details:
            raise RuntimeError("Failed to create vector store")
        vector_store_id = vector_store_details["id"]
        config.save_vector_store_id(vector_store_id, config.vector_store_name)
        # Upload PDF files to vector store
        upload_stats = vector_store_manager.upload_pdf_files(pdf_files, vector_store_id)
        if upload_stats["successful_uploads"] == 0:
            raise RuntimeError("Failed to upload any files")
    else:
        print(f"✅ Using existing vector store: {vector_store_id}")

    # Initialize core RAG components
    print("🔧 Initializing RAG engine...")
    rag_engine = RAGQueryEngine(client, vector_store_id, config.model)
    print("🔧 Initializing question generator...")
    question_generator = QuestionGenerator(client, rag_engine)
    print("🔧 Initializing knowledge graph...")
    knowledge_graph = KnowledgeGraphGenerator(client, vector_store_id, str(config.output_dir))

    # Initialize optional personalized learning modules
    user_profiling = None
    learning_path_generator = None
    adaptive_engine = None
    if PERSONALIZED_LEARNING_AVAILABLE:
        try:
            user_profiling = UserProfilingSystem()
            learning_path_generator = LearningPathGenerator(user_profiling, config.available_topics)
            adaptive_engine = AdaptiveLearningEngine(user_profiling, learning_path_generator)
            print("✅ Personalized Learning System initialized!")
        except Exception as e:
            print(f"⚠️ Error initializing Personalized Learning System: {e}")

    # Initialize optional proactive learning
    proactive_engine = None
    if PROACTIVE_LEARNING_AVAILABLE and user_profiling:
        try:
            proactive_engine = ProactiveLearningEngine(
                client, rag_engine, user_profiling, adaptive_engine, config.available_topics
            )
            print("✅ Proactive Learning Assistance initialized!")
        except Exception as e:
            print(f"⚠️ Error initializing Proactive Learning Assistance: {e}")

    # Initialize optional scenario contextualization
    enhanced_rag_engine = None
    if SCENARIO_CONTEXTUALIZATION_AVAILABLE:
        try:
            scenario_database = ScenarioDatabase()
            feature_extractor = ADASFeatureExtractor(use_llm=False, client=client)
            scenario_retriever = ScenarioRetriever(
                scenario_database=scenario_database,
                scenario_vector_store_id=None,
                client=client
            )
            formatter = ConstructiveFormatter()
            enhanced_rag_engine = EnhancedRAGEngine(
                base_rag_engine=rag_engine,
                scenario_retriever=scenario_retriever,
                feature_extractor=feature_extractor,
                formatter=formatter
            )
            print("✅ Scenario Contextualization initialized!")
        except Exception as e:
            print(f"⚠️ Error initializing Scenario Contextualization: {e}")

    print("✅ Core system initialized!")
    return {
        "client": client,
        "vector_store_manager": vector_store_manager,
        "rag_engine": rag_engine,
        "question_generator": question_generator,
        "knowledge_graph": knowledge_graph,
        "user_profiling": user_profiling,
        "learning_path_generator": learning_path_generator,
        "adaptive_engine": adaptive_engine,
        "proactive_engine": proactive_engine,
        "enhanced_rag_engine": enhanced_rag_engine,
        "config": config
    }
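
# Minimal usage sketch (illustrative; assumes OPENAI_API_KEY and the manual PDFs
# are configured as described above):
#   components = initialize_system(Config())
#   answer, footnotes = components["rag_engine"].query("How does adaptive cruise control work?")
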
def create_optimized_query_wrapper(rag_engine):
    """
    Create an optimized query wrapper with caching, timeout, and async processing

    CRITICAL: This wrapper preserves the original return format: (answer, footnotes)

    Args:
        rag_engine: The RAG query engine to wrap

    Returns:
        Optimized query function that returns (answer, footnotes)
    """
    # Store reference to original query method
    original_query = rag_engine.query

    def query_with_optimization(question: str, use_cache: bool = True):
        """
        Optimized query function with caching and timeout protection

        Args:
            question: User's question
            use_cache: Whether to use cached results (default: True)

        Returns:
            Tuple of (answer: str, footnotes: list)
            - answer: The response text
            - footnotes: List of source references
        """
        # Validate input
        if not question or not question.strip():
            return "Please enter a question.", []

        # Normalize question for cache key
        cache_key = question.strip().lower()

        # Check cache for previous results
        if use_cache and ENABLE_CACHING and cache_key in query_cache:
            print(f"📋 Using cached result for: {question[:50]}...")
            return query_cache[cache_key]

        try:
            print(f"🔍 Processing query: {question[:50]}...")
            # Execute query in thread pool (non-blocking)
            future = executor.submit(original_query, question)
            # Wait for result with timeout protection
            result = future.result(timeout=QUERY_TIMEOUT)

            # Handle different return formats
            # Original RAG engine returns (answer, footnotes)
            if isinstance(result, tuple) and len(result) == 2:
                answer, footnotes = result
            else:
                # Fallback: if only a single value is returned
                answer = str(result)
                footnotes = []

            # Cache the complete result (both answer and footnotes)
            if ENABLE_CACHING:
                query_cache[cache_key] = (answer, footnotes)
                # Limit cache size to prevent memory issues
                if len(query_cache) > 100:
                    # Remove oldest entry (FIFO): dicts preserve insertion order
                    # in Python 3.7+, so next(iter(...)) yields the oldest key
                    query_cache.pop(next(iter(query_cache)))

            print("✅ Query completed successfully")
            return answer, footnotes
        except FuturesTimeoutError:
            # future.result() raises concurrent.futures.TimeoutError, which is
            # not the builtin TimeoutError on Python < 3.11
            error_msg = "⏱️ Query timeout. Please try a simpler question or try again later."
            print(error_msg)
            return error_msg, []
        except Exception as e:
            error_msg = f"❌ Error processing query: {str(e)}"
            print(error_msg)
            return error_msg, []

    return query_with_optimization
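
# Example wiring (hypothetical question string; mirrors what create_app() does below):
#   fast_query = create_optimized_query_wrapper(components["rag_engine"])
#   answer, footnotes = fast_query("How do I enable lane keeping assist?")
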
def create_app():
    """
    Create and return the optimized Gradio app for Hugging Face Spaces

    Returns:
        Gradio Blocks interface
    """
    print("=" * 60)
    print("🚗 CSRC Car Manual RAG System - Performance Optimized")
    print("=" * 60)

    # Load configuration
    config = Config()

    # Initialize system components
    try:
        components = initialize_system(config)
    except Exception as e:
        print(f"❌ Error initializing system: {e}")
        import traceback
        traceback.print_exc()

        import gradio as gr
        error_msg = f"""
# ❌ Initialization Error

**Error:** {str(e)}

Please check the logs for more details.
"""
        return gr.Interface(
            fn=lambda: error_msg,
            inputs=None,
            outputs=gr.Markdown(),
            title="CSRC Car Manual RAG System",
        )

    # Create optimized query wrapper
    optimized_query = create_optimized_query_wrapper(components["rag_engine"])

    # Replace RAG engine's query method with optimized version
    # This maintains the (answer, footnotes) return format
    components["rag_engine"].query = optimized_query

    # Build Gradio interface
    print("\n🌐 Building Gradio interface...")
    try:
        interface_builder = GradioInterfaceBuilder(
            rag_engine=components["rag_engine"],
            question_generator=components["question_generator"],
            knowledge_graph=components["knowledge_graph"],
            config=components["config"],
            user_profiling=components["user_profiling"],
            adaptive_engine=components["adaptive_engine"],
            proactive_engine=components["proactive_engine"]
        )
        print("📦 Creating interface components...")
        demo = interface_builder.create_interface()

        # Enable queue system for better concurrent performance
        print("⚡ Enabling queue for better performance...")
        demo.queue(
            max_size=20,  # Maximum queue size
            default_concurrency_limit=5  # Max concurrent requests
        )
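        # With these settings, roughly 5 requests run concurrently and up to 20
        # more wait in the queue before new arrivals are turned away.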
print("βœ… Gradio interface created successfully!")
return demo
except Exception as e:
print(f"❌ Error building Gradio interface: {e}")
import traceback
traceback.print_exc()
import gradio as gr
error_msg = f"""
# ❌ Interface Building Error
**Error:** {str(e)}
"""
return gr.Interface(
fn=lambda: error_msg,
inputs=None,
outputs=gr.Markdown(),
title="CSRC Car Manual RAG System",
)
# Singleton pattern to prevent multiple initializations
_app_instance = None

def get_app():
    """
    Get or create the app instance (singleton pattern)

    Returns:
        Gradio app instance
    """
    global _app_instance
    if _app_instance is None:
        print("🔄 Creating new app instance...")
        _app_instance = create_app()
        print("✅ App instance created!")
    else:
        print("♻️ Reusing existing app instance")
    return _app_instance

# For Hugging Face Spaces auto-detection
if __name__ == "__main__":
    demo = get_app()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,  # Show detailed errors for debugging
        favicon_path=None,  # Skip favicon for faster startup
    )
else:
    # Module-level variable for Spaces auto-detection
    demo = get_app()