File size: 6,684 Bytes
b43d8da
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""Cell 22 — Markdown summary table (baseline → final → Δ).

Renders the markdown table that drives the DESIGN.md §15 pitch 2:00–2:40
"before/after" slide. Per evaluation.md §3.3, §3.4, §3.5:

- Per-reward baseline mean + 95% CI → final mean + 95% CI → paired Δ.
- Per-language breakdown table (n_episodes, reward_mean, R1..R5 means).
- Drift-detection latency before/after row.

Hard rules:
- No LLM-as-judge; static AST scan via ``_NO_LLM_JUDGE_FORBIDDEN_IMPORTS``.
- Every numeric cell rounds to 3 decimals.
"""

from __future__ import annotations

import math
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # pragma: no cover - typing only
    from cells.step_18_eval_baseline import EvalReport, PerLanguageReport


# Public API of this module. Every public ``format_*`` table builder is
# listed, including ``format_drift_latency_table``, so a star-import exposes
# each section that ``print_summary_table`` composes.
__all__ = [
    "format_drift_latency_table",
    "format_per_language_table",
    "format_per_reward_table",
    "print_summary_table",
]


# Module names of LLM client SDKs. The module docstring ties this set to the
# "no LLM-as-judge" static scan; the code visible in this module does not
# read it, so it is presumably consumed by an external checker.
_NO_LLM_JUDGE_FORBIDDEN_IMPORTS: frozenset[str] = frozenset(
    (
        "anthropic",
        "cohere",
        "google.generativeai",
        "openai",
        "vertexai",
    ),
)

# Attribute-name prefixes resolved as ``<key>_mean_ci`` on a report, listed in
# the row order of the per-reward table: the aggregate reward, then r1..r5.
_REWARD_KEYS: tuple[str, ...] = ("reward", *(f"r{index}" for index in range(1, 6)))


def _fmt_ci(triple: tuple[float, float, float]) -> str:
    """Render a ``(mean, lower, upper)`` triple as ``"mean [lower, upper]"``.

    Every number is printed with three decimals. A NaN mean collapses the
    whole cell to the literal ``"NaN"``, whatever the bounds are.
    """
    centre, lower, upper = triple
    if not math.isnan(centre):
        return f"{centre:.3f} [{lower:.3f}, {upper:.3f}]"
    return "NaN"


def _fmt_paired(triple: tuple[float, float, float] | None) -> str:
    """Render a paired-difference ``(mean, lower, upper)`` triple for a Δ cell.

    Args:
        triple: Mean difference with its interval bounds, or ``None`` when no
            paired estimate is available for the row.

    Returns:
        ``"—"`` for a missing triple, ``"NaN"`` when the mean is NaN, otherwise
        ``"<signed mean> [lower, upper]"`` with three decimals per number.
    """
    if triple is None:
        return "—"
    mean, lo, hi = triple
    if math.isnan(mean):
        return "NaN"
    # The ``+`` format flag always emits exactly one sign character. Building
    # the prefix by hand from ``mean >= 0`` rendered negative zero as
    # "+-0.000": ``-0.0 >= 0`` is true, yet its formatted text already starts
    # with a minus sign.
    return f"{mean:+.3f} [{lo:.3f}, {hi:.3f}]"


def format_per_reward_table(baseline: EvalReport, final: EvalReport) -> str:
    """Markdown table: per-reward baseline mean+CI → final mean+CI → Δ with CI.

    Args:
        baseline: Report read for one ``<key>_mean_ci`` attribute per entry in
            ``_REWARD_KEYS``.
        final: Report read for the same attributes, plus
            ``breakdown["paired_ci"]`` for the Δ column.

    Returns:
        Header, separator and one row per reward key, joined with newlines
        (no trailing newline).

    Raises:
        AttributeError: If either report lacks a ``<key>_mean_ci`` attribute.
    """
    # Paired intervals are optional: a missing or non-dict entry leaves every
    # Δ cell as the placeholder rather than raising.
    paired_block = final.breakdown.get("paired_ci", {})
    if not isinstance(paired_block, dict):
        paired_block = {}

    # The header is written with a real "Δ"; the previous literal held the
    # mis-decoded byte sequence, which rendered as garbage in the table.
    lines: list[str] = [
        "| Reward | Baseline mean [95% CI] | Final mean [95% CI] | Δ paired [95% CI] |",
        "|--------|------------------------|---------------------|-------------------|",
    ]
    for key in _REWARD_KEYS:
        base_ci = getattr(baseline, f"{key}_mean_ci")
        final_ci = getattr(final, f"{key}_mean_ci")
        # ``None`` (key absent from the paired block) becomes the placeholder.
        paired = paired_block.get(key)
        lines.append(
            f"| {key.upper():6s} | {_fmt_ci(base_ci):22s} | "
            f"{_fmt_ci(final_ci):19s} | {_fmt_paired(paired):17s} |",
        )
    return "\n".join(lines)


def _fmt_lang_cell(value: float) -> str:
    """Format one per-language number: three decimals, or ``"NaN"`` for NaN."""
    return "NaN" if math.isnan(value) else f"{value:.3f}"


def _per_lang_lookup(report: EvalReport) -> dict[str, PerLanguageReport]:
    """Index ``report.per_language`` entries by their ``language`` attribute.

    When two entries share a language, the later one replaces the earlier.
    """
    by_language: dict[str, PerLanguageReport] = {}
    for entry in report.per_language:
        by_language[entry.language] = entry
    return by_language


def format_per_language_table(baseline: EvalReport, final: EvalReport) -> str:
    """Markdown table: per-language reward_mean baseline → final.

    Rows cover the sorted union of the languages found in either report. A
    language missing from one report shows ``NaN`` on that side; the Δ cell is
    ``—`` whenever either side's mean is NaN. ``n_episodes`` is the larger of
    the two sides' counts (a missing side counts as 0).

    Args:
        baseline: Report whose ``per_language`` entries fill the "Baseline"
            column.
        final: Report whose ``per_language`` entries fill the "Final" column.

    Returns:
        Header, separator and one row per language, joined with newlines
        (no trailing newline).
    """
    base_lookup = _per_lang_lookup(baseline)
    final_lookup = _per_lang_lookup(final)

    # Header and placeholder use the real "Δ" / "—" characters; the previous
    # literals held their mis-decoded byte sequences.
    lines: list[str] = [
        "| Language | n_episodes | Baseline reward_mean | Final reward_mean | Δ reward_mean |",
        "|----------|------------|----------------------|-------------------|---------------|",
    ]
    for lang in sorted(set(base_lookup) | set(final_lookup)):
        b = base_lookup.get(lang)
        f = final_lookup.get(lang)
        # Presence is tested with ``is not None`` so that an entry object
        # which happens to be falsy is still treated as present.
        n = max(
            b.n_episodes if b is not None else 0,
            f.n_episodes if f is not None else 0,
        )
        b_mean = b.reward_mean if b is not None else math.nan
        f_mean = f.reward_mean if f is not None else math.nan
        if math.isnan(b_mean) or math.isnan(f_mean):
            delta_str = "—"
        else:
            # The ``+`` flag emits exactly one sign, including for negative zero.
            delta_str = f"{f_mean - b_mean:+.3f}"
        lines.append(
            f"| {lang:8s} | {n:10d} | {_fmt_lang_cell(b_mean):20s} | "
            f"{_fmt_lang_cell(f_mean):17s} | {delta_str:13s} |",
        )
    return "\n".join(lines)


def _fmt_latency(value: float) -> str:
    """Format one latency statistic: three decimals, or ``"NaN"`` for NaN.

    Three decimals follows the module-wide rule that every numeric cell rounds
    to 3 decimals (this helper previously used two).
    """
    if math.isnan(value):
        return "NaN"
    return f"{value:.3f}"


def format_drift_latency_table(baseline: EvalReport, final: EvalReport) -> str:
    """Markdown table: drift-detection latency p50/p95 baseline vs final.

    Args:
        baseline: Report whose ``drift_detection_latency`` fills the two
            "Baseline" columns.
        final: Report whose ``drift_detection_latency`` fills the two "Final"
            columns and the "Undetected" column.

    Returns:
        Header, separator and one row each for Stage 2 and Stage 3, joined
        with newlines (no trailing newline).
    """
    bl = baseline.drift_detection_latency
    fl = final.drift_detection_latency
    lines: list[str] = [
        "| Stage | Baseline p50 | Baseline p95 | Final p50 | Final p95 | Undetected |",
        "|-------|--------------|--------------|-----------|-----------|------------|",
    ]
    # Both rows are produced from one template so they cannot diverge. The
    # Stage 3 row used to take ``undetected_count`` from the baseline report
    # while the Stage 2 row took it from the final report, so a single column
    # silently mixed two different runs. The column now reports the final run
    # in every row.
    for label, prefix in (("Stage 2", "stage2"), ("Stage 3", "stage3")):
        base_p50 = _fmt_latency(getattr(bl, f"{prefix}_median"))
        base_p95 = _fmt_latency(getattr(bl, f"{prefix}_p95"))
        final_p50 = _fmt_latency(getattr(fl, f"{prefix}_median"))
        final_p95 = _fmt_latency(getattr(fl, f"{prefix}_p95"))
        lines.append(
            f"| {label} | {base_p50:12s} | {base_p95:12s} | "
            f"{final_p50:9s} | {final_p95:9s} | {fl.undetected_count:10d} |",
        )
    return "\n".join(lines)


def print_summary_table(baseline: EvalReport, final: EvalReport) -> str:
    """Top-level entry point — build the full multi-section markdown summary.

    Despite the name, this function does not write to stdout itself; the
    assembled markdown is returned for the caller to display or save.

    Sections, in order: title and model/episode header, per-reward table,
    per-language table, drift-detection latency table, and a reward-hacking
    offense count table over the union of both reports' offense classes.

    Args:
        baseline: Report for the model before training.
        final: Report for the model after training.

    Returns:
        The markdown document as one string, ending with a newline.
    """
    # The title uses the real "—" and "→" characters; the previous literal
    # held their mis-decoded byte sequences and rendered as garbage.
    sections: list[str] = [
        "# DriftCall — Baseline → Final summary",
        "",
        f"**Baseline model:** `{baseline.model_path}`",
        f"**Final model:** `{final.model_path}`",
        f"**Episodes:** baseline {baseline.n_episodes}, final {final.n_episodes}",
        "",
        "## Per-reward (mean + 95% CI)",
        "",
        format_per_reward_table(baseline, final),
        "",
        "## Per-language breakdown",
        "",
        format_per_language_table(baseline, final),
        "",
        "## Drift-detection latency",
        "",
        format_drift_latency_table(baseline, final),
        "",
        # Reward-hacking offenses summary (DESIGN.md §15 pitch).
        "## Reward-hacking offenses (final vs baseline)",
        "",
        "| Class | Baseline | Final |",
        "|-------|----------|-------|",
    ]

    base_offenses = baseline.reward_hacking_offenses
    final_offenses = final.reward_hacking_offenses
    # A class seen in only one report is shown with a count of 0 on the other side.
    for key in sorted(set(base_offenses) | set(final_offenses)):
        b_count = base_offenses.get(key, 0)
        f_count = final_offenses.get(key, 0)
        sections.append(f"| {key:22s} | {b_count:8d} | {f_count:5d} |")

    # The trailing empty entry makes the joined document end with a newline.
    sections.append("")
    return "\n".join(sections)