editorial-system / watch.py
ICSAC's picture
Public-flip preparation
470bcea
"""Watcher for ICSAC community-inclusion requests.
Polls /api/user/requests, diffs against state/watched.json, fires side effects
on transitions:
unknown → submitted (open): run review (panel + write markdown locally)
submitted/reviewed → accepted: post branded comment + register landing page
submitted/reviewed → declined: post branded decline comment with review summary
submitted/reviewed → cancelled: no action (author withdrew)
Fully automated. The only human step is the click in the Zenodo curator UI.
The branded comment is delivered to the author by Zenodo's notification machinery,
so we do not need to discover author emails.
State file format (state/watched.json):
{
"<request_id>": {
"record_id": "...",
"title": "...",
"first_seen": "iso",
"status": "submitted|reviewed|accepted|declined|cancelled",
"review_path": "reviews/<id>_<slug>.md" or null,
"last_check": "iso"
}
}
Bootstrap mode marks every currently-visible request with its current status
WITHOUT firing side effects, so we don't re-fire emails for historical state.
Pain wiring: any uncaught exception in tick() fires /pain. Successful tick
also pings the Uptime Kuma push monitor for silence detection.
"""
import datetime
import json
import os
import sys
import traceback
import urllib.error
import urllib.request
import action
import config
import email_render
import submission_intake
import notify
import redaction
STATE_DIR = os.path.join(config.BASE_DIR, "state")
STATE_PATH = os.path.join(STATE_DIR, "watched.json")
TERMINAL_STATUSES = {"accepted", "declined", "cancelled", "expired"}
def _now_iso() -> str:
return datetime.datetime.now(datetime.timezone.utc).isoformat()
def _load_state() -> dict:
if not os.path.isfile(STATE_PATH):
return {}
with open(STATE_PATH) as f:
return json.load(f)
def _save_state(state: dict) -> None:
os.makedirs(STATE_DIR, exist_ok=True)
tmp = STATE_PATH + ".tmp"
with open(tmp, "w") as f:
json.dump(state, f, indent=2, sort_keys=True)
f.write("\n")
os.replace(tmp, STATE_PATH)
def _fire_pain(title: str, body: str) -> None:
url = getattr(config, "NTFY_PAIN_URL", "")
if not url:
return
try:
req = urllib.request.Request(url, data=body.encode())
req.add_header("Title", title)
urllib.request.urlopen(req, timeout=5)
except Exception:
pass
def _ping_kuma(status: str = "up", msg: str = "") -> None:
base = getattr(config, "KUMA_PUSH_URL", "")
if not base:
return
try:
url = f"{base}?status={status}&msg={urllib.request.quote(msg)}"
urllib.request.urlopen(url, timeout=5)
except Exception:
pass
def _safe_post_comment(request_id: str, body: str, kind: str, context: str) -> bool:
"""Run the redaction grep-gate on a rendered Zenodo-comment body before posting.
The accept/decline comment includes text pulled from the on-disk review
(summary, concerns). A poisoned review that survived the panel's own
defenses could still leak through this pass-through path — this gate
catches credential prefixes, filesystem paths, env-var assignments, and
vendor/model tokens before the body reaches Zenodo.
On a fatal hit the comment is NOT posted. Zenodo has already delivered
its own state-change notification to the author, so the author still
learns the decision; only our branded follow-up is suppressed. /pain
fires so the operator can inspect and post a cleaned comment manually.
"""
try:
redaction.assert_clean(body, artifact_path=f"{kind}-comment:{request_id}")
except redaction.RedactionLeak as e:
print(f" {kind} comment blocked by redaction gate: {e}")
_fire_pain(
f"ICSAC Watcher: {kind} comment blocked by redaction gate",
(
f"{e}\n\nContext: {context}\n"
f"The branded {kind} comment was NOT posted to request {request_id}. "
f"Zenodo's own state-change notification still reached the author. "
f"Inspect the rendered comment, redact the leak, and post manually "
f"via `python3 -c 'import action; action.post_request_comment(...)'`."
),
)
return False
return action.post_request_comment(request_id, body, fmt="html")
def _parse_review_flags(review_path: str | None) -> tuple[bool, bool]:
"""Read the review + RQC markdown frontmatter to extract gate flags.
Returns (disagreement, rqc_flag). Either true means the auto-posted
Zenodo comment must be suppressed and the operator must approve the
branded follow-up manually.
Missing files are treated as (False, False) — absence of signal, not
presence of agreement. The operator still sees Zenodo's own decision
notification; only our branded follow-up is gated.
"""
disagreement = False
rqc_flag = False
if review_path and os.path.isfile(review_path):
try:
with open(review_path) as f:
text = f.read()
fm = {}
if text.startswith("---\n"):
end = text.find("\n---\n", 4)
if end > 0:
for line in text[4:end].splitlines():
if ":" in line:
k, v = line.split(":", 1)
fm[k.strip()] = v.strip().strip('"').strip("'")
disagreement = fm.get("disagreement", "False").lower() == "true"
except Exception:
pass
if review_path:
record_id = os.path.basename(review_path).split("_", 1)[0]
rqc_path = os.path.join(os.path.dirname(review_path), f"{record_id}_review_quality_control.md")
if os.path.isfile(rqc_path):
try:
with open(rqc_path) as f:
text = f.read()
if text.startswith("---\n"):
end = text.find("\n---\n", 4)
if end > 0:
for line in text[4:end].splitlines():
if line.strip().startswith("review_quality_control_flag:"):
val = line.split(":", 1)[1].strip().strip('"').strip("'")
rqc_flag = val.lower() == "true"
break
except Exception:
pass
return disagreement, rqc_flag
def _parse_review_recommendation(review_path: str | None) -> str:
"""Read the review markdown frontmatter's `recommendation` field.
Returns the uppercased recommendation string (e.g. "REJECT",
"REVISE_AND_RESUBMIT", "RECOMMEND", "REVIEW_FURTHER") or an empty
string if the file or field is missing. Used by the decline
transition handler to pick the scope-reject vs R&R comment template.
"""
if not review_path or not os.path.isfile(review_path):
return ""
try:
with open(review_path) as f:
text = f.read()
except Exception:
return ""
if not text.startswith("---\n"):
return ""
end = text.find("\n---\n", 4)
if end <= 0:
return ""
for line in text[4:end].splitlines():
if ":" not in line:
continue
k, v = line.split(":", 1)
if k.strip() == "recommendation":
return v.strip().strip('"').strip("'").upper()
return ""
def _escalate_comment(rid: str, record_id: str, title: str, kind: str,
comment_md: str, disagreement: bool, rqc_flag: bool) -> None:
"""Suppress auto-posting the branded Zenodo comment; notify the operator.
The watcher calls this when the on-disk review signals panel disagreement
or a Review Quality Control flag. Zenodo still delivers its own state-change
notification to the author, so the author still learns the decision; only
the ICSAC-branded follow-up is held pending operator review.
"""
reasons = []
if disagreement:
reasons.append("panel disagreement")
if rqc_flag:
reasons.append("RQC flagged")
reason_str = " + ".join(reasons) or "quality gate"
print(f" {kind} comment gated ({reason_str}); escalating to operator")
msg = (
f"ICSAC Pipeline — {kind.capitalize()} Comment Held\n\n"
f"Record: {record_id}\n"
f"Title: {title[:160]}\n"
f"Reason: {reason_str}\n\n"
f"Zenodo's state-change notification reached the author. The ICSAC-branded "
f"{kind} comment is held pending your review. Inspect the rendered comment "
f"below, adjust if needed, then post manually via "
f"`python3 -c 'import action; action.post_request_comment(\"{rid}\", BODY, fmt=\"html\")'`.\n\n"
f"--- Rendered comment body ---\n{comment_md[:3500]}"
)
notify.send_to_curator(msg, parse_mode=None)
_fire_pain(
f"ICSAC Watcher: {kind} comment held ({reason_str})",
f"Record {record_id}: {title[:120]}\nReason: {reason_str}\nCheck the curator's configured channel for the rendered comment body.",
)
def _fetch_record_metadata(record_id: str) -> dict:
"""Fetch a record's Zenodo metadata. Public endpoint — no auth needed."""
url = f"{config.ZENODO_API}/records/{record_id}"
req = urllib.request.Request(url)
if config.ZENODO_TOKEN:
req.add_header("Authorization", f"Bearer {config.ZENODO_TOKEN}")
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def _doi_from_record(record_id: str) -> str:
md = _fetch_record_metadata(record_id)
return md.get("doi") or md.get("metadata", {}).get("doi", "")
def _review_data_from_record(record_id: str, review_path: str | None) -> dict:
"""Build the dict that email_render._base_data expects.
Pulls metadata via submission_intake.ingest_doi (which uses Zenodo's record API)
and overlays the local review's recommendation/disagreement if available.
"""
doi = _doi_from_record(record_id)
data = submission_intake.ingest_doi(doi) if doi else {"record_id": record_id}
data["record_id"] = str(record_id)
return data
def _generate_review(record_id: str) -> str | None:
"""Run the review panel for a record. Returns the review markdown path,
or None on failure."""
import editorial_workflow as pl
doi = _doi_from_record(record_id)
if not doi:
print(f" no DOI for record {record_id}; skipping review")
return None
print(f" generating review for {doi}")
try:
result = pl.review_doi(doi, skip_notify=True)
except Exception as e:
print(f" review failed: {e}")
return None
review_path = result.get("review_path") if isinstance(result, dict) else None
if not review_path:
# review_doi historically didn't return path — find it under reviews/
candidates = [
os.path.join(config.REVIEWS_DIR, f)
for f in os.listdir(config.REVIEWS_DIR)
if f.startswith(f"{record_id}_") and f.endswith(".md")
]
review_path = max(candidates, key=os.path.getmtime) if candidates else None
return review_path
def _handle_new_submission(req: dict, state: dict, skip_review: bool = False) -> None:
"""Generate a review for a newly-seen open submission.
When skip_review=True, the submission is still tracked in state but
review generation is deferred until the next tick with a healthy
reviewer panel. Status stays 'submitted' so a later tick picks it up.
"""
rid = req["id"]
record_id = str(req["topic"]["record"])
raw_title = req.get("title") or ""
if isinstance(raw_title, dict):
title = raw_title.get("title", "")
else:
title = str(raw_title)
title = title or _record_title(record_id)
print(f"NEW SUBMISSION: request={rid[:8]} record={record_id}{title[:80]}")
# Skip review if one already exists on disk (covers re-runs, bootstrap)
existing = _find_existing_review(record_id)
if existing:
print(f" review already on disk: {existing}")
review_path = existing
elif skip_review:
print(f" review deferred (skip_reviews=True); tracking submission only")
review_path = None
else:
review_path = _generate_review(record_id)
state[rid] = {
"record_id": record_id,
"title": title[:200],
"first_seen": _now_iso(),
"status": "reviewed" if review_path else "submitted",
"review_path": review_path,
"last_check": _now_iso(),
}
def _handle_accept_transition(req: dict, state_entry: dict) -> None:
"""Curator accepted the request via UI/API. Post our comment + register paper."""
rid = req["id"]
record_id = state_entry["record_id"]
title = state_entry.get("title", "")
print(f"ACCEPT TRANSITION: request={rid[:8]} record={record_id}{title[:80]}")
# Comment first (lightweight, idempotency is on us — the watcher only fires
# this branch once per request because we then mark state.status=accepted).
# Quality gate: if the on-disk review shows panel disagreement or the RQC
# audit tripped, the branded comment is held for operator review rather
# than auto-posted. The landing-page registry still publishes so the
# accept itself is not blocked.
disagreement, rqc_flag = _parse_review_flags(state_entry.get("review_path"))
try:
review_data = _review_data_from_record(record_id, state_entry.get("review_path"))
landing_url = f"https://icsacinstitute.org/accepted/{record_id}"
comment_md = email_render.render_accept_comment(review_data, landing_url=landing_url)
if disagreement or rqc_flag:
_escalate_comment(rid, record_id, title, "accept", comment_md, disagreement, rqc_flag)
else:
ok = _safe_post_comment(rid, comment_md, "accept", context=title[:120])
print(f" branded comment posted: {ok}")
except Exception as e:
print(f" comment post failed (non-fatal): {e}")
_fire_pain(
"ICSAC Watcher: accept comment failed",
f"Could not post accept comment to request {rid} (record {record_id}): {e}",
)
# Then register on the website (landing page + redacted review + stats + push)
try:
action.register_accepted_paper(record_id)
except Exception as e:
print(f" registry update failed: {e}")
_fire_pain(
"ICSAC Watcher: registry push failed",
f"Accept comment posted but landing-page registry push failed for "
f"record {record_id}: {e}",
)
def _handle_decline_transition(req: dict, state_entry: dict) -> None:
"""Curator declined the request via UI/API. Post our decline comment."""
rid = req["id"]
record_id = state_entry["record_id"]
title = state_entry.get("title", "")
print(f"DECLINE TRANSITION: request={rid[:8]} record={record_id}{title[:80]}")
disagreement, rqc_flag = _parse_review_flags(state_entry.get("review_path"))
try:
review_data = _review_data_from_record(record_id, state_entry.get("review_path"))
# Route to the scope-reject vs R&R template based on the on-disk
# panel recommendation. REJECT routes to the scope-not-suitable
# template (no review_summary/concerns — the verdict is "out of
# scope," not "revise these points"). Everything else (R&R,
# REVIEW_FURTHER, or anything operator-set) takes the default
# R&R template with the parsed review blurb.
recommendation = _parse_review_recommendation(state_entry.get("review_path"))
if recommendation == "REJECT":
comment_md = email_render.render_scope_reject_comment(review_data)
else:
summary, concerns = _extract_review_blurb(state_entry.get("review_path"))
comment_md = email_render.render_revise_and_resubmit_comment(
review_data,
review_summary=summary,
specific_concerns=concerns,
)
if disagreement or rqc_flag:
_escalate_comment(rid, record_id, title, "decline", comment_md, disagreement, rqc_flag)
else:
ok = _safe_post_comment(rid, comment_md, "decline", context=title[:120])
print(f" branded decline comment posted: {ok}")
except Exception as e:
print(f" decline comment failed: {e}")
_fire_pain(
"ICSAC Watcher: decline comment failed",
f"Could not post decline comment to request {rid} (record {record_id}): {e}",
)
def _extract_review_blurb(review_path: str | None) -> tuple[str, str]:
"""Pull a short summary + concerns string from the review markdown.
Used to fill the decline comment. Falls back to generic text if parsing fails.
"""
if not review_path or not os.path.isfile(review_path):
return ("", "")
try:
with open(review_path) as f:
txt = f.read()
except Exception:
return ("", "")
summary, concerns = "", ""
# Pull the first "Summary" / "Concerns" sections if present.
# Reviews vary in shape — best-effort.
for hdr, target in (("## Summary", "summary"), ("## Concerns", "concerns"),
("### Summary", "summary"), ("### Key Concerns", "concerns")):
if hdr in txt:
chunk = txt.split(hdr, 1)[1].split("\n##", 1)[0].strip()
chunk = chunk[:600]
if target == "summary":
summary = chunk
else:
concerns = chunk
return (summary, concerns)
def _find_existing_review(record_id: str) -> str | None:
if not os.path.isdir(config.REVIEWS_DIR):
return None
candidates = [
os.path.join(config.REVIEWS_DIR, f)
for f in os.listdir(config.REVIEWS_DIR)
if f.startswith(f"{record_id}_") and f.endswith(".md")
]
return max(candidates, key=os.path.getmtime) if candidates else None
def _record_title(record_id: str) -> str:
try:
md = _fetch_record_metadata(record_id)
return md.get("metadata", {}).get("title", "") or ""
except Exception:
return ""
def tick(bootstrap: bool = False, skip_reviews: bool = False) -> None:
"""One polling cycle. Fetches all ICSAC requests (open + closed) so we can
detect transitions. Fires side effects only outside of bootstrap mode.
skip_reviews=True defers review generation (used by batch-tick when the
OR model availability check fails). Transitions always run — accept/decline
comments + landing-page publication don't depend on reviewer panel health.
"""
state = _load_state()
requests = action.get_community_requests(open_only=False)
print(f"watch-tick: {len(requests)} ICSAC requests visible "
f"(bootstrap={bootstrap}, skip_reviews={skip_reviews})")
fired = {"new": 0, "accept": 0, "decline": 0, "cancel": 0,
"deferred_review": 0, "noop": 0}
for req in requests:
rid = req["id"]
zstatus = req.get("status", "submitted")
prior = state.get(rid)
if prior is None:
# First sighting
if bootstrap:
state[rid] = {
"record_id": str(req["topic"]["record"]),
"title": _record_title(str(req["topic"]["record"]))[:200],
"first_seen": _now_iso(),
"status": zstatus,
"review_path": _find_existing_review(str(req["topic"]["record"])),
"last_check": _now_iso(),
}
fired["noop"] += 1
continue
if zstatus == "submitted":
_handle_new_submission(req, state, skip_review=skip_reviews)
fired["new"] += 1
else:
# Closed before we ever saw it open — just record, do nothing.
state[rid] = {
"record_id": str(req["topic"]["record"]),
"title": _record_title(str(req["topic"]["record"]))[:200],
"first_seen": _now_iso(),
"status": zstatus,
"review_path": _find_existing_review(str(req["topic"]["record"])),
"last_check": _now_iso(),
}
fired["noop"] += 1
continue
# Already in state — check for transitions.
prior_status = prior.get("status")
if prior_status in TERMINAL_STATUSES:
prior["last_check"] = _now_iso()
fired["noop"] += 1
continue
# Deferred-review recovery: a prior tick skipped the review because
# the panel was starved. If we're healthy now AND the submission is
# still open, try to generate the review this tick.
if (prior_status == "submitted"
and not prior.get("review_path")
and not skip_reviews
and zstatus == "submitted"):
print(f"DEFERRED REVIEW: request={rid[:8]} record={prior['record_id']}")
review_path = _generate_review(prior["record_id"])
if review_path:
prior["review_path"] = review_path
prior["status"] = "reviewed"
fired["deferred_review"] += 1
if zstatus == prior_status:
prior["last_check"] = _now_iso()
fired["noop"] += 1
continue
# Transition!
if zstatus == "accepted":
if not bootstrap:
_handle_accept_transition(req, prior)
fired["accept"] += 1
prior["status"] = "accepted"
elif zstatus == "declined":
if not bootstrap:
_handle_decline_transition(req, prior)
fired["decline"] += 1
prior["status"] = "declined"
elif zstatus == "cancelled":
prior["status"] = "cancelled"
fired["cancel"] += 1
elif zstatus == "submitted":
# Reopened? Just track; do not re-review.
prior["status"] = "submitted"
fired["noop"] += 1
prior["last_check"] = _now_iso()
_save_state(state)
summary = ", ".join(f"{k}={v}" for k, v in fired.items())
print(f"watch-tick done: {summary} (bootstrap={bootstrap})")
_ping_kuma("up", f"watch-tick ok: {summary}")
def main() -> int:
bootstrap = "--bootstrap" in sys.argv
skip_reviews = "--skip-reviews" in sys.argv
try:
tick(bootstrap=bootstrap, skip_reviews=skip_reviews)
return 0
except Exception as e:
traceback.print_exc()
scrubbed_tb = (
traceback.format_exc()
.replace(config.BASE_DIR, "…")
.replace("/home/orangepi", "…")
)
_fire_pain(
"ICSAC Watcher: tick crash",
f"watch.py tick failed: {e}\n\n{scrubbed_tb[:1500]}",
)
_ping_kuma("down", f"watch-tick crash: {e}")
return 1
if __name__ == "__main__":
sys.exit(main())