Spaces:
Sleeping
Sleeping
| """ | |
| MongoDB Atlas Vector Search RAG Service with agent-specific collections. | |
| Each agent role has its own collection with specialized examples: | |
| - Product Owner: PRDs, user stories, acceptance criteria | |
| - Business Analyst: BRDs, process flows | |
| - Solution Architect: System designs, ADRs | |
| - etc. | |
| This enables more relevant RAG retrieval per agent specialty. | |
| """ | |
| import os | |
| from typing import Any | |
| from langchain_core.documents import Document | |
| from langchain_core.retrievers import BaseRetriever | |
| from langchain_core.vectorstores import VectorStore | |
| from .llm_factory import get_embeddings_model | |
| from .observability import get_logger | |
| from .schemas import TeamRole | |
logger = get_logger("mongodb_rag")

# Which MongoDB collection backs each agent's RAG lookups.
# A value of None marks a coordinator role (PROJECT_REFINER, SPEC_COORDINATOR,
# JUDGE): those roles synthesize the other agents' outputs and therefore do
# no retrieval of their own.
ROLE_COLLECTION_MAP: dict[TeamRole, str | None] = {
    # Specialist roles, each with a dedicated collection of examples.
    TeamRole.PRODUCT_OWNER: "rag_product_owner",
    TeamRole.BUSINESS_ANALYST: "rag_business_analyst",
    TeamRole.SOLUTION_ARCHITECT: "rag_solution_architect",
    TeamRole.DATA_ARCHITECT: "rag_data_architect",
    TeamRole.SECURITY_ANALYST: "rag_security_analyst",
    TeamRole.UX_DESIGNER: "rag_ux_designer",
    TeamRole.API_DESIGNER: "rag_api_designer",
    TeamRole.QA_STRATEGIST: "rag_qa_strategist",
    TeamRole.DEVOPS_ARCHITECT: "rag_devops_architect",
    TeamRole.ENVIRONMENT_ENGINEER: "rag_environment_engineer",
    TeamRole.TECHNICAL_WRITER: "rag_technical_writer",
    # Coordinator roles: no collection, no retrieval.
    TeamRole.PROJECT_REFINER: None,
    TeamRole.SPEC_COORDINATOR: None,
    TeamRole.JUDGE: None,
}

