File size: 9,295 Bytes
37cb069
 
 
 
 
 
 
 
470bcea
37cb069
 
470bcea
37cb069
 
 
 
 
 
 
 
 
 
 
 
 
470bcea
37cb069
 
 
 
 
 
470bcea
37cb069
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
470bcea
 
37cb069
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
470bcea
37cb069
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
"""Panel-quality snapshot.

Reads every reviews/<id>_*.md and emits a JSON snapshot the website can
render at /stats. Durability > volume: the dashboard is insurance against
the panel silently rubber-stamping as traffic scales.

Metrics:
  - Total reviewed (all time) and within rolling 30-day window
  - Recommendation mix (RECOMMEND / REVIEW_FURTHER / REVISE_AND_RESUBMIT / REJECT / PAUSED_AI_FAILURE)
  - Disagreement rate (fraction where reviewers split verdicts)
  - Per-dimension mean-of-means distribution (histogram bins)
  - AI provenance-flag rate (fraction where ai_provenance_signal mean ≤ 2)
  - Review-quality-control (RQC) flag rate, computed over audited records only

No model/vendor identities leak — this snapshot is safe to publish.
"""

from __future__ import annotations

import datetime as _dt
import json
import os
import re
from collections import Counter


# Verdict labels reported in the recommendation mix. The mix is keyed by
# exactly these names, in this order; any other label found in a review is
# still counted in the totals but does not get its own bucket.
RECOMMENDATIONS = (
    "RECOMMEND",
    "REVIEW_FURTHER",
    "REVISE_AND_RESUBMIT",
    "REJECT",
    "PAUSED_AI_FAILURE",
)

# Row labels looked up in each review's "## Aggregate Scores" table. They
# must match the first table cell verbatim for a score to be picked up.
DIMENSIONS = (
    "Domain Fit",
    "Methodological Transparency",
    "Internal Consistency",
    "Citation Integrity",
    "Novelty Signal",
    "AI Provenance Signal",
)


def _parse_frontmatter(text: str) -> dict:
    """Extract the leading ``---`` delimited key/value header from *text*.

    The header must open on the very first line with ``---`` and be closed
    by a later line consisting of ``---``; otherwise an empty dict is
    returned. Each header line is split on its first colon; lines without
    a colon are skipped. Keys and values are whitespace-stripped, and
    values additionally lose surrounding double, then single, quotes.
    All values stay strings.
    """
    opener = "---\n"
    if not text.startswith(opener):
        return {}
    close_at = text.find("\n---\n", len(opener))
    if close_at < 0:
        return {}

    fields: dict = {}
    for raw_line in text[len(opener):close_at].splitlines():
        key, colon, value = raw_line.partition(":")
        if not colon:
            # No colon on this line, so it cannot be a key/value pair.
            continue
        fields[key.strip()] = value.strip().strip('"').strip("'")
    return fields


def _parse_aggregate_means(text: str) -> dict[str, float]:
    """Read dimension -> mean pairs out of the "## Aggregate Scores" table.

    Only pipe-delimited rows between the ``## Aggregate Scores`` heading
    and the next ``## `` heading are considered. The header row (first
    cell "dimension", case-insensitive), dash-only separator rows, rows
    with fewer than two cells, and rows whose second cell is not a number
    are skipped. The first cell is used verbatim as the label.
    """
    scores: dict[str, float] = {}
    inside = False
    for raw in text.splitlines():
        row = raw.strip()
        if row.startswith("## Aggregate Scores"):
            # Checked first so a repeated heading never ends the section.
            inside = True
        elif inside and row.startswith("## "):
            break
        elif inside and row.startswith("|"):
            columns = [cell.strip() for cell in row.strip("|").split("|")]
            is_header = len(columns) < 2 or columns[0].lower() == "dimension"
            is_rule = set("".join(columns)) <= {"-", " "}
            if is_header or is_rule:
                continue
            try:
                scores[columns[0]] = float(columns[1])
            except ValueError:
                # Non-numeric mean cell (e.g. "n/a"): ignore the row.
                pass
    return scores


