firepenguindisopanda
Refactor code structure for improved readability and maintainability
5e8f51e
"""
RAG (Retrieval-Augmented Generation) Service.
Unified RAG interface with MongoDB Atlas as primary and in-memory fallback.
Supports both role-specific retrieval (via MongoDB) and generic retrieval.
Features:
- MongoDB Atlas Vector Search for production (agent-specific collections)
- In-memory vectorstore fallback for local development
- LangChain Retriever interface for RAG chains
- Document ingestion from corpus directory
"""
import os
from pathlib import Path
from typing import Any
from dotenv import load_dotenv
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from langchain_core.vectorstores import InMemoryVectorStore, VectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
from .llm_factory import get_embeddings_model
from .mongodb_rag import MongoDBRAGService, get_mongodb_rag_service
from .observability import get_logger
from .schemas import TeamRole
# Pull variables from a .env file into the process environment at import time;
# RAGService later reads MONGODB_URI via os.getenv.
load_dotenv()
logger = get_logger("rag")

# Define paths
# Grandparent of the directory containing this module (presumably the project
# root — confirm against the package layout).
BASE_DIR = Path(__file__).resolve().parents[2]
# Source documents used to build the in-memory fallback vector store.
CORPUS_DIR = BASE_DIR.joinpath("corpus_rag")
# NOTE(review): not referenced in this module's visible code; presumably used elsewhere.
VECTOR_STORE_PATH = BASE_DIR.joinpath("public", "vector_store")

