geronimo-pericoli's picture
Update app.py
01d0a23 verified
raw
history blame
13.1 kB
import gradio as gr
from llama_index.core import VectorStoreIndex
from llama_index.core import (
StorageContext,
load_index_from_storage,
)
from llama_index.tools.arxiv import ArxivToolSpec
from llama_index.core import Settings
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from typing import Optional, List, Dict, Any
from pathlib import Path
import aiohttp
import json
import os
from mcp_agent import MCPAgent
import asyncio
from gradio_client import Client, handle_file
HF_TOKEN = os.environ.get('HF_TOKEN')
##### LLM #####
openai_api_key = os.environ.get('OPENAI_API_KEY')
llm = OpenAI(
model="gpt-4.1",
api_key=openai_api_key,
)
embed_model = OpenAIEmbedding(
model="text-embedding-ada-002",
api_key=openai_api_key,
)
Settings.llm = llm
Settings.embed_model = embed_model
##### END LLM #####
##### LOAD RETRIEVERS #####
DOCUMENTS_BASE_PATH = "./"
RETRIEVERS_JSON_PATH = Path("./retrievers.json")
# Cargar metadatos
def load_retrievers_metadata():
try:
with open(RETRIEVERS_JSON_PATH, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"Error cargando retrievers.json: {str(e)}")
print(f"Detalles del error: {traceback.format_exc()}") # Necesitarías importar traceback
return {}
retrievers_metadata = load_retrievers_metadata()
SOURCES = {source: f"{source.lower()}/" for source in retrievers_metadata.keys()}
# Cargar índices
indices: Dict[str, VectorStoreIndex] = {}
for source, rel_path in SOURCES.items():
full_path = os.path.join(DOCUMENTS_BASE_PATH, rel_path)
if not os.path.exists(full_path):
print(f"Advertencia: No se encontró la ruta para {source}")
continue
for root, dirs, files in os.walk(full_path):
if "storage_nodes" in dirs:
try:
storage_path = os.path.join(root, "storage_nodes")
storage_context = StorageContext.from_defaults(persist_dir=storage_path)
index_name = os.path.basename(root)
indices[index_name] = load_index_from_storage(storage_context) #, index_id="vector_index"
print(f"Índice cargado correctamente: {index_name}")
except Exception as e:
print(f"Error cargando índice {index_name}: {str(e)}")
print(f"Detalles del error: {traceback.format_exc()}")
##### ARXIV INSTANCE #####
arxiv_tool = ArxivToolSpec(max_results=5).to_tool_list()[0]
arxiv_tool.return_direct = True
##### MCP TOOLS #####
async def search_arxiv(
query: str,
max_results: int = 5
) -> Dict[str, Any]:
"""
Busca artículos académicos en ArXiv.
Args:
query: Términos de búsqueda (ej. "deep learning")
max_results: Número máximo de resultados (1-10, default 5)
Returns:
Dict: Resultados de la búsqueda con metadatos de los papers
"""
try:
# Configurar máximo de resultados
max_results = min(max(1, max_results), 10)
arxiv_tool.metadata.max_results = max_results
# Ejecutar búsqueda y obtener resultados
tool_output = arxiv_tool(query=query)
# Procesar documentos
papers = []
for doc in tool_output.raw_output: # Acceder correctamente a los documentos
content = doc.text_resource.text.split('\n')
papers.append({
'title': content[0].split(': ')[1] if ': ' in content[0] else content[0],
'abstract': '\n'.join(content[1:]).strip(),
'pdf_url': content[0].split(': ')[0].replace('http://', 'https://'),
'arxiv_id': content[0].split(': ')[0].split('/')[-1].replace('v1', '')
})
return {
'papers': papers,
'count': len(papers),
'query': query,
'status': 'success'
}
except Exception as e:
return {
'papers': [],
'count': 0,
'query': query,
'status': 'error',
'error': str(e)
}
async def list_retrievers(source: str = None) -> dict:
"""
Devuelve la lista de retrievers disponibles.
Si se especifica una source y existe, filtra por ella; si no existe, devuelve todas.
Args:
source (str, optional): Fuente para filtrar. Si no existe, se ignorará. Defaults to None.
Returns:
dict: {
"retrievers": Lista de retrievers (filtrados o completos),
"count": Número total,
"status": "success"|"error",
"source_requested": source, # Muestra lo que se solicitó
"source_used": "all"|source # Muestra lo que realmente se usó
}
"""
try:
available = []
source_exists = source in retrievers_metadata if source else False
for current_source, indexes in retrievers_metadata.items():
# Solo filtrar si el source existe, sino mostrar todo
if source_exists and current_source != source:
continue
for index_name, metadata in indexes.items():
available.append({
"name": index_name,
"source": current_source,
"title": metadata.get("title", ""),
"description": metadata.get("description", "")
})
return {
"retrievers": available,
"count": len(available),
"status": "success",
"source_requested": source,
"source_used": source if source_exists else "all"
}
except Exception as e:
return {
"retrievers": [],
"count": 0,
"status": "error",
"error": str(e),
"source_requested": source,
"source_used": "none"
}
async def search_tavily(
query: str,
days: int = 7,
max_results: int = 1,
include_answer: bool = False
) -> dict:
"""Perform a web search using the Tavily API.
Args:
query: Search query string (required)
days: Restrict search to last N days (default: 7)
max_results: Maximum results to return (default: 1)
include_answer: Include a direct answer only when requested by the user (default: False)
Returns:
dict: Search results from Tavily
"""
# Obtener la API key de las variables de entorno
tavily_api_key = os.environ.get('TAVILY_API_KEY')
if not tavily_api_key:
raise ValueError("TAVILY_API_KEY environment variable not set")
headers = {
"Authorization": f"Bearer {tavily_api_key}",
"Content-Type": "application/json"
}
payload = {
"query": query,
"search_depth": "basic",
"max_results": max_results,
"days": days if days else None,
"include_answer": include_answer
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.tavily.com/search",
headers=headers,
json=payload
) as response:
response.raise_for_status()
result = await response.json()
return result
except Exception as e:
return {
"error": str(e),
"status": "failed",
"query": query
}
##### MCP AGENT #####
sse_url = "https://pharma-ia-gradio-mcp-server.hf.space/gradio_api/mcp/sse"
# Crear instancia del agente (con llm definido)
mcp_agent = MCPAgent(sse_url, HF_TOKEN, llm)
# Historial de chat
chat_history = []
def format_tool_call(tool_name, arguments):
message = f"🔧 **Llamando a herramienta**: `{tool_name}`\n```json\n{arguments}\n```"
print(message) # Solo imprime en consola
return "" # Devuelve cadena vacía para no mostrar en chat
def format_tool_result(tool_name, result):
message = f"✅ **Resultado de `{tool_name}`**\n```\n{result}\n```"
print(message) # Solo imprime en consola
return "" # Devuelve cadena vacía para no mostrar en chat
async def chat_with_agent(message, history):
history = history or []
# Agregar mensaje del usuario y marcador de "escribiendo"
history.append((message, None)) # None activa el indicador de typing
yield history
# Variables para seguimiento
processing_message = None
try:
# Procesar con el agente
full_response = ""
async for event in mcp_agent.stream_response(message):
if event["type"] == "tool_call":
format_tool_call(event["tool_name"], event["arguments"])
elif event["type"] == "tool_result":
format_tool_result(event["tool_name"], event["result"])
elif event["type"] == "final_response":
full_response = event["content"]
# Actualizar indicador de actividad periódicamente
if not processing_message or len(history[-1][1] or "") > len(processing_message):
processing_message = "✍️ Generando respuesta..."
history[-1] = (history[-1][0], processing_message)
yield history
# Mostrar respuesta final
if full_response:
history[-1] = (history[-1][0], full_response)
yield history
except Exception as e:
history[-1] = (history[-1][0], f"⚠️ Error: {str(e)}")
yield history
# Configuración de la interfaz Gradio
with gr.Blocks(title="Herramientas MCP", theme=gr.themes.Base()) as tools_tab:
with gr.Accordion("Búsqueda Académica", open=False):
arxiv_interface = gr.Interface(
fn=search_arxiv,
inputs=[
gr.Textbox(label="Términos de búsqueda", placeholder="Ej: deep learning"),
gr.Slider(1, 10, value=5, step=1, label="Número máximo de resultados")
],
outputs=gr.JSON(label="Resultados de búsqueda"),
title="Búsqueda en ArXiv",
description="Busca artículos académicos en ArXiv por palabras clave.",
api_name="_search_arxiv"
)
with gr.Accordion("Retrievers", open=False):
retrievers_interface = gr.Interface(
fn=list_retrievers,
inputs=gr.Textbox(label="Fuente (opcional)", placeholder="Dejar vacío para listar todos"),
outputs=gr.JSON(label="Lista de retrievers"),
title="Lista de Retrievers",
description="Muestra los retrievers disponibles, opcionalmente filtrados por fuente.",
api_name="_list_retrievers"
)
with gr.Accordion("Búsqueda Web", open=False):
tavily_interface = gr.Interface(
fn=search_tavily,
inputs=[
gr.Textbox(label="Consulta de búsqueda", placeholder="Ej: últimas noticias sobre IA"),
gr.Slider(1, 30, value=7, step=1, label="Últimos N días (0 para sin límite)"),
gr.Slider(1, 10, value=1, step=1, label="Máximo de resultados"),
gr.Checkbox(label="Incluir respuesta directa", value=False)
],
outputs=gr.JSON(label="Resultados de Tavily"),
title="Búsqueda Web (Tavily)",
description="Realiza búsquedas en web usando la API de Tavily.",
api_name="_search_tavily"
)
# Creamos el Agente MCP
with gr.Blocks(title="Agente MCP", theme=gr.themes.Base()) as agent_tab:
# Chat principal
chatbot = gr.Chatbot(
label="Chat con el Agente MCP",
height=600,
bubble_full_width=True,
render_markdown=True,
show_label=False
)
# Barra de entrada
with gr.Row():
msg = gr.Textbox(
placeholder="Escribe tu mensaje aquí...",
container=False,
scale=9,
autofocus=True
)
btn = gr.Button("Enviar", scale=1)
clear = gr.ClearButton([msg, chatbot], value="Limpiar conversación")
# Inicialización del agente (sin mostrar herramientas)
async def init_agent():
await mcp_agent.initialize()
return gr.Info("Agente listo para conversar")
# Manejo de interacciones
msg.submit(
chat_with_agent,
inputs=[msg, chatbot],
outputs=[chatbot]
).then(lambda: "", None, [msg])
btn.click(
chat_with_agent,
inputs=[msg, chatbot],
outputs=[chatbot]
).then(lambda: "", None, [msg])
# Creamos la interfaz con las dos pestañas principales
demo = gr.TabbedInterface(
[agent_tab, tools_tab],
["Agente MCP", "Tools MCP"]
)
demo.launch(mcp_server=True)