Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
"""
# Simplified RAG System for Hugging Face Spaces

This module provides a comprehensive Retrieval-Augmented Generation (RAG) system using:
- **FAISS** for efficient vector storage and similarity search
- **BM25** for sparse retrieval and keyword matching
- **Hybrid Search** combining both dense and sparse methods
- **Qwen 2.5 1.5B** for intelligent response generation
- **Thread Safety** for concurrent document loading

## Architecture Overview

The RAG system follows a modular design with these key components:
1. **Document Processing**: PDF extraction and intelligent chunking
2. **Vector Storage**: FAISS index for high-dimensional embeddings
3. **Sparse Retrieval**: BM25 for keyword-based search
4. **Hybrid Search**: Combines dense and sparse methods for optimal results
5. **Response Generation**: LLM-based answer synthesis with context
6. **Thread Safety**: Concurrent document loading with proper locking

## Key Features

- **Multi-Method Search**: Hybrid, dense, and sparse retrieval options
- **Performance Metrics**: Confidence scores and response times
- **Thread Safety**: Safe concurrent document loading
- **Persistence**: Automatic index saving and loading
- **Smart Fallbacks**: Graceful model loading with alternatives
- **Scalable**: Efficient handling of large document collections

## Usage Example

```python
# Initialize the RAG system
rag = SimpleRAGSystem()
# Add documents
rag.add_document("document.pdf", "Document Name")
# Query the system
response = rag.query("What is the main topic?", method="hybrid", top_k=5)
print(response.answer)
```
"""
| import os | |
| import pickle | |
| import json | |
| import time | |
| from typing import List, Dict, Optional, Tuple | |
| from dataclasses import dataclass | |
| import numpy as np | |
| import torch | |
| from loguru import logger | |
| import threading | |
| # Import required libraries for AI/ML functionality | |
| from sentence_transformers import SentenceTransformer | |
| from rank_bm25 import BM25Okapi | |
| import faiss | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
| # Import guard rail system | |
| from guard_rails import GuardRailSystem, GuardRailConfig, GuardRailResult | |
# Optional HF Spaces integration. The helper module may be absent (for example
# when running outside the Spaces deployment), in which case the flag below
# stays False and the rest of the module skips the HF-specific setup.
try:
    from hf_spaces_config import get_hf_config, is_hf_spaces
except ImportError:
    HF_SPACES_AVAILABLE = False
    logger.warning("HF Spaces configuration not available")
else:
    HF_SPACES_AVAILABLE = True
| # ============================================================================= | |
| # DATA STRUCTURES | |
| # ============================================================================= | |
@dataclass
class DocumentChunk:
    """
    A chunk of text taken from a source document, together with its provenance.

    ``@dataclass`` generates ``__init__``, ``__repr__`` and ``__eq__`` from the
    annotated fields. Without the decorator the fields are bare annotations,
    so ``DocumentChunk(text=...)`` raises ``TypeError``.

    Attributes:
        text: The actual text content of the chunk.
        doc_id: Identifier of the source document.
        filename: Name of the source file.
        chunk_id: Identifier of this specific chunk.
        chunk_size: Target size that was used when chunking.
    """

    text: str
    doc_id: str
    filename: str
    chunk_id: str
    chunk_size: int
@dataclass
class SearchResult:
    """
    A single retrieved chunk together with its scoring information.

    ``@dataclass`` supplies the keyword ``__init__`` that ``SimpleRAGSystem.search``
    relies on. Without it, every ``SearchResult(text=..., ...)`` call raises
    ``TypeError``. Instances are deliberately mutable, because hybrid search
    updates ``score``, ``sparse_score`` and ``search_method`` in place when a
    chunk is found by both retrievers.

    Attributes:
        text: The retrieved text content.
        score: Score used for ranking.
        doc_id: Source document identifier.
        filename: Source file name.
        search_method: How the chunk was retrieved ("dense", "sparse" or "hybrid").
        dense_score: Vector similarity score, if the dense retriever found it.
        sparse_score: BM25 keyword score, if the sparse retriever found it.
    """

    text: str
    score: float
    doc_id: str
    filename: str
    search_method: str
    dense_score: Optional[float] = None
    sparse_score: Optional[float] = None
