multi-agent-system / app /core /mongodb_rag.py
firepenguindisopanda
Refactor code structure for improved readability and maintainability
1a608b5
"""
MongoDB Atlas Vector Search RAG Service with agent-specific collections.
Each agent role has its own collection with specialized examples:
- Product Owner: PRDs, user stories, acceptance criteria
- Business Analyst: BRDs, process flows
- Solution Architect: System designs, ADRs
- etc.
This enables more relevant RAG retrieval per agent specialty.
"""
import os
from typing import Any
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from langchain_core.vectorstores import VectorStore
from .llm_factory import get_embeddings_model
from .observability import get_logger
from .schemas import TeamRole
logger = get_logger("mongodb_rag")

# Agent role -> name of the MongoDB collection that holds that role's RAG
# examples. Coordinator roles (PROJECT_REFINER, SPEC_COORDINATOR, JUDGE) map to
# None: they synthesize the other agents' outputs and never retrieve examples.
ROLE_COLLECTION_MAP: dict[TeamRole, str | None] = {
    # Specialist roles: one dedicated collection each.
    TeamRole.PRODUCT_OWNER: "rag_product_owner",
    TeamRole.BUSINESS_ANALYST: "rag_business_analyst",
    TeamRole.SOLUTION_ARCHITECT: "rag_solution_architect",
    TeamRole.DATA_ARCHITECT: "rag_data_architect",
    TeamRole.SECURITY_ANALYST: "rag_security_analyst",
    TeamRole.UX_DESIGNER: "rag_ux_designer",
    TeamRole.API_DESIGNER: "rag_api_designer",
    TeamRole.QA_STRATEGIST: "rag_qa_strategist",
    TeamRole.DEVOPS_ARCHITECT: "rag_devops_architect",
    TeamRole.ENVIRONMENT_ENGINEER: "rag_environment_engineer",
    TeamRole.TECHNICAL_WRITER: "rag_technical_writer",
    # Coordinator roles: no collection, hence no RAG.
    TeamRole.PROJECT_REFINER: None,
    TeamRole.SPEC_COORDINATOR: None,
    TeamRole.JUDGE: None,
}

