Spaces:
Sleeping
Sleeping
import asyncio
import json
import os
import traceback
from pathlib import Path
from typing import Optional, List, Dict, Any

import aiohttp
import gradio as gr
from gradio_client import Client, handle_file
from llama_index.core import VectorStoreIndex
from llama_index.core import (
    StorageContext,
    load_index_from_storage,
)
from llama_index.core import Settings
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.llms.openai import OpenAI
from llama_index.tools.arxiv import ArxivToolSpec

from mcp_agent import MCPAgent
# Hugging Face token used for authenticated gradio_client connections to Spaces.
HF_TOKEN = os.environ.get('HF_TOKEN')

##### LLM #####
openai_api_key = os.environ.get('OPENAI_API_KEY')
# Chat model used by the MCP agent and registered as the global llama_index LLM.
llm = OpenAI(
    model="gpt-4.1",
    api_key=openai_api_key,
)
# Embedding model used when loading/querying the persisted vector indices.
embed_model = OpenAIEmbedding(
    model="text-embedding-ada-002",
    api_key=openai_api_key,
)
# Register both globally so llama_index components pick them up by default.
Settings.llm = llm
Settings.embed_model = embed_model
##### END LLM #####

##### LOAD RETRIEVERS #####
# Root folder that contains one subdirectory per source with persisted indices.
DOCUMENTS_BASE_PATH = "./"
RETRIEVERS_JSON_PATH = Path("./retrievers.json")
# Load metadata
def load_retrievers_metadata(path: Optional[Path] = None) -> dict:
    """Load the retrievers metadata JSON file.

    Args:
        path: File to read; defaults to RETRIEVERS_JSON_PATH.

    Returns:
        dict: Parsed metadata, or {} if the file cannot be read or parsed.
    """
    if path is None:
        path = RETRIEVERS_JSON_PATH
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        print(f"Error cargando retrievers.json: {str(e)}")
        # traceback is now imported at module level (the original code used it
        # without importing it, which raised NameError inside this handler).
        print(f"Detalles del error: {traceback.format_exc()}")
        return {}
retrievers_metadata = load_retrievers_metadata()
# Map each source name to its (lower-cased) subdirectory, e.g. "ArXiv" -> "arxiv/".
SOURCES = {source: f"{source.lower()}/" for source in retrievers_metadata.keys()}
# Load indices: walk each source directory and load every persisted store found
# under a "storage_nodes" folder, keyed by the parent directory's name.
indices: Dict[str, VectorStoreIndex] = {}
for source, rel_path in SOURCES.items():
    full_path = os.path.join(DOCUMENTS_BASE_PATH, rel_path)
    if not os.path.exists(full_path):
        print(f"Advertencia: No se encontró la ruta para {source}")
        continue
    for root, dirs, files in os.walk(full_path):
        if "storage_nodes" not in dirs:
            continue
        # Bind index_name BEFORE the try so the except handler can never hit an
        # unbound (or stale, from a previous iteration) name when
        # StorageContext creation itself fails.
        index_name = os.path.basename(root)
        try:
            storage_path = os.path.join(root, "storage_nodes")
            storage_context = StorageContext.from_defaults(persist_dir=storage_path)
            indices[index_name] = load_index_from_storage(storage_context)  # , index_id="vector_index"
            print(f"Índice cargado correctamente: {index_name}")
        except Exception as e:
            print(f"Error cargando índice {index_name}: {str(e)}")
            print(f"Detalles del error: {traceback.format_exc()}")
##### LOAD SPACES METADATA #####
SPACES_JSON_PATH = Path("./spaces.json")

# Load spaces metadata
def load_spaces_metadata(path: Optional[Path] = None) -> dict:
    """Load the Spaces metadata JSON file.

    Args:
        path: File to read; defaults to SPACES_JSON_PATH.

    Returns:
        dict: Parsed metadata, or {"spaces": []} if the file cannot be read.
    """
    if path is None:
        path = SPACES_JSON_PATH
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        print(f"Error cargando spaces.json: {str(e)}")
        return {"spaces": []}
spaces_metadata = load_spaces_metadata()
# Cache of gradio_client connections, one Client per space name, so repeated
# queries reuse the same connection.
space_clients: Dict[str, Client] = {}