# Error messages
ERR_VECTOR_STORE_NOT_INIT = "Vector store not initialized"
class RAGService:
    """
    Unified RAG Service with MongoDB primary and in-memory fallback.

    Priority order:
    1. MongoDB Atlas Vector Search (if MONGODB_URI is set and the service
       reports itself available)
    2. In-memory vectorstore built from CORPUS_DIR (initialized in every case)

    For role-specific retrieval, pass the `role` parameter to
    retrieve/get_retriever. When a role is given and MongoDB is available,
    retrieval uses the agent-specific collections. Otherwise, or when MongoDB
    yields nothing, the in-memory fallback store is queried.
    """

    def __init__(self):
        self.embeddings = get_embeddings_model()
        self._mongodb_service: MongoDBRAGService | None = None
        self._fallback_store: VectorStore | None = None
        self._initialize()

    def _initialize(self) -> None:
        """Initialize RAG backends in priority order.

        MongoDB is only attempted when MONGODB_URI is set. The fallback store is
        built on every path, because non-role queries (and role queries with no
        MongoDB hits) are always served from it.
        """
        if os.getenv("MONGODB_URI"):
            self._mongodb_service = get_mongodb_rag_service()
            if self._mongodb_service.is_available():
                logger.info("Using MongoDB Atlas for RAG (primary)")
                # Still initialize fallback for non-role-specific queries
                self._init_fallback_store()
                return
        # Fallback to in-memory only
        logger.info("Using in-memory vector store only (MongoDB unavailable)")
        self._init_fallback_store()

    def _init_fallback_store(self) -> None:
        """Initialize the fallback in-memory vectorstore from the corpus.

        An empty store is created when the corpus directory is missing or holds
        no loadable documents, so `_fallback_store` is never left as None here.
        """
        if not CORPUS_DIR.exists():
            logger.warning(f"Corpus directory not found: {CORPUS_DIR}")
            self._fallback_store = InMemoryVectorStore(embedding=self.embeddings)
            return
        documents = self._load_documents()
        if not documents:
            logger.warning("No documents found for fallback store")
            self._fallback_store = InMemoryVectorStore(embedding=self.embeddings)
            return
        chunks = self._split_documents(documents)
        logger.info(f"Created {len(chunks)} chunks from {len(documents)} documents")
        self._fallback_store = InMemoryVectorStore.from_documents(
            documents=chunks,
            embedding=self.embeddings,
        )
        logger.info("Fallback in-memory vectorstore initialized")

    def _load_documents(self) -> list[Document]:
        """Load ``*.md``, ``*.yaml`` and ``*.txt`` files from the corpus directory.

        Files are read as UTF-8 and visited in sorted order, so loading does
        not depend on the filesystem's directory listing order. Files that
        cannot be read or decoded are logged and skipped.
        """
        documents: list[Document] = []
        for pattern in ("*.md", "*.yaml", "*.txt"):
            for file_path in sorted(CORPUS_DIR.glob(pattern)):
                try:
                    text = file_path.read_text(encoding="utf-8")
                except (OSError, UnicodeDecodeError) as e:
                    logger.warning(f"Error loading {file_path}: {e}")
                    continue
                documents.append(
                    Document(
                        page_content=text,
                        metadata={
                            "source": str(file_path.name),
                            "file_type": file_path.suffix,
                        },
                    )
                )
        return documents

    def _split_documents(self, documents: list[Document]) -> list[Document]:
        """Split documents into ~1000-char chunks (200-char overlap) for embedding."""
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            add_start_index=True,
            separators=["\n\n", "\n", ". ", " ", ""],
        )
        return text_splitter.split_documents(documents)

    def _role_backend(self, role: TeamRole | None) -> MongoDBRAGService | None:
        """Return the MongoDB service if it should handle a role-scoped call, else None."""
        # Same evaluation order as before: service present, then available, then role.
        if self._mongodb_service and self._mongodb_service.is_available() and role:
            return self._mongodb_service
        return None

    def _adapt_filter(self, filter_spec: Any) -> Any:
        """Translate a metadata-filter dict into the form the fallback store accepts.

        ``InMemoryVectorStore`` filters with a ``Callable[[Document], bool]``.
        Handing it a raw dict fails at query time ("'dict' object is not
        callable"). For that store, a dict is converted to a predicate that
        requires ``doc.metadata[key] == value`` for every item. Callables and
        filters destined for other store types are passed through unchanged.
        """
        if isinstance(self._fallback_store, InMemoryVectorStore) and isinstance(
            filter_spec, dict
        ):
            # Snapshot so later mutation of the caller's dict cannot change results.
            criteria = dict(filter_spec)

            def _matches(doc: Document) -> bool:
                return all(
                    key in doc.metadata and doc.metadata[key] == value
                    for key, value in criteria.items()
                )

            return _matches
        return filter_spec

    def retrieve(
        self,
        query: str,
        role: TeamRole | None = None,
        k: int = 3,
    ) -> list[Document]:
        """
        Retrieve relevant documents for a query.

        Args:
            query: Search query (project description or context)
            role: Optional agent role for role-specific retrieval (MongoDB only)
            k: Number of documents to retrieve

        Returns:
            List of relevant documents. Empty if the fallback search fails or
            no store is available.
        """
        mongo = self._role_backend(role)
        if mongo is not None:
            docs = mongo.retrieve(query, role, k)
            if docs:
                return docs
            # No MongoDB results: fall through to the fallback store.
        if self._fallback_store:
            try:
                docs = self._fallback_store.similarity_search(query, k=k)
                logger.debug(f"Retrieved {len(docs)} documents from fallback store")
                return docs
            except Exception as e:
                # Best-effort: retrieval failure degrades to "no context" rather than crashing.
                logger.error(f"Error during fallback retrieval: {e}")
                return []
        logger.warning("No vector store available for retrieval")
        return []

    def get_retriever(
        self,
        role: TeamRole | None = None,
        k: int = 3,
        filter: dict[str, Any] | None = None,
        search_type: str = "similarity",
    ) -> BaseRetriever:
        """
        Get a LangChain Retriever for RAG chains.

        Args:
            role: Optional agent role. When given and MongoDB is available, the
                MongoDB role-specific retriever is returned if it exists.
            k: Number of documents to retrieve.
            filter: Optional metadata filter. It applies to the fallback store
                only and is ignored on the MongoDB path. A dict means
                "metadata[key] == value for every item". A callable
                ``(Document) -> bool`` is passed through unchanged.
            search_type: "similarity" or "mmr".

        Returns:
            A LangChain BaseRetriever.

        Raises:
            RuntimeError: If no MongoDB retriever applies and the fallback
                store is not initialized.
        """
        mongo = self._role_backend(role)
        if mongo is not None:
            retriever = mongo.get_retriever(role, k, search_type)
            if retriever:
                return retriever
        if not self._fallback_store:
            raise RuntimeError("RAG retriever not initialized")
        search_kwargs: dict[str, Any] = {"k": k}
        if filter:
            search_kwargs["filter"] = self._adapt_filter(filter)
        return self._fallback_store.as_retriever(
            search_type=search_type,
            search_kwargs=search_kwargs,
        )

    def format_docs(self, docs: list[Document]) -> str:
        """
        Format retrieved documents into a string for context injection.

        Delegates to the MongoDB service formatter whenever MongoDB is
        available (presumably it adds role metadata; confirm in mongodb_rag).
        Otherwise each document is rendered as "Source: ...\\nContent:\\n...",
        joined by blank lines.
        """
        if self._mongodb_service and self._mongodb_service.is_available():
            return self._mongodb_service.format_docs(docs)
        if not docs:
            return "No relevant context found."
        return "\n\n".join(
            f"Source: {doc.metadata.get('source', 'Unknown')}\n"
            f"Content:\n{doc.page_content}"
            for doc in docs
        )

    async def add_documents(
        self,
        documents: list[Document],
        role: TeamRole | None = None,
        ids: list[str] | None = None,
    ) -> list[str]:
        """
        Add documents to the vectorstore.

        Args:
            documents: Documents to add
            role: Agent role (required for MongoDB, determines collection)
            ids: Optional document IDs (honored by the fallback store only)

        Returns:
            List of document IDs

        Raises:
            RuntimeError: If no MongoDB target applies and the fallback store
                is not initialized.
        """
        mongo = self._role_backend(role)
        if mongo is not None:
            # NOTE: `ids` is not forwarded on the MongoDB path.
            return await mongo.add_documents(documents, role)
        if self._fallback_store:
            if hasattr(self._fallback_store, "aadd_documents"):
                return await self._fallback_store.aadd_documents(documents, ids=ids)
            return self._fallback_store.add_documents(documents, ids=ids)
        raise RuntimeError(ERR_VECTOR_STORE_NOT_INIT)

    async def delete_documents(
        self,
        ids: list[str],
        role: TeamRole | None = None,
    ) -> bool:
        """
        Delete documents from the vectorstore by ID.

        Args:
            ids: Document IDs to delete
            role: Agent role (required for MongoDB deletion)

        Returns:
            True if the deletion call completed. False if the store lacks
            delete support, the call raised, or no store exists. The MongoDB
            path returns whatever the MongoDB service returns.
        """
        mongo = self._role_backend(role)
        if mongo is not None:
            return await mongo.delete_documents(ids, role)
        if self._fallback_store:
            try:
                if hasattr(self._fallback_store, "adelete"):
                    await self._fallback_store.adelete(ids=ids)
                elif hasattr(self._fallback_store, "delete"):
                    self._fallback_store.delete(ids=ids)
                else:
                    logger.warning("Fallback store does not support deletion")
                    return False
                return True
            except Exception as e:
                logger.error(f"Error deleting documents: {e}")
                return False
        return False

    def health_check(self) -> dict[str, Any]:
        """
        Return health status of RAG service.

        Returns:
            Dict with keys "fallback_store_initialized" (bool), "mongodb"
            (the MongoDB service's health dict, or {"status": "not_configured"})
            and "primary_backend" ("mongodb" or "fallback").
        """
        result: dict[str, Any] = {
            "fallback_store_initialized": self._fallback_store is not None,
        }
        if self._mongodb_service:
            result["mongodb"] = self._mongodb_service.health_check()
            result["primary_backend"] = (
                "mongodb" if self._mongodb_service.is_available() else "fallback"
            )
        else:
            result["mongodb"] = {"status": "not_configured"}
            result["primary_backend"] = "fallback"
        return result

    def is_mongodb_available(self) -> bool:
        """Check if MongoDB RAG backend is available."""
        return (
            self._mongodb_service is not None and self._mongodb_service.is_available()
        )

    def get_roles_with_rag(self) -> list[TeamRole]:
        """Get agent roles with RAG collections configured ([] without a MongoDB service)."""
        if self._mongodb_service:
            return self._mongodb_service.get_roles_with_rag()
        return []
# Module-level singleton: created lazily by get_rag_service(), cleared by reset_rag_service().
_rag_service: RAGService | None = None


def get_rag_service() -> RAGService:
    """Return the shared RAGService, constructing it on first call.

    RAGService() loads the embeddings model and initializes backends, so the
    instance is cached at module level and reused until reset_rag_service().
    """
    global _rag_service
    instance = _rag_service
    if instance is None:
        instance = _rag_service = RAGService()
    return instance


def reset_rag_service() -> None:
    """Forget the cached RAGService so the next get_rag_service() builds a new one.

    Intended for tests.
    """
    global _rag_service
    _rag_service = None