"""OnBase MCP server: semantic document retrieval plus Tavily web search.

Exposes three MCP tools over an SSE transport:
  - list_retrievers: enumerate the vector indices available for querying.
  - retrieve_docs:   semantic search over the persisted llama-index stores.
  - search_tavily:   thin async proxy to the Tavily web-search API.

At import time the module configures Azure OpenAI as the llama-index LLM and
embedding backend, reads ``retrievers.json`` for metadata, and eagerly loads
every persisted index found under a ``storage_nodes`` directory.
"""

from mcp.server.fastmcp import FastMCP, Context
from datetime import datetime
from llama_index.core import VectorStoreIndex
from llama_index.core import (
    StorageContext,
    load_index_from_storage,
)
from llama_index.core import Settings
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from typing import Dict, Optional, List
from pathlib import Path
import json
import os
import aiohttp  # Needed for the async HTTP calls to the Tavily API
import asyncio
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

##### AZURE OPENAI #####
api_key = os.environ.get('AZURE_API_KEY')
azure_endpoint = "https://pharmaia-gpt.openai.azure.com/"
api_version = "2024-02-01"

llm = AzureOpenAI(
    model="gpt-4.1",
    deployment_name="gpt-4.1",
    api_key=api_key,
    azure_endpoint=azure_endpoint,
    api_version=api_version,
)

# You need to deploy your own embedding model as well as your own chat
# completion model on the Azure resource referenced above.
embed_model = AzureOpenAIEmbedding(
    model="text-embedding-3-large",
    deployment_name="text-embedding-3-large",
    api_key=api_key,
    azure_endpoint=azure_endpoint,
    api_version=api_version,
)

Settings.llm = llm
Settings.embed_model = embed_model
##### END AZURE OPENAI #####

# Path configuration
DOCUMENTS_BASE_PATH = "./"
RETRIEVERS_JSON_PATH = Path("./retrievers.json")


