# MCP_Public_Server / server.py
from mcp.server.fastmcp import FastMCP, Context
from datetime import datetime
from llama_index.core import VectorStoreIndex
from llama_index.core import (
StorageContext,
load_index_from_storage,
)
from llama_index.tools.arxiv import ArxivToolSpec
from llama_index.core import Settings
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from typing import Optional, List, Dict, Any
from pathlib import Path
import json
import os
import aiohttp # Necesario para las peticiones HTTP asíncronas
import asyncio
import logging
# Module-level logger; INFO level so index-loading progress is visible at startup.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
##### OPENAI #####
# Context: configure the LLM and embedding model from the environment and
# register them as the process-wide defaults via llama_index Settings.
openai_api_key = os.environ.get('OPENAI_API_KEY')
llm = OpenAI(
    model="gpt-4.1",
    api_key=openai_api_key,
)
embed_model = OpenAIEmbedding(
    model="text-embedding-ada-002",
    api_key=openai_api_key,
)
# Every index/query built below inherits these defaults.
Settings.llm = llm
Settings.embed_model = embed_model
# Path configuration
DOCUMENTS_BASE_PATH = "./"  # root folder scanned for per-source index directories
RETRIEVERS_JSON_PATH = Path("./retrievers.json")  # metadata describing each retriever
# Load retriever metadata
def load_retrievers_metadata(path=None):
    """Load the retriever metadata mapping from a JSON file.

    Args:
        path: Optional path to the JSON file. When omitted, falls back to
            the module-level RETRIEVERS_JSON_PATH (backward compatible).

    Returns:
        dict: Parsed metadata ({source: {index_name: {...}}}), or an empty
        dict on any read/parse error so the server can still start.
    """
    if path is None:
        path = RETRIEVERS_JSON_PATH
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        # Best-effort: log and fall back to an empty mapping instead of
        # crashing at import time.
        logger.error(f"Error cargando retrievers.json: {str(e)}", exc_info=True)
        return {}
retrievers_metadata = load_retrievers_metadata()
# Map each source to its (lowercased) subdirectory, e.g. "Docs" -> "docs/".
SOURCES = {source: f"{source.lower()}/" for source in retrievers_metadata.keys()}

# Load indices: walk each source directory and load every persisted
# "storage_nodes" index found underneath it.
indices: Dict[str, VectorStoreIndex] = {}
for source, rel_path in SOURCES.items():
    full_path = os.path.join(DOCUMENTS_BASE_PATH, rel_path)
    if not os.path.exists(full_path):
        logger.warning(f"No se encontró la ruta para {source}")
        continue
    for root, dirs, files in os.walk(full_path):
        if "storage_nodes" not in dirs:
            continue
        # Compute the name BEFORE the try block: previously it was assigned
        # inside, so an exception from StorageContext.from_defaults left
        # `index_name` unbound and the except handler itself raised NameError.
        index_name = os.path.basename(root)
        try:
            storage_path = os.path.join(root, "storage_nodes")
            storage_context = StorageContext.from_defaults(persist_dir=storage_path)
            indices[index_name] = load_index_from_storage(storage_context)  # , index_id="vector_index"
            logger.info(f"Índice cargado correctamente: {index_name}")
        except Exception as e:
            # Skip the broken index but keep loading the rest.
            logger.error(f"Error cargando índice {index_name}: {str(e)}", exc_info=True)
# HTTP port for the MCP server (Hugging Face Spaces supplies PORT; 7860 default).
port = int(os.getenv("PORT", 7860))
mcp = FastMCP("OnBase", port=port)

