Spaces:
Sleeping
Sleeping
| from mcp.server.fastmcp import FastMCP | |
| import requests | |
| from typing import Optional, List, Union | |
| import re | |
# FastMCP server instance for this module, created with the name "Notifications".
mcp = FastMCP("Notifications")
# Status prefixes are spelled as escapes so they survive any re-encoding of
# this file (U+2705 WHITE HEAVY CHECK MARK, U+274C CROSS MARK).
_OK = "\u2705"
_FAIL = "\u274c"

_NTFY_BASE_URL = "https://ntfy.sh"
_PRIORITIES = ("min", "low", "default", "high", "urgent")
_TOPIC_RE = re.compile(r"[a-zA-Z0-9_-]+")

# Friendly explanations for the HTTP status codes handled explicitly.
_HTTP_ERROR_MESSAGES = {
    400: "Bad request: check message format",
    429: "Rate limited: too many requests",
    507: "Storage full: message too large",
}


def _clean_tags(tags):
    """Return *tags* (list or comma-separated string) as non-empty, stripped strings."""
    if isinstance(tags, str):
        tags = tags.split(",")
    stripped = (str(tag).strip() for tag in tags)
    return [tag for tag in stripped if tag]


# NOTE(review): no @mcp.tool() decorator is visible on this function, so as
# written it is not registered with the FastMCP server -- confirm whether the
# decorator was lost.
def send_notification(
    topic: str,
    message: str,
    title: Optional[str] = None,
    priority: str = "urgent",
    tags: Optional[Union[List[str], str]] = None,
    click_url: Optional[str] = None,
    attach_url: Optional[str] = None,
    delay: Optional[str] = None
) -> str:
    """Send a notification via ntfy.sh.

    Args:
        topic: Topic name (letters, numbers, dashes, underscores only).
        message: Message body, sent as UTF-8 plain text.
        title: Optional title.
        priority: "min", "low", "default", "high" or "urgent"
            (defaults to "urgent"). A falsy value omits the header.
        tags: Emoji shortcodes as a list or comma-separated string
            (e.g. "warning,fire" or ["bell", "calendar"]). Surrounding
            whitespace and empty entries are dropped.
        click_url: URL to open on click.
        attach_url: Image/file URL to attach.
        delay: Delay like "10s", "5m", "2h".

    Returns:
        A human-readable status string. Failures are reported in the
        returned string (prefixed with a cross mark) rather than raised.
    """
    # fullmatch, not match with '$': '$' also matches just before a trailing
    # newline, which would let "name\n" through and into the URL.
    if not isinstance(topic, str) or not _TOPIC_RE.fullmatch(topic):
        return f"{_FAIL} Invalid topic: use only letters, numbers, dashes, underscores"

    if priority and priority not in _PRIORITIES:
        return f"{_FAIL} Invalid priority: {priority}"

    # NOTE(review): the body is sent as text/plain; ntfy appears to render
    # Markdown only when explicitly requested -- confirm whether that was intended.
    headers = {"Content-Type": "text/plain; charset=utf-8"}
    if title:
        headers["Title"] = title
    if priority:
        headers["Priority"] = priority
    if tags:
        tag_list = _clean_tags(tags)
        if tag_list:
            headers["Tags"] = ",".join(tag_list)
    if click_url:
        headers["Click"] = click_url
    if attach_url:
        headers["Attach"] = attach_url
    if delay:
        headers["Delay"] = delay

    # str header values are encoded as Latin-1 by the HTTP stack, so a title
    # containing e.g. an emoji would fail; send UTF-8 bytes instead.
    wire_headers = {
        name: str(value).encode("utf-8") for name, value in headers.items()
    }

    try:
        response = requests.post(
            f"{_NTFY_BASE_URL}/{topic}",
            data=message.encode("utf-8"),
            headers=wire_headers,
            timeout=10,
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        status = e.response.status_code
        explanation = _HTTP_ERROR_MESSAGES.get(status)
        if explanation is not None:
            return f"{_FAIL} {explanation}"
        return f"{_FAIL} HTTP error: {status}"
    except Exception as e:
        # Tool boundary: report any other failure (connection error, timeout,
        # invalid header value, ...) as text instead of raising.
        return f"{_FAIL} Error: {e}"

    result = f"{_OK} Sent to '{topic}'"
    if delay:
        result += f" (delayed by {delay})"
    return result
# Run the MCP server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    mcp.run()