"""Notification helpers for the editorial workflow.

The institute's deployment fans curator-facing alerts out to Telegram and
ntfy. Workflow code should call `send_to_curator` rather than naming a
specific channel; forks that wire other transports (Slack, Matrix, email,
generic webhook) can replace this module's body without touching the
calling code. `send_telegram` and `send_ntfy` remain available as
channel-specific adapters for callers that legitimately need a specific
transport (e.g. the Telegram conversation responder for inbound replies,
which is platform-specific by nature).
"""
import json
import re
import urllib.request
import urllib.error
import config
def send_telegram(message: str, parse_mode: str | None = "Markdown",
chat_override: str | None = None,
thread_override: str | None = None) -> int | None:
"""Send a message via Telegram bot API. Pass parse_mode=None to send as plain text.
Returns the Telegram message_id on success (an int), None on failure.
Callers that only care about success/failure can `if send_telegram(...):` β
`None` is falsy and ints from Telegram are always truthy. Callers that
need the message_id (e.g. the submission borderline escalation, which
writes the id into the responder's incident JSON for reply-to lookup)
get the real value instead of the bool subclass-of-int that the prior
return shape silently produced.
Routes every editorial-system message to the ICSAC forum topic when
TELEGRAM_THREAD_ID is set in the env. The bot + supergroup are
shared with other operator monitoring topics, so the thread id is what
keeps the ICSAC traffic segregated.
`chat_override` (Tier 3 test path): when set non-empty, sends to that
chat ID instead of config.TELEGRAM_CHAT_ID. The thread id behavior is
unchanged so a test chat that lives in the same supergroup can still
pin to its own topic.
"""
if (chat_override or "") == "__suppress__":
return None
if not config.TELEGRAM_TOKEN or not config.TELEGRAM_CHAT_ID:
return None
url = f"https://api.telegram.org/bot{config.TELEGRAM_TOKEN}/sendMessage"
chat_id = (chat_override or "").strip() or config.TELEGRAM_CHAT_ID
payload = {
"chat_id": chat_id,
"text": message,
}
if parse_mode:
payload["parse_mode"] = parse_mode
thread_id = (thread_override or "").strip() or getattr(config, "TELEGRAM_THREAD_ID", "")
if thread_id:
try:
payload["message_thread_id"] = int(thread_id)
except ValueError:
print(f" Telegram warning: ignoring non-integer thread id {thread_id!r}")
data = json.dumps(payload).encode()
req = urllib.request.Request(url, data=data)
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=15) as resp:
if resp.status != 200:
return None
try:
body = json.loads(resp.read().decode())
except (ValueError, UnicodeDecodeError):
return None
if not body.get("ok"):
return None
msg_id = body.get("result", {}).get("message_id")
return int(msg_id) if isinstance(msg_id, (int, str)) and str(msg_id).lstrip("-").isdigit() else None
except urllib.error.URLError as e:
print(f" Telegram error: {e}")
return None
def send_to_curator(message: str, parse_mode: str | None = "Markdown",
                    chat_override: str | None = None,
                    thread_override: str | None = None,
                    ntfy: bool = True) -> bool:
    """Dispatch a curator-facing alert to the operator's configured channels.

    In the institute's deployment this fans out to Telegram + ntfy. Forks
    that wire other channels (Slack, Matrix, email, webhook) should swap
    this function's body; the workflow only calls send_to_curator and
    does not assume which channels are live.

    Test-tier routing: pass chat_override (and optionally thread_override)
    to redirect Telegram to a separate test chat/topic. Pass ntfy=False to
    skip the ntfy backup so test runs don't fire pain alerts.

    Args:
        message: Alert text, formatted for `parse_mode`.
        parse_mode: Forwarded to send_telegram; None means plain text.
        chat_override: Forwarded to send_telegram.
        thread_override: Forwarded to send_telegram.
        ntfy: When True, also send a plain-text copy through send_ntfy.

    Returns:
        True when the Telegram send reported success. The outcome of the
        ntfy backup does not affect the return value.
    """
    # send_telegram returns a message_id or None; one-shot alerts only need
    # to know whether delivery succeeded, so collapse it to a bool.
    tg_ok = bool(send_telegram(message, parse_mode=parse_mode,
                               chat_override=chat_override,
                               thread_override=thread_override))
    if ntfy:
        try:
            # ntfy doesn't render markdown, so strip the formatting for the
            # backup channel. A plain-text message (parse_mode=None) has no
            # formatting to strip; removing *, _ and ` from it would only
            # corrupt literal characters.
            plain = re.sub(r"[*_`]", "", message) if parse_mode else message
            send_ntfy(plain, title="ICSAC")
        except Exception as e:
            # The backup channel is best-effort: report the failure instead
            # of hiding it, but never let it break the primary alert path.
            print(f" ntfy error: {e}")
    return tg_ok