##### ARXIV INSTANCE #####
# Single ArXiv search tool (first tool from the spec); return_direct makes the
# agent hand the tool output back without an extra LLM pass.
arxiv_tool = ArxivToolSpec(max_results=5).to_tool_list()[0]
arxiv_tool.return_direct = True
##### MCP TOOLS #####
async def search_arxiv(
    query: str,
    max_results: int = 5
) -> Dict[str, Any]:
    """Search for academic papers on ArXiv.

    Args:
        query: Search terms (e.g. "deep learning").
        max_results: Maximum number of results (clamped to 1-10, default 5).

    Returns:
        Dict with 'papers', 'count', 'query' and 'status' keys; on failure
        'status' is 'error' and an 'error' message is included.
    """
    try:
        # Clamp to the documented 1-10 range.
        max_results = min(max(1, max_results), 10)
        # Build a per-call tool with the requested limit. The original code
        # mutated arxiv_tool.metadata.max_results, but the limit is fixed when
        # ArxivToolSpec is constructed, so that assignment had no effect.
        tool = ArxivToolSpec(max_results=max_results).to_tool_list()[0]
        tool_output = tool(query=query)
        papers = []
        for doc in tool_output.raw_output:  # one document per paper
            content = doc.text_resource.text.split('\n')
            # Assumes the first line is "<url>: <title>" and the remaining
            # lines are the abstract — TODO confirm against ArxivToolSpec output.
            papers.append({
                'title': content[0].split(': ')[1] if ': ' in content[0] else content[0],
                'abstract': '\n'.join(content[1:]).strip(),
                'pdf_url': content[0].split(': ')[0].replace('http://', 'https://'),
                'arxiv_id': content[0].split(': ')[0].split('/')[-1].replace('v1', '')
            })
        return {
            'papers': papers,
            'count': len(papers),
            'query': query,
            'status': 'success'
        }
    except Exception as e:
        return {
            'papers': [],
            'count': 0,
            'query': query,
            'status': 'error',
            'error': str(e)
        }
async def list_retrievers(source: str = None) -> dict:
    """Return the available retrievers.

    If *source* is given and known, the result is filtered to that source;
    an unknown source is ignored and everything is returned.

    Args:
        source (str, optional): Source to filter by. Ignored if unknown.
            Defaults to None.

    Returns:
        dict: {
            "retrievers": list of retrievers (filtered or complete),
            "count": total number,
            "status": "success"|"error",
            "source_requested": what was asked for,
            "source_used": "all" or the source actually applied
        }
    """
    try:
        known_source = bool(source) and source in retrievers_metadata
        # Flatten every (source, index) pair, keeping only the requested
        # source when it actually exists.
        selected = [
            {
                "name": idx_name,
                "source": src,
                "title": meta.get("title", ""),
                "description": meta.get("description", ""),
            }
            for src, indexes in retrievers_metadata.items()
            if not known_source or src == source
            for idx_name, meta in indexes.items()
        ]
        return {
            "retrievers": selected,
            "count": len(selected),
            "status": "success",
            "source_requested": source,
            "source_used": source if known_source else "all",
        }
    except Exception as e:
        return {
            "retrievers": [],
            "count": 0,
            "status": "error",
            "error": str(e),
            "source_requested": source,
            "source_used": "none",
        }
async def search_tavily(
    query: str,
    days: int = 7,
    max_results: int = 1,
    include_answer: bool = False
) -> dict:
    """Perform a web search using the Tavily API.

    Args:
        query: Search query string (required)
        days: Restrict search to last N days (default: 7)
        max_results: Maximum results to return (default: 1)
        include_answer: Include a direct answer only when requested by the user (default: False)

    Returns:
        dict: Search results from Tavily, or an error payload on failure.

    Raises:
        ValueError: If the TAVILY_API_KEY environment variable is not set.
    """
    # The API key comes from the environment and is required.
    api_key = os.environ.get('TAVILY_API_KEY')
    if not api_key:
        raise ValueError("TAVILY_API_KEY environment variable not set")

    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    request_body = {
        "query": query,
        "search_depth": "basic",
        "max_results": max_results,
        "days": days if days else None,  # falsy days -> no date restriction
        "include_answer": include_answer,
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.tavily.com/search",
                headers=request_headers,
                json=request_body,
            ) as resp:
                resp.raise_for_status()
                return await resp.json()
    except Exception as e:
        return {
            "error": str(e),
            "status": "failed",
            "query": query,
        }
async def list_spaces_names() -> dict:
    """Return a simplified list with the names and descriptions of all available spaces.

    Returns:
        dict: {
            "status": "success"|"error",
            "spaces": list[dict{"name": str, "description": str}],
            "count": int
        }
    """
    try:
        summaries = []
        for entry in spaces_metadata["spaces"]:
            summaries.append({
                "name": entry["name"],
                "description": entry["description"],
            })
        return {
            "status": "success",
            "spaces": summaries,
            "count": len(summaries),
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"Error al obtener la lista de spaces: {str(e)}",
            "spaces": [],
            "count": 0,
        }
