MCP_Public_Server / server.py
geronimo-pericoli's picture
Update server.py
8bb703e verified
raw
history blame
10 kB
from mcp.server.fastmcp import FastMCP
from datetime import datetime
from llama_index.core import VectorStoreIndex
from llama_index.core import (
StorageContext,
load_index_from_storage,
)
from llama_index.core import Settings
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from typing import Dict, Optional, List
import json
import os
import aiohttp # Necesario para las peticiones HTTP asíncronas
api_key = os.environ.get('AZURE_API_KEY')
azure_endpoint = "https://pharmaia-gpt.openai.azure.com/"
api_version = "2024-02-01"
llm = AzureOpenAI(
model="gpt-4.1",
deployment_name="gpt-4.1",
api_key=api_key,
azure_endpoint=azure_endpoint,
api_version=api_version,
)
# You need to deploy your own embedding model as well as your own chat completion model
embed_model = AzureOpenAIEmbedding(
model="text-embedding-3-large",
deployment_name="text-embedding-3-large",
api_key=api_key,
azure_endpoint=azure_endpoint,
api_version=api_version,
)
Settings.llm = llm
Settings.embed_model = embed_model
# Configuración inicial (esto probablemente estaría en otro módulo)
DOCUMENTS_BASE_PATH = "./"
SOURCES = {
"oms": "oms/", # Esta será la carpeta base que contiene todos los subíndices
}
# Cargar índices recursivamente
indices: Dict[str, VectorStoreIndex] = {}
for source, rel_path in SOURCES.items():
full_path = os.path.join(DOCUMENTS_BASE_PATH, rel_path)
if not os.path.exists(full_path):
continue
# Buscar todas las subcarpetas que contengan índices
for root, dirs, files in os.walk(full_path):
if "storage_nodes" in dirs:
# Esta es una carpeta que contiene un índice
try:
storage_path = os.path.join(root, "storage_nodes")
storage_context = StorageContext.from_defaults(persist_dir=storage_path)
# Usamos el nombre de la carpeta padre como clave (ej: "vec_1")
index_name = os.path.basename(root)
full_index_name = f"{source}_{index_name}" # ej: "oms_vec_1"
index = load_index_from_storage(storage_context, index_id="vector_index")
indices[full_index_name] = index
except Exception as e:
print(f"Error cargando índice en {root}: {str(e)}")
continue
port = int(os.getenv("PORT", 7860))
mcp = FastMCP("OnBase", port=port)
# Configuración del archivo retrievers.json
RETRIEVERS_METADATA_PATH = Path("./retrievers.json")
# Cargar metadatos de los retrievers
def load_retrievers_metadata() -> Dict:
try:
with open(RETRIEVERS_METADATA_PATH, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
print(f"Warning: {RETRIEVERS_METADATA_PATH} not found. Using empty metadata.")
return {}
except json.JSONDecodeError:
print(f"Warning: {RETRIEVERS_METADATA_PATH} is invalid JSON. Using empty metadata.")
return {}
retrievers_metadata = load_retrievers_metadata()
# Resource para listar solo títulos/disponibles
@mcp.resource(
uri="info://available_retriever_titles",
name="AvailableRetrieverTitles",
description="Lista los nombres/títulos disponibles de los retrievers",
mime_type="application/json"
)
def get_retriever_titles() -> dict:
"""
Devuelve una lista con los títulos/nombres de los retrievers disponibles
"""
return {
"titles": list(retrievers_metadata.keys()),
"count": len(retrievers_metadata)
}
# Resource para obtener metadatos específicos
@mcp.resource(
uri="info://retriever_details/{retriever_title}",
name="RetrieverDetails",
description="Obtiene información detallada sobre un retriever específico",
mime_type="application/json"
)
def get_retriever_details(retriever_title: str) -> dict:
"""
Devuelve los metadatos completos para un retriever específico
Parameters:
retriever_title: El título/nombre del retriever (ej: 'oms')
"""
if retriever_title not in retrievers_metadata:
return {
"error": f"Retriever '{retriever_title}' no encontrado",
"available_titles": list(retrievers_metadata.keys())
}
return {
"retriever": retriever_title,
"details": retrievers_metadata[retriever_title]
}
# Modificación del resource existente para usar los metadatos
@mcp.resource(
uri="info://available_retrievers",
name="AvailableRetrievers",
description="Provides information about available document retrievers including their names and descriptions.",
mime_type="application/json"
)
def get_available_retrievers(retriever_title: Optional[str] = None) -> dict:
"""
Versión mejorada que puede filtrar por título de retriever
Parameters:
retriever_title: Opcional. Si se especifica, solo devuelve los de este título
"""
available_retrievers = []
for full_index_name in indices.keys():
parts = full_index_name.split('_')
source = parts[0]
# Filtrar por título si se especificó
if retriever_title and source != retriever_title:
continue
# Obtener metadatos del JSON si existen
metadata = retrievers_metadata.get(source, {}).get(full_index_name, {})
available_retrievers.append({
"retriever_name": full_index_name,
"source": source,
"index_name": '_'.join(parts[1:]) if len(parts) > 1 else "default",
"description": metadata.get("description", f"Documentos de {source.upper()}"),
"content_info": metadata.get("content_info", "No description available"),
"last_updated": metadata.get("last_updated", "unknown")
})
if retriever_title and not available_retrievers:
return {
"error": f"No hay retrievers para el título '{retriever_title}'",
"available_titles": list(retrievers_metadata.keys())
}
return {
"retrievers": available_retrievers,
"count": len(available_retrievers),
"filtered_by": retriever_title if retriever_title else "all"
}
@mcp.tool()
def retrieve_docs(
query: str,
retrievers: List[str],
top_k: int = 3
) -> dict:
"""
Retrieve documents from different regulations using semantic search.
Parameters:
query: Search query (required).
retrievers: List of specific retriever names to use (required).
top_k: Number of results to return per retriever (default: 3).
Example:
retrieve_docs(
query="salud pública",
retrievers=["oms_vec_1", "oms_tree_2"],
top_k=2
)
"""
if not query:
return {"error": "Query parameter is required"}
if not retrievers:
return {"error": "At least one retriever must be specified", "available_retrievers": list(indices.keys())}
# Verificar que todos los retrievers solicitados existan
invalid_retrievers = [r for r in retrievers if r not in indices]
if invalid_retrievers:
return {
"error": f"Invalid retrievers specified: {invalid_retrievers}",
"available_retrievers": list(indices.keys())
}
results = {}
for retriever_name in retrievers:
try:
retriever = indices[retriever_name].as_retriever(similarity_top_k=top_k)
nodes = retriever.retrieve(query)
results[retriever_name] = [
{
"content": node.get_content(),
"metadata": node.metadata,
"score": node.score
}
for node in nodes
]
except Exception as e:
results[retriever_name] = {
"error": f"Error retrieving documents: {str(e)}"
}
return {
"results": results,
"query": query,
"retrievers_used": retrievers,
"top_k": top_k,
"successful_retrievers": [r for r in retrievers if isinstance(results[r], list)],
"failed_retrievers": [r for r in retrievers if not isinstance(results[r], list)]
}
@mcp.tool()
async def search_tavily(
query: str,
days: int = 7,
max_results: int = 1,
include_answer: bool = False
) -> dict:
"""Perform a web search using the Tavily API.
Args:
query: Search query string (required)
days: Restrict search to last N days (default: 7)
max_results: Maximum results to return (default: 1)
include_answer: Include a direct answer only when requested by the user (default: False)
Returns:
dict: Search results from Tavily
"""
# Obtener la API key de las variables de entorno
tavily_api_key = os.environ.get('TAVILY_API_KEY')
if not tavily_api_key:
raise ValueError("TAVILY_API_KEY environment variable not set")
headers = {
"Authorization": f"Bearer {tavily_api_key}",
"Content-Type": "application/json"
}
payload = {
"query": query,
"search_depth": "basic",
"max_results": max_results,
"days": days if days else None,
"include_answer": include_answer
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.tavily.com/search",
headers=headers,
json=payload
) as response:
response.raise_for_status()
result = await response.json()
return result
except Exception as e:
return {
"error": str(e),
"status": "failed",
"query": query
}
if __name__ == "__main__":
mcp.run("sse")