Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| # RAG System for Hugging Face Spaces | |
| A simplified Retrieval-Augmented Generation (RAG) system using: | |
| - **FAISS** for vector search and similarity matching | |
| - **BM25** for keyword-based sparse retrieval | |
| - **Hybrid Search** combining both dense and sparse methods | |
| - **Streamlit** for modern, interactive web interface | |
| - **Qwen 2.5 1.5B** for intelligent response generation | |
| ## Features | |
| - 🔍 **Multi-Method Retrieval**: Hybrid, dense, and sparse search options | |
| - 📄 **PDF Processing**: Automatic document loading and chunking | |
| - 💬 **Real-time Chat**: Interactive conversation interface | |
| - ⚡ **Parallel Loading**: Concurrent document processing | |
| - 📊 **Performance Metrics**: Response times and confidence scores | |
| - 🎯 **Smart Fallbacks**: Graceful handling of model loading failures | |
| ## Architecture | |
| The system follows a modular architecture: | |
| 1. **Document Processing**: PDF extraction and chunking | |
| 2. **Vector Storage**: FAISS index for embeddings | |
| 3. **Search Engine**: BM25 for keyword matching | |
| 4. **Response Generation**: LLM-based answer synthesis | |
| 5. **Web Interface**: Streamlit for user interaction | |
| ## Usage | |
| 1. Upload PDF documents or use pre-loaded ones | |
| 2. Choose retrieval method (hybrid/dense/sparse) | |
| 3. Ask questions in natural language | |
| 4. View answers with source citations and confidence scores | |
| """ | |
| import streamlit as st | |
| import os | |
| import tempfile | |
| from pathlib import Path | |
| import time | |
| from typing import List, Dict, Optional | |
| import json | |
| import glob | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from loguru import logger | |
| # Import our simplified components | |
| from rag_system import SimpleRAGSystem | |
| from pdf_processor import SimplePDFProcessor | |
| from hf_spaces_config import get_hf_config, is_hf_spaces | |
| from guard_rails import GuardRailConfig | |
# =============================================================================
# PAGE CONFIGURATION
# =============================================================================
# Page-level Streamlit settings, applied before any other Streamlit call in
# this file. Collected in one mapping so the values are easy to scan.
_PAGE_SETTINGS = {
    "page_title": "RAG System - Hugging Face",
    "page_icon": "π€",
    "layout": "wide",  # full-width layout
    "initial_sidebar_state": "expanded",  # sidebar open on first render
}
st.set_page_config(**_PAGE_SETTINGS)
# =============================================================================
# SESSION STATE INITIALIZATION
# =============================================================================
# Seed each session-state key once; keys that already exist are left untouched
# so their values survive reruns of this script.
_SESSION_DEFAULTS = (
    ("rag_system", None),  # main RAG system instance
    ("documents_loaded", False),  # document loading status
    ("chat_history", []),  # conversation history
    ("initializing", False),  # True while initialization is in progress
)
for _state_key, _state_default in _SESSION_DEFAULTS:
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
| # ============================================================================= | |
| # UTILITY FUNCTIONS | |
| # ============================================================================= | |
def display_environment_info():
    """
    Render a short description of the deployment environment in the sidebar.

    Outside Hugging Face Spaces only a "Local Development" banner is shown.
    On Spaces the cache locations and resource limits reported by the HF
    configuration object are listed as well; any error while reading them is
    shown as a sidebar warning instead of being raised.
    """
    sidebar = st.sidebar

    # Local runs have nothing more to report than the banner.
    if not is_hf_spaces():
        sidebar.markdown("### π» Environment")
        sidebar.info("**Local Development**")
        return

    sidebar.markdown("### π Environment")
    sidebar.info("**Hugging Face Spaces**")

    try:
        config = get_hf_config()

        # Cache / storage locations
        sidebar.markdown("**Configuration:**")
        transformers_cache = config.cache_dirs.get("transformers_cache", "N/A")
        sidebar.text(f"β’ Cache: {transformers_cache}")
        vector_store_dir = config.cache_dirs.get("vector_store", "N/A")
        sidebar.text(f"β’ Vector Store: {vector_store_dir}")

        # Resource limits; the two usage values are multiplied by 100 and
        # displayed as percentages.
        limits = config.get_resource_limits()
        sidebar.markdown("**Resource Limits:**")
        memory_pct = limits["max_memory_usage"] * 100
        sidebar.text(f"β’ Memory: {memory_pct:.0f}%")
        cpu_pct = limits["max_cpu_usage"] * 100
        sidebar.text(f"β’ CPU: {cpu_pct:.0f}%")
        concurrent = limits["max_concurrent_requests"]
        sidebar.text(f"β’ Concurrent: {concurrent}")
    except Exception as exc:
        sidebar.warning(f"Config error: {exc}")
def load_single_document(rag_system, pdf_path):
    """
    Add one PDF to the RAG system and report the outcome without raising.

    Args:
        rag_system: Object exposing ``add_document(path, filename)``
        pdf_path: Path to the PDF file
    Returns:
        tuple: (filename, success_status, error_message) where error_message
        is None unless ``add_document`` (or the name lookup) raised.
    """
    try:
        doc_name = os.path.basename(pdf_path)
        added = rag_system.add_document(pdf_path, doc_name)
        outcome = (doc_name, added, None)
    except Exception as exc:
        # Failures are turned into a result tuple so callers running this in
        # a worker can keep processing the remaining documents.
        outcome = (os.path.basename(pdf_path), False, str(exc))
    return outcome
def _load_pdfs_in_parallel(rag_system, pdf_files, max_workers=4):
    """
    Load the given PDFs into the RAG system using a thread pool.

    Progress is written to the page and the log as each task completes.

    Args:
        rag_system: The RAG system instance documents are added to
        pdf_files: Paths of the PDF files to load
        max_workers: Upper bound on concurrent loader threads
    Returns:
        tuple: (loaded_count, failed_count)
    """
    loaded_count = 0
    failed_count = 0
    # ThreadPoolExecutor rejects max_workers <= 0, so clamp to at least one.
    with ThreadPoolExecutor(max_workers=max(1, max_workers)) as executor:
        futures = [
            executor.submit(load_single_document, rag_system, pdf_path)
            for pdf_path in pdf_files
        ]
        # Report results in completion order for real-time feedback.
        for future in as_completed(futures):
            filename, success, error = future.result()
            if success:
                loaded_count += 1
                st.write(f"β Loaded: {filename}")
                logger.info(f"β Loaded: {filename}")
            else:
                failed_count += 1
                st.write(f"β οΈ Failed: {filename} - {error}")
                logger.warning(f"β οΈ Failed to load {filename}: {error}")
    return loaded_count, failed_count


def initialize_rag_system(pdf_glob="/app/*.pdf", max_workers=4):
    """
    Initialize the RAG system with automatic document loading

    This function:
    1. Creates the RAG system instance from the HF configuration
    2. Loads every PDF matching ``pdf_glob`` using a thread pool
    3. Provides real-time feedback on loading progress

    It is a no-op when a system already exists in session state or when an
    initialization is already flagged as in progress.

    Args:
        pdf_glob: Glob pattern of PDFs to pre-load (defaults to the
            original hard-coded "/app/*.pdf")
        max_workers: Maximum number of concurrent loader threads
    Raises:
        Exception: Any error raised during setup is reported on the page,
            logged, and then re-raised.
    """
    if st.session_state.rag_system is not None or st.session_state.initializing:
        return

    st.session_state.initializing = True
    st.write("π Starting RAG system initialization...")

    # Check deployment environment
    if is_hf_spaces():
        st.info("π Running in Hugging Face Spaces environment")
        st.write("π Setting up HF Spaces optimized configuration...")
    else:
        st.info("π» Running in local development environment")
        st.write("π Using local development configuration...")

    with st.spinner("Initializing RAG system..."):
        try:
            hf_config = get_hf_config()
            model_config = hf_config.get_model_config()
            guard_config = GuardRailConfig(**hf_config.get_guard_rail_config())

            st.session_state.rag_system = SimpleRAGSystem(
                embedding_model=model_config["embedding_model"],
                generative_model=model_config["generative_model"],
                chunk_sizes=model_config["chunk_sizes"],
                vector_store_path=model_config["vector_store_path"],
                enable_guard_rails=model_config["enable_guard_rails"],
                guard_rail_config=guard_config,
            )
            st.write("β RAG system created successfully")

            pdf_files = glob.glob(pdf_glob)
            st.write(f"π Found {len(pdf_files)} PDF files")

            if pdf_files:
                with st.spinner(
                    f"Loading {len(pdf_files)} PDF documents in parallel..."
                ):
                    loaded_count, failed_count = _load_pdfs_in_parallel(
                        st.session_state.rag_system, pdf_files, max_workers
                    )
                if loaded_count > 0:
                    st.success(
                        f"β Successfully loaded {loaded_count} PDF documents!"
                    )
                    if failed_count > 0:
                        st.warning(f"β οΈ Failed to load {failed_count} documents")
                else:
                    st.warning("β οΈ No documents could be loaded")
            else:
                st.info("π No PDF documents found in the container")

            # Querying is allowed whether or not any document was loaded, so
            # the flag is set once here instead of in every branch.
            st.session_state.documents_loaded = True
            st.success("β RAG system initialized!")
        except Exception as e:
            st.error(f"β Failed to initialize RAG system: {e}")
            logger.error(f"RAG system initialization failed: {e}")
            raise
        finally:
            # Single place that clears the in-progress flag, on success and
            # on failure alike.
            st.session_state.initializing = False
def upload_document(uploaded_file):
    """
    Upload and process a document through the web interface

    The upload is written to a temporary ``.pdf`` file, handed to the RAG
    system's ``add_document``, and the temporary file is always removed
    afterwards, including when processing raises.

    Args:
        uploaded_file: Streamlit uploaded file object (``None`` is ignored)
    """
    if uploaded_file is None:
        return

    rag_system = st.session_state.rag_system
    if rag_system is None:
        # Explicit message instead of an AttributeError surfacing below.
        st.error("β RAG system is not initialized")
        return

    tmp_path = None
    try:
        # delete=False keeps the file on disk after the handle is closed so
        # add_document can be given its path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
            tmp_path = tmp_file.name
            tmp_file.write(uploaded_file.getvalue())

        with st.spinner(f"Processing {uploaded_file.name}..."):
            success = rag_system.add_document(tmp_path, uploaded_file.name)

        if success:
            st.success(f"β {uploaded_file.name} processed successfully!")
            st.session_state.documents_loaded = True
        else:
            st.error(f"β Failed to process {uploaded_file.name}")
    except Exception as e:
        st.error(f"β Error processing document: {str(e)}")
    finally:
        # Previously the temp file was only removed on the non-exception
        # paths, so every failed upload leaked a file.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                # Best-effort cleanup: a missing or locked temp file must not
                # mask the processing result already shown to the user.
                pass
def query_rag(
    query: str, method: str = "hybrid", top_k: int = 5, user_id: str = "anonymous"
):
    """
    Run one query against the RAG system, echoing progress to the page.

    Args:
        query: User's question
        method: Retrieval method (hybrid/dense/sparse)
        top_k: Number of results to retrieve
        user_id: User identifier forwarded to the RAG system's query call
    Returns:
        tuple: (response_object, response_time) on success. When the system
        is not initialized or the query raises, the first element is None and
        the second element is an error message string instead of a duration.
    """
    # Not imported at module level; only used when formatting failures.
    import traceback

    try:
        st.write(f"π Starting query: {query}")
        st.write(f"π Method: {method}, top_k: {top_k}")

        rag = st.session_state.rag_system
        if rag is None:
            st.error("β RAG system is not initialized")
            return None, "RAG system not initialized"
        st.write("β RAG system is available")

        # The timer starts before the progress message, as in the original.
        started = time.time()
        st.write("π Calling rag_system.query with guard rails...")
        answer_obj = rag.query(query, method, top_k, user_id)
        elapsed = time.time() - started

        st.write(f"β Response received in {elapsed:.2f}s")
        st.write(f"β Response type: {type(answer_obj)}")
        if answer_obj:
            # Preview only the first 100 characters of the answer.
            preview = answer_obj.answer[:100]
            st.write(f"β Response answer: {preview}...")
        return answer_obj, elapsed
    except Exception as exc:
        st.error(f"β Error during query: {str(exc)}")
        logger.error(f"Query error: {exc}")
        st.error(f"β Full error: {traceback.format_exc()}")
        return None, f"Error: {str(exc)}"
def display_search_results(results: List[Dict]):
    """
    Render each search result with its score, source, method and text.

    Args:
        results: Search results to display. Each item is read through
            attributes (``score``, ``filename``, ``search_method``, ``text``,
            ``dense_score``, ``sparse_score``).
            NOTE(review): the ``List[Dict]`` hint does not match this
            attribute access - confirm the real result type.
    """
    if not results:
        st.info("No search results found.")
        return

    for position, hit in enumerate(results, 1):
        st.markdown("---")
        st.markdown(f"**Result {position}** - Score: {hit.score:.3f}")
        st.write(f"**Source:** {hit.filename}")
        st.write(f"**Method:** {hit.search_method}")
        # Only the first 500 characters are shown; "..." is always appended.
        excerpt = hit.text[:500]
        st.write(f"**Text:** {excerpt}...")

        # The per-method breakdown is shown only when both scores are truthy.
        # NOTE(review): a score of exactly 0 also hides it - confirm intended.
        if not (hit.dense_score and hit.sparse_score):
            continue
        dense_col, sparse_col = st.columns(2)
        with dense_col:
            st.metric("Dense Score", f"{hit.dense_score:.3f}")
        with sparse_col:
            st.metric("Sparse Score", f"{hit.sparse_score:.3f}")
| # ============================================================================= | |
| # MAIN APPLICATION | |
| # ============================================================================= | |
def main():
    """
    Main application function that orchestrates the entire RAG system interface

    This function:
    1. Sets up the user interface
    2. Initializes the RAG system
    3. Handles document uploads (each uploaded file is ingested once, not on
       every rerun of the script)
    4. Manages the chat interface
    5. Displays results and metrics
    """
    st.write("π App starting...")

    # Display environment information in sidebar
    display_environment_info()

    st.title("π€ RAG System - Hugging Face Spaces")
    st.markdown("A simplified RAG system using FAISS + BM25 + Qwen 2.5 1.5B")

    # Initialize RAG system
    initialize_rag_system()

    # =========================================================================
    # SIDEBAR CONFIGURATION
    # =========================================================================
    with st.sidebar:
        st.header("π Document Upload")

        uploaded_file = st.file_uploader(
            "Upload PDF Document",
            type=["pdf"],
            help="Upload a PDF document to add to the knowledge base",
        )

        # The uploader keeps returning the same file on every rerun (each
        # chat message reruns this script). Remember which upload was already
        # ingested so the same PDF is not added to the index again and again.
        if "last_upload_key" not in st.session_state:
            st.session_state["last_upload_key"] = None
        if uploaded_file is not None:
            upload_key = (uploaded_file.name, uploaded_file.size)
            if st.session_state["last_upload_key"] != upload_key:
                st.session_state["last_upload_key"] = upload_key
                upload_document(uploaded_file)
        else:
            # Clearing the uploader lets the user re-submit the same file.
            st.session_state["last_upload_key"] = None

        st.divider()
        st.header("βοΈ Settings")

        # Retrieval method selection
        method = st.selectbox(
            "Retrieval Method",
            ["hybrid", "dense", "sparse"],
            help="Choose the retrieval method: hybrid (combines dense and sparse), dense (vector similarity), or sparse (keyword matching)",
        )

        # Number of results slider
        top_k = st.slider(
            "Number of Results",
            min_value=1,
            max_value=10,
            value=5,
            help="Number of top results to retrieve and use for answer generation",
        )

        st.divider()

        # System information display. Identity check (not truthiness) to
        # match the `is None` checks used elsewhere in this file.
        if st.session_state.rag_system is not None:
            stats = st.session_state.rag_system.get_stats()
            st.header("π System Info")
            st.write(f"**Documents:** {stats['total_documents']}")
            st.write(f"**Chunks:** {stats['total_chunks']}")
            st.write(f"**Vector Size:** {stats['vector_size']}")
            st.write(f"**Model:** {stats['model_name']}")

    # =========================================================================
    # MAIN CONTENT AREA
    # =========================================================================
    # Without a RAG system there is nothing to query yet: either report that
    # initialization is running or retry it, then end this script run.
    if st.session_state.rag_system is None:
        if st.session_state.initializing:
            st.info("π RAG system is initializing... Please wait.")
        else:
            initialize_rag_system()
        return

    # Show system info and allow querying immediately after initialization
    stats = st.session_state.rag_system.get_stats()
    if not stats["total_documents"] > 0:
        st.info(
            "π No documents loaded yet, but you can still ask questions. The system will respond based on its general knowledge."
        )

    # =========================================================================
    # CHAT INTERFACE
    # =========================================================================
    st.header("π¬ Ask Questions About Your Documents")

    query = st.chat_input("Ask a question about the loaded documents...")

    if query:
        st.write(f"π Processing query: {query}")

        # Add user message to chat history
        st.session_state.chat_history.append({"role": "user", "content": query})

        # On failure query_rag returns (None, <error message string>), so the
        # second value is a duration only when a response came back.
        response, response_time = query_rag(query, method, top_k)
        st.write(f"π Response type: {type(response)}")
        st.write(f"π Response time: {response_time}")

        if response:
            st.write("β Got valid response, adding to chat history")
            # Add assistant response to chat history with metadata
            st.session_state.chat_history.append(
                {
                    "role": "assistant",
                    "content": response.answer,
                    "search_results": response.search_results,
                    "method_used": response.method_used,
                    "confidence": response.confidence,
                    "response_time": response_time,
                }
            )
        else:
            st.write("β No valid response received")
            if isinstance(response_time, str):
                # Avoid "Error: Error: ..." when the message is already
                # prefixed by query_rag.
                if response_time.startswith("Error"):
                    error_text = response_time
                else:
                    error_text = f"Error: {response_time}"
            else:
                # No exception, but no response object either; showing the
                # elapsed time as an "error" would be meaningless.
                error_text = "Error: the RAG system returned no response"
            st.session_state.chat_history.append(
                {"role": "assistant", "content": error_text}
            )

    # =========================================================================
    # CHAT HISTORY DISPLAY
    # =========================================================================
    for message in st.session_state.chat_history:
        if message["role"] == "user":
            with st.chat_message("user"):
                st.write(message["content"])
            continue

        with st.chat_message("assistant"):
            st.write(message["content"])

            # Error entries carry only "role" and "content"; retrieval details
            # exist only for successful answers.
            if "search_results" in message:
                st.markdown("**π Search Results:**")
                display_search_results(message["search_results"])

                # Display performance metrics
                col1, col2, col3 = st.columns(3)
                with col1:
                    st.metric("Method", message["method_used"])
                with col2:
                    st.metric("Confidence", f"{message['confidence']:.3f}")
                with col3:
                    st.metric("Response Time", f"{message['response_time']:.2f}s")

    # =========================================================================
    # UTILITY CONTROLS
    # =========================================================================
    # Clear chat history button (only offered when there is history)
    if st.session_state.chat_history:
        if st.button("ποΈ Clear Chat History"):
            st.session_state.chat_history = []
            st.rerun()
| # ============================================================================= | |
| # APPLICATION ENTRY POINT | |
| # ============================================================================= | |
| # Build the UI only when this file is executed as a script, not when imported. | |
| if __name__ == "__main__": | |
| main() | |