"""MCP server ("OnBase") exposing llama-index retrievers and a Tavily web-search tool.

On import this module:
  1. Configures Azure OpenAI chat + embedding models as llama-index defaults.
  2. Recursively discovers and loads every persisted index under SOURCES
     into the module-level `indices` mapping.
  3. Loads per-retriever metadata from retrievers.json.
  4. Registers MCP resources and tools on a FastMCP server.

Run directly to serve over SSE: `python <this file>`.
"""

import json
import os
from datetime import datetime  # noqa: F401 -- kept: file may be partial / consumers may rely on it
from pathlib import Path  # FIX: Path was used below but never imported (NameError at import)
from typing import Dict, List, Optional

import aiohttp  # async HTTP client for the Tavily API calls

from mcp.server.fastmcp import FastMCP
from llama_index.core import (
    Settings,
    StorageContext,
    VectorStoreIndex,
    load_index_from_storage,
)
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from llama_index.llms.azure_openai import AzureOpenAI

# --- Azure OpenAI configuration --------------------------------------------
api_key = os.environ.get('AZURE_API_KEY')
azure_endpoint = "https://pharmaia-gpt.openai.azure.com/"
api_version = "2024-02-01"

llm = AzureOpenAI(
    model="gpt-4.1",
    deployment_name="gpt-4.1",
    api_key=api_key,
    azure_endpoint=azure_endpoint,
    api_version=api_version,
)

# You need to deploy your own embedding model as well as your own chat completion model
embed_model = AzureOpenAIEmbedding(
    model="text-embedding-3-large",
    deployment_name="text-embedding-3-large",
    api_key=api_key,
    azure_endpoint=azure_endpoint,
    api_version=api_version,
)

Settings.llm = llm
Settings.embed_model = embed_model

# --- Index discovery --------------------------------------------------------
# Initial configuration (this would probably live in another module)
DOCUMENTS_BASE_PATH = "./"
SOURCES = {
    "oms": "oms/",  # base folder that contains all the sub-indices
}

# Recursively load indices: any directory containing a "storage_nodes"
# subfolder is treated as one persisted llama-index index, keyed as
# "<source>_<parent folder name>" (e.g. "oms_vec_1").
indices: Dict[str, VectorStoreIndex] = {}
for source, rel_path in SOURCES.items():
    full_path = os.path.join(DOCUMENTS_BASE_PATH, rel_path)
    if not os.path.exists(full_path):
        continue
    for root, dirs, files in os.walk(full_path):
        if "storage_nodes" not in dirs:
            continue
        try:
            storage_path = os.path.join(root, "storage_nodes")
            storage_context = StorageContext.from_defaults(persist_dir=storage_path)
            index_name = os.path.basename(root)
            full_index_name = f"{source}_{index_name}"
            index = load_index_from_storage(storage_context, index_id="vector_index")
            indices[full_index_name] = index
        except Exception as e:
            # Best-effort: skip indices that fail to load so the server still starts.
            print(f"Error cargando índice en {root}: {str(e)}")
            continue

port = int(os.getenv("PORT", 7860))
mcp = FastMCP("OnBase", port=port)

# Sidecar metadata file describing each retriever (description, content
# info, last-updated timestamp). Missing/invalid file degrades gracefully.
RETRIEVERS_METADATA_PATH = Path("./retrievers.json")


def load_retrievers_metadata() -> Dict:
    """Load retriever metadata from retrievers.json.

    Returns:
        The parsed JSON mapping, or an empty dict (after printing a
        warning) when the file is missing or contains invalid JSON, so
        the server can still start without it.
    """
    try:
        with open(RETRIEVERS_METADATA_PATH, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"Warning: {RETRIEVERS_METADATA_PATH} not found. Using empty metadata.")
        return {}
    except json.JSONDecodeError:
        print(f"Warning: {RETRIEVERS_METADATA_PATH} is invalid JSON. Using empty metadata.")
        return {}


retrievers_metadata = load_retrievers_metadata()


# Resource listing only the available titles
@mcp.resource(
    uri="info://available_retriever_titles",
    name="AvailableRetrieverTitles",
    description="Lista los nombres/títulos disponibles de los retrievers",
    mime_type="application/json"
)
def get_retriever_titles() -> dict:
    """Return the titles/names of the available retrievers and their count."""
    return {
        "titles": list(retrievers_metadata.keys()),
        "count": len(retrievers_metadata)
    }


# Resource for retrieving the metadata of one specific retriever
@mcp.resource(
    uri="info://retriever_details/{retriever_title}",
    name="RetrieverDetails",
    description="Obtiene información detallada sobre un retriever específico",
    mime_type="application/json"
)
def get_retriever_details(retriever_title: str) -> dict:
    """Return the full metadata for a specific retriever.

    Parameters:
        retriever_title: the retriever title/name (e.g. 'oms')

    Returns:
        The metadata entry, or an error payload listing the available
        titles when the requested one does not exist.
    """
    if retriever_title not in retrievers_metadata:
        return {
            "error": f"Retriever '{retriever_title}' no encontrado",
            "available_titles": list(retrievers_metadata.keys())
        }
    return {
        "retriever": retriever_title,
        "details": retrievers_metadata[retriever_title]
    }


