import json from pathlib import Path import yaml from loguru import logger from opik import opik_context, track from smolagents import Tool from second_brain_online.application.rag import get_retriever class MongoDBRetrieverTool(Tool): name = "mongodb_vector_search_retriever" description = """Use this tool to search and retrieve relevant documents from a knowledge base using semantic search. This tool performs similarity-based search to find the most relevant documents matching the query. Best used when you need to: - Find specific information from stored documents - Get context about a topic - Research historical data or documentation The tool will return multiple relevant document snippets.""" inputs = { "query": { "type": "string", "description": """The search query to find relevant documents for using semantic search. Should be a clear, specific question or statement about the information you're looking for.""", } } output_type = "string" def __init__(self, config_path: Path, **kwargs): super().__init__(**kwargs) self.config_path = config_path self.retriever = self.__load_retriever(config_path) def __load_retriever(self, config_path: Path): config = yaml.safe_load(config_path.read_text()) config = config["parameters"] return get_retriever( embedding_model_id=config["embedding_model_id"], embedding_model_type=config["embedding_model_type"], retriever_type=config["retriever_type"], k=5, device=config["device"], ) @track(name="MongoDBRetrieverTool.forward") def forward(self, query: str) -> str: if hasattr(self.retriever, "search_kwargs"): search_kwargs = self.retriever.search_kwargs else: try: search_kwargs = { "fulltext_penalty": self.retriever.fulltext_penalty, "vector_score_penalty": self.retriever.vector_penalty, "top_k": self.retriever.top_k, } except AttributeError: logger.warning("Could not extract search kwargs from retriever.") search_kwargs = {} opik_context.update_current_trace( tags=["agent"], metadata={ "search": search_kwargs, "embedding_model_id": self.retriever.vectorstore.embeddings.model, }, ) try: query = self.__parse_query(query) relevant_docs = self.retriever.invoke(query) formatted_docs = [] for i, doc in enumerate(relevant_docs, 1): # Extract metadata title = doc.metadata.get("title", "Untitled") datetime = doc.metadata.get("datetime", "unknown") contextual_summary = doc.metadata.get("contextual_summary", "") marketing_insights = doc.metadata.get("marketing_insights", {}) content = doc.page_content.strip() # Format marketing insights if available marketing_insights_text = "" if marketing_insights: marketing_insights_text = "\n\n" # Add quotes quotes = marketing_insights.get("quotes", []) if quotes: marketing_insights_text += "\n" for quote in quotes: marketing_insights_text += f"- \"{quote.get('quote', '')}\" (Sentiment: {quote.get('sentiment', 'Unknown')})\n" marketing_insights_text += "\n" # Add key findings findings = marketing_insights.get("key_findings", []) if findings: marketing_insights_text += "\n" for finding in findings: marketing_insights_text += f"- {finding.get('finding', '')} (Impact: {finding.get('impact', 'Unknown')})\n" marketing_insights_text += "\n" marketing_insights_text += "\n" # Create optimized document structure - truncate content to avoid token overload content_preview = content[:500] + "..." if len(content) > 500 else content formatted_docs.append( f""" {title} {datetime} {contextual_summary} {marketing_insights_text} {content_preview} """ ) result = "\n".join(formatted_docs) result = f""" {result} When using context from any document, reference the document title and date for attribution. """ return result except Exception: logger.opt(exception=True).debug("Error retrieving documents.") return "Error retrieving documents." @track(name="MongoDBRetrieverTool.parse_query") def __parse_query(self, query: str) -> str: try: # Try to parse as JSON first query_dict = json.loads(query) return query_dict["query"] except (json.JSONDecodeError, KeyError): # If JSON parsing fails, return the query as-is return query