editorial-system / stats.py
ICSAC's picture
Public-flip preparation
470bcea
"""Panel-quality snapshot.
Reads every reviews/<id>_*.md and emits a JSON snapshot the website can
render at /stats. Durability > volume: the dashboard is insurance against
the panel silently rubber-stamping as traffic scales.
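Metrics:
- Total reviewed (all time) and within a rolling 30-day window
- Recommendation mix (RECOMMEND / REVIEW_FURTHER / REVISE_AND_RESUBMIT / REJECT / PAUSED_AI_FAILURE),
  all time and 30-day, plus the 30-day reject and recommend rates
- Disagreement rate over the 30-day window (fraction where reviewers split verdicts)
- Per-dimension mean-of-means and its distribution (histogram bins), all time
- AI provenance-flag rate (fraction of all reviews where ai_provenance_signal mean ≤ 2)
- Review-quality-control (RQC) flag rate among audited records, all time and 30-day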
No model/vendor identities leak — this snapshot is safe to publish.
"""
from __future__ import annotations
import datetime as _dt
import json
import os
import re
from collections import Counter
# Recommendation labels reported in the snapshot's recommendation-mix
# sections. compute_stats emits one count per label, in this order; a review
# whose recommendation is not listed here is still counted in the totals but
# does not appear in the mix.
RECOMMENDATIONS = ("RECOMMEND", "REVIEW_FURTHER", "REVISE_AND_RESUBMIT", "REJECT", "PAUSED_AI_FAILURE")
# Row labels looked up in each review's parsed "## Aggregate Scores" table.
# The match is exact (after whitespace stripping), so these strings must agree
# with the first column of that table.
DIMENSIONS = (
"Domain Fit",
"Methodological Transparency",
"Internal Consistency",
"Citation Integrity",
"Novelty Signal",
"AI Provenance Signal",
)
def _parse_frontmatter(text: str) -> dict:
    """Return the ``key: value`` pairs from a leading ``---`` fenced header.

    The header must start on the very first line with ``---`` and be closed
    by a later line consisting of ``---``; otherwise an empty dict is
    returned. Lines without a colon are ignored. Each line is split on its
    first colon only, keys and values are whitespace-stripped, and
    surrounding double then single quotes are removed from values. Every
    value stays a string — this is not a YAML parser.
    """
    fence = "---\n"
    if text[: len(fence)] != fence:
        return {}

    # Search for the closing fence strictly after the opening one.
    closing = text.find("\n---\n", len(fence))
    if closing == -1:
        return {}

    fields: dict = {}
    for entry in text[len(fence):closing].splitlines():
        key, colon, value = entry.partition(":")
        if not colon:
            continue
        fields[key.strip()] = value.strip().strip('"').strip("'")
    return fields
def _parse_aggregate_means(text: str) -> dict[str, float]:
    """Map each dimension label to its mean from the aggregate score table.

    Only pipe-delimited rows located after a line starting with
    ``## Aggregate Scores`` and before the next ``## `` heading are read.
    The first cell is the label and the second cell is the mean; the header
    row, separator rows, and rows whose second cell is not a number are
    skipped. Returns an empty dict when the section is absent.
    """
    heading = "## Aggregate Scores"
    remaining = iter(text.splitlines())

    # Phase 1: discard everything up to and including the section heading.
    for candidate in remaining:
        if candidate.strip().startswith(heading):
            break
    else:
        return {}

    # Phase 2: consume table rows until the next level-2 heading.
    result: dict[str, float] = {}
    for candidate in remaining:
        row = candidate.strip()
        if row.startswith(heading):
            # A repeated section heading keeps the scan going.
            continue
        if row.startswith("## "):
            break
        if not row.startswith("|"):
            continue
        columns = list(map(str.strip, row.strip("|").split("|")))
        if len(columns) < 2:
            continue
        if columns[0].lower() == "dimension":
            continue
        # Separator rows consist solely of dashes and spaces.
        if all(ch in "- " for column in columns for ch in column):
            continue
        try:
            score = float(columns[1])
        except ValueError:
            continue
        result[columns[0]] = score
    return result
def _parse_review_date(raw: str) -> _dt.datetime | None:
try:
return _dt.datetime.fromisoformat(raw.replace("Z", "+00:00"))
except Exception:
return None
def _load_rqc_flags(reviews_dir: str) -> dict[str, bool]:
    """Collect the review-quality-control flag of every audited record.

    Scans *reviews_dir* for files named ``*_review_quality_control.md`` and
    reads ``review_quality_control_flag`` from each file's frontmatter. The
    record id comes from the ``record_id`` frontmatter field, falling back
    to the filename prefix before the first underscore.

    Returns ``{record_id: bool}``. Records without an RQC file get no key,
    which lets the flag-rate metric leave un-audited records out of both
    numerator and denominator. A missing directory yields an empty dict.
    """
    rqc_suffix = "_review_quality_control.md"
    if not os.path.isdir(reviews_dir):
        return {}

    flag_by_record: dict[str, bool] = {}
    rqc_files = sorted(
        entry for entry in os.listdir(reviews_dir) if entry.endswith(rqc_suffix)
    )
    for filename in rqc_files:
        with open(os.path.join(reviews_dir, filename), "r", encoding="utf-8") as handle:
            header = _parse_frontmatter(handle.read())
        fallback_id = filename.partition("_")[0]
        record_id = str(header.get("record_id", fallback_id))
        flag_text = str(header.get("review_quality_control_flag", "false"))
        # Only the literal (case-insensitive) "true" counts as flagged.
        flag_by_record[record_id] = flag_text.lower() == "true"
    return flag_by_record
def _load_reviews(reviews_dir: str) -> list[dict]:
    """Load one summary dict per panel-review markdown file in *reviews_dir*.

    Files are visited in sorted name order. Skipped: anything not ending in
    ``.md``, RQC files (``*_review_quality_control.md``), citation artifacts
    (``*_citations.md``), and any file whose name contains
    ``ICSAC-SUB-TEST-``.

    Each returned dict has the keys:
      record_id        frontmatter ``record_id``, else the filename prefix
                       before the first underscore
      recommendation   frontmatter value, default ``"REVIEW_FURTHER"``
      disagreement     True only when the frontmatter value is "true"
                       (case-insensitive)
      review_date      parsed datetime, or None when missing/unparseable
      dimension_means  ``{dimension label: mean}`` from the aggregate table
      rqc_flag         True/False when an RQC file exists for the record,
                       None otherwise

    Returns an empty list when *reviews_dir* is not a directory.

    Raises:
        AssertionError: if the directory's basename contains ``test``.
    """
    out: list[dict] = []
    if not os.path.isdir(reviews_dir):
        return out
    # Defense-in-depth guard: stats are computed off the per-paper review
    # markdown files in reviews/, NOT off the audit log. Test submissions
    # bail at the intake handler before the panel runs and therefore
    # never produce <id>_*.md files in this directory. The guard below
    # checks the caller did not accidentally hand us a directory that pulls
    # in audit-log-test.jsonl alongside the markdown set; if anyone ever
    # rewires _load_reviews, this trips before contamination can happen.
    #
    # Raised explicitly rather than via an ``assert`` statement: asserts are
    # compiled out under ``python -O``, which would silently disable the
    # guard. The exception type and message are unchanged.
    if "test" in os.path.basename(reviews_dir.rstrip("/")):
        raise AssertionError(
            f"stats.py refuses to read from a directory whose basename "
            f"contains 'test': {reviews_dir!r}"
        )
    rqc_flags = _load_rqc_flags(reviews_dir)
    for name in sorted(os.listdir(reviews_dir)):
        if not name.endswith(".md"):
            continue
        if name.endswith("_review_quality_control.md"):
            # Folded in via rqc_flags; not a panel review.
            continue
        if name.endswith("_citations.md"):
            # Pre-review citation verification artifact, not a panel review.
            continue
        if "ICSAC-SUB-TEST-" in name:
            # Belt-and-suspenders: if a test review file ever does end up
            # in reviews/ (e.g. from a hand-run experiment), skip it so
            # public stats never count test data.
            continue
        path = os.path.join(reviews_dir, name)
        with open(path, "r", encoding="utf-8") as f:
            text = f.read()
        fm = _parse_frontmatter(text)
        means = _parse_aggregate_means(text)
        rid = str(fm.get("record_id", name.split("_", 1)[0]))
        out.append(
            {
                "record_id": rid,
                "recommendation": fm.get("recommendation", "REVIEW_FURTHER"),
                "disagreement": fm.get("disagreement", "False").lower() == "true",
                "review_date": _parse_review_date(fm.get("review_date", "")),
                "dimension_means": means,
                # None (not False) when no RQC file exists for this record.
                "rqc_flag": rqc_flags.get(rid),
            }
        )
    return out
def _histogram(values: list[float]) -> dict[str, int]:
    """Count scores into five one-point-wide bins covering 1.0–5.0.

    Each score lands in the highest bin whose lower bound it reaches.
    Anything at or above 5 is counted under ``"5"``; anything below 2
    (including values under 1) is counted under ``"1-1.99"``.
    """
    # Lower bounds checked from the top down; first match wins.
    lower_bounds = ((5, "5"), (4, "4-4.99"), (3, "3-3.99"), (2, "2-2.99"))
    counts = {"1-1.99": 0, "2-2.99": 0, "3-3.99": 0, "4-4.99": 0, "5": 0}
    for score in values:
        label = next(
            (name for bound, name in lower_bounds if score >= bound),
            "1-1.99",
        )
        counts[label] += 1
    return counts
def compute_stats(reviews_dir: str) -> dict:
    """Build the panel-quality snapshot for the reviews in *reviews_dir*.

    "30d" figures cover reviews whose ``review_date`` falls at or after
    thirty days before the moment of the call; reviews without a parsed
    date are left out of that window. Every rate is rounded to three
    decimals and is ``0.0`` when its denominator is zero. RQC rates use
    only records that have an RQC flag (``rqc_flag`` is not None).

    Returns a JSON-serialisable dict with the generation timestamp, totals,
    recommendation mixes, 30-day reject/recommend/disagreement rates, the
    overall provenance hit rate, per-dimension means and histograms, and
    the RQC audit counts and flag rates.
    """

    def _ratio(numerator: int, denominator: int) -> float:
        if not denominator:
            return 0.0
        return round(numerator / denominator, 3)

    everything = _load_reviews(reviews_dir)
    generated = _dt.datetime.now(_dt.timezone.utc)
    window_start = generated - _dt.timedelta(days=30)

    recent = []
    for review in everything:
        reviewed_at = review["review_date"]
        if reviewed_at and reviewed_at >= window_start:
            recent.append(review)

    n_all = len(everything)
    n_recent = len(recent)

    verdicts_all = Counter(review["recommendation"] for review in everything)
    verdicts_recent = Counter(review["recommendation"] for review in recent)
    split_recent = len([review for review in recent if review["disagreement"]])

    # Gather each tracked dimension's scores in review order.
    scores_by_dim: dict[str, list[float]] = {dim: [] for dim in DIMENSIONS}
    for review in everything:
        for dim, score in review["dimension_means"].items():
            if dim in scores_by_dim:
                scores_by_dim[dim].append(score)

    histogram_by_dim: dict[str, dict[str, int]] = {}
    mean_by_dim: dict[str, float] = {}
    for dim, scores in scores_by_dim.items():
        histogram_by_dim[dim] = _histogram(scores)
        mean_by_dim[dim] = round(sum(scores) / len(scores), 2) if scores else 0.0

    # A review with no provenance score defaults to 5, i.e. never a hit.
    provenance_flagged = 0
    for review in everything:
        if review["dimension_means"].get("AI Provenance Signal", 5) <= 2:
            provenance_flagged += 1

    # RQC flag-rate: only count records that were actually audited.
    # A None rqc_flag means RQC did not run (older reviews pre-rollout).
    audited_all = [r for r in everything if r.get("rqc_flag") is not None]
    audited_recent = [r for r in recent if r.get("rqc_flag") is not None]
    flagged_all = len([r for r in audited_all if r["rqc_flag"]])
    flagged_recent = len([r for r in audited_recent if r["rqc_flag"]])

    snapshot: dict = {}
    snapshot["generated_at"] = generated.strftime("%Y-%m-%dT%H:%M:%SZ")
    snapshot["total_reviewed"] = n_all
    snapshot["total_reviewed_30d"] = n_recent
    snapshot["recommendation_mix"] = {
        label: verdicts_all.get(label, 0) for label in RECOMMENDATIONS
    }
    snapshot["recommendation_mix_30d"] = {
        label: verdicts_recent.get(label, 0) for label in RECOMMENDATIONS
    }
    snapshot["reject_rate_30d"] = _ratio(verdicts_recent.get("REJECT", 0), n_recent)
    snapshot["recommend_rate_30d"] = _ratio(verdicts_recent.get("RECOMMEND", 0), n_recent)
    snapshot["disagreement_rate_30d"] = _ratio(split_recent, n_recent)
    snapshot["provenance_hit_rate_overall"] = _ratio(provenance_flagged, n_all)
    snapshot["dimension_means_overall"] = mean_by_dim
    snapshot["dimension_distribution_overall"] = histogram_by_dim
    snapshot["rqc_audited_count"] = len(audited_all)
    snapshot["rqc_audited_count_30d"] = len(audited_recent)
    snapshot["rqc_flagged_count_30d"] = flagged_recent
    snapshot["rqc_flag_rate_overall"] = _ratio(flagged_all, len(audited_all))
    snapshot["rqc_flag_rate_30d"] = _ratio(flagged_recent, len(audited_recent))
    return snapshot
def write_stats(reviews_dir: str, out_path: str) -> str:
    """Compute the snapshot for *reviews_dir* and write it to *out_path* as JSON.

    The parent directory of *out_path* is created if it does not exist. The
    file is written as UTF-8, indented by two spaces, with non-ASCII
    characters left unescaped and a trailing newline.

    Returns *out_path*.
    """
    stats = compute_stats(reviews_dir)
    parent = os.path.dirname(out_path)
    # A bare filename has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError; the current directory needs no creation.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(stats, f, indent=2, ensure_ascii=False)
        f.write("\n")
    return out_path
if __name__ == "__main__":
    # Usage: stats.py [reviews_dir] [out_path]
    # reviews_dir defaults to the "reviews" folder next to this file;
    # out_path defaults to the website's data file under the user's home.
    import sys

    if len(sys.argv) > 1:
        rdir = sys.argv[1]
    else:
        here = os.path.dirname(os.path.abspath(__file__))
        rdir = os.path.join(here, "reviews")

    if len(sys.argv) > 2:
        out = sys.argv[2]
    else:
        out = os.path.expanduser(
            "~/Desktop/icsac/icsacinstitute.org/src/data/stats.json"
        )

    written = write_stats(rdir, out)
    print(f"wrote {written}")