Spaces:
Sleeping
Sleeping
| """ | |
| RAG (Retrieval-Augmented Generation) Service. | |
| Unified RAG interface with MongoDB Atlas as primary and in-memory fallback. | |
| Supports both role-specific retrieval (via MongoDB) and generic retrieval. | |
| Features: | |
| - MongoDB Atlas Vector Search for production (agent-specific collections) | |
| - In-memory vectorstore fallback for local development | |
| - LangChain Retriever interface for RAG chains | |
| - Document ingestion from corpus directory | |
| """ | |
| import os | |
| from pathlib import Path | |
| from typing import Any | |
| from dotenv import load_dotenv | |
| from langchain_core.documents import Document | |
| from langchain_core.retrievers import BaseRetriever | |
| from langchain_core.vectorstores import InMemoryVectorStore, VectorStore | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from .llm_factory import get_embeddings_model | |
| from .mongodb_rag import MongoDBRAGService, get_mongodb_rag_service | |
| from .observability import get_logger | |
| from .schemas import TeamRole | |
# Load environment configuration (e.g. MONGODB_URI) before any backend reads it.
load_dotenv()

logger = get_logger("rag")

# Filesystem layout, resolved relative to this module's location.
BASE_DIR = Path(__file__).resolve().parents[2]
CORPUS_DIR = BASE_DIR.joinpath("corpus_rag")
VECTOR_STORE_PATH = BASE_DIR.joinpath("public", "vector_store")