async def query_context(
    space_name: str,
    message_text: str,
    api_name: str = "/get_context_only"
) -> dict:
    """Fetch ONLY the relevant context for a query from a specialized Space.

    Args:
        space_name: Exact name of the Space to query (must exist in 'list_spaces_names').
        message_text: Query text used to generate the context.
        api_name: API endpoint to call (always "/get_context_only").

    Returns:
        dict: {
            "status": "success"|"error",
            "space": queried Space name,
            "query": query text sent,
            "context": formatted context obtained,
            "url": Space URL
        }
    """
    # Initialized up-front: if the lookup below raises, the except handler
    # still has a bound name (the original code hit UnboundLocalError there).
    space_info = None
    try:
        space_info = next((s for s in spaces_metadata["spaces"] if s["name"] == space_name), None)
        if not space_info:
            print(f"Space no encontrado: {space_name}")
            return {
                "status": "error",
                "message": f"Space '{space_name}' no encontrado",
                "available_spaces": [s["name"] for s in spaces_metadata["spaces"]]
            }
        print(f"Obteniendo contexto del space: {space_name}")
        # Reuse one Client per space; connections are cached in space_clients.
        if space_name not in space_clients:
            print(f"Creando nuevo cliente para space: {space_name}")
            space_clients[space_name] = Client(space_info["url"], hf_token=HF_TOKEN)
        client = space_clients[space_name]
        print(f"Enviando consulta para contexto a space {space_name}")
        context = client.predict(
            message=message_text,
            api_name=api_name
        )
        print(f"Contexto obtenido (longitud: {len(context) if context else 0})")
        return {
            "status": "success",
            "space": space_name,
            "query": message_text,
            "context": context,
            "url": space_info["url"]
        }
    except Exception as e:
        print(f"Error en query_context: {str(e)}")
        return {
            "status": "error",
            "space": space_name,
            "query": message_text,
            "error": str(e),
            "url": space_info.get("url", "") if space_info else ""
        }
##### MCP AGENT #####
# SSE endpoint of the remote MCP server the agent connects to.
sse_url = "https://pharma-ia-gradio-mcp-server.hf.space/gradio_api/mcp/sse"
# Create the agent instance (requires the llm defined above)
mcp_agent = MCPAgent(sse_url, HF_TOKEN, llm)
# Chat history
# NOTE(review): appears unused in this module — the per-session history is the
# Gradio chatbot state passed into chat_with_agent; confirm before removing.
chat_history = []
def format_tool_call(tool_name, arguments):
    """Render a tool invocation as a markdown chat message with a JSON args block."""
    parts = [f"🔧 **Llamando a herramienta**: `{tool_name}`", "```json", f"{arguments}", "```"]
    return "\n".join(parts)
def format_tool_result(tool_name, result):
    """Render a tool's output as a markdown chat message with a fenced code block."""
    fenced_output = f"```\n{result}\n```"
    return f"✅ **Resultado de `{tool_name}`**\n" + fenced_output
async def chat_with_agent(message, history):
    """Stream the MCP agent's answer (and tool activity) into the chat history.

    Args:
        message: The user's new message.
        history: Current chat history as a list of (user, assistant) pairs.

    Yields:
        The updated history after each agent event (tool call, tool result,
        final response).
    """
    history = history or []
    # Placeholder pair for this user turn; its answer is filled in on
    # final_response. Remember its index, because tool messages appended in
    # between mean history[-1] is no longer this pair.
    history.append((message, ""))
    turn_index = len(history) - 1
    async for event in mcp_agent.stream_response(message):
        if event["type"] == "tool_call":
            tool_msg = format_tool_call(event["tool_name"], event["arguments"])
            history.append((None, tool_msg))
            yield history
        elif event["type"] == "tool_result":
            result_msg = format_tool_result(event["tool_name"], event["result"])
            history.append((None, result_msg))
            yield history
        elif event["type"] == "final_response":
            # Fill the user's placeholder pair. The original assigned to
            # history[-1], which clobbered the last tool message (and left the
            # placeholder empty) whenever tools had been used.
            history[turn_index] = (message, event["content"])
            yield history
