from __future__ import annotations
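
# Evaluation harness for the FakeRouter stack: route every example in
# data/eval.jsonl, validate and apply policy to each routing decision,
# score the outcome against expectations, and write summary metrics and
# per-example traces to eval/results/fakerouter_eval.json.
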
import json
from pathlib import Path
from typing import Any

from eval.metrics import compute_metrics
from routercore.policy import evaluate_policy
from routercore.router import FakeRouter
from routercore.validator import validate_route
from training.generate_dataset import generate_datasets

PROJECT_ROOT = Path(__file__).resolve().parents[1]
EVAL_PATH = PROJECT_ROOT / "data" / "eval.jsonl"
RESULTS_PATH = PROJECT_ROOT / "eval" / "results" / "fakerouter_eval.json"


def load_jsonl(path: Path) -> list[dict[str, Any]]:
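    """Read rows from a JSONL file, generating the eval datasets first if the file is missing."""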
    if not path.exists():
        generate_datasets()
    rows = []
    with path.open("r", encoding="utf-8") as handle:
        for line in handle:
            if line.strip():
                rows.append(json.loads(line))
    return rows


def _actual_from_flow(router_output: Any, validation_result: Any, policy_decision: Any) -> dict[str, Any]:
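    """Collapse router, validation, and policy outputs into one dict comparable to an eval row's expected record."""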
    return {
        "status": policy_decision.status,
        "workflow": policy_decision.workflow,
        "confidence": router_output.confidence,
        "parameters": router_output.parameters,
        "missing_fields": validation_result.missing_fields,
        "candidate_workflows": [
            candidate.model_dump(mode="json")
            for candidate in router_output.candidate_workflows
        ],
        "failure_reasons": policy_decision.reasons
        or validation_result.failure_reasons
        or router_output.failure_reasons,
        "clarifying_question": policy_decision.clarifying_question,
    }


def _pass_fail_notes(row: dict[str, Any], actual: dict[str, Any]) -> list[str]:
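    """Return a human-readable note for each expected-vs-actual mismatch, or ["pass"] if none."""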
    expected = row["expected"]
    notes: list[str] = []
    if actual["status"] != expected["status"]:
        notes.append(f"status mismatch: expected {expected['status']}, got {actual['status']}")
    if expected["workflow"] is not None and actual["workflow"] != expected["workflow"]:
        notes.append(f"workflow mismatch: expected {expected['workflow']}, got {actual['workflow']}")
    if expected["status"] in {"routed", "requires_confirmation"}:
        missing_keys = sorted(set(expected.get("parameters", {})) - set(actual.get("parameters", {})))
        if missing_keys:
            notes.append(f"missing expected parameter keys: {', '.join(missing_keys)}")
    if row["case_type"] == "risky_rejected" and actual["status"] != "rejected":
        notes.append("unsafe request was not rejected")
    if expected["status"] in {"needs_clarification", "rejected", "requires_confirmation"}:
        if actual["status"] == "routed":
            notes.append("false route: system routed a case that needed clarification, confirmation, or rejection")
    return notes or ["pass"]


def run_eval() -> dict[str, Any]:
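    """Run the full FakeRouter pipeline over every eval example and return summary metrics plus per-example results."""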
    router = FakeRouter()
    examples = load_jsonl(EVAL_PATH)
    per_example_results: list[dict[str, Any]] = []
    metric_rows: list[dict[str, Any]] = []
    for item in examples:
        # Full decision pipeline: route the request, validate the routed
        # output, then apply policy to reach the final decision.
        router_output = router.route(item["input"])
        validation_result = validate_route(router_output)
        policy_decision = evaluate_policy(
            router_output,
            validation_result,
            original_request=item["input"],
        )
        actual = _actual_from_flow(router_output, validation_result, policy_decision)
        notes = _pass_fail_notes(item, actual)
        # Compact row used for metric computation.
        metric_rows.append(
            {
                "id": item["id"],
                "case_type": item["case_type"],
                "expected": item["expected"],
                "actual": actual,
            }
        )
        # Full trace kept for the detailed results file.
        per_example_results.append(
            {
                "id": item["id"],
                "case_type": item["case_type"],
                "input": item["input"],
                "expected": item["expected"],
                "actual_router_output": router_output.model_dump(mode="json"),
                "validation_result": validation_result.model_dump(mode="json"),
                "policy_decision": policy_decision.model_dump(mode="json"),
                "actual": actual,
                "pass_fail_notes": notes,
            }
        )
    summary = compute_metrics(metric_rows)
    return {
        "summary_metrics": summary,
        "per_example_results": per_example_results,
    }


def _print_metrics_table(metrics: dict[str, float]) -> None:
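    """Print each summary metric as a formatted percentage."""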
print("FakeRouter Evaluation")
print("=====================")
for name, value in metrics.items():
print(f"{name:40} {value:6.2%}")
def main() -> None:
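    """Run the evaluation, write the results file, and print the metrics table."""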
    output = run_eval()
    RESULTS_PATH.parent.mkdir(parents=True, exist_ok=True)
    RESULTS_PATH.write_text(json.dumps(output, indent=2), encoding="utf-8")
    _print_metrics_table(output["summary_metrics"])
    print(f"\nWrote detailed results to {RESULTS_PATH}")


if __name__ == "__main__":
    main()