"""Send Message Tool -- cross-channel messaging via platform APIs. Sends a message to a user or channel on any connected messaging platform (Telegram, Discord, Slack). Supports listing available targets and resolving human-friendly channel names to IDs. Works in both CLI and gateway contexts. """ import json import logging import os import re import ssl import time logger = logging.getLogger(__name__) _TELEGRAM_TOPIC_TARGET_RE = re.compile(r"^\s*(-?\d+)(?::(\d+))?\s*$") _IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif"} _VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".3gp"} _AUDIO_EXTS = {".ogg", ".opus", ".mp3", ".wav", ".m4a"} _VOICE_EXTS = {".ogg", ".opus"} SEND_MESSAGE_SCHEMA = { "name": "send_message", "description": ( "Send a message to a connected messaging platform, or list available targets.\n\n" "IMPORTANT: When the user asks to send to a specific channel or person " "(not just a bare platform name), call send_message(action='list') FIRST to see " "available targets, then send to the correct one.\n" "If the user just says a platform name like 'send to telegram', send directly " "to the home channel without listing first." ), "parameters": { "type": "object", "properties": { "action": { "type": "string", "enum": ["send", "list"], "description": "Action to perform. 'send' (default) sends a message. 'list' returns all available channels/contacts across connected platforms." }, "target": { "type": "string", "description": "Delivery target. Format: 'platform' (uses home channel), 'platform:#channel-name', 'platform:chat_id', or Telegram topic 'telegram:chat_id:thread_id'. Examples: 'telegram', 'telegram:-1001234567890:17585', 'discord:#bot-home', 'slack:#engineering', 'signal:+15551234567'" }, "message": { "type": "string", "description": "The message text to send" } }, "required": [] } } def send_message_tool(args, **kw): """Handle cross-channel send_message tool calls.""" action = args.get("action", "send") if action == "list": return _handle_list() return _handle_send(args) def _handle_list(): """Return formatted list of available messaging targets.""" try: from gateway.channel_directory import format_directory_for_display return json.dumps({"targets": format_directory_for_display()}) except Exception as e: return json.dumps({"error": f"Failed to load channel directory: {e}"}) def _handle_send(args): """Send a message to a platform target.""" target = args.get("target", "") message = args.get("message", "") if not target or not message: return json.dumps({"error": "Both 'target' and 'message' are required when action='send'"}) parts = target.split(":", 1) platform_name = parts[0].strip().lower() target_ref = parts[1].strip() if len(parts) > 1 else None chat_id = None thread_id = None if target_ref: chat_id, thread_id, is_explicit = _parse_target_ref(platform_name, target_ref) else: is_explicit = False # Resolve human-friendly channel names to numeric IDs if target_ref and not is_explicit: try: from gateway.channel_directory import resolve_channel_name resolved = resolve_channel_name(platform_name, target_ref) if resolved: chat_id, thread_id, _ = _parse_target_ref(platform_name, resolved) else: return json.dumps({ "error": f"Could not resolve '{target_ref}' on {platform_name}. " f"Use send_message(action='list') to see available targets." }) except Exception: return json.dumps({ "error": f"Could not resolve '{target_ref}' on {platform_name}. " f"Try using a numeric channel ID instead." }) from tools.interrupt import is_interrupted if is_interrupted(): return json.dumps({"error": "Interrupted"}) try: from gateway.config import load_gateway_config, Platform config = load_gateway_config() except Exception as e: return json.dumps({"error": f"Failed to load gateway config: {e}"}) platform_map = { "telegram": Platform.TELEGRAM, "discord": Platform.DISCORD, "slack": Platform.SLACK, "whatsapp": Platform.WHATSAPP, "signal": Platform.SIGNAL, "matrix": Platform.MATRIX, "mattermost": Platform.MATTERMOST, "homeassistant": Platform.HOMEASSISTANT, "dingtalk": Platform.DINGTALK, "email": Platform.EMAIL, "sms": Platform.SMS, } platform = platform_map.get(platform_name) if not platform: avail = ", ".join(platform_map.keys()) return json.dumps({"error": f"Unknown platform: {platform_name}. Available: {avail}"}) pconfig = config.platforms.get(platform) if not pconfig or not pconfig.enabled: return json.dumps({"error": f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables."}) from gateway.platforms.base import BasePlatformAdapter media_files, cleaned_message = BasePlatformAdapter.extract_media(message) mirror_text = cleaned_message.strip() or _describe_media_for_mirror(media_files) used_home_channel = False if not chat_id: home = config.get_home_channel(platform) if home: chat_id = home.chat_id used_home_channel = True else: return json.dumps({ "error": f"No home channel set for {platform_name} to determine where to send the message. " f"Either specify a channel directly with '{platform_name}:CHANNEL_NAME', " f"or set a home channel via: hermes config set {platform_name.upper()}_HOME_CHANNEL " }) duplicate_skip = _maybe_skip_cron_duplicate_send(platform_name, chat_id, thread_id) if duplicate_skip: return json.dumps(duplicate_skip) try: from model_tools import _run_async result = _run_async( _send_to_platform( platform, pconfig, chat_id, cleaned_message, thread_id=thread_id, media_files=media_files, ) ) if used_home_channel and isinstance(result, dict) and result.get("success"): result["note"] = f"Sent to {platform_name} home channel (chat_id: {chat_id})" # Mirror the sent message into the target's gateway session if isinstance(result, dict) and result.get("success") and mirror_text: try: from gateway.mirror import mirror_to_session source_label = os.getenv("HERMES_SESSION_PLATFORM", "cli") if mirror_to_session(platform_name, chat_id, mirror_text, source_label=source_label, thread_id=thread_id): result["mirrored"] = True except Exception: pass return json.dumps(result) except Exception as e: return json.dumps({"error": f"Send failed: {e}"}) def _parse_target_ref(platform_name: str, target_ref: str): """Parse a tool target into chat_id/thread_id and whether it is explicit.""" if platform_name == "telegram": match = _TELEGRAM_TOPIC_TARGET_RE.fullmatch(target_ref) if match: return match.group(1), match.group(2), True if target_ref.lstrip("-").isdigit(): return target_ref, None, True return None, None, False def _describe_media_for_mirror(media_files): """Return a human-readable mirror summary when a message only contains media.""" if not media_files: return "" if len(media_files) == 1: media_path, is_voice = media_files[0] ext = os.path.splitext(media_path)[1].lower() if is_voice and ext in _VOICE_EXTS: return "[Sent voice message]" if ext in _IMAGE_EXTS: return "[Sent image attachment]" if ext in _VIDEO_EXTS: return "[Sent video attachment]" if ext in _AUDIO_EXTS: return "[Sent audio attachment]" return "[Sent document attachment]" return f"[Sent {len(media_files)} media attachments]" def _get_cron_auto_delivery_target(): """Return the cron scheduler's auto-delivery target for the current run, if any.""" platform = os.getenv("HERMES_CRON_AUTO_DELIVER_PLATFORM", "").strip().lower() chat_id = os.getenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID", "").strip() if not platform or not chat_id: return None thread_id = os.getenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID", "").strip() or None return { "platform": platform, "chat_id": chat_id, "thread_id": thread_id, } def _maybe_skip_cron_duplicate_send(platform_name: str, chat_id: str, thread_id: str | None): """Skip redundant cron send_message calls when the scheduler will auto-deliver there.""" auto_target = _get_cron_auto_delivery_target() if not auto_target: return None same_target = ( auto_target["platform"] == platform_name and str(auto_target["chat_id"]) == str(chat_id) and auto_target.get("thread_id") == thread_id ) if not same_target: return None target_label = f"{platform_name}:{chat_id}" if thread_id is not None: target_label += f":{thread_id}" return { "success": True, "skipped": True, "reason": "cron_auto_delivery_duplicate_target", "target": target_label, "note": ( f"Skipped send_message to {target_label}. This cron job will already auto-deliver " "its final response to that same target. Put the intended user-facing content in " "your final response instead, or use a different target if you want an additional message." ), } async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, media_files=None): """Route a message to the appropriate platform sender. Long messages are automatically chunked to fit within platform limits using the same smart-splitting algorithm as the gateway adapters (preserves code-block boundaries, adds part indicators). """ from gateway.config import Platform from gateway.platforms.base import BasePlatformAdapter from gateway.platforms.telegram import TelegramAdapter from gateway.platforms.discord import DiscordAdapter from gateway.platforms.slack import SlackAdapter media_files = media_files or [] # Platform message length limits (from adapter class attributes) _MAX_LENGTHS = { Platform.TELEGRAM: TelegramAdapter.MAX_MESSAGE_LENGTH, Platform.DISCORD: DiscordAdapter.MAX_MESSAGE_LENGTH, Platform.SLACK: SlackAdapter.MAX_MESSAGE_LENGTH, } # Smart-chunk the message to fit within platform limits. # For short messages or platforms without a known limit this is a no-op. max_len = _MAX_LENGTHS.get(platform) if max_len: chunks = BasePlatformAdapter.truncate_message(message, max_len) else: chunks = [message] # --- Telegram: special handling for media attachments --- if platform == Platform.TELEGRAM: last_result = None for i, chunk in enumerate(chunks): is_last = (i == len(chunks) - 1) result = await _send_telegram( pconfig.token, chat_id, chunk, media_files=media_files if is_last else [], thread_id=thread_id, ) if isinstance(result, dict) and result.get("error"): return result last_result = result return last_result # --- Non-Telegram platforms --- if media_files and not message.strip(): return { "error": ( f"send_message MEDIA delivery is currently only supported for telegram; " f"target {platform.value} had only media attachments" ) } warning = None if media_files: warning = ( f"MEDIA attachments were omitted for {platform.value}; " "native send_message media delivery is currently only supported for telegram" ) last_result = None for chunk in chunks: if platform == Platform.DISCORD: result = await _send_discord(pconfig.token, chat_id, chunk) elif platform == Platform.SLACK: result = await _send_slack(pconfig.token, chat_id, chunk) elif platform == Platform.WHATSAPP: result = await _send_whatsapp(pconfig.extra, chat_id, chunk) elif platform == Platform.SIGNAL: result = await _send_signal(pconfig.extra, chat_id, chunk) elif platform == Platform.EMAIL: result = await _send_email(pconfig.extra, chat_id, chunk) elif platform == Platform.SMS: result = await _send_sms(pconfig.api_key, chat_id, chunk) else: result = {"error": f"Direct sending not yet implemented for {platform.value}"} if isinstance(result, dict) and result.get("error"): return result last_result = result if warning and isinstance(last_result, dict) and last_result.get("success"): warnings = list(last_result.get("warnings", [])) warnings.append(warning) last_result["warnings"] = warnings return last_result async def _send_telegram(token, chat_id, message, media_files=None, thread_id=None): """Send via Telegram Bot API (one-shot, no polling needed). Applies markdown→MarkdownV2 formatting (same as the gateway adapter) so that bold, links, and headers render correctly. If the message already contains HTML tags, it is sent with ``parse_mode='HTML'`` instead, bypassing MarkdownV2 conversion. """ try: from telegram import Bot from telegram.constants import ParseMode # Auto-detect HTML tags — if present, skip MarkdownV2 and send as HTML. # Inspired by github.com/ashaney — PR #1568. _has_html = bool(re.search(r'<[a-zA-Z/][^>]*>', message)) if _has_html: formatted = message send_parse_mode = ParseMode.HTML else: # Reuse the gateway adapter's format_message for markdown→MarkdownV2 try: from gateway.platforms.telegram import TelegramAdapter, _escape_mdv2, _strip_mdv2 _adapter = TelegramAdapter.__new__(TelegramAdapter) formatted = _adapter.format_message(message) except Exception: # Fallback: send as-is if formatting unavailable formatted = message send_parse_mode = ParseMode.MARKDOWN_V2 bot = Bot(token=token) int_chat_id = int(chat_id) media_files = media_files or [] thread_kwargs = {} if thread_id is not None: thread_kwargs["message_thread_id"] = int(thread_id) last_msg = None warnings = [] if formatted.strip(): try: last_msg = await bot.send_message( chat_id=int_chat_id, text=formatted, parse_mode=send_parse_mode, **thread_kwargs ) except Exception as md_error: # Parse failed, fall back to plain text if "parse" in str(md_error).lower() or "markdown" in str(md_error).lower() or "html" in str(md_error).lower(): logger.warning("Parse mode %s failed in _send_telegram, falling back to plain text: %s", send_parse_mode, md_error) if not _has_html: try: from gateway.platforms.telegram import _strip_mdv2 plain = _strip_mdv2(formatted) except Exception: plain = message else: plain = message last_msg = await bot.send_message( chat_id=int_chat_id, text=plain, parse_mode=None, **thread_kwargs ) else: raise for media_path, is_voice in media_files: if not os.path.exists(media_path): warning = f"Media file not found, skipping: {media_path}" logger.warning(warning) warnings.append(warning) continue ext = os.path.splitext(media_path)[1].lower() try: with open(media_path, "rb") as f: if ext in _IMAGE_EXTS: last_msg = await bot.send_photo( chat_id=int_chat_id, photo=f, **thread_kwargs ) elif ext in _VIDEO_EXTS: last_msg = await bot.send_video( chat_id=int_chat_id, video=f, **thread_kwargs ) elif ext in _VOICE_EXTS and is_voice: last_msg = await bot.send_voice( chat_id=int_chat_id, voice=f, **thread_kwargs ) elif ext in _AUDIO_EXTS: last_msg = await bot.send_audio( chat_id=int_chat_id, audio=f, **thread_kwargs ) else: last_msg = await bot.send_document( chat_id=int_chat_id, document=f, **thread_kwargs ) except Exception as e: warning = f"Failed to send media {media_path}: {e}" logger.error(warning) warnings.append(warning) if last_msg is None: error = "No deliverable text or media remained after processing MEDIA tags" if warnings: return {"error": error, "warnings": warnings} return {"error": error} result = { "success": True, "platform": "telegram", "chat_id": chat_id, "message_id": str(last_msg.message_id), } if warnings: result["warnings"] = warnings return result except ImportError: return {"error": "python-telegram-bot not installed. Run: pip install python-telegram-bot"} except Exception as e: return {"error": f"Telegram send failed: {e}"} async def _send_discord(token, chat_id, message): """Send a single message via Discord REST API (no websocket client needed). Chunking is handled by _send_to_platform() before this is called. """ try: import aiohttp except ImportError: return {"error": "aiohttp not installed. Run: pip install aiohttp"} try: url = f"https://discord.com/api/v10/channels/{chat_id}/messages" headers = {"Authorization": f"Bot {token}", "Content-Type": "application/json"} async with aiohttp.ClientSession() as session: async with session.post(url, headers=headers, json={"content": message}) as resp: if resp.status not in (200, 201): body = await resp.text() return {"error": f"Discord API error ({resp.status}): {body}"} data = await resp.json() return {"success": True, "platform": "discord", "chat_id": chat_id, "message_id": data.get("id")} except Exception as e: return {"error": f"Discord send failed: {e}"} async def _send_slack(token, chat_id, message): """Send via Slack Web API.""" try: import aiohttp except ImportError: return {"error": "aiohttp not installed. Run: pip install aiohttp"} try: url = "https://slack.com/api/chat.postMessage" headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} async with aiohttp.ClientSession() as session: async with session.post(url, headers=headers, json={"channel": chat_id, "text": message}) as resp: data = await resp.json() if data.get("ok"): return {"success": True, "platform": "slack", "chat_id": chat_id, "message_id": data.get("ts")} return {"error": f"Slack API error: {data.get('error', 'unknown')}"} except Exception as e: return {"error": f"Slack send failed: {e}"} async def _send_whatsapp(extra, chat_id, message): """Send via the local WhatsApp bridge HTTP API.""" try: import aiohttp except ImportError: return {"error": "aiohttp not installed. Run: pip install aiohttp"} try: bridge_port = extra.get("bridge_port", 3000) async with aiohttp.ClientSession() as session: async with session.post( f"http://localhost:{bridge_port}/send", json={"chatId": chat_id, "message": message}, timeout=aiohttp.ClientTimeout(total=30), ) as resp: if resp.status == 200: data = await resp.json() return { "success": True, "platform": "whatsapp", "chat_id": chat_id, "message_id": data.get("messageId"), } body = await resp.text() return {"error": f"WhatsApp bridge error ({resp.status}): {body}"} except Exception as e: return {"error": f"WhatsApp send failed: {e}"} async def _send_signal(extra, chat_id, message): """Send via signal-cli JSON-RPC API.""" try: import httpx except ImportError: return {"error": "httpx not installed"} try: http_url = extra.get("http_url", "http://127.0.0.1:8080").rstrip("/") account = extra.get("account", "") if not account: return {"error": "Signal account not configured"} params = {"account": account, "message": message} if chat_id.startswith("group:"): params["groupId"] = chat_id[6:] else: params["recipient"] = [chat_id] payload = { "jsonrpc": "2.0", "method": "send", "params": params, "id": f"send_{int(time.time() * 1000)}", } async with httpx.AsyncClient(timeout=30.0) as client: resp = await client.post(f"{http_url}/api/v1/rpc", json=payload) resp.raise_for_status() data = resp.json() if "error" in data: return {"error": f"Signal RPC error: {data['error']}"} return {"success": True, "platform": "signal", "chat_id": chat_id} except Exception as e: return {"error": f"Signal send failed: {e}"} async def _send_email(extra, chat_id, message): """Send via SMTP (one-shot, no persistent connection needed).""" import smtplib from email.mime.text import MIMEText address = extra.get("address") or os.getenv("EMAIL_ADDRESS", "") password = os.getenv("EMAIL_PASSWORD", "") smtp_host = extra.get("smtp_host") or os.getenv("EMAIL_SMTP_HOST", "") smtp_port = int(os.getenv("EMAIL_SMTP_PORT", "587")) if not all([address, password, smtp_host]): return {"error": "Email not configured (EMAIL_ADDRESS, EMAIL_PASSWORD, EMAIL_SMTP_HOST required)"} try: msg = MIMEText(message, "plain", "utf-8") msg["From"] = address msg["To"] = chat_id msg["Subject"] = "Hermes Agent" server = smtplib.SMTP(smtp_host, smtp_port) server.starttls(context=ssl.create_default_context()) server.login(address, password) server.send_message(msg) server.quit() return {"success": True, "platform": "email", "chat_id": chat_id} except Exception as e: return {"error": f"Email send failed: {e}"} async def _send_sms(auth_token, chat_id, message): """Send a single SMS via Twilio REST API. Uses HTTP Basic auth (Account SID : Auth Token) and form-encoded POST. Chunking is handled by _send_to_platform() before this is called. """ try: import aiohttp except ImportError: return {"error": "aiohttp not installed. Run: pip install aiohttp"} import base64 account_sid = os.getenv("TWILIO_ACCOUNT_SID", "") from_number = os.getenv("TWILIO_PHONE_NUMBER", "") if not account_sid or not auth_token or not from_number: return {"error": "SMS not configured (TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_PHONE_NUMBER required)"} # Strip markdown — SMS renders it as literal characters message = re.sub(r"\*\*(.+?)\*\*", r"\1", message, flags=re.DOTALL) message = re.sub(r"\*(.+?)\*", r"\1", message, flags=re.DOTALL) message = re.sub(r"__(.+?)__", r"\1", message, flags=re.DOTALL) message = re.sub(r"_(.+?)_", r"\1", message, flags=re.DOTALL) message = re.sub(r"```[a-z]*\n?", "", message) message = re.sub(r"`(.+?)`", r"\1", message) message = re.sub(r"^#{1,6}\s+", "", message, flags=re.MULTILINE) message = re.sub(r"\[([^\]]+)\]\([^\)]+\)", r"\1", message) message = re.sub(r"\n{3,}", "\n\n", message) message = message.strip() try: creds = f"{account_sid}:{auth_token}" encoded = base64.b64encode(creds.encode("ascii")).decode("ascii") url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json" headers = {"Authorization": f"Basic {encoded}"} async with aiohttp.ClientSession() as session: form_data = aiohttp.FormData() form_data.add_field("From", from_number) form_data.add_field("To", chat_id) form_data.add_field("Body", message) async with session.post(url, data=form_data, headers=headers) as resp: body = await resp.json() if resp.status >= 400: error_msg = body.get("message", str(body)) return {"error": f"Twilio API error ({resp.status}): {error_msg}"} msg_sid = body.get("sid", "") return {"success": True, "platform": "sms", "chat_id": chat_id, "message_id": msg_sid} except Exception as e: return {"error": f"SMS send failed: {e}"} def _check_send_message(): """Gate send_message on gateway running (always available on messaging platforms).""" platform = os.getenv("HERMES_SESSION_PLATFORM", "") if platform and platform != "local": return True try: from gateway.status import is_gateway_running return is_gateway_running() except Exception: return False # --- Registry --- from tools.registry import registry registry.register( name="send_message", toolset="messaging", schema=SEND_MESSAGE_SCHEMA, handler=send_message_tool, check_fn=_check_send_message, emoji="📨", )