# Gradio interface configuration: one accordion per MCP tool. Each tool is a
# gr.Interface whose api_name exposes it as a callable endpoint (and, with
# launch(mcp_server=True), as an MCP tool).
with gr.Blocks(title="Herramientas MCP") as tools_tab:
    with gr.Accordion("Búsqueda Académica", open=False):
        arxiv_interface = gr.Interface(
            fn=search_arxiv,
            inputs=[
                gr.Textbox(label="Términos de búsqueda", placeholder="Ej: deep learning"),
                gr.Slider(1, 10, value=5, step=1, label="Número máximo de resultados")
            ],
            outputs=gr.JSON(label="Resultados de búsqueda"),
            title="Búsqueda en ArXiv",
            description="Busca artículos académicos en ArXiv por palabras clave.",
            api_name="_search_arxiv"
        )
    with gr.Accordion("Retrievers", open=False):
        retrievers_interface = gr.Interface(
            fn=list_retrievers,
            inputs=gr.Textbox(label="Fuente (opcional)", placeholder="Dejar vacío para listar todos"),
            outputs=gr.JSON(label="Lista de retrievers"),
            title="Lista de Retrievers",
            description="Muestra los retrievers disponibles, opcionalmente filtrados por fuente.",
            api_name="_list_retrievers"
        )
    with gr.Accordion("Búsqueda Web", open=False):
        tavily_interface = gr.Interface(
            fn=search_tavily,
            inputs=[
                gr.Textbox(label="Consulta de búsqueda", placeholder="Ej: últimas noticias sobre IA"),
                gr.Slider(1, 30, value=7, step=1, label="Últimos N días (0 para sin límite)"),
                gr.Slider(1, 10, value=1, step=1, label="Máximo de resultados"),
                gr.Checkbox(label="Incluir respuesta directa", value=False)
            ],
            outputs=gr.JSON(label="Resultados de Tavily"),
            title="Búsqueda Web (Tavily)",
            description="Realiza búsquedas en web usando la API de Tavily.",
            api_name="_search_tavily"
        )
    with gr.Accordion("HuggingFace Spaces", open=False):
        # Two interfaces share this accordion: list spaces, then query one.
        spaces_interface = gr.Interface(
            fn=list_spaces_names,
            inputs=None,
            outputs=gr.JSON(label="Lista de Spaces"),
            title="Lista de Spaces",
            description="Obtiene una lista simplificada con los nombres y descripciones de todos los spaces disponibles.",
            api_name="_list_space_names"
        )
        context_interface = gr.Interface(
            fn=query_context,
            inputs=[
                gr.Textbox(label="Nombre del Space", placeholder="Ej: mi-space"),
                gr.Textbox(label="Consulta", placeholder="Ingrese su pregunta o consulta"),
                # Hidden input: the endpoint is fixed to /get_context_only.
                gr.Textbox(label="API Name", value="/get_context_only", visible=False)
            ],
            outputs=gr.JSON(label="Contexto obtenido"),
            title="Obtener Contexto",
            description="Obtiene SOLO el contexto relevante para una consulta desde un Space especializado.",
            api_name="_query_context"
        )
# Build the MCP agent tab (customize as needed)
with gr.Blocks(title="Agente MCP") as agent_tab:
    gr.Markdown("# Agente MCP - Asistente con Herramientas")
    gr.Markdown("Interactúa con el agente que puede consultar múltiples fuentes de información especializada.")
    with gr.Row():
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(
                label="Conversación",
                height=600,
                bubble_full_width=False,
                render_markdown=True
            )
            msg = gr.Textbox(
                label="Mensaje",
                placeholder="Escribe tu pregunta aquí...",
                container=False
            )
            btn = gr.Button("Enviar")
            clear = gr.ClearButton([msg, chatbot])
        with gr.Column(scale=1):
            # Sidebar showing the tools discovered from the MCP server.
            gr.Markdown("### Herramientas Disponibles")
            tools_info = gr.JSON(label="Herramientas", value=[])
    # Initialize the agent and load its tools when the tab loads
    async def init_agent():
        # Connects to the MCP server and returns the tool names for display.
        tools = await mcp_agent.initialize()
        return {"tools": [tool.name for tool in tools.tools]}
    agent_tab.load(init_agent, outputs=[tools_info])
    # Handle chat interaction: both Enter in the textbox and the button submit.
    msg.submit(
        chat_with_agent,
        inputs=[msg, chatbot],
        outputs=[chatbot]
    )
    btn.click(
        chat_with_agent,
        inputs=[msg, chatbot],
        outputs=[chatbot]
    ).then(lambda: "", None, [msg])  # clear the textbox after sending
# Combine the two main tabs into the final app
demo = gr.TabbedInterface(
    [agent_tab, tools_tab],
    ["Agente MCP", "Tools MCP"]
)
# mcp_server=True exposes the named api endpoints above as MCP tools.
demo.launch(mcp_server=True)