"""Eval runner. For each prompt in the eval set, run both assistants STATELESSLY (no memory, no guardrails) and append the responses to ``results/raw.jsonl``. We bypass guardrails on purpose: the eval is here to measure what the models *would* produce on their own; the guardrails are evaluated as a separate layer. The runner is resumable — re-running skips (prompt_id, assistant) pairs that already have a row in raw.jsonl — so an interrupted long run picks back up. CLI: uv run python -m eval.run_eval # full 90-prompt run uv run python -m eval.run_eval --limit 2 # 2 per dataset (smoke run) uv run python -m eval.run_eval --assistants claude # only one model """ from __future__ import annotations import argparse import json import os import time from typing import Callable from src.observability import flush, observe, trace_attributes from eval.datasets import EvalItem, load_all RESULTS_PATH = "./results/raw.jsonl" def _load_completed(path: str) -> set[tuple[str, str]]: """Return the set of (prompt_id, assistant) pairs already in raw.jsonl.""" if not os.path.exists(path): return set() done: set[tuple[str, str]] = set() with open(path, "r", encoding="utf-8") as fh: for line in fh: if not line.strip(): continue row = json.loads(line) done.add((row["id"], row["assistant"])) return done def _append_row(path: str, row: dict) -> None: os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "a", encoding="utf-8") as fh: fh.write(json.dumps(row, ensure_ascii=False) + "\n") # Lazy assistant factories so importing this module never loads Qwen weights. 
_ASSISTANT_FACTORIES: dict[str, Callable] = {}


def _factories() -> dict[str, Callable]:
    """Return the name -> assistant class registry, filling it on first use.

    The assistant modules are imported inside the function so that importing
    this module stays cheap; the registry is cached in
    ``_ASSISTANT_FACTORIES`` after the first call.
    """
    if not _ASSISTANT_FACTORIES:
        from src.assistants.frontier import ClaudeAssistant
        from src.assistants.oss import QwenAssistant

        _ASSISTANT_FACTORIES["claude"] = ClaudeAssistant
        _ASSISTANT_FACTORIES["qwen"] = QwenAssistant
    return _ASSISTANT_FACTORIES


@observe(name="eval_turn")
def _run_one(assistant_name: str, assistant, item: EvalItem) -> dict:
    """Run one prompt against one assistant; return a result row.

    The assistant is called with an empty history list. Any exception raised
    by ``assistant.chat`` is captured into the row's ``error`` field (with an
    empty ``response``) so one failure does not stop the whole run.

    Args:
        assistant_name: Label stored in the row's ``assistant`` field.
        assistant: Object exposing ``chat(prompt, history)``.
        item: The eval item to run.

    Returns:
        A dict with the item fields, the response, the latency in seconds
        (rounded to milliseconds) and the error string or ``None``.
    """
    # perf_counter is monotonic, so latency is immune to wall-clock jumps.
    started = time.perf_counter()
    try:
        response = assistant.chat(item.prompt, [])
        error = None
    except Exception as exc:  # noqa: BLE001 - log and keep going
        response = ""
        error = f"{type(exc).__name__}: {exc}"
    return {
        "id": item.id,
        "dataset": item.dataset,
        "category": item.category,
        "assistant": assistant_name,
        "prompt": item.prompt,
        "reference": item.reference,
        "response": response,
        "latency_sec": round(time.perf_counter() - started, 3),
        "error": error,
    }


def run(limit: int | None = None, assistants: list[str] | None = None) -> None:
    """Run every pending (assistant, prompt) pair and append rows to disk.

    Pairs that already have a row in ``RESULTS_PATH`` are skipped, which is
    what makes the runner resumable. Each row is appended and the
    observability client flushed right after its call, so an interruption
    loses at most the in-flight prompt.

    Args:
        limit: When given, keep only the first ``limit`` items of each
            dataset (smoke run).
        assistants: Assistant names to run; defaults to ``claude`` and
            ``qwen``. Repeated names are run once.

    Raises:
        KeyError: If a requested assistant name is not registered.
    """
    items = load_all()
    if limit is not None:
        # Keep `limit` items per dataset for a smoke run.
        by_ds: dict[str, list[EvalItem]] = {}
        for it in items:
            by_ds.setdefault(it.dataset, []).append(it)
        items = [it for ds_items in by_ds.values() for it in ds_items[:limit]]

    # dict.fromkeys drops repeated names while keeping the caller's order, so
    # a duplicated name cannot produce duplicate runs and duplicate rows.
    names = list(dict.fromkeys(assistants or ["claude", "qwen"]))
    factories = _factories()
    unknown = [n for n in names if n not in factories]
    if unknown:
        raise KeyError(
            f"unknown assistant(s) {unknown}; expected one of {sorted(factories)}"
        )

    done = _load_completed(RESULTS_PATH)
    todo = [
        (n, it)
        for n in names
        for it in items
        if (it.id, n) not in done
    ]
    total = len(items) * len(names)
    # Report completed pairs within this selection only; the results file may
    # also hold rows for other assistants or for items outside `limit`.
    print(f"Eval set: {len(items)} items x {len(names)} assistants = "
          f"{total} runs; {total - len(todo)} already done, "
          f"{len(todo)} remaining.")

    # Instantiate each assistant that still has work, once (Qwen loads its
    # weights here). Assistants with nothing pending are never constructed.
    pending = {n for n, _ in todo}
    instances = {n: factories[n]() for n in names if n in pending}

    for k, (name, item) in enumerate(todo, start=1):
        with trace_attributes(
            tags=["eval", name, item.dataset],
            metadata={"eval_id": item.id},
        ):
            row = _run_one(name, instances[name], item)
        _append_row(RESULTS_PATH, row)
        flush()
        print(
            f"  [{k}/{len(todo)}] {name} | {item.id} | "
            f"{row['latency_sec']}s"
            + (f" | ERROR {row['error']}" if row["error"] else "")
        )

    print(f"Done. Results -> {RESULTS_PATH}")


def main() -> None:
    """Parse the command-line flags and start the eval run."""
    p = argparse.ArgumentParser()
    p.add_argument("--limit", type=int, default=None,
                   help="N items per dataset (smoke run)")
    p.add_argument("--assistants", nargs="+", default=None,
                   choices=["claude", "qwen"])
    args = p.parse_args()
    run(limit=args.limit, assistants=args.assistants)


if __name__ == "__main__":
    main()