File size: 4,675 Bytes
35c0d38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""Eval runner.

For each prompt in the eval set, run each selected assistant (by default both
Claude and Qwen) STATELESSLY (no memory, no guardrails) and append the
responses to ``results/raw.jsonl``. We bypass guardrails on purpose: the eval
measures what the models *would* produce on their own; the guardrails are
evaluated as a separate layer.

The runner is resumable — re-running skips (prompt_id, assistant) pairs that
already have a row in raw.jsonl — so an interrupted long run picks back up.
Rows whose assistant call raised an exception are written with an ``error``
field and also count as done; delete those rows from raw.jsonl to retry them.

CLI:
    uv run python -m eval.run_eval                  # full 90-prompt run
    uv run python -m eval.run_eval --limit 2        # 2 per dataset (smoke run)
    uv run python -m eval.run_eval --assistants claude  # only one model
"""

from __future__ import annotations

import argparse
import json
import os
import time
from typing import Callable

from src.observability import flush, observe, trace_attributes

from eval.datasets import EvalItem, load_all

# JSONL results file: run() appends one row per (prompt, assistant) call and
# _load_completed() reads it back to decide what can be skipped on resume.
# The path is relative, so it resolves against the process's current working
# directory (presumably the project root, given the `python -m eval.run_eval`
# invocation — confirm).
RESULTS_PATH = "./results/raw.jsonl"


def _load_completed(path: str) -> set[tuple[str, str]]:
    """Return the set of (prompt_id, assistant) pairs already in raw.jsonl.

    Every parsed row counts as completed, including rows whose ``error``
    field is set, so failed calls are not retried on resume.

    A line that is not valid JSON is skipped with a warning instead of
    aborting the resume. The typical cause is a run killed while writing its
    last row, which leaves a truncated final line; leaving that pair out of
    the result simply means it is run again.

    Args:
        path: JSONL file written by ``_append_row``. A missing file means
            nothing has been completed yet.

    Returns:
        Set of ``(row["id"], row["assistant"])`` tuples.
    """
    if not os.path.exists(path):
        return set()
    done: set[tuple[str, str]] = set()
    # errors="replace": a write cut off mid multi-byte character would
    # otherwise raise UnicodeDecodeError while iterating, before json.loads
    # ever sees the (already malformed) line.
    with open(path, "r", encoding="utf-8", errors="replace") as fh:
        for lineno, line in enumerate(fh, start=1):
            if not line.strip():
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                print(f"  warning: skipping malformed line {lineno} in {path}")
                continue
            done.add((row["id"], row["assistant"]))
    return done


def _append_row(path: str, row: dict) -> None:
    """Append *row* to *path* as a single JSON line.

    Parent directories are created on demand. If the file is non-empty and
    does not end in a newline (e.g. a previous run was killed part-way
    through writing its last row), a newline is written first. The new row
    then starts on its own line instead of being glued onto the fragment,
    which would corrupt it too.

    Args:
        path: JSONL file to append to.
        row: Mapping to serialise; non-ASCII text is written as-is
            (``ensure_ascii=False``) and encoded as UTF-8.
    """
    parent = os.path.dirname(path)
    if parent:
        # os.makedirs("") raises FileNotFoundError for a bare filename.
        os.makedirs(parent, exist_ok=True)

    # Serialise first so an unserialisable row fails before touching the file.
    line = json.dumps(row, ensure_ascii=False) + "\n"

    prefix = ""
    if os.path.exists(path) and os.path.getsize(path) > 0:
        with open(path, "rb") as fh:
            fh.seek(-1, os.SEEK_END)
            if fh.read(1) != b"\n":
                prefix = "\n"

    with open(path, "a", encoding="utf-8") as fh:
        fh.write(prefix + line)


# Registry filled on the first _factories() call. The assistant classes are
# imported there rather than at module level, so importing this module never
# loads Qwen weights.
_ASSISTANT_FACTORIES: dict[str, Callable] = {}


def _factories() -> dict[str, Callable]:
    """Return the shared name -> assistant-class registry, filling it once.

    The registry dict itself is returned (not a copy). It is populated with
    ``"claude"`` and ``"qwen"`` only when it is empty.
    """
    if _ASSISTANT_FACTORIES:
        return _ASSISTANT_FACTORIES

    from src.assistants.frontier import ClaudeAssistant
    from src.assistants.oss import QwenAssistant

    _ASSISTANT_FACTORIES.update(claude=ClaudeAssistant, qwen=QwenAssistant)
    return _ASSISTANT_FACTORIES


@observe(name="eval_turn")
def _run_one(assistant_name: str, assistant, item: EvalItem) -> dict:
    """Run one prompt against one assistant and return a result row.

    The assistant is called statelessly: ``assistant.chat`` receives the
    prompt together with an empty history list. Any ``Exception`` raised by
    the call is caught and recorded in the row's ``error`` field, with
    ``response`` set to ``""``, so one failing prompt does not abort the
    whole eval. Non-``Exception`` interrupts such as ``KeyboardInterrupt``
    still propagate.

    Args:
        assistant_name: Value stored in the row's ``assistant`` field
            (``"claude"`` or ``"qwen"`` when called from ``run``).
        assistant: Object exposing ``chat(prompt, history)``; presumably
            returns the response text — TODO confirm.
        item: The eval prompt and its metadata.

    Returns:
        A dict with the item's id/dataset/category/prompt/reference, the
        assistant name, the response, ``latency_sec`` (elapsed seconds,
        rounded to milliseconds) and ``error`` (``None`` on success).
    """
    # perf_counter is monotonic and high-resolution; time.time() follows the
    # system clock, which can jump (e.g. NTP adjustments) mid-measurement.
    t0 = time.perf_counter()
    try:
        response = assistant.chat(item.prompt, [])
    except Exception as exc:  # noqa: BLE001 - log and keep going
        response = ""
        error = f"{type(exc).__name__}: {exc}"
    else:
        error = None
    latency = round(time.perf_counter() - t0, 3)
    return {
        "id": item.id,
        "dataset": item.dataset,
        "category": item.category,
        "assistant": assistant_name,
        "prompt": item.prompt,
        "reference": item.reference,
        "response": response,
        "latency_sec": latency,
        "error": error,
    }


def _select_items(items: list[EvalItem], limit: int | None) -> list[EvalItem]:
    """Return the first ``limit`` items of each dataset, or all items if None.

    Datasets keep the order in which they first appear in ``items``, and the
    items within each dataset keep their original relative order.
    """
    if limit is None:
        return items
    by_ds: dict[str, list[EvalItem]] = {}
    for it in items:
        by_ds.setdefault(it.dataset, []).append(it)
    return [it for ds_items in by_ds.values() for it in ds_items[:limit]]


def run(limit: int | None = None, assistants: list[str] | None = None) -> None:
    """Run every pending (prompt, assistant) pair and append rows to RESULTS_PATH.

    Pairs that already have a row in RESULTS_PATH are skipped, so the run is
    resumable. Each row is appended as soon as it is produced.

    Args:
        limit: If given, only the first ``limit`` items of each dataset are
            used (smoke run). Must be non-negative.
        assistants: Assistant names to run, in order; defaults to
            ``["claude", "qwen"]``. Repeated names are run only once.

    Raises:
        ValueError: If ``limit`` is negative.
        KeyError: If an assistant name has no registered factory.
    """
    if limit is not None and limit < 0:
        # A negative slice bound would silently drop items from the END of
        # each dataset rather than meaning anything useful.
        raise ValueError(f"limit must be >= 0, got {limit}")
    items = _select_items(load_all(), limit)

    # dict.fromkeys de-duplicates while preserving order, so a repeated name
    # does not run (and append) every prompt twice.
    names = list(dict.fromkeys(assistants or ["claude", "qwen"]))
    factories = _factories()
    # Resolve every constructor up front so an unknown name fails with
    # KeyError before any work is done.
    ctors = {n: factories[n] for n in names}

    done = _load_completed(RESULTS_PATH)
    todo = [(n, it) for n in names for it in items if (it.id, n) not in done]
    total = len(items) * len(names)
    # Count completions within THIS selection only: len(done) would also
    # include rows for other assistants or for items outside the --limit set.
    print(f"Eval set: {len(items)} items x {len(names)} assistants = "
          f"{total} runs; {total - len(todo)} already done, "
          f"{len(todo)} remaining.")

    # Instantiate only assistants that still have work: constructing Qwen
    # loads its weights, which is wasted effort if all its rows are done.
    pending = {n for n, _ in todo}
    instances = {n: ctors[n]() for n in names if n in pending}

    for k, (name, item) in enumerate(todo, start=1):
        with trace_attributes(
            tags=["eval", name, item.dataset],
            metadata={"eval_id": item.id},
        ):
            row = _run_one(name, instances[name], item)
        # Persist each row immediately so an interruption loses at most the
        # in-flight call. flush() comes from src.observability (presumably
        # pushes buffered traces — confirm).
        _append_row(RESULTS_PATH, row)
        flush()
        print(
            f"  [{k}/{len(todo)}] {name} | {item.id} | "
            f"{row['latency_sec']}s"
            + (f" | ERROR {row['error']}" if row["error"] else "")
        )

    print(f"Done. Results -> {RESULTS_PATH}")


def main(argv: list[str] | None = None) -> None:
    """Parse command-line arguments and run the eval.

    Args:
        argv: Arguments to parse. ``None`` (the default) makes argparse read
            ``sys.argv[1:]``, which is the normal CLI behavior.
    """
    p = argparse.ArgumentParser(
        description="Run the eval set against the selected assistants "
                    "(resumable).")
    p.add_argument("--limit", type=int, default=None,
                   help="N items per dataset (smoke run)")
    p.add_argument("--assistants", nargs="+", default=None,
                   choices=["claude", "qwen"],
                   help="assistants to run (default: claude qwen)")
    args = p.parse_args(argv)
    if args.limit is not None and args.limit < 0:
        # A negative slice bound would silently drop items from the end of
        # each dataset; reject it with a normal usage error (exit code 2).
        p.error("--limit must be >= 0")
    run(limit=args.limit, assistants=args.assistants)


if __name__ == "__main__":
    main()