# MCP_Public_Server / server.py
from mcp.server.fastmcp import FastMCP
from datetime import datetime
from llama_index.core import VectorStoreIndex
from llama_index.core import (
StorageContext,
load_index_from_storage,
)
from llama_index.core import Settings
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from typing import Dict, Optional, List
import json
import os
import aiohttp  # needed for asynchronous HTTP requests
# Azure OpenAI connection settings: the API key comes from the environment,
# the endpoint and API version are fixed for this deployment.
api_key = os.environ.get('AZURE_API_KEY')
azure_endpoint = "https://pharmaia-gpt.openai.azure.com/"
api_version = "2024-02-01"
# Chat-completion model used by llama_index for answer synthesis.
llm = AzureOpenAI(
    model="gpt-4.1",
    deployment_name="gpt-4.1",
    api_key=api_key,
    azure_endpoint=azure_endpoint,
    api_version=api_version,
)
# You need to deploy your own embedding model as well as your own chat completion model
embed_model = AzureOpenAIEmbedding(
    model="text-embedding-3-large",
    deployment_name="text-embedding-3-large",
    api_key=api_key,
    azure_endpoint=azure_endpoint,
    api_version=api_version,
)
# Register both models as the llama_index global defaults.
Settings.llm = llm
Settings.embed_model = embed_model
# Initial configuration: root folder and per-source subfolders that hold the
# persisted vector indices on disk.
DOCUMENTS_BASE_PATH = "./"
SOURCES = {
    "oms": "oms/",
    #"fda": "fda/"
}
# BUG FIX: `retrievers_metadata` was referenced below (and in the
# info://available_retrievers resource) but never defined anywhere in the
# file, raising NameError — here it was silently swallowed by the broad
# except, and at request time it crashed the resource handler. Load it from
# retrievers.json, falling back to an empty mapping so index loading still
# works when the metadata file is absent or malformed.
retrievers_metadata: Dict[str, dict] = {}
try:
    with open("retrievers.json", "r", encoding="utf-8") as f:
        retrievers_metadata = json.load(f)
except (FileNotFoundError, json.JSONDecodeError) as e:
    print(f"Advertencia: no se pudo cargar retrievers.json ({e})")

# Walk each configured source tree and load every persisted index found in a
# "storage_nodes" folder, keyed by its parent folder name (e.g. "vec_who_1").
indices: Dict[str, VectorStoreIndex] = {}
for source, rel_path in SOURCES.items():
    full_path = os.path.join(DOCUMENTS_BASE_PATH, rel_path)
    if not os.path.exists(full_path):
        print(f"Advertencia: no se encontró la ruta {full_path} para {source}")
        continue
    for root, dirs, files in os.walk(full_path):
        if "storage_nodes" not in dirs:
            continue
        storage_path = os.path.join(root, "storage_nodes")
        index_name = os.path.basename(root)  # folder name, e.g. "vec_who_1"
        try:
            storage_context = StorageContext.from_defaults(persist_dir=storage_path)
            indices[index_name] = load_index_from_storage(storage_context, index_id="vector_index")
            # Optional metadata check: warn when retrievers.json has no entry.
            if index_name not in retrievers_metadata.get(source, {}):
                print(f"Advertencia: no hay metadatos para {index_name} en retrievers.json")
        except Exception as e:
            # Best-effort loading: a broken index must not stop the others.
            print(f"Error cargando índice en {root}: {e}")
# HF Spaces / hosting platforms inject PORT; default to 7860 for local runs.
port = int(os.getenv("PORT", 7860))
# MCP server that exposes the resources/tools defined below over this port.
mcp = FastMCP("OnBase", port=port)
@mcp.resource(
    uri="info://available_retrievers",
    name="AvailableRetrievers",
    description="Lista completa de retrievers con metadatos",
    mime_type="application/json"
)
def get_available_retrievers() -> dict:
    """List every loaded retriever together with its source and metadata.

    Returns:
        dict: {"retrievers": [...], "count": N}; each entry carries name,
        source, description, content_info and last_updated.
    """
    # BUG FIX: `retrievers_metadata` is never defined at module level in the
    # original file, so this handler raised NameError on every request.
    # Resolve it defensively and fall back to an empty mapping.
    metadata_by_source = globals().get("retrievers_metadata") or {}
    available = []
    for index_name in indices:
        # Source is inferred from the folder-name prefix (vec_who_* -> oms);
        # everything else is assumed to be fda.
        source = "oms" if index_name.startswith("vec_who") else "fda"
        metadata = metadata_by_source.get(source, {}).get(index_name, {})
        available.append({
            "name": index_name,  # e.g. "vec_who_1"
            "source": source,
            "description": metadata.get("description", "Descripción no disponible"),
            "content_info": metadata.get("content_info", "Información no disponible"),
            "last_updated": metadata.get("last_updated", "Desconocido")
        })
    return {
        "retrievers": available,
        "count": len(available)
    }
@mcp.tool()
def retrieve_docs(
    query: str,
    retrievers: List[str],  # direct index names (vec_who_1, etc.)
    top_k: int = 3
) -> dict:
    """Run `query` against each named index and collect the top_k hits.

    Unknown retriever names do not raise; they are reported under
    results["_warnings"] together with the valid options.
    """
    results: dict = {}
    unknown: List[str] = []
    for name in retrievers:
        if name not in indices:
            unknown.append(name)
            continue
        try:
            hits = indices[name].as_retriever(similarity_top_k=top_k).retrieve(query)
            results[name] = [
                {
                    "content": node.get_content(),
                    "metadata": node.metadata,
                    "score": node.score,
                }
                for node in hits
            ]
        except Exception as err:
            # Per-retriever failure is reported inline, not propagated.
            results[name] = {"error": str(err)}
    if unknown:
        results["_warnings"] = {
            "invalid_retrievers": unknown,
            "valid_options": list(indices.keys()),
        }
    return {
        "query": query,
        "results": results,
        "top_k": top_k,
    }
@mcp.tool()
async def search_tavily(
    query: str,
    days: int = 7,
    max_results: int = 1,
    include_answer: bool = False
) -> dict:
    """Perform a web search using the Tavily API.

    Args:
        query: Search query string (required)
        days: Restrict search to last N days (default: 7)
        max_results: Maximum results to return (default: 1)
        include_answer: Include a direct answer only when requested by the user (default: False)

    Returns:
        dict: Search results from Tavily, or an error payload with
        status "failed" when the request cannot be completed.

    Raises:
        ValueError: If the TAVILY_API_KEY environment variable is not set.
    """
    # API key must come from the environment; fail fast if missing.
    tavily_api_key = os.environ.get('TAVILY_API_KEY')
    if not tavily_api_key:
        raise ValueError("TAVILY_API_KEY environment variable not set")
    headers = {
        "Authorization": f"Bearer {tavily_api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "query": query,
        "search_depth": "basic",
        "max_results": max_results,
        "include_answer": include_answer
    }
    # BUG FIX: the original always included "days", sending an explicit JSON
    # null when days was falsy; only send the key when a window is requested.
    if days:
        payload["days"] = days
    try:
        # Bound the request so a hung connection cannot stall the tool forever.
        timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(
                "https://api.tavily.com/search",
                headers=headers,
                json=payload
            ) as response:
                response.raise_for_status()
                return await response.json()
    except Exception as e:
        # Best-effort tool: report the failure instead of propagating it.
        return {
            "error": str(e),
            "status": "failed",
            "query": query
        }
if __name__ == "__main__":
    # Serve over the SSE transport (HTTP) rather than stdio.
    mcp.run("sse")