File size: 8,367 Bytes
470bcea
 
 
 
 
 
 
 
 
 
 
37cb069
 
470bcea
37cb069
 
 
 
 
 
 
98ce58d
 
37cb069
 
 
 
 
 
 
 
 
 
 
470bcea
 
 
37cb069
 
 
 
 
 
 
98ce58d
 
470bcea
 
37cb069
 
 
 
 
 
 
 
98ce58d
37cb069
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
98ce58d
 
 
 
470bcea
 
 
 
 
 
98ce58d
 
 
 
470bcea
98ce58d
 
 
470bcea
 
 
 
98ce58d
 
 
 
 
 
470bcea
 
 
37cb069
470bcea
 
 
 
 
 
 
 
 
 
37cb069
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
470bcea
37cb069
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
470bcea
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
"""Notification helpers for the editorial workflow.

The institute's deployment fans curator-facing alerts out to Telegram and
ntfy. Workflow code should call `send_to_curator` rather than naming a
specific channel; forks that wire other transports (Slack, Matrix, email,
generic webhook) can replace this module's body without touching the
calling code. `send_telegram` and `send_ntfy` remain available as
channel-specific adapters for callers that legitimately need a specific
transport (e.g. the Telegram conversation responder for inbound replies,
which is platform-specific by nature).
"""

import json
import re
import urllib.request
import urllib.error

import config


def send_telegram(message: str, parse_mode: str | None = "Markdown",
                  chat_override: str | None = None,
                  thread_override: str | None = None) -> int | None:
    """Send a message via Telegram bot API. Pass parse_mode=None to send as plain text.

    Returns the Telegram message_id on success (an int), None on failure.

    Callers that only care about success/failure can `if send_telegram(...):` β€”
    `None` is falsy and ints from Telegram are always truthy. Callers that
    need the message_id (e.g. the submission borderline escalation, which
    writes the id into the responder's incident JSON for reply-to lookup)
    get the real value instead of the bool subclass-of-int that the prior
    return shape silently produced.

    Routes every editorial-system message to the ICSAC forum topic when
    TELEGRAM_THREAD_ID is set in the env. The bot + supergroup are
    shared with other operator monitoring topics, so the thread id is what
    keeps the ICSAC traffic segregated.

    `chat_override` (Tier 3 test path): when set non-empty, sends to that
    chat ID instead of config.TELEGRAM_CHAT_ID. The thread id behavior is
    unchanged so a test chat that lives in the same supergroup can still
    pin to its own topic.
    """
    if (chat_override or "") == "__suppress__":
        return None
    if not config.TELEGRAM_TOKEN or not config.TELEGRAM_CHAT_ID:
        return None
    url = f"https://api.telegram.org/bot{config.TELEGRAM_TOKEN}/sendMessage"
    chat_id = (chat_override or "").strip() or config.TELEGRAM_CHAT_ID
    payload = {
        "chat_id": chat_id,
        "text": message,
    }
    if parse_mode:
        payload["parse_mode"] = parse_mode
    thread_id = (thread_override or "").strip() or getattr(config, "TELEGRAM_THREAD_ID", "")
    if thread_id:
        try:
            payload["message_thread_id"] = int(thread_id)
        except ValueError:
            print(f"  Telegram warning: ignoring non-integer thread id {thread_id!r}")
    data = json.dumps(payload).encode()

    req = urllib.request.Request(url, data=data)
    req.add_header("Content-Type", "application/json")

    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            if resp.status != 200:
                return None
            try:
                body = json.loads(resp.read().decode())
            except (ValueError, UnicodeDecodeError):
                return None
            if not body.get("ok"):
                return None
            msg_id = body.get("result", {}).get("message_id")
            return int(msg_id) if isinstance(msg_id, (int, str)) and str(msg_id).lstrip("-").isdigit() else None
    except urllib.error.URLError as e:
        print(f"  Telegram error: {e}")
        return None


def send_to_curator(message: str, parse_mode: str | None = "Markdown",
                    chat_override: str | None = None,
                    thread_override: str | None = None,
                    ntfy: bool = True) -> bool:
    """Dispatch a curator-facing alert to the operator's configured channels.

    In the institute's deployment this fans out to Telegram + ntfy. Forks
    that wire other channels (Slack, Matrix, email, webhook) should swap
    this function's body; the workflow only calls send_to_curator and
    does not assume which channels are live.

    Test-tier routing: pass chat_override (and optionally thread_override)
    to redirect Telegram to a separate test chat/topic. Pass ntfy=False to
    skip the ntfy backup so test runs don't fire pain alerts.

    Returns True when the Telegram send produced a truthy message id, False
    otherwise. The outcome of the ntfy backup never affects the return value.
    """
    tg_ok = bool(send_telegram(message, parse_mode=parse_mode,
                               chat_override=chat_override,
                               thread_override=thread_override))
    # ntfy doesn't render markdown; strip the formatting for the backup
    # channel. The Telegram message_id from send_telegram is the
    # caller-relevant return value for conversation-thread tracking,
    # but send_to_curator is for one-shot alerts that don't need it.
    if ntfy:
        try:
            plain = re.sub(r"[*_`]", "", message)
            send_ntfy(plain, title="ICSAC")
        except Exception as e:
            # The backup channel is best-effort and must not mask the
            # Telegram result, but a failure here should still be visible
            # to the operator rather than vanish silently.
            print(f"  ntfy backup error: {e}")
    return tg_ok


