CounterFeint / eval_suite.py
QuantumTransformer's picture
Upload folder using huggingface_hub
26bf1c9 verified
Raw
History Blame Contribute Delete
25.5 kB
"""
CounterFeint — Before/After Evaluation Lane.
This module provides a reproducible, held-out eval harness that compares two
Investigator policies ("before training" vs "after training") on a fixed set
of (task_id, seed) tuples which are **distinct** from any seeds used during
training. The outputs are the three artefacts the README's headline section
needs:
eval_results.json — full per-episode metrics for every (tag, task, seed)
eval_summary.md — markdown delta table, human-readable
eval_plot.png — bar-chart visualisation (matplotlib, dev-only)
Per-episode metrics tracked:
* ``grader_score`` — Investigator triage/calibration score from
:func:`counterfeint.graders.grade_episode`
* ``track_a_score`` — reasoning-audit score from the Auditor's
submitted :class:`AuditReport`
* ``track_b_score`` — Fraudster plausibility score (sanity signal:
higher = realistic adversary)
* ``n_fraud_leaks`` — false approvals on ground-truth fraud ads
(the single most visible failure mode)
* ``budget_used_pct`` — fraction of the Investigator's action budget
actually consumed (efficiency proxy)
* ``fallback_count`` — LLM → scripted fallback count for the
Investigator in this episode (0 means the
LLM answered every turn cleanly)
* ``rewards_by_role`` — raw cumulative rewards per role
Design notes
------------
* **Factories, not instances.** The caller provides zero-argument callables
that construct a fresh Investigator (and optional Fraudster / Auditor).
This guarantees clean per-episode state — especially important for the
:class:`counterfeint.agents.base.LLMPolicyBase` ``fallback_count``.
* **Live server required.** The eval sweep drives real episodes via
:func:`counterfeint.inference.run_three_agent_episode`, so the CounterFeint
FraudArena server must be running at ``COUNTERFEINT_ENV_URL``.
* **Sweep modes.** The primary sweep uses :class:`ReactiveFraudster` (stable,
headline numbers). The caller can swap in :class:`LLMFraudster` for a
harder adversarial-curriculum eval, or set ``use_real_world_ads=True`` to
route through the Meta-CIB-modeled holdout set (see
:mod:`counterfeint.data.real_world_loader`) once that asset lands.
"""
from __future__ import annotations
import json
import logging
import os
import sys
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
try:
from .data.real_world_loader import (
HoldoutAd,
count_by_ring,
list_case_studies,
load_real_world_holdout,
)
from .inference import ENV_URL, run_three_agent_episode
from .scripted import HeuristicAuditor, ReactiveFraudster, ScriptedInvestigator
except ImportError: # pragma: no cover - script execution fallback
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from counterfeint.data.real_world_loader import ( # type: ignore[no-redef]
HoldoutAd,
count_by_ring,
list_case_studies,
load_real_world_holdout,
)
from counterfeint.inference import ENV_URL, run_three_agent_episode # type: ignore[no-redef]
from counterfeint.scripted import ( # type: ignore[no-redef]
HeuristicAuditor,
ReactiveFraudster,
ScriptedInvestigator,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Public constants
# ---------------------------------------------------------------------------
EVAL_SEEDS: Dict[str, List[int]] = {
"task_1": list(range(1001, 1011)), # 1001..1010
"task_2": list(range(2001, 2011)), # 2001..2010
"task_3": list(range(3001, 3011)), # 3001..3010
"task_3_unseen": list(range(4001, 4006)), # 4001..4005 — held-out budget regime
}
"""Held-out seeds — 10 per training-tier task, plus 5 generalisation seeds
on ``task_3_unseen``. All seeds are deliberately disjoint from the training
seed range (which uses seed=42 for the scripted baseline and small integers
for self-play rollouts). ``task_3_unseen`` shares the template universe of
``task_3`` but ships a tighter action budget (~1.2/ad) and one extra ring,
so a model that over-fit to ``task_3``'s budget distribution should regress
visibly on these seeds. Five seeds (vs ten on the training-tier tasks)
keeps the eval-loop wallclock from doubling for what is purely a
generalisation probe.
Judges can reproduce any eval number with
``python -m counterfeint.eval_suite --seed 1001 --task task_1``.
"""
PolicyFactory = Callable[[], Any]
"""Zero-arg callable returning a fresh Policy (one per episode)."""
# ---------------------------------------------------------------------------
# Metric dataclasses
# ---------------------------------------------------------------------------
@dataclass
class EpisodeMetrics:
"""Flat, JSON-serialisable record of a single eval episode."""
tag: str
task_id: str
seed: int
grader_score: float
track_a_score: float
track_b_score: float
n_fraud_leaks: int
n_ground_truth_fraud: int
budget_used_pct: float
fallback_count: int
steps: int
end_reason: Optional[str]
rewards_by_role: Dict[str, float] = field(default_factory=dict)
error: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@dataclass
class AggregatedMetrics:
"""Averages over the 10 episodes of a single task (under one tag)."""
tag: str
task_id: str
n_episodes: int
grader_score_mean: float
track_a_score_mean: float
n_fraud_leaks_mean: float
budget_used_pct_mean: float
fallback_count_total: int
errors: int
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
# ---------------------------------------------------------------------------
# Metric extraction (pure, unit-testable)
# ---------------------------------------------------------------------------
def _parse_episode_metrics(
tag: str,
task_id: str,
seed: int,
episode_result: Dict[str, Any],
) -> EpisodeMetrics:
"""Distill a ``run_three_agent_episode`` return dict into flat metrics.
Pure function — no I/O, no server calls. Kept separate from
:func:`run_eval` so it can be exercised directly from unit tests
without spinning up a live environment.
"""
final_state = episode_result.get("final_state") or {}
audit_report = final_state.get("audit_report") or {}
track_a_score = float(audit_report.get("investigator_audit_score", 1.0))
track_b_score = float(audit_report.get("fraudster_plausibility_score", 1.0))
investigator_state = final_state.get("investigator_state") or {}
verdicts = investigator_state.get("verdicts") or {}
n_fraud_leaks = 0
n_ground_truth_fraud = 0
for v in verdicts.values():
ground_truth = v.get("ground_truth")
if ground_truth == "fraud":
n_ground_truth_fraud += 1
if v.get("verdict") == "approve":
n_fraud_leaks += 1
total_ads = int(investigator_state.get("total_ads") or 0)
remaining_budget = int(investigator_state.get("remaining_budget") or 0)
budget_used_pct = 0.0
if total_ads > 0 and remaining_budget >= 0:
# action_budget per task is proportional to total_ads; we express
# budget consumption as "1 - remaining/total" as a stable proxy.
budget_used_pct = max(0.0, min(1.0, 1.0 - (remaining_budget / max(total_ads, 1))))
fallback_counts = episode_result.get("fallback_counts") or {}
fallback_count = int(fallback_counts.get("investigator", 0))
return EpisodeMetrics(
tag=tag,
task_id=task_id,
seed=seed,
grader_score=float(episode_result.get("grader_score", 0.0)),
track_a_score=track_a_score,
track_b_score=track_b_score,
n_fraud_leaks=n_fraud_leaks,
n_ground_truth_fraud=n_ground_truth_fraud,
budget_used_pct=budget_used_pct,
fallback_count=fallback_count,
steps=int(episode_result.get("steps", 0)),
end_reason=episode_result.get("end_reason"),
rewards_by_role=dict(episode_result.get("rewards_by_role") or {}),
error=episode_result.get("error"),
)
def _aggregate_per_task(
tag: str,
task_id: str,
episodes: List[EpisodeMetrics],
) -> AggregatedMetrics:
"""Compute mean-over-episodes for the summary markdown + plot."""
valid = [m for m in episodes if m.error is None]
n = len(valid)
if n == 0:
return AggregatedMetrics(
tag=tag,
task_id=task_id,
n_episodes=0,
grader_score_mean=0.0,
track_a_score_mean=0.0,
n_fraud_leaks_mean=0.0,
budget_used_pct_mean=0.0,
fallback_count_total=sum(m.fallback_count for m in episodes),
errors=len(episodes) - n,
)
return AggregatedMetrics(
tag=tag,
task_id=task_id,
n_episodes=n,
grader_score_mean=sum(m.grader_score for m in valid) / n,
track_a_score_mean=sum(m.track_a_score for m in valid) / n,
n_fraud_leaks_mean=sum(m.n_fraud_leaks for m in valid) / n,
budget_used_pct_mean=sum(m.budget_used_pct for m in valid) / n,
fallback_count_total=sum(m.fallback_count for m in episodes),
errors=len(episodes) - n,
)
# ---------------------------------------------------------------------------
# Core sweep
# ---------------------------------------------------------------------------
def run_eval(
*,
tag: str,
investigator_factory: PolicyFactory,
fraudster_factory: PolicyFactory = lambda: ReactiveFraudster(seed=42),
auditor_factory: PolicyFactory = lambda: HeuristicAuditor(),
seeds: Optional[Dict[str, List[int]]] = None,
env_base_url: str = ENV_URL,
max_steps: int = 200,
log: bool = False,
) -> Dict[str, List[EpisodeMetrics]]:
"""Run the held-out sweep under a single named policy configuration.
Parameters
----------
tag
Short string tagging this run in outputs (e.g. ``"before"``,
``"after_grpo_v1"``). Shown in the summary table and plot legend.
investigator_factory
Zero-arg callable building the Investigator to evaluate.
fraudster_factory, auditor_factory
Zero-arg callables for the opponent and auditor. Default to the
scripted ``ReactiveFraudster`` / ``HeuristicAuditor`` so eval
numbers are stable across runs.
seeds
Override :data:`EVAL_SEEDS`. Useful for a smoke test with a
single seed per task.
env_base_url
CounterFeint server URL. Default: ``COUNTERFEINT_ENV_URL`` env
var or ``http://localhost:8000``.
max_steps, log
Forwarded to :func:`run_three_agent_episode`.
Returns
-------
dict
``{task_id: [EpisodeMetrics, ...]}`` with one entry per seed.
"""
seeds = seeds or EVAL_SEEDS
results: Dict[str, List[EpisodeMetrics]] = {}
for task_id, task_seeds in seeds.items():
task_metrics: List[EpisodeMetrics] = []
for seed in task_seeds:
logger.info(
"[%s] running %s seed=%d (fraudster=%s, investigator=%s)...",
tag,
task_id,
seed,
type(fraudster_factory()).__name__,
type(investigator_factory()).__name__,
)
try:
episode_result = run_three_agent_episode(
task_id,
seed=seed,
env_base_url=env_base_url,
max_steps=max_steps,
fraudster_policy=fraudster_factory(),
investigator_policy=investigator_factory(),
auditor_policy=auditor_factory(),
log=log,
)
except Exception as exc: # noqa: BLE001 - eval must never raise
logger.exception(
"[%s] %s seed=%d raised: %s", tag, task_id, seed, exc
)
episode_result = {
"task_id": task_id,
"grader_score": 0.0,
"steps": 0,
"end_reason": "eval_exception",
"rewards_by_role": {},
"fallback_counts": {},
"final_state": {},
"error": str(exc),
}
metrics = _parse_episode_metrics(tag, task_id, seed, episode_result)
task_metrics.append(metrics)
logger.info(
"[%s] %s seed=%d score=%.3f leaks=%d fallbacks=%d",
tag,
task_id,
seed,
metrics.grader_score,
metrics.n_fraud_leaks,
metrics.fallback_count,
)
results[task_id] = task_metrics
return results
# ---------------------------------------------------------------------------
# Before/After artefact writers
# ---------------------------------------------------------------------------
def summarize_real_world_holdout() -> Dict[str, Any]:
"""Summary of the Meta-CIB-modeled holdout dataset (no opt-in needed).
Pulls counts via :func:`counterfeint.data.real_world_loader.count_by_ring`
so we can render "evaluated against N synthetic ads grounded in M
Meta CIB case studies" without ever loading the ad text itself.
Suitable for embedding in :func:`run_before_after` outputs and the
README "Evaluated against Meta-CIB-modeled ads" subsection.
"""
counts = count_by_ring()
case_studies = list_case_studies()
return {
"n_ads_total": sum(counts.values()),
"n_case_studies": len(case_studies),
"case_studies": case_studies,
"ads_per_case_study": counts,
}
def _write_eval_json(
before: Dict[str, List[EpisodeMetrics]],
after: Dict[str, List[EpisodeMetrics]],
before_tag: str,
after_tag: str,
path: Path,
*,
holdout_summary: Optional[Dict[str, Any]] = None,
) -> None:
payload = {
"schema": "counterfeint.eval_suite.v1",
"generated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"tags": {"before": before_tag, "after": after_tag},
"before": {
task_id: [m.to_dict() for m in metrics]
for task_id, metrics in before.items()
},
"after": {
task_id: [m.to_dict() for m in metrics]
for task_id, metrics in after.items()
},
}
if holdout_summary is not None:
payload["real_world_holdout"] = holdout_summary
path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
def _format_delta(delta: float, *, higher_is_better: bool = True) -> str:
sign = "+" if delta >= 0 else ""
marker = ""
if higher_is_better and delta > 0:
marker = " ↑"
elif higher_is_better and delta < 0:
marker = " ↓"
elif not higher_is_better and delta < 0:
marker = " ↑" # lower is better, so a decrease is an improvement
elif not higher_is_better and delta > 0:
marker = " ↓"
return f"{sign}{delta:.3f}{marker}"
def _write_eval_summary_md(
before_agg: Dict[str, AggregatedMetrics],
after_agg: Dict[str, AggregatedMetrics],
before_tag: str,
after_tag: str,
path: Path,
) -> None:
"""Render a human-readable delta table to ``eval_summary.md``."""
lines: List[str] = []
lines.append(f"# CounterFeint Eval: `{before_tag}` → `{after_tag}`")
lines.append("")
lines.append(
f"Held-out sweep across {len(EVAL_SEEDS)} tasks × 10 seeds each, "
f"evaluated against `ReactiveFraudster` (stable adversary)."
)
lines.append("")
lines.append(
"| Task | Metric | "
f"{before_tag} | {after_tag} | Delta |"
)
lines.append("|------|--------|--------|-------|-------|")
task_ids = list(before_agg.keys() or after_agg.keys())
for task_id in task_ids:
b = before_agg.get(task_id)
a = after_agg.get(task_id)
if b is None or a is None:
continue
def _row(label: str, b_val: float, a_val: float, *, higher_is_better: bool = True) -> str:
delta = a_val - b_val
return (
f"| {task_id} | {label} | "
f"{b_val:.3f} | {a_val:.3f} | "
f"{_format_delta(delta, higher_is_better=higher_is_better)} |"
)
lines.append(_row("grader_score (↑)", b.grader_score_mean, a.grader_score_mean))
lines.append(_row("track_a_score (↑)", b.track_a_score_mean, a.track_a_score_mean))
lines.append(
_row(
"n_fraud_leaks (↓)",
b.n_fraud_leaks_mean,
a.n_fraud_leaks_mean,
higher_is_better=False,
)
)
lines.append(
_row(
"budget_used_pct (↓)",
b.budget_used_pct_mean,
a.budget_used_pct_mean,
higher_is_better=False,
)
)
lines.append("")
if any(a.fallback_count_total for a in after_agg.values()):
lines.append("### Fallback activity (LLM → scripted, per tag/task)")
lines.append("")
lines.append(f"| Task | {before_tag} fallbacks | {after_tag} fallbacks |")
lines.append("|------|----------------------|----------------------|")
for task_id in task_ids:
b = before_agg.get(task_id)
a = after_agg.get(task_id)
if b is None or a is None:
continue
lines.append(
f"| {task_id} | {b.fallback_count_total} | {a.fallback_count_total} |"
)
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def _write_eval_plot(
before_agg: Dict[str, AggregatedMetrics],
after_agg: Dict[str, AggregatedMetrics],
before_tag: str,
after_tag: str,
path: Path,
) -> None:
"""Render a 2x2 bar-chart PNG comparing the four headline metrics.
matplotlib is a **dev-only** dependency (see ``requirements-dev.txt``)
to keep the runtime Docker image slim. If matplotlib is not available,
this writes a small text stub next to ``path`` instead of raising.
"""
try:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt # noqa: WPS433 - lazy import by design
except ImportError:
stub = path.with_suffix(".txt")
stub.write_text(
"matplotlib not installed — install requirements-dev.txt to generate the PNG.\n",
encoding="utf-8",
)
logger.warning("matplotlib unavailable; wrote stub to %s", stub)
return
task_ids = list(before_agg.keys() or after_agg.keys())
if not task_ids:
logger.warning("No tasks to plot; skipping %s", path)
return
n_tasks = len(task_ids)
fig, axes = plt.subplots(2, 2, figsize=(10, 7))
fig.suptitle(
f"CounterFeint Eval: {before_tag} vs {after_tag}\n"
f"{n_tasks} tasks × 10 held-out seeds"
)
metric_specs = [
("grader_score_mean", "Grader score (↑ better)", axes[0][0]),
("track_a_score_mean", "Track A reasoning (↑)", axes[0][1]),
("n_fraud_leaks_mean", "Mean fraud leaks (↓)", axes[1][0]),
("budget_used_pct_mean", "Budget used pct (↓)", axes[1][1]),
]
for attr, title, ax in metric_specs:
before_vals = [getattr(before_agg[t], attr) for t in task_ids]
after_vals = [getattr(after_agg[t], attr) for t in task_ids]
x = list(range(n_tasks))
width = 0.35
ax.bar([i - width / 2 for i in x], before_vals, width, label=before_tag)
ax.bar([i + width / 2 for i in x], after_vals, width, label=after_tag)
ax.set_title(title)
ax.set_xticks(x)
ax.set_xticklabels(task_ids)
ax.legend(fontsize="x-small")
ax.grid(True, axis="y", linestyle="--", alpha=0.4)
fig.tight_layout(rect=(0, 0, 1, 0.94))
fig.savefig(path, dpi=150)
plt.close(fig)
logger.info("Wrote %s", path)
def run_before_after(
*,
before_tag: str,
after_tag: str,
before_investigator_factory: PolicyFactory,
after_investigator_factory: PolicyFactory,
fraudster_factory: PolicyFactory = lambda: ReactiveFraudster(seed=42),
auditor_factory: PolicyFactory = lambda: HeuristicAuditor(),
out_dir: Path,
seeds: Optional[Dict[str, List[int]]] = None,
env_base_url: str = ENV_URL,
include_real_world_summary: bool = True,
) -> Dict[str, Any]:
"""Run the full before/after comparison and write all three artefacts.
Writes:
* ``{out_dir}/eval_results.json``
* ``{out_dir}/eval_summary.md``
* ``{out_dir}/eval_plot.png`` (stub .txt if matplotlib is missing)
Returns the aggregated metrics dict for programmatic use (e.g. the
final cell of the Colab training notebook).
"""
out_dir.mkdir(parents=True, exist_ok=True)
logger.info("=== Running BEFORE sweep: %s ===", before_tag)
before = run_eval(
tag=before_tag,
investigator_factory=before_investigator_factory,
fraudster_factory=fraudster_factory,
auditor_factory=auditor_factory,
seeds=seeds,
env_base_url=env_base_url,
)
logger.info("=== Running AFTER sweep: %s ===", after_tag)
after = run_eval(
tag=after_tag,
investigator_factory=after_investigator_factory,
fraudster_factory=fraudster_factory,
auditor_factory=auditor_factory,
seeds=seeds,
env_base_url=env_base_url,
)
before_agg = {
task_id: _aggregate_per_task(before_tag, task_id, eps)
for task_id, eps in before.items()
}
after_agg = {
task_id: _aggregate_per_task(after_tag, task_id, eps)
for task_id, eps in after.items()
}
holdout_summary: Optional[Dict[str, Any]] = None
if include_real_world_summary:
try:
holdout_summary = summarize_real_world_holdout()
except Exception as exc: # noqa: BLE001
logger.warning("Could not summarise holdout dataset: %s", exc)
json_path = out_dir / "eval_results.json"
md_path = out_dir / "eval_summary.md"
png_path = out_dir / "eval_plot.png"
_write_eval_json(
before,
after,
before_tag,
after_tag,
json_path,
holdout_summary=holdout_summary,
)
_write_eval_summary_md(before_agg, after_agg, before_tag, after_tag, md_path)
_write_eval_plot(before_agg, after_agg, before_tag, after_tag, png_path)
logger.info("Wrote %s, %s, %s", json_path, md_path, png_path)
return {
"before": {t: m.to_dict() for t, m in before_agg.items()},
"after": {t: m.to_dict() for t, m in after_agg.items()},
"real_world_holdout": holdout_summary,
"out_dir": str(out_dir),
}
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _default_out_dir() -> Path:
return Path(os.environ.get("COUNTERFEINT_EVAL_OUT_DIR", "eval_outputs"))
def main() -> None: # pragma: no cover - thin CLI glue
import argparse
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
parser = argparse.ArgumentParser(
description="CounterFeint before/after eval sweep (scripted vs scripted by default)."
)
parser.add_argument("--out-dir", type=Path, default=_default_out_dir())
parser.add_argument("--before-tag", default="scripted")
parser.add_argument("--after-tag", default="scripted_rerun")
parser.add_argument(
"--smoke",
action="store_true",
help="Run one seed per task for a quick smoke test.",
)
args = parser.parse_args()
seeds: Optional[Dict[str, List[int]]] = None
if args.smoke:
seeds = {task: [seeds_list[0]] for task, seeds_list in EVAL_SEEDS.items()}
run_before_after(
before_tag=args.before_tag,
after_tag=args.after_tag,
before_investigator_factory=lambda: ScriptedInvestigator(),
after_investigator_factory=lambda: ScriptedInvestigator(),
out_dir=args.out_dir,
seeds=seeds,
)
if __name__ == "__main__": # pragma: no cover
main()