| """Notification helpers for the editorial workflow. |
| |
| The institute's deployment fans curator-facing alerts out to Telegram and |
| ntfy. Workflow code should call `send_to_curator` rather than naming a |
| specific channel; forks that wire other transports (Slack, Matrix, email, |
| generic webhook) can replace this module's body without touching the |
| calling code. `send_telegram` and `send_ntfy` remain available as |
| channel-specific adapters for callers that legitimately need a specific |
| transport (e.g. the Telegram conversation responder for inbound replies, |
| which is platform-specific by nature). |
| """ |
|
|
| import json |
| import re |
| import urllib.request |
| import urllib.error |
|
|
| import config |
|
|
|
|
| def send_telegram(message: str, parse_mode: str | None = "Markdown", |
| chat_override: str | None = None, |
| thread_override: str | None = None) -> int | None: |
| """Send a message via Telegram bot API. Pass parse_mode=None to send as plain text. |
| |
| Returns the Telegram message_id on success (an int), None on failure. |
| |
| Callers that only care about success/failure can `if send_telegram(...):` — |
| `None` is falsy and ints from Telegram are always truthy. Callers that |
| need the message_id (e.g. the submission borderline escalation, which |
| writes the id into the responder's incident JSON for reply-to lookup) |
| get the real value instead of the bool subclass-of-int that the prior |
| return shape silently produced. |
| |
| Routes every editorial-system message to the ICSAC forum topic when |
| TELEGRAM_THREAD_ID is set in the env. The bot + supergroup are |
| shared with other operator monitoring topics, so the thread id is what |
| keeps the ICSAC traffic segregated. |
| |
| `chat_override` (Tier 3 test path): when set non-empty, sends to that |
| chat ID instead of config.TELEGRAM_CHAT_ID. The thread id behavior is |
| unchanged so a test chat that lives in the same supergroup can still |
| pin to its own topic. |
| """ |
| if (chat_override or "") == "__suppress__": |
| return None |
| if not config.TELEGRAM_TOKEN or not config.TELEGRAM_CHAT_ID: |
| return None |
| url = f"https://api.telegram.org/bot{config.TELEGRAM_TOKEN}/sendMessage" |
| chat_id = (chat_override or "").strip() or config.TELEGRAM_CHAT_ID |
| payload = { |
| "chat_id": chat_id, |
| "text": message, |
| } |
| if parse_mode: |
| payload["parse_mode"] = parse_mode |
| thread_id = (thread_override or "").strip() or getattr(config, "TELEGRAM_THREAD_ID", "") |
| if thread_id: |
| try: |
| payload["message_thread_id"] = int(thread_id) |
| except ValueError: |
| print(f" Telegram warning: ignoring non-integer thread id {thread_id!r}") |
| data = json.dumps(payload).encode() |
|
|
| req = urllib.request.Request(url, data=data) |
| req.add_header("Content-Type", "application/json") |
|
|
| try: |
| with urllib.request.urlopen(req, timeout=15) as resp: |
| if resp.status != 200: |
| return None |
| try: |
| body = json.loads(resp.read().decode()) |
| except (ValueError, UnicodeDecodeError): |
| return None |
| if not body.get("ok"): |
| return None |
| msg_id = body.get("result", {}).get("message_id") |
| return int(msg_id) if isinstance(msg_id, (int, str)) and str(msg_id).lstrip("-").isdigit() else None |
| except urllib.error.URLError as e: |
| print(f" Telegram error: {e}") |
| return None |
|
|
|
|
| def send_to_curator(message: str, parse_mode: str | None = "Markdown", |
| chat_override: str | None = None, |
| thread_override: str | None = None, |
| ntfy: bool = True) -> bool: |
| """Dispatch a curator-facing alert to the operator's configured channels. |
| |
| In the institute's deployment this fans out to Telegram + ntfy. Forks |
| that wire other channels (Slack, Matrix, email, webhook) should swap |
| this function's body; the workflow only calls send_to_curator and |
| does not assume which channels are live. |
| |
| Test-tier routing: pass chat_override (and optionally thread_override) |
| to redirect Telegram to a separate test chat/topic. Pass ntfy=False to |
| skip the ntfy backup so test runs don't fire pain alerts. |
| """ |
| tg_ok = bool(send_telegram(message, parse_mode=parse_mode, |
| chat_override=chat_override, |
| thread_override=thread_override)) |
| |
| |
| |
| |
| if ntfy: |
| try: |
| plain = re.sub(r"[*_`]", "", message) |
| send_ntfy(plain, title="ICSAC") |
| except Exception: |
| pass |
| return tg_ok |
|
|
|
|
| def send_ntfy(message: str, title: str = "ICSAC Review Pipeline") -> bool: |
| """Send notification to ntfy backup channel. |
| |
| Short-circuits to False (no-op) when NTFY_BACKUPS_URL is unset, matching |
| the pattern used by fire_pain() in editorial_workflow — the ntfy channel |
| is optional and a missing URL should not be a fatal config error. |
| """ |
| url = getattr(config, "NTFY_BACKUPS_URL", "") |
| if not url: |
| return False |
| req = urllib.request.Request(url, data=message.encode()) |
| req.add_header("Title", title) |
|
|
| try: |
| with urllib.request.urlopen(req, timeout=15) as resp: |
| return resp.status == 200 |
| except urllib.error.URLError as e: |
| print(f" ntfy error: {e}") |
| return False |
|
|
|
|
| def notify_review_complete(review_data: dict, aggregate: dict) -> None: |
| """Send review completion notification via Telegram and ntfy.""" |
| title = review_data.get("title", "Untitled") |
| doi = review_data.get("doi", "N/A") |
| rec = aggregate.get("recommendation", "REVIEW_FURTHER") |
| models = ", ".join(aggregate.get("models_used", ["unknown"])) |
| disagreement = aggregate.get("disagreement", False) |
|
|
| msg = ( |
| f"*ICSAC Review Complete*\n\n" |
| f"*Title:* {title}\n" |
| f"*DOI:* {doi}\n" |
| f"*Recommendation:* {rec}\n" |
| f"*Models:* {models}\n" |
| f"*Disagreement:* {'Yes' if disagreement else 'No'}\n\n" |
| f"Review saved. Awaiting human curator decision." |
| ) |
|
|
| send_telegram(msg) |
| send_ntfy(f"{title}\nDOI: {doi}\nRecommendation: {rec}", title="ICSAC Review") |
|
|
|
|
|
|
| def alert_panel_failure(review_data: dict, reviews: list[dict], |
| valid_count: int, total_slots: int, |
| min_required: int) -> None: |
| """AI panel review fell below minimum threshold after self-heal retries. |
| |
| Sends Telegram (operator) + ntfy /pain (monitoring). The submission is |
| NOT auto-processed — it stays pending in Zenodo for human attention. |
| """ |
| title = review_data.get("title", "Untitled") |
| doi = review_data.get("doi", "N/A") |
| failed = [r.get("model", "?") for r in reviews if "error" in r] |
| succeeded = [r.get("model", "?") for r in reviews if "error" not in r] |
| errors = [] |
| for r in reviews: |
| if "error" in r: |
| err = r["error"][:120] |
| errors.append(f" - {r.get('model', '?')}: {err}") |
| err_block = "\n".join(errors) if errors else " (no error details)" |
|
|
| msg = ( |
| f"ICSAC Pipeline — AI Panel Failure\n\n" |
| f"Paper: {title}\n" |
| f"DOI: {doi}\n" |
| f"Reviewers OK: {valid_count}/{total_slots} (min required: {min_required})\n" |
| f"Succeeded: {', '.join(succeeded) or 'none'}\n" |
| f"Failed: {', '.join(failed)}\n\n" |
| f"Errors:\n{err_block}\n\n" |
| f"Self-heal retries exhausted. Submission paused — needs human attention. " |
| f"Zenodo request remains pending." |
| ) |
|
|
| send_telegram(msg, parse_mode=None) |
|
|
| |
| url = getattr(config, "NTFY_PAIN_URL", "") |
| if url: |
| import urllib.request |
| try: |
| req = urllib.request.Request( |
| url, |
| data=f"AI panel failed for {title}: {valid_count}/{total_slots} reviewers ok".encode(), |
| ) |
| req.add_header("Title", "ICSAC Pipeline: AI Panel Failure") |
| urllib.request.urlopen(req, timeout=5) |
| except Exception: |
| pass |
|
|