def send_ntfy(message: str, title: str = "ICSAC Review Pipeline") -> bool:
    """Send notification to ntfy backup channel.

    Short-circuits to False (no-op) when NTFY_BACKUPS_URL is unset, matching
    the pattern used by fire_pain() in editorial_workflow — the ntfy channel
    is optional and a missing URL should not be a fatal config error.

    The message is posted as the request body and `title` is sent in the
    `Title` header. Returns True only when the server answers with status
    200; transport errors are printed and reported as False.
    """
    # Local import: only the exception base class is needed, and keeping it
    # here means this function does not depend on a new file-level import.
    import http.client

    url = getattr(config, "NTFY_BACKUPS_URL", "")
    if not url:
        return False
    req = urllib.request.Request(url, data=message.encode())
    req.add_header("Title", title)

    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            return resp.status == 200
    except (OSError, http.client.HTTPException) as e:
        # URLError/HTTPError are OSError subclasses. OSError also catches
        # timeouts and connection resets raised while waiting for the
        # response, which urllib does not wrap in URLError.
        print(f"  ntfy error: {e}")
        return False


def _escape_telegram_markdown(value: object) -> str:
    """Backslash-escape the characters Telegram's legacy Markdown treats as markup."""
    return re.sub(r"([_*`\[])", r"\\\1", str(value))


def notify_review_complete(review_data: dict, aggregate: dict) -> None:
    """Send review completion notification via Telegram and ntfy.

    Reads `title` and `doi` from `review_data`, and `recommendation`,
    `models_used` and `disagreement` from `aggregate`, substituting
    placeholders for missing keys. A missing, None or empty `models_used`
    is reported as "unknown".

    The Telegram message is sent with the default Markdown parse mode, so
    every interpolated value is escaped first: an unbalanced `_`, `*`, `` ` ``
    or `[` in a value (the default recommendation "REVIEW_FURTHER" already
    contains one) would otherwise be treated as markup. The ntfy message is
    plain text and is sent unescaped.
    """
    title = review_data.get("title", "Untitled")
    doi = review_data.get("doi", "N/A")
    rec = aggregate.get("recommendation", "REVIEW_FURTHER")
    # str() each entry so a non-string model identifier cannot break join().
    models = ", ".join(str(m) for m in (aggregate.get("models_used") or ["unknown"]))
    disagreement = aggregate.get("disagreement", False)

    esc = _escape_telegram_markdown
    msg = (
        f"*ICSAC Review Complete*\n\n"
        f"*Title:* {esc(title)}\n"
        f"*DOI:* {esc(doi)}\n"
        f"*Recommendation:* {esc(rec)}\n"
        f"*Models:* {esc(models)}\n"
        f"*Disagreement:* {'Yes' if disagreement else 'No'}\n\n"
        f"Review saved. Awaiting human curator decision."
    )

    send_telegram(msg)
    send_ntfy(f"{title}\nDOI: {doi}\nRecommendation: {rec}", title="ICSAC Review")



def alert_panel_failure(review_data: dict, reviews: list[dict],
                        valid_count: int, total_slots: int,
                        min_required: int) -> None:
    """AI panel review fell below minimum threshold after self-heal retries.

    Sends Telegram (operator) + ntfy /pain (monitoring). The submission is
    NOT auto-processed — it stays pending in Zenodo for human attention.

    A review dict counts as failed when it has an "error" key; its error
    text is truncated to 120 characters in the report. The Telegram message
    is sent as plain text (parse_mode=None). The pain signal is posted only
    when config.NTFY_PAIN_URL is set, and is best-effort: a failure there is
    printed, never raised.
    """
    title = review_data.get("title", "Untitled")
    doi = review_data.get("doi", "N/A")

    # Single pass: classify each reviewer and collect its error line.
    failed: list[str] = []
    succeeded: list[str] = []
    errors: list[str] = []
    for r in reviews:
        model = str(r.get("model", "?"))
        if "error" in r:
            failed.append(model)
            # str() first: the error may be an exception or other non-string.
            errors.append(f"  - {model}: {str(r['error'])[:120]}")
        else:
            succeeded.append(model)
    err_block = "\n".join(errors) if errors else "  (no error details)"

    msg = (
        f"ICSAC Pipeline — AI Panel Failure\n\n"
        f"Paper: {title}\n"
        f"DOI: {doi}\n"
        f"Reviewers OK: {valid_count}/{total_slots} (min required: {min_required})\n"
        f"Succeeded: {', '.join(succeeded) or 'none'}\n"
        f"Failed: {', '.join(failed) or 'none'}\n\n"
        f"Errors:\n{err_block}\n\n"
        f"Self-heal retries exhausted. Submission paused — needs human attention. "
        f"Zenodo request remains pending."
    )

    send_telegram(msg, parse_mode=None)

    # Pain signal direct to monitoring endpoint
    url = getattr(config, "NTFY_PAIN_URL", "")
    if url:
        try:
            req = urllib.request.Request(
                url,
                data=f"AI panel failed for {title}: {valid_count}/{total_slots} reviewers ok".encode(),
            )
            req.add_header("Title", "ICSAC Pipeline: AI Panel Failure")
            # Context manager closes the response so the connection is not leaked.
            with urllib.request.urlopen(req, timeout=5):
                pass
        except Exception as e:
            # Best-effort: the operator alert above has already been
            # attempted, so report the failure instead of raising.
            print(f"  ntfy pain error: {e}")