import gradio as gr from llama_index.core import VectorStoreIndex from llama_index.core import ( StorageContext, load_index_from_storage, ) from llama_index.tools.arxiv import ArxivToolSpec from llama_index.core import Settings from llama_index.llms.azure_openai import AzureOpenAI from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding from llama_index.llms.openai import OpenAI from llama_index.embeddings.openai import OpenAIEmbedding from typing import Optional, List, Dict, Any from pathlib import Path import aiohttp import json import os from mcp_agent import MCPAgent import asyncio from gradio_client import Client, handle_file HF_TOKEN = os.environ.get('HF_TOKEN') ##### LLM ##### openai_api_key = os.environ.get('OPENAI_API_KEY') llm = OpenAI( model="gpt-4.1", api_key=openai_api_key, ) embed_model = OpenAIEmbedding( model="text-embedding-ada-002", api_key=openai_api_key, ) Settings.llm = llm Settings.embed_model = embed_model ##### END LLM ##### ##### LOAD RETRIEVERS ##### DOCUMENTS_BASE_PATH = "./" RETRIEVERS_JSON_PATH = Path("./retrievers.json") # Cargar metadatos def load_retrievers_metadata(): try: with open(RETRIEVERS_JSON_PATH, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: print(f"Error cargando retrievers.json: {str(e)}") print(f"Detalles del error: {traceback.format_exc()}") # Necesitarías importar traceback return {} retrievers_metadata = load_retrievers_metadata() SOURCES = {source: f"{source.lower()}/" for source in retrievers_metadata.keys()} # Cargar índices indices: Dict[str, VectorStoreIndex] = {} for source, rel_path in SOURCES.items(): full_path = os.path.join(DOCUMENTS_BASE_PATH, rel_path) if not os.path.exists(full_path): print(f"Advertencia: No se encontró la ruta para {source}") continue for root, dirs, files in os.walk(full_path): if "storage_nodes" in dirs: try: storage_path = os.path.join(root, "storage_nodes") storage_context = StorageContext.from_defaults(persist_dir=storage_path) index_name = os.path.basename(root) indices[index_name] = load_index_from_storage(storage_context) #, index_id="vector_index" print(f"Índice cargado correctamente: {index_name}") except Exception as e: print(f"Error cargando índice {index_name}: {str(e)}") print(f"Detalles del error: {traceback.format_exc()}") ##### ARXIV INSTANCE ##### arxiv_tool = ArxivToolSpec(max_results=5).to_tool_list()[0] arxiv_tool.return_direct = True ##### MCP TOOLS ##### async def search_arxiv( query: str, max_results: int = 5 ) -> Dict[str, Any]: """ Busca artículos académicos en ArXiv. Args: query: Términos de búsqueda (ej. "deep learning") max_results: Número máximo de resultados (1-10, default 5) Returns: Dict: Resultados de la búsqueda con metadatos de los papers """ try: # Configurar máximo de resultados max_results = min(max(1, max_results), 10) arxiv_tool.metadata.max_results = max_results # Ejecutar búsqueda y obtener resultados tool_output = arxiv_tool(query=query) # Procesar documentos papers = [] for doc in tool_output.raw_output: # Acceder correctamente a los documentos content = doc.text_resource.text.split('\n') papers.append({ 'title': content[0].split(': ')[1] if ': ' in content[0] else content[0], 'abstract': '\n'.join(content[1:]).strip(), 'pdf_url': content[0].split(': ')[0].replace('http://', 'https://'), 'arxiv_id': content[0].split(': ')[0].split('/')[-1].replace('v1', '') }) return { 'papers': papers, 'count': len(papers), 'query': query, 'status': 'success' } except Exception as e: return { 'papers': [], 'count': 0, 'query': query, 'status': 'error', 'error': str(e) } async def list_retrievers(source: str = None) -> dict: """ Devuelve la lista de retrievers disponibles. Si se especifica una source y existe, filtra por ella; si no existe, devuelve todas. Args: source (str, optional): Fuente para filtrar. Si no existe, se ignorará. Defaults to None. Returns: dict: { "retrievers": Lista de retrievers (filtrados o completos), "count": Número total, "status": "success"|"error", "source_requested": source, # Muestra lo que se solicitó "source_used": "all"|source # Muestra lo que realmente se usó } """ try: available = [] source_exists = source in retrievers_metadata if source else False for current_source, indexes in retrievers_metadata.items(): # Solo filtrar si el source existe, sino mostrar todo if source_exists and current_source != source: continue for index_name, metadata in indexes.items(): available.append({ "name": index_name, "source": current_source, "title": metadata.get("title", ""), "description": metadata.get("description", "") }) return { "retrievers": available, "count": len(available), "status": "success", "source_requested": source, "source_used": source if source_exists else "all" } except Exception as e: return { "retrievers": [], "count": 0, "status": "error", "error": str(e), "source_requested": source, "source_used": "none" } async def search_tavily( query: str, days: int = 7, max_results: int = 1, include_answer: bool = False ) -> dict: """Perform a web search using the Tavily API. Args: query: Search query string (required) days: Restrict search to last N days (default: 7) max_results: Maximum results to return (default: 1) include_answer: Include a direct answer only when requested by the user (default: False) Returns: dict: Search results from Tavily """ # Obtener la API key de las variables de entorno tavily_api_key = os.environ.get('TAVILY_API_KEY') if not tavily_api_key: raise ValueError("TAVILY_API_KEY environment variable not set") headers = { "Authorization": f"Bearer {tavily_api_key}", "Content-Type": "application/json" } payload = { "query": query, "search_depth": "basic", "max_results": max_results, "days": days if days else None, "include_answer": include_answer } try: async with aiohttp.ClientSession() as session: async with session.post( "https://api.tavily.com/search", headers=headers, json=payload ) as response: response.raise_for_status() result = await response.json() return result except Exception as e: return { "error": str(e), "status": "failed", "query": query } ##### MCP AGENT ##### sse_url = "https://pharma-ia-gradio-mcp-server.hf.space/gradio_api/mcp/sse" # Crear instancia del agente (con llm definido) mcp_agent = MCPAgent(sse_url, HF_TOKEN, llm) # Historial de chat chat_history = [] def format_tool_call(tool_name, arguments): message = f"🔧 **Llamando a herramienta**: `{tool_name}`\n```json\n{arguments}\n```" print(message) # Solo imprime en consola return "" # Devuelve cadena vacía para no mostrar en chat def format_tool_result(tool_name, result): message = f"✅ **Resultado de `{tool_name}`**\n```\n{result}\n```" print(message) # Solo imprime en consola return "" # Devuelve cadena vacía para no mostrar en chat async def chat_with_agent(message, history): history = history or [] # Agregar mensaje del usuario y marcador de "escribiendo" history.append((message, None)) # None activa el indicador de typing yield history # Variables para seguimiento processing_message = None try: # Procesar con el agente full_response = "" async for event in mcp_agent.stream_response(message): if event["type"] == "tool_call": format_tool_call(event["tool_name"], event["arguments"]) elif event["type"] == "tool_result": format_tool_result(event["tool_name"], event["result"]) elif event["type"] == "final_response": full_response = event["content"] # Actualizar indicador de actividad periódicamente if not processing_message or len(history[-1][1] or "") > len(processing_message): processing_message = "✍️ Generando respuesta..." history[-1] = (history[-1][0], processing_message) yield history # Mostrar respuesta final if full_response: history[-1] = (history[-1][0], full_response) yield history except Exception as e: history[-1] = (history[-1][0], f"⚠️ Error: {str(e)}") yield history # Configuración de la interfaz Gradio with gr.Blocks(title="Herramientas MCP", theme=gr.themes.Base()) as tools_tab: with gr.Accordion("Búsqueda Académica", open=False): arxiv_interface = gr.Interface( fn=search_arxiv, inputs=[ gr.Textbox(label="Términos de búsqueda", placeholder="Ej: deep learning"), gr.Slider(1, 10, value=5, step=1, label="Número máximo de resultados") ], outputs=gr.JSON(label="Resultados de búsqueda"), title="Búsqueda en ArXiv", description="Busca artículos académicos en ArXiv por palabras clave.", api_name="_search_arxiv" ) with gr.Accordion("Retrievers", open=False): retrievers_interface = gr.Interface( fn=list_retrievers, inputs=gr.Textbox(label="Fuente (opcional)", placeholder="Dejar vacío para listar todos"), outputs=gr.JSON(label="Lista de retrievers"), title="Lista de Retrievers", description="Muestra los retrievers disponibles, opcionalmente filtrados por fuente.", api_name="_list_retrievers" ) with gr.Accordion("Búsqueda Web", open=False): tavily_interface = gr.Interface( fn=search_tavily, inputs=[ gr.Textbox(label="Consulta de búsqueda", placeholder="Ej: últimas noticias sobre IA"), gr.Slider(1, 30, value=7, step=1, label="Últimos N días (0 para sin límite)"), gr.Slider(1, 10, value=1, step=1, label="Máximo de resultados"), gr.Checkbox(label="Incluir respuesta directa", value=False) ], outputs=gr.JSON(label="Resultados de Tavily"), title="Búsqueda Web (Tavily)", description="Realiza búsquedas en web usando la API de Tavily.", api_name="_search_tavily" ) # Creamos el Agente MCP with gr.Blocks(title="Agente MCP", theme=gr.themes.Base()) as agent_tab: # Chat principal chatbot = gr.Chatbot( label="Chat con el Agente MCP", height=600, bubble_full_width=True, render_markdown=True, show_label=False ) # Barra de entrada with gr.Row(): msg = gr.Textbox( placeholder="Escribe tu mensaje aquí...", container=False, scale=9, autofocus=True ) btn = gr.Button("Enviar", scale=1) clear = gr.ClearButton([msg, chatbot], value="Limpiar conversación") # Inicialización del agente (sin mostrar herramientas) async def init_agent(): await mcp_agent.initialize() return gr.Info("Agente listo para conversar") # Manejo de interacciones msg.submit( chat_with_agent, inputs=[msg, chatbot], outputs=[chatbot] ).then(lambda: "", None, [msg]) btn.click( chat_with_agent, inputs=[msg, chatbot], outputs=[chatbot] ).then(lambda: "", None, [msg]) # Creamos la interfaz con las dos pestañas principales demo = gr.TabbedInterface( [agent_tab, tools_tab], ["Agente MCP", "Tools MCP"] ) demo.launch(mcp_server=True)