Spaces:
Sleeping
Sleeping
| # version_rag.py - Core VersionRAG Implementation (OpenAI Embeddings) | |
| import chromadb | |
| from chromadb.config import Settings | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_openai import OpenAIEmbeddings, ChatOpenAI | |
| from typing import List, Dict, Optional | |
| import os | |
| from datetime import datetime | |
| import uuid | |
class VersionRAG:
    """Version-Aware RAG System with Graph + Vector Store.

    All chunks for one user live in a single persistent ChromaDB collection
    and are distinguished by the ``doc_type`` metadata field:

    * ``"content"`` -- slices of the document text;
    * ``"change"``  -- one recorded addition / deletion / modification of a
      document version (see :meth:`add_documents`).

    Every chunk also carries ``tenant_id`` so every lookup can filter on it.
    An optional graph manager (:meth:`set_graph_manager`) is consulted first
    by :meth:`version_inquiry`.
    """

    # (key in a `changes` entry, `change_type` metadata value, tag in chunk text),
    # listed in the order the change chunks are emitted.
    _CHANGE_KINDS = (
        ('additions', 'addition', 'ADDITION'),
        ('deletions', 'deletion', 'DELETION'),
        ('modifications', 'modification', 'MODIFICATION'),
    )
    # At most this many entries of each change kind are indexed per document.
    _MAX_CHANGES_PER_KIND = 20
    # A change entry is indexed only if it is longer than this after strip().
    _MIN_CHANGE_CHARS = 10

    def __init__(self, user_id: str, model_name: str = "gpt-3.5-turbo",
                 embedding_model: str = "text-embedding-3-small"):
        """Open (or create) the persistent vector store for ``user_id``.

        Args:
            user_id: Tenant identifier. Used in the on-disk directory name,
                the collection name and every chunk's ``tenant_id`` metadata.
            model_name: OpenAI chat model used to generate answers.
            embedding_model: OpenAI embedding model for chunks and queries.
        """
        self.user_id = user_id
        self.model_name = model_name

        # The same embedding model is used for stored chunks and for queries.
        self.embeddings = OpenAIEmbeddings(model=embedding_model)

        # NOTE(review): user_id is interpolated unvalidated into a filesystem
        # path and a collection name -- make sure it comes from a trusted source.
        persist_dir = f"./chroma_db_{user_id}"
        os.makedirs(persist_dir, exist_ok=True)
        self.chroma_client = chromadb.PersistentClient(path=persist_dir)

        # get_or_create_collection replaces a bare try/except around
        # get_collection, which treated *any* failure (even KeyboardInterrupt)
        # as "collection missing" and then tried to create it.
        self.collection = self.chroma_client.get_or_create_collection(
            name=f"versionrag_{user_id}",
            metadata={"tenant_id": user_id},
        )

        # temperature=0 to minimise sampling randomness in answers.
        self.llm = ChatOpenAI(model_name=model_name, temperature=0)

        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )

        # In-memory mirror of the chunks added through this instance only;
        # it is not reloaded from the persistent collection.
        self.documents = []
        self.metadatas = []
        self.graph_manager = None

    def set_graph_manager(self, graph_manager):
        """Attach the graph manager used by :meth:`version_inquiry`.

        The object must provide ``get_all_documents()``,
        ``get_document_versions(doc)`` and ``get_version_info(doc, version)``.
        """
        self.graph_manager = graph_manager

    # ------------------------------------------------------------------ helpers

    def _tenant_where(self, *conditions: Dict) -> Dict:
        """Build a Chroma ``where`` filter for this tenant plus ``conditions``."""
        clauses = [{"tenant_id": self.user_id}, *conditions]
        # Chroma's "$and" operator needs at least two clauses.
        if len(clauses) == 1:
            return clauses[0]
        return {"$and": clauses}

    def _invoke_llm(self, prompt: str) -> str:
        """Send ``prompt`` to the chat model and return the reply text."""
        response = self.llm.invoke(prompt)
        return response.content if hasattr(response, 'content') else str(response)

    def _distance_to_similarity(self, distance: float) -> float:
        """Convert a Chroma distance into a similarity score (1.0 = identical).

        Chroma's default space ("l2") is *squared* L2. For unit-length vectors
        (OpenAI embeddings are normalised to length 1), squared L2 equals
        ``2 - 2*cos``, so cosine similarity is ``1 - d/2``. The previous
        ``1 - d`` under-reported it; for example, cos 0.8 was shown as 0.6.
        For the "cosine" and "ip" spaces, Chroma returns ``1 - score``
        directly.
        """
        space = (getattr(self.collection, 'metadata', None) or {}).get('hnsw:space', 'l2')
        if space == 'l2':
            return 1 - distance / 2
        return 1 - distance

    def _build_change_chunks(self, change_info: Dict, metadata: Dict) -> List[tuple]:
        """Return ``(text, metadata, id)`` triples for one document's change entries."""
        version = metadata.get('version')
        triples = []
        for key, change_type, tag in self._CHANGE_KINDS:
            # `or []` also tolerates an explicit None for a change kind.
            entries = change_info.get(key) or []
            for entry in entries[:self._MAX_CHANGES_PER_KIND]:
                if len(entry.strip()) <= self._MIN_CHANGE_CHARS:
                    continue  # too short to be a meaningful change
                change_metadata = metadata.copy()
                change_metadata['tenant_id'] = self.user_id
                change_metadata['doc_type'] = 'change'
                change_metadata['change_type'] = change_type
                triples.append((
                    f"[{tag} in {version}] {entry}",
                    change_metadata,
                    f"{self.user_id}_change_{uuid.uuid4()}",
                ))
        return triples

    # ---------------------------------------------------------------- ingestion

    def add_documents(self, texts: List[str], metadatas: List[Dict],
                      changes: Optional[List[Dict]] = None):
        """Chunk, embed and store documents, plus optional per-document changes.

        Args:
            texts: Raw document texts.
            metadatas: One metadata dict per text (e.g. ``version``,
                ``filename``, ``domain``, ``topic``). The dicts are copied,
                never mutated.
            changes: Optional list aligned with ``texts``. Each entry may hold
                ``additions`` / ``deletions`` / ``modifications`` lists of
                strings; missing or falsy entries are skipped.

        Raises:
            ValueError: If ``texts`` and ``metadatas`` differ in length.
                Previously the surplus items were silently dropped by ``zip``.
        """
        if len(texts) != len(metadatas):
            raise ValueError(
                f"texts and metadatas must have the same length "
                f"({len(texts)} != {len(metadatas)})"
            )

        all_chunks: List[str] = []
        all_chunk_metadatas: List[Dict] = []
        all_ids: List[str] = []

        for idx, (text, metadata) in enumerate(zip(texts, metadatas)):
            for chunk in self.text_splitter.split_text(text):
                chunk_metadata = metadata.copy()
                chunk_metadata['tenant_id'] = self.user_id
                # Position in this call's batch; content and change chunks share
                # the counter, so it is not unique across calls.
                chunk_metadata['chunk_id'] = len(all_chunks)
                chunk_metadata['doc_type'] = 'content'
                all_chunks.append(chunk)
                all_chunk_metadatas.append(chunk_metadata)
                all_ids.append(f"{self.user_id}_content_{uuid.uuid4()}")

            change_info = changes[idx] if changes and idx < len(changes) else None
            if change_info:
                for chunk, chunk_metadata, chunk_id in self._build_change_chunks(change_info, metadata):
                    all_chunks.append(chunk)
                    all_chunk_metadatas.append(chunk_metadata)
                    all_ids.append(chunk_id)

        if not all_chunks:
            return

        embeddings = self.embeddings.embed_documents(all_chunks)
        self.collection.add(
            embeddings=embeddings,
            documents=all_chunks,
            metadatas=all_chunk_metadatas,
            ids=all_ids,
        )
        self.documents.extend(all_chunks)
        self.metadatas.extend(all_chunk_metadatas)

    # ------------------------------------------------------------------ queries

    def query(self, query: str, version_filter: Optional[str] = None,
              top_k: int = 5) -> Dict:
        """Answer ``query`` from this tenant's content chunks.

        Args:
            query: Natural-language question.
            version_filter: If given, only chunks whose ``version`` metadata
                equals this value are retrieved.
            top_k: Number of chunks to retrieve.

        Returns:
            ``{'answer', 'sources', 'context'}`` on success. When the lookup
            fails or finds nothing, returns ``{'answer', 'sources'}`` with
            empty sources. LLM failures are reported inside ``answer``.
        """
        query_embedding = self.embeddings.embed_query(query)

        conditions = [{"doc_type": "content"}]
        if version_filter:
            conditions.append({"version": version_filter})

        try:
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=top_k,
                where=self._tenant_where(*conditions),
            )
        except Exception as e:
            return {
                'answer': f"Error querying database: {str(e)}",
                'sources': []
            }

        context_docs = results['documents'][0]
        if not context_docs:
            return {
                'answer': "No relevant documents found.",
                'sources': []
            }
        context_metadatas = results['metadatas'][0]
        distances = results['distances'][0]

        context = "\n\n".join(
            f"[Version {meta.get('version', 'N/A')} - {meta.get('topic', 'Unknown')}]\n{doc}"
            for doc, meta in zip(context_docs, context_metadatas)
        )

        prompt = (
            "Based on the following context, answer the question.\n"
            "If the answer includes version-specific information, explicitly mention the version.\n"
            "Be precise and cite the version when relevant.\n"
            f"Context:\n{context}\n"
            f"Question: {query}\n"
            "Answer:"
        )
        try:
            answer = self._invoke_llm(prompt)
        except Exception as e:
            answer = f"Error generating answer: {str(e)}"

        sources = [
            {
                'content': doc,
                'version': meta.get('version', 'N/A'),
                'filename': meta.get('filename', 'N/A'),
                'domain': meta.get('domain', 'N/A'),
                'topic': meta.get('topic', 'N/A'),
                'similarity': self._distance_to_similarity(dist),
            }
            for doc, meta, dist in zip(context_docs, context_metadatas, distances)
        ]

        return {
            'answer': answer,
            'sources': sources,
            'context': context
        }

    def version_inquiry(self, query: str) -> Dict:
        """List document versions relevant to ``query``.

        Uses the graph manager when one is attached and a document name
        matches. Otherwise falls back to a vector search over content chunks.
        Returns ``{'answer', 'sources', 'versions'}``, with ``versions`` sorted.
        """
        if self.graph_manager:
            graph_result = self._version_inquiry_from_graph(query)
            if graph_result is not None:
                return graph_result
        return self._version_inquiry_from_vectors(query)

    def _version_inquiry_from_graph(self, query: str) -> Optional[Dict]:
        """Answer from the graph manager; return None when no document name matches."""
        query_words = query.lower().split()
        # NOTE(review): plain substring matching, so short words such as "a" or
        # "of" match almost any document name -- consider filtering stop words.
        relevant_docs = [
            doc for doc in self.graph_manager.get_all_documents()
            if any(word in doc.lower() for word in query_words)
        ]
        if not relevant_docs:
            return None

        parts = [f"Found version information for {len(relevant_docs)} document(s):\n\n"]
        versions_found = set()
        for doc in relevant_docs:
            versions = self.graph_manager.get_document_versions(doc)
            versions_found.update(versions)
            parts.append(f"**{doc}**\n")
            parts.append(f"- Versions: {', '.join(versions)}\n")
            for version in versions:
                info = self.graph_manager.get_version_info(doc, version)
                if info:
                    parts.append(f" - {version}: {info.get('timestamp', 'N/A')}\n")
            parts.append("\n")

        return {
            'answer': "".join(parts),
            'sources': [],
            # Sorted so the result does not depend on set iteration order.
            'versions': sorted(versions_found, key=str)
        }

    def _version_inquiry_from_vectors(self, query: str) -> Dict:
        """Collect the versions seen among the 20 content chunks closest to ``query``."""
        query_embedding = self.embeddings.embed_query(query)
        try:
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=20,
                where=self._tenant_where({"doc_type": "content"}),
            )
        except Exception as e:
            return {
                'answer': f"Error querying database: {str(e)}",
                'sources': [],
                'versions': []
            }

        # Keep the first (closest) chunk's metadata for each version.
        first_meta: Dict = {}
        for meta in results['metadatas'][0] or []:
            version = meta.get('version', 'N/A')
            if version != 'N/A':
                first_meta.setdefault(version, meta)

        ordered = sorted(first_meta, key=str)
        answer = f"Found {len(ordered)} version(s): {', '.join(map(str, ordered))}\n\n"
        for version in ordered:
            meta = first_meta[version]
            answer += (
                f"- **{version}**: {meta.get('topic', 'N/A')} "
                f"({meta.get('domain', 'N/A')})\n"
            )

        return {
            'answer': answer,
            'sources': [],
            'versions': ordered
        }

    def change_retrieval(self, query: str) -> Dict:
        """Describe changes between versions that are relevant to ``query``.

        Prefers indexed change chunks (``doc_type == "change"``) and lists up
        to five of them. If none are found, asks the LLM to infer changes from
        the five closest chunks of any type.
        """
        query_embedding = self.embeddings.embed_query(query)

        try:
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=10,
                where=self._tenant_where({"doc_type": "change"}),
            )
        except Exception:
            # Retry with only the tenant filter; change chunks are selected below.
            try:
                results = self.collection.query(
                    query_embeddings=[query_embedding],
                    n_results=10,
                    where=self._tenant_where(),
                )
            except Exception as e:
                return {
                    'answer': f"Error querying database: {str(e)}",
                    'sources': []
                }

        changes = [
            {
                'content': doc,
                'version': meta.get('version', 'N/A'),
                'change_type': meta.get('change_type', 'unknown'),
                'filename': meta.get('filename', 'N/A'),
                'topic': meta.get('topic', 'N/A'),
            }
            for doc, meta in zip(results['documents'][0] or [], results['metadatas'][0] or [])
            if meta.get('doc_type') == 'change'
        ]
        if changes:
            answer = "Changes detected:\n\n" + "".join(
                f"**[{change['version']} - {change['change_type'].upper()}]**\n"
                f"Topic: {change['topic']}\n"
                f"{change['content']}\n\n"
                for change in changes[:5]
            )
            return {
                'answer': answer,
                'sources': changes
            }

        # No indexed changes: let the LLM look for change descriptions in context.
        try:
            context_results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=5,
                where=self._tenant_where(),
            )
        except Exception as e:
            return {
                'answer': f"Error querying database: {str(e)}",
                'sources': []
            }

        context_docs = context_results['documents'][0]
        if context_docs:
            context = "\n\n".join(context_docs)
            prompt = (
                "Based on the context, identify and describe any changes, additions, "
                "deletions, or modifications mentioned.\n"
                f"Context:\n{context}\n"
                f"Question: {query}\n"
                "Answer:"
            )
            try:
                answer = self._invoke_llm(prompt)
            except Exception:
                answer = "Unable to determine changes."
        else:
            answer = "No change information found."

        return {
            'answer': answer,
            'sources': (context_results['metadatas'][0] or [])[:5]
        }
