"""MCP server exposing document retrieval, ArXiv search and Tavily web search tools.

NOTE: FastMCP publishes each tool function's docstring as the tool description,
so the docstrings below are visible to MCP clients.
"""

import asyncio
import json
import logging
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import aiohttp  # Needed for the asynchronous HTTP requests
from llama_index.core import (
    Settings,
    StorageContext,
    VectorStoreIndex,
    load_index_from_storage,
)
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.llms.openai import OpenAI
from llama_index.tools.arxiv import ArxivToolSpec
from mcp.server.fastmcp import Context, FastMCP

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# ---- OpenAI configuration -------------------------------------------------
openai_api_key = os.environ.get('OPENAI_API_KEY')
llm = OpenAI(
    model="gpt-4.1",
    api_key=openai_api_key,
)
embed_model = OpenAIEmbedding(
    model="text-embedding-ada-002",
    api_key=openai_api_key,
)
Settings.llm = llm
Settings.embed_model = embed_model

# ---- Path configuration ---------------------------------------------------
DOCUMENTS_BASE_PATH = "./"
RETRIEVERS_JSON_PATH = Path("./retrievers.json")


def load_retrievers_metadata():
    """Load the retrievers metadata from ``RETRIEVERS_JSON_PATH``.

    Returns:
        The parsed JSON content, or an empty dict when the file cannot be
        read or parsed (the error is logged, not raised).
    """
    try:
        with open(RETRIEVERS_JSON_PATH, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        logger.error(f"Error cargando retrievers.json: {str(e)}", exc_info=True)
        return {}


retrievers_metadata = load_retrievers_metadata()
# Each source is expected to live in a lower-cased directory named after it.
SOURCES = {source: f"{source.lower()}/" for source in retrievers_metadata}

# ---- Index loading --------------------------------------------------------
# Walk every source directory and load one index per "storage_nodes" folder,
# keyed by the name of the directory that contains it.
indices: Dict[str, VectorStoreIndex] = {}
for source, rel_path in SOURCES.items():
    full_path = os.path.join(DOCUMENTS_BASE_PATH, rel_path)
    if not os.path.exists(full_path):
        logger.warning(f"No se encontró la ruta para {source}")
        continue
    for root, dirs, files in os.walk(full_path):
        if "storage_nodes" not in dirs:
            continue
        # Resolve the name before the try block so the error log below always
        # refers to the index that actually failed (previously it could be
        # unbound or stale from an earlier iteration).
        index_name = os.path.basename(root)
        try:
            storage_path = os.path.join(root, "storage_nodes")
            storage_context = StorageContext.from_defaults(persist_dir=storage_path)
            indices[index_name] = load_index_from_storage(storage_context)
            logger.info(f"Índice cargado correctamente: {index_name}")
        except Exception as e:
            logger.error(f"Error cargando índice {index_name}: {str(e)}", exc_info=True)

port = int(os.getenv("PORT", 7860))
mcp = FastMCP("OnBase", port=port)

# Kept as a module-level name for backward compatibility; search_arxiv builds
# its own spec per call so the requested max_results is actually honoured.
arxiv_tool = ArxivToolSpec(max_results=5).to_tool_list()[0]
arxiv_tool.return_direct = True

# Trailing version marker of an ArXiv identifier, e.g. the "v2" in "2301.00001v2".
_ARXIV_VERSION_SUFFIX = re.compile(r"v\d+$")


def _parse_arxiv_document(text: str) -> Dict[str, str]:
    """Split one ArXiv document text into title, abstract, PDF URL and ID.

    The first line is expected to look like ``"<pdf_url>: <title>"`` and the
    remaining lines form the abstract. When the first line has no ``": "``
    separator, the whole line is used for both the title and the URL fields.
    """
    first_line, _, remainder = text.partition('\n')
    # Split on the FIRST ": " only, so titles containing ": " stay intact.
    url_part, separator, title_part = first_line.partition(': ')
    return {
        'title': title_part if separator else first_line,
        'abstract': remainder.strip(),
        'pdf_url': url_part.replace('http://', 'https://'),
        # Strip only a trailing version suffix instead of every "v1" substring.
        'arxiv_id': _ARXIV_VERSION_SUFFIX.sub('', url_part.split('/')[-1]),
    }


@mcp.tool()
async def search_arxiv(
    query: str,
    max_results: int = 5
) -> Dict[str, Any]:
    """
    Search academic papers on ArXiv.

    Args:
        query: Search terms (e.g. "deep learning")
        max_results: Maximum number of results (1-10, default 5)

    Returns:
        Dict: Search results with the papers' metadata
    """
    try:
        # Clamp the requested number of results to the supported range.
        max_results = min(max(1, max_results), 10)
        spec = ArxivToolSpec(max_results=max_results)
        # The ArXiv query is synchronous network I/O; run it in a worker
        # thread so the event loop is not blocked.
        documents = await asyncio.to_thread(spec.arxiv_query, query)
        papers = [_parse_arxiv_document(doc.text_resource.text) for doc in documents]
        return {
            'papers': papers,
            'count': len(papers),
            'query': query,
            'status': 'success'
        }
    except Exception as e:
        logger.error(f"Error buscando en ArXiv: {str(e)}", exc_info=True)
        return {
            'papers': [],
            'count': 0,
            'query': query,
            'status': 'error',
            'error': str(e)
        }


# NOTE: the file previously defined list_retrievers twice. The second
# definition depended on a "data://retrievers/list" resource that is never
# registered in this file and shadowed this one at module level; only this
# self-contained implementation is kept.
@mcp.tool()
async def list_retrievers(source: Optional[str] = None) -> dict:
    """
    Return the list of available retrievers.
    If a source is given and exists, results are filtered by it; if it does
    not exist, all retrievers are returned.

    Args:
        source (str, optional): Source to filter by. Ignored when it does not
            exist. Defaults to None.

    Returns:
        dict: {
            "retrievers": List of retrievers (filtered or complete),
            "count": Total number,
            "status": "success"|"error",
            "source_requested": source,   # What was requested
            "source_used": "all"|source   # What was actually used
        }
    """
    try:
        source_exists = bool(source) and source in retrievers_metadata
        available = []
        for current_source, indexes in retrievers_metadata.items():
            # Only filter when the requested source exists; otherwise list all.
            if source_exists and current_source != source:
                continue
            for index_name, metadata in indexes.items():
                available.append({
                    "name": index_name,
                    "source": current_source,
                    "title": metadata.get("title", ""),
                    "description": metadata.get("description", "")
                })
        return {
            "retrievers": available,
            "count": len(available),
            "status": "success",
            "source_requested": source,
            "source_used": source if source_exists else "all"
        }
    except Exception as e:
        return {
            "retrievers": [],
            "count": 0,
            "status": "error",
            "error": str(e),
            "source_requested": source,
            "source_used": "none"
        }


@mcp.tool()
def retrieve_docs(
    query: str,
    retrievers: List[str],
    top_k: int = 3
) -> dict:
    """
    Run a semantic search over the indexed documents.

    Parameters:
        query (str): Search text (required)
        retrievers (List[str]): Names of the retrievers to query (required)
        top_k (int): Number of results per retriever (optional, default=3)

    Returns:
        dict: The query, per-retriever results (or an error entry for a
        retriever that failed), top_k, and a "warnings" entry listing unknown
        retriever names when any were requested.
    """
    logger.info(f"Iniciando búsqueda para query: '{query}'")
    logger.debug(f"Parámetros - retrievers: {retrievers}, top_k: {top_k}")

    results = {}
    invalid = []

    for name in retrievers:
        if name not in indices:
            logger.warning(f"Retriever no encontrado: {name}")
            invalid.append(name)
            continue

        try:
            logger.info(f"Procesando retriever: {name}")

            # 1. Build a retriever from the index and run the search.
            logger.debug(f"Creando retriever para {name} con top_k={top_k}")
            retriever = indices[name].as_retriever(similarity_top_k=top_k)
            nodes = retriever.retrieve(query)
            logger.info(f"Retrieved {len(nodes)} documentos de {name}")

            # 2. Look up the metadata entry and owning source for this index.
            metadata = {}
            source = "unknown"
            for src, indexes in retrievers_metadata.items():
                if name in indexes:
                    metadata = indexes[name]
                    source = src
                    break
            logger.debug(f"Metadatos encontrados para {name}: {metadata.keys()}")

            # 3. Build the response entry for this retriever.
            results[name] = {
                "title": metadata.get("title", name),
                "documents": [
                    {
                        "content": node.get_content(),
                        "metadata": node.metadata,
                        "score": node.score
                    }
                    for node in nodes
                ],
                "description": metadata.get("description", ""),
                "source": source,
                "last_updated": metadata.get("last_updated", "")
            }
            logger.info(f"Retriever {name} procesado exitosamente")
        except Exception as e:
            # One failing retriever must not abort the others; report it inline.
            logger.error(f"Error procesando retriever {name}: {str(e)}", exc_info=True)
            results[name] = {
                "error": str(e),
                "retriever": name
            }

    # Build the final response.
    response = {
        "query": query,
        "results": results,
        "top_k": top_k,
    }

    if invalid:
        logger.warning(f"Retrievers inválidos: {invalid}. Opciones válidas: {list(indices.keys())}")
        response["warnings"] = {
            "invalid_retrievers": invalid,
            "valid_options": list(indices.keys())
        }

    logger.info(f"Búsqueda completada. Total resultados: {len(results)}")
    return response


@mcp.tool()
async def search_tavily(
    query: str,
    days: int = 7,
    max_results: int = 1,
    include_answer: bool = False
) -> dict:
    """Perform a web search using the Tavily API.

    Args:
        query: Search query string (required)
        days: Restrict search to last N days (default: 7)
        max_results: Maximum results to return (default: 1)
        include_answer: Include a direct answer only when requested by the user (default: False)

    Returns:
        dict: Search results from Tavily, or a dict with "error", "status"
        and "query" keys when the request fails.

    Raises:
        ValueError: If the TAVILY_API_KEY environment variable is not set.
    """
    # Read the API key from the environment.
    tavily_api_key = os.environ.get('TAVILY_API_KEY')
    if not tavily_api_key:
        raise ValueError("TAVILY_API_KEY environment variable not set")

    headers = {
        "Authorization": f"Bearer {tavily_api_key}",
        "Content-Type": "application/json"
    }

    payload = {
        "query": query,
        "search_depth": "basic",
        "max_results": max_results,
        "include_answer": include_answer
    }
    # Only send "days" when a positive window was requested, rather than
    # posting an explicit null value.
    if days:
        payload["days"] = days

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.tavily.com/search",
                headers=headers,
                json=payload
            ) as response:
                response.raise_for_status()
                return await response.json()
    except Exception as e:
        return {
            "error": str(e),
            "status": "failed",
            "query": query
        }


if __name__ == "__main__":
    mcp.run("sse")