def load_retrievers_metadata() -> dict:
    """Load retriever metadata from ``retrievers.json``.

    Returns:
        dict: The parsed JSON mapping of source -> {index_name: metadata},
        or an empty dict when the file is missing or unparseable.
    """
    try:
        with open(RETRIEVERS_JSON_PATH, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        # Best-effort: a missing/broken metadata file just means no indices.
        logger.error(f"Error cargando retrievers.json: {str(e)}")
        return {}


retrievers_metadata = load_retrievers_metadata()
# Each source maps to a relative directory named after it (lower-cased).
SOURCES = {source: f"{source.lower()}/" for source in retrievers_metadata.keys()}

# Eagerly load every persisted index found under a "storage_nodes" directory.
indices: Dict[str, VectorStoreIndex] = {}
for source, rel_path in SOURCES.items():
    full_path = os.path.join(DOCUMENTS_BASE_PATH, rel_path)
    if not os.path.exists(full_path):
        logger.warning(f"Advertencia: No se encontró la ruta para {source}")
        continue
    for root, dirs, files in os.walk(full_path):
        if "storage_nodes" in dirs:
            # Compute the name BEFORE the try block: in the original code it
            # was assigned after StorageContext.from_defaults(), so a failure
            # there made the except clause raise NameError on index_name.
            index_name = os.path.basename(root)
            try:
                storage_path = os.path.join(root, "storage_nodes")
                storage_context = StorageContext.from_defaults(persist_dir=storage_path)
                indices[index_name] = load_index_from_storage(storage_context, index_id="vector_index")
                logger.info(f"Índice cargado correctamente: {index_name}")
            except Exception as e:
                logger.error(f"Error cargando índice {index_name}: {str(e)}")

port = int(os.getenv("PORT", 7860))
mcp = FastMCP("OnBase", port=port)


@mcp.tool()
async def list_retrievers(source: Optional[str] = None) -> dict:
    """Return the list of available retrievers.

    If a source is given and exists, results are filtered to it; if it does
    not exist, all retrievers are returned instead.

    Args:
        source (str, optional): Source to filter by. Ignored when it does not
            exist in the metadata. Defaults to None.

    Returns:
        dict: {
            "retrievers": list of retrievers (filtered or complete),
            "count": total number returned,
            "status": "success" | "error",
            "source_requested": source,  # what was asked for
            "source_used": "all" | source  # what was actually applied
        }
    """
    try:
        available = []
        # Only filter when the requested source actually exists.
        source_exists = source in retrievers_metadata if source else False
        for current_source, indexes in retrievers_metadata.items():
            if source_exists and current_source != source:
                continue
            for index_name, metadata in indexes.items():
                available.append({
                    "name": index_name,
                    "source": current_source,
                    "title": metadata.get("title", ""),
                    "description": metadata.get("description", "")
                })
        return {
            "retrievers": available,
            "count": len(available),
            "status": "success",
            "source_requested": source,
            "source_used": source if source_exists else "all"
        }
    except Exception as e:
        return {
            "retrievers": [],
            "count": 0,
            "status": "error",
            "error": str(e),
            "source_requested": source,
            "source_used": "none"
        }


# Main search tool
@mcp.tool()
def retrieve_docs(
    query: str,
    retrievers: List[str],
    top_k: int = 3
) -> dict:
    """Run a semantic search over the indexed documents.

    Args:
        query (str): Search text (required).
        retrievers (List[str]): Names of retrievers to query (required).
        top_k (int): Number of results per retriever (optional, default=3).

    Returns:
        dict: Per-retriever results keyed by retriever name, plus a
        "warnings" entry listing any unknown retriever names.

    Example:
        retrieve_docs(
            query="estándares farmacéuticos",
            retrievers=["vec_1", "vec_2"],
            top_k=2
        )
    """
    results = {}
    invalid = []

    for name in retrievers:
        if name not in indices:
            invalid.append(name)
            continue
        try:
            # 1. Fetch the index and run the similarity search.
            retriever = indices[name].as_retriever(similarity_top_k=top_k)
            nodes = retriever.retrieve(query)

            # 2. Look up the FULL metadata entry for this retriever.
            metadata = {}
            source = "unknown"
            for src, indexes in retrievers_metadata.items():
                if name in indexes:
                    metadata = indexes[name]
                    source = src
                    break

            # 3. Build the response for THIS retriever.
            results[name] = {
                "title": metadata.get("title", name),
                "documents": [
                    {
                        "content": node.get_content(),
                        "metadata": node.metadata,
                        "score": node.score
                    }
                    for node in nodes
                ],
                "description": metadata.get("description", ""),
                "source": source,
                "last_updated": metadata.get("last_updated", "")
            }
        except Exception as e:
            # A failure in one retriever must not abort the others.
            results[name] = {
                "error": str(e),
                "retriever": name
            }

    # Assemble the final response.
    response = {
        "query": query,
        "results": results,
        "top_k": top_k,
    }
    if invalid:
        response["warnings"] = {
            "invalid_retrievers": invalid,
            "valid_options": list(indices.keys())
        }
    return response


@mcp.tool()
async def search_tavily(
    query: str,
    days: int = 7,
    max_results: int = 1,
    include_answer: bool = False
) -> dict:
    """Perform a web search using the Tavily API.

    Args:
        query: Search query string (required)
        days: Restrict search to last N days (default: 7)
        max_results: Maximum results to return (default: 1)
        include_answer: Include a direct answer only when requested by the user (default: False)

    Returns:
        dict: Search results from Tavily, or an error payload on failure.

    Raises:
        ValueError: If the TAVILY_API_KEY environment variable is not set.
    """
    # Read the API key from the environment.
    tavily_api_key = os.environ.get('TAVILY_API_KEY')
    if not tavily_api_key:
        raise ValueError("TAVILY_API_KEY environment variable not set")

    headers = {
        "Authorization": f"Bearer {tavily_api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "query": query,
        "search_depth": "basic",
        "max_results": max_results,
        "days": days if days else None,
        "include_answer": include_answer
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.tavily.com/search",
                headers=headers,
                json=payload
            ) as response:
                response.raise_for_status()
                result = await response.json()
                return result
    except Exception as e:
        # Network/HTTP failures are reported in-band to the MCP client.
        return {
            "error": str(e),
            "status": "failed",
            "query": query
        }


if __name__ == "__main__":
    mcp.run("sse")