class BaselineRAG:
    """Standard RAG system without version awareness.

    Meant as a comparison baseline. It stores chunks in its own per-user
    ChromaDB collection with the caller's metadata copied unchanged (no
    ``tenant_id`` / ``doc_type`` fields are added), and it queries without
    any metadata filter.
    """

    def __init__(self, user_id: str, model_name: str = "gpt-3.5-turbo",
                 embedding_model: str = "text-embedding-3-small"):
        """Open (or create) the persistent baseline store for ``user_id``.

        Args:
            user_id: Identifier used in the on-disk directory and collection name.
            model_name: OpenAI chat model used to generate answers.
            embedding_model: OpenAI embedding model for chunks and queries.
        """
        self.user_id = user_id
        self.model_name = model_name

        self.embeddings = OpenAIEmbeddings(model=embedding_model)

        # NOTE(review): user_id is interpolated unvalidated into a filesystem
        # path and a collection name -- make sure it comes from a trusted source.
        persist_dir = f"./chroma_baseline_{user_id}"
        os.makedirs(persist_dir, exist_ok=True)
        self.chroma_client = chromadb.PersistentClient(path=persist_dir)

        # Replaces a bare try/except around get_collection that treated any
        # failure (even KeyboardInterrupt) as "collection missing".
        self.collection = self.chroma_client.get_or_create_collection(
            name=f"baseline_{user_id}"
        )

        # temperature=0 to minimise sampling randomness in answers.
        self.llm = ChatOpenAI(model_name=model_name, temperature=0)

        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )

    def add_documents(self, texts: List[str], metadatas: List[Dict]):
        """Chunk, embed and store ``texts``, copying each text's metadata onto its chunks.

        Raises:
            ValueError: If ``texts`` and ``metadatas`` differ in length.
                Previously the surplus items were silently dropped by ``zip``.
        """
        if len(texts) != len(metadatas):
            raise ValueError(
                f"texts and metadatas must have the same length "
                f"({len(texts)} != {len(metadatas)})"
            )

        all_chunks: List[str] = []
        all_metadatas: List[Dict] = []
        all_ids: List[str] = []
        for text, metadata in zip(texts, metadatas):
            for chunk in self.text_splitter.split_text(text):
                all_chunks.append(chunk)
                all_metadatas.append(metadata.copy())
                all_ids.append(f"baseline_{self.user_id}_{uuid.uuid4()}")

        if not all_chunks:
            return

        embeddings = self.embeddings.embed_documents(all_chunks)
        self.collection.add(
            embeddings=embeddings,
            documents=all_chunks,
            metadatas=all_metadatas,
            ids=all_ids,
        )

    def query(self, query: str, top_k: int = 5) -> Dict:
        """Answer ``query`` from the ``top_k`` nearest chunks, ignoring versions.

        Returns:
            ``{'answer', 'sources'}``, where each source is
            ``{'content', 'metadata'}``. Database and LLM errors are reported
            inside ``answer`` instead of being raised.
        """
        query_embedding = self.embeddings.embed_query(query)

        try:
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=top_k,
            )
        except Exception as e:
            return {
                'answer': f"Error: {str(e)}",
                'sources': []
            }

        docs = results['documents'][0]
        if not docs:
            return {
                'answer': "No relevant documents found.",
                'sources': []
            }

        context = "\n\n".join(docs)
        prompt = (
            "Based on the following context, answer the question.\n"
            f"Context:\n{context}\n"
            f"Question: {query}\n"
            "Answer:"
        )
        try:
            response = self.llm.invoke(prompt)
            answer = response.content if hasattr(response, 'content') else str(response)
        except Exception as e:
            answer = f"Error: {str(e)}"

        sources = [
            {'content': doc, 'metadata': meta}
            for doc, meta in zip(docs, results['metadatas'][0])
        ]
        return {
            'answer': answer,
            'sources': sources
        }