# Pre-built ArXiv search tool from llama_index; return_direct marks its output
# to be returned as-is rather than post-processed.
arxiv_tool = ArxivToolSpec(max_results=5).to_tool_list()[0]
arxiv_tool.return_direct = True
@mcp.tool()
async def search_arxiv(
    query: str,
    max_results: int = 5
) -> Dict[str, Any]:
    """
    Search for academic papers on ArXiv.

    Args:
        query: Search terms (e.g. "deep learning")
        max_results: Maximum number of results (clamped to 1-10, default 5)

    Returns:
        Dict: 'papers' (list of title/abstract/pdf_url/arxiv_id), 'count',
        'query' and 'status'; on failure 'status' is 'error' and an
        'error' message is included.
    """
    try:
        # Clamp to the documented 1-10 range.
        max_results = min(max(1, max_results), 10)
        # Build a tool for this request: mutating arxiv_tool.metadata does NOT
        # change the underlying ArxivToolSpec, whose max_results is fixed at
        # construction time, so the old code always returned 5 results.
        tool = ArxivToolSpec(max_results=max_results).to_tool_list()[0]
        tool_output = tool(query=query)
        # Each returned document's first line looks like "<url>: <title>";
        # the remaining lines are the abstract.
        papers = []
        for doc in tool_output.raw_output:
            lines = doc.text_resource.text.split('\n')
            header = lines[0]
            url = header.split(': ', 1)[0]
            arxiv_id = url.rstrip('/').rsplit('/', 1)[-1]
            # Strip a trailing version suffix ("v1", "v12", ...) if present;
            # the old `.replace('v1', '')` removed "v1" anywhere in the id
            # and missed versions other than 1.
            if 'v' in arxiv_id and arxiv_id.rsplit('v', 1)[-1].isdigit():
                arxiv_id = arxiv_id.rsplit('v', 1)[0]
            papers.append({
                # maxsplit=1 keeps titles that themselves contain ": ".
                'title': header.split(': ', 1)[1] if ': ' in header else header,
                'abstract': '\n'.join(lines[1:]).strip(),
                'pdf_url': url.replace('http://', 'https://'),
                'arxiv_id': arxiv_id
            })
        return {
            'papers': papers,
            'count': len(papers),
            'query': query,
            'status': 'success'
        }
    except Exception as e:
        # Best-effort tool: report the failure in-band instead of raising.
        return {
            'papers': [],
            'count': 0,
            'query': query,
            'status': 'error',
            'error': str(e)
        }
@mcp.tool()
async def list_retrievers(source: str = None) -> dict:
    """
    Return the list of available retrievers.

    When a known source is given the list is filtered by it; an unknown
    source is ignored and the full list is returned.

    Args:
        source (str, optional): Source to filter by. Defaults to None.

    Returns:
        dict: {
            "retrievers": list of retrievers (filtered or complete),
            "count": total number,
            "status": "success"|"error",
            "source_requested": source,   # what the caller asked for
            "source_used": "all"|source   # what was actually applied
        }
    """
    try:
        known_source = bool(source) and source in retrievers_metadata
        entries = [
            {
                "name": index_name,
                "source": src,
                "title": meta.get("title", ""),
                "description": meta.get("description", "")
            }
            for src, indexes in retrievers_metadata.items()
            # Only filter when the requested source actually exists.
            if not known_source or src == source
            for index_name, meta in indexes.items()
        ]
        return {
            "retrievers": entries,
            "count": len(entries),
            "status": "success",
            "source_requested": source,
            "source_used": source if known_source else "all"
        }
    except Exception as e:
        return {
            "retrievers": [],
            "count": 0,
            "status": "error",
            "error": str(e),
            "source_requested": source,
            "source_used": "none"
        }
@mcp.tool()
async def list_retrievers(ctx: Context, source: str = None) -> dict:
    """
    Return the available retrievers, optionally filtered by source.

    NOTE(review): this duplicates the tool name defined above, so this
    definition is the one that ends up registered. The previous body called
    ctx.resources.read("data://retrievers/list"), but FastMCP's Context has
    no `resources` attribute (resource reads go through ctx.read_resource)
    and no such resource is registered in this file, so the tool always
    failed at runtime. It now answers from the module-level
    retrievers_metadata directly.

    Args:
        ctx: FastMCP request context (unused; kept for interface stability).
        source (str, optional): Source to filter by. Default None.

    Returns:
        dict: retrievers list with metadata, count, status and the
        requested/used source.
    """
    try:
        available = []
        for current_source, indexes in retrievers_metadata.items():
            # Filter whenever a source was given (matches the old
            # resource-based semantics: unknown source -> empty list).
            if source and current_source != source:
                continue
            for index_name, metadata in indexes.items():
                available.append({
                    "name": index_name,
                    "source": current_source,
                    "title": metadata.get("title", ""),
                    "description": metadata.get("description", "")
                })
        if source:
            return {
                "retrievers": available,
                "count": len(available),
                "status": "success",
                "source_requested": source,
                "source_used": source if available else "none"
            }
        return {
            "retrievers": available,
            "count": len(available),
            "status": "success",
            "source_requested": None,
            "source_used": "all"
        }
    except Exception as e:
        return {
            "retrievers": [],
            "count": 0,
            "status": "error",
            "error": str(e),
            "source_requested": source,
            "source_used": "none"
        }
