MCP_Public_Server / server.py
geronimo-pericoli's picture
Update server.py
8370383 verified
raw
history blame
2.64 kB
from mcp.server.fastmcp import FastMCP
import os
import aiohttp # Necesario para las peticiones HTTP asíncronas
port = int(os.getenv("PORT", 7860))
mcp = FastMCP("OnBase", port=port)
@mcp.prompt()
def ask_about_topic(
topic: str, # Required - no default value
detail_level: str = "simple", # Optional - has default value
examples: bool = True # Optional - has default value
) -> str:
"""Generates a user message asking for an explanation of a topic with customizable options."""
prompt = f"Podrías explicar el concepto de '{topic}'"
# Añadir nivel de detalle
if detail_level == "simple":
prompt += " de manera concisa"
elif detail_level == "technical":
prompt += " con terminología técnica"
elif detail_level == "detailed":
prompt += " con amplio detalle"
prompt += "?"
# Añadir petición de ejemplos
if examples:
prompt += " Incluye ejemplos prácticos si es posible."
return prompt
@mcp.tool()
async def search_tavily(
query: str,
days: int = 7,
max_results: int = 1,
include_answer: bool = False
) -> dict:
"""Perform a web search using the Tavily API.
Args:
query: Search query string (required)
days: Restrict search to last N days (default: 7)
max_results: Maximum results to return (default: 1)
include_answer: Include direct answer (default: False)
Returns:
dict: Search results from Tavily
"""
# Obtener la API key de las variables de entorno
api_key = os.environ.get('TAVILY_API_KEY')
if not api_key:
raise ValueError("TAVILY_API_KEY environment variable not set")
headers = {
"Content-Type": "application/json"
}
payload = {
"query": query,
"search_depth": "basic",
"max_results": max_results,
"days": days if days else None,
"include_answer": include_answer
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.tavily.com/search",
headers=headers,
json=payload,
params={"api_key": api_key} # Alternativa para enviar la key
) as response:
response.raise_for_status()
return await response.json()
except Exception as e:
return {
"error": str(e),
"status": "failed",
"query": query
}
if __name__ == "__main__":
mcp.run("sse")