# Every concrete collection name (coordinators excluded), in map order.
# Intended for setup/seeding scripts.
ALL_RAG_COLLECTIONS = [
    collection_name
    for collection_name in ROLE_COLLECTION_MAP.values()
    if collection_name is not None
]
class MongoDBRAGService:
    """
    RAG Service with MongoDB Atlas Vector Search for agent-specific retrieval.

    Features:
    - Agent-specific collections for specialized examples
    - Lazy initialization of vector stores
    - Health check for connection monitoring
    - Document management (add/delete)

    Configuration is read from environment variables:
    - ``MONGODB_URI``: connection string; when unset the service stays disabled
    - ``MONGODB_DATABASE``: database name (default ``"specs_before_code"``)
    - ``MONGODB_INDEX_NAME``: Atlas vector index name (default ``"vector_index"``)
    """

    def __init__(self):
        self.embeddings = get_embeddings_model()
        self._client: Any | None = None  # pymongo MongoClient once connected
        self._db_name: str = os.getenv("MONGODB_DATABASE", "specs_before_code")
        self._index_name: str = os.getenv("MONGODB_INDEX_NAME", "vector_index")
        # Per-role vector stores, created lazily on first use and then cached.
        self._vector_stores: dict[TeamRole, VectorStore] = {}
        self._initialized = False
        self._initialize_connection()

    def _initialize_connection(self) -> None:
        """
        Connect to MongoDB and verify the connection with a ``ping``.

        Failures are logged, not raised: on any error the service is left
        disabled (``self._client is None``) so the application can run
        without RAG.
        """
        uri = os.getenv("MONGODB_URI")
        if not uri:
            logger.warning("MONGODB_URI not set - MongoDB RAG disabled")
            return

        try:
            from pymongo import MongoClient
        except ImportError:
            logger.error("pymongo not installed - run: uv add pymongo")
            self._client = None
            return

        client = None
        try:
            client = MongoClient(
                uri,
                maxPoolSize=10,
                minPoolSize=2,
                maxIdleTimeMS=30000,
                serverSelectionTimeoutMS=5000,
            )
            # Verify connection
            client.admin.command("ping")
        except Exception as e:
            logger.error(f"Failed to connect to MongoDB: {e}")
            # A constructed-but-unusable client still owns a connection pool and
            # background resources; release them instead of just dropping it.
            if client is not None:
                try:
                    client.close()
                except Exception as close_error:  # best-effort cleanup
                    logger.debug(f"Error closing failed MongoDB client: {close_error}")
            self._client = None
            return

        # Only publish the client once the ping has succeeded.
        self._client = client
        self._initialized = True
        logger.info(
            "Connected to MongoDB Atlas",
            data={"database": self._db_name},
        )

    def _get_collection(self, role: TeamRole) -> Any | None:
        """
        Get the MongoDB collection for a specific agent role.

        Returns None when there is no client or the role has no collection
        mapped (coordinator roles).
        """
        if self._client is None:
            return None

        collection_name = ROLE_COLLECTION_MAP.get(role)
        if not collection_name:
            logger.debug(f"No RAG collection mapped for role: {role.value}")
            return None

        return self._client[self._db_name][collection_name]

    def _get_vector_store(self, role: TeamRole) -> VectorStore | None:
        """
        Get or create the vector store for a specific agent role (lazy init).

        Successfully created stores are cached per role. Failures are logged
        and return None; they are not cached, so a later call retries.
        """
        # Return cached store if available
        if role in self._vector_stores:
            return self._vector_stores[role]

        collection = self._get_collection(role)
        if collection is None:
            return None

        try:
            from langchain_mongodb import MongoDBAtlasVectorSearch

            vector_store = MongoDBAtlasVectorSearch(
                collection=collection,
                embedding=self.embeddings,
                index_name=self._index_name,
                text_key="content",
                embedding_key="embedding",
            )
            self._vector_stores[role] = vector_store
            logger.debug(f"Initialized vector store for {role.value}")
            return vector_store
        except ImportError:
            logger.error(
                "langchain-mongodb not installed - run: uv add langchain-mongodb"
            )
            return None
        except Exception as e:
            logger.error(f"Error creating vector store for {role.value}: {e}")
            return None

    def retrieve(
        self,
        query: str,
        role: TeamRole,
        k: int = 3,
    ) -> list[Document]:
        """
        Retrieve relevant documents for a specific agent role.

        Args:
            query: The search query (usually project description or context)
            role: The agent role to retrieve examples for
            k: Number of documents to retrieve (default: 3); values <= 0
                return an empty list without querying the database

        Returns:
            List of relevant Document objects, empty if no matches or error
        """
        # A non-positive k can never yield results; skip the round trip.
        if k <= 0:
            return []

        vector_store = self._get_vector_store(role)
        if vector_store is None:
            logger.debug(f"No vector store available for role {role.value}")
            return []

        try:
            docs = vector_store.similarity_search(query, k=k)
            logger.info(
                f"Retrieved {len(docs)} docs for {role.value}",
                data={"role": role.value, "count": len(docs)},
            )
            return docs
        except Exception as e:
            logger.error(f"RAG retrieval error for {role.value}: {e}")
            return []

    def get_retriever(
        self,
        role: TeamRole,
        k: int = 3,
        search_type: str = "similarity",
    ) -> BaseRetriever | None:
        """
        Get a LangChain retriever for a specific agent role.

        Useful for LCEL chain composition with RunnablePassthrough.

        Args:
            role: The agent role for role-specific retrieval
            k: Number of documents to retrieve
            search_type: Type of search ("similarity" or "mmr")

        Returns:
            LangChain BaseRetriever or None if unavailable
        """
        vector_store = self._get_vector_store(role)
        if vector_store is None:
            return None

        return vector_store.as_retriever(
            search_type=search_type,
            search_kwargs={"k": k},
        )

    def format_docs(self, docs: list[Document]) -> str:
        """
        Format retrieved documents for prompt injection.

        Each document gets a ``### Example N`` header, followed by its source
        and role metadata when present. Documents are separated by ``---``.

        Args:
            docs: List of retrieved documents

        Returns:
            Formatted string with examples, or message if empty
        """
        if not docs:
            return "No relevant examples found in knowledge base."

        formatted = []
        for i, doc in enumerate(docs, 1):
            source = doc.metadata.get("source", "Unknown")
            role = doc.metadata.get("role", "")

            header = f"### Example {i}"
            if source != "Unknown":
                header += f" (Source: {source})"
            if role:
                header += f" [{role}]"

            formatted.append(f"{header}\n{doc.page_content}")

        return "\n\n---\n\n".join(formatted)

    async def add_documents(
        self,
        documents: list[Document],
        role: TeamRole,
    ) -> list[str]:
        """
        Add documents to an agent's RAG collection.

        Note: each document's ``metadata["role"]`` is set in place to
        ``role.value`` before insertion.

        Args:
            documents: Documents to add (will be embedded)
            role: Target agent role (determines collection)

        Returns:
            List of inserted document IDs (empty if ``documents`` is empty)

        Raises:
            ValueError: If no collection exists for the role
        """
        vector_store = self._get_vector_store(role)
        if vector_store is None:
            raise ValueError(f"No RAG collection for role {role.value}")

        # Nothing to embed or insert; avoid a pointless store call.
        if not documents:
            return []

        # Add role metadata to all documents
        for doc in documents:
            doc.metadata["role"] = role.value

        try:
            ids = await vector_store.aadd_documents(documents)
            logger.info(
                f"Added {len(ids)} documents to {role.value}",
                data={"role": role.value, "count": len(ids)},
            )
            return ids
        except Exception as e:
            logger.error(f"Error adding documents for {role.value}: {e}")
            raise

    async def delete_documents(
        self,
        ids: list[str],
        role: TeamRole,
    ) -> bool:
        """
        Delete documents from an agent's collection by ID.

        An empty ``ids`` list is a no-op that returns True. It is deliberately
        NOT forwarded to the vector store (see the comment in the body).

        Args:
            ids: Document IDs to delete
            role: Agent role (determines collection)

        Returns:
            True if successful, False otherwise (including when the store
            reports the delete as failed)
        """
        vector_store = self._get_vector_store(role)
        if vector_store is None:
            return False

        if not ids:
            # NOTE: langchain-mongodb's ``delete`` only adds an ``_id`` filter
            # when ``ids`` is truthy, so an empty list can end up matching
            # every document in the collection. Never forward it.
            return True

        try:
            if hasattr(vector_store, "adelete"):
                result = await vector_store.adelete(ids=ids)
            elif hasattr(vector_store, "delete"):
                result = vector_store.delete(ids=ids)
            else:
                logger.warning("Vector store does not support deletion")
                return False
        except Exception as e:
            logger.error(f"Error deleting documents for {role.value}: {e}")
            return False

        # LangChain's delete contract: True = success, False = failure,
        # None = outcome not reported. Treat only an explicit False as failure.
        return result is not False

    def health_check(self) -> dict[str, Any]:
        """
        Return health status of MongoDB connection.

        Returns:
            Dict with status, database name, and collection info. The
            collection lists are split into those that already exist in the
            database and those still missing.
        """
        if self._client is None:
            return {
                "status": "disconnected",
                "message": "MONGODB_URI not configured or connection failed",
            }

        try:
            self._client.admin.command("ping")

            # Get collection stats
            db = self._client[self._db_name]
            existing_collections = set(db.list_collection_names())
            # ALL_RAG_COLLECTIONS already excludes unmapped roles; copy it so
            # callers cannot mutate the module-level list through this dict.
            configured_collections = list(ALL_RAG_COLLECTIONS)

            return {
                "status": "connected",
                "database": self._db_name,
                "index_name": self._index_name,
                "configured_collections": configured_collections,
                "existing_collections": [
                    c for c in configured_collections if c in existing_collections
                ],
                "missing_collections": [
                    c for c in configured_collections if c not in existing_collections
                ],
            }
        except Exception as e:
            return {
                "status": "error",
                "message": str(e),
            }

    def is_available(self) -> bool:
        """Check if MongoDB RAG is available and connected."""
        return self._initialized and self._client is not None

    def get_roles_with_rag(self) -> list[TeamRole]:
        """Get list of roles that have RAG collections configured."""
        return [role for role, coll in ROLE_COLLECTION_MAP.items() if coll is not None]
# Lazily created, module-wide service instance shared by all callers.
_mongodb_rag_service: MongoDBRAGService | None = None


def get_mongodb_rag_service() -> MongoDBRAGService:
    """
    Return the shared MongoDB RAG service, building it on the first call.

    Every later call returns the same object, so callers reuse a single
    MongoDB connection rather than each constructing their own service.
    """
    global _mongodb_rag_service
    if _mongodb_rag_service is not None:
        return _mongodb_rag_service
    _mongodb_rag_service = MongoDBRAGService()
    return _mongodb_rag_service
def reset_mongodb_rag_service() -> None:
    """
    Reset the singleton (useful for testing).

    Closes the current service's MongoDB client (if any) and clears the
    module-level singleton, so the next ``get_mongodb_rag_service()`` call
    builds a fresh service. The singleton is cleared even when closing the
    client raises; that exception still propagates to the caller.
    """
    global _mongodb_rag_service
    service = _mongodb_rag_service
    try:
        if service is not None and service._client is not None:
            service._client.close()
    finally:
        # Always drop the reference so a failed close cannot leave a stale,
        # partially-closed service cached as the singleton.
        _mongodb_rag_service = None