# Semantic retrieval tool
@mcp.tool()
def retrieve_docs(
    query: str,
    retrievers: List[str],
    top_k: int = 3
) -> dict:
    """
    Run a semantic search over the indexed documents.

    Args:
        query (str): Search text (required)
        retrievers (List[str]): Names of the retrievers to query (required)
        top_k (int): Number of results per retriever (optional, default=3,
            clamped to a minimum of 1)

    Returns:
        dict: per-retriever results plus the echoed query/top_k and, when
        unknown retriever names were supplied, a "warnings" entry listing
        them next to the valid options. A retriever that fails yields an
        {"error", "retriever"} entry instead of sinking the whole call.
    """
    logger.info(f"Iniciando búsqueda para query: '{query}'")
    logger.debug(f"Parámetros - retrievers: {retrievers}, top_k: {top_k}")
    # Guard against non-positive values; mirrors the clamping in search_arxiv.
    top_k = max(1, top_k)
    results = {}
    invalid = []
    for name in retrievers:
        if name not in indices:
            logger.warning(f"Retriever no encontrado: {name}")
            invalid.append(name)
            continue
        try:
            logger.info(f"Procesando retriever: {name}")
            # 1. Retrieve the top_k most similar nodes from the index.
            logger.debug(f"Creando retriever para {name} con top_k={top_k}")
            retriever = indices[name].as_retriever(similarity_top_k=top_k)
            nodes = retriever.retrieve(query)
            logger.info(f"Retrieved {len(nodes)} documentos de {name}")
            # 2. Look up the full metadata entry and its owning source.
            metadata, source = _find_retriever_metadata(name)
            logger.debug(f"Metadatos encontrados para {name}: {metadata.keys()}")
            # 3. Build the per-retriever response payload.
            results[name] = {
                "title": metadata.get("title", name),
                "documents": [
                    {
                        "content": node.get_content(),
                        "metadata": node.metadata,
                        "score": node.score
                    }
                    for node in nodes
                ],
                "description": metadata.get("description", ""),
                "source": source,
                "last_updated": metadata.get("last_updated", "")
            }
            logger.info(f"Retriever {name} procesado exitosamente")
        except Exception as e:
            # One failing retriever should not abort the others.
            logger.error(f"Error procesando retriever {name}: {str(e)}", exc_info=True)
            results[name] = {
                "error": str(e),
                "retriever": name
            }
    # Assemble the final response.
    response = {
        "query": query,
        "results": results,
        "top_k": top_k,
    }
    if invalid:
        logger.warning(f"Retrievers inválidos: {invalid}. Opciones válidas: {list(indices.keys())}")
        response["warnings"] = {
            "invalid_retrievers": invalid,
            "valid_options": list(indices.keys())
        }
    logger.info(f"Búsqueda completada. Total resultados: {len(results)}")
    return response


def _find_retriever_metadata(name: str):
    """Return (metadata, source) for *name* from retrievers_metadata,
    or ({}, "unknown") when the index is not described there."""
    for src, indexes in retrievers_metadata.items():
        if name in indexes:
            return indexes[name], src
    return {}, "unknown"
@mcp.tool()
async def search_tavily(
    query: str,
    days: int = 7,
    max_results: int = 1,
    include_answer: bool = False
) -> dict:
    """Perform a web search using the Tavily API.

    Args:
        query: Search query string (required)
        days: Restrict search to last N days (default: 7); a falsy value
            disables the time restriction
        max_results: Maximum results to return (default: 1)
        include_answer: Include a direct answer only when requested by the
            user (default: False)

    Returns:
        dict: Search results from Tavily, or {"error", "status": "failed",
        "query"} when the request fails.

    Raises:
        ValueError: if TAVILY_API_KEY is not set in the environment.
    """
    # Credentials come from the environment; fail fast when missing.
    tavily_api_key = os.environ.get('TAVILY_API_KEY')
    if not tavily_api_key:
        raise ValueError("TAVILY_API_KEY environment variable not set")
    headers = {
        "Authorization": f"Bearer {tavily_api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "query": query,
        "search_depth": "basic",
        "max_results": max_results,
        "include_answer": include_answer
    }
    # Only send "days" when a restriction was requested; the previous code
    # serialized an explicit null, which the API may reject.
    if days:
        payload["days"] = days
    try:
        # Bound the request so a stalled API call cannot hang the tool.
        timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(
                "https://api.tavily.com/search",
                headers=headers,
                json=payload
            ) as response:
                response.raise_for_status()
                return await response.json()
    except Exception as e:
        # Best-effort tool: surface the failure in-band instead of raising.
        return {
            "error": str(e),
            "status": "failed",
            "query": query
        }
if __name__ == "__main__":
    # Serve the MCP tools over Server-Sent Events on the configured port.
    mcp.run("sse")