| """ | |
| Vector Chunking and RAG Module | |
| Handles document chunking, vector embeddings, and RAG question-answering | |
| """ | |
| import os | |
| import json | |
| import numpy as np | |
| from typing import Dict, Any, List, Optional, Tuple | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter | |
| from langchain.schema import Document | |
| from langchain_community.vectorstores import FAISS, Chroma | |
| from langchain.chains import RetrievalQA, ConversationalRetrievalChain | |
| from langchain.memory import ConversationBufferMemory | |
| from langchain.prompts import PromptTemplate | |
| import tempfile | |
| import shutil | |
class VectorChunker:
    """Main class for document chunking and vector operations."""

    def __init__(self, embeddings_model, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.embeddings = embeddings_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.setup_text_splitters()
        self.vector_stores = {}  # Cache for vector stores
    def setup_text_splitters(self):
        """Initialize different text splitting strategies."""
        # Default recursive splitter
        self.recursive_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )
        # Character-based splitter
        self.character_splitter = CharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            separator="\n\n"
        )
        # Semantic splitter for better context preservation
        self.semantic_splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,  # Smaller chunks for better semantic coherence
            chunk_overlap=150,
            length_function=len,
            separators=["\n\n", "\n", ". ", " ", ""]
        )
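    # How the recursive splitter cascades, as a rough sketch: it tries each
    # separator in order and only falls back to the next one while a piece is
    # still larger than chunk_size.
    #
    #   text = "para one\n\npara two ..."   # hypothetical input
    #   splitter.split_text(text)
    #   # 1. split on "\n\n" -> paragraphs
    #   # 2. oversized paragraphs split on "\n" -> lines
    #   # 3. oversized lines split on " "       -> words
    #   # 4. last resort, split on ""           -> characters
    #
    # The "semantic" splitter adds ". " to this list, so oversized paragraphs
    # break at sentence boundaries before falling back to single words.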
    def chunk_documents(self, documents: List[Document], strategy: str = "recursive") -> List[Document]:
        """
        Chunk documents using the specified strategy.

        Args:
            documents (List[Document]): List of documents to chunk
            strategy (str): Chunking strategy ("recursive", "character", "semantic")

        Returns:
            List[Document]: List of chunked documents
        """
        try:
            # Choose splitter based on strategy
            if strategy == "character":
                splitter = self.character_splitter
            elif strategy == "semantic":
                splitter = self.semantic_splitter
            else:
                splitter = self.recursive_splitter

            # Split documents one at a time so per-document chunk counts are known
            chunked_docs = []
            for doc in documents:
                chunks = splitter.split_documents([doc])
                # Add chunk metadata
                for i, chunk in enumerate(chunks):
                    chunk.metadata.update({
                        'chunk_index': i,
                        'total_chunks': len(chunks),
                        'chunk_strategy': strategy,
                        'original_source': doc.metadata.get('source', 'unknown'),
                        'chunk_size': len(chunk.page_content),
                        'chunk_word_count': len(chunk.page_content.split())
                    })
                chunked_docs.extend(chunks)
            return chunked_docs
        except Exception as e:
            raise Exception(f"Document chunking failed: {str(e)}")
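    # Usage sketch (illustrative only; `embeddings` stands for any LangChain
    # embeddings object and `long_text` for your own document text):
    #
    #   chunker = VectorChunker(embeddings, chunk_size=1000, chunk_overlap=200)
    #   docs = [Document(page_content=long_text, metadata={"source": "guide.txt"})]
    #   chunks = chunker.chunk_documents(docs, strategy="semantic")
    #   print(chunks[0].metadata["chunk_index"], chunks[0].metadata["total_chunks"])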
    def create_vector_store(self, documents: List[Document], store_type: str = "faiss",
                            persist_directory: Optional[str] = None) -> Any:
        """
        Create a vector store from documents.

        Args:
            documents (List[Document]): Documents to vectorize
            store_type (str): Type of vector store ("faiss", "chroma")
            persist_directory (str): Optional directory to persist the store

        Returns:
            Vector store instance
        """
        try:
            if not documents:
                raise ValueError("No documents provided for vector store creation")

            if store_type.lower() == "chroma":
                if persist_directory:
                    vector_store = Chroma.from_documents(
                        documents=documents,
                        embedding=self.embeddings,
                        persist_directory=persist_directory
                    )
                    vector_store.persist()
                else:
                    vector_store = Chroma.from_documents(
                        documents=documents,
                        embedding=self.embeddings
                    )
            else:  # Default to FAISS
                vector_store = FAISS.from_documents(
                    documents=documents,
                    embedding=self.embeddings
                )
                # Save the FAISS index if a persist directory is provided
                if persist_directory:
                    os.makedirs(persist_directory, exist_ok=True)
                    vector_store.save_local(persist_directory)
            return vector_store
        except Exception as e:
            raise Exception(f"Vector store creation failed: {str(e)}")
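    # Persistence round trip, as a sketch (the path is hypothetical):
    #
    #   store = chunker.create_vector_store(chunks, store_type="faiss",
    #                                       persist_directory="indexes/guide")
    #   ...
    #   store = chunker.load_vector_store("indexes/guide", store_type="faiss")
    #
    # Note that FAISS reports similarity as an L2 distance, so in the search
    # methods below a *lower* score means a *closer* match.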
    def create_qa_chain(self, documents: List[Document], llm, chain_type: str = "stuff") -> RetrievalQA:
        """
        Create a question-answering chain from documents.

        Args:
            documents (List[Document]): Documents for the knowledge base
            llm: Language model for answering questions
            chain_type (str): Type of QA chain ("stuff", "map_reduce", "refine")

        Returns:
            RetrievalQA: Configured QA chain
        """
        try:
            # Chunk documents
            chunked_docs = self.chunk_documents(documents, strategy="semantic")

            # Create vector store
            vector_store = self.create_vector_store(chunked_docs, store_type="faiss")

            # Create retriever
            retriever = vector_store.as_retriever(
                search_type="similarity",
                search_kwargs={"k": 4}  # Retrieve the 4 most relevant chunks
            )

            # Custom prompt for GEO-focused QA
            qa_prompt_template = """Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
Focus on providing clear, accurate, and complete answers that would be suitable for AI search engines.

Context:
{context}

Question: {question}

Answer:"""
            qa_prompt = PromptTemplate(
                template=qa_prompt_template,
                input_variables=["context", "question"]
            )

            # Create QA chain (the custom prompt targets the default "stuff"
            # chain type; other chain types expect different prompt kwargs)
            qa_chain = RetrievalQA.from_chain_type(
                llm=llm,
                chain_type=chain_type,
                retriever=retriever,
                return_source_documents=True,
                chain_type_kwargs={"prompt": qa_prompt}
            )
            return qa_chain
        except Exception as e:
            raise Exception(f"QA chain creation failed: {str(e)}")
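    # Invocation sketch (hypothetical `llm` object; `.invoke` is the current
    # LangChain calling convention):
    #
    #   qa = chunker.create_qa_chain(docs, llm)
    #   out = qa.invoke({"query": "What does the guide recommend for chunk size?"})
    #   print(out["result"])
    #   print(len(out["source_documents"]), "supporting chunks")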
    def create_conversational_chain(self, documents: List[Document], llm) -> ConversationalRetrievalChain:
        """
        Create a conversational retrieval chain with memory.

        Args:
            documents (List[Document]): Documents for the knowledge base
            llm: Language model for conversation

        Returns:
            ConversationalRetrievalChain: Configured conversational chain
        """
        try:
            # Chunk documents
            chunked_docs = self.chunk_documents(documents, strategy="semantic")

            # Create vector store
            vector_store = self.create_vector_store(chunked_docs, store_type="faiss")

            # Create retriever
            retriever = vector_store.as_retriever(
                search_type="similarity",
                search_kwargs={"k": 3}
            )

            # Create memory; output_key is required because the chain also
            # returns source documents
            memory = ConversationBufferMemory(
                memory_key="chat_history",
                return_messages=True,
                output_key="answer"
            )

            # Custom prompt for condensing follow-up questions
            condense_question_prompt = """Given the following conversation and a follow up question,
rephrase the follow up question to be a standalone question that can be understood without the chat history.

Chat History:
{chat_history}

Follow Up Input: {question}

Standalone question:"""

            # Create conversational chain
            conv_chain = ConversationalRetrievalChain.from_llm(
                llm=llm,
                retriever=retriever,
                memory=memory,
                return_source_documents=True,
                condense_question_prompt=PromptTemplate.from_template(condense_question_prompt)
            )
            return conv_chain
        except Exception as e:
            raise Exception(f"Conversational chain creation failed: {str(e)}")
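    # Conversational usage sketch (hypothetical `llm`; the attached memory
    # carries the history, so a follow-up can use pronouns):
    #
    #   chat = chunker.create_conversational_chain(docs, llm)
    #   chat.invoke({"question": "What is GEO?"})
    #   chat.invoke({"question": "How does it differ from SEO?"})  # "it" resolved via history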
    def semantic_search(self, query: str, documents: List[Document], top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Perform semantic search on documents.

        Args:
            query (str): Search query
            documents (List[Document]): Documents to search
            top_k (int): Number of top results to return

        Returns:
            List[Dict]: Search results with scores
        """
        try:
            # Chunk documents
            chunked_docs = self.chunk_documents(documents, strategy="semantic")

            # Create vector store
            vector_store = self.create_vector_store(chunked_docs, store_type="faiss")

            # Perform similarity search with scores (FAISS returns L2 distances,
            # so a lower score means a closer match)
            results = vector_store.similarity_search_with_score(query, k=top_k)

            # Format results
            formatted_results = []
            for rank, (doc, score) in enumerate(results, start=1):
                formatted_results.append({
                    'content': doc.page_content,
                    'metadata': doc.metadata,
                    'similarity_score': float(score),
                    'relevance_rank': rank
                })
            return formatted_results
        except Exception as e:
            raise Exception(f"Semantic search failed: {str(e)}")
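    # Search sketch: results arrive best-first, with FAISS distances as scores.
    #
    #   hits = chunker.semantic_search("vector databases", docs, top_k=3)
    #   for hit in hits:
    #       print(hit['relevance_rank'], round(hit['similarity_score'], 3),
    #             hit['content'][:60])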
    def analyze_document_similarity(self, documents: List[Document]) -> Dict[str, Any]:
        """
        Analyze similarity between documents.

        Args:
            documents (List[Document]): Documents to analyze

        Returns:
            Dict: Similarity analysis results
        """
        try:
            if len(documents) < 2:
                return {'error': 'Need at least 2 documents for similarity analysis'}

            # Chunk documents
            chunked_docs = self.chunk_documents(documents, strategy="semantic")

            # Create an embedding for each chunk
            doc_embeddings = []
            doc_metadata = []
            for doc in chunked_docs:
                embedding = self.embeddings.embed_query(doc.page_content)
                doc_embeddings.append(embedding)
                doc_metadata.append({
                    'content_preview': doc.page_content[:200] + "...",
                    'metadata': doc.metadata,
                    'length': len(doc.page_content)
                })

            # Calculate pairwise cosine similarities
            similarities = []
            embeddings_array = np.array(doc_embeddings)
            for i in range(len(embeddings_array)):
                for j in range(i + 1, len(embeddings_array)):
                    similarity = np.dot(embeddings_array[i], embeddings_array[j]) / (
                        np.linalg.norm(embeddings_array[i]) * np.linalg.norm(embeddings_array[j])
                    )
                    similarities.append({
                        'doc_1_index': i,
                        'doc_2_index': j,
                        'similarity_score': float(similarity),
                        'doc_1_preview': doc_metadata[i]['content_preview'],
                        'doc_2_preview': doc_metadata[j]['content_preview']
                    })

            # Sort by similarity score
            similarities.sort(key=lambda x: x['similarity_score'], reverse=True)

            # Calculate statistics
            similarity_scores = [s['similarity_score'] for s in similarities]
            return {
                'total_comparisons': len(similarities),
                'average_similarity': float(np.mean(similarity_scores)),
                'max_similarity': max(similarity_scores),
                'min_similarity': min(similarity_scores),
                'similarity_distribution': {
                    'high_similarity': len([s for s in similarity_scores if s > 0.8]),
                    'medium_similarity': len([s for s in similarity_scores if 0.5 < s <= 0.8]),
                    'low_similarity': len([s for s in similarity_scores if s <= 0.5])
                },
                'top_similar_pairs': similarities[:5],
                'most_dissimilar_pairs': similarities[-3:]
            }
        except Exception as e:
            return {'error': f"Similarity analysis failed: {str(e)}"}
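    # For large chunk counts, the O(n^2) pairwise loop above can be replaced by
    # one matrix product over row-normalized embeddings; a minimal NumPy sketch:
    #
    #   E = embeddings_array / np.linalg.norm(embeddings_array, axis=1, keepdims=True)
    #   sim_matrix = E @ E.T  # sim_matrix[i, j] is the cosine similarity of chunks i and j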
    def extract_key_passages(self, documents: List[Document], queries: List[str],
                             passages_per_query: int = 3) -> Dict[str, List[Dict[str, Any]]]:
        """
        Extract key passages from documents based on multiple queries.

        Args:
            documents (List[Document]): Documents to search
            queries (List[str]): List of queries to search for
            passages_per_query (int): Number of passages to extract per query

        Returns:
            Dict: Key passages organized by query
        """
        try:
            # Chunk documents
            chunked_docs = self.chunk_documents(documents, strategy="semantic")

            # Create vector store
            vector_store = self.create_vector_store(chunked_docs, store_type="faiss")

            key_passages = {}
            for query in queries:
                # Search for relevant passages
                results = vector_store.similarity_search_with_score(query, k=passages_per_query)
                passages = []
                for doc, score in results:
                    passages.append({
                        'content': doc.page_content,
                        'relevance_score': float(score),
                        'metadata': doc.metadata,
                        'word_count': len(doc.page_content.split()),
                        'query_match': query
                    })
                key_passages[query] = passages
            return key_passages
        except Exception as e:
            return {'error': f"Key passage extraction failed: {str(e)}"}
    def optimize_chunking_strategy(self, documents: List[Document],
                                   test_queries: List[str]) -> Dict[str, Any]:
        """
        Test different chunking strategies and recommend the best one.

        Args:
            documents (List[Document]): Documents to test
            test_queries (List[str]): Queries to test retrieval performance

        Returns:
            Dict: Optimization results and recommendations
        """
        try:
            strategies = ["recursive", "character", "semantic"]
            strategy_results = {}

            for strategy in strategies:
                try:
                    # Test this strategy
                    chunked_docs = self.chunk_documents(documents, strategy=strategy)
                    vector_store = self.create_vector_store(chunked_docs, store_type="faiss")

                    # Test retrieval performance; FAISS scores are L2 distances,
                    # so a lower average means better retrieval
                    retrieval_scores = []
                    for query in test_queries:
                        results = vector_store.similarity_search_with_score(query, k=3)
                        if results:
                            avg_score = sum(score for _, score in results) / len(results)
                            retrieval_scores.append(float(avg_score))

                    # Calculate strategy metrics
                    avg_retrieval_score = float(np.mean(retrieval_scores)) if retrieval_scores else float('inf')
                    chunk_lengths = [len(doc.page_content) for doc in chunked_docs]
                    strategy_results[strategy] = {
                        'average_retrieval_score': avg_retrieval_score,
                        'total_chunks': len(chunked_docs),
                        'average_chunk_size': float(np.mean(chunk_lengths)),
                        'retrieval_scores': retrieval_scores,
                        'chunk_size_distribution': {
                            'min': min(chunk_lengths),
                            'max': max(chunk_lengths),
                            'std': float(np.std(chunk_lengths))
                        }
                    }
                except Exception as e:
                    strategy_results[strategy] = {'error': f"Strategy test failed: {str(e)}"}

            # Determine the best strategy (lowest average distance wins)
            valid_strategies = {k: v for k, v in strategy_results.items() if 'error' not in v}
            if valid_strategies:
                best_strategy = min(valid_strategies.keys(),
                                    key=lambda k: valid_strategies[k]['average_retrieval_score'])
                recommendation = {
                    'recommended_strategy': best_strategy,
                    'reason': f"Lowest average retrieval distance: {valid_strategies[best_strategy]['average_retrieval_score']:.4f}",
                    'all_results': strategy_results,
                    'performance_summary': {
                        strategy: result.get('average_retrieval_score', 0)
                        for strategy, result in valid_strategies.items()
                    }
                }
            else:
                recommendation = {
                    'recommended_strategy': 'recursive',  # Default fallback
                    'reason': 'All strategies failed, using default',
                    'all_results': strategy_results
                }
            return recommendation
        except Exception as e:
            return {'error': f"Chunking optimization failed: {str(e)}"}
    def create_document_summary(self, documents: List[Document], llm,
                                summary_type: str = "extractive") -> Dict[str, Any]:
        """
        Create document summaries using the chunked content.

        Args:
            documents (List[Document]): Documents to summarize
            llm: Language model for summarization
            summary_type (str): Type of summary ("extractive", "abstractive")

        Returns:
            Dict: Summary results
        """
        try:
            # Chunk documents for better processing
            chunked_docs = self.chunk_documents(documents, strategy="semantic")

            if summary_type == "extractive":
                # Extract key sentences/chunks
                return self._create_extractive_summary(chunked_docs)
            else:
                # Generate an abstractive summary using the LLM
                return self._create_abstractive_summary(chunked_docs, llm)
        except Exception as e:
            return {'error': f"Document summarization failed: {str(e)}"}
    def _create_extractive_summary(self, chunked_docs: List[Document]) -> Dict[str, Any]:
        """Create an extractive summary by selecting key chunks."""
        try:
            # Simple extractive approach: select chunks with the highest semantic density
            chunk_scores = []
            for doc in chunked_docs:
                content = doc.page_content
                # Simple scoring based on content characteristics
                word_count = len(content.split())
                sentence_count = len([s for s in content.split('.') if s.strip()])

                # Score based on information density (words per sentence)
                density_score = word_count / max(sentence_count, 1)

                # Bonus for chunks with questions, definitions, or lists
                structure_bonus = 0
                if '?' in content:
                    structure_bonus += 1
                if any(word in content.lower() for word in ['define', 'definition', 'means', 'refers to']):
                    structure_bonus += 2
                if content.count('\n•') > 0 or content.count('1.') > 0:
                    structure_bonus += 1

                total_score = density_score + structure_bonus
                chunk_scores.append((doc, total_score))

            # Sort by score and select the top chunks for the summary
            chunk_scores.sort(key=lambda x: x[1], reverse=True)
            top_chunks = chunk_scores[:min(5, len(chunk_scores))]

            summary_content = []
            for doc, score in top_chunks:
                summary_content.append({
                    'content': doc.page_content,
                    'score': score,
                    'metadata': doc.metadata
                })

            return {
                'summary_type': 'extractive',
                'key_chunks': summary_content,
                'total_chunks_analyzed': len(chunked_docs),
                'chunks_selected': len(top_chunks)
            }
        except Exception as e:
            return {'error': f"Extractive summary failed: {str(e)}"}
    def _create_abstractive_summary(self, chunked_docs: List[Document], llm) -> Dict[str, Any]:
        """Create an abstractive summary using the language model."""
        try:
            # Combine content from the first chunks, capped to keep the prompt small
            combined_content = "\n\n".join([doc.page_content for doc in chunked_docs[:10]])

            from langchain.prompts import ChatPromptTemplate
            # Pass the content as a template variable so braces in the text are
            # not parsed as template placeholders
            prompt_template = ChatPromptTemplate.from_messages([
                ("system", "You are a professional content summarizer. Create clear, informative summaries."),
                ("user", "Please provide a comprehensive summary of the following content. "
                         "Focus on the main topics, key insights, and important details that "
                         "would be valuable for AI search engines.\n\nContent:\n{content}\n\nSummary:")
            ])
            chain = prompt_template | llm
            result = chain.invoke({"content": combined_content[:5000]})
            summary_text = result.content if hasattr(result, 'content') else str(result)

            return {
                'summary_type': 'abstractive',
                'summary': summary_text,
                'source_chunks': len(chunked_docs),
                'content_length_processed': len(combined_content)
            }
        except Exception as e:
            return {'error': f"Abstractive summary failed: {str(e)}"}
    def save_vector_store(self, vector_store, directory_path: str, store_type: str = "faiss") -> bool:
        """
        Save a vector store to disk.

        Args:
            vector_store: Vector store instance to save
            directory_path (str): Directory to save the store
            store_type (str): Type of vector store

        Returns:
            bool: Success status
        """
        try:
            os.makedirs(directory_path, exist_ok=True)
            if store_type.lower() == "faiss":
                vector_store.save_local(directory_path)
            elif store_type.lower() == "chroma":
                # Chroma stores are persisted at creation time via persist_directory
                pass
            return True
        except Exception as e:
            print(f"Failed to save vector store: {str(e)}")
            return False
    def load_vector_store(self, directory_path: str, store_type: str = "faiss"):
        """
        Load a vector store from disk.

        Args:
            directory_path (str): Directory containing the saved store
            store_type (str): Type of vector store

        Returns:
            Vector store instance, or None on failure
        """
        try:
            if not os.path.exists(directory_path):
                return None

            if store_type.lower() == "faiss":
                return FAISS.load_local(
                    directory_path,
                    self.embeddings,
                    allow_dangerous_deserialization=True  # Only load indexes you created yourself
                )
            elif store_type.lower() == "chroma":
                return Chroma(
                    persist_directory=directory_path,
                    embedding_function=self.embeddings
                )
            return None
        except Exception as e:
            print(f"Failed to load vector store: {str(e)}")
            return None
    def get_chunking_stats(self, documents: List[Document], strategy: str = "recursive") -> Dict[str, Any]:
        """
        Get detailed statistics about document chunking.

        Args:
            documents (List[Document]): Documents to analyze
            strategy (str): Chunking strategy to use

        Returns:
            Dict: Detailed chunking statistics
        """
        try:
            # Chunk documents
            chunked_docs = self.chunk_documents(documents, strategy=strategy)

            # Calculate statistics
            chunk_sizes = [len(doc.page_content) for doc in chunked_docs]
            word_counts = [len(doc.page_content.split()) for doc in chunked_docs]

            stats = {
                'strategy_used': strategy,
                'original_documents': len(documents),
                'total_chunks': len(chunked_docs),
                'chunk_size_stats': {
                    'min': min(chunk_sizes) if chunk_sizes else 0,
                    'max': max(chunk_sizes) if chunk_sizes else 0,
                    'mean': float(np.mean(chunk_sizes)) if chunk_sizes else 0,
                    'median': float(np.median(chunk_sizes)) if chunk_sizes else 0,
                    'std': float(np.std(chunk_sizes)) if chunk_sizes else 0
                },
                'word_count_stats': {
                    'min': min(word_counts) if word_counts else 0,
                    'max': max(word_counts) if word_counts else 0,
                    'mean': float(np.mean(word_counts)) if word_counts else 0,
                    'median': float(np.median(word_counts)) if word_counts else 0,
                    'std': float(np.std(word_counts)) if word_counts else 0
                },
                'chunk_distribution': {
                    'very_small': len([s for s in chunk_sizes if s < 200]),
                    'small': len([s for s in chunk_sizes if 200 <= s < 500]),
                    'medium': len([s for s in chunk_sizes if 500 <= s < 1000]),
                    'large': len([s for s in chunk_sizes if 1000 <= s < 2000]),
                    'very_large': len([s for s in chunk_sizes if s >= 2000])
                },
                'overlap_efficiency': self._calculate_overlap_efficiency(chunked_docs),
                'content_coverage': self._calculate_content_coverage(documents, chunked_docs)
            }
            return stats
        except Exception as e:
            return {'error': f"Chunking statistics failed: {str(e)}"}
    def _calculate_overlap_efficiency(self, chunked_docs: List[Document]) -> float:
        """Estimate how much chunk content is unique rather than duplicated by overlaps."""
        try:
            if len(chunked_docs) < 2:
                return 1.0

            total_content_length = sum(len(doc.page_content) for doc in chunked_docs)
            unique_content = set()

            # Rough estimate of content uniqueness: sample one 10-word shingle
            # per 10 words of each chunk
            for doc in chunked_docs:
                words = doc.page_content.split()
                for i in range(0, len(words), 10):
                    unique_content.add(' '.join(words[i:i + 10]))

            # Efficiency as the ratio of unique content to total content
            efficiency = len(unique_content) * 10 / total_content_length if total_content_length > 0 else 0
            return min(efficiency, 1.0)
        except Exception:
            return 0.5  # Neutral default
    def _calculate_content_coverage(self, original_docs: List[Document],
                                    chunked_docs: List[Document]) -> float:
        """Calculate how well the chunks cover the original content."""
        try:
            original_content = ' '.join([doc.page_content for doc in original_docs])
            chunked_content = ' '.join([doc.page_content for doc in chunked_docs])

            # Simple length-based coverage metric (overlap can push this above 1, so cap it)
            coverage = len(chunked_content) / len(original_content) if original_content else 0
            return min(coverage, 1.0)
        except Exception:
            return 0.0
class ChunkingOptimizer:
    """Helper class for optimizing chunking parameters."""

    def __init__(self, embeddings_model):
        self.embeddings = embeddings_model

    def optimize_chunk_size(self, documents: List[Document], test_queries: List[str],
                            size_range: Tuple[int, int] = (200, 2000),
                            step_size: int = 200) -> Dict[str, Any]:
        """
        Find the optimal chunk size for the given documents and queries.

        Args:
            documents (List[Document]): Documents to test
            test_queries (List[str]): Queries for testing retrieval
            size_range (Tuple[int, int]): Range of chunk sizes to test
            step_size (int): Step size for testing

        Returns:
            Dict: Optimization results with recommended chunk size
        """
        try:
            results = {}
            min_size, max_size = size_range

            for chunk_size in range(min_size, max_size + 1, step_size):
                # Test this chunk size
                chunker = VectorChunker(self.embeddings, chunk_size=chunk_size)
                try:
                    chunked_docs = chunker.chunk_documents(documents)
                    vector_store = chunker.create_vector_store(chunked_docs)

                    # Test retrieval performance (FAISS distances: lower is better)
                    retrieval_scores = []
                    for query in test_queries:
                        search_results = vector_store.similarity_search_with_score(query, k=3)
                        if search_results:
                            avg_score = sum(score for _, score in search_results) / len(search_results)
                            retrieval_scores.append(float(avg_score))

                    avg_performance = float(np.mean(retrieval_scores)) if retrieval_scores else float('inf')
                    results[chunk_size] = {
                        'average_retrieval_score': avg_performance,
                        'total_chunks': len(chunked_docs),
                        'retrieval_scores': retrieval_scores
                    }
                except Exception as e:
                    results[chunk_size] = {'error': str(e)}

            # Find the optimal chunk size (lowest average distance)
            valid_results = {k: v for k, v in results.items() if 'error' not in v}
            if valid_results:
                optimal_size = min(valid_results.keys(),
                                   key=lambda k: valid_results[k]['average_retrieval_score'])
                return {
                    'optimal_chunk_size': optimal_size,
                    'optimal_performance': valid_results[optimal_size]['average_retrieval_score'],
                    'all_results': results,
                    'performance_trend': self._analyze_performance_trend(valid_results),
                    'recommendation': f"Use chunk size {optimal_size} for best retrieval performance"
                }
            else:
                return {
                    'error': 'No valid chunk sizes could be tested',
                    'all_results': results
                }
        except Exception as e:
            return {'error': f"Chunk size optimization failed: {str(e)}"}

    def _analyze_performance_trend(self, results: Dict[int, Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze the performance trend across different chunk sizes."""
        try:
            sizes = sorted(results.keys())
            performances = [results[size]['average_retrieval_score'] for size in sizes]

            # Find the trend direction
            if len(performances) >= 2:
                trend_direction = "increasing" if performances[-1] > performances[0] else "decreasing"
                peak_performance = min(performances)  # Lower distance is better
                peak_size = sizes[performances.index(peak_performance)]
                return {
                    'trend_direction': trend_direction,
                    'peak_performance': peak_performance,
                    'peak_size': peak_size,
                    'performance_range': max(performances) - min(performances),
                    'stable_performance': max(performances) - min(performances) < 0.1
                }
            else:
                return {'error': 'Insufficient data for trend analysis'}
        except Exception:
            return {'error': 'Trend analysis failed'}
class RAGPipeline:
    """Complete RAG pipeline for document question-answering."""

    def __init__(self, embeddings_model, llm):
        self.embeddings = embeddings_model
        self.llm = llm
        self.chunker = VectorChunker(embeddings_model)
        self.vector_stores = {}
        self.qa_chains = {}

    def create_pipeline(self, documents: List[Document], pipeline_id: str,
                        chunking_strategy: str = "semantic") -> Dict[str, Any]:
        """
        Create a complete RAG pipeline for documents.

        Args:
            documents (List[Document]): Documents to process
            pipeline_id (str): Unique identifier for this pipeline
            chunking_strategy (str): Strategy for document chunking

        Returns:
            Dict: Pipeline creation results
        """
        try:
            # Step 1: Chunk documents
            chunked_docs = self.chunker.chunk_documents(documents, strategy=chunking_strategy)

            # Step 2: Create vector store
            vector_store = self.chunker.create_vector_store(chunked_docs, store_type="faiss")

            # Step 3: Create QA chain
            qa_chain = self.chunker.create_qa_chain(documents, self.llm)

            # Store pipeline components
            self.vector_stores[pipeline_id] = vector_store
            self.qa_chains[pipeline_id] = qa_chain

            # Pipeline statistics
            stats = {
                'pipeline_id': pipeline_id,
                'documents_processed': len(documents),
                'chunks_created': len(chunked_docs),
                'chunking_strategy': chunking_strategy,
                'vector_store_type': 'faiss',
                'embedding_model': str(self.embeddings),
                'created_at': self._get_timestamp()
            }
            return {
                'success': True,
                'pipeline_stats': stats,
                'chunking_info': self.chunker.get_chunking_stats(documents, chunking_strategy)
            }
        except Exception as e:
            return {'error': f"Pipeline creation failed: {str(e)}"}
    def query_pipeline(self, pipeline_id: str, query: str,
                       return_sources: bool = True) -> Dict[str, Any]:
        """
        Query a created RAG pipeline.

        Args:
            pipeline_id (str): ID of the pipeline to query
            query (str): Question to ask
            return_sources (bool): Whether to return source documents

        Returns:
            Dict: Query results with answer and sources
        """
        try:
            if pipeline_id not in self.qa_chains:
                return {'error': f"Pipeline '{pipeline_id}' not found"}

            qa_chain = self.qa_chains[pipeline_id]

            # Execute the query (invoke replaces the deprecated __call__ interface)
            result = qa_chain.invoke({"query": query})

            # Format the response
            response = {
                'query': query,
                'answer': result.get('result', 'No answer generated'),
                'pipeline_id': pipeline_id,
                'query_timestamp': self._get_timestamp()
            }

            # Add source documents if requested
            if return_sources and 'source_documents' in result:
                sources = []
                for i, doc in enumerate(result['source_documents']):
                    sources.append({
                        'source_index': i,
                        'content': doc.page_content,
                        'metadata': doc.metadata,
                        'relevance_rank': i + 1
                    })
                response['sources'] = sources
                response['num_sources'] = len(sources)
            return response
        except Exception as e:
            return {'error': f"Pipeline query failed: {str(e)}"}
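    # End-to-end sketch (hypothetical `embeddings`/`llm` objects):
    #
    #   rag = RAGPipeline(embeddings, llm)
    #   rag.create_pipeline(docs, pipeline_id="geo_docs")
    #   out = rag.query_pipeline("geo_docs", "What chunk overlap is recommended?")
    #   print(out["answer"], out.get("num_sources"))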
    def batch_query_pipeline(self, pipeline_id: str, queries: List[str]) -> List[Dict[str, Any]]:
        """
        Execute multiple queries on a pipeline.

        Args:
            pipeline_id (str): ID of the pipeline to query
            queries (List[str]): List of questions to ask

        Returns:
            List[Dict]: List of query results
        """
        results = []
        for i, query in enumerate(queries):
            try:
                result = self.query_pipeline(pipeline_id, query, return_sources=False)
                result['batch_index'] = i
                results.append(result)
            except Exception as e:
                results.append({
                    'batch_index': i,
                    'query': query,
                    'error': f"Batch query failed: {str(e)}"
                })
        return results
    def evaluate_pipeline(self, pipeline_id: str, test_queries: List[str],
                          expected_answers: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Evaluate pipeline performance on test queries.

        Args:
            pipeline_id (str): ID of the pipeline to evaluate
            test_queries (List[str]): Test questions
            expected_answers (List[str]): Optional expected answers for comparison

        Returns:
            Dict: Evaluation results
        """
        try:
            if pipeline_id not in self.qa_chains:
                return {'error': f"Pipeline '{pipeline_id}' not found"}

            evaluation_results = []
            response_times = []

            for i, query in enumerate(test_queries):
                # Execute the query and time it
                start_time = time.time()
                result = self.query_pipeline(pipeline_id, query, return_sources=True)
                response_time = time.time() - start_time
                response_times.append(response_time)

                # Evaluate the result
                eval_result = {
                    'query_index': i,
                    'query': query,
                    'answer_generated': not result.get('error'),
                    'response_time': response_time,
                    'answer_length': len(result.get('answer', '')),
                    'sources_returned': result.get('num_sources', 0)
                }

                # If an expected answer is provided, calculate similarity
                if expected_answers and i < len(expected_answers):
                    expected = expected_answers[i]
                    generated = result.get('answer', '')
                    eval_result['answer_similarity'] = self._calculate_answer_similarity(expected, generated)
                    eval_result['expected_answer'] = expected

                evaluation_results.append(eval_result)

            # Calculate aggregate metrics
            successful_queries = len([r for r in evaluation_results if r['answer_generated']])
            avg_response_time = float(np.mean(response_times)) if response_times else 0

            if expected_answers:
                similarities = [r['answer_similarity'] for r in evaluation_results
                                if 'answer_similarity' in r]
                avg_similarity = float(np.mean(similarities)) if similarities else 0
            else:
                avg_similarity = None

            return {
                'pipeline_id': pipeline_id,
                'total_queries': len(test_queries),
                'successful_queries': successful_queries,
                'success_rate': successful_queries / len(test_queries) if test_queries else 0,
                'average_response_time': avg_response_time,
                'average_answer_similarity': avg_similarity,
                'detailed_results': evaluation_results,
                'evaluation_timestamp': self._get_timestamp()
            }
        except Exception as e:
            return {'error': f"Pipeline evaluation failed: {str(e)}"}
    def _calculate_answer_similarity(self, expected: str, generated: str) -> float:
        """Calculate Jaccard (word-overlap) similarity between expected and generated answers."""
        try:
            expected_words = set(expected.lower().split())
            generated_words = set(generated.lower().split())
            if not expected_words and not generated_words:
                return 1.0
            intersection = expected_words.intersection(generated_words)
            union = expected_words.union(generated_words)
            return len(intersection) / len(union) if union else 0.0
        except Exception:
            return 0.0
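    # Worked example of the Jaccard metric above:
    #   expected  = "chunk size matters"    -> {"chunk", "size", "matters"}
    #   generated = "chunk overlap matters" -> {"chunk", "overlap", "matters"}
    #   |intersection| = 2, |union| = 4, similarity = 2 / 4 = 0.5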
    def get_pipeline_info(self, pipeline_id: str) -> Dict[str, Any]:
        """Get information about a specific pipeline."""
        try:
            if pipeline_id not in self.qa_chains:
                return {'error': f"Pipeline '{pipeline_id}' not found"}

            # Get vector store statistics (FAISS exposes the vector count via index.ntotal)
            vector_store = self.vector_stores.get(pipeline_id)
            total_vectors = 'unknown'
            if vector_store is not None:
                try:
                    total_vectors = vector_store.index.ntotal if hasattr(vector_store, 'index') else 'unknown'
                except Exception:
                    total_vectors = 'unknown'

            return {
                'pipeline_id': pipeline_id,
                'has_qa_chain': pipeline_id in self.qa_chains,
                'has_vector_store': pipeline_id in self.vector_stores,
                'total_vectors': total_vectors,
                'embedding_model': str(self.embeddings),
                'llm_model': str(self.llm)
            }
        except Exception as e:
            return {'error': f"Failed to get pipeline info: {str(e)}"}
    def list_pipelines(self) -> Dict[str, Any]:
        """List all created pipelines."""
        return {
            'total_pipelines': len(self.qa_chains),
            'pipeline_ids': list(self.qa_chains.keys()),
            'vector_stores': list(self.vector_stores.keys())
        }

    def delete_pipeline(self, pipeline_id: str) -> Dict[str, Any]:
        """Delete a pipeline and free its resources."""
        try:
            deleted_components = []
            if pipeline_id in self.qa_chains:
                del self.qa_chains[pipeline_id]
                deleted_components.append('qa_chain')
            if pipeline_id in self.vector_stores:
                del self.vector_stores[pipeline_id]
                deleted_components.append('vector_store')

            if deleted_components:
                return {
                    'success': True,
                    'pipeline_id': pipeline_id,
                    'deleted_components': deleted_components
                }
            else:
                return {'error': f"Pipeline '{pipeline_id}' not found"}
        except Exception as e:
            return {'error': f"Pipeline deletion failed: {str(e)}"}
    def export_pipeline_config(self, pipeline_id: str) -> Dict[str, Any]:
        """Export a pipeline configuration for later recreation."""
        try:
            if pipeline_id not in self.qa_chains:
                return {'error': f"Pipeline '{pipeline_id}' not found"}

            return {
                'pipeline_id': pipeline_id,
                'embedding_model_name': getattr(self.embeddings, 'model_name', 'unknown'),
                'llm_model_name': getattr(self.llm, 'model_name', 'unknown'),
                'chunker_config': {
                    'chunk_size': self.chunker.chunk_size,
                    'chunk_overlap': self.chunker.chunk_overlap
                },
                'export_timestamp': self._get_timestamp(),
                'vector_store_type': 'faiss'
            }
        except Exception as e:
            return {'error': f"Pipeline export failed: {str(e)}"}

    def _get_timestamp(self) -> str:
        """Get the current timestamp as a formatted string."""
        from datetime import datetime
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Utility functions for the module

def optimize_rag_pipeline(documents: List[Document], embeddings_model, llm,
                          test_queries: List[str]) -> Dict[str, Any]:
    """
    Optimize the RAG pipeline configuration for the given documents and queries.

    Args:
        documents (List[Document]): Documents to optimize for
        embeddings_model: Embedding model to use
        llm: Language model to use
        test_queries (List[str]): Test queries for optimization

    Returns:
        Dict: Optimization recommendations
    """
    try:
        # Test different chunking strategies
        chunker = VectorChunker(embeddings_model)
        chunking_results = chunker.optimize_chunking_strategy(documents, test_queries)

        # Test different chunk sizes
        optimizer = ChunkingOptimizer(embeddings_model)
        size_results = optimizer.optimize_chunk_size(documents, test_queries)

        # Pick the best configuration found
        best_strategy = chunking_results.get('recommended_strategy', 'semantic')
        best_size = size_results.get('optimal_chunk_size', 1000)

        # Create an optimized chunker
        optimized_chunker = VectorChunker(
            embeddings_model,
            chunk_size=best_size,
            chunk_overlap=best_size // 5  # 20% overlap
        )

        # Test the optimized configuration
        pipeline = RAGPipeline(embeddings_model, llm)
        pipeline.chunker = optimized_chunker
        test_pipeline_id = "optimization_test"
        creation_result = pipeline.create_pipeline(documents, test_pipeline_id, best_strategy)

        if not creation_result.get('error'):
            evaluation_result = pipeline.evaluate_pipeline(test_pipeline_id, test_queries)
            pipeline.delete_pipeline(test_pipeline_id)  # Clean up
        else:
            evaluation_result = {'error': 'Could not evaluate optimized pipeline'}

        return {
            'optimization_complete': True,
            'recommended_config': {
                'chunking_strategy': best_strategy,
                'chunk_size': best_size,
                'chunk_overlap': best_size // 5
            },
            'chunking_optimization': chunking_results,
            'size_optimization': size_results,
            'performance_evaluation': evaluation_result,
            'recommendations': [
                f"Use {best_strategy} chunking strategy",
                f"Set chunk size to {best_size} characters",
                f"Use {best_size // 5} character overlap",
                "Monitor and adjust based on query performance"
            ]
        }
    except Exception as e:
        return {'error': f"RAG optimization failed: {str(e)}"}
def create_demo_rag_system(sample_documents: List[Document], embeddings_model, llm) -> Dict[str, Any]:
    """
    Create a demonstration RAG system from sample documents.

    Args:
        sample_documents (List[Document]): Sample documents for the demo
        embeddings_model: Embedding model
        llm: Language model

    Returns:
        Dict: Demo system information and sample interactions
    """
    try:
        # Create the RAG pipeline
        pipeline = RAGPipeline(embeddings_model, llm)
        demo_id = "demo_system"
        creation_result = pipeline.create_pipeline(sample_documents, demo_id, "semantic")
        if creation_result.get('error'):
            return {'error': f"Demo system creation failed: {creation_result['error']}"}

        # Sample queries for demonstration
        demo_queries = [
            "What is the main topic of these documents?",
            "Can you summarize the key points?",
            "What are the most important concepts mentioned?"
        ]

        # Execute the demo queries
        demo_results = []
        for query in demo_queries:
            result = pipeline.query_pipeline(demo_id, query, return_sources=True)
            demo_results.append(result)

        # Get system statistics
        pipeline_info = pipeline.get_pipeline_info(demo_id)

        return {
            'demo_system_created': True,
            'pipeline_id': demo_id,
            'creation_stats': creation_result,
            'pipeline_info': pipeline_info,
            'demo_queries': demo_queries,
            'demo_results': demo_results,
            'usage_instructions': [
                f"Use pipeline.query_pipeline('{demo_id}', 'your question') to ask questions",
                "The system will return answers with source document references",
                "Sources show which parts of the documents were used for the answer"
            ]
        }
    except Exception as e:
        return {'error': f"Demo system creation failed: {str(e)}"}
# Export the main classes for use in other modules
__all__ = [
    'VectorChunker',
    'ChunkingOptimizer',
    'RAGPipeline',
    'optimize_rag_pipeline',
    'create_demo_rag_system'
]
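
# Minimal smoke test, as a sketch. It assumes `langchain_openai` is installed
# and OPENAI_API_KEY is set; swap in any other LangChain embeddings/chat model.
if __name__ == "__main__":
    from langchain_openai import OpenAIEmbeddings, ChatOpenAI  # hypothetical model choice

    embeddings = OpenAIEmbeddings()
    llm = ChatOpenAI(temperature=0)

    sample_docs = [
        Document(page_content="Generative Engine Optimization (GEO) adapts content "
                              "so AI search engines can cite it accurately.",
                 metadata={"source": "sample.txt"}),
        Document(page_content="Chunk size and overlap control how much context each "
                              "retrieved passage carries into the prompt.",
                 metadata={"source": "sample.txt"}),
    ]

    demo = create_demo_rag_system(sample_docs, embeddings, llm)
    for item in demo.get('demo_results', []):
        print(item.get('query'), '->', str(item.get('answer'))[:120])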