|
|
"""
|
|
|
Vector Chunking and RAG Module
|
|
|
Handles document chunking, vector embeddings, and RAG question-answering
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import json
|
|
|
import numpy as np
|
|
|
from typing import Dict, Any, List, Optional, Tuple
|
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter
|
|
|
from langchain.schema import Document
|
|
|
from langchain_community.vectorstores import FAISS, Chroma
|
|
|
from langchain.chains import RetrievalQA, ConversationalRetrievalChain
|
|
|
from langchain.memory import ConversationBufferMemory
|
|
|
from langchain.prompts import PromptTemplate
|
|
|
import tempfile
|
|
|
import shutil
|
|
|
|
|
|
|
|
|
class VectorChunker:
    """Main class for document chunking and vector operations"""

    def __init__(self, embeddings_model, chunk_size: int = 1000, chunk_overlap: int = 200):
        """Store the embedding model and chunking parameters, then build splitters.

        Args:
            embeddings_model: Embedding model used for all vector operations.
            chunk_size (int): Target chunk size in characters.
            chunk_overlap (int): Character overlap between consecutive chunks.
        """
        self.embeddings = embeddings_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # Cache of vector stores keyed by caller-defined identifiers.
        self.vector_stores = {}
        # Splitters depend on chunk_size/chunk_overlap, so build them last.
        self.setup_text_splitters()
|
|
|
|
|
|
def setup_text_splitters(self):
|
|
|
"""Initialize different text splitting strategies"""
|
|
|
|
|
|
|
|
|
self.recursive_splitter = RecursiveCharacterTextSplitter(
|
|
|
chunk_size=self.chunk_size,
|
|
|
chunk_overlap=self.chunk_overlap,
|
|
|
length_function=len,
|
|
|
separators=["\n\n", "\n", " ", ""]
|
|
|
)
|
|
|
|
|
|
|
|
|
self.character_splitter = CharacterTextSplitter(
|
|
|
chunk_size=self.chunk_size,
|
|
|
chunk_overlap=self.chunk_overlap,
|
|
|
separator="\n\n"
|
|
|
)
|
|
|
|
|
|
|
|
|
self.semantic_splitter = RecursiveCharacterTextSplitter(
|
|
|
chunk_size=800,
|
|
|
chunk_overlap=150,
|
|
|
length_function=len,
|
|
|
separators=["\n\n", "\n", ". ", " ", ""]
|
|
|
)
|
|
|
|
|
|
def chunk_documents(self, documents: List[Document], strategy: str = "recursive") -> List[Document]:
|
|
|
"""
|
|
|
Chunk documents using specified strategy
|
|
|
|
|
|
Args:
|
|
|
documents (List[Document]): List of documents to chunk
|
|
|
strategy (str): Chunking strategy ("recursive", "character", "semantic")
|
|
|
|
|
|
Returns:
|
|
|
List[Document]: List of chunked documents
|
|
|
"""
|
|
|
try:
|
|
|
|
|
|
if strategy == "character":
|
|
|
splitter = self.character_splitter
|
|
|
elif strategy == "semantic":
|
|
|
splitter = self.semantic_splitter
|
|
|
else:
|
|
|
splitter = self.recursive_splitter
|
|
|
|
|
|
|
|
|
chunked_docs = []
|
|
|
|
|
|
for doc in documents:
|
|
|
chunks = splitter.split_documents([doc])
|
|
|
|
|
|
|
|
|
for i, chunk in enumerate(chunks):
|
|
|
chunk.metadata.update({
|
|
|
'chunk_index': i,
|
|
|
'total_chunks': len(chunks),
|
|
|
'chunk_strategy': strategy,
|
|
|
'original_source': doc.metadata.get('source', 'unknown'),
|
|
|
'chunk_size': len(chunk.page_content),
|
|
|
'chunk_word_count': len(chunk.page_content.split())
|
|
|
})
|
|
|
|
|
|
chunked_docs.extend(chunks)
|
|
|
|
|
|
return chunked_docs
|
|
|
|
|
|
except Exception as e:
|
|
|
raise Exception(f"Document chunking failed: {str(e)}")
|
|
|
|
|
|
def create_vector_store(self, documents: List[Document], store_type: str = "faiss",
|
|
|
persist_directory: Optional[str] = None) -> Any:
|
|
|
"""
|
|
|
Create vector store from documents
|
|
|
|
|
|
Args:
|
|
|
documents (List[Document]): Documents to vectorize
|
|
|
store_type (str): Type of vector store ("faiss", "chroma")
|
|
|
persist_directory (str): Optional directory to persist the store
|
|
|
|
|
|
Returns:
|
|
|
Vector store instance
|
|
|
"""
|
|
|
try:
|
|
|
if not documents:
|
|
|
raise ValueError("No documents provided for vector store creation")
|
|
|
|
|
|
if store_type.lower() == "chroma":
|
|
|
if persist_directory:
|
|
|
vector_store = Chroma.from_documents(
|
|
|
documents=documents,
|
|
|
embedding=self.embeddings,
|
|
|
persist_directory=persist_directory
|
|
|
)
|
|
|
vector_store.persist()
|
|
|
else:
|
|
|
vector_store = Chroma.from_documents(
|
|
|
documents=documents,
|
|
|
embedding=self.embeddings
|
|
|
)
|
|
|
else:
|
|
|
vector_store = FAISS.from_documents(
|
|
|
documents=documents,
|
|
|
embedding=self.embeddings
|
|
|
)
|
|
|
|
|
|
|
|
|
if persist_directory:
|
|
|
os.makedirs(persist_directory, exist_ok=True)
|
|
|
vector_store.save_local(persist_directory)
|
|
|
|
|
|
return vector_store
|
|
|
|
|
|
except Exception as e:
|
|
|
raise Exception(f"Vector store creation failed: {str(e)}")
|
|
|
|
|
|
def create_qa_chain(self, documents: List[Document], llm, chain_type: str = "stuff") -> RetrievalQA:
|
|
|
"""
|
|
|
Create a Question-Answering chain from documents
|
|
|
|
|
|
Args:
|
|
|
documents (List[Document]): Documents for the knowledge base
|
|
|
llm: Language model for answering questions
|
|
|
chain_type (str): Type of QA chain ("stuff", "map_reduce", "refine")
|
|
|
|
|
|
Returns:
|
|
|
RetrievalQA: Configured QA chain
|
|
|
"""
|
|
|
try:
|
|
|
|
|
|
chunked_docs = self.chunk_documents(documents, strategy="semantic")
|
|
|
|
|
|
|
|
|
vector_store = self.create_vector_store(chunked_docs, store_type="faiss")
|
|
|
|
|
|
|
|
|
retriever = vector_store.as_retriever(
|
|
|
search_type="similarity",
|
|
|
search_kwargs={"k": 4}
|
|
|
)
|
|
|
|
|
|
|
|
|
qa_prompt_template = """Use the following pieces of context to answer the question at the end.
|
|
|
If you don't know the answer, just say that you don't know, don't try to make up an answer.
|
|
|
Focus on providing clear, accurate, and complete answers that would be suitable for AI search engines.
|
|
|
|
|
|
Context:
|
|
|
{context}
|
|
|
|
|
|
Question: {question}
|
|
|
|
|
|
Answer:"""
|
|
|
|
|
|
qa_prompt = PromptTemplate(
|
|
|
template=qa_prompt_template,
|
|
|
input_variables=["context", "question"]
|
|
|
)
|
|
|
|
|
|
|
|
|
qa_chain = RetrievalQA.from_chain_type(
|
|
|
llm=llm,
|
|
|
chain_type=chain_type,
|
|
|
retriever=retriever,
|
|
|
return_source_documents=True,
|
|
|
chain_type_kwargs={"prompt": qa_prompt}
|
|
|
)
|
|
|
|
|
|
return qa_chain
|
|
|
|
|
|
except Exception as e:
|
|
|
raise Exception(f"QA chain creation failed: {str(e)}")
|
|
|
|
|
|
def create_conversational_chain(self, documents: List[Document], llm) -> ConversationalRetrievalChain:
|
|
|
"""
|
|
|
Create a conversational retrieval chain with memory
|
|
|
|
|
|
Args:
|
|
|
documents (List[Document]): Documents for the knowledge base
|
|
|
llm: Language model for conversation
|
|
|
|
|
|
Returns:
|
|
|
ConversationalRetrievalChain: Configured conversational chain
|
|
|
"""
|
|
|
try:
|
|
|
|
|
|
chunked_docs = self.chunk_documents(documents, strategy="semantic")
|
|
|
|
|
|
|
|
|
vector_store = self.create_vector_store(chunked_docs, store_type="faiss")
|
|
|
|
|
|
|
|
|
retriever = vector_store.as_retriever(
|
|
|
search_type="similarity",
|
|
|
search_kwargs={"k": 3}
|
|
|
)
|
|
|
|
|
|
|
|
|
memory = ConversationBufferMemory(
|
|
|
memory_key="chat_history",
|
|
|
return_messages=True,
|
|
|
output_key="answer"
|
|
|
)
|
|
|
|
|
|
|
|
|
condense_question_prompt = """Given the following conversation and a follow up question,
|
|
|
rephrase the follow up question to be a standalone question that can be understood without the chat history.
|
|
|
|
|
|
Chat History:
|
|
|
{chat_history}
|
|
|
Follow Up Input: {question}
|
|
|
Standalone question:"""
|
|
|
|
|
|
|
|
|
conv_chain = ConversationalRetrievalChain.from_llm(
|
|
|
llm=llm,
|
|
|
retriever=retriever,
|
|
|
memory=memory,
|
|
|
return_source_documents=True,
|
|
|
condense_question_prompt=PromptTemplate.from_template(condense_question_prompt)
|
|
|
)
|
|
|
|
|
|
return conv_chain
|
|
|
|
|
|
except Exception as e:
|
|
|
raise Exception(f"Conversational chain creation failed: {str(e)}")
|
|
|
|
|
|
def semantic_search(self, query: str, documents: List[Document], top_k: int = 5) -> List[Dict[str, Any]]:
|
|
|
"""
|
|
|
Perform semantic search on documents
|
|
|
|
|
|
Args:
|
|
|
query (str): Search query
|
|
|
documents (List[Document]): Documents to search
|
|
|
top_k (int): Number of top results to return
|
|
|
|
|
|
Returns:
|
|
|
List[Dict]: Search results with scores
|
|
|
"""
|
|
|
try:
|
|
|
|
|
|
chunked_docs = self.chunk_documents(documents, strategy="semantic")
|
|
|
|
|
|
|
|
|
vector_store = self.create_vector_store(chunked_docs, store_type="faiss")
|
|
|
|
|
|
|
|
|
results = vector_store.similarity_search_with_score(query, k=top_k)
|
|
|
|
|
|
|
|
|
formatted_results = []
|
|
|
for doc, score in results:
|
|
|
result = {
|
|
|
'content': doc.page_content,
|
|
|
'metadata': doc.metadata,
|
|
|
'similarity_score': float(score),
|
|
|
'relevance_rank': len(formatted_results) + 1
|
|
|
}
|
|
|
formatted_results.append(result)
|
|
|
|
|
|
return formatted_results
|
|
|
|
|
|
except Exception as e:
|
|
|
raise Exception(f"Semantic search failed: {str(e)}")
|
|
|
|
|
|
def analyze_document_similarity(self, documents: List[Document]) -> Dict[str, Any]:
|
|
|
"""
|
|
|
Analyze similarity between documents
|
|
|
|
|
|
Args:
|
|
|
documents (List[Document]): Documents to analyze
|
|
|
|
|
|
Returns:
|
|
|
Dict: Similarity analysis results
|
|
|
"""
|
|
|
try:
|
|
|
if len(documents) < 2:
|
|
|
return {'error': 'Need at least 2 documents for similarity analysis'}
|
|
|
|
|
|
|
|
|
chunked_docs = self.chunk_documents(documents, strategy="semantic")
|
|
|
|
|
|
|
|
|
doc_embeddings = []
|
|
|
doc_metadata = []
|
|
|
|
|
|
for doc in chunked_docs:
|
|
|
|
|
|
embedding = self.embeddings.embed_query(doc.page_content)
|
|
|
doc_embeddings.append(embedding)
|
|
|
doc_metadata.append({
|
|
|
'content_preview': doc.page_content[:200] + "...",
|
|
|
'metadata': doc.metadata,
|
|
|
'length': len(doc.page_content)
|
|
|
})
|
|
|
|
|
|
|
|
|
similarities = []
|
|
|
embeddings_array = np.array(doc_embeddings)
|
|
|
|
|
|
for i in range(len(embeddings_array)):
|
|
|
for j in range(i + 1, len(embeddings_array)):
|
|
|
|
|
|
similarity = np.dot(embeddings_array[i], embeddings_array[j]) / (
|
|
|
np.linalg.norm(embeddings_array[i]) * np.linalg.norm(embeddings_array[j])
|
|
|
)
|
|
|
|
|
|
similarities.append({
|
|
|
'doc_1_index': i,
|
|
|
'doc_2_index': j,
|
|
|
'similarity_score': float(similarity),
|
|
|
'doc_1_preview': doc_metadata[i]['content_preview'],
|
|
|
'doc_2_preview': doc_metadata[j]['content_preview']
|
|
|
})
|
|
|
|
|
|
|
|
|
similarities.sort(key=lambda x: x['similarity_score'], reverse=True)
|
|
|
|
|
|
|
|
|
similarity_scores = [s['similarity_score'] for s in similarities]
|
|
|
|
|
|
return {
|
|
|
'total_comparisons': len(similarities),
|
|
|
'average_similarity': np.mean(similarity_scores),
|
|
|
'max_similarity': max(similarity_scores),
|
|
|
'min_similarity': min(similarity_scores),
|
|
|
'similarity_distribution': {
|
|
|
'high_similarity': len([s for s in similarity_scores if s > 0.8]),
|
|
|
'medium_similarity': len([s for s in similarity_scores if 0.5 < s <= 0.8]),
|
|
|
'low_similarity': len([s for s in similarity_scores if s <= 0.5])
|
|
|
},
|
|
|
'top_similar_pairs': similarities[:5],
|
|
|
'most_dissimilar_pairs': similarities[-3:]
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Similarity analysis failed: {str(e)}"}
|
|
|
|
|
|
def extract_key_passages(self, documents: List[Document], queries: List[str],
|
|
|
passages_per_query: int = 3) -> Dict[str, List[Dict[str, Any]]]:
|
|
|
"""
|
|
|
Extract key passages from documents based on multiple queries
|
|
|
|
|
|
Args:
|
|
|
documents (List[Document]): Documents to search
|
|
|
queries (List[str]): List of queries to search for
|
|
|
passages_per_query (int): Number of passages to extract per query
|
|
|
|
|
|
Returns:
|
|
|
Dict: Key passages organized by query
|
|
|
"""
|
|
|
try:
|
|
|
|
|
|
chunked_docs = self.chunk_documents(documents, strategy="semantic")
|
|
|
|
|
|
|
|
|
vector_store = self.create_vector_store(chunked_docs, store_type="faiss")
|
|
|
|
|
|
key_passages = {}
|
|
|
|
|
|
for query in queries:
|
|
|
|
|
|
results = vector_store.similarity_search_with_score(query, k=passages_per_query)
|
|
|
|
|
|
passages = []
|
|
|
for doc, score in results:
|
|
|
passage = {
|
|
|
'content': doc.page_content,
|
|
|
'relevance_score': float(score),
|
|
|
'metadata': doc.metadata,
|
|
|
'word_count': len(doc.page_content.split()),
|
|
|
'query_match': query
|
|
|
}
|
|
|
passages.append(passage)
|
|
|
|
|
|
key_passages[query] = passages
|
|
|
|
|
|
return key_passages
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Key passage extraction failed: {str(e)}"}
|
|
|
|
|
|
def optimize_chunking_strategy(self, documents: List[Document],
|
|
|
test_queries: List[str]) -> Dict[str, Any]:
|
|
|
"""
|
|
|
Test different chunking strategies and recommend the best one
|
|
|
|
|
|
Args:
|
|
|
documents (List[Document]): Documents to test
|
|
|
test_queries (List[str]): Queries to test retrieval performance
|
|
|
|
|
|
Returns:
|
|
|
Dict: Optimization results and recommendations
|
|
|
"""
|
|
|
try:
|
|
|
strategies = ["recursive", "character", "semantic"]
|
|
|
strategy_results = {}
|
|
|
|
|
|
for strategy in strategies:
|
|
|
try:
|
|
|
|
|
|
chunked_docs = self.chunk_documents(documents, strategy=strategy)
|
|
|
vector_store = self.create_vector_store(chunked_docs, store_type="faiss")
|
|
|
|
|
|
|
|
|
retrieval_scores = []
|
|
|
|
|
|
for query in test_queries:
|
|
|
results = vector_store.similarity_search_with_score(query, k=3)
|
|
|
|
|
|
|
|
|
if results:
|
|
|
avg_score = sum(score for _, score in results) / len(results)
|
|
|
retrieval_scores.append(float(avg_score))
|
|
|
|
|
|
|
|
|
avg_retrieval_score = np.mean(retrieval_scores) if retrieval_scores else 0
|
|
|
total_chunks = len(chunked_docs)
|
|
|
avg_chunk_size = np.mean([len(doc.page_content) for doc in chunked_docs])
|
|
|
|
|
|
strategy_results[strategy] = {
|
|
|
'average_retrieval_score': avg_retrieval_score,
|
|
|
'total_chunks': total_chunks,
|
|
|
'average_chunk_size': avg_chunk_size,
|
|
|
'retrieval_scores': retrieval_scores,
|
|
|
'chunk_size_distribution': {
|
|
|
'min': min(len(doc.page_content) for doc in chunked_docs),
|
|
|
'max': max(len(doc.page_content) for doc in chunked_docs),
|
|
|
'std': float(np.std([len(doc.page_content) for doc in chunked_docs]))
|
|
|
}
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
strategy_results[strategy] = {'error': f"Strategy test failed: {str(e)}"}
|
|
|
|
|
|
|
|
|
valid_strategies = {k: v for k, v in strategy_results.items() if 'error' not in v}
|
|
|
|
|
|
if valid_strategies:
|
|
|
best_strategy = max(valid_strategies.keys(),
|
|
|
key=lambda k: valid_strategies[k]['average_retrieval_score'])
|
|
|
|
|
|
recommendation = {
|
|
|
'recommended_strategy': best_strategy,
|
|
|
'reason': f"Best average retrieval score: {valid_strategies[best_strategy]['average_retrieval_score']:.4f}",
|
|
|
'all_results': strategy_results,
|
|
|
'performance_summary': {
|
|
|
strategy: result.get('average_retrieval_score', 0)
|
|
|
for strategy, result in valid_strategies.items()
|
|
|
}
|
|
|
}
|
|
|
else:
|
|
|
recommendation = {
|
|
|
'recommended_strategy': 'recursive',
|
|
|
'reason': 'All strategies failed, using default',
|
|
|
'all_results': strategy_results
|
|
|
}
|
|
|
|
|
|
return recommendation
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Chunking optimization failed: {str(e)}"}
|
|
|
|
|
|
def create_document_summary(self, documents: List[Document], llm,
|
|
|
summary_type: str = "extractive") -> Dict[str, Any]:
|
|
|
"""
|
|
|
Create document summaries using the chunked content
|
|
|
|
|
|
Args:
|
|
|
documents (List[Document]): Documents to summarize
|
|
|
llm: Language model for summarization
|
|
|
summary_type (str): Type of summary ("extractive", "abstractive")
|
|
|
|
|
|
Returns:
|
|
|
Dict: Summary results
|
|
|
"""
|
|
|
try:
|
|
|
|
|
|
chunked_docs = self.chunk_documents(documents, strategy="semantic")
|
|
|
|
|
|
if summary_type == "extractive":
|
|
|
|
|
|
return self._create_extractive_summary(chunked_docs)
|
|
|
else:
|
|
|
|
|
|
return self._create_abstractive_summary(chunked_docs, llm)
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Document summarization failed: {str(e)}"}
|
|
|
|
|
|
def _create_extractive_summary(self, chunked_docs: List[Document]) -> Dict[str, Any]:
|
|
|
"""Create extractive summary by selecting key chunks"""
|
|
|
try:
|
|
|
|
|
|
chunk_scores = []
|
|
|
|
|
|
for doc in chunked_docs:
|
|
|
content = doc.page_content
|
|
|
|
|
|
word_count = len(content.split())
|
|
|
sentence_count = len([s for s in content.split('.') if s.strip()])
|
|
|
|
|
|
|
|
|
density_score = word_count / max(sentence_count, 1)
|
|
|
|
|
|
|
|
|
structure_bonus = 0
|
|
|
if '?' in content:
|
|
|
structure_bonus += 1
|
|
|
if any(word in content.lower() for word in ['define', 'definition', 'means', 'refers to']):
|
|
|
structure_bonus += 2
|
|
|
if content.count('\n•') > 0 or content.count('1.') > 0:
|
|
|
structure_bonus += 1
|
|
|
|
|
|
total_score = density_score + structure_bonus
|
|
|
chunk_scores.append((doc, total_score))
|
|
|
|
|
|
|
|
|
chunk_scores.sort(key=lambda x: x[1], reverse=True)
|
|
|
top_chunks = chunk_scores[:min(5, len(chunk_scores))]
|
|
|
|
|
|
summary_content = []
|
|
|
for doc, score in top_chunks:
|
|
|
summary_content.append({
|
|
|
'content': doc.page_content,
|
|
|
'score': score,
|
|
|
'metadata': doc.metadata
|
|
|
})
|
|
|
|
|
|
return {
|
|
|
'summary_type': 'extractive',
|
|
|
'key_chunks': summary_content,
|
|
|
'total_chunks_analyzed': len(chunked_docs),
|
|
|
'chunks_selected': len(top_chunks)
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Extractive summary failed: {str(e)}"}
|
|
|
|
|
|
def _create_abstractive_summary(self, chunked_docs: List[Document], llm) -> Dict[str, Any]:
|
|
|
"""Create abstractive summary using language model"""
|
|
|
try:
|
|
|
|
|
|
combined_content = "\n\n".join([doc.page_content for doc in chunked_docs[:10]])
|
|
|
|
|
|
summary_prompt = f"""Please provide a comprehensive summary of the following content.
|
|
|
Focus on the main topics, key insights, and important details that would be valuable for AI search engines.
|
|
|
|
|
|
Content:
|
|
|
{combined_content[:5000]}
|
|
|
|
|
|
Summary:"""
|
|
|
|
|
|
from langchain.prompts import ChatPromptTemplate
|
|
|
|
|
|
prompt_template = ChatPromptTemplate.from_messages([
|
|
|
("system", "You are a professional content summarizer. Create clear, informative summaries."),
|
|
|
("user", summary_prompt)
|
|
|
])
|
|
|
|
|
|
chain = prompt_template | llm
|
|
|
result = chain.invoke({})
|
|
|
|
|
|
summary_text = result.content if hasattr(result, 'content') else str(result)
|
|
|
|
|
|
return {
|
|
|
'summary_type': 'abstractive',
|
|
|
'summary': summary_text,
|
|
|
'source_chunks': len(chunked_docs),
|
|
|
'content_length_processed': len(combined_content)
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Abstractive summary failed: {str(e)}"}
|
|
|
|
|
|
def save_vector_store(self, vector_store, directory_path: str, store_type: str = "faiss") -> bool:
|
|
|
"""
|
|
|
Save vector store to disk
|
|
|
|
|
|
Args:
|
|
|
vector_store: Vector store instance to save
|
|
|
directory_path (str): Directory to save the store
|
|
|
store_type (str): Type of vector store
|
|
|
|
|
|
Returns:
|
|
|
bool: Success status
|
|
|
"""
|
|
|
try:
|
|
|
os.makedirs(directory_path, exist_ok=True)
|
|
|
|
|
|
if store_type.lower() == "faiss":
|
|
|
vector_store.save_local(directory_path)
|
|
|
elif store_type.lower() == "chroma":
|
|
|
|
|
|
pass
|
|
|
|
|
|
return True
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"Failed to save vector store: {str(e)}")
|
|
|
return False
|
|
|
|
|
|
def load_vector_store(self, directory_path: str, store_type: str = "faiss"):
|
|
|
"""
|
|
|
Load vector store from disk
|
|
|
|
|
|
Args:
|
|
|
directory_path (str): Directory containing the saved store
|
|
|
store_type (str): Type of vector store
|
|
|
|
|
|
Returns:
|
|
|
Vector store instance or None if failed
|
|
|
"""
|
|
|
try:
|
|
|
if not os.path.exists(directory_path):
|
|
|
return None
|
|
|
|
|
|
if store_type.lower() == "faiss":
|
|
|
vector_store = FAISS.load_local(
|
|
|
directory_path,
|
|
|
self.embeddings,
|
|
|
allow_dangerous_deserialization=True
|
|
|
)
|
|
|
return vector_store
|
|
|
elif store_type.lower() == "chroma":
|
|
|
vector_store = Chroma(
|
|
|
persist_directory=directory_path,
|
|
|
embedding_function=self.embeddings
|
|
|
)
|
|
|
return vector_store
|
|
|
|
|
|
return None
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"Failed to load vector store: {str(e)}")
|
|
|
return None
|
|
|
|
|
|
def get_chunking_stats(self, documents: List[Document], strategy: str = "recursive") -> Dict[str, Any]:
|
|
|
"""
|
|
|
Get detailed statistics about document chunking
|
|
|
|
|
|
Args:
|
|
|
documents (List[Document]): Documents to analyze
|
|
|
strategy (str): Chunking strategy to use
|
|
|
|
|
|
Returns:
|
|
|
Dict: Detailed chunking statistics
|
|
|
"""
|
|
|
try:
|
|
|
|
|
|
chunked_docs = self.chunk_documents(documents, strategy=strategy)
|
|
|
|
|
|
|
|
|
chunk_sizes = [len(doc.page_content) for doc in chunked_docs]
|
|
|
word_counts = [len(doc.page_content.split()) for doc in chunked_docs]
|
|
|
|
|
|
stats = {
|
|
|
'strategy_used': strategy,
|
|
|
'original_documents': len(documents),
|
|
|
'total_chunks': len(chunked_docs),
|
|
|
'chunk_size_stats': {
|
|
|
'min': min(chunk_sizes) if chunk_sizes else 0,
|
|
|
'max': max(chunk_sizes) if chunk_sizes else 0,
|
|
|
'mean': np.mean(chunk_sizes) if chunk_sizes else 0,
|
|
|
'median': np.median(chunk_sizes) if chunk_sizes else 0,
|
|
|
'std': np.std(chunk_sizes) if chunk_sizes else 0
|
|
|
},
|
|
|
'word_count_stats': {
|
|
|
'min': min(word_counts) if word_counts else 0,
|
|
|
'max': max(word_counts) if word_counts else 0,
|
|
|
'mean': np.mean(word_counts) if word_counts else 0,
|
|
|
'median': np.median(word_counts) if word_counts else 0,
|
|
|
'std': np.std(word_counts) if word_counts else 0
|
|
|
},
|
|
|
'chunk_distribution': {
|
|
|
'very_small': len([s for s in chunk_sizes if s < 200]),
|
|
|
'small': len([s for s in chunk_sizes if 200 <= s < 500]),
|
|
|
'medium': len([s for s in chunk_sizes if 500 <= s < 1000]),
|
|
|
'large': len([s for s in chunk_sizes if 1000 <= s < 2000]),
|
|
|
'very_large': len([s for s in chunk_sizes if s >= 2000])
|
|
|
},
|
|
|
'overlap_efficiency': self._calculate_overlap_efficiency(chunked_docs),
|
|
|
'content_coverage': self._calculate_content_coverage(documents, chunked_docs)
|
|
|
}
|
|
|
|
|
|
return stats
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Chunking statistics failed: {str(e)}"}
|
|
|
|
|
|
def _calculate_overlap_efficiency(self, chunked_docs: List[Document]) -> float:
|
|
|
"""Calculate efficiency of chunk overlaps"""
|
|
|
try:
|
|
|
if len(chunked_docs) < 2:
|
|
|
return 1.0
|
|
|
|
|
|
total_content_length = sum(len(doc.page_content) for doc in chunked_docs)
|
|
|
unique_content = set()
|
|
|
|
|
|
|
|
|
for doc in chunked_docs:
|
|
|
words = doc.page_content.split()
|
|
|
for i in range(0, len(words), 10):
|
|
|
unique_content.add(' '.join(words[i:i+10]))
|
|
|
|
|
|
|
|
|
efficiency = len(unique_content) * 10 / total_content_length if total_content_length > 0 else 0
|
|
|
return min(efficiency, 1.0)
|
|
|
|
|
|
except Exception:
|
|
|
return 0.5
|
|
|
|
|
|
def _calculate_content_coverage(self, original_docs: List[Document],
|
|
|
chunked_docs: List[Document]) -> float:
|
|
|
"""Calculate how well chunks cover original content"""
|
|
|
try:
|
|
|
original_content = ' '.join([doc.page_content for doc in original_docs])
|
|
|
chunked_content = ' '.join([doc.page_content for doc in chunked_docs])
|
|
|
|
|
|
|
|
|
coverage = len(chunked_content) / len(original_content) if original_content else 0
|
|
|
return min(coverage, 1.0)
|
|
|
|
|
|
except Exception:
|
|
|
return 0.0
|
|
|
|
|
|
|
|
|
class ChunkingOptimizer:
    """Helper class for optimizing chunking parameters"""

    def __init__(self, embeddings_model):
        # Forwarded to each trial VectorChunker built during optimization.
        self.embeddings = embeddings_model

    def optimize_chunk_size(self, documents: List[Document], test_queries: List[str],
                            size_range: Tuple[int, int] = (200, 2000),
                            step_size: int = 200) -> Dict[str, Any]:
        """
        Find optimal chunk size for given documents and queries.

        Args:
            documents (List[Document]): Documents to test
            test_queries (List[str]): Queries for testing retrieval
            size_range (Tuple[int, int]): Range of chunk sizes to test
            step_size (int): Step size for testing

        Returns:
            Dict: Optimization results with recommended chunk size.

        Note:
            FAISS ``similarity_search_with_score`` returns L2 distances, so a
            LOWER average score means better retrieval; the optimum minimises it.
        """
        try:
            results: Dict[int, Dict[str, Any]] = {}
            min_size, max_size = size_range

            for chunk_size in range(min_size, max_size + 1, step_size):
                chunker = VectorChunker(self.embeddings, chunk_size=chunk_size)

                try:
                    chunked_docs = chunker.chunk_documents(documents)
                    vector_store = chunker.create_vector_store(chunked_docs)

                    retrieval_scores = []
                    for query in test_queries:
                        search_results = vector_store.similarity_search_with_score(query, k=3)
                        if search_results:
                            avg_score = sum(score for _, score in search_results) / len(search_results)
                            retrieval_scores.append(float(avg_score))

                    avg_performance = float(np.mean(retrieval_scores)) if retrieval_scores else 0

                    results[chunk_size] = {
                        'average_retrieval_score': avg_performance,
                        'total_chunks': len(chunked_docs),
                        'retrieval_scores': retrieval_scores
                    }

                except Exception as e:
                    results[chunk_size] = {'error': str(e)}

            valid_results = {k: v for k, v in results.items() if 'error' not in v}

            if valid_results:
                # BUG FIX: scores are distances (lower = better), so the
                # optimal size MINIMISES the average score; max() picked the
                # worst size.
                optimal_size = min(valid_results,
                                   key=lambda k: valid_results[k]['average_retrieval_score'])

                return {
                    'optimal_chunk_size': optimal_size,
                    'optimal_performance': valid_results[optimal_size]['average_retrieval_score'],
                    'all_results': results,
                    'performance_trend': self._analyze_performance_trend(valid_results),
                    'recommendation': f"Use chunk size {optimal_size} for best retrieval performance"
                }
            else:
                return {
                    'error': 'No valid chunk sizes could be tested',
                    'all_results': results
                }

        except Exception as e:
            return {'error': f"Chunk size optimization failed: {str(e)}"}

    def _analyze_performance_trend(self, results: Dict[int, Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze score trend across chunk sizes.

        Scores are retrieval distances (lower = better), so the "peak"
        (best) performance is the minimum average score.
        """
        try:
            sizes = sorted(results.keys())
            performances = [results[size]['average_retrieval_score'] for size in sizes]

            if len(performances) >= 2:
                trend_direction = "increasing" if performances[-1] > performances[0] else "decreasing"
                # BUG FIX: best performance = smallest distance, not largest.
                peak_performance = min(performances)
                peak_size = sizes[performances.index(peak_performance)]

                return {
                    'trend_direction': trend_direction,
                    'peak_performance': peak_performance,
                    'peak_size': peak_size,
                    'performance_range': max(performances) - min(performances),
                    'stable_performance': max(performances) - min(performances) < 0.1
                }
            else:
                return {'error': 'Insufficient data for trend analysis'}

        except Exception:
            return {'error': 'Trend analysis failed'}