def send_ntfy(message: str, title: str = "ICSAC Review Pipeline") -> bool:
    """Send a notification to the ntfy backup channel.

    Short-circuits to False (no-op) when NTFY_BACKUPS_URL is unset, matching
    the pattern used by fire_pain() in editorial_workflow — the ntfy channel
    is optional and a missing URL should not be a fatal config error. For the
    same reason a malformed URL, an unencodable title, or a transport error
    is reported and turned into False rather than raised.

    Args:
        message: Body of the notification; sent UTF-8 encoded.
        title: Value of the ``Title`` request header.

    Returns:
        True when the server answered with HTTP 200, False otherwise.
    """
    # Local import: only needed for the HTTPException guard below.
    import http.client

    url = getattr(config, "NTFY_BACKUPS_URL", "")
    if not url:
        return False
    try:
        # Request() itself raises ValueError for a URL without a valid
        # scheme, so it has to live inside the guarded region too.
        req = urllib.request.Request(url, data=message.encode())
        req.add_header("Title", title)
        with urllib.request.urlopen(req, timeout=15) as resp:
            return resp.status == 200
    except (OSError, ValueError, http.client.HTTPException) as e:
        # OSError covers URLError/HTTPError and socket timeouts; ValueError
        # covers bad URLs and header values that cannot be encoded.
        print(f" ntfy error: {e}")
        return False
def _escape_telegram_markdown(value: object) -> str:
    """Backslash-escape the characters Telegram's legacy Markdown treats as markup."""
    return re.sub(r"([_*`\[])", r"\\\1", str(value))


def notify_review_complete(review_data: dict, aggregate: dict) -> None:
    """Send the review-completion notification via Telegram and ntfy.

    Args:
        review_data: Submission metadata; reads ``title`` and ``doi``.
        aggregate: Aggregated panel result; reads ``recommendation``,
            ``models_used`` and ``disagreement``.

    The Telegram message is sent with the default Markdown parse mode, so
    every interpolated value is escaped first: an unpaired ``_`` or ``*``
    (the default recommendation ``REVIEW_FURTHER`` already contains one)
    would otherwise make Telegram reject the message. The ntfy copy is plain
    text and uses the raw values.
    """
    title = review_data.get("title", "Untitled")
    doi = review_data.get("doi", "N/A")
    rec = aggregate.get("recommendation", "REVIEW_FURTHER")
    # Tolerate a present-but-None list and non-string entries.
    models = ", ".join(str(m) for m in (aggregate.get("models_used") or ["unknown"]))
    disagreement = aggregate.get("disagreement", False)

    esc = _escape_telegram_markdown
    msg = (
        f"*ICSAC Review Complete*\n\n"
        f"*Title:* {esc(title)}\n"
        f"*DOI:* {esc(doi)}\n"
        f"*Recommendation:* {esc(rec)}\n"
        f"*Models:* {esc(models)}\n"
        f"*Disagreement:* {'Yes' if disagreement else 'No'}\n\n"
        f"Review saved. Awaiting human curator decision."
    )
    send_telegram(msg)
    send_ntfy(f"{title}\nDOI: {doi}\nRecommendation: {rec}", title="ICSAC Review")
def alert_panel_failure(review_data: dict, reviews: list[dict],
                        valid_count: int, total_slots: int,
                        min_required: int) -> None:
    """Alert that the AI panel review fell below the minimum threshold.

    Called after self-heal retries are exhausted. Sends Telegram (operator)
    + ntfy /pain (monitoring). The submission is NOT auto-processed — it
    stays pending in Zenodo for human attention.

    Args:
        review_data: Submission metadata; reads ``title`` and ``doi``.
        reviews: Per-reviewer result dicts. An entry containing an ``error``
            key counts as failed; ``model`` names the reviewer.
        valid_count: Number of reviewers that produced a usable review.
        total_slots: Number of reviewer slots in the panel.
        min_required: Minimum number of usable reviews required.
    """
    title = review_data.get("title", "Untitled")
    doi = review_data.get("doi", "N/A")

    failed: list[str] = []
    succeeded: list[str] = []
    errors: list[str] = []
    for r in reviews:
        # str() so a non-string model name cannot break the joins below.
        model = str(r.get("model", "?"))
        if "error" in r:
            failed.append(model)
            # The error may be an exception object or None rather than text;
            # stringify before truncating to keep the alert compact.
            err = str(r["error"])[:120]
            errors.append(f" - {model}: {err}")
        else:
            succeeded.append(model)
    err_block = "\n".join(errors) if errors else " (no error details)"

    msg = (
        f"ICSAC Pipeline — AI Panel Failure\n\n"
        f"Paper: {title}\n"
        f"DOI: {doi}\n"
        f"Reviewers OK: {valid_count}/{total_slots} (min required: {min_required})\n"
        f"Succeeded: {', '.join(succeeded) or 'none'}\n"
        f"Failed: {', '.join(failed) or 'none'}\n\n"
        f"Errors:\n{err_block}\n\n"
        f"Self-heal retries exhausted. Submission paused — needs human attention. "
        f"Zenodo request remains pending."
    )
    # Plain text: titles and error strings may contain Markdown metacharacters.
    send_telegram(msg, parse_mode=None)

    # Pain signal direct to monitoring endpoint
    url = getattr(config, "NTFY_PAIN_URL", "")
    if url:
        try:
            req = urllib.request.Request(
                url,
                data=f"AI panel failed for {title}: {valid_count}/{total_slots} reviewers ok".encode(),
            )
            req.add_header("Title", "ICSAC Pipeline: AI Panel Failure")
            # Context manager so the HTTP response is closed, not leaked.
            with urllib.request.urlopen(req, timeout=5):
                pass
        except Exception as e:
            # Best-effort signal: report the failure but never raise out of
            # an alerting path.
            print(f" ntfy pain error: {e}")
|