@mcp.resource(
    uri="info://available_retrievers",
    name="AvailableRetrievers",
    description="Provides information about available document retrievers including their names and descriptions.",
    mime_type="application/json"
)
def get_available_retrievers(retriever_title: Optional[str] = None) -> dict:
    """List the loaded retrievers, optionally filtered by source title.

    NOTE(review): the resource URI has no template parameter, so when served
    as a resource `retriever_title` always takes its default -- confirm this
    matches the FastMCP version in use.

    Parameters:
        retriever_title: optional; when given, only retrievers whose source
            matches this title are returned.
    """
    available_retrievers = []
    for full_index_name in indices.keys():
        # Index keys look like "<source>_<index_name>", e.g. "oms_vec_1".
        parts = full_index_name.split('_')
        source = parts[0]
        if retriever_title and source != retriever_title:
            continue
        # Per-retriever metadata from retrievers.json, when present.
        metadata = retrievers_metadata.get(source, {}).get(full_index_name, {})
        available_retrievers.append({
            "retriever_name": full_index_name,
            "source": source,
            "index_name": '_'.join(parts[1:]) if len(parts) > 1 else "default",
            "description": metadata.get("description", f"Documentos de {source.upper()}"),
            "content_info": metadata.get("content_info", "No description available"),
            "last_updated": metadata.get("last_updated", "unknown")
        })
    if retriever_title and not available_retrievers:
        return {
            "error": f"No hay retrievers para el título '{retriever_title}'",
            "available_titles": list(retrievers_metadata.keys())
        }
    return {
        "retrievers": available_retrievers,
        "count": len(available_retrievers),
        "filtered_by": retriever_title if retriever_title else "all"
    }


@mcp.tool()
def retrieve_docs(
    query: str,
    retrievers: List[str],
    top_k: int = 3
) -> dict:
    """
    Retrieve documents from different regulations using semantic search.

    Parameters:
        query: Search query (required).
        retrievers: List of specific retriever names to use (required).
        top_k: Number of results to return per retriever (default: 3).

    Example:
        retrieve_docs(
            query="salud pública",
            retrievers=["oms_vec_1", "oms_tree_2"],
            top_k=2
        )
    """
    if not query:
        return {"error": "Query parameter is required"}
    if not retrievers:
        return {"error": "At least one retriever must be specified",
                "available_retrievers": list(indices.keys())}

    # Validate every requested retriever before doing any retrieval.
    invalid_retrievers = [r for r in retrievers if r not in indices]
    if invalid_retrievers:
        return {
            "error": f"Invalid retrievers specified: {invalid_retrievers}",
            "available_retrievers": list(indices.keys())
        }

    results = {}
    for retriever_name in retrievers:
        try:
            retriever = indices[retriever_name].as_retriever(similarity_top_k=top_k)
            nodes = retriever.retrieve(query)
            results[retriever_name] = [
                {
                    "content": node.get_content(),
                    "metadata": node.metadata,
                    "score": node.score
                }
                for node in nodes
            ]
        except Exception as e:
            # A failing retriever is reported per-name instead of aborting the call.
            results[retriever_name] = {
                "error": f"Error retrieving documents: {str(e)}"
            }

    return {
        "results": results,
        "query": query,
        "retrievers_used": retrievers,
        "top_k": top_k,
        # A list value means success; a dict value carries the error payload.
        "successful_retrievers": [r for r in retrievers if isinstance(results[r], list)],
        "failed_retrievers": [r for r in retrievers if not isinstance(results[r], list)]
    }


@mcp.tool()
async def search_tavily(
    query: str,
    days: int = 7,
    max_results: int = 1,
    include_answer: bool = False
) -> dict:
    """Perform a web search using the Tavily API.

    Args:
        query: Search query string (required)
        days: Restrict search to last N days (default: 7)
        max_results: Maximum results to return (default: 1)
        include_answer: Include a direct answer only when requested by the user (default: False)

    Returns:
        dict: Search results from Tavily, or an error payload on failure.

    Raises:
        ValueError: if the TAVILY_API_KEY environment variable is not set.
    """
    tavily_api_key = os.environ.get('TAVILY_API_KEY')
    if not tavily_api_key:
        raise ValueError("TAVILY_API_KEY environment variable not set")

    headers = {
        "Authorization": f"Bearer {tavily_api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "query": query,
        "search_depth": "basic",
        "max_results": max_results,
        "days": days if days else None,  # falsy days -> explicit null
        "include_answer": include_answer
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.tavily.com/search",
                headers=headers,
                json=payload
            ) as response:
                response.raise_for_status()
                result = await response.json()
                return result
    except Exception as e:
        # Network/HTTP failures are returned as data so the tool call never crashes.
        return {
            "error": str(e),
            "status": "failed",
            "query": query
        }


if __name__ == "__main__":
    mcp.run("sse")