|
|
|
|
|
|
|
|
|
class RAGPipeline:
    """Complete RAG pipeline for document question-answering"""

    def __init__(self, embeddings_model, llm):
        """Wire up the shared chunker plus per-pipeline caches.

        Args:
            embeddings_model: Embedding model shared by every pipeline.
            llm: Language model used to answer questions.
        """
        self.embeddings = embeddings_model
        self.llm = llm
        self.chunker = VectorChunker(embeddings_model)
        # Both caches are keyed by caller-supplied pipeline_id.
        self.vector_stores = {}
        self.qa_chains = {}
|
|
|
|
|
|
def create_pipeline(self, documents: List[Document], pipeline_id: str,
|
|
|
chunking_strategy: str = "semantic") -> Dict[str, Any]:
|
|
|
"""
|
|
|
Create a complete RAG pipeline for documents
|
|
|
|
|
|
Args:
|
|
|
documents (List[Document]): Documents to process
|
|
|
pipeline_id (str): Unique identifier for this pipeline
|
|
|
chunking_strategy (str): Strategy for document chunking
|
|
|
|
|
|
Returns:
|
|
|
Dict: Pipeline creation results
|
|
|
"""
|
|
|
try:
|
|
|
|
|
|
chunked_docs = self.chunker.chunk_documents(documents, strategy=chunking_strategy)
|
|
|
|
|
|
|
|
|
vector_store = self.chunker.create_vector_store(chunked_docs, store_type="faiss")
|
|
|
|
|
|
|
|
|
qa_chain = self.chunker.create_qa_chain(documents, self.llm)
|
|
|
|
|
|
|
|
|
self.vector_stores[pipeline_id] = vector_store
|
|
|
self.qa_chains[pipeline_id] = qa_chain
|
|
|
|
|
|
|
|
|
stats = {
|
|
|
'pipeline_id': pipeline_id,
|
|
|
'documents_processed': len(documents),
|
|
|
'chunks_created': len(chunked_docs),
|
|
|
'chunking_strategy': chunking_strategy,
|
|
|
'vector_store_type': 'faiss',
|
|
|
'embedding_model': str(self.embeddings),
|
|
|
'created_at': self._get_timestamp()
|
|
|
}
|
|
|
|
|
|
return {
|
|
|
'success': True,
|
|
|
'pipeline_stats': stats,
|
|
|
'chunking_info': self.chunker.get_chunking_stats(documents, chunking_strategy)
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Pipeline creation failed: {str(e)}"}
|
|
|
|
|
|
def query_pipeline(self, pipeline_id: str, query: str,
|
|
|
return_sources: bool = True) -> Dict[str, Any]:
|
|
|
"""
|
|
|
Query a created RAG pipeline
|
|
|
|
|
|
Args:
|
|
|
pipeline_id (str): ID of the pipeline to query
|
|
|
query (str): Question to ask
|
|
|
return_sources (bool): Whether to return source documents
|
|
|
|
|
|
Returns:
|
|
|
Dict: Query results with answer and sources
|
|
|
"""
|
|
|
try:
|
|
|
if pipeline_id not in self.qa_chains:
|
|
|
return {'error': f"Pipeline '{pipeline_id}' not found"}
|
|
|
|
|
|
qa_chain = self.qa_chains[pipeline_id]
|
|
|
|
|
|
|
|
|
result = qa_chain({"query": query})
|
|
|
|
|
|
|
|
|
response = {
|
|
|
'query': query,
|
|
|
'answer': result.get('result', 'No answer generated'),
|
|
|
'pipeline_id': pipeline_id,
|
|
|
'query_timestamp': self._get_timestamp()
|
|
|
}
|
|
|
|
|
|
|
|
|
if return_sources and 'source_documents' in result:
|
|
|
sources = []
|
|
|
for i, doc in enumerate(result['source_documents']):
|
|
|
source = {
|
|
|
'source_index': i,
|
|
|
'content': doc.page_content,
|
|
|
'metadata': doc.metadata,
|
|
|
'relevance_rank': i + 1
|
|
|
}
|
|
|
sources.append(source)
|
|
|
|
|
|
response['sources'] = sources
|
|
|
response['num_sources'] = len(sources)
|
|
|
|
|
|
return response
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Pipeline query failed: {str(e)}"}
|
|
|
|
|
|
def batch_query_pipeline(self, pipeline_id: str, queries: List[str]) -> List[Dict[str, Any]]:
|
|
|
"""
|
|
|
Execute multiple queries on a pipeline
|
|
|
|
|
|
Args:
|
|
|
pipeline_id (str): ID of the pipeline to query
|
|
|
queries (List[str]): List of questions to ask
|
|
|
|
|
|
Returns:
|
|
|
List[Dict]: List of query results
|
|
|
"""
|
|
|
results = []
|
|
|
|
|
|
for i, query in enumerate(queries):
|
|
|
try:
|
|
|
result = self.query_pipeline(pipeline_id, query, return_sources=False)
|
|
|
result['batch_index'] = i
|
|
|
results.append(result)
|
|
|
|
|
|
except Exception as e:
|
|
|
results.append({
|
|
|
'batch_index': i,
|
|
|
'query': query,
|
|
|
'error': f"Batch query failed: {str(e)}"
|
|
|
})
|
|
|
|
|
|
return results
|
|
|
|
|
|
def evaluate_pipeline(self, pipeline_id: str, test_queries: List[str],
|
|
|
expected_answers: List[str] = None) -> Dict[str, Any]:
|
|
|
"""
|
|
|
Evaluate pipeline performance on test queries
|
|
|
|
|
|
Args:
|
|
|
pipeline_id (str): ID of the pipeline to evaluate
|
|
|
test_queries (List[str]): Test questions
|
|
|
expected_answers (List[str]): Optional expected answers for comparison
|
|
|
|
|
|
Returns:
|
|
|
Dict: Evaluation results
|
|
|
"""
|
|
|
try:
|
|
|
if pipeline_id not in self.qa_chains:
|
|
|
return {'error': f"Pipeline '{pipeline_id}' not found"}
|
|
|
|
|
|
evaluation_results = []
|
|
|
response_times = []
|
|
|
|
|
|
for i, query in enumerate(test_queries):
|
|
|
import time
|
|
|
start_time = time.time()
|
|
|
|
|
|
|
|
|
result = self.query_pipeline(pipeline_id, query, return_sources=True)
|
|
|
|
|
|
end_time = time.time()
|
|
|
response_time = end_time - start_time
|
|
|
response_times.append(response_time)
|
|
|
|
|
|
|
|
|
eval_result = {
|
|
|
'query_index': i,
|
|
|
'query': query,
|
|
|
'answer_generated': not result.get('error'),
|
|
|
'response_time': response_time,
|
|
|
'answer_length': len(result.get('answer', '')),
|
|
|
'sources_returned': result.get('num_sources', 0)
|
|
|
}
|
|
|
|
|
|
|
|
|
if expected_answers and i < len(expected_answers):
|
|
|
expected = expected_answers[i]
|
|
|
generated = result.get('answer', '')
|
|
|
|
|
|
|
|
|
similarity = self._calculate_answer_similarity(expected, generated)
|
|
|
eval_result['answer_similarity'] = similarity
|
|
|
eval_result['expected_answer'] = expected
|
|
|
|
|
|
evaluation_results.append(eval_result)
|
|
|
|
|
|
|
|
|
successful_queries = len([r for r in evaluation_results if r['answer_generated']])
|
|
|
avg_response_time = np.mean(response_times) if response_times else 0
|
|
|
|
|
|
if expected_answers:
|
|
|
similarities = [r.get('answer_similarity', 0) for r in evaluation_results
|
|
|
if 'answer_similarity' in r]
|
|
|
avg_similarity = np.mean(similarities) if similarities else 0
|
|
|
else:
|
|
|
avg_similarity = None
|
|
|
|
|
|
return {
|
|
|
'pipeline_id': pipeline_id,
|
|
|
'total_queries': len(test_queries),
|
|
|
'successful_queries': successful_queries,
|
|
|
'success_rate': successful_queries / len(test_queries) if test_queries else 0,
|
|
|
'average_response_time': avg_response_time,
|
|
|
'average_answer_similarity': avg_similarity,
|
|
|
'detailed_results': evaluation_results,
|
|
|
'evaluation_timestamp': self._get_timestamp()
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Pipeline evaluation failed: {str(e)}"}
|
|
|
|
|
|
def _calculate_answer_similarity(self, expected: str, generated: str) -> float:
|
|
|
"""Calculate similarity between expected and generated answers"""
|
|
|
try:
|
|
|
|
|
|
expected_words = set(expected.lower().split())
|
|
|
generated_words = set(generated.lower().split())
|
|
|
|
|
|
if not expected_words and not generated_words:
|
|
|
return 1.0
|
|
|
|
|
|
intersection = expected_words.intersection(generated_words)
|
|
|
union = expected_words.union(generated_words)
|
|
|
|
|
|
return len(intersection) / len(union) if union else 0.0
|
|
|
|
|
|
except Exception:
|
|
|
return 0.0
|
|
|
|
|
|
def get_pipeline_info(self, pipeline_id: str) -> Dict[str, Any]:
|
|
|
"""Get information about a specific pipeline"""
|
|
|
try:
|
|
|
if pipeline_id not in self.qa_chains:
|
|
|
return {'error': f"Pipeline '{pipeline_id}' not found"}
|
|
|
|
|
|
|
|
|
vector_store = self.vector_stores.get(pipeline_id)
|
|
|
if vector_store:
|
|
|
try:
|
|
|
|
|
|
total_vectors = vector_store.index.ntotal if hasattr(vector_store, 'index') else 'unknown'
|
|
|
except:
|
|
|
total_vectors = 'unknown'
|
|
|
else:
|
|
|
total_vectors = 'unknown'
|
|
|
|
|
|
return {
|
|
|
'pipeline_id': pipeline_id,
|
|
|
'has_qa_chain': pipeline_id in self.qa_chains,
|
|
|
'has_vector_store': pipeline_id in self.vector_stores,
|
|
|
'total_vectors': total_vectors,
|
|
|
'embedding_model': str(self.embeddings),
|
|
|
'llm_model': str(self.llm)
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Failed to get pipeline info: {str(e)}"}
|
|
|
|
|
|
def list_pipelines(self) -> Dict[str, Any]:
|
|
|
"""List all created pipelines"""
|
|
|
return {
|
|
|
'total_pipelines': len(self.qa_chains),
|
|
|
'pipeline_ids': list(self.qa_chains.keys()),
|
|
|
'vector_stores': list(self.vector_stores.keys())
|
|
|
}
|
|
|
|
|
|
def delete_pipeline(self, pipeline_id: str) -> Dict[str, Any]:
|
|
|
"""Delete a pipeline and free resources"""
|
|
|
try:
|
|
|
deleted_components = []
|
|
|
|
|
|
if pipeline_id in self.qa_chains:
|
|
|
del self.qa_chains[pipeline_id]
|
|
|
deleted_components.append('qa_chain')
|
|
|
|
|
|
if pipeline_id in self.vector_stores:
|
|
|
del self.vector_stores[pipeline_id]
|
|
|
deleted_components.append('vector_store')
|
|
|
|
|
|
if deleted_components:
|
|
|
return {
|
|
|
'success': True,
|
|
|
'pipeline_id': pipeline_id,
|
|
|
'deleted_components': deleted_components
|
|
|
}
|
|
|
else:
|
|
|
return {'error': f"Pipeline '{pipeline_id}' not found"}
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Pipeline deletion failed: {str(e)}"}
|
|
|
|
|
|
def export_pipeline_config(self, pipeline_id: str) -> Dict[str, Any]:
|
|
|
"""Export pipeline configuration for recreation"""
|
|
|
try:
|
|
|
if pipeline_id not in self.qa_chains:
|
|
|
return {'error': f"Pipeline '{pipeline_id}' not found"}
|
|
|
|
|
|
config = {
|
|
|
'pipeline_id': pipeline_id,
|
|
|
'embedding_model_name': getattr(self.embeddings, 'model_name', 'unknown'),
|
|
|
'llm_model_name': getattr(self.llm, 'model_name', 'unknown'),
|
|
|
'chunker_config': {
|
|
|
'chunk_size': self.chunker.chunk_size,
|
|
|
'chunk_overlap': self.chunker.chunk_overlap
|
|
|
},
|
|
|
'export_timestamp': self._get_timestamp(),
|
|
|
'vector_store_type': 'faiss'
|
|
|
}
|
|
|
|
|
|
return config
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Pipeline export failed: {str(e)}"}
|
|
|
|
|
|
def _get_timestamp(self) -> str:
|
|
|
"""Get current timestamp"""
|
|
|
from datetime import datetime
|
|
|
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def optimize_rag_pipeline(documents: List[Document], embeddings_model, llm,
                          test_queries: List[str]) -> Dict[str, Any]:
    """
    Optimize RAG pipeline configuration for given documents and queries.

    Args:
        documents (List[Document]): Documents to optimize for
        embeddings_model: Embedding model to use
        llm: Language model to use
        test_queries (List[str]): Test queries for optimization

    Returns:
        Dict: Optimization recommendations, or {'error': ...} on failure.
    """
    try:
        # Compare chunking strategies on the supplied corpus.
        chunking_results = VectorChunker(embeddings_model).optimize_chunking_strategy(
            documents, test_queries)

        # Independently sweep candidate chunk sizes.
        size_results = ChunkingOptimizer(embeddings_model).optimize_chunk_size(
            documents, test_queries)

        best_strategy = chunking_results.get('recommended_strategy', 'semantic')
        best_size = size_results.get('optimal_chunk_size', 1000)
        best_overlap = best_size // 5  # 20% overlap heuristic

        # Build a pipeline using the winning configuration and measure it.
        pipeline = RAGPipeline(embeddings_model, llm)
        pipeline.chunker = VectorChunker(
            embeddings_model,
            chunk_size=best_size,
            chunk_overlap=best_overlap
        )

        test_pipeline_id = "optimization_test"
        creation_result = pipeline.create_pipeline(documents, test_pipeline_id, best_strategy)

        if creation_result.get('error'):
            evaluation_result = {'error': 'Could not evaluate optimized pipeline'}
        else:
            evaluation_result = pipeline.evaluate_pipeline(test_pipeline_id, test_queries)
            pipeline.delete_pipeline(test_pipeline_id)  # clean up the throwaway pipeline

        return {
            'optimization_complete': True,
            'recommended_config': {
                'chunking_strategy': best_strategy,
                'chunk_size': best_size,
                'chunk_overlap': best_overlap
            },
            'chunking_optimization': chunking_results,
            'size_optimization': size_results,
            'performance_evaluation': evaluation_result,
            'recommendations': [
                f"Use {best_strategy} chunking strategy",
                f"Set chunk size to {best_size} characters",
                f"Use {best_overlap} character overlap",
                "Monitor and adjust based on query performance"
            ]
        }

    except Exception as e:
        return {'error': f"RAG optimization failed: {str(e)}"}
def create_demo_rag_system(sample_documents: List[Document], embeddings_model, llm) -> Dict[str, Any]:
    """
    Create a demonstration RAG system with sample documents.

    Args:
        sample_documents (List[Document]): Sample documents for demo
        embeddings_model: Embedding model
        llm: Language model

    Returns:
        Dict: Demo system information and sample interactions, or
            {'error': ...} on failure.
    """
    try:
        demo_id = "demo_system"
        pipeline = RAGPipeline(embeddings_model, llm)

        creation_result = pipeline.create_pipeline(sample_documents, demo_id, "semantic")
        if creation_result.get('error'):
            return {'error': f"Demo system creation failed: {creation_result['error']}"}

        demo_queries = [
            "What is the main topic of these documents?",
            "Can you summarize the key points?",
            "What are the most important concepts mentioned?"
        ]

        # Run each canned query and keep the sourced answers for display.
        demo_results = [
            pipeline.query_pipeline(demo_id, question, return_sources=True)
            for question in demo_queries
        ]

        return {
            'demo_system_created': True,
            'pipeline_id': demo_id,
            'creation_stats': creation_result,
            'pipeline_info': pipeline.get_pipeline_info(demo_id),
            'demo_queries': demo_queries,
            'demo_results': demo_results,
            'usage_instructions': [
                f"Use pipeline.query_pipeline('{demo_id}', 'your question') to ask questions",
                "The system will return answers with source document references",
                "Sources show which parts of the documents were used for the answer"
            ]
        }

    except Exception as e:
        return {'error': f"Demo system creation failed: {str(e)}"}
# Public API of this module: the chunker, its optimizer, the RAG pipeline
# class, and the two convenience entry points.
__all__ = [
    'VectorChunker',
    'ChunkingOptimizer',
    'RAGPipeline',
    'optimize_rag_pipeline',
    'create_demo_rag_system'
]