# version_rag.py - Core VersionRAG Implementation (OpenAI Embeddings)
import os
import uuid
from typing import List, Dict, Optional

import chromadb
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI


class VersionRAG:
    """Version-Aware RAG System with Graph + Vector Store"""

    def __init__(self, user_id: str, model_name: str = "gpt-3.5-turbo",
                 embedding_model: str = "text-embedding-3-small"):
        self.user_id = user_id
        self.model_name = model_name

        # Initialize embeddings - using OpenAI instead of sentence-transformers
        self.embeddings = OpenAIEmbeddings(model=embedding_model)

        # Initialize ChromaDB with persistence
        persist_dir = f"./chroma_db_{user_id}"
        os.makedirs(persist_dir, exist_ok=True)
        self.chroma_client = chromadb.PersistentClient(path=persist_dir)

        # Create collection with tenant metadata, reusing it if it already exists
        collection_name = f"versionrag_{user_id}"
        try:
            self.collection = self.chroma_client.get_collection(name=collection_name)
        except Exception:
            self.collection = self.chroma_client.create_collection(
                name=collection_name,
                metadata={"tenant_id": user_id}
            )

        # Initialize LLM
        self.llm = ChatOpenAI(model=model_name, temperature=0)

        # Text splitter
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len
        )

        self.documents = []
        self.metadatas = []
        self.graph_manager = None

    def set_graph_manager(self, graph_manager):
        """Set graph manager for version tracking"""
        self.graph_manager = graph_manager

    def add_documents(self, texts: List[str], metadatas: List[Dict],
                      changes: Optional[List[Dict]] = None):
        """Add documents to the vector store with version metadata and changes"""
        all_chunks = []
        all_chunk_metadatas = []
        all_ids = []

        for idx, (text, metadata) in enumerate(zip(texts, metadatas)):
            # Split text into chunks
            chunks = self.text_splitter.split_text(text)

            # Add tenant_id to metadata
            for chunk in chunks:
                chunk_metadata = metadata.copy()
                chunk_metadata['tenant_id'] = self.user_id
                chunk_metadata['chunk_id'] = len(all_chunks)
                chunk_metadata['doc_type'] = 'content'
                all_chunks.append(chunk)
                all_chunk_metadatas.append(chunk_metadata)
                all_ids.append(f"{self.user_id}_content_{uuid.uuid4()}")

            # Add change information if provided
            if changes and idx < len(changes) and changes[idx]:
                change_info = changes[idx]

                # Add additions as separate chunks (cap at 20, skip trivial fragments)
                for addition in change_info.get('additions', [])[:20]:
                    if len(addition.strip()) > 10:
                        change_metadata = metadata.copy()
                        change_metadata['tenant_id'] = self.user_id
                        change_metadata['doc_type'] = 'change'
                        change_metadata['change_type'] = 'addition'
                        all_chunks.append(f"[ADDITION in {metadata.get('version')}] {addition}")
                        all_chunk_metadatas.append(change_metadata)
                        all_ids.append(f"{self.user_id}_change_{uuid.uuid4()}")

                # Add deletions as separate chunks
                for deletion in change_info.get('deletions', [])[:20]:
                    if len(deletion.strip()) > 10:
                        change_metadata = metadata.copy()
                        change_metadata['tenant_id'] = self.user_id
                        change_metadata['doc_type'] = 'change'
                        change_metadata['change_type'] = 'deletion'
                        all_chunks.append(f"[DELETION in {metadata.get('version')}] {deletion}")
                        all_chunk_metadatas.append(change_metadata)
                        all_ids.append(f"{self.user_id}_change_{uuid.uuid4()}")

                # Add modifications as separate chunks
                for modification in change_info.get('modifications', [])[:20]:
                    if len(modification.strip()) > 10:
                        change_metadata = metadata.copy()
                        change_metadata['tenant_id'] = self.user_id
                        change_metadata['doc_type'] = 'change'
                        change_metadata['change_type'] = 'modification'
                        all_chunks.append(f"[MODIFICATION in {metadata.get('version')}] {modification}")
                        all_chunk_metadatas.append(change_metadata)
                        all_ids.append(f"{self.user_id}_change_{uuid.uuid4()}")

        # Add to ChromaDB
        if all_chunks:
            embeddings = self.embeddings.embed_documents(all_chunks)
            self.collection.add(
                embeddings=embeddings,
                documents=all_chunks,
                metadatas=all_chunk_metadatas,
                ids=all_ids
            )
            self.documents.extend(all_chunks)
            self.metadatas.extend(all_chunk_metadatas)
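    # Expected shape of the optional `changes` argument (one dict per text),
    # derived from the loops above; missing keys are simply skipped.
    # Values below are illustrative, not from the original file:
    #
    #   changes = [{
    #       'additions':     ["New OAuth2 login flow for service accounts ..."],
    #       'deletions':     ["Legacy basic-auth section removed ..."],
    #       'modifications': ["Rate limit raised from 100 to 500 req/min ..."],
    #   }]
    #
    #   rag.add_documents(
    #       texts=[doc_v2_text],
    #       metadatas=[{'version': 'v2.0', 'filename': 'api.md',
    #                   'domain': 'docs', 'topic': 'API'}],
    #       changes=changes,
    #   )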
    def query(self, query: str, version_filter: Optional[str] = None, top_k: int = 5) -> Dict:
        """Query with version awareness"""
        # Embed query
        query_embedding = self.embeddings.embed_query(query)

        # Build where clause for filtering
        if version_filter:
            where = {
                "$and": [
                    {"tenant_id": self.user_id},
                    {"doc_type": "content"},
                    {"version": version_filter}
                ]
            }
        else:
            where = {
                "$and": [
                    {"tenant_id": self.user_id},
                    {"doc_type": "content"}
                ]
            }

        # Query ChromaDB
        try:
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=top_k,
                where=where
            )
        except Exception as e:
            return {
                'answer': f"Error querying database: {str(e)}",
                'sources': []
            }

        # Extract results
        if not results['documents'][0]:
            return {
                'answer': "No relevant documents found.",
                'sources': []
            }

        # Prepare context
        context_docs = results['documents'][0]
        context_metadatas = results['metadatas'][0]
        distances = results['distances'][0]

        # Build context string
        context = "\n\n".join([
            f"[Version {meta.get('version', 'N/A')} - {meta.get('topic', 'Unknown')}]\n{doc}"
            for doc, meta in zip(context_docs, context_metadatas)
        ])

        # Generate answer using LLM
        prompt = f"""Based on the following context, answer the question.
If the answer includes version-specific information, explicitly mention the version.
Be precise and cite the version when relevant.

Context:
{context}

Question: {query}

Answer:"""

        try:
            response = self.llm.invoke(prompt)
            answer = response.content if hasattr(response, 'content') else str(response)
        except Exception as e:
            answer = f"Error generating answer: {str(e)}"

        # Prepare sources
        sources = []
        for doc, meta, dist in zip(context_docs, context_metadatas, distances):
            sources.append({
                'content': doc,
                'version': meta.get('version', 'N/A'),
                'filename': meta.get('filename', 'N/A'),
                'domain': meta.get('domain', 'N/A'),
                'topic': meta.get('topic', 'N/A'),
                'similarity': 1 - dist
            })

        return {
            'answer': answer,
            'sources': sources,
            'context': context
        }
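    # Usage sketch (hypothetical values): restrict retrieval to a single
    # version via the metadata filter, or omit it to search all versions:
    #
    #   result = rag.query("How does authentication work?", version_filter="v2.0")
    #   print(result['answer'], result['sources'][0]['similarity'])
    #
    # Caveat: `1 - distance` above is only a rough relevance score. Chroma
    # collections default to L2 distance unless created with
    # metadata={"hnsw:space": "cosine"}, so treat the value as a ranking
    # heuristic rather than a true cosine similarity.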
    def version_inquiry(self, query: str) -> Dict:
        """Handle version-specific inquiries using graph"""
        if self.graph_manager:
            documents = self.graph_manager.get_all_documents()
            relevant_docs = []
            query_lower = query.lower()

            # Match documents whose names share any word with the query
            for doc in documents:
                if any(word in doc.lower() for word in query_lower.split()):
                    relevant_docs.append(doc)

            if relevant_docs:
                answer = f"Found version information for {len(relevant_docs)} document(s):\n\n"
                versions_found = []

                for doc in relevant_docs:
                    versions = self.graph_manager.get_document_versions(doc)
                    versions_found.extend(versions)
                    answer += f"**{doc}**\n"
                    answer += f"- Versions: {', '.join(versions)}\n"
                    for version in versions:
                        info = self.graph_manager.get_version_info(doc, version)
                        if info:
                            answer += f"  - {version}: {info.get('timestamp', 'N/A')}\n"
                    answer += "\n"

                return {
                    'answer': answer,
                    'sources': [],
                    'versions': list(set(versions_found))
                }

        # Fallback to vector search
        query_embedding = self.embeddings.embed_query(query)
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=20,
            where={
                "$and": [
                    {"tenant_id": self.user_id},
                    {"doc_type": "content"}
                ]
            }
        )

        versions = set()
        version_info = {}
        for meta in results['metadatas'][0]:
            version = meta.get('version', 'N/A')
            if version != 'N/A':
                versions.add(version)
                if version not in version_info:
                    version_info[version] = {
                        'filename': meta.get('filename', 'N/A'),
                        'domain': meta.get('domain', 'N/A'),
                        'topic': meta.get('topic', 'N/A')
                    }

        version_list = ", ".join(sorted(versions))
        answer = f"Found {len(versions)} version(s): {version_list}\n\n"
        for version in sorted(versions):
            info = version_info[version]
            answer += f"- **{version}**: {info['topic']} ({info['domain']})\n"

        return {
            'answer': answer,
            'sources': [],
            'versions': list(versions)
        }

    def change_retrieval(self, query: str) -> Dict:
        """Retrieve change information between versions"""
        query_embedding = self.embeddings.embed_query(query)

        try:
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=10,
                where={
                    "$and": [
                        {"tenant_id": self.user_id},
                        {"doc_type": "change"}
                    ]
                }
            )
        except Exception:
            # Retry without the doc_type filter if the compound filter fails
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=10,
                where={"tenant_id": self.user_id}
            )

        if results['documents'][0] and results['metadatas'][0]:
            changes = []
            for doc, meta in zip(results['documents'][0], results['metadatas'][0]):
                if meta.get('doc_type') == 'change':
                    changes.append({
                        'content': doc,
                        'version': meta.get('version', 'N/A'),
                        'change_type': meta.get('change_type', 'unknown'),
                        'filename': meta.get('filename', 'N/A'),
                        'topic': meta.get('topic', 'N/A')
                    })

            if changes:
                answer = "Changes detected:\n\n"
                for change in changes[:5]:
                    answer += f"**[{change['version']} - {change['change_type'].upper()}]**\n"
                    answer += f"Topic: {change['topic']}\n"
                    answer += f"{change['content']}\n\n"

                return {
                    'answer': answer,
                    'sources': changes
                }

        # Fall back to general context when no change chunks matched
        context_results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=5,
            where={"tenant_id": self.user_id}
        )

        if context_results['documents'][0]:
            context = "\n\n".join(context_results['documents'][0])
            prompt = f"""Based on the context, identify and describe any changes, additions, deletions, or modifications mentioned.

Context:
{context}

Question: {query}

Answer:"""
            try:
                response = self.llm.invoke(prompt)
                answer = response.content if hasattr(response, 'content') else str(response)
            except Exception:
                answer = "Unable to determine changes."
        else:
            answer = "No change information found."

        return {
            'answer': answer,
            'sources': context_results['metadatas'][0][:5] if context_results['metadatas'][0] else []
        }
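
# A minimal routing sketch (not part of the original design): pick which
# VersionRAG entry point to call based on simple keyword cues in the query.
# The keyword lists are illustrative assumptions; a real deployment would
# likely use an LLM-based intent classifier instead.
def route_query(rag: VersionRAG, query: str) -> Dict:
    q = query.lower()
    if any(kw in q for kw in ("what changed", "difference", "changelog", "added", "removed")):
        return rag.change_retrieval(query)   # change-focused questions
    if any(kw in q for kw in ("which version", "versions", "list version")):
        return rag.version_inquiry(query)    # version-inventory questions
    return rag.query(query)                  # default content question
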
class BaselineRAG:
    """Standard RAG system without version awareness"""

    def __init__(self, user_id: str, model_name: str = "gpt-3.5-turbo",
                 embedding_model: str = "text-embedding-3-small"):
        self.user_id = user_id
        self.model_name = model_name

        # Initialize embeddings - using OpenAI
        self.embeddings = OpenAIEmbeddings(model=embedding_model)

        persist_dir = f"./chroma_baseline_{user_id}"
        os.makedirs(persist_dir, exist_ok=True)
        self.chroma_client = chromadb.PersistentClient(path=persist_dir)

        collection_name = f"baseline_{user_id}"
        try:
            self.collection = self.chroma_client.get_collection(name=collection_name)
        except Exception:
            self.collection = self.chroma_client.create_collection(name=collection_name)

        self.llm = ChatOpenAI(model=model_name, temperature=0)

        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

    def add_documents(self, texts: List[str], metadatas: List[Dict]):
        """Add documents to vector store"""
        all_chunks = []
        all_metadatas = []
        all_ids = []

        for text, metadata in zip(texts, metadatas):
            chunks = self.text_splitter.split_text(text)
            for chunk in chunks:
                all_chunks.append(chunk)
                all_metadatas.append(metadata.copy())
                all_ids.append(f"baseline_{self.user_id}_{uuid.uuid4()}")

        if all_chunks:
            embeddings = self.embeddings.embed_documents(all_chunks)
            self.collection.add(
                embeddings=embeddings,
                documents=all_chunks,
                metadatas=all_metadatas,
                ids=all_ids
            )

    def query(self, query: str, top_k: int = 5) -> Dict:
        """Standard query without version awareness"""
        query_embedding = self.embeddings.embed_query(query)

        try:
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=top_k
            )
        except Exception as e:
            return {
                'answer': f"Error: {str(e)}",
                'sources': []
            }

        if not results['documents'][0]:
            return {
                'answer': "No relevant documents found.",
                'sources': []
            }

        context = "\n\n".join(results['documents'][0])
        prompt = f"""Based on the following context, answer the question.

Context:
{context}

Question: {query}

Answer:"""

        try:
            response = self.llm.invoke(prompt)
            answer = response.content if hasattr(response, 'content') else str(response)
        except Exception as e:
            answer = f"Error: {str(e)}"

        sources = [
            {'content': doc, 'metadata': meta}
            for doc, meta in zip(results['documents'][0], results['metadatas'][0])
        ]

        return {
            'answer': answer,
            'sources': sources
        }
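
# Minimal smoke test, assuming OPENAI_API_KEY is set in the environment.
# User id, file names, and document contents are illustrative only.
if __name__ == "__main__":
    rag = VersionRAG(user_id="demo_user")
    rag.add_documents(
        texts=["v1.0: API authentication uses API keys passed in the X-API-Key header."],
        metadatas=[{'version': 'v1.0', 'filename': 'api.md',
                    'domain': 'docs', 'topic': 'Authentication'}]
    )
    print(rag.query("How do clients authenticate?")['answer'])

    # Compare against the version-unaware baseline on the same content
    baseline = BaselineRAG(user_id="demo_user")
    baseline.add_documents(
        texts=["v1.0: API authentication uses API keys passed in the X-API-Key header."],
        metadatas=[{'version': 'v1.0', 'filename': 'api.md'}]
    )
    print(baseline.query("How do clients authenticate?")['answer'])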