import asyncio
import json
import os
import traceback
from pathlib import Path
from typing import Optional, List, Dict, Any

import aiohttp
import gradio as gr
from llama_index.core import VectorStoreIndex
from llama_index.core import (
    StorageContext,
    load_index_from_storage,
)
from llama_index.core import Settings
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.llms.openai import OpenAI
from llama_index.tools.arxiv import ArxivToolSpec

from mcp_agent import MCPAgent
|
|
|
|
from gradio_client import Client, handle_file
# Hugging Face token passed to MCPAgent below to authenticate against the
# remote MCP Space; may be None if the env var is not set.
HF_TOKEN = os.environ.get('HF_TOKEN')
|
|
|
|
|
|
| |
# OpenAI API key from the environment. If unset this is None and the client
# calls below will fail at first use — TODO confirm deployments always set it.
openai_api_key = os.environ.get('OPENAI_API_KEY')




# Chat LLM and embedding model used by all LlamaIndex components.
llm = OpenAI(
    model="gpt-4.1",
    api_key=openai_api_key,
)
embed_model = OpenAIEmbedding(
    model="text-embedding-ada-002",
    api_key=openai_api_key,
)


# Register both models globally so index loading/querying picks them up.
Settings.llm = llm
Settings.embed_model = embed_model
| |
|
|
|
|
|
|
| |
# Root directory that contains one sub-directory per source (see SOURCES).
DOCUMENTS_BASE_PATH = "./"
# JSON file describing the available retrievers, keyed by source name.
RETRIEVERS_JSON_PATH = Path("./retrievers.json")


