geronimo-pericoli's picture
Update app.py
f6ecd68 verified
raw
history blame
17.5 kB
import gradio as gr
from llama_index.core import VectorStoreIndex
from llama_index.core import (
StorageContext,
load_index_from_storage,
)
from llama_index.tools.arxiv import ArxivToolSpec
from llama_index.core import Settings
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from typing import Optional, List, Dict, Any
from pathlib import Path
import aiohttp
import json
import os
from mcp_agent import MCPAgent
import asyncio
from gradio_client import Client, handle_file
HF_TOKEN = os.environ.get('HF_TOKEN')
##### LLM #####
openai_api_key = os.environ.get('OPENAI_API_KEY')
llm = OpenAI(
model="gpt-4.1",
api_key=openai_api_key,
)
embed_model = OpenAIEmbedding(
model="text-embedding-ada-002",
api_key=openai_api_key,
)
Settings.llm = llm
Settings.embed_model = embed_model
##### END LLM #####
##### LOAD RETRIEVERS #####
DOCUMENTS_BASE_PATH = "./"
RETRIEVERS_JSON_PATH = Path("./retrievers.json")
# Cargar metadatos
def load_retrievers_metadata():
try:
with open(RETRIEVERS_JSON_PATH, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"Error cargando retrievers.json: {str(e)}")
print(f"Detalles del error: {traceback.format_exc()}") # Necesitarías importar traceback
return {}
retrievers_metadata = load_retrievers_metadata()
SOURCES = {source: f"{source.lower()}/" for source in retrievers_metadata.keys()}
# Cargar índices
indices: Dict[str, VectorStoreIndex] = {}
for source, rel_path in SOURCES.items():
full_path = os.path.join(DOCUMENTS_BASE_PATH, rel_path)
if not os.path.exists(full_path):
print(f"Advertencia: No se encontró la ruta para {source}")
continue
for root, dirs, files in os.walk(full_path):
if "storage_nodes" in dirs:
try:
storage_path = os.path.join(root, "storage_nodes")
storage_context = StorageContext.from_defaults(persist_dir=storage_path)
index_name = os.path.basename(root)
indices[index_name] = load_index_from_storage(storage_context) #, index_id="vector_index"
print(f"Índice cargado correctamente: {index_name}")
except Exception as e:
print(f"Error cargando índice {index_name}: {str(e)}")
print(f"Detalles del error: {traceback.format_exc()}")
##### LOAD SPACES METADATA #####
SPACES_JSON_PATH = Path("./spaces.json")
# Cargar metadatos de spaces
def load_spaces_metadata():
try:
with open(SPACES_JSON_PATH, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"Error cargando spaces.json: {str(e)}")
return {"spaces": []}
spaces_metadata = load_spaces_metadata()
# Diccionario para cachear conexiones a spaces
space_clients: Dict[str, Client] = {}
##### ARXIV INSTANCE #####
arxiv_tool = ArxivToolSpec(max_results=5).to_tool_list()[0]
arxiv_tool.return_direct = True
##### MCP TOOLS #####
async def search_arxiv(
query: str,
max_results: int = 5
) -> Dict[str, Any]:
"""
Busca artículos académicos en ArXiv.
Args:
query: Términos de búsqueda (ej. "deep learning")
max_results: Número máximo de resultados (1-10, default 5)
Returns:
Dict: Resultados de la búsqueda con metadatos de los papers
"""
try:
# Configurar máximo de resultados
max_results = min(max(1, max_results), 10)
arxiv_tool.metadata.max_results = max_results
# Ejecutar búsqueda y obtener resultados
tool_output = arxiv_tool(query=query)
# Procesar documentos
papers = []
for doc in tool_output.raw_output: # Acceder correctamente a los documentos
content = doc.text_resource.text.split('\n')
papers.append({
'title': content[0].split(': ')[1] if ': ' in content[0] else content[0],
'abstract': '\n'.join(content[1:]).strip(),
'pdf_url': content[0].split(': ')[0].replace('http://', 'https://'),
'arxiv_id': content[0].split(': ')[0].split('/')[-1].replace('v1', '')
})
return {
'papers': papers,
'count': len(papers),
'query': query,
'status': 'success'
}
except Exception as e:
return {
'papers': [],
'count': 0,
'query': query,
'status': 'error',
'error': str(e)
}
async def list_retrievers(source: str = None) -> dict:
"""
Devuelve la lista de retrievers disponibles.
Si se especifica una source y existe, filtra por ella; si no existe, devuelve todas.
Args:
source (str, optional): Fuente para filtrar. Si no existe, se ignorará. Defaults to None.
Returns:
dict: {
"retrievers": Lista de retrievers (filtrados o completos),
"count": Número total,
"status": "success"|"error",
"source_requested": source, # Muestra lo que se solicitó
"source_used": "all"|source # Muestra lo que realmente se usó
}
"""
try:
available = []
source_exists = source in retrievers_metadata if source else False
for current_source, indexes in retrievers_metadata.items():
# Solo filtrar si el source existe, sino mostrar todo
if source_exists and current_source != source:
continue
for index_name, metadata in indexes.items():
available.append({
"name": index_name,
"source": current_source,
"title": metadata.get("title", ""),
"description": metadata.get("description", "")
})
return {
"retrievers": available,
"count": len(available),
"status": "success",
"source_requested": source,
"source_used": source if source_exists else "all"
}
except Exception as e:
return {
"retrievers": [],
"count": 0,
"status": "error",
"error": str(e),
"source_requested": source,
"source_used": "none"
}
async def search_tavily(
query: str,
days: int = 7,
max_results: int = 1,
include_answer: bool = False
) -> dict:
"""Perform a web search using the Tavily API.
Args:
query: Search query string (required)
days: Restrict search to last N days (default: 7)
max_results: Maximum results to return (default: 1)
include_answer: Include a direct answer only when requested by the user (default: False)
Returns:
dict: Search results from Tavily
"""
# Obtener la API key de las variables de entorno
tavily_api_key = os.environ.get('TAVILY_API_KEY')
if not tavily_api_key:
raise ValueError("TAVILY_API_KEY environment variable not set")
headers = {
"Authorization": f"Bearer {tavily_api_key}",
"Content-Type": "application/json"
}
payload = {
"query": query,
"search_depth": "basic",
"max_results": max_results,
"days": days if days else None,
"include_answer": include_answer
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.tavily.com/search",
headers=headers,
json=payload
) as response:
response.raise_for_status()
result = await response.json()
return result
except Exception as e:
return {
"error": str(e),
"status": "failed",
"query": query
}
async def list_spaces_names() -> dict:
"""
Devuelve una lista simplificada con los nombres y descripciones de todos los spaces disponibles.
Returns:
dict: {
"status": "success"|"error",
"spaces": list[dict{"name": str, "description": str}],
"count": int
}
"""
try:
spaces_list = [
{"name": space["name"], "description": space["description"]}
for space in spaces_metadata["spaces"]
]
return {
"status": "success",
"spaces": spaces_list,
"count": len(spaces_list)
}
except Exception as e:
return {
"status": "error",
"message": f"Error al obtener la lista de spaces: {str(e)}",
"spaces": [],
"count": 0
}
async def query_context(
space_name: str,
message_text: str,
api_name: str = "/get_context_only"
) -> dict:
"""
Esta herramienta obtiene SOLO el contexto relevante para una consulta desde un Space especializado.
Parámetros:
space_name (str): Nombre exacto del Space a consultar (debe existir en 'list_spaces_names')
message_text (str): Texto de la consulta/pregunta para generar el contexto
api_name (str, optional): Endpoint de la API a usar (siempre "/get_context_only")
Retorna:
dict: {
"status": "success"|"error",
"space": str, # Nombre del Space consultado
"query": str, # Texto de la consulta enviada
"context": str, # Contexto formateado obtenido
"url": str # URL del Space
}
"""
try:
space_info = next((s for s in spaces_metadata["spaces"] if s["name"] == space_name), None)
if not space_info:
print(f"Space no encontrado: {space_name}")
return {
"status": "error",
"message": f"Space '{space_name}' no encontrado",
"available_spaces": [s["name"] for s in spaces_metadata["spaces"]]
}
print(f"Obteniendo contexto del space: {space_name}")
if space_name not in space_clients:
print(f"Creando nuevo cliente para space: {space_name}")
space_clients[space_name] = Client(space_info["url"], hf_token=HF_TOKEN)
client = space_clients[space_name]
print(f"Enviando consulta para contexto a space {space_name}")
context = client.predict(
message=message_text,
api_name=api_name
)
print(f"Contexto obtenido (longitud: {len(context) if context else 0})")
return {
"status": "success",
"space": space_name,
"query": message_text,
"context": context,
"url": space_info["url"]
}
except Exception as e:
print(f"Error en query_context: {str(e)}")
return {
"status": "error",
"space": space_name,
"query": message_text,
"error": str(e),
"url": space_info.get("url", "") if space_info else ""
}
##### MCP AGENT #####
sse_url = "https://pharma-ia-gradio-mcp-server.hf.space/gradio_api/mcp/sse"
# Crear instancia del agente (debes tener tu llm definido)
mcp_agent = MCPAgent(sse_url, HF_TOKEN, llm)
# Historial de chat
chat_history = []
def format_tool_call(tool_name, arguments):
return f"🔧 **Llamando a herramienta**: `{tool_name}`\n```json\n{arguments}\n```"
def format_tool_result(tool_name, result):
return f"✅ **Resultado de `{tool_name}`**\n```\n{result}\n```"
async def chat_with_agent(message, history):
history = history or []
# Agregar mensaje del usuario al historial
history.append((message, ""))
# Procesar con el agente
full_response = ""
async for event in mcp_agent.stream_response(message):
if event["type"] == "tool_call":
tool_msg = format_tool_call(event["tool_name"], event["arguments"])
history.append((None, tool_msg))
yield history
elif event["type"] == "tool_result":
result_msg = format_tool_result(event["tool_name"], event["result"])
history.append((None, result_msg))
yield history
elif event["type"] == "final_response":
full_response = event["content"]
# Reemplazar el último mensaje vacío con la respuesta completa
history[-1] = (message, full_response)
yield history
# Configuración de la interfaz Gradio
with gr.Blocks(title="Herramientas MCP") as tools_tab:
with gr.Accordion("Búsqueda Académica", open=False):
arxiv_interface = gr.Interface(
fn=search_arxiv,
inputs=[
gr.Textbox(label="Términos de búsqueda", placeholder="Ej: deep learning"),
gr.Slider(1, 10, value=5, step=1, label="Número máximo de resultados")
],
outputs=gr.JSON(label="Resultados de búsqueda"),
title="Búsqueda en ArXiv",
description="Busca artículos académicos en ArXiv por palabras clave.",
api_name="_search_arxiv"
)
with gr.Accordion("Retrievers", open=False):
retrievers_interface = gr.Interface(
fn=list_retrievers,
inputs=gr.Textbox(label="Fuente (opcional)", placeholder="Dejar vacío para listar todos"),
outputs=gr.JSON(label="Lista de retrievers"),
title="Lista de Retrievers",
description="Muestra los retrievers disponibles, opcionalmente filtrados por fuente.",
api_name="_list_retrievers"
)
with gr.Accordion("Búsqueda Web", open=False):
tavily_interface = gr.Interface(
fn=search_tavily,
inputs=[
gr.Textbox(label="Consulta de búsqueda", placeholder="Ej: últimas noticias sobre IA"),
gr.Slider(1, 30, value=7, step=1, label="Últimos N días (0 para sin límite)"),
gr.Slider(1, 10, value=1, step=1, label="Máximo de resultados"),
gr.Checkbox(label="Incluir respuesta directa", value=False)
],
outputs=gr.JSON(label="Resultados de Tavily"),
title="Búsqueda Web (Tavily)",
description="Realiza búsquedas en web usando la API de Tavily.",
api_name="_search_tavily"
)
with gr.Accordion("HuggingFace Spaces", open=False):
spaces_interface = gr.Interface(
fn=list_spaces_names,
inputs=None,
outputs=gr.JSON(label="Lista de Spaces"),
title="Lista de Spaces",
description="Obtiene una lista simplificada con los nombres y descripciones de todos los spaces disponibles.",
api_name="_list_space_names"
)
context_interface = gr.Interface(
fn=query_context,
inputs=[
gr.Textbox(label="Nombre del Space", placeholder="Ej: mi-space"),
gr.Textbox(label="Consulta", placeholder="Ingrese su pregunta o consulta"),
gr.Textbox(label="API Name", value="/get_context_only", visible=False)
],
outputs=gr.JSON(label="Contexto obtenido"),
title="Obtener Contexto",
description="Obtiene SOLO el contexto relevante para una consulta desde un Space especializado.",
api_name="_query_context"
)
# Creamos el Agente MCP (puedes personalizar esto según necesites)
with gr.Blocks(title="Agente MCP") as agent_tab:
gr.Markdown("# Agente MCP - Asistente con Herramientas")
gr.Markdown("Interactúa con el agente que puede consultar múltiples fuentes de información especializada.")
with gr.Row():
with gr.Column(scale=3):
chatbot = gr.Chatbot(
label="Conversación",
height=600,
bubble_full_width=False,
render_markdown=True
)
msg = gr.Textbox(
label="Mensaje",
placeholder="Escribe tu pregunta aquí...",
container=False
)
btn = gr.Button("Enviar")
clear = gr.ClearButton([msg, chatbot])
with gr.Column(scale=1):
gr.Markdown("### Herramientas Disponibles")
tools_info = gr.JSON(label="Herramientas", value=[])
# Inicializar el agente y cargar herramientas
async def init_agent():
tools = await mcp_agent.initialize()
return {"tools": [tool.name for tool in tools.tools]}
agent_tab.load(init_agent, outputs=[tools_info])
# Manejar interacción del chat
msg.submit(
chat_with_agent,
inputs=[msg, chatbot],
outputs=[chatbot]
)
btn.click(
chat_with_agent,
inputs=[msg, chatbot],
outputs=[chatbot]
).then(lambda: "", None, [msg])
# Creamos la interfaz con las dos pestañas principales
demo = gr.TabbedInterface(
[agent_tab, tools_tab],
["Agente MCP", "Tools MCP"]
)
demo.launch(mcp_server=True)