MCP_Public_Server / server.py
geronimo-pericoli's picture
Update server.py
89082d2 verified
raw
history blame
9.3 kB
from mcp.server.fastmcp import FastMCP, Context
from datetime import datetime
from llama_index.core import VectorStoreIndex
from llama_index.core import (
StorageContext,
load_index_from_storage,
)
from llama_index.core import Settings
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from typing import Dict, Optional, List
from pathlib import Path
import json
import os
import aiohttp # Necesario para las peticiones HTTP asíncronas
import asyncio
api_key = os.environ.get('AZURE_API_KEY')
azure_endpoint = "https://pharmaia-gpt.openai.azure.com/"
api_version = "2024-02-01"
llm = AzureOpenAI(
model="gpt-4.1",
deployment_name="gpt-4.1",
api_key=api_key,
azure_endpoint=azure_endpoint,
api_version=api_version,
)
# You need to deploy your own embedding model as well as your own chat completion model
embed_model = AzureOpenAIEmbedding(
model="text-embedding-3-large",
deployment_name="text-embedding-3-large",
api_key=api_key,
azure_endpoint=azure_endpoint,
api_version=api_version,
)
Settings.llm = llm
Settings.embed_model = embed_model
# Configuración de paths
DOCUMENTS_BASE_PATH = "./"
RETRIEVERS_JSON_PATH = Path("./retrievers.json")
# Cargar metadatos
def load_retrievers_metadata():
try:
with open(RETRIEVERS_JSON_PATH, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"Error cargando retrievers.json: {str(e)}")
return {}
retrievers_metadata = load_retrievers_metadata()
SOURCES = {source: f"{source.lower()}/" for source in retrievers_metadata.keys()}
# Cargar índices
indices: Dict[str, VectorStoreIndex] = {}
for source, rel_path in SOURCES.items():
full_path = os.path.join(DOCUMENTS_BASE_PATH, rel_path)
if not os.path.exists(full_path):
print(f"Advertencia: No se encontró la ruta para {source}")
continue
for root, dirs, files in os.walk(full_path):
if "storage_nodes" in dirs:
try:
storage_path = os.path.join(root, "storage_nodes")
storage_context = StorageContext.from_defaults(persist_dir=storage_path)
index_name = os.path.basename(root)
indices[index_name] = load_index_from_storage(storage_context, index_id="vector_index")
print(f"Índice cargado correctamente: {index_name}")
except Exception as e:
print(f"Error cargando índice {index_name}: {str(e)}")
port = int(os.getenv("PORT", 7860))
mcp = FastMCP("OnBase", port=port)
@mcp.resource(
uri="info://available_retrievers/{ctx}",
name="AvailableRetrievers",
description="Lista de retrievers con metadatos",
mime_type="application/json"
)
async def _get_available_retrievers(ctx: Context) -> dict:
try:
await ctx.debug("Generando lista de retrievers disponibles")
available = []
for source, indexes in retrievers_metadata.items():
for index_name, metadata in indexes.items():
available.append({
"name": index_name,
"source": source,
"title": metadata.get("title", ""),
"description": metadata.get("description", "")
})
await ctx.info(f"Retrievers generados: {len(available)}")
return {
"retrievers": available,
"count": len(available),
"status": "success"
}
except Exception as e:
await ctx.error(f"Error generando retrievers: {str(e)}")
return {
"retrievers": [],
"count": 0,
"status": "error",
"error": str(e)
}
@mcp.tool()
async def list_retrievers(ctx: Context) -> dict:
"""
Devuelve la lista de retrievers disponibles
Returns:
dict: {
"retrievers": [{
"name": str,
"title": str,
"description": str,
"source": str
}],
"count": int,
"status": str,
"error": str (opcional)
}
"""
try:
# Lectura del resource
resource_contents = await ctx.read_resource("info://available_retrievers")
if not resource_contents:
await ctx.warning("No resource contents found for info://available_retrievers")
return {
"retrievers": [],
"count": 0,
"status": "error",
"error": "Resource not found"
}
resource = resource_contents[0].content # Acceder al contenido real
await ctx.debug(f"Retrieved {len(resource.get('retrievers', []))} retrievers")
return {
"retrievers": resource.get("retrievers", []),
"count": resource.get("count", 0),
"status": "success"
}
except Exception as e:
await ctx.error(f"Error en list_retrievers: {str(e)}")
return {
"retrievers": [],
"count": 0,
"status": "error",
"error": str(e)
}
# Función principal de búsqueda
@mcp.tool()
def retrieve_docs(
query: str,
retrievers: List[str],
top_k: int = 3
) -> dict:
"""
Realiza búsqueda semántica en documentos indexados.
Parámetros:
query (str): Texto de búsqueda (requerido)
retrievers (List[str]): Nombres de retrievers a consultar (requerido)
top_k (int): Número de resultados por retriever (opcional, default=3)
Ejemplo:
retrieve_docs(
query="estándares farmacéuticos",
retrievers=["vec_1", "vec_2"],
top_k=2
)
"""
results = {}
invalid = []
for name in retrievers:
if name not in indices:
invalid.append(name)
continue
try:
# 1. Obtener el índice y realizar la búsqueda
retriever = indices[name].as_retriever(similarity_top_k=top_k)
nodes = retriever.retrieve(query)
# 2. Buscar metadatos COMPLETOS
metadata = {}
source = "unknown"
for src, indexes in retrievers_metadata.items():
if name in indexes:
metadata = indexes[name]
source = src
break
# 3. Construir respuesta para ESTE retriever
results[name] = {
"title": metadata.get("title", name),
"documents": [
{
"content": node.get_content(),
"metadata": node.metadata,
"score": node.score
}
for node in nodes
],
"description": metadata.get("description", ""),
"source": source,
"last_updated": metadata.get("last_updated", "")
}
except Exception as e:
results[name] = {
"error": str(e),
"retriever": name
}
# Construir respuesta final
response = {
"query": query,
"results": results,
"top_k": top_k,
}
if invalid:
response["warnings"] = {
"invalid_retrievers": invalid,
"valid_options": list(indices.keys())
}
return response
@mcp.tool()
async def search_tavily(
query: str,
days: int = 7,
max_results: int = 1,
include_answer: bool = False
) -> dict:
"""Perform a web search using the Tavily API.
Args:
query: Search query string (required)
days: Restrict search to last N days (default: 7)
max_results: Maximum results to return (default: 1)
include_answer: Include a direct answer only when requested by the user (default: False)
Returns:
dict: Search results from Tavily
"""
# Obtener la API key de las variables de entorno
tavily_api_key = os.environ.get('TAVILY_API_KEY')
if not tavily_api_key:
raise ValueError("TAVILY_API_KEY environment variable not set")
headers = {
"Authorization": f"Bearer {tavily_api_key}",
"Content-Type": "application/json"
}
payload = {
"query": query,
"search_depth": "basic",
"max_results": max_results,
"days": days if days else None,
"include_answer": include_answer
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.tavily.com/search",
headers=headers,
json=payload
) as response:
response.raise_for_status()
result = await response.json()
return result
except Exception as e:
return {
"error": str(e),
"status": "failed",
"query": query
}
if __name__ == "__main__":
mcp.run("sse")