def _parse_review_date(raw: str) -> _dt.datetime | None:
    """Parse an ISO-8601 ``review_date`` string into an aware datetime.

    A trailing ``Z`` is rewritten to ``+00:00`` before parsing. Timestamps
    that carry no UTC offset (for example a bare ``2024-05-01``) are
    treated as UTC, so the result can always be compared against the
    timezone-aware cutoff used for the rolling window; comparing a naive
    datetime with an aware one raises ``TypeError``.

    Returns:
        A timezone-aware ``datetime``, or ``None`` when *raw* is empty,
        not a string, or not a valid ISO-8601 timestamp.
    """
    try:
        parsed = _dt.datetime.fromisoformat(raw.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: raw is not a str; ValueError: bad format.
        return None
    if parsed.tzinfo is None:
        # No offset given: assume UTC rather than returning a naive value.
        parsed = parsed.replace(tzinfo=_dt.timezone.utc)
    return parsed


def _load_rqc_flags(reviews_dir: str) -> dict[str, bool]:
    """Map each audited record id to its review-quality-control flag.

    Every ``*_review_quality_control.md`` file in *reviews_dir* is read
    and its frontmatter inspected. The record id comes from the
    ``record_id`` field, falling back to the filename text before the
    first underscore. The flag is True only when
    ``review_quality_control_flag`` equals "true" (case-insensitive); a
    missing field counts as False.

    Records without an RQC file get no entry at all, which lets the
    flag-rate metric leave un-audited records out of both numerator and
    denominator. A missing directory yields an empty mapping.
    """
    rqc_suffix = "_review_quality_control.md"
    if not os.path.isdir(reviews_dir):
        return {}

    verdicts: dict[str, bool] = {}
    rqc_files = [
        entry for entry in sorted(os.listdir(reviews_dir)) if entry.endswith(rqc_suffix)
    ]
    for entry in rqc_files:
        with open(os.path.join(reviews_dir, entry), "r", encoding="utf-8") as handle:
            meta = _parse_frontmatter(handle.read())
        fallback_id = entry.split("_", 1)[0]
        record = str(meta.get("record_id", fallback_id))
        flag_text = str(meta.get("review_quality_control_flag", "false")).lower()
        verdicts[record] = flag_text == "true"
    return verdicts


def _load_reviews(reviews_dir: str) -> list[dict]:
    """Load one summary dict per panel-review markdown file in *reviews_dir*.

    Files are visited in sorted name order. Only ``*.md`` files are read;
    RQC audit files, citation-verification artifacts and anything whose
    name contains ``ICSAC-SUB-TEST-`` are skipped.

    Each returned dict has the keys ``record_id`` (frontmatter value, else
    the filename text before the first underscore), ``recommendation``
    (defaults to "REVIEW_FURTHER"), ``disagreement`` (bool),
    ``review_date`` (parsed datetime or None), ``dimension_means`` (label
    -> float) and ``rqc_flag`` (bool, or None when no RQC file exists for
    the record).

    Returns:
        The list of review dicts; empty when *reviews_dir* is not a
        directory.

    Raises:
        AssertionError: if the basename of *reviews_dir* contains "test".
    """
    out: list[dict] = []
    if not os.path.isdir(reviews_dir):
        return out
    # Defense-in-depth guard: stats are computed off the per-paper review
    # markdown files in reviews/, NOT off the audit log. Test submissions
    # bail at the intake handler before the panel runs and therefore
    # never produce <id>_*.md files in this directory. The guard below
    # checks the caller did not accidentally hand us a glob that pulls
    # in audit-log-test.jsonl alongside the markdown set; if anyone ever
    # rewires _load_reviews, this trips before contamination can happen.
    #
    # Raised explicitly instead of via an `assert` statement: asserts are
    # compiled out under `python -O`, which would silently remove the
    # guard. The exception type and message are unchanged.
    if "test" in os.path.basename(reviews_dir.rstrip("/")):
        raise AssertionError(
            f"stats.py refuses to read from a directory whose basename "
            f"contains 'test': {reviews_dir!r}"
        )
    rqc_flags = _load_rqc_flags(reviews_dir)
    for name in sorted(os.listdir(reviews_dir)):
        if not name.endswith(".md"):
            continue
        if name.endswith("_review_quality_control.md"):
            # Folded in via rqc_flags; not a panel review.
            continue
        if name.endswith("_citations.md"):
            # Pre-review citation verification artifact, not a panel review.
            continue
        if "ICSAC-SUB-TEST-" in name:
            # Belt-and-suspenders: if a test review file ever does end up
            # in reviews/ (e.g. from a hand-run experiment), skip it so
            # public stats never count test data.
            continue
        path = os.path.join(reviews_dir, name)
        with open(path, "r", encoding="utf-8") as f:
            text = f.read()
        fm = _parse_frontmatter(text)
        means = _parse_aggregate_means(text)
        rid = str(fm.get("record_id", name.split("_", 1)[0]))
        out.append(
            {
                "record_id": rid,
                "recommendation": fm.get("recommendation", "REVIEW_FURTHER"),
                "disagreement": fm.get("disagreement", "False").lower() == "true",
                "review_date": _parse_review_date(fm.get("review_date", "")),
                "dimension_means": means,
                # None (not False) when the record was never audited.
                "rqc_flag": rqc_flags.get(rid),
            }
        )
    return out


def _histogram(values: list[float]) -> dict[str, int]:
    """Count scores into the five display buckets used by the dashboard.

    Buckets are keyed "1-1.99", "2-2.99", "3-3.99", "4-4.99" and "5", in
    that order. A score lands in the highest bucket whose lower bound it
    reaches; anything at or above 5 counts as "5" and anything that
    reaches no bound (including values below 1) counts as "1-1.99".
    """
    counts = dict.fromkeys(("1-1.99", "2-2.99", "3-3.99", "4-4.99", "5"), 0)
    # Lower bounds, checked from the top bucket downwards.
    floors = ((5, "5"), (4, "4-4.99"), (3, "3-3.99"), (2, "2-2.99"))
    for score in values:
        for floor, label in floors:
            if score >= floor:
                counts[label] += 1
                break
        else:
            counts["1-1.99"] += 1
    return counts


def compute_stats(reviews_dir: str) -> dict:
    """Build the publishable panel-quality snapshot for *reviews_dir*.

    Reviews are loaded through ``_load_reviews`` and aggregated twice:
    over every review, and over the rolling window of reviews dated within
    the last 30 days (reviews with no parsed date never enter the window).

    Returns:
        A dict holding the UTC generation timestamp, review totals, the
        recommendation mix (overall and 30-day), 30-day reject /
        recommend / disagreement rates, the overall AI-provenance hit
        rate (aggregate mean <= 2), per-dimension overall means (two
        decimals, 0.0 when no scores) and histograms, and the RQC
        audited counts and flag rates. Rates are rounded to three
        decimals and are 0.0 when their denominator is zero.
    """

    def _share(part: int, whole: int) -> float:
        # An empty denominator reports 0.0 instead of dividing by zero.
        if not whole:
            return 0.0
        return round(part / whole, 3)

    everything = _load_reviews(reviews_dir)
    generated = _dt.datetime.now(_dt.timezone.utc)
    window_start = generated - _dt.timedelta(days=30)
    recent = [
        rev
        for rev in everything
        if rev["review_date"] and rev["review_date"] >= window_start
    ]

    mix_all = Counter(rev["recommendation"] for rev in everything)
    mix_recent = Counter(rev["recommendation"] for rev in recent)
    split_recent = len([rev for rev in recent if rev["disagreement"]])

    histograms: dict[str, dict[str, int]] = {}
    averages: dict[str, float] = {}
    for dim in DIMENSIONS:
        # Reviews whose table lacks this dimension simply do not contribute.
        scores = [
            rev["dimension_means"][dim]
            for rev in everything
            if dim in rev["dimension_means"]
        ]
        histograms[dim] = _histogram(scores)
        averages[dim] = round(sum(scores) / len(scores), 2) if scores else 0.0

    # A review with no provenance score defaults to 5, i.e. never a hit.
    low_provenance = len(
        [
            rev
            for rev in everything
            if rev["dimension_means"].get("AI Provenance Signal", 5) <= 2
        ]
    )

    # RQC flag-rate: only records that were actually audited count. A None
    # rqc_flag means no RQC result exists for the record.
    audited_all = [rev for rev in everything if rev.get("rqc_flag") is not None]
    audited_recent = [rev for rev in recent if rev.get("rqc_flag") is not None]
    flagged_all = len([rev for rev in audited_all if rev["rqc_flag"]])
    flagged_recent = len([rev for rev in audited_recent if rev["rqc_flag"]])

    count_all = len(everything)
    count_recent = len(recent)

    return {
        "generated_at": generated.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "total_reviewed": count_all,
        "total_reviewed_30d": count_recent,
        "recommendation_mix": {label: mix_all[label] for label in RECOMMENDATIONS},
        "recommendation_mix_30d": {label: mix_recent[label] for label in RECOMMENDATIONS},
        "reject_rate_30d": _share(mix_recent["REJECT"], count_recent),
        "recommend_rate_30d": _share(mix_recent["RECOMMEND"], count_recent),
        "disagreement_rate_30d": _share(split_recent, count_recent),
        "provenance_hit_rate_overall": _share(low_provenance, count_all),
        "dimension_means_overall": averages,
        "dimension_distribution_overall": histograms,
        "rqc_audited_count": len(audited_all),
        "rqc_audited_count_30d": len(audited_recent),
        "rqc_flagged_count_30d": flagged_recent,
        "rqc_flag_rate_overall": _share(flagged_all, len(audited_all)),
        "rqc_flag_rate_30d": _share(flagged_recent, len(audited_recent)),
    }


def write_stats(reviews_dir: str, out_path: str) -> str:
    """Compute the snapshot for *reviews_dir* and write it as JSON.

    The parent directory of *out_path* is created if needed. The file is
    written as UTF-8, indented by two spaces, with non-ASCII characters
    kept verbatim and a trailing newline.

    Returns:
        *out_path*, unchanged, so callers can report where the file went.
    """
    stats = compute_stats(reviews_dir)
    parent = os.path.dirname(out_path)
    # A bare filename has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError; in that case the file goes to the current
    # directory and nothing needs creating.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(stats, f, indent=2, ensure_ascii=False)
        f.write("\n")
    return out_path


if __name__ == "__main__":
    # CLI: [reviews_dir] [out_path]; both positional arguments are optional.
    import sys

    cli_args = sys.argv[1:]

    # Default input: the "reviews" directory that sits next to this script.
    default_reviews = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "reviews"
    )
    # Default output: the website checkout's data file under the home dir.
    default_out = os.path.expanduser(
        "~/Desktop/icsac/icsacinstitute.org/src/data/stats.json"
    )

    rdir = cli_args[0] if len(cli_args) >= 1 else default_reviews
    out = cli_args[1] if len(cli_args) >= 2 else default_out

    written = write_stats(rdir, out)
    print(f"wrote {written}")