@dataclass
class RAGResponse:
    """
    The complete result of one ``SimpleRAGSystem.query`` call.

    ``@dataclass`` supplies the keyword ``__init__`` used by ``query``. Without
    it, constructing a response raises ``TypeError``.

    Attributes:
        answer: Generated (or refusal / fallback) answer text.
        confidence: Mean score of the retrieved results (0.0 when nothing
            was retrieved or the request was blocked).
        search_results: Retrieved chunks that were used as context.
        method_used: Search method that was requested.
        response_time: Wall-clock seconds spent producing the response.
        query: The query as processed (sanitized when guard rails are enabled).
    """

    answer: str
    confidence: float
    # Quoted forward reference: the class stays definable independently of
    # SearchResult's position in the module.
    search_results: List["SearchResult"]
    method_used: str
    response_time: float
    query: str
| # ============================================================================= | |
| # MAIN RAG SYSTEM CLASS | |
| # ============================================================================= | |
class SimpleRAGSystem:
    """
    Simplified RAG system for Hugging Face Spaces.

    Provides:
    - Document ingestion (PDF -> chunks) via ``pdf_processor.SimplePDFProcessor``
    - Dense retrieval (FAISS inner product over L2-normalized embeddings,
      i.e. cosine similarity), sparse retrieval (BM25) and a hybrid of both
    - Answer generation with a causal language model (Qwen by default, with
      ``distilgpt2`` as a fallback)
    - Optional input/output guard rails
    - Persistence of the FAISS index and chunk metadata in ``vector_store_path``

    Row ``i`` of ``self.faiss_index`` is kept aligned with ``self.chunks[i]``.
    ``_update_embeddings`` only appends the not-yet-indexed tail of
    ``self.chunks``, and loading rebuilds the index when the counts disagree.
    ``add_document`` and ``clear`` mutate state under ``self._lock``. Searches
    do not take the lock.
    """

    def __init__(
        self,
        embedding_model: str = "all-MiniLM-L6-v2",
        generative_model: str = "Qwen/Qwen2.5-1.5B-Instruct",
        chunk_sizes: Optional[List[int]] = None,
        vector_store_path: str = "./vector_store",
        enable_guard_rails: bool = True,
        guard_rail_config: Optional[GuardRailConfig] = None,
    ):
        """
        Initialize the RAG system: load models, then load or create the index.

        Args:
            embedding_model: SentenceTransformer model name used for embeddings.
            generative_model: Causal LM used for answers. Falls back to
                ``distilgpt2`` if it cannot be loaded.
            chunk_sizes: Chunk sizes passed to the PDF processor
                (default ``[100, 400]``).
            vector_store_path: Directory for ``faiss_index.bin`` and
                ``metadata.pkl``. Created if missing.
            enable_guard_rails: Whether to validate and sanitize queries and answers.
            guard_rail_config: Optional configuration for ``GuardRailSystem``.

        Raises:
            Exception: Re-raised from ``_load_models`` if the models cannot be loaded.
        """
        self.embedding_model = embedding_model
        self.generative_model = generative_model
        self.chunk_sizes = chunk_sizes or [100, 400]  # Default chunk sizes
        self.vector_store_path = vector_store_path
        self.enable_guard_rails = enable_guard_rails

        # Core components, populated by _load_models / _load_or_create_index
        self.embedder = None  # Sentence transformer for embeddings
        self.tokenizer = None  # Tokenizer for the language model
        self.model = None  # Language model for generation
        self.faiss_index = None  # FAISS index for vector search
        self.bm25 = None  # BM25 for sparse search
        self.documents = []  # Metadata dicts, one per added document
        self.chunks = []  # Document chunks; position == FAISS row id
        self._lock = threading.Lock()  # Serializes add_document / clear

        if self.enable_guard_rails:
            self.guard_rails = GuardRailSystem(guard_rail_config)
            logger.info("Guard rail system enabled")
        else:
            self.guard_rails = None
            logger.info("Guard rail system disabled")

        os.makedirs(vector_store_path, exist_ok=True)

        if HF_SPACES_AVAILABLE:
            try:
                # Called for its side effect: the HF config sets up cache directories.
                get_hf_config()
                if is_hf_spaces():
                    logger.info(
                        "HF Spaces environment detected - using optimized configuration"
                    )
                else:
                    logger.info("Local development environment detected")
            except Exception as e:
                logger.warning(f"HF Spaces configuration failed: {e}")

        self._load_models()
        self._load_or_create_index()
        logger.info("Simple RAG system initialized successfully!")

    # ------------------------------------------------------------------
    # Model loading
    # ------------------------------------------------------------------

    def _resolve_cache_dir(self) -> Optional[str]:
        """Return the transformers cache directory from the HF Spaces config, or None."""
        if not HF_SPACES_AVAILABLE:
            return None
        try:
            cache_dir = get_hf_config().cache_dirs.get("transformers_cache")
        except Exception as e:
            logger.warning(f"Could not get HF Spaces cache directory: {e}")
            return None
        if cache_dir:
            logger.info(f"Using HF Spaces cache directory: {cache_dir}")
        return cache_dir

    def _load_generator(self, model_name: str, cache_dir: Optional[str]) -> None:
        """Load the tokenizer and causal LM ``model_name`` onto the CPU. Raises on failure."""
        # NOTE(security): trust_remote_code=True lets the model repository run its
        # own Python code. Only configure generative_model with trusted repos.
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            trust_remote_code=True,
            padding_side="left",  # Important for generation
            cache_dir=cache_dir,
        )
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            trust_remote_code=True,
            torch_dtype=torch.float32,  # Use float32 for CPU compatibility
            device_map=None,  # Let PyTorch handle device placement
            low_cpu_mem_usage=False,
            cache_dir=cache_dir,
        )
        # Move to CPU explicitly for deployment environments
        self.model = self.model.to("cpu")

    def _load_models(self):
        """
        Load the embedding model and the generative model, with fallback.

        1. Load the SentenceTransformer embedder and record ``vector_size``.
        2. Try ``self.generative_model``. If it fails, switch to ``distilgpt2``,
           loaded with the same cache directory and CPU settings.
        3. Give the tokenizer a pad token (reusing EOS) if it has none.

        Raises:
            RuntimeError: If neither the configured model nor distilgpt2 loads.
            Exception: Any error raised while loading the embedding model.
        """
        try:
            cache_dir = self._resolve_cache_dir()

            if cache_dir:
                self.embedder = SentenceTransformer(
                    self.embedding_model, cache_folder=cache_dir
                )
            else:
                self.embedder = SentenceTransformer(self.embedding_model)
            self.vector_size = self.embedder.get_sentence_embedding_dimension()

            try:
                self._load_generator(self.generative_model, cache_dir)
            except Exception as e:
                logger.warning(
                    f"Failed to load generative model {self.generative_model}: {e}"
                )
                logger.info("Falling back to distilgpt2...")
                self.generative_model = "distilgpt2"
                try:
                    self._load_generator(self.generative_model, cache_dir)
                except Exception as fallback_error:
                    logger.error(f"Failed to load distilgpt2: {fallback_error}")
                    raise RuntimeError(
                        "Could not load any generative model"
                    ) from fallback_error

            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
                self.tokenizer.pad_token_id = self.tokenizer.eos_token_id

            logger.info("Models loaded successfully")
            logger.info(f" - Embedding: {self.embedding_model}")
            logger.info(f" - Generative: {self.generative_model}")
        except Exception as e:
            logger.error(f"Failed to load models: {e}")
            raise

    # ------------------------------------------------------------------
    # Index management
    # ------------------------------------------------------------------

    def _load_or_create_index(self):
        """
        Load the persisted FAISS index and metadata, or start with an empty index.

        1. If both ``faiss_index.bin`` and ``metadata.pkl`` exist, load them.
        2. If the index dimension differs from the embedder's, or its row count
           differs from the number of chunks, rebuild the FAISS index from the
           chunk texts so that row ids map to chunks again. The rebuilt index
           is then saved.
        3. Rebuild BM25 from the loaded chunks.

        On any failure the in-memory state is reset to an empty index with no
        documents or chunks, so the index and the metadata never disagree.
        """
        faiss_path = os.path.join(self.vector_store_path, "faiss_index.bin")
        metadata_path = os.path.join(self.vector_store_path, "metadata.pkl")

        if not (os.path.exists(faiss_path) and os.path.exists(metadata_path)):
            self._create_new_index()
            return

        try:
            self.faiss_index = faiss.read_index(faiss_path)
            # NOTE(security): pickle.load can execute code embedded in a crafted
            # file. metadata.pkl is written by _save_index; never point
            # vector_store_path at files from an untrusted source.
            with open(metadata_path, "rb") as f:
                metadata = pickle.load(f)
            self.documents = metadata.get("documents", [])
            self.chunks = metadata.get("chunks", [])

            expected_dim = self.embedder.get_sentence_embedding_dimension()
            if (
                self.faiss_index.d != expected_dim
                or self.faiss_index.ntotal != len(self.chunks)
            ):
                logger.warning(
                    "FAISS index out of sync with metadata "
                    f"(dim {self.faiss_index.d} vs {expected_dim}, "
                    f"{self.faiss_index.ntotal} vectors vs {len(self.chunks)} chunks); "
                    "rebuilding"
                )
                self.faiss_index = faiss.IndexFlatIP(expected_dim)
                self._update_embeddings()
                self._save_index()

            self._update_bm25()
            logger.info(f"Loaded existing index with {len(self.chunks)} chunks")
        except Exception as e:
            logger.warning(f"Failed to load existing index: {e}")
            self.documents = []
            self.chunks = []
            self._create_new_index()

    def _create_new_index(self):
        """Create an empty FAISS inner-product index sized for the embedder and drop BM25."""
        vector_size = self.embedder.get_sentence_embedding_dimension()
        # Inner product equals cosine similarity because _encode_texts
        # L2-normalizes every embedding.
        self.faiss_index = faiss.IndexFlatIP(vector_size)
        self.bm25 = None
        logger.info(f"Created new FAISS index with dimension {vector_size}")

    def _save_index(self):
        """
        Persist the FAISS index and the document/chunk metadata.

        Each file is written to a ``.tmp`` sibling and then moved into place with
        ``os.replace``, so an interrupted write cannot leave a truncated file.
        If only one of the two files is replaced, ``_load_or_create_index``
        detects the count mismatch and rebuilds. Errors are logged rather than
        raised, because persistence is best-effort.
        """
        try:
            faiss_path = os.path.join(self.vector_store_path, "faiss_index.bin")
            metadata_path = os.path.join(self.vector_store_path, "metadata.pkl")

            faiss.write_index(self.faiss_index, faiss_path + ".tmp")
            metadata = {"documents": self.documents, "chunks": self.chunks}
            with open(metadata_path + ".tmp", "wb") as f:
                pickle.dump(metadata, f)

            os.replace(faiss_path + ".tmp", faiss_path)
            os.replace(metadata_path + ".tmp", metadata_path)
            logger.info("Index saved successfully")
        except Exception as e:
            logger.error(f"Failed to save index: {e}")

    def _encode_texts(self, texts: List[str]) -> np.ndarray:
        """Embed ``texts`` as a C-contiguous float32 matrix of L2-normalized rows, as FAISS expects."""
        embeddings = self.embedder.encode(
            texts, show_progress_bar=False, normalize_embeddings=True
        )
        return np.ascontiguousarray(embeddings, dtype="float32")

    def add_document(self, file_path: str, filename: str) -> bool:
        """
        Add a PDF to the system: extract chunks, index them and persist.

        PDF processing runs outside the lock. Appending to ``documents`` and
        ``chunks``, updating FAISS and BM25, and saving all happen under
        ``self._lock``, so concurrent calls are serialized.

        Args:
            file_path: Path to the PDF file.
            filename: Name stored with the document and used in log messages.

        Returns:
            True on success, or False if any step raised (the error is logged).
        """
        try:
            from pdf_processor import SimplePDFProcessor

            processor = SimplePDFProcessor()
            processed_doc = processor.process_document(file_path, self.chunk_sizes)

            with self._lock:
                self.documents.append(
                    {
                        "filename": filename,
                        "title": processed_doc.title,
                        "author": processed_doc.author,
                        "file_path": file_path,
                    }
                )
                self.chunks.extend(processed_doc.chunks)
                # Only chunks not yet in FAISS get embedded (see _update_embeddings).
                self._update_embeddings()
                self._update_bm25()
                self._save_index()

            logger.info(
                f"Added document: {filename} ({len(processed_doc.chunks)} chunks)"
            )
            return True
        except Exception as e:
            logger.error(f"Failed to add document {filename}: {e}")
            return False

    def _update_embeddings(self):
        """
        Embed the chunks that are not yet in the FAISS index and append them.

        FAISS rows are positional, so ``self.chunks[ntotal:]`` are exactly the
        chunks still missing from the index. Re-adding already-indexed chunks
        would duplicate vectors and break the row-id -> chunk mapping. If the
        index holds more rows than there are chunks, it is reset and rebuilt.
        """
        if not self.chunks:
            return

        indexed = self.faiss_index.ntotal
        if indexed > len(self.chunks):
            logger.warning(
                f"FAISS index has {indexed} vectors for {len(self.chunks)} chunks; "
                "rebuilding"
            )
            self.faiss_index.reset()
            indexed = 0

        pending = self.chunks[indexed:]
        if not pending:
            return
        self.faiss_index.add(self._encode_texts([chunk.text for chunk in pending]))

    def _update_bm25(self):
        """Rebuild BM25 over all chunks (lower-cased whitespace tokens), or None when there are none."""
        if not self.chunks:
            self.bm25 = None
            return
        self.bm25 = BM25Okapi([chunk.text.lower().split() for chunk in self.chunks])

    # ------------------------------------------------------------------
    # Retrieval
    # ------------------------------------------------------------------

    def _dense_search(self, query: str, top_k: int) -> List[Tuple[int, float]]:
        """Return up to ``top_k`` ``(chunk_index, cosine_score)`` pairs from FAISS."""
        k = min(top_k, self.faiss_index.ntotal)
        if k <= 0:
            return []
        scores, indices = self.faiss_index.search(self._encode_texts([query]), k)
        # FAISS pads missing neighbours with id -1, which must not be used to
        # index self.chunks.
        return [
            (int(idx), float(score))
            for score, idx in zip(scores[0], indices[0])
            if 0 <= idx < len(self.chunks)
        ]

    def _sparse_search(self, query: str, top_k: int) -> List[Tuple[int, float]]:
        """Return up to ``top_k`` ``(chunk_index, bm25_score)`` pairs with a positive BM25 score."""
        if self.bm25 is None:
            return []
        tokens = query.lower().split()
        if not tokens:
            return []
        bm25_scores = self.bm25.get_scores(tokens)
        top_indices = np.argsort(bm25_scores)[::-1][:top_k]
        # A zero score means no query term occurs in the chunk, so it is not a
        # keyword match.
        return [
            (int(idx), float(bm25_scores[idx]))
            for idx in top_indices
            if idx < len(self.chunks) and bm25_scores[idx] > 0
        ]

    def search(
        self, query: str, method: str = "hybrid", top_k: int = 5
    ) -> List[SearchResult]:
        """
        Search for relevant chunks using the specified method.

        - **dense**: FAISS cosine similarity. ``score == dense_score``.
        - **sparse**: BM25 keyword matching. ``score`` is the raw BM25 score;
          chunks with no matching term are skipped.
        - **hybrid**: top-``top_k`` candidates from both. BM25 scores are scaled
          into [0, 1] by this query's best BM25 score so they are comparable
          to cosine similarities. A chunk found by both retrievers gets the mean
          of the two, with ``search_method == "hybrid"``.

        Args:
            query: Search query string.
            method: "hybrid", "dense" or "sparse". Any other value returns [].
            top_k: Number of results to return.

        Returns:
            Up to ``top_k`` results, sorted by ``score`` in descending order.
        """
        if not self.chunks or top_k <= 0:
            return []
        if method not in ("dense", "sparse", "hybrid"):
            logger.warning(
                f"Unknown search method '{method}'; expected dense, sparse or hybrid"
            )
            return []

        # Keyed by chunk position, so a chunk found by both retrievers is merged
        # instead of duplicated.
        results: Dict[int, SearchResult] = {}

        if method in ("dense", "hybrid"):
            for idx, score in self._dense_search(query, top_k):
                chunk = self.chunks[idx]
                results[idx] = SearchResult(
                    text=chunk.text,
                    score=score,
                    doc_id=chunk.doc_id,
                    filename=chunk.filename,
                    search_method="dense",
                    dense_score=score,
                )

        if method in ("sparse", "hybrid"):
            sparse_hits = self._sparse_search(query, top_k)
            # All hits are positive, so `best` is a safe divisor.
            best = max((raw for _, raw in sparse_hits), default=1.0)
            for idx, raw in sparse_hits:
                rank_score = raw / best if method == "hybrid" else raw
                existing = results.get(idx)
                if existing is not None:
                    # Only possible in hybrid mode, so dense_score is set.
                    existing.sparse_score = raw
                    existing.score = (existing.dense_score + rank_score) / 2
                    existing.search_method = "hybrid"
                else:
                    chunk = self.chunks[idx]
                    results[idx] = SearchResult(
                        text=chunk.text,
                        score=rank_score,
                        doc_id=chunk.doc_id,
                        filename=chunk.filename,
                        search_method="sparse",
                        sparse_score=raw,
                    )

        ranked = sorted(results.values(), key=lambda r: r.score, reverse=True)
        return ranked[:top_k]

    # ------------------------------------------------------------------
    # Generation
    # ------------------------------------------------------------------

    def _build_prompt(self, query: str, context: str) -> str:
        """Build the prompt, using the tokenizer's chat template only when one is defined."""
        # Every tokenizer has an apply_chat_template *method*, but models without
        # a template (e.g. the distilgpt2 fallback) cannot use it, so check the
        # template itself.
        if getattr(self.tokenizer, "chat_template", None):
            messages = [
                {
                    "role": "system",
                    "content": "You are a helpful AI assistant. Use the provided context to answer the user's question accurately and concisely. If the context doesn't contain enough information to answer the question, say so.",
                },
                {
                    "role": "user",
                    "content": f"Context: {context}\n\nQuestion: {query}",
                },
            ]
            return self.tokenizer.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )
        return f"Context: {context}\n\nQuestion: {query}\n\nAnswer:"

    def _input_token_budget(self, max_input_tokens: int, max_new_tokens: int) -> int:
        """Return how many prompt tokens fit, leaving room for ``max_new_tokens`` within the model's position limit."""
        config = getattr(self.model, "config", None)
        position_limit = getattr(config, "max_position_embeddings", None)
        if isinstance(position_limit, int) and position_limit > max_new_tokens:
            return min(max_input_tokens, position_limit - max_new_tokens)
        return max_input_tokens

    def _fit_context(self, query: str, context: str, budget: int) -> str:
        """Trim ``context`` so the complete prompt fits in ``budget`` tokens, keeping its start."""
        # The context is joined in descending score order, so keeping its
        # beginning keeps the best-ranked passages.
        overhead = len(self.tokenizer(self._build_prompt(query, ""))["input_ids"])
        room = budget - overhead
        if room <= 0:
            return ""
        context_ids = self.tokenizer(context, add_special_tokens=False)["input_ids"]
        if len(context_ids) <= room:
            return context
        return self.tokenizer.decode(context_ids[:room], skip_special_tokens=True)

    def generate_response(
        self,
        query: str,
        context: str,
        max_new_tokens: int = 512,
        max_input_tokens: int = 1024,
    ) -> str:
        """
        Generate an answer to ``query`` grounded in ``context``.

        1. Compute the prompt token budget: ``max_input_tokens``, reduced when
           the model's position limit could not also fit ``max_new_tokens``.
        2. Trim the *context* to that budget, so truncation never removes the
           question or the assistant turn at the end of the prompt.
        3. Sample up to ``max_new_tokens`` tokens.
        4. Decode only the newly generated tokens, so the prompt is not echoed
           into the answer.

        Args:
            query: The user's question.
            context: Retrieved passages joined into one string.
            max_new_tokens: Maximum number of generated tokens.
            max_input_tokens: Maximum prompt length in tokens.

        Returns:
            The stripped answer text, or ``"Error generating response: ..."``
            if anything fails (the error is also logged).
        """
        try:
            budget = self._input_token_budget(max_input_tokens, max_new_tokens)
            prompt = self._build_prompt(
                query, self._fit_context(query, context, budget)
            )

            # truncation stays as a safety net, e.g. for an oversized query.
            tokenized = self.tokenizer(
                prompt,
                return_tensors="pt",
                truncation=True,
                max_length=budget,
                return_attention_mask=True,
            )
            generation_kwargs = dict(
                attention_mask=tokenized.attention_mask,
                max_new_tokens=max_new_tokens,
                num_return_sequences=1,
                temperature=0.7,  # Balance creativity and coherence
                do_sample=True,
                pad_token_id=self.tokenizer.pad_token_id,
                eos_token_id=self.tokenizer.eos_token_id,
            )

            with torch.no_grad():
                try:
                    outputs = self.model.generate(
                        tokenized.input_ids, **generation_kwargs
                    )
                except RuntimeError as e:
                    if "Half" not in str(e):
                        raise
                    logger.warning(
                        "Half precision not supported on CPU, converting to float32"
                    )
                    self.model = self.model.float()
                    outputs = self.model.generate(
                        tokenized.input_ids, **generation_kwargs
                    )

            # generate() returns prompt + continuation; keep only the continuation.
            new_tokens = outputs[0][tokenized.input_ids.shape[1]:]
            return self.tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
        except Exception as e:
            logger.error(f"Error generating response: {e}")
            return f"Error generating response: {str(e)}"

    # ------------------------------------------------------------------
    # Query pipeline
    # ------------------------------------------------------------------

    def query(
        self,
        query: str,
        method: str = "hybrid",
        top_k: int = 5,
        user_id: str = "anonymous",
    ) -> RAGResponse:
        """
        Run the full RAG pipeline with guard-rail checks.

        1. Validate the query with the guard rails (blocked -> refusal response;
           warning -> continue). Then sanitize it.
        2. Retrieve ``top_k`` chunks with ``method``.
        3. Generate an answer from the concatenated chunk texts.
        4. Validate the answer with the guard rails (blocked -> refusal
           response; warning -> continue). Then sanitize it.

        Args:
            query: User's question.
            method: Search method ("hybrid", "dense" or "sparse").
            top_k: Number of search results to use as context.
            user_id: User identifier passed to input validation.

        Returns:
            A ``RAGResponse``. ``confidence`` is the mean ``score`` of the
            retrieved results, so its scale depends on ``method``: raw BM25 for
            "sparse", roughly [-1, 1] otherwise.
        """
        start_time = time.time()

        if self.enable_guard_rails and self.guard_rails:
            input_validation = self.guard_rails.validate_input(query, user_id)
            if not input_validation.passed:
                logger.warning(f"Input validation failed: {input_validation.reason}")
                if input_validation.blocked:
                    return RAGResponse(
                        answer=f"I cannot process this request: {input_validation.reason}",
                        confidence=0.0,
                        search_results=[],
                        method_used=method,
                        response_time=time.time() - start_time,
                        query=query,
                    )
                logger.warning(f"Input validation warning: {input_validation.reason}")
            query = self.guard_rails.sanitize_input(query)

        search_results = self.search(query, method, top_k)
        if not search_results:
            return RAGResponse(
                answer="I couldn't find any relevant information to answer your question.",
                confidence=0.0,
                search_results=[],
                method_used=method,
                response_time=time.time() - start_time,
                query=query,
            )

        context = "\n\n".join(result.text for result in search_results)
        answer = self.generate_response(query, context)
        # Plain float so callers and serializers don't receive numpy.float64.
        confidence = float(np.mean([result.score for result in search_results]))

        if self.enable_guard_rails and self.guard_rails:
            output_validation = self.guard_rails.validate_output(
                answer, confidence, context
            )
            if not output_validation.passed:
                logger.warning(f"Output validation failed: {output_validation.reason}")
                if output_validation.blocked:
                    return RAGResponse(
                        answer="I cannot provide this response due to safety concerns.",
                        confidence=0.0,
                        search_results=search_results,
                        method_used=method,
                        response_time=time.time() - start_time,
                        query=query,
                    )
                logger.warning(
                    f"Output validation warning: {output_validation.reason}"
                )
            answer = self.guard_rails.sanitize_output(answer)

        return RAGResponse(
            answer=answer,
            confidence=confidence,
            search_results=search_results,
            method_used=method,
            response_time=time.time() - start_time,
            query=query,
        )

    # ------------------------------------------------------------------
    # Introspection / maintenance
    # ------------------------------------------------------------------

    def get_stats(self) -> Dict:
        """Return document and chunk counts plus the model and chunking configuration."""
        vector_size = (
            self.embedder.get_sentence_embedding_dimension() if self.embedder else 0
        )
        return {
            "total_documents": len(self.documents),
            "total_chunks": len(self.chunks),
            "vector_size": vector_size,
            "model_name": self.generative_model,
            "embedding_model": self.embedding_model,
            "chunk_sizes": self.chunk_sizes,
        }

    def clear(self):
        """
        Remove all documents and chunks, reset both indices and persist the empty state.

        Takes ``self._lock`` so it cannot interleave with a concurrent ``add_document``.
        """
        with self._lock:
            self.documents = []
            self.chunks = []
            self._create_new_index()
            self._save_index()
        logger.info("System cleared successfully")