# Shared error text for operations that need a vector store.
ERR_VECTOR_STORE_NOT_INIT = "Vector store not initialized"
class RAGService:
    """
    Unified RAG Service with MongoDB primary and in-memory fallback.

    Priority order:
    1. MongoDB Atlas Vector Search (if MONGODB_URI configured)
    2. In-memory vectorstore (development fallback)

    For role-specific retrieval, use the `role` parameter in retrieve/get_retriever.
    When role is provided and MongoDB is available, retrieval is from agent-specific
    collections for more relevant examples. Calls without a role always use the
    in-memory store, which is why it is built even when MongoDB is available.
    """

    def __init__(self):
        self.embeddings = get_embeddings_model()
        self._mongodb_service: MongoDBRAGService | None = None
        self._fallback_store: VectorStore | None = None
        self._initialize()

    def _initialize(self) -> None:
        """Initialize RAG backends in priority order."""
        # Try MongoDB first; it is only consulted when MONGODB_URI is set.
        if os.getenv("MONGODB_URI"):
            self._mongodb_service = get_mongodb_rag_service()
            if self._mongodb_service.is_available():
                logger.info("Using MongoDB Atlas for RAG (primary)")
                # Still initialize fallback for non-role-specific queries
                self._init_fallback_store()
                return

        # Fallback to in-memory only
        logger.info("Using in-memory vector store only (MongoDB unavailable)")
        self._init_fallback_store()

    def _init_fallback_store(self) -> None:
        """Initialize fallback in-memory vectorstore from corpus.

        An empty store is created when the corpus directory is missing or
        yields no documents, so `_fallback_store` is never left as None.
        """
        if not CORPUS_DIR.exists():
            logger.warning(f"Corpus directory not found: {CORPUS_DIR}")
            self._fallback_store = InMemoryVectorStore(embedding=self.embeddings)
            return

        # Load and split documents
        documents = self._load_documents()
        if not documents:
            logger.warning("No documents found for fallback store")
            self._fallback_store = InMemoryVectorStore(embedding=self.embeddings)
            return

        chunks = self._split_documents(documents)
        logger.info(f"Created {len(chunks)} chunks from {len(documents)} documents")
        self._fallback_store = InMemoryVectorStore.from_documents(
            documents=chunks,
            embedding=self.embeddings,
        )
        logger.info("Fallback in-memory vectorstore initialized")

    def _load_documents(self) -> list[Document]:
        """Load documents from corpus directory.

        Only files directly inside CORPUS_DIR matching *.md, *.yaml or *.txt
        are read. Files that cannot be read or decoded as UTF-8 are logged
        and skipped.

        Returns:
            One Document per readable file, with `source` (file name) and
            `file_type` (suffix) metadata.
        """
        documents: list[Document] = []
        for pattern in ("*.md", "*.yaml", "*.txt"):
            # glob() yields entries in arbitrary filesystem order; sort so the
            # resulting chunk order is reproducible across runs and machines.
            for file_path in sorted(CORPUS_DIR.glob(pattern)):
                try:
                    text = file_path.read_text(encoding="utf-8")
                except (OSError, UnicodeError) as e:
                    logger.warning(f"Error loading {file_path}: {e}")
                    continue
                documents.append(
                    Document(
                        page_content=text,
                        metadata={
                            "source": file_path.name,
                            "file_type": file_path.suffix,
                        },
                    )
                )
        return documents

    def _split_documents(self, documents: list[Document]) -> list[Document]:
        """Split documents into chunks for embedding."""
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            add_start_index=True,
            separators=["\n\n", "\n", ". ", " ", ""],
        )
        return text_splitter.split_documents(documents)

    def _adapt_fallback_filter(self, criteria: Any) -> Any:
        """Convert a metadata dict into the filter form the fallback store accepts.

        InMemoryVectorStore applies `filter` by calling it on each candidate
        Document, so a plain dict would fail at query time. A dict is turned
        into a predicate requiring every key to be present in the document
        metadata with an equal value. Callables, and filters for any other
        store type, are passed through unchanged.
        """
        if callable(criteria):
            return criteria
        if not isinstance(self._fallback_store, InMemoryVectorStore):
            return criteria

        def _matches(doc: Document) -> bool:
            metadata = doc.metadata
            return all(
                key in metadata and metadata[key] == expected
                for key, expected in criteria.items()
            )

        return _matches

    def retrieve(
        self,
        query: str,
        role: TeamRole | None = None,
        k: int = 3,
    ) -> list[Document]:
        """
        Retrieve relevant documents for a query.

        Args:
            query: Search query (project description or context)
            role: Optional agent role for role-specific retrieval (MongoDB only)
            k: Number of documents to retrieve

        Returns:
            List of relevant documents; empty if no store is available or the
            fallback search fails.
        """
        # Use MongoDB for role-specific retrieval if available
        if self._mongodb_service and self._mongodb_service.is_available() and role:
            docs = self._mongodb_service.retrieve(query, role, k)
            if docs:
                return docs
            # Fall through to fallback if no MongoDB results

        # Fallback retrieval
        if self._fallback_store is not None:
            try:
                docs = self._fallback_store.similarity_search(query, k=k)
                logger.debug(f"Retrieved {len(docs)} documents from fallback store")
                return docs
            except Exception as e:
                logger.error(f"Error during fallback retrieval: {e}")
                return []

        logger.warning("No vector store available for retrieval")
        return []

    def get_retriever(
        self,
        role: TeamRole | None = None,
        k: int = 3,
        filter: dict[str, Any] | None = None,
        search_type: str = "similarity",
    ) -> BaseRetriever | None:
        """
        Get a LangChain Retriever for RAG chains.

        Args:
            role: Optional agent role for role-specific retrieval
            k: Number of documents to retrieve
            filter: Optional metadata filter (fallback store only). A dict is
                matched by equality on every key; a callable taking a
                Document is forwarded as-is.
            search_type: "similarity" or "mmr"

        Returns:
            LangChain BaseRetriever

        Raises:
            RuntimeError: If neither MongoDB nor the fallback store can
                provide a retriever.
        """
        # Use MongoDB for role-specific retrieval if available
        if self._mongodb_service and self._mongodb_service.is_available() and role:
            retriever = self._mongodb_service.get_retriever(role, k, search_type)
            if retriever:
                return retriever

        # Fallback retriever
        if self._fallback_store is not None:
            search_kwargs: dict[str, Any] = {"k": k}
            if filter:
                search_kwargs["filter"] = self._adapt_fallback_filter(filter)
            return self._fallback_store.as_retriever(
                search_type=search_type,
                search_kwargs=search_kwargs,
            )

        raise RuntimeError("RAG retriever not initialized")

    def format_docs(self, docs: list[Document]) -> str:
        """
        Format retrieved documents into a string for context injection.

        Uses MongoDB service formatter if available (includes role metadata),
        otherwise uses simple formatting.
        """
        if self._mongodb_service and self._mongodb_service.is_available():
            return self._mongodb_service.format_docs(docs)

        if not docs:
            return "No relevant context found."

        return "\n\n".join(
            f"Source: {doc.metadata.get('source', 'Unknown')}\n"
            f"Content:\n{doc.page_content}"
            for doc in docs
        )

    async def add_documents(
        self,
        documents: list[Document],
        role: TeamRole | None = None,
        ids: list[str] | None = None,
    ) -> list[str]:
        """
        Add documents to the vectorstore.

        Args:
            documents: Documents to add
            role: Agent role (required for MongoDB, determines collection)
            ids: Optional document IDs (used by the fallback store only)

        Returns:
            List of document IDs

        Raises:
            RuntimeError: If no vector store is initialized.
        """
        # Add to MongoDB if role specified and available
        if self._mongodb_service and self._mongodb_service.is_available() and role:
            # NOTE(review): `ids` is not forwarded on this path - confirm
            # whether MongoDBRAGService.add_documents can accept explicit IDs.
            return await self._mongodb_service.add_documents(documents, role)

        # Add to fallback store
        if self._fallback_store is not None:
            if hasattr(self._fallback_store, "aadd_documents"):
                return await self._fallback_store.aadd_documents(documents, ids=ids)
            return self._fallback_store.add_documents(documents, ids=ids)

        raise RuntimeError(ERR_VECTOR_STORE_NOT_INIT)

    async def delete_documents(
        self,
        ids: list[str],
        role: TeamRole | None = None,
    ) -> bool:
        """
        Delete documents from the vectorstore by ID.

        Args:
            ids: Document IDs to delete
            role: Agent role (required for MongoDB deletion)

        Returns:
            True if successful; False if the fallback deletion fails, is
            unsupported, or no store is initialized.
        """
        # Delete from MongoDB if role specified and available
        if self._mongodb_service and self._mongodb_service.is_available() and role:
            return await self._mongodb_service.delete_documents(ids, role)

        # Delete from fallback store
        if self._fallback_store is None:
            return False

        try:
            if hasattr(self._fallback_store, "adelete"):
                await self._fallback_store.adelete(ids=ids)
            elif hasattr(self._fallback_store, "delete"):
                self._fallback_store.delete(ids=ids)
            else:
                logger.warning("Fallback store does not support deletion")
                return False
            return True
        except Exception as e:
            logger.error(f"Error deleting documents: {e}")
            return False

    def health_check(self) -> dict[str, Any]:
        """
        Return health status of RAG service.

        Returns:
            Dict with keys `fallback_store_initialized`, `mongodb` and
            `primary_backend`.
        """
        result: dict[str, Any] = {
            "fallback_store_initialized": self._fallback_store is not None,
        }

        if self._mongodb_service:
            result["mongodb"] = self._mongodb_service.health_check()
            result["primary_backend"] = (
                "mongodb" if self._mongodb_service.is_available() else "fallback"
            )
        else:
            result["mongodb"] = {"status": "not_configured"}
            result["primary_backend"] = "fallback"

        return result

    def is_mongodb_available(self) -> bool:
        """Check if MongoDB RAG backend is available."""
        return (
            self._mongodb_service is not None and self._mongodb_service.is_available()
        )

    def get_roles_with_rag(self) -> list[TeamRole]:
        """Get list of agent roles that have RAG collections configured."""
        if self._mongodb_service:
            return self._mongodb_service.get_roles_with_rag()
        return []
# Lazily created, process-wide RAGService instance.
_rag_service: RAGService | None = None


def get_rag_service() -> RAGService:
    """Return the shared RAGService, constructing it on first use."""
    global _rag_service
    if _rag_service is not None:
        return _rag_service
    _rag_service = RAGService()
    return _rag_service
def reset_rag_service() -> None:
    """Drop the cached RAG service so the next lookup builds a new one (for testing)."""
    # Rebinding through the module namespace is equivalent to a `global` assignment.
    globals()["_rag_service"] = None