# Every concrete collection name (None entries dropped), in the map's order.
# Intended for setup/seeding scripts and the service health check.
ALL_RAG_COLLECTIONS = list(filter(None, ROLE_COLLECTION_MAP.values()))
class MongoDBRAGService:
    """
    RAG service backed by MongoDB Atlas Vector Search, one collection per role.

    Features:
    - Agent-specific collections for specialized examples (``ROLE_COLLECTION_MAP``)
    - Lazy, cached creation of one vector store per role
    - Health check for connection monitoring
    - Document management (add/delete)

    Configuration comes from environment variables:
    - ``MONGODB_URI``: connection string; when unset the service is disabled
    - ``MONGODB_DATABASE``: database name (default ``"specs_before_code"``)
    - ``MONGODB_INDEX_NAME``: vector index name (default ``"vector_index"``)

    When the connection or a role's collection is unavailable, the read helpers
    return an empty list / ``None`` / ``False`` instead of raising;
    ``add_documents`` is the exception and raises ``ValueError``.
    """

    def __init__(self):
        self.embeddings = get_embeddings_model()
        self._client: Any | None = None  # pymongo MongoClient once connected
        self._db_name: str = os.getenv("MONGODB_DATABASE", "specs_before_code")
        self._index_name: str = os.getenv("MONGODB_INDEX_NAME", "vector_index")
        # Per-role cache, filled lazily by _get_vector_store().
        self._vector_stores: dict[TeamRole, VectorStore] = {}
        self._initialized = False
        self._initialize_connection()

    def _initialize_connection(self) -> None:
        """
        Connect to MongoDB and verify the connection with a ``ping``.

        On success, sets ``self._client`` and ``self._initialized``. On any
        failure (missing URI, pymongo not installed, server unreachable), it
        logs the problem and leaves ``self._client`` as ``None``, so the
        service runs disabled instead of raising.
        """
        uri = os.getenv("MONGODB_URI")
        if not uri:
            logger.warning("MONGODB_URI not set - MongoDB RAG disabled")
            return

        try:
            from pymongo import MongoClient
        except ImportError:
            logger.error("pymongo not installed - run: uv add pymongo")
            self._client = None
            return

        client = None
        try:
            client = MongoClient(
                uri,
                maxPoolSize=10,
                minPoolSize=2,
                maxIdleTimeMS=30000,
                serverSelectionTimeoutMS=5000,
            )
            # The constructor does not block on connecting, so the ping is what
            # actually surfaces a bad URI or an unreachable server.
            client.admin.command("ping")
        except Exception as e:
            logger.error(f"Failed to connect to MongoDB: {e}")
            if client is not None:
                # Release the pool/background resources of a client we will
                # not use. This is best-effort cleanup: a failure here must not
                # mask the original connection error, which is already logged.
                try:
                    client.close()
                except Exception:
                    pass
            self._client = None
            return

        self._client = client
        self._initialized = True
        logger.info(
            "Connected to MongoDB Atlas",
            data={"database": self._db_name},
        )

    def _get_collection(self, role: TeamRole) -> Any | None:
        """
        Return the MongoDB collection for ``role``.

        Returns ``None`` when there is no client, or when the role maps to
        no collection (coordinator roles).
        """
        if not self._client:
            return None
        collection_name = ROLE_COLLECTION_MAP.get(role)
        if not collection_name:
            logger.debug(f"No RAG collection mapped for role: {role.value}")
            return None
        return self._client[self._db_name][collection_name]

    def _get_vector_store(self, role: TeamRole) -> VectorStore | None:
        """
        Return the vector store for ``role``, creating and caching it on first use.

        Only successful creations are cached. After a failure, the next call
        tries again. Returns ``None`` if the collection is unavailable,
        langchain-mongodb is missing, or construction fails.
        """
        # Return cached store if available
        if role in self._vector_stores:
            return self._vector_stores[role]
        collection = self._get_collection(role)
        if collection is None:
            return None
        try:
            from langchain_mongodb import MongoDBAtlasVectorSearch

            vector_store = MongoDBAtlasVectorSearch(
                collection=collection,
                embedding=self.embeddings,
                index_name=self._index_name,
                text_key="content",
                embedding_key="embedding",
            )
            self._vector_stores[role] = vector_store
            logger.debug(f"Initialized vector store for {role.value}")
            return vector_store
        except ImportError:
            logger.error(
                "langchain-mongodb not installed - run: uv add langchain-mongodb"
            )
            return None
        except Exception as e:
            logger.error(f"Error creating vector store for {role.value}: {e}")
            return None

    def retrieve(
        self,
        query: str,
        role: TeamRole,
        k: int = 3,
    ) -> list[Document]:
        """
        Retrieve relevant documents for a specific agent role.

        Args:
            query: The search query (usually project description or context)
            role: The agent role to retrieve examples for
            k: Number of documents to retrieve (default: 3)

        Returns:
            List of relevant Document objects. Empty if there is no store,
            no match, or the search raised (the error is logged).
        """
        vector_store = self._get_vector_store(role)
        if not vector_store:
            logger.debug(f"No vector store available for role {role.value}")
            return []
        try:
            docs = vector_store.similarity_search(query, k=k)
            logger.info(
                f"Retrieved {len(docs)} docs for {role.value}",
                data={"role": role.value, "count": len(docs)},
            )
            return docs
        except Exception as e:
            logger.error(f"RAG retrieval error for {role.value}: {e}")
            return []

    def get_retriever(
        self,
        role: TeamRole,
        k: int = 3,
        search_type: str = "similarity",
    ) -> BaseRetriever | None:
        """
        Get a LangChain retriever for a specific agent role.

        Useful for LCEL chain composition with RunnablePassthrough.

        Args:
            role: The agent role for role-specific retrieval
            k: Number of documents to retrieve (passed as ``search_kwargs["k"]``)
            search_type: Type of search ("similarity" or "mmr")

        Returns:
            LangChain BaseRetriever, or None if no vector store is available
        """
        vector_store = self._get_vector_store(role)
        if not vector_store:
            return None
        return vector_store.as_retriever(
            search_type=search_type,
            search_kwargs={"k": k},
        )

    def format_docs(self, docs: list[Document]) -> str:
        """
        Format retrieved documents for injection into a prompt.

        Each document becomes a ``### Example N`` section. The header gets
        ``(Source: ...)`` when ``metadata["source"]`` is set, and ``[role]``
        when ``metadata["role"]`` is non-empty. Sections are separated by
        ``---`` rules.

        Args:
            docs: List of retrieved documents

        Returns:
            Formatted string with examples, or a fixed message if ``docs`` is empty
        """
        if not docs:
            return "No relevant examples found in knowledge base."
        formatted = []
        for i, doc in enumerate(docs, 1):
            source = doc.metadata.get("source", "Unknown")
            role = doc.metadata.get("role", "")
            header = f"### Example {i}"
            if source != "Unknown":
                header += f" (Source: {source})"
            if role:
                header += f" [{role}]"
            formatted.append(f"{header}\n{doc.page_content}")
        return "\n\n---\n\n".join(formatted)

    async def add_documents(
        self,
        documents: list[Document],
        role: TeamRole,
    ) -> list[str]:
        """
        Add documents to an agent's RAG collection.

        Note: this sets ``metadata["role"]`` on the passed-in Document objects
        in place, before they are embedded and stored.

        Args:
            documents: Documents to add (will be embedded)
            role: Target agent role (determines collection)

        Returns:
            List of inserted document IDs

        Raises:
            ValueError: If no collection/vector store exists for the role
            Exception: Any error from the vector store is logged and re-raised
        """
        vector_store = self._get_vector_store(role)
        if not vector_store:
            raise ValueError(f"No RAG collection for role {role.value}")
        # Add role metadata to all documents
        for doc in documents:
            doc.metadata["role"] = role.value
        try:
            ids = await vector_store.aadd_documents(documents)
            logger.info(
                f"Added {len(ids)} documents to {role.value}",
                data={"role": role.value, "count": len(ids)},
            )
            return ids
        except Exception as e:
            logger.error(f"Error adding documents for {role.value}: {e}")
            raise

    async def delete_documents(
        self,
        ids: list[str],
        role: TeamRole,
    ) -> bool:
        """
        Delete documents from an agent's collection by ID.

        Args:
            ids: Document IDs to delete. An empty list is a no-op that
                returns True.
            role: Agent role (determines collection)

        Returns:
            True if the deletion succeeded or there was nothing to delete.
            False if there is no vector store, the store does not support
            deletion, the store reports failure, or an error occurred.
        """
        vector_store = self._get_vector_store(role)
        if not vector_store:
            return False
        if not ids:
            # Never forward an empty id list. langchain-mongodb's delete only
            # builds an ``_id`` filter when ids is truthy, so an empty list
            # would match -- and delete -- every document in the collection.
            return True
        try:
            if hasattr(vector_store, "adelete"):
                result = await vector_store.adelete(ids=ids)
            elif hasattr(vector_store, "delete"):
                result = vector_store.delete(ids=ids)
            else:
                logger.warning("Vector store does not support deletion")
                return False
        except Exception as e:
            logger.error(f"Error deleting documents for {role.value}: {e}")
            return False
        # LangChain's delete/adelete return Optional[bool]. Treat an explicit
        # False as failure; None stays "success" for stores that return
        # nothing.
        if result is False:
            logger.warning(f"Vector store reported failed deletion for {role.value}")
            return False
        return True

    def health_check(self) -> dict[str, Any]:
        """
        Return the health status of the MongoDB connection.

        Returns:
            Dict with ``status``. When "connected", it also contains the
            database, the index name, and the configured / existing / missing
            RAG collections. When "disconnected" or "error", it contains a
            ``message`` instead.
        """
        if not self._client:
            return {
                "status": "disconnected",
                "message": "MONGODB_URI not configured or connection failed",
            }
        try:
            self._client.admin.command("ping")
            # Get collection stats
            db = self._client[self._db_name]
            existing_collections = set(db.list_collection_names())
            # ALL_RAG_COLLECTIONS already excludes None entries. Copy it so
            # callers that mutate the returned dict cannot alter the module
            # constant.
            configured_collections = list(ALL_RAG_COLLECTIONS)
            return {
                "status": "connected",
                "database": self._db_name,
                "index_name": self._index_name,
                "configured_collections": configured_collections,
                "existing_collections": [
                    c for c in configured_collections if c in existing_collections
                ],
                "missing_collections": [
                    c for c in configured_collections if c not in existing_collections
                ],
            }
        except Exception as e:
            return {
                "status": "error",
                "message": str(e),
            }

    def is_available(self) -> bool:
        """Return True if the initial connection succeeded and a client is held."""
        return self._initialized and self._client is not None

    def get_roles_with_rag(self) -> list[TeamRole]:
        """Return the roles that have a RAG collection configured, in map order."""
        return [role for role, coll in ROLE_COLLECTION_MAP.items() if coll is not None]
# Process-wide service instance, created on first request so that the
# MongoDB connection is opened once and shared.
_mongodb_rag_service: MongoDBRAGService | None = None


def get_mongodb_rag_service() -> MongoDBRAGService:
    """
    Return the shared MongoDBRAGService, constructing it on the first call.

    Later calls return the same instance, which reuses its connection, until
    ``reset_mongodb_rag_service`` clears it.
    """
    global _mongodb_rag_service
    service = _mongodb_rag_service
    if service is None:
        service = MongoDBRAGService()
        _mongodb_rag_service = service
    return service
def reset_mongodb_rag_service() -> None:
    """
    Drop the shared service instance (useful for testing).

    If an instance exists and holds a client, the client is closed first.
    The next ``get_mongodb_rag_service`` call builds a fresh instance.
    """
    global _mongodb_rag_service
    service = _mongodb_rag_service
    client = service._client if service else None
    if client:
        client.close()
    _mongodb_rag_service = None