from mcp.server.fastmcp import FastMCP from datetime import datetime from llama_index.core import VectorStoreIndex from llama_index.core import ( StorageContext, load_index_from_storage, ) from llama_index.core import Settings from llama_index.llms.azure_openai import AzureOpenAI from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding from typing import Dict, Optional, List import json import os import aiohttp # Necesario para las peticiones HTTP asíncronas api_key = os.environ.get('AZURE_API_KEY') azure_endpoint = "https://pharmaia-gpt.openai.azure.com/" api_version = "2024-02-01" llm = AzureOpenAI( model="gpt-4.1", deployment_name="gpt-4.1", api_key=api_key, azure_endpoint=azure_endpoint, api_version=api_version, ) # You need to deploy your own embedding model as well as your own chat completion model embed_model = AzureOpenAIEmbedding( model="text-embedding-3-large", deployment_name="text-embedding-3-large", api_key=api_key, azure_endpoint=azure_endpoint, api_version=api_version, ) Settings.llm = llm Settings.embed_model = embed_model # Configuración inicial DOCUMENTS_BASE_PATH = "./" SOURCES = { "oms": "oms/", #"fda": "fda/" } indices: Dict[str, VectorStoreIndex] = {} for source, rel_path in SOURCES.items(): full_path = os.path.join(DOCUMENTS_BASE_PATH, rel_path) if not os.path.exists(full_path): print(f"Advertencia: No se encontró la ruta {full_path} para {source}") continue for root, dirs, files in os.walk(full_path): if "storage_nodes" in dirs: try: storage_path = os.path.join(root, "storage_nodes") storage_context = StorageContext.from_defaults(persist_dir=storage_path) # Usamos directamente el nombre de la carpeta (vec_who_1, etc.) index_name = os.path.basename(root) index = load_index_from_storage(storage_context, index_id="vector_index") indices[index_name] = index # Guardamos con el nombre directo # Verificación opcional de metadatos if index_name not in retrievers_metadata.get(source, {}): print(f"Advertencia: No hay metadatos para {index_name} en retrievers.json") except Exception as e: print(f"Error cargando índice en {root}: {str(e)}") continue port = int(os.getenv("PORT", 7860)) mcp = FastMCP("OnBase", port=port) @mcp.resource( uri="info://available_retrievers", name="AvailableRetrievers", description="Lista completa de retrievers con metadatos", mime_type="application/json" ) def get_available_retrievers() -> dict: available = [] for index_name in indices.keys(): # Determinar la fuente (oms/fda) basado en el prefijo source = "oms" if index_name.startswith("vec_who") else "fda" # Obtener metadatos metadata = retrievers_metadata.get(source, {}).get(index_name, {}) available.append({ "name": index_name, # Ej: "vec_who_1" "source": source, "description": metadata.get("description", "Descripción no disponible"), "content_info": metadata.get("content_info", "Información no disponible"), "last_updated": metadata.get("last_updated", "Desconocido") }) return { "retrievers": available, "count": len(available) } @mcp.tool() def retrieve_docs( query: str, retrievers: List[str], # Nombres directos (vec_who_1, etc.) top_k: int = 3 ) -> dict: results = {} invalid = [] for name in retrievers: if name not in indices: invalid.append(name) continue try: retriever = indices[name].as_retriever(similarity_top_k=top_k) nodes = retriever.retrieve(query) results[name] = [ { "content": node.get_content(), "metadata": node.metadata, "score": node.score } for node in nodes ] except Exception as e: results[name] = {"error": str(e)} if invalid: results["_warnings"] = { "invalid_retrievers": invalid, "valid_options": list(indices.keys()) } return { "query": query, "results": results, "top_k": top_k } @mcp.tool() async def search_tavily( query: str, days: int = 7, max_results: int = 1, include_answer: bool = False ) -> dict: """Perform a web search using the Tavily API. Args: query: Search query string (required) days: Restrict search to last N days (default: 7) max_results: Maximum results to return (default: 1) include_answer: Include a direct answer only when requested by the user (default: False) Returns: dict: Search results from Tavily """ # Obtener la API key de las variables de entorno tavily_api_key = os.environ.get('TAVILY_API_KEY') if not tavily_api_key: raise ValueError("TAVILY_API_KEY environment variable not set") headers = { "Authorization": f"Bearer {tavily_api_key}", "Content-Type": "application/json" } payload = { "query": query, "search_depth": "basic", "max_results": max_results, "days": days if days else None, "include_answer": include_answer } try: async with aiohttp.ClientSession() as session: async with session.post( "https://api.tavily.com/search", headers=headers, json=payload ) as response: response.raise_for_status() result = await response.json() return result except Exception as e: return { "error": str(e), "status": "failed", "query": query } if __name__ == "__main__": mcp.run("sse")