Spaces:
Sleeping
Sleeping
File size: 2,600 Bytes
2c266ba 5e367ad 2c266ba 5f23750 2c266ba |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
from mcp.server.fastmcp import FastMCP
import requests
from typing import Optional, List, Union
import re
mcp = FastMCP("Notifications")
@mcp.tool()
def send_notification(
topic: str,
message: str,
title: Optional[str] = None,
priority: str = "urgent",
tags: Optional[Union[List[str], str]] = None,
click_url: Optional[str] = None,
attach_url: Optional[str] = None,
delay: Optional[str] = None
):
"""Send notification via ntfy.sh.
Args:
topic: Topic name (letters, numbers, dashes, underscores only)
message: Message body (supports markdown)
title: Optional title
priority: "min", "low", "default", "high", "urgent"
tags: Emoji shortcodes as list or comma-separated (e.g. "warning,fire" or ["bell","calendar"])
click_url: URL to open on click
attach_url: Image/file URL to attach
delay: Delay like "10s", "5m", "2h"
"""
# Validate topic name
if not re.match(r'^[a-zA-Z0-9_-]+$', topic):
return "β Invalid topic: use only letters, numbers, dashes, underscores"
url = f"https://ntfy.sh/{topic}"
headers = {"Content-Type": "text/plain; charset=utf-8"}
if title:
headers["Title"] = title
if priority:
if priority not in ["min", "low", "default", "high", "urgent"]:
return f"β Invalid priority: {priority}"
headers["Priority"] = priority
if tags:
# Handle both string and list inputs
if isinstance(tags, str):
tags = [t.strip() for t in tags.split(",")]
headers["Tags"] = ",".join(tags)
if click_url:
headers["Click"] = click_url
if attach_url:
headers["Attach"] = attach_url
if delay:
headers["Delay"] = delay
try:
response = requests.post(url, data=message.encode('utf-8'), headers=headers, timeout=10)
response.raise_for_status()
result = f"β
Sent to '{topic}'"
if delay:
result += f" (delayed by {delay})"
return result
except requests.exceptions.HTTPError as e:
status = e.response.status_code
if status == 400:
return "β Bad request: check message format"
elif status == 429:
return "β Rate limited: too many requests"
elif status == 507:
return "β Storage full: message too large"
return f"β HTTP error: {status}"
except Exception as e:
return f"β Error: {str(e)}"
if __name__ == "__main__":
mcp.run() |