| """
|
| Vector Chunking and RAG Module
|
| Handles document chunking, vector embeddings, and RAG question-answering
|
| """
|
|
|
| import os
|
| import json
|
| import numpy as np
|
| from typing import Dict, Any, List, Optional, Tuple
|
| from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter
|
| from langchain.schema import Document
|
| from langchain_community.vectorstores import FAISS, Chroma
|
| from langchain.chains import RetrievalQA, ConversationalRetrievalChain
|
| from langchain.memory import ConversationBufferMemory
|
| from langchain.prompts import PromptTemplate
|
| import tempfile
|
| import shutil
|
|
|
|
|
| class VectorChunker:
|
| """Main class for document chunking and vector operations"""
|
|
|
| def __init__(self, embeddings_model, chunk_size: int = 1000, chunk_overlap: int = 200):
|
| self.embeddings = embeddings_model
|
| self.chunk_size = chunk_size
|
| self.chunk_overlap = chunk_overlap
|
| self.setup_text_splitters()
|
| self.vector_stores = {}
|
|
|
| def setup_text_splitters(self):
|
| """Initialize different text splitting strategies"""
|
|
|
|
|
| self.recursive_splitter = RecursiveCharacterTextSplitter(
|
| chunk_size=self.chunk_size,
|
| chunk_overlap=self.chunk_overlap,
|
| length_function=len,
|
| separators=["\n\n", "\n", " ", ""]
|
| )
|
|
|
|
|
| self.character_splitter = CharacterTextSplitter(
|
| chunk_size=self.chunk_size,
|
| chunk_overlap=self.chunk_overlap,
|
| separator="\n\n"
|
| )
|
|
|
|
|
| self.semantic_splitter = RecursiveCharacterTextSplitter(
|
| chunk_size=800,
|
| chunk_overlap=150,
|
| length_function=len,
|
| separators=["\n\n", "\n", ". ", " ", ""]
|
| )
|
|
|
| def chunk_documents(self, documents: List[Document], strategy: str = "recursive") -> List[Document]:
|
| """
|
| Chunk documents using specified strategy
|
|
|
| Args:
|
| documents (List[Document]): List of documents to chunk
|
| strategy (str): Chunking strategy ("recursive", "character", "semantic")
|
|
|
| Returns:
|
| List[Document]: List of chunked documents
|
| """
|
| try:
|
|
|
| if strategy == "character":
|
| splitter = self.character_splitter
|
| elif strategy == "semantic":
|
| splitter = self.semantic_splitter
|
| else:
|
| splitter = self.recursive_splitter
|
|
|
|
|
| chunked_docs = []
|
|
|
| for doc in documents:
|
| chunks = splitter.split_documents([doc])
|
|
|
|
|
| for i, chunk in enumerate(chunks):
|
| chunk.metadata.update({
|
| 'chunk_index': i,
|
| 'total_chunks': len(chunks),
|
| 'chunk_strategy': strategy,
|
| 'original_source': doc.metadata.get('source', 'unknown'),
|
| 'chunk_size': len(chunk.page_content),
|
| 'chunk_word_count': len(chunk.page_content.split())
|
| })
|
|
|
| chunked_docs.extend(chunks)
|
|
|
| return chunked_docs
|
|
|
| except Exception as e:
|
| raise Exception(f"Document chunking failed: {str(e)}")
|
|
|
| def create_vector_store(self, documents: List[Document], store_type: str = "faiss",
|
| persist_directory: Optional[str] = None) -> Any:
|
| """
|
| Create vector store from documents
|
|
|
| Args:
|
| documents (List[Document]): Documents to vectorize
|
| store_type (str): Type of vector store ("faiss", "chroma")
|
| persist_directory (str): Optional directory to persist the store
|
|
|
| Returns:
|
| Vector store instance
|
| """
|
| try:
|
| if not documents:
|
| raise ValueError("No documents provided for vector store creation")
|
|
|
| if store_type.lower() == "chroma":
|
| if persist_directory:
|
| vector_store = Chroma.from_documents(
|
| documents=documents,
|
| embedding=self.embeddings,
|
| persist_directory=persist_directory
|
| )
|
| vector_store.persist()
|
| else:
|
| vector_store = Chroma.from_documents(
|
| documents=documents,
|
| embedding=self.embeddings
|
| )
|
| else:
|
| vector_store = FAISS.from_documents(
|
| documents=documents,
|
| embedding=self.embeddings
|
| )
|
|
|
|
|
| if persist_directory:
|
| os.makedirs(persist_directory, exist_ok=True)
|
| vector_store.save_local(persist_directory)
|
|
|
| return vector_store
|
|
|
| except Exception as e:
|
| raise Exception(f"Vector store creation failed: {str(e)}")
|
|
|
| def create_qa_chain(self, documents: List[Document], llm, chain_type: str = "stuff") -> RetrievalQA:
|
| """
|
| Create a Question-Answering chain from documents
|
|
|
| Args:
|
| documents (List[Document]): Documents for the knowledge base
|
| llm: Language model for answering questions
|
| chain_type (str): Type of QA chain ("stuff", "map_reduce", "refine")
|
|
|
| Returns:
|
| RetrievalQA: Configured QA chain
|
| """
|
| try:
|
|
|
| chunked_docs = self.chunk_documents(documents, strategy="semantic")
|
|
|
|
|
| vector_store = self.create_vector_store(chunked_docs, store_type="faiss")
|
|
|
|
|
| retriever = vector_store.as_retriever(
|
| search_type="similarity",
|
| search_kwargs={"k": 4}
|
| )
|
|
|
|
|
| qa_prompt_template = """Use the following pieces of context to answer the question at the end.
|
| If you don't know the answer, just say that you don't know, don't try to make up an answer.
|
| Focus on providing clear, accurate, and complete answers that would be suitable for AI search engines.
|
|
|
| Context:
|
| {context}
|
|
|
| Question: {question}
|
|
|
| Answer:"""
|
|
|
| qa_prompt = PromptTemplate(
|
| template=qa_prompt_template,
|
| input_variables=["context", "question"]
|
| )
|
|
|
|
|
| qa_chain = RetrievalQA.from_chain_type(
|
| llm=llm,
|
| chain_type=chain_type,
|
| retriever=retriever,
|
| return_source_documents=True,
|
| chain_type_kwargs={"prompt": qa_prompt}
|
| )
|
|
|
| return qa_chain
|
|
|
| except Exception as e:
|
| raise Exception(f"QA chain creation failed: {str(e)}")
|
|
|
| def create_conversational_chain(self, documents: List[Document], llm) -> ConversationalRetrievalChain:
|
| """
|
| Create a conversational retrieval chain with memory
|
|
|
| Args:
|
| documents (List[Document]): Documents for the knowledge base
|
| llm: Language model for conversation
|
|
|
| Returns:
|
| ConversationalRetrievalChain: Configured conversational chain
|
| """
|
| try:
|
|
|
| chunked_docs = self.chunk_documents(documents, strategy="semantic")
|
|
|
|
|
| vector_store = self.create_vector_store(chunked_docs, store_type="faiss")
|
|
|
|
|
| retriever = vector_store.as_retriever(
|
| search_type="similarity",
|
| search_kwargs={"k": 3}
|
| )
|
|
|
|
|
| memory = ConversationBufferMemory(
|
| memory_key="chat_history",
|
| return_messages=True,
|
| output_key="answer"
|
| )
|
|
|
|
|
| condense_question_prompt = """Given the following conversation and a follow up question,
|
| rephrase the follow up question to be a standalone question that can be understood without the chat history.
|
|
|
| Chat History:
|
| {chat_history}
|
| Follow Up Input: {question}
|
| Standalone question:"""
|
|
|
|
|
| conv_chain = ConversationalRetrievalChain.from_llm(
|
| llm=llm,
|
| retriever=retriever,
|
| memory=memory,
|
| return_source_documents=True,
|
| condense_question_prompt=PromptTemplate.from_template(condense_question_prompt)
|
| )
|
|
|
| return conv_chain
|
|
|
| except Exception as e:
|
| raise Exception(f"Conversational chain creation failed: {str(e)}")
|
|
|
| def semantic_search(self, query: str, documents: List[Document], top_k: int = 5) -> List[Dict[str, Any]]:
|
| """
|
| Perform semantic search on documents
|
|
|
| Args:
|
| query (str): Search query
|
| documents (List[Document]): Documents to search
|
| top_k (int): Number of top results to return
|
|
|
| Returns:
|
| List[Dict]: Search results with scores
|
| """
|
| try:
|
|
|
| chunked_docs = self.chunk_documents(documents, strategy="semantic")
|
|
|
|
|
| vector_store = self.create_vector_store(chunked_docs, store_type="faiss")
|
|
|
|
|
| results = vector_store.similarity_search_with_score(query, k=top_k)
|
|
|
|
|
| formatted_results = []
|
| for doc, score in results:
|
| result = {
|
| 'content': doc.page_content,
|
| 'metadata': doc.metadata,
|
| 'similarity_score': float(score),
|
| 'relevance_rank': len(formatted_results) + 1
|
| }
|
| formatted_results.append(result)
|
|
|
| return formatted_results
|
|
|
| except Exception as e:
|
| raise Exception(f"Semantic search failed: {str(e)}")
|
|
|
| def analyze_document_similarity(self, documents: List[Document]) -> Dict[str, Any]:
|
| """
|
| Analyze similarity between documents
|
|
|
| Args:
|
| documents (List[Document]): Documents to analyze
|
|
|
| Returns:
|
| Dict: Similarity analysis results
|
| """
|
| try:
|
| if len(documents) < 2:
|
| return {'error': 'Need at least 2 documents for similarity analysis'}
|
|
|
|
|
| chunked_docs = self.chunk_documents(documents, strategy="semantic")
|
|
|
|
|
| doc_embeddings = []
|
| doc_metadata = []
|
|
|
| for doc in chunked_docs:
|
|
|
| embedding = self.embeddings.embed_query(doc.page_content)
|
| doc_embeddings.append(embedding)
|
| doc_metadata.append({
|
| 'content_preview': doc.page_content[:200] + "...",
|
| 'metadata': doc.metadata,
|
| 'length': len(doc.page_content)
|
| })
|
|
|
|
|
| similarities = []
|
| embeddings_array = np.array(doc_embeddings)
|
|
|
| for i in range(len(embeddings_array)):
|
| for j in range(i + 1, len(embeddings_array)):
|
|
|
| similarity = np.dot(embeddings_array[i], embeddings_array[j]) / (
|
| np.linalg.norm(embeddings_array[i]) * np.linalg.norm(embeddings_array[j])
|
| )
|
|
|
| similarities.append({
|
| 'doc_1_index': i,
|
| 'doc_2_index': j,
|
| 'similarity_score': float(similarity),
|
| 'doc_1_preview': doc_metadata[i]['content_preview'],
|
| 'doc_2_preview': doc_metadata[j]['content_preview']
|
| })
|
|
|
|
|
| similarities.sort(key=lambda x: x['similarity_score'], reverse=True)
|
|
|
|
|
| similarity_scores = [s['similarity_score'] for s in similarities]
|
|
|
| return {
|
| 'total_comparisons': len(similarities),
|
| 'average_similarity': np.mean(similarity_scores),
|
| 'max_similarity': max(similarity_scores),
|
| 'min_similarity': min(similarity_scores),
|
| 'similarity_distribution': {
|
| 'high_similarity': len([s for s in similarity_scores if s > 0.8]),
|
| 'medium_similarity': len([s for s in similarity_scores if 0.5 < s <= 0.8]),
|
| 'low_similarity': len([s for s in similarity_scores if s <= 0.5])
|
| },
|
| 'top_similar_pairs': similarities[:5],
|
| 'most_dissimilar_pairs': similarities[-3:]
|
| }
|
|
|
| except Exception as e:
|
| return {'error': f"Similarity analysis failed: {str(e)}"}
|
|
|
| def extract_key_passages(self, documents: List[Document], queries: List[str],
|
| passages_per_query: int = 3) -> Dict[str, List[Dict[str, Any]]]:
|
| """
|
| Extract key passages from documents based on multiple queries
|
|
|
| Args:
|
| documents (List[Document]): Documents to search
|
| queries (List[str]): List of queries to search for
|
| passages_per_query (int): Number of passages to extract per query
|
|
|
| Returns:
|
| Dict: Key passages organized by query
|
| """
|
| try:
|
|
|
| chunked_docs = self.chunk_documents(documents, strategy="semantic")
|
|
|
|
|
| vector_store = self.create_vector_store(chunked_docs, store_type="faiss")
|
|
|
| key_passages = {}
|
|
|
| for query in queries:
|
|
|
| results = vector_store.similarity_search_with_score(query, k=passages_per_query)
|
|
|
| passages = []
|
| for doc, score in results:
|
| passage = {
|
| 'content': doc.page_content,
|
| 'relevance_score': float(score),
|
| 'metadata': doc.metadata,
|
| 'word_count': len(doc.page_content.split()),
|
| 'query_match': query
|
| }
|
| passages.append(passage)
|
|
|
| key_passages[query] = passages
|
|
|
| return key_passages
|
|
|
| except Exception as e:
|
| return {'error': f"Key passage extraction failed: {str(e)}"}
|
|
|
| def optimize_chunking_strategy(self, documents: List[Document],
|
| test_queries: List[str]) -> Dict[str, Any]:
|
| """
|
| Test different chunking strategies and recommend the best one
|
|
|
| Args:
|
| documents (List[Document]): Documents to test
|
| test_queries (List[str]): Queries to test retrieval performance
|
|
|
| Returns:
|
| Dict: Optimization results and recommendations
|
| """
|
| try:
|
| strategies = ["recursive", "character", "semantic"]
|
| strategy_results = {}
|
|
|
| for strategy in strategies:
|
| try:
|
|
|
| chunked_docs = self.chunk_documents(documents, strategy=strategy)
|
| vector_store = self.create_vector_store(chunked_docs, store_type="faiss")
|
|
|
|
|
| retrieval_scores = []
|
|
|
| for query in test_queries:
|
| results = vector_store.similarity_search_with_score(query, k=3)
|
|
|
|
|
| if results:
|
| avg_score = sum(score for _, score in results) / len(results)
|
| retrieval_scores.append(float(avg_score))
|
|
|
|
|
| avg_retrieval_score = np.mean(retrieval_scores) if retrieval_scores else 0
|
| total_chunks = len(chunked_docs)
|
| avg_chunk_size = np.mean([len(doc.page_content) for doc in chunked_docs])
|
|
|
| strategy_results[strategy] = {
|
| 'average_retrieval_score': avg_retrieval_score,
|
| 'total_chunks': total_chunks,
|
| 'average_chunk_size': avg_chunk_size,
|
| 'retrieval_scores': retrieval_scores,
|
| 'chunk_size_distribution': {
|
| 'min': min(len(doc.page_content) for doc in chunked_docs),
|
| 'max': max(len(doc.page_content) for doc in chunked_docs),
|
| 'std': float(np.std([len(doc.page_content) for doc in chunked_docs]))
|
| }
|
| }
|
|
|
| except Exception as e:
|
| strategy_results[strategy] = {'error': f"Strategy test failed: {str(e)}"}
|
|
|
|
|
| valid_strategies = {k: v for k, v in strategy_results.items() if 'error' not in v}
|
|
|
| if valid_strategies:
|
| best_strategy = max(valid_strategies.keys(),
|
| key=lambda k: valid_strategies[k]['average_retrieval_score'])
|
|
|
| recommendation = {
|
| 'recommended_strategy': best_strategy,
|
| 'reason': f"Best average retrieval score: {valid_strategies[best_strategy]['average_retrieval_score']:.4f}",
|
| 'all_results': strategy_results,
|
| 'performance_summary': {
|
| strategy: result.get('average_retrieval_score', 0)
|
| for strategy, result in valid_strategies.items()
|
| }
|
| }
|
| else:
|
| recommendation = {
|
| 'recommended_strategy': 'recursive',
|
| 'reason': 'All strategies failed, using default',
|
| 'all_results': strategy_results
|
| }
|
|
|
| return recommendation
|
|
|
| except Exception as e:
|
| return {'error': f"Chunking optimization failed: {str(e)}"}
|
|
|
| def create_document_summary(self, documents: List[Document], llm,
|
| summary_type: str = "extractive") -> Dict[str, Any]:
|
| """
|
| Create document summaries using the chunked content
|
|
|
| Args:
|
| documents (List[Document]): Documents to summarize
|
| llm: Language model for summarization
|
| summary_type (str): Type of summary ("extractive", "abstractive")
|
|
|
| Returns:
|
| Dict: Summary results
|
| """
|
| try:
|
|
|
| chunked_docs = self.chunk_documents(documents, strategy="semantic")
|
|
|
| if summary_type == "extractive":
|
|
|
| return self._create_extractive_summary(chunked_docs)
|
| else:
|
|
|
| return self._create_abstractive_summary(chunked_docs, llm)
|
|
|
| except Exception as e:
|
| return {'error': f"Document summarization failed: {str(e)}"}
|
|
|
| def _create_extractive_summary(self, chunked_docs: List[Document]) -> Dict[str, Any]:
|
| """Create extractive summary by selecting key chunks"""
|
| try:
|
|
|
| chunk_scores = []
|
|
|
| for doc in chunked_docs:
|
| content = doc.page_content
|
|
|
| word_count = len(content.split())
|
| sentence_count = len([s for s in content.split('.') if s.strip()])
|
|
|
|
|
| density_score = word_count / max(sentence_count, 1)
|
|
|
|
|
| structure_bonus = 0
|
| if '?' in content:
|
| structure_bonus += 1
|
| if any(word in content.lower() for word in ['define', 'definition', 'means', 'refers to']):
|
| structure_bonus += 2
|
| if content.count('\n•') > 0 or content.count('1.') > 0:
|
| structure_bonus += 1
|
|
|
| total_score = density_score + structure_bonus
|
| chunk_scores.append((doc, total_score))
|
|
|
|
|
| chunk_scores.sort(key=lambda x: x[1], reverse=True)
|
| top_chunks = chunk_scores[:min(5, len(chunk_scores))]
|
|
|
| summary_content = []
|
| for doc, score in top_chunks:
|
| summary_content.append({
|
| 'content': doc.page_content,
|
| 'score': score,
|
| 'metadata': doc.metadata
|
| })
|
|
|
| return {
|
| 'summary_type': 'extractive',
|
| 'key_chunks': summary_content,
|
| 'total_chunks_analyzed': len(chunked_docs),
|
| 'chunks_selected': len(top_chunks)
|
| }
|
|
|
| except Exception as e:
|
| return {'error': f"Extractive summary failed: {str(e)}"}
|
|
|
| def _create_abstractive_summary(self, chunked_docs: List[Document], llm) -> Dict[str, Any]:
|
| """Create abstractive summary using language model"""
|
| try:
|
|
|
| combined_content = "\n\n".join([doc.page_content for doc in chunked_docs[:10]])
|
|
|
| summary_prompt = f"""Please provide a comprehensive summary of the following content.
|
| Focus on the main topics, key insights, and important details that would be valuable for AI search engines.
|
|
|
| Content:
|
| {combined_content[:5000]}
|
|
|
| Summary:"""
|
|
|
| from langchain.prompts import ChatPromptTemplate
|
|
|
| prompt_template = ChatPromptTemplate.from_messages([
|
| ("system", "You are a professional content summarizer. Create clear, informative summaries."),
|
| ("user", summary_prompt)
|
| ])
|
|
|
| chain = prompt_template | llm
|
| result = chain.invoke({})
|
|
|
| summary_text = result.content if hasattr(result, 'content') else str(result)
|
|
|
| return {
|
| 'summary_type': 'abstractive',
|
| 'summary': summary_text,
|
| 'source_chunks': len(chunked_docs),
|
| 'content_length_processed': len(combined_content)
|
| }
|
|
|
| except Exception as e:
|
| return {'error': f"Abstractive summary failed: {str(e)}"}
|
|
|
| def save_vector_store(self, vector_store, directory_path: str, store_type: str = "faiss") -> bool:
|
| """
|
| Save vector store to disk
|
|
|
| Args:
|
| vector_store: Vector store instance to save
|
| directory_path (str): Directory to save the store
|
| store_type (str): Type of vector store
|
|
|
| Returns:
|
| bool: Success status
|
| """
|
| try:
|
| os.makedirs(directory_path, exist_ok=True)
|
|
|
| if store_type.lower() == "faiss":
|
| vector_store.save_local(directory_path)
|
| elif store_type.lower() == "chroma":
|
|
|
| pass
|
|
|
| return True
|
|
|
| except Exception as e:
|
| print(f"Failed to save vector store: {str(e)}")
|
| return False
|
|
|
| def load_vector_store(self, directory_path: str, store_type: str = "faiss"):
|
| """
|
| Load vector store from disk
|
|
|
| Args:
|
| directory_path (str): Directory containing the saved store
|
| store_type (str): Type of vector store
|
|
|
| Returns:
|
| Vector store instance or None if failed
|
| """
|
| try:
|
| if not os.path.exists(directory_path):
|
| return None
|
|
|
| if store_type.lower() == "faiss":
|
| vector_store = FAISS.load_local(
|
| directory_path,
|
| self.embeddings,
|
| allow_dangerous_deserialization=True
|
| )
|
| return vector_store
|
| elif store_type.lower() == "chroma":
|
| vector_store = Chroma(
|
| persist_directory=directory_path,
|
| embedding_function=self.embeddings
|
| )
|
| return vector_store
|
|
|
| return None
|
|
|
| except Exception as e:
|
| print(f"Failed to load vector store: {str(e)}")
|
| return None
|
|
|
| def get_chunking_stats(self, documents: List[Document], strategy: str = "recursive") -> Dict[str, Any]:
|
| """
|
| Get detailed statistics about document chunking
|
|
|
| Args:
|
| documents (List[Document]): Documents to analyze
|
| strategy (str): Chunking strategy to use
|
|
|
| Returns:
|
| Dict: Detailed chunking statistics
|
| """
|
| try:
|
|
|
| chunked_docs = self.chunk_documents(documents, strategy=strategy)
|
|
|
|
|
| chunk_sizes = [len(doc.page_content) for doc in chunked_docs]
|
| word_counts = [len(doc.page_content.split()) for doc in chunked_docs]
|
|
|
| stats = {
|
| 'strategy_used': strategy,
|
| 'original_documents': len(documents),
|
| 'total_chunks': len(chunked_docs),
|
| 'chunk_size_stats': {
|
| 'min': min(chunk_sizes) if chunk_sizes else 0,
|
| 'max': max(chunk_sizes) if chunk_sizes else 0,
|
| 'mean': np.mean(chunk_sizes) if chunk_sizes else 0,
|
| 'median': np.median(chunk_sizes) if chunk_sizes else 0,
|
| 'std': np.std(chunk_sizes) if chunk_sizes else 0
|
| },
|
| 'word_count_stats': {
|
| 'min': min(word_counts) if word_counts else 0,
|
| 'max': max(word_counts) if word_counts else 0,
|
| 'mean': np.mean(word_counts) if word_counts else 0,
|
| 'median': np.median(word_counts) if word_counts else 0,
|
| 'std': np.std(word_counts) if word_counts else 0
|
| },
|
| 'chunk_distribution': {
|
| 'very_small': len([s for s in chunk_sizes if s < 200]),
|
| 'small': len([s for s in chunk_sizes if 200 <= s < 500]),
|
| 'medium': len([s for s in chunk_sizes if 500 <= s < 1000]),
|
| 'large': len([s for s in chunk_sizes if 1000 <= s < 2000]),
|
| 'very_large': len([s for s in chunk_sizes if s >= 2000])
|
| },
|
| 'overlap_efficiency': self._calculate_overlap_efficiency(chunked_docs),
|
| 'content_coverage': self._calculate_content_coverage(documents, chunked_docs)
|
| }
|
|
|
| return stats
|
|
|
| except Exception as e:
|
| return {'error': f"Chunking statistics failed: {str(e)}"}
|
|
|
| def _calculate_overlap_efficiency(self, chunked_docs: List[Document]) -> float:
|
| """Calculate efficiency of chunk overlaps"""
|
| try:
|
| if len(chunked_docs) < 2:
|
| return 1.0
|
|
|
| total_content_length = sum(len(doc.page_content) for doc in chunked_docs)
|
| unique_content = set()
|
|
|
|
|
| for doc in chunked_docs:
|
| words = doc.page_content.split()
|
| for i in range(0, len(words), 10):
|
| unique_content.add(' '.join(words[i:i+10]))
|
|
|
|
|
| efficiency = len(unique_content) * 10 / total_content_length if total_content_length > 0 else 0
|
| return min(efficiency, 1.0)
|
|
|
| except Exception:
|
| return 0.5
|
|
|
| def _calculate_content_coverage(self, original_docs: List[Document],
|
| chunked_docs: List[Document]) -> float:
|
| """Calculate how well chunks cover original content"""
|
| try:
|
| original_content = ' '.join([doc.page_content for doc in original_docs])
|
| chunked_content = ' '.join([doc.page_content for doc in chunked_docs])
|
|
|
|
|
| coverage = len(chunked_content) / len(original_content) if original_content else 0
|
| return min(coverage, 1.0)
|
|
|
| except Exception:
|
| return 0.0
|
|
|
|
|
| class ChunkingOptimizer:
|
| """Helper class for optimizing chunking parameters"""
|
|
|
| def __init__(self, embeddings_model):
|
| self.embeddings = embeddings_model
|
|
|
| def optimize_chunk_size(self, documents: List[Document], test_queries: List[str],
|
| size_range: Tuple[int, int] = (200, 2000),
|
| step_size: int = 200) -> Dict[str, Any]:
|
| """
|
| Find optimal chunk size for given documents and queries
|
|
|
| Args:
|
| documents (List[Document]): Documents to test
|
| test_queries (List[str]): Queries for testing retrieval
|
| size_range (Tuple[int, int]): Range of chunk sizes to test
|
| step_size (int): Step size for testing
|
|
|
| Returns:
|
| Dict: Optimization results with recommended chunk size
|
| """
|
| try:
|
| results = {}
|
| min_size, max_size = size_range
|
|
|
| for chunk_size in range(min_size, max_size + 1, step_size):
|
|
|
| chunker = VectorChunker(self.embeddings, chunk_size=chunk_size)
|
|
|
| try:
|
| chunked_docs = chunker.chunk_documents(documents)
|
| vector_store = chunker.create_vector_store(chunked_docs)
|
|
|
|
|
| retrieval_scores = []
|
| for query in test_queries:
|
| search_results = vector_store.similarity_search_with_score(query, k=3)
|
| if search_results:
|
| avg_score = sum(score for _, score in search_results) / len(search_results)
|
| retrieval_scores.append(float(avg_score))
|
|
|
| avg_performance = np.mean(retrieval_scores) if retrieval_scores else 0
|
|
|
| results[chunk_size] = {
|
| 'average_retrieval_score': avg_performance,
|
| 'total_chunks': len(chunked_docs),
|
| 'retrieval_scores': retrieval_scores
|
| }
|
|
|
| except Exception as e:
|
| results[chunk_size] = {'error': str(e)}
|
|
|
|
|
| valid_results = {k: v for k, v in results.items() if 'error' not in v}
|
|
|
| if valid_results:
|
| optimal_size = max(valid_results.keys(),
|
| key=lambda k: valid_results[k]['average_retrieval_score'])
|
|
|
| return {
|
| 'optimal_chunk_size': optimal_size,
|
| 'optimal_performance': valid_results[optimal_size]['average_retrieval_score'],
|
| 'all_results': results,
|
| 'performance_trend': self._analyze_performance_trend(valid_results),
|
| 'recommendation': f"Use chunk size {optimal_size} for best retrieval performance"
|
| }
|
| else:
|
| return {
|
| 'error': 'No valid chunk sizes could be tested',
|
| 'all_results': results
|
| }
|
|
|
| except Exception as e:
|
| return {'error': f"Chunk size optimization failed: {str(e)}"}
|
|
|
| def _analyze_performance_trend(self, results: Dict[int, Dict[str, Any]]) -> Dict[str, Any]:
|
| """Analyze performance trend across different chunk sizes"""
|
| try:
|
| sizes = sorted(results.keys())
|
| performances = [results[size]['average_retrieval_score'] for size in sizes]
|
|
|
|
|
| if len(performances) >= 2:
|
| trend_direction = "increasing" if performances[-1] > performances[0] else "decreasing"
|
| peak_performance = max(performances)
|
| peak_size = sizes[performances.index(peak_performance)]
|
|
|
| return {
|
| 'trend_direction': trend_direction,
|
| 'peak_performance': peak_performance,
|
| 'peak_size': peak_size,
|
| 'performance_range': max(performances) - min(performances),
|
| 'stable_performance': max(performances) - min(performances) < 0.1
|
| }
|
| else:
|
| return {'error': 'Insufficient data for trend analysis'}
|
|
|
| except Exception:
|
| return {'error': 'Trend analysis failed'}
|
|
|
|
|
| class RAGPipeline:
|
| """Complete RAG pipeline for document question-answering"""
|
|
|
| def __init__(self, embeddings_model, llm):
|
| self.embeddings = embeddings_model
|
| self.llm = llm
|
| self.chunker = VectorChunker(embeddings_model)
|
| self.vector_stores = {}
|
| self.qa_chains = {}
|
|
|
| def create_pipeline(self, documents: List[Document], pipeline_id: str,
|
| chunking_strategy: str = "semantic") -> Dict[str, Any]:
|
| """
|
| Create a complete RAG pipeline for documents
|
|
|
| Args:
|
| documents (List[Document]): Documents to process
|
| pipeline_id (str): Unique identifier for this pipeline
|
| chunking_strategy (str): Strategy for document chunking
|
|
|
| Returns:
|
| Dict: Pipeline creation results
|
| """
|
| try:
|
|
|
| chunked_docs = self.chunker.chunk_documents(documents, strategy=chunking_strategy)
|
|
|
|
|
| vector_store = self.chunker.create_vector_store(chunked_docs, store_type="faiss")
|
|
|
|
|
| qa_chain = self.chunker.create_qa_chain(documents, self.llm)
|
|
|
|
|
| self.vector_stores[pipeline_id] = vector_store
|
| self.qa_chains[pipeline_id] = qa_chain
|
|
|
|
|
| stats = {
|
| 'pipeline_id': pipeline_id,
|
| 'documents_processed': len(documents),
|
| 'chunks_created': len(chunked_docs),
|
| 'chunking_strategy': chunking_strategy,
|
| 'vector_store_type': 'faiss',
|
| 'embedding_model': str(self.embeddings),
|
| 'created_at': self._get_timestamp()
|
| }
|
|
|
| return {
|
| 'success': True,
|
| 'pipeline_stats': stats,
|
| 'chunking_info': self.chunker.get_chunking_stats(documents, chunking_strategy)
|
| }
|
|
|
| except Exception as e:
|
| return {'error': f"Pipeline creation failed: {str(e)}"}
|
|
|
| def query_pipeline(self, pipeline_id: str, query: str,
|
| return_sources: bool = True) -> Dict[str, Any]:
|
| """
|
| Query a created RAG pipeline
|
|
|
| Args:
|
| pipeline_id (str): ID of the pipeline to query
|
| query (str): Question to ask
|
| return_sources (bool): Whether to return source documents
|
|
|
| Returns:
|
| Dict: Query results with answer and sources
|
| """
|
| try:
|
| if pipeline_id not in self.qa_chains:
|
| return {'error': f"Pipeline '{pipeline_id}' not found"}
|
|
|
| qa_chain = self.qa_chains[pipeline_id]
|
|
|
|
|
| result = qa_chain({"query": query})
|
|
|
|
|
| response = {
|
| 'query': query,
|
| 'answer': result.get('result', 'No answer generated'),
|
| 'pipeline_id': pipeline_id,
|
| 'query_timestamp': self._get_timestamp()
|
| }
|
|
|
|
|
| if return_sources and 'source_documents' in result:
|
| sources = []
|
| for i, doc in enumerate(result['source_documents']):
|
| source = {
|
| 'source_index': i,
|
| 'content': doc.page_content,
|
| 'metadata': doc.metadata,
|
| 'relevance_rank': i + 1
|
| }
|
| sources.append(source)
|
|
|
| response['sources'] = sources
|
| response['num_sources'] = len(sources)
|
|
|
| return response
|
|
|
| except Exception as e:
|
| return {'error': f"Pipeline query failed: {str(e)}"}
|
|
|
| def batch_query_pipeline(self, pipeline_id: str, queries: List[str]) -> List[Dict[str, Any]]:
|
| """
|
| Execute multiple queries on a pipeline
|
|
|
| Args:
|
| pipeline_id (str): ID of the pipeline to query
|
| queries (List[str]): List of questions to ask
|
|
|
| Returns:
|
| List[Dict]: List of query results
|
| """
|
| results = []
|
|
|
| for i, query in enumerate(queries):
|
| try:
|
| result = self.query_pipeline(pipeline_id, query, return_sources=False)
|
| result['batch_index'] = i
|
| results.append(result)
|
|
|
| except Exception as e:
|
| results.append({
|
| 'batch_index': i,
|
| 'query': query,
|
| 'error': f"Batch query failed: {str(e)}"
|
| })
|
|
|
| return results
|
|
|
| def evaluate_pipeline(self, pipeline_id: str, test_queries: List[str],
|
| expected_answers: List[str] = None) -> Dict[str, Any]:
|
| """
|
| Evaluate pipeline performance on test queries
|
|
|
| Args:
|
| pipeline_id (str): ID of the pipeline to evaluate
|
| test_queries (List[str]): Test questions
|
| expected_answers (List[str]): Optional expected answers for comparison
|
|
|
| Returns:
|
| Dict: Evaluation results
|
| """
|
| try:
|
| if pipeline_id not in self.qa_chains:
|
| return {'error': f"Pipeline '{pipeline_id}' not found"}
|
|
|
| evaluation_results = []
|
| response_times = []
|
|
|
| for i, query in enumerate(test_queries):
|
| import time
|
| start_time = time.time()
|
|
|
|
|
| result = self.query_pipeline(pipeline_id, query, return_sources=True)
|
|
|
| end_time = time.time()
|
| response_time = end_time - start_time
|
| response_times.append(response_time)
|
|
|
|
|
| eval_result = {
|
| 'query_index': i,
|
| 'query': query,
|
| 'answer_generated': not result.get('error'),
|
| 'response_time': response_time,
|
| 'answer_length': len(result.get('answer', '')),
|
| 'sources_returned': result.get('num_sources', 0)
|
| }
|
|
|
|
|
| if expected_answers and i < len(expected_answers):
|
| expected = expected_answers[i]
|
| generated = result.get('answer', '')
|
|
|
|
|
| similarity = self._calculate_answer_similarity(expected, generated)
|
| eval_result['answer_similarity'] = similarity
|
| eval_result['expected_answer'] = expected
|
|
|
| evaluation_results.append(eval_result)
|
|
|
|
|
| successful_queries = len([r for r in evaluation_results if r['answer_generated']])
|
| avg_response_time = np.mean(response_times) if response_times else 0
|
|
|
| if expected_answers:
|
| similarities = [r.get('answer_similarity', 0) for r in evaluation_results
|
| if 'answer_similarity' in r]
|
| avg_similarity = np.mean(similarities) if similarities else 0
|
| else:
|
| avg_similarity = None
|
|
|
| return {
|
| 'pipeline_id': pipeline_id,
|
| 'total_queries': len(test_queries),
|
| 'successful_queries': successful_queries,
|
| 'success_rate': successful_queries / len(test_queries) if test_queries else 0,
|
| 'average_response_time': avg_response_time,
|
| 'average_answer_similarity': avg_similarity,
|
| 'detailed_results': evaluation_results,
|
| 'evaluation_timestamp': self._get_timestamp()
|
| }
|
|
|
| except Exception as e:
|
| return {'error': f"Pipeline evaluation failed: {str(e)}"}
|
|
|
| def _calculate_answer_similarity(self, expected: str, generated: str) -> float:
|
| """Calculate similarity between expected and generated answers"""
|
| try:
|
|
|
| expected_words = set(expected.lower().split())
|
| generated_words = set(generated.lower().split())
|
|
|
| if not expected_words and not generated_words:
|
| return 1.0
|
|
|
| intersection = expected_words.intersection(generated_words)
|
| union = expected_words.union(generated_words)
|
|
|
| return len(intersection) / len(union) if union else 0.0
|
|
|
| except Exception:
|
| return 0.0
|
|
|
| def get_pipeline_info(self, pipeline_id: str) -> Dict[str, Any]:
|
| """Get information about a specific pipeline"""
|
| try:
|
| if pipeline_id not in self.qa_chains:
|
| return {'error': f"Pipeline '{pipeline_id}' not found"}
|
|
|
|
|
| vector_store = self.vector_stores.get(pipeline_id)
|
| if vector_store:
|
| try:
|
|
|
| total_vectors = vector_store.index.ntotal if hasattr(vector_store, 'index') else 'unknown'
|
| except:
|
| total_vectors = 'unknown'
|
| else:
|
| total_vectors = 'unknown'
|
|
|
| return {
|
| 'pipeline_id': pipeline_id,
|
| 'has_qa_chain': pipeline_id in self.qa_chains,
|
| 'has_vector_store': pipeline_id in self.vector_stores,
|
| 'total_vectors': total_vectors,
|
| 'embedding_model': str(self.embeddings),
|
| 'llm_model': str(self.llm)
|
| }
|
|
|
| except Exception as e:
|
| return {'error': f"Failed to get pipeline info: {str(e)}"}
|
|
|
| def list_pipelines(self) -> Dict[str, Any]:
|
| """List all created pipelines"""
|
| return {
|
| 'total_pipelines': len(self.qa_chains),
|
| 'pipeline_ids': list(self.qa_chains.keys()),
|
| 'vector_stores': list(self.vector_stores.keys())
|
| }
|
|
|
| def delete_pipeline(self, pipeline_id: str) -> Dict[str, Any]:
|
| """Delete a pipeline and free resources"""
|
| try:
|
| deleted_components = []
|
|
|
| if pipeline_id in self.qa_chains:
|
| del self.qa_chains[pipeline_id]
|
| deleted_components.append('qa_chain')
|
|
|
| if pipeline_id in self.vector_stores:
|
| del self.vector_stores[pipeline_id]
|
| deleted_components.append('vector_store')
|
|
|
| if deleted_components:
|
| return {
|
| 'success': True,
|
| 'pipeline_id': pipeline_id,
|
| 'deleted_components': deleted_components
|
| }
|
| else:
|
| return {'error': f"Pipeline '{pipeline_id}' not found"}
|
|
|
| except Exception as e:
|
| return {'error': f"Pipeline deletion failed: {str(e)}"}
|
|
|
| def export_pipeline_config(self, pipeline_id: str) -> Dict[str, Any]:
|
| """Export pipeline configuration for recreation"""
|
| try:
|
| if pipeline_id not in self.qa_chains:
|
| return {'error': f"Pipeline '{pipeline_id}' not found"}
|
|
|
| config = {
|
| 'pipeline_id': pipeline_id,
|
| 'embedding_model_name': getattr(self.embeddings, 'model_name', 'unknown'),
|
| 'llm_model_name': getattr(self.llm, 'model_name', 'unknown'),
|
| 'chunker_config': {
|
| 'chunk_size': self.chunker.chunk_size,
|
| 'chunk_overlap': self.chunker.chunk_overlap
|
| },
|
| 'export_timestamp': self._get_timestamp(),
|
| 'vector_store_type': 'faiss'
|
| }
|
|
|
| return config
|
|
|
| except Exception as e:
|
| return {'error': f"Pipeline export failed: {str(e)}"}
|
|
|
| def _get_timestamp(self) -> str:
|
| """Get current timestamp"""
|
| from datetime import datetime
|
| return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
|
|
|
|
|
| def optimize_rag_pipeline(documents: List[Document], embeddings_model, llm,
|
| test_queries: List[str]) -> Dict[str, Any]:
|
| """
|
| Optimize RAG pipeline configuration for given documents and queries
|
|
|
| Args:
|
| documents (List[Document]): Documents to optimize for
|
| embeddings_model: Embedding model to use
|
| llm: Language model to use
|
| test_queries (List[str]): Test queries for optimization
|
|
|
| Returns:
|
| Dict: Optimization recommendations
|
| """
|
| try:
|
|
|
| chunker = VectorChunker(embeddings_model)
|
| chunking_results = chunker.optimize_chunking_strategy(documents, test_queries)
|
|
|
|
|
| optimizer = ChunkingOptimizer(embeddings_model)
|
| size_results = optimizer.optimize_chunk_size(documents, test_queries)
|
|
|
|
|
| best_strategy = chunking_results.get('recommended_strategy', 'semantic')
|
| best_size = size_results.get('optimal_chunk_size', 1000)
|
|
|
|
|
| optimized_chunker = VectorChunker(
|
| embeddings_model,
|
| chunk_size=best_size,
|
| chunk_overlap=best_size // 5
|
| )
|
|
|
|
|
| pipeline = RAGPipeline(embeddings_model, llm)
|
| pipeline.chunker = optimized_chunker
|
|
|
| test_pipeline_id = "optimization_test"
|
| creation_result = pipeline.create_pipeline(documents, test_pipeline_id, best_strategy)
|
|
|
| if not creation_result.get('error'):
|
| evaluation_result = pipeline.evaluate_pipeline(test_pipeline_id, test_queries)
|
| pipeline.delete_pipeline(test_pipeline_id)
|
| else:
|
| evaluation_result = {'error': 'Could not evaluate optimized pipeline'}
|
|
|
| return {
|
| 'optimization_complete': True,
|
| 'recommended_config': {
|
| 'chunking_strategy': best_strategy,
|
| 'chunk_size': best_size,
|
| 'chunk_overlap': best_size // 5
|
| },
|
| 'chunking_optimization': chunking_results,
|
| 'size_optimization': size_results,
|
| 'performance_evaluation': evaluation_result,
|
| 'recommendations': [
|
| f"Use {best_strategy} chunking strategy",
|
| f"Set chunk size to {best_size} characters",
|
| f"Use {best_size // 5} character overlap",
|
| "Monitor and adjust based on query performance"
|
| ]
|
| }
|
|
|
| except Exception as e:
|
| return {'error': f"RAG optimization failed: {str(e)}"}
|
|
|
|
|
| def create_demo_rag_system(sample_documents: List[Document], embeddings_model, llm) -> Dict[str, Any]:
|
| """
|
| Create a demonstration RAG system with sample documents
|
|
|
| Args:
|
| sample_documents (List[Document]): Sample documents for demo
|
| embeddings_model: Embedding model
|
| llm: Language model
|
|
|
| Returns:
|
| Dict: Demo system information and sample interactions
|
| """
|
| try:
|
|
|
| pipeline = RAGPipeline(embeddings_model, llm)
|
| demo_id = "demo_system"
|
|
|
|
|
| creation_result = pipeline.create_pipeline(sample_documents, demo_id, "semantic")
|
|
|
| if creation_result.get('error'):
|
| return {'error': f"Demo system creation failed: {creation_result['error']}"}
|
|
|
|
|
| demo_queries = [
|
| "What is the main topic of these documents?",
|
| "Can you summarize the key points?",
|
| "What are the most important concepts mentioned?"
|
| ]
|
|
|
|
|
| demo_results = []
|
| for query in demo_queries:
|
| result = pipeline.query_pipeline(demo_id, query, return_sources=True)
|
| demo_results.append(result)
|
|
|
|
|
| pipeline_info = pipeline.get_pipeline_info(demo_id)
|
|
|
| return {
|
| 'demo_system_created': True,
|
| 'pipeline_id': demo_id,
|
| 'creation_stats': creation_result,
|
| 'pipeline_info': pipeline_info,
|
| 'demo_queries': demo_queries,
|
| 'demo_results': demo_results,
|
| 'usage_instructions': [
|
| f"Use pipeline.query_pipeline('{demo_id}', 'your question') to ask questions",
|
| "The system will return answers with source document references",
|
| "Sources show which parts of the documents were used for the answer"
|
| ]
|
| }
|
|
|
| except Exception as e:
|
| return {'error': f"Demo system creation failed: {str(e)}"}
|
|
|
|
|
|
|
| __all__ = [
|
| 'VectorChunker',
|
| 'ChunkingOptimizer',
|
| 'RAGPipeline',
|
| 'optimize_rag_pipeline',
|
| 'create_demo_rag_system'
|
| ] |