""" Vector Chunking and RAG Module Handles document chunking, vector embeddings, and RAG question-answering """ import os import json import numpy as np from typing import Dict, Any, List, Optional, Tuple from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter from langchain.schema import Document from langchain_community.vectorstores import FAISS, Chroma from langchain.chains import RetrievalQA, ConversationalRetrievalChain from langchain.memory import ConversationBufferMemory from langchain.prompts import PromptTemplate import tempfile import shutil class VectorChunker: """Main class for document chunking and vector operations""" def __init__(self, embeddings_model, chunk_size: int = 1000, chunk_overlap: int = 200): self.embeddings = embeddings_model self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap self.setup_text_splitters() self.vector_stores = {} # Cache for vector stores def setup_text_splitters(self): """Initialize different text splitting strategies""" # Default recursive splitter self.recursive_splitter = RecursiveCharacterTextSplitter( chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap, length_function=len, separators=["\n\n", "\n", " ", ""] ) # Character-based splitter self.character_splitter = CharacterTextSplitter( chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap, separator="\n\n" ) # Semantic splitter for better context preservation self.semantic_splitter = RecursiveCharacterTextSplitter( chunk_size=800, # Smaller chunks for better semantic coherence chunk_overlap=150, length_function=len, separators=["\n\n", "\n", ". ", " ", ""] ) def chunk_documents(self, documents: List[Document], strategy: str = "recursive") -> List[Document]: """ Chunk documents using specified strategy Args: documents (List[Document]): List of documents to chunk strategy (str): Chunking strategy ("recursive", "character", "semantic") Returns: List[Document]: List of chunked documents """ try: # Choose splitter based on strategy if strategy == "character": splitter = self.character_splitter elif strategy == "semantic": splitter = self.semantic_splitter else: splitter = self.recursive_splitter # Split documents chunked_docs = [] for doc in documents: chunks = splitter.split_documents([doc]) # Add chunk metadata for i, chunk in enumerate(chunks): chunk.metadata.update({ 'chunk_index': i, 'total_chunks': len(chunks), 'chunk_strategy': strategy, 'original_source': doc.metadata.get('source', 'unknown'), 'chunk_size': len(chunk.page_content), 'chunk_word_count': len(chunk.page_content.split()) }) chunked_docs.extend(chunks) return chunked_docs except Exception as e: raise Exception(f"Document chunking failed: {str(e)}") def create_vector_store(self, documents: List[Document], store_type: str = "faiss", persist_directory: Optional[str] = None) -> Any: """ Create vector store from documents Args: documents (List[Document]): Documents to vectorize store_type (str): Type of vector store ("faiss", "chroma") persist_directory (str): Optional directory to persist the store Returns: Vector store instance """ try: if not documents: raise ValueError("No documents provided for vector store creation") if store_type.lower() == "chroma": if persist_directory: vector_store = Chroma.from_documents( documents=documents, embedding=self.embeddings, persist_directory=persist_directory ) vector_store.persist() else: vector_store = Chroma.from_documents( documents=documents, embedding=self.embeddings ) else: # Default to FAISS vector_store = 
                vector_store = FAISS.from_documents(
                    documents=documents,
                    embedding=self.embeddings
                )

                # Save FAISS index if persist directory provided
                if persist_directory:
                    os.makedirs(persist_directory, exist_ok=True)
                    vector_store.save_local(persist_directory)

            return vector_store

        except Exception as e:
            raise Exception(f"Vector store creation failed: {str(e)}")

    def create_qa_chain(self, documents: List[Document], llm, chain_type: str = "stuff") -> RetrievalQA:
        """
        Create a question-answering chain from documents.

        Args:
            documents (List[Document]): Documents for the knowledge base
            llm: Language model for answering questions
            chain_type (str): Type of QA chain ("stuff", "map_reduce", "refine")

        Returns:
            RetrievalQA: Configured QA chain
        """
        try:
            # Chunk documents
            chunked_docs = self.chunk_documents(documents, strategy="semantic")

            # Create vector store
            vector_store = self.create_vector_store(chunked_docs, store_type="faiss")

            # Create retriever
            retriever = vector_store.as_retriever(
                search_type="similarity",
                search_kwargs={"k": 4}  # Retrieve top 4 most relevant chunks
            )

            # Custom prompt for GEO-focused QA
            qa_prompt_template = """Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
Focus on providing clear, accurate, and complete answers that would be suitable for AI search engines.

Context:
{context}

Question: {question}

Answer:"""

            qa_prompt = PromptTemplate(
                template=qa_prompt_template,
                input_variables=["context", "question"]
            )

            # Create QA chain
            qa_chain = RetrievalQA.from_chain_type(
                llm=llm,
                chain_type=chain_type,
                retriever=retriever,
                return_source_documents=True,
                chain_type_kwargs={"prompt": qa_prompt}
            )

            return qa_chain

        except Exception as e:
            raise Exception(f"QA chain creation failed: {str(e)}")

    def create_conversational_chain(self, documents: List[Document], llm) -> ConversationalRetrievalChain:
        """
        Create a conversational retrieval chain with memory.

        Args:
            documents (List[Document]): Documents for the knowledge base
            llm: Language model for conversation

        Returns:
            ConversationalRetrievalChain: Configured conversational chain
        """
        try:
            # Chunk documents
            chunked_docs = self.chunk_documents(documents, strategy="semantic")

            # Create vector store
            vector_store = self.create_vector_store(chunked_docs, store_type="faiss")

            # Create retriever
            retriever = vector_store.as_retriever(
                search_type="similarity",
                search_kwargs={"k": 3}
            )

            # Create memory
            memory = ConversationBufferMemory(
                memory_key="chat_history",
                return_messages=True,
                output_key="answer"
            )

            # Custom prompt for conversational QA
            condense_question_prompt = """Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question that can be understood without the chat history.

Chat History:
{chat_history}

Follow Up Input: {question}
Standalone question:"""

            # Create conversational chain
            conv_chain = ConversationalRetrievalChain.from_llm(
                llm=llm,
                retriever=retriever,
                memory=memory,
                return_source_documents=True,
                condense_question_prompt=PromptTemplate.from_template(condense_question_prompt)
            )

            return conv_chain

        except Exception as e:
            raise Exception(f"Conversational chain creation failed: {str(e)}")

    def semantic_search(self, query: str, documents: List[Document], top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Perform semantic search on documents.

        Args:
            query (str): Search query
            documents (List[Document]): Documents to search
            top_k (int): Number of top results to return

        Returns:
            List[Dict]: Search results with scores
        """
        try:
            # Chunk documents
            chunked_docs = self.chunk_documents(documents, strategy="semantic")

            # Create vector store
            vector_store = self.create_vector_store(chunked_docs, store_type="faiss")

            # Perform similarity search with scores
            # (FAISS returns L2 distances by default, so lower scores mean closer matches)
            results = vector_store.similarity_search_with_score(query, k=top_k)

            # Format results
            formatted_results = []
            for doc, score in results:
                result = {
                    'content': doc.page_content,
                    'metadata': doc.metadata,
                    'similarity_score': float(score),
                    'relevance_rank': len(formatted_results) + 1
                }
                formatted_results.append(result)

            return formatted_results

        except Exception as e:
            raise Exception(f"Semantic search failed: {str(e)}")

    def analyze_document_similarity(self, documents: List[Document]) -> Dict[str, Any]:
        """
        Analyze similarity between documents.

        Args:
            documents (List[Document]): Documents to analyze

        Returns:
            Dict: Similarity analysis results
        """
        try:
            if len(documents) < 2:
                return {'error': 'Need at least 2 documents for similarity analysis'}

            # Chunk documents
            chunked_docs = self.chunk_documents(documents, strategy="semantic")

            # Create embeddings for each chunk
            doc_embeddings = []
            doc_metadata = []

            for doc in chunked_docs:
                # Get embedding for the chunk
                embedding = self.embeddings.embed_query(doc.page_content)
                doc_embeddings.append(embedding)
                doc_metadata.append({
                    'content_preview': doc.page_content[:200] + "...",
                    'metadata': doc.metadata,
                    'length': len(doc.page_content)
                })

            # Calculate pairwise similarities
            similarities = []
            embeddings_array = np.array(doc_embeddings)

            for i in range(len(embeddings_array)):
                for j in range(i + 1, len(embeddings_array)):
                    # Calculate cosine similarity
                    similarity = np.dot(embeddings_array[i], embeddings_array[j]) / (
                        np.linalg.norm(embeddings_array[i]) * np.linalg.norm(embeddings_array[j])
                    )

                    similarities.append({
                        'doc_1_index': i,
                        'doc_2_index': j,
                        'similarity_score': float(similarity),
                        'doc_1_preview': doc_metadata[i]['content_preview'],
                        'doc_2_preview': doc_metadata[j]['content_preview']
                    })

            # Sort by similarity score
            similarities.sort(key=lambda x: x['similarity_score'], reverse=True)

            # Calculate statistics
            similarity_scores = [s['similarity_score'] for s in similarities]

            return {
                'total_comparisons': len(similarities),
                'average_similarity': np.mean(similarity_scores),
                'max_similarity': max(similarity_scores),
                'min_similarity': min(similarity_scores),
                'similarity_distribution': {
                    'high_similarity': len([s for s in similarity_scores if s > 0.8]),
                    'medium_similarity': len([s for s in similarity_scores if 0.5 < s <= 0.8]),
                    'low_similarity': len([s for s in similarity_scores if s <= 0.5])
                },
                'top_similar_pairs': similarities[:5],
                'most_dissimilar_pairs': similarities[-3:]
            }

        except Exception as e:
            return {'error': f"Similarity analysis failed: {str(e)}"}

    def extract_key_passages(self, documents: List[Document],
                             queries: List[str],
                             passages_per_query: int = 3) -> Dict[str, List[Dict[str, Any]]]:
        """
        Extract key passages from documents based on multiple queries.

        Args:
            documents (List[Document]): Documents to search
            queries (List[str]): List of queries to search for
            passages_per_query (int): Number of passages to extract per query

        Returns:
            Dict: Key passages organized by query
        """
        try:
            # Chunk documents
            chunked_docs = self.chunk_documents(documents, strategy="semantic")

            # Create vector store
            vector_store = self.create_vector_store(chunked_docs, store_type="faiss")

            key_passages = {}

            for query in queries:
                # Search for relevant passages
                results = vector_store.similarity_search_with_score(query, k=passages_per_query)

                passages = []
                for doc, score in results:
                    passage = {
                        'content': doc.page_content,
                        'relevance_score': float(score),
                        'metadata': doc.metadata,
                        'word_count': len(doc.page_content.split()),
                        'query_match': query
                    }
                    passages.append(passage)

                key_passages[query] = passages

            return key_passages

        except Exception as e:
            return {'error': f"Key passage extraction failed: {str(e)}"}

    def optimize_chunking_strategy(self, documents: List[Document], test_queries: List[str]) -> Dict[str, Any]:
        """
        Test different chunking strategies and recommend the best one.

        Args:
            documents (List[Document]): Documents to test
            test_queries (List[str]): Queries to test retrieval performance

        Returns:
            Dict: Optimization results and recommendations
        """
        try:
            strategies = ["recursive", "character", "semantic"]
            strategy_results = {}

            for strategy in strategies:
                try:
                    # Test this strategy
                    chunked_docs = self.chunk_documents(documents, strategy=strategy)
                    vector_store = self.create_vector_store(chunked_docs, store_type="faiss")

                    # Test retrieval performance
                    # (FAISS scores are L2 distances by default, so lower is better)
                    retrieval_scores = []
                    for query in test_queries:
                        results = vector_store.similarity_search_with_score(query, k=3)

                        # Calculate average relevance score
                        if results:
                            avg_score = sum(score for _, score in results) / len(results)
                            retrieval_scores.append(float(avg_score))

                    # Calculate strategy metrics; no results counts as worst possible score
                    avg_retrieval_score = np.mean(retrieval_scores) if retrieval_scores else float('inf')
                    total_chunks = len(chunked_docs)
                    avg_chunk_size = np.mean([len(doc.page_content) for doc in chunked_docs])

                    strategy_results[strategy] = {
                        'average_retrieval_score': avg_retrieval_score,
                        'total_chunks': total_chunks,
                        'average_chunk_size': avg_chunk_size,
                        'retrieval_scores': retrieval_scores,
                        'chunk_size_distribution': {
                            'min': min(len(doc.page_content) for doc in chunked_docs),
                            'max': max(len(doc.page_content) for doc in chunked_docs),
                            'std': float(np.std([len(doc.page_content) for doc in chunked_docs]))
                        }
                    }

                except Exception as e:
                    strategy_results[strategy] = {'error': f"Strategy test failed: {str(e)}"}

            # Determine best strategy: the lowest average distance wins, since
            # similarity_search_with_score returns distances for FAISS
            valid_strategies = {k: v for k, v in strategy_results.items() if 'error' not in v}

            if valid_strategies:
                best_strategy = min(valid_strategies.keys(),
                                    key=lambda k: valid_strategies[k]['average_retrieval_score'])

                recommendation = {
                    'recommended_strategy': best_strategy,
                    'reason': f"Best average retrieval score: {valid_strategies[best_strategy]['average_retrieval_score']:.4f}",
                    'all_results': strategy_results,
                    'performance_summary': {
                        strategy: result.get('average_retrieval_score', 0)
                        for strategy, result in valid_strategies.items()
                    }
                }
            else:
                recommendation = {
                    'recommended_strategy': 'recursive',  # Default fallback
                    'reason': 'All strategies failed, using default',
                    'all_results': strategy_results
                }

            return recommendation

        except Exception as e:
            return {'error': f"Chunking optimization failed: {str(e)}"}
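
    # Minimal usage sketch for the chunking utilities above (hedged: the
    # `embeddings` object, `docs` list, and query string are hypothetical
    # placeholders, not defined by this module):
    #
    #     chunker = VectorChunker(embeddings, chunk_size=1000, chunk_overlap=200)
    #     chunks = chunker.chunk_documents(docs, strategy="semantic")
    #     report = chunker.optimize_chunking_strategy(docs, ["What is GEO?"])
    #     print(report.get("recommended_strategy"))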
    def create_document_summary(self, documents: List[Document], llm,
                                summary_type: str = "extractive") -> Dict[str, Any]:
        """
        Create document summaries using the chunked content.

        Args:
            documents (List[Document]): Documents to summarize
            llm: Language model for summarization
            summary_type (str): Type of summary ("extractive", "abstractive")

        Returns:
            Dict: Summary results
        """
        try:
            # Chunk documents for better processing
            chunked_docs = self.chunk_documents(documents, strategy="semantic")

            if summary_type == "extractive":
                # Extract key sentences/chunks
                return self._create_extractive_summary(chunked_docs)
            else:
                # Generate abstractive summary using LLM
                return self._create_abstractive_summary(chunked_docs, llm)

        except Exception as e:
            return {'error': f"Document summarization failed: {str(e)}"}

    def _create_extractive_summary(self, chunked_docs: List[Document]) -> Dict[str, Any]:
        """Create an extractive summary by selecting key chunks."""
        try:
            # Simple extractive approach: select chunks with highest semantic density
            chunk_scores = []

            for doc in chunked_docs:
                content = doc.page_content

                # Simple scoring based on content characteristics
                word_count = len(content.split())
                sentence_count = len([s for s in content.split('.') if s.strip()])

                # Score based on information density
                density_score = word_count / max(sentence_count, 1)

                # Bonus for chunks with questions, definitions, or lists
                structure_bonus = 0
                if '?' in content:
                    structure_bonus += 1
                if any(word in content.lower() for word in ['define', 'definition', 'means', 'refers to']):
                    structure_bonus += 2
                if content.count('\n•') > 0 or content.count('1.') > 0:
                    structure_bonus += 1

                total_score = density_score + structure_bonus
                chunk_scores.append((doc, total_score))

            # Sort by score and select top chunks for summary
            chunk_scores.sort(key=lambda x: x[1], reverse=True)
            top_chunks = chunk_scores[:min(5, len(chunk_scores))]

            summary_content = []
            for doc, score in top_chunks:
                summary_content.append({
                    'content': doc.page_content,
                    'score': score,
                    'metadata': doc.metadata
                })

            return {
                'summary_type': 'extractive',
                'key_chunks': summary_content,
                'total_chunks_analyzed': len(chunked_docs),
                'chunks_selected': len(top_chunks)
            }

        except Exception as e:
            return {'error': f"Extractive summary failed: {str(e)}"}

    def _create_abstractive_summary(self, chunked_docs: List[Document], llm) -> Dict[str, Any]:
        """Create an abstractive summary using the language model."""
        try:
            # Combine content from top chunks
            combined_content = "\n\n".join([doc.page_content for doc in chunked_docs[:10]])

            # Build the user prompt as a template with a {content} placeholder so
            # literal braces in the source text are not misread as template variables
            summary_prompt = (
                "Please provide a comprehensive summary of the following content. "
                "Focus on the main topics, key insights, and important details "
                "that would be valuable for AI search engines.\n\n"
                "Content:\n{content}\n\nSummary:"
            )

            from langchain.prompts import ChatPromptTemplate
            prompt_template = ChatPromptTemplate.from_messages([
                ("system", "You are a professional content summarizer. Create clear, informative summaries."),
                ("user", summary_prompt)
            ])

            chain = prompt_template | llm
            result = chain.invoke({"content": combined_content[:5000]})

            summary_text = result.content if hasattr(result, 'content') else str(result)

            return {
                'summary_type': 'abstractive',
                'summary': summary_text,
                'source_chunks': len(chunked_docs),
                'content_length_processed': len(combined_content)
            }

        except Exception as e:
            return {'error': f"Abstractive summary failed: {str(e)}"}

    def save_vector_store(self, vector_store, directory_path: str, store_type: str = "faiss") -> bool:
        """
        Save a vector store to disk.

        Args:
            vector_store: Vector store instance to save
            directory_path (str): Directory to save the store
            store_type (str): Type of vector store

        Returns:
            bool: Success status
        """
        try:
            os.makedirs(directory_path, exist_ok=True)

            if store_type.lower() == "faiss":
                vector_store.save_local(directory_path)
            elif store_type.lower() == "chroma":
                # Chroma stores are typically persisted during creation
                pass

            return True

        except Exception as e:
            print(f"Failed to save vector store: {str(e)}")
            return False

    def load_vector_store(self, directory_path: str, store_type: str = "faiss"):
        """
        Load a vector store from disk.

        Args:
            directory_path (str): Directory containing the saved store
            store_type (str): Type of vector store

        Returns:
            Vector store instance or None if failed
        """
        try:
            if not os.path.exists(directory_path):
                return None

            if store_type.lower() == "faiss":
                vector_store = FAISS.load_local(
                    directory_path,
                    self.embeddings,
                    allow_dangerous_deserialization=True
                )
                return vector_store
            elif store_type.lower() == "chroma":
                vector_store = Chroma(
                    persist_directory=directory_path,
                    embedding_function=self.embeddings
                )
                return vector_store

            return None

        except Exception as e:
            print(f"Failed to load vector store: {str(e)}")
            return None

    def get_chunking_stats(self, documents: List[Document], strategy: str = "recursive") -> Dict[str, Any]:
        """
        Get detailed statistics about document chunking.

        Args:
            documents (List[Document]): Documents to analyze
            strategy (str): Chunking strategy to use

        Returns:
            Dict: Detailed chunking statistics
        """
        try:
            # Chunk documents
            chunked_docs = self.chunk_documents(documents, strategy=strategy)

            # Calculate statistics
            chunk_sizes = [len(doc.page_content) for doc in chunked_docs]
            word_counts = [len(doc.page_content.split()) for doc in chunked_docs]

            stats = {
                'strategy_used': strategy,
                'original_documents': len(documents),
                'total_chunks': len(chunked_docs),
                'chunk_size_stats': {
                    'min': min(chunk_sizes) if chunk_sizes else 0,
                    'max': max(chunk_sizes) if chunk_sizes else 0,
                    'mean': np.mean(chunk_sizes) if chunk_sizes else 0,
                    'median': np.median(chunk_sizes) if chunk_sizes else 0,
                    'std': np.std(chunk_sizes) if chunk_sizes else 0
                },
                'word_count_stats': {
                    'min': min(word_counts) if word_counts else 0,
                    'max': max(word_counts) if word_counts else 0,
                    'mean': np.mean(word_counts) if word_counts else 0,
                    'median': np.median(word_counts) if word_counts else 0,
                    'std': np.std(word_counts) if word_counts else 0
                },
                'chunk_distribution': {
                    'very_small': len([s for s in chunk_sizes if s < 200]),
                    'small': len([s for s in chunk_sizes if 200 <= s < 500]),
                    'medium': len([s for s in chunk_sizes if 500 <= s < 1000]),
                    'large': len([s for s in chunk_sizes if 1000 <= s < 2000]),
                    'very_large': len([s for s in chunk_sizes if s >= 2000])
                },
                'overlap_efficiency': self._calculate_overlap_efficiency(chunked_docs),
                'content_coverage': self._calculate_content_coverage(documents, chunked_docs)
            }

            return stats

        except Exception as e:
            return {'error': f"Chunking statistics failed: {str(e)}"}
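
    # Persistence sketch (hedged: `chunker`, `docs`, and the index path are
    # hypothetical placeholders). Build a store once, save it, and reload it
    # later so the embedding step does not have to be repeated:
    #
    #     store = chunker.create_vector_store(chunker.chunk_documents(docs))
    #     chunker.save_vector_store(store, "./faiss_index")
    #     reloaded = chunker.load_vector_store("./faiss_index", store_type="faiss")
    #     stats = chunker.get_chunking_stats(docs, strategy="semantic")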
    def _calculate_overlap_efficiency(self, chunked_docs: List[Document]) -> float:
        """Calculate efficiency of chunk overlaps."""
        try:
            if len(chunked_docs) < 2:
                return 1.0

            total_content_length = sum(len(doc.page_content) for doc in chunked_docs)
            unique_content = set()

            # Rough estimate of content uniqueness
            for doc in chunked_docs:
                words = doc.page_content.split()
                for i in range(0, len(words), 10):  # Sample every 10th word
                    unique_content.add(' '.join(words[i:i + 10]))

            # Efficiency as ratio of unique content to total content
            efficiency = len(unique_content) * 10 / total_content_length if total_content_length > 0 else 0
            return min(efficiency, 1.0)

        except Exception:
            return 0.5  # Default neutral efficiency

    def _calculate_content_coverage(self, original_docs: List[Document],
                                    chunked_docs: List[Document]) -> float:
        """Calculate how well chunks cover the original content."""
        try:
            original_content = ' '.join([doc.page_content for doc in original_docs])
            chunked_content = ' '.join([doc.page_content for doc in chunked_docs])

            # Simple coverage metric based on length
            coverage = len(chunked_content) / len(original_content) if original_content else 0
            return min(coverage, 1.0)

        except Exception:
            return 0.0


class ChunkingOptimizer:
    """Helper class for optimizing chunking parameters."""

    def __init__(self, embeddings_model):
        self.embeddings = embeddings_model

    def optimize_chunk_size(self, documents: List[Document], test_queries: List[str],
                            size_range: Tuple[int, int] = (200, 2000),
                            step_size: int = 200) -> Dict[str, Any]:
        """
        Find the optimal chunk size for given documents and queries.

        Args:
            documents (List[Document]): Documents to test
            test_queries (List[str]): Queries for testing retrieval
            size_range (Tuple[int, int]): Range of chunk sizes to test
            step_size (int): Step size for testing

        Returns:
            Dict: Optimization results with recommended chunk size
        """
        try:
            results = {}
            min_size, max_size = size_range

            for chunk_size in range(min_size, max_size + 1, step_size):
                # Test this chunk size, keeping a 20% overlap so the overlap
                # never equals or exceeds the chunk size being tested
                chunker = VectorChunker(self.embeddings, chunk_size=chunk_size,
                                        chunk_overlap=chunk_size // 5)

                try:
                    chunked_docs = chunker.chunk_documents(documents)
                    vector_store = chunker.create_vector_store(chunked_docs)

                    # Test retrieval performance
                    # (FAISS scores are L2 distances by default, so lower is better)
                    retrieval_scores = []
                    for query in test_queries:
                        search_results = vector_store.similarity_search_with_score(query, k=3)
                        if search_results:
                            avg_score = sum(score for _, score in search_results) / len(search_results)
                            retrieval_scores.append(float(avg_score))

                    # No results counts as worst possible score
                    avg_performance = np.mean(retrieval_scores) if retrieval_scores else float('inf')

                    results[chunk_size] = {
                        'average_retrieval_score': avg_performance,
                        'total_chunks': len(chunked_docs),
                        'retrieval_scores': retrieval_scores
                    }

                except Exception as e:
                    results[chunk_size] = {'error': str(e)}

            # Find optimal chunk size: the lowest average distance wins
            valid_results = {k: v for k, v in results.items() if 'error' not in v}

            if valid_results:
                optimal_size = min(valid_results.keys(),
                                   key=lambda k: valid_results[k]['average_retrieval_score'])

                return {
                    'optimal_chunk_size': optimal_size,
                    'optimal_performance': valid_results[optimal_size]['average_retrieval_score'],
                    'all_results': results,
                    'performance_trend': self._analyze_performance_trend(valid_results),
                    'recommendation': f"Use chunk size {optimal_size} for best retrieval performance"
                }
            else:
                return {
                    'error': 'No valid chunk sizes could be tested',
                    'all_results': results
                }

        except Exception as e:
            return {'error': f"Chunk size optimization failed: {str(e)}"}

    def _analyze_performance_trend(self, results: Dict[int, Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze the performance trend across different chunk sizes."""
sizes""" try: sizes = sorted(results.keys()) performances = [results[size]['average_retrieval_score'] for size in sizes] # Find trend direction if len(performances) >= 2: trend_direction = "increasing" if performances[-1] > performances[0] else "decreasing" peak_performance = max(performances) peak_size = sizes[performances.index(peak_performance)] return { 'trend_direction': trend_direction, 'peak_performance': peak_performance, 'peak_size': peak_size, 'performance_range': max(performances) - min(performances), 'stable_performance': max(performances) - min(performances) < 0.1 } else: return {'error': 'Insufficient data for trend analysis'} except Exception: return {'error': 'Trend analysis failed'} class RAGPipeline: """Complete RAG pipeline for document question-answering""" def __init__(self, embeddings_model, llm): self.embeddings = embeddings_model self.llm = llm self.chunker = VectorChunker(embeddings_model) self.vector_stores = {} self.qa_chains = {} def create_pipeline(self, documents: List[Document], pipeline_id: str, chunking_strategy: str = "semantic") -> Dict[str, Any]: """ Create a complete RAG pipeline for documents Args: documents (List[Document]): Documents to process pipeline_id (str): Unique identifier for this pipeline chunking_strategy (str): Strategy for document chunking Returns: Dict: Pipeline creation results """ try: # Step 1: Chunk documents chunked_docs = self.chunker.chunk_documents(documents, strategy=chunking_strategy) # Step 2: Create vector store vector_store = self.chunker.create_vector_store(chunked_docs, store_type="faiss") # Step 3: Create QA chain qa_chain = self.chunker.create_qa_chain(documents, self.llm) # Store pipeline components self.vector_stores[pipeline_id] = vector_store self.qa_chains[pipeline_id] = qa_chain # Pipeline statistics stats = { 'pipeline_id': pipeline_id, 'documents_processed': len(documents), 'chunks_created': len(chunked_docs), 'chunking_strategy': chunking_strategy, 'vector_store_type': 'faiss', 'embedding_model': str(self.embeddings), 'created_at': self._get_timestamp() } return { 'success': True, 'pipeline_stats': stats, 'chunking_info': self.chunker.get_chunking_stats(documents, chunking_strategy) } except Exception as e: return {'error': f"Pipeline creation failed: {str(e)}"} def query_pipeline(self, pipeline_id: str, query: str, return_sources: bool = True) -> Dict[str, Any]: """ Query a created RAG pipeline Args: pipeline_id (str): ID of the pipeline to query query (str): Question to ask return_sources (bool): Whether to return source documents Returns: Dict: Query results with answer and sources """ try: if pipeline_id not in self.qa_chains: return {'error': f"Pipeline '{pipeline_id}' not found"} qa_chain = self.qa_chains[pipeline_id] # Execute query result = qa_chain({"query": query}) # Format response response = { 'query': query, 'answer': result.get('result', 'No answer generated'), 'pipeline_id': pipeline_id, 'query_timestamp': self._get_timestamp() } # Add source documents if requested if return_sources and 'source_documents' in result: sources = [] for i, doc in enumerate(result['source_documents']): source = { 'source_index': i, 'content': doc.page_content, 'metadata': doc.metadata, 'relevance_rank': i + 1 } sources.append(source) response['sources'] = sources response['num_sources'] = len(sources) return response except Exception as e: return {'error': f"Pipeline query failed: {str(e)}"} def batch_query_pipeline(self, pipeline_id: str, queries: List[str]) -> List[Dict[str, Any]]: """ Execute multiple queries on a 

        Args:
            pipeline_id (str): ID of the pipeline to query
            queries (List[str]): List of questions to ask

        Returns:
            List[Dict]: List of query results
        """
        results = []

        for i, query in enumerate(queries):
            try:
                result = self.query_pipeline(pipeline_id, query, return_sources=False)
                result['batch_index'] = i
                results.append(result)
            except Exception as e:
                results.append({
                    'batch_index': i,
                    'query': query,
                    'error': f"Batch query failed: {str(e)}"
                })

        return results

    def evaluate_pipeline(self, pipeline_id: str, test_queries: List[str],
                          expected_answers: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Evaluate pipeline performance on test queries.

        Args:
            pipeline_id (str): ID of the pipeline to evaluate
            test_queries (List[str]): Test questions
            expected_answers (List[str]): Optional expected answers for comparison

        Returns:
            Dict: Evaluation results
        """
        try:
            if pipeline_id not in self.qa_chains:
                return {'error': f"Pipeline '{pipeline_id}' not found"}

            import time

            evaluation_results = []
            response_times = []

            for i, query in enumerate(test_queries):
                start_time = time.time()

                # Execute query
                result = self.query_pipeline(pipeline_id, query, return_sources=True)

                end_time = time.time()
                response_time = end_time - start_time
                response_times.append(response_time)

                # Evaluate result
                eval_result = {
                    'query_index': i,
                    'query': query,
                    'answer_generated': not result.get('error'),
                    'response_time': response_time,
                    'answer_length': len(result.get('answer', '')),
                    'sources_returned': result.get('num_sources', 0)
                }

                # If an expected answer is provided, calculate similarity
                if expected_answers and i < len(expected_answers):
                    expected = expected_answers[i]
                    generated = result.get('answer', '')

                    # Simple similarity metric
                    similarity = self._calculate_answer_similarity(expected, generated)
                    eval_result['answer_similarity'] = similarity
                    eval_result['expected_answer'] = expected

                evaluation_results.append(eval_result)

            # Calculate aggregate metrics
            successful_queries = len([r for r in evaluation_results if r['answer_generated']])
            avg_response_time = np.mean(response_times) if response_times else 0

            if expected_answers:
                similarities = [r.get('answer_similarity', 0) for r in evaluation_results
                                if 'answer_similarity' in r]
                avg_similarity = np.mean(similarities) if similarities else 0
            else:
                avg_similarity = None

            return {
                'pipeline_id': pipeline_id,
                'total_queries': len(test_queries),
                'successful_queries': successful_queries,
                'success_rate': successful_queries / len(test_queries) if test_queries else 0,
                'average_response_time': avg_response_time,
                'average_answer_similarity': avg_similarity,
                'detailed_results': evaluation_results,
                'evaluation_timestamp': self._get_timestamp()
            }

        except Exception as e:
            return {'error': f"Pipeline evaluation failed: {str(e)}"}

    def _calculate_answer_similarity(self, expected: str, generated: str) -> float:
        """Calculate word-overlap (Jaccard) similarity between expected and generated answers."""
        try:
            # Simple word overlap similarity
            expected_words = set(expected.lower().split())
            generated_words = set(generated.lower().split())

            if not expected_words and not generated_words:
                return 1.0

            intersection = expected_words.intersection(generated_words)
            union = expected_words.union(generated_words)

            return len(intersection) / len(union) if union else 0.0

        except Exception:
            return 0.0

    def get_pipeline_info(self, pipeline_id: str) -> Dict[str, Any]:
        """Get information about a specific pipeline."""
        try:
            if pipeline_id not in self.qa_chains:
                return {'error': f"Pipeline '{pipeline_id}' not found"}

            # Get vector store info
            vector_store = self.vector_stores.get(pipeline_id)
            if vector_store:
                try:
                    # Try to get vector store statistics
                    total_vectors = vector_store.index.ntotal if hasattr(vector_store, 'index') else 'unknown'
                except Exception:
                    total_vectors = 'unknown'
            else:
                total_vectors = 'unknown'

            return {
                'pipeline_id': pipeline_id,
                'has_qa_chain': pipeline_id in self.qa_chains,
                'has_vector_store': pipeline_id in self.vector_stores,
                'total_vectors': total_vectors,
                'embedding_model': str(self.embeddings),
                'llm_model': str(self.llm)
            }

        except Exception as e:
            return {'error': f"Failed to get pipeline info: {str(e)}"}

    def list_pipelines(self) -> Dict[str, Any]:
        """List all created pipelines."""
        return {
            'total_pipelines': len(self.qa_chains),
            'pipeline_ids': list(self.qa_chains.keys()),
            'vector_stores': list(self.vector_stores.keys())
        }

    def delete_pipeline(self, pipeline_id: str) -> Dict[str, Any]:
        """Delete a pipeline and free resources."""
        try:
            deleted_components = []

            if pipeline_id in self.qa_chains:
                del self.qa_chains[pipeline_id]
                deleted_components.append('qa_chain')

            if pipeline_id in self.vector_stores:
                del self.vector_stores[pipeline_id]
                deleted_components.append('vector_store')

            if deleted_components:
                return {
                    'success': True,
                    'pipeline_id': pipeline_id,
                    'deleted_components': deleted_components
                }
            else:
                return {'error': f"Pipeline '{pipeline_id}' not found"}

        except Exception as e:
            return {'error': f"Pipeline deletion failed: {str(e)}"}

    def export_pipeline_config(self, pipeline_id: str) -> Dict[str, Any]:
        """Export pipeline configuration for recreation."""
        try:
            if pipeline_id not in self.qa_chains:
                return {'error': f"Pipeline '{pipeline_id}' not found"}

            config = {
                'pipeline_id': pipeline_id,
                'embedding_model_name': getattr(self.embeddings, 'model_name', 'unknown'),
                'llm_model_name': getattr(self.llm, 'model_name', 'unknown'),
                'chunker_config': {
                    'chunk_size': self.chunker.chunk_size,
                    'chunk_overlap': self.chunker.chunk_overlap
                },
                'export_timestamp': self._get_timestamp(),
                'vector_store_type': 'faiss'
            }

            return config

        except Exception as e:
            return {'error': f"Pipeline export failed: {str(e)}"}

    def _get_timestamp(self) -> str:
        """Get the current timestamp."""
        from datetime import datetime
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')


# Utility functions for the module

def optimize_rag_pipeline(documents: List[Document], embeddings_model, llm,
                          test_queries: List[str]) -> Dict[str, Any]:
    """
    Optimize RAG pipeline configuration for given documents and queries.

    Args:
        documents (List[Document]): Documents to optimize for
        embeddings_model: Embedding model to use
        llm: Language model to use
        test_queries (List[str]): Test queries for optimization

    Returns:
        Dict: Optimization recommendations
    """
    try:
        # Test different chunking strategies
        chunker = VectorChunker(embeddings_model)
        chunking_results = chunker.optimize_chunking_strategy(documents, test_queries)

        # Test different chunk sizes
        optimizer = ChunkingOptimizer(embeddings_model)
        size_results = optimizer.optimize_chunk_size(documents, test_queries)

        # Create optimized pipeline
        best_strategy = chunking_results.get('recommended_strategy', 'semantic')
        best_size = size_results.get('optimal_chunk_size', 1000)

        # Create optimized chunker
        optimized_chunker = VectorChunker(
            embeddings_model,
            chunk_size=best_size,
            chunk_overlap=best_size // 5  # 20% overlap
        )

        # Test the optimized configuration
        pipeline = RAGPipeline(embeddings_model, llm)
        pipeline.chunker = optimized_chunker

        test_pipeline_id = "optimization_test"
        creation_result = pipeline.create_pipeline(documents, test_pipeline_id, best_strategy)

        if not creation_result.get('error'):
            evaluation_result = pipeline.evaluate_pipeline(test_pipeline_id, test_queries)
            pipeline.delete_pipeline(test_pipeline_id)  # Clean up
        else:
            evaluation_result = {'error': 'Could not evaluate optimized pipeline'}

        return {
            'optimization_complete': True,
            'recommended_config': {
                'chunking_strategy': best_strategy,
                'chunk_size': best_size,
                'chunk_overlap': best_size // 5
            },
            'chunking_optimization': chunking_results,
            'size_optimization': size_results,
            'performance_evaluation': evaluation_result,
            'recommendations': [
                f"Use {best_strategy} chunking strategy",
                f"Set chunk size to {best_size} characters",
                f"Use {best_size // 5} character overlap",
                "Monitor and adjust based on query performance"
            ]
        }

    except Exception as e:
        return {'error': f"RAG optimization failed: {str(e)}"}


def create_demo_rag_system(sample_documents: List[Document], embeddings_model, llm) -> Dict[str, Any]:
    """
    Create a demonstration RAG system with sample documents.

    Args:
        sample_documents (List[Document]): Sample documents for the demo
        embeddings_model: Embedding model
        llm: Language model

    Returns:
        Dict: Demo system information and sample interactions
    """
    try:
        # Create RAG pipeline
        pipeline = RAGPipeline(embeddings_model, llm)
        demo_id = "demo_system"

        # Create the pipeline
        creation_result = pipeline.create_pipeline(sample_documents, demo_id, "semantic")

        if creation_result.get('error'):
            return {'error': f"Demo system creation failed: {creation_result['error']}"}

        # Sample queries for demonstration
        demo_queries = [
            "What is the main topic of these documents?",
            "Can you summarize the key points?",
            "What are the most important concepts mentioned?"
        ]

        # Execute demo queries
        demo_results = []
        for query in demo_queries:
            result = pipeline.query_pipeline(demo_id, query, return_sources=True)
            demo_results.append(result)

        # Get system statistics
        pipeline_info = pipeline.get_pipeline_info(demo_id)

        return {
            'demo_system_created': True,
            'pipeline_id': demo_id,
            'creation_stats': creation_result,
            'pipeline_info': pipeline_info,
            'demo_queries': demo_queries,
            'demo_results': demo_results,
            'usage_instructions': [
                f"Use pipeline.query_pipeline('{demo_id}', 'your question') to ask questions",
                "The system will return answers with source document references",
                "Sources show which parts of the documents were used for the answer"
            ]
        }

    except Exception as e:
        return {'error': f"Demo system creation failed: {str(e)}"}


# Export the main classes for use in other modules
__all__ = [
    'VectorChunker',
    'ChunkingOptimizer',
    'RAGPipeline',
    'optimize_rag_pipeline',
    'create_demo_rag_system'
]
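

# Minimal end-to-end sketch (hedged): this module does not pin a model
# provider, so the OpenAI classes below are an assumption — any LangChain
# embeddings/chat-model pair should work. Running it as-is requires
# OPENAI_API_KEY and the langchain-openai package.
if __name__ == "__main__":
    from langchain_openai import OpenAIEmbeddings, ChatOpenAI  # assumed provider

    embeddings = OpenAIEmbeddings()
    llm = ChatOpenAI(temperature=0)

    # Toy corpus standing in for real loaded documents
    sample_docs = [
        Document(
            page_content="Generative Engine Optimization (GEO) focuses on making "
                         "content easy for AI search engines to cite.",
            metadata={"source": "sample-1"}
        ),
        Document(
            page_content="Chunking splits documents into overlapping passages so "
                         "a retriever can find the most relevant context.",
            metadata={"source": "sample-2"}
        ),
    ]

    rag = RAGPipeline(embeddings, llm)
    print(rag.create_pipeline(sample_docs, "demo", chunking_strategy="semantic"))
    print(rag.query_pipeline("demo", "What does chunking do?"))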