# Spaces: Sleeping (page-status text captured along with this listing; not part of the program)
import os

import aiohttp  # Required for asynchronous HTTP requests
from mcp.server.fastmcp import FastMCP

# Listening port comes from the PORT environment variable; 7860 is the fallback.
# The default is given as a string so int() always receives the same type that
# a real environment value would have.
port = int(os.getenv("PORT", "7860"))

# Server instance used by the entry point at the bottom of the file.
# NOTE(review): no registration decorators (e.g. @mcp.tool() / @mcp.prompt())
# are visible on the functions in this listing - confirm they are registered.
mcp = FastMCP("OnBase", port=port)


def ask_about_topic(
    topic: str,  # Required - no default value
    detail_level: str = "simple",  # Optional - has default value
    examples: bool = True,  # Optional - has default value
) -> str:
    """Build a Spanish-language user message asking for an explanation of a topic.

    Args:
        topic: Concept to explain; inserted verbatim between single quotes.
        detail_level: One of "simple", "technical" or "detailed". Any other
            value adds no qualifier to the question.
        examples: When true, append a sentence requesting practical examples.

    Returns:
        The assembled prompt text.
    """
    # Qualifier appended to the question for each recognised detail level.
    detail_suffixes = {
        "simple": " de manera concisa",
        "technical": " con terminología técnica",
        "detailed": " con amplio detalle",
    }

    prompt = f"Podrías explicar el concepto de '{topic}'"
    # Unrecognised levels fall back to an empty qualifier instead of failing.
    prompt += detail_suffixes.get(detail_level, "")
    prompt += "?"

    # Optionally ask for examples.
    if examples:
        prompt += " Incluye ejemplos prácticos si es posible."
    return prompt


async def search_tavily(
    query: str,
    days: int = 7,
    max_results: int = 1,
    include_answer: bool = False,
) -> dict:
    """Perform a web search using the Tavily API.

    Args:
        query: Search query string (required).
        days: Restrict search to last N days (default: 7). A falsy value
            (0 or None) leaves the restriction out of the request.
        max_results: Maximum results to return (default: 1).
        include_answer: Include direct answer (default: False).

    Returns:
        dict: The decoded JSON response on success. If the request or the
        decoding fails, a dict with the keys "error" (exception text),
        "status" (always "failed") and "query" (the original query).

    Raises:
        ValueError: If the TAVILY_API_KEY environment variable is not set.
    """
    # Read the API key from the environment.
    api_key = os.environ.get("TAVILY_API_KEY")
    if not api_key:
        raise ValueError("TAVILY_API_KEY environment variable not set")

    # The key is sent as a bearer token, not as a URL query parameter: a key
    # in the query string becomes part of the request URL, which HTTP error
    # text and logs can include - and the error text is returned to the
    # caller below.
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }

    payload = {
        "query": query,
        "search_depth": "basic",
        "max_results": max_results,
        "include_answer": include_answer,
    }
    # Only send "days" when it is set; a JSON null is not a valid day count.
    # NOTE(review): Tavily's docs appear to apply "days" only to news-topic
    # searches - confirm whether a "topic" field should accompany it.
    if days:
        payload["days"] = days

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.tavily.com/search",
                headers=headers,
                json=payload,
            ) as response:
                response.raise_for_status()
                return await response.json()
    except Exception as e:
        # Deliberate best effort: failures are reported as data, not raised.
        return {
            "error": str(e),
            "status": "failed",
            "query": query,
        }


# Script entry point: start the server with the SSE transport.
if __name__ == "__main__":
    mcp.run("sse")