"""Eval runner.
For each prompt in the eval set, run both assistants STATELESSLY (no memory,
no guardrails) and append the responses to ``results/raw.jsonl``. We bypass
guardrails on purpose: the eval is here to measure what the models *would*
produce on their own; the guardrails are evaluated as a separate layer.
The runner is resumable — re-running skips (prompt_id, assistant) pairs that
already have a row in raw.jsonl — so an interrupted long run picks back up.
CLI:
uv run python -m eval.run_eval # full 90-prompt run
uv run python -m eval.run_eval --limit 2 # 2 per dataset (smoke run)
uv run python -m eval.run_eval --assistants claude # only one model
"""
from __future__ import annotations
import argparse
import json
import os
import time
from typing import Callable
from src.observability import flush, observe, trace_attributes
from eval.datasets import EvalItem, load_all
# JSONL file that `run` appends one result row to per (prompt, assistant) run,
# and that `_load_completed` reads back to decide which pairs to skip on resume.
# The path is relative, so it resolves against the current working directory.
RESULTS_PATH = "./results/raw.jsonl"
def _load_completed(path: str) -> set[tuple[str, str]]:
"""Return the set of (prompt_id, assistant) pairs already in raw.jsonl."""
if not os.path.exists(path):
return set()
done: set[tuple[str, str]] = set()
with open(path, "r", encoding="utf-8") as fh:
for line in fh:
if not line.strip():
continue
row = json.loads(line)
done.add((row["id"], row["assistant"]))
return done
def _append_row(path: str, row: dict) -> None:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "a", encoding="utf-8") as fh:
fh.write(json.dumps(row, ensure_ascii=False) + "\n")
# Lazy assistant factories so importing this module never loads Qwen weights.
_ASSISTANT_FACTORIES: dict[str, Callable] = {}
def _factories() -> dict[str, Callable]:
if not _ASSISTANT_FACTORIES:
from src.assistants.frontier import ClaudeAssistant
from src.assistants.oss import QwenAssistant
_ASSISTANT_FACTORIES["claude"] = ClaudeAssistant
_ASSISTANT_FACTORIES["qwen"] = QwenAssistant
return _ASSISTANT_FACTORIES
@observe(name="eval_turn")
def _run_one(assistant_name: str, assistant, item: EvalItem) -> dict:
    """Run one prompt against one assistant; return a result row.

    The assistant is called with an empty history (``chat(prompt, [])``), so
    every prompt is answered statelessly. Any exception raised by the call is
    captured in the row rather than propagated, so one failing prompt does
    not stop the whole eval run.

    Args:
        assistant_name: Label stored in the row's ``"assistant"`` field.
        assistant: Object exposing ``chat(prompt, history)``.
        item: The eval item; its ``id``, ``dataset``, ``category``,
            ``prompt`` and ``reference`` attributes are copied into the row.

    Returns:
        A dict with the item fields plus ``"response"`` (empty string on
        failure), ``"latency_sec"`` (rounded to milliseconds) and ``"error"``
        (``None`` on success, otherwise ``"<ExcType>: <message>"``).
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # (or go negative) if the system clock is adjusted during the call.
    started = time.perf_counter()
    try:
        response = assistant.chat(item.prompt, [])
        error = None
    except Exception as exc:  # noqa: BLE001 - log and keep going
        response = ""
        error = f"{type(exc).__name__}: {exc}"
    latency_sec = round(time.perf_counter() - started, 3)

    return {
        "id": item.id,
        "dataset": item.dataset,
        "category": item.category,
        "assistant": assistant_name,
        "prompt": item.prompt,
        "reference": item.reference,
        "response": response,
        "latency_sec": latency_sec,
        "error": error,
    }
def run(limit: int | None = None, assistants: list[str] | None = None) -> None:
    """Run the eval set against the requested assistants, resumably.

    Loads every eval item, optionally trims to ``limit`` items per dataset,
    skips (prompt_id, assistant) pairs that already have a row in
    ``RESULTS_PATH``, and appends one row per remaining pair. Progress is
    printed after each run.

    Args:
        limit: Keep at most this many items per dataset (smoke run). ``None``
            keeps everything.
        assistants: Assistant names to run; defaults to ``["claude", "qwen"]``
            when ``None`` or empty. Repeated names are run once.

    Raises:
        KeyError: If a requested assistant name has no registered factory.
    """
    items = load_all()
    if limit is not None:
        # Keep `limit` items per dataset for a smoke run.
        by_ds: dict[str, list[EvalItem]] = {}
        for it in items:
            by_ds.setdefault(it.dataset, []).append(it)
        items = [it for ds_items in by_ds.values() for it in ds_items[:limit]]

    # De-duplicate while keeping order so a repeated name is not run twice.
    names = list(dict.fromkeys(assistants or ["claude", "qwen"]))
    factories = _factories()
    unknown = [n for n in names if n not in factories]
    if unknown:
        # Fail before any (possibly expensive) assistant is constructed.
        raise KeyError(f"Unknown assistant(s): {', '.join(unknown)}")

    done = _load_completed(RESULTS_PATH)
    todo = [
        (n, it) for n in names for it in items if (it.id, n) not in done
    ]
    total = len(items) * len(names)
    # Report only completed pairs that belong to this run's scope; the results
    # file may also hold rows for other assistants or items outside `limit`.
    print(f"Eval set: {len(items)} items x {len(names)} assistants = "
          f"{total} runs; {total - len(todo)} already done, "
          f"{len(todo)} remaining.")

    # Instantiate each assistant once, and only if it still has work to do
    # (Qwen loads its weights here), so resuming a finished run stays cheap.
    pending = {n for n, _ in todo}
    instances = {n: factories[n]() for n in names if n in pending}

    for k, (name, item) in enumerate(todo, start=1):
        with trace_attributes(
            tags=["eval", name, item.dataset],
            metadata={"eval_id": item.id},
        ):
            row = _run_one(name, instances[name], item)
        # Persist each row immediately so an interrupted run can pick back up.
        _append_row(RESULTS_PATH, row)
        flush()
        print(
            f" [{k}/{len(todo)}] {name} | {item.id} | "
            f"{row['latency_sec']}s"
            + (f" | ERROR {row['error']}" if row["error"] else "")
        )
    print(f"Done. Results -> {RESULTS_PATH}")
def main() -> None:
    """Parse the command line and start the eval run.

    Options:
        --limit N: keep N items per dataset (smoke run). Negative values are
            rejected with a usage error, because they would otherwise be used
            as a slice bound and silently drop items from the end of each
            dataset.
        --assistants NAME [NAME ...]: restrict the run to ``claude`` and/or
            ``qwen``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--limit", type=int, default=None,
                        help="N items per dataset (smoke run)")
    parser.add_argument("--assistants", nargs="+", default=None,
                        choices=["claude", "qwen"])
    args = parser.parse_args()
    if args.limit is not None and args.limit < 0:
        # parser.error prints usage and exits with status 2.
        parser.error("--limit must be >= 0")
    run(limit=args.limit, assistants=args.assistants)


if __name__ == "__main__":
    main()
|