def load_retrievers_metadata(path: Path = RETRIEVERS_JSON_PATH) -> dict:
    """Load the retrievers metadata mapping from a JSON file.

    Args:
        path: JSON file to read. Defaults to ``RETRIEVERS_JSON_PATH``.

    Returns:
        The parsed metadata dict ({source: {index_name: info}}), or an
        empty dict when the file is missing or invalid.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        # Best-effort: a missing/corrupt metadata file degrades to
        # "no retrievers" instead of crashing app startup.
        print(f"Error cargando retrievers.json: {str(e)}")
        print(f"Detalles del error: {traceback.format_exc()}")
        return {}


retrievers_metadata = load_retrievers_metadata()
# Map each source name to its relative documents directory, e.g. "Foo" -> "foo/".
SOURCES = {source: f"{source.lower()}/" for source in retrievers_metadata.keys()}
|
|
| |
# In-memory indices, keyed by the directory name that holds each persisted
# "storage_nodes" folder.
indices: Dict[str, VectorStoreIndex] = {}

# Walk every source directory and load any persisted index found under a
# "storage_nodes" subdirectory. Failures are logged and skipped so one bad
# index does not prevent the rest from loading.
for source, rel_path in SOURCES.items():
    full_path = os.path.join(DOCUMENTS_BASE_PATH, rel_path)
    if not os.path.exists(full_path):
        print(f"Advertencia: No se encontró la ruta para {source}")
        continue

    for root, dirs, files in os.walk(full_path):
        if "storage_nodes" in dirs:
            # Compute the name BEFORE the try: the except clause below
            # references it, and previously it was unbound when
            # StorageContext.from_defaults failed first (UnboundLocalError).
            index_name = os.path.basename(root)
            try:
                storage_path = os.path.join(root, "storage_nodes")
                storage_context = StorageContext.from_defaults(persist_dir=storage_path)
                indices[index_name] = load_index_from_storage(storage_context)
                print(f"Índice cargado correctamente: {index_name}")
            except Exception as e:
                print(f"Error cargando índice {index_name}: {str(e)}")
                print(f"Detalles del error: {traceback.format_exc()}")
| |
|
|
|
|
|
|
|
|
|
|
| |
# First tool from the ArXiv tool spec (presumably the query/search tool —
# verify against ArxivToolSpec.to_tool_list()). return_direct makes its output
# be returned to the caller directly — TODO confirm the agent honors this flag.
arxiv_tool = ArxivToolSpec(max_results=5).to_tool_list()[0]
arxiv_tool.return_direct = True
|
|
|
|
|
|
| |
|
|
async def search_arxiv(
    query: str,
    max_results: int = 5
) -> Dict[str, Any]:
    """Search ArXiv for academic papers.

    Args:
        query: Search terms (e.g. "deep learning").
        max_results: Maximum number of results (clamped to 1-10, default 5).

    Returns:
        Dict with keys 'papers', 'count', 'query' and 'status'; on failure
        'papers' is empty, 'status' is 'error' and 'error' holds the message.
    """
    try:
        # Clamp to the supported 1-10 window before hitting the tool.
        max_results = min(max(1, max_results), 10)
        arxiv_tool.metadata.max_results = max_results

        tool_output = arxiv_tool(query=query)

        papers = []
        for doc in tool_output.raw_output:
            # Each document's text starts with a "url: title" header line,
            # followed by the abstract on the remaining lines.
            lines = doc.text_resource.text.split('\n')
            header = lines[0]
            url_part = header.split(': ')[0]
            papers.append({
                'title': header.split(': ')[1] if ': ' in header else header,
                'abstract': '\n'.join(lines[1:]).strip(),
                'pdf_url': url_part.replace('http://', 'https://'),
                'arxiv_id': url_part.split('/')[-1].replace('v1', '')
            })

        return {
            'papers': papers,
            'count': len(papers),
            'query': query,
            'status': 'success'
        }

    except Exception as e:
        return {
            'papers': [],
            'count': 0,
            'query': query,
            'status': 'error',
            'error': str(e)
        }
|
|
async def list_retrievers(source: Optional[str] = None, metadata: Optional[dict] = None) -> dict:
    """Return the list of available retrievers.

    If ``source`` is given and exists in the metadata, results are filtered
    to it; an unknown source is ignored and all retrievers are returned.

    Args:
        source: Optional source name to filter by.
        metadata: Retriever metadata mapping ``{source: {index_name: info}}``.
            Defaults to the module-level ``retrievers_metadata``.

    Returns:
        dict with keys:
            "retrievers": list of {"name", "source", "title", "description"},
            "count": number of entries,
            "status": "success" | "error",
            "source_requested": the source argument as given,
            "source_used": the source actually applied, or "all"/"none".
    """
    try:
        data = retrievers_metadata if metadata is None else metadata
        available = []
        # Empty string / None both mean "no filter requested".
        source_exists = source in data if source else False

        for current_source, indexes in data.items():
            # Skip non-matching sources only when the requested one exists.
            if source_exists and current_source != source:
                continue

            for index_name, meta in indexes.items():
                available.append({
                    "name": index_name,
                    "source": current_source,
                    "title": meta.get("title", ""),
                    "description": meta.get("description", "")
                })

        return {
            "retrievers": available,
            "count": len(available),
            "status": "success",
            "source_requested": source,
            "source_used": source if source_exists else "all"
        }
    except Exception as e:
        return {
            "retrievers": [],
            "count": 0,
            "status": "error",
            "error": str(e),
            "source_requested": source,
            "source_used": "none"
        }
|
|
|
|
|
|
async def search_tavily(
    query: str,
    days: int = 7,
    max_results: int = 1,
    include_answer: bool = False
) -> dict:
    """Perform a web search using the Tavily API.

    Args:
        query: Search query string (required)
        days: Restrict search to last N days; 0 means no time limit (default: 7)
        max_results: Maximum results to return (default: 1)
        include_answer: Include a direct answer only when requested by the user (default: False)

    Returns:
        dict: Search results from Tavily, or {"error", "status": "failed",
            "query"} when the request fails.

    Raises:
        ValueError: If the TAVILY_API_KEY environment variable is not set.
    """

    tavily_api_key = os.environ.get('TAVILY_API_KEY')
    if not tavily_api_key:
        raise ValueError("TAVILY_API_KEY environment variable not set")

    headers = {
        "Authorization": f"Bearer {tavily_api_key}",
        "Content-Type": "application/json"
    }

    payload = {
        "query": query,
        "search_depth": "basic",
        "max_results": max_results,
        "include_answer": include_answer
    }
    # Only send "days" when a positive window was requested: the UI uses 0 to
    # mean "no limit", and the previous code sent an explicit JSON null in
    # that case instead of omitting the field.
    if days:
        payload["days"] = days

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.tavily.com/search",
                headers=headers,
                json=payload
            ) as response:
                response.raise_for_status()
                result = await response.json()
                return result

    except Exception as e:
        # Network/HTTP failures come back as a structured payload so the
        # calling agent can surface them instead of crashing.
        return {
            "error": str(e),
            "status": "failed",
            "query": query
        }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
# SSE endpoint of the remote Gradio MCP server this agent connects to.
sse_url = "https://pharma-ia-gradio-mcp-server.hf.space/gradio_api/mcp/sse"


# Agent wrapping the remote MCP tools; authenticated with HF_TOKEN and backed
# by the module-level LLM.
mcp_agent = MCPAgent(sse_url, HF_TOKEN, llm)


# NOTE(review): this module-level history is never read or written in this
# file — the Gradio chatbot keeps its own history via chat_with_agent.
chat_history = []
|
|
def format_tool_call(tool_name, arguments):
    """Print a formatted tool-invocation notice; the chat UI gets an empty string."""
    print(
        f"🔧 **Llamando a herramienta**: `{tool_name}`\n"
        f"```json\n{arguments}\n```"
    )
    return ""
|
|
def format_tool_result(tool_name, result):
    """Print a formatted tool-result notice; the chat UI gets an empty string."""
    print(
        f"✅ **Resultado de `{tool_name}`**\n"
        f"```\n{result}\n```"
    )
    return ""
|
|
async def chat_with_agent(message, history):
    """Stream a conversation turn with the MCP agent into the Gradio chatbot.

    Async generator used as a Gradio event handler: yields the updated
    tuple-style history (list of (user, bot) pairs) after each UI-relevant
    event from ``mcp_agent.stream_response``.
    """
    history = history or []

    # Show the user's message immediately, with no bot reply yet.
    history.append((message, None))
    yield history

    # Placeholder text shown while the final answer is being generated.
    processing_message = None

    try:

        full_response = ""
        async for event in mcp_agent.stream_response(message):
            if event["type"] == "tool_call":
                # Logged to stdout only; the returned "" is discarded.
                format_tool_call(event["tool_name"], event["arguments"])

            elif event["type"] == "tool_result":
                format_tool_result(event["tool_name"], event["result"])

            elif event["type"] == "final_response":
                full_response = event["content"]

                # NOTE(review): this condition sets the placeholder on the
                # first final_response event, or when the current bot text is
                # longer than the placeholder — the second clause looks
                # intended for repeated events but its purpose is unclear;
                # confirm against MCPAgent.stream_response's event stream.
                if not processing_message or len(history[-1][1] or "") > len(processing_message):
                    processing_message = "✍️ Generando respuesta..."
                    history[-1] = (history[-1][0], processing_message)
                    yield history

        # Replace the placeholder with the final answer, if one arrived.
        if full_response:
            history[-1] = (history[-1][0], full_response)
            yield history

    except Exception as e:
        # Surface any failure as the bot's reply for this turn.
        history[-1] = (history[-1][0], f"⚠️ Error: {str(e)}")
        yield history
|
|
|
|
|
|
|
|
| |
|
|
| |
# Tool playground tab: one accordion per tool, each exposing its function as
# a named API endpoint (api_name) so it is also reachable as an MCP tool.
with gr.Blocks(title="Herramientas MCP", theme=gr.themes.Base()) as tools_tab:
    with gr.Accordion("Búsqueda Académica", open=False):
        # ArXiv search backed by search_arxiv.
        arxiv_interface = gr.Interface(
            fn=search_arxiv,
            inputs=[
                gr.Textbox(label="Términos de búsqueda", placeholder="Ej: deep learning"),
                gr.Slider(1, 10, value=5, step=1, label="Número máximo de resultados")
            ],
            outputs=gr.JSON(label="Resultados de búsqueda"),
            title="Búsqueda en ArXiv",
            description="Busca artículos académicos en ArXiv por palabras clave.",
            api_name="_search_arxiv"
        )

    with gr.Accordion("Retrievers", open=False):
        # Retriever catalog backed by list_retrievers.
        retrievers_interface = gr.Interface(
            fn=list_retrievers,
            inputs=gr.Textbox(label="Fuente (opcional)", placeholder="Dejar vacío para listar todos"),
            outputs=gr.JSON(label="Lista de retrievers"),
            title="Lista de Retrievers",
            description="Muestra los retrievers disponibles, opcionalmente filtrados por fuente.",
            api_name="_list_retrievers"
        )

    with gr.Accordion("Búsqueda Web", open=False):
        # Web search backed by search_tavily; the slider's 0 means "no limit".
        tavily_interface = gr.Interface(
            fn=search_tavily,
            inputs=[
                gr.Textbox(label="Consulta de búsqueda", placeholder="Ej: últimas noticias sobre IA"),
                gr.Slider(1, 30, value=7, step=1, label="Últimos N días (0 para sin límite)"),
                gr.Slider(1, 10, value=1, step=1, label="Máximo de resultados"),
                gr.Checkbox(label="Incluir respuesta directa", value=False)
            ],
            outputs=gr.JSON(label="Resultados de Tavily"),
            title="Búsqueda Web (Tavily)",
            description="Realiza búsquedas en web usando la API de Tavily.",
            api_name="_search_tavily"
        )
| |
|
|
|
|
| |
# Chat tab: a chatbot wired to chat_with_agent via textbox submit and button.
with gr.Blocks(title="Agente MCP", theme=gr.themes.Base()) as agent_tab:
    # Tuple-style chat display, matching the (user, bot) pairs that
    # chat_with_agent yields.
    chatbot = gr.Chatbot(
        label="Chat con el Agente MCP",
        height=600,
        bubble_full_width=True,
        render_markdown=True,
        show_label=False
    )

    # Message entry row: wide textbox plus a send button.
    with gr.Row():
        msg = gr.Textbox(
            placeholder="Escribe tu mensaje aquí...",
            container=False,
            scale=9,
            autofocus=True
        )
        btn = gr.Button("Enviar", scale=1)

    clear = gr.ClearButton([msg, chatbot], value="Limpiar conversación")


    # NOTE(review): defined but never wired to any event or load hook in this
    # file — presumably mcp_agent is initialized elsewhere (e.g. lazily inside
    # stream_response); confirm before relying on it.
    async def init_agent():
        await mcp_agent.initialize()
        return gr.Info("Agente listo para conversar")


    # Both Enter and the button stream the reply, then clear the textbox.
    msg.submit(
        chat_with_agent,
        inputs=[msg, chatbot],
        outputs=[chatbot]
    ).then(lambda: "", None, [msg])

    btn.click(
        chat_with_agent,
        inputs=[msg, chatbot],
        outputs=[chatbot]
    ).then(lambda: "", None, [msg])
|
|
| |
| |
# Combine the chat tab and the tool playground into one tabbed app.
demo = gr.TabbedInterface(
    [agent_tab, tools_tab],
    ["Agente MCP", "Tools MCP"]
)

# mcp_server=True additionally exposes the app's API endpoints as MCP tools
# (Gradio MCP server mode).
demo.launch(mcp_server=True)