| from __future__ import annotations |
|
|
| import os |
|
|
| |
# Quiet TensorFlow / absl C++ log spam. These must be set before any
# TF-dependent import below runs; setdefault keeps any value the caller
# already exported in the environment.
os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "2")
os.environ.setdefault("ABSL_MIN_LOG_LEVEL", "2")
|
|
| import argparse |
| import gc |
| import json |
| import sys |
| from pathlib import Path |
|
|
| import pandas as pd |
| from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score |
|
|
# Make the repository root importable so the project-local imports below
# resolve when this script is executed directly (not as an installed package).
BASE_DIR = Path(__file__).resolve().parent.parent
if str(BASE_DIR) not in sys.path:
    sys.path.insert(0, str(BASE_DIR))
|
|
| from combined_inference import classify_query |
| from config import ( |
| DEFAULT_BENCHMARK_PATH, |
| EVALUATION_ARTIFACTS_DIR, |
| HEAD_CONFIGS, |
| IAB_HEAD_CONFIG, |
| IAB_BEHAVIOR_LOCK_CASES_PATH, |
| IAB_CROSS_VERTICAL_BEHAVIOR_LOCK_CASES_PATH, |
| IAB_CROSS_VERTICAL_QUALITY_TARGET_CASES_PATH, |
| IAB_QUALITY_TARGET_CASES_PATH, |
| KNOWN_FAILURE_CASES_PATH, |
| ensure_artifact_dirs, |
| ) |
| from evaluation.regression_suite import ( |
| evaluate_iab_behavior_lock_cases, |
| evaluate_iab_cross_vertical_behavior_lock_cases, |
| evaluate_iab_cross_vertical_quality_target_cases, |
| evaluate_iab_quality_target_cases, |
| evaluate_known_failure_cases, |
| ) |
| from evaluation.iab_quality import compute_path_metrics, evaluate_iab_views, path_from_label |
| from iab_classifier import predict_iab_content_classifier_batch |
| from model_runtime import get_head |
| from schemas import validate_classify_response |
|
|
|
|
| def _maybe_free_cuda_memory() -> None: |
| try: |
| import torch |
|
|
| if torch.cuda.is_available(): |
| torch.cuda.empty_cache() |
| except Exception: |
| pass |
|
|
|
|
def load_jsonl(path: Path) -> list[dict]:
    """Read a JSON Lines file into a list of dicts.

    Whitespace-only lines (e.g. a stray trailing newline at end of file) are
    skipped instead of raising ``json.JSONDecodeError``.

    Args:
        path: Path to a UTF-8 encoded ``.jsonl`` file.

    Returns:
        One parsed object per non-blank line, in file order.
    """
    with path.open("r", encoding="utf-8") as handle:
        return [json.loads(line) for line in handle if line.strip()]
|
|
|
|
| def write_json(path: Path, payload: dict | list) -> None: |
| path.parent.mkdir(parents=True, exist_ok=True) |
| path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8") |
|
|
|
|
def _accepted_accuracy(y_true: list, y_pred: list, accepted: list) -> float:
    """Accuracy over only the rows whose prediction met its confidence threshold.

    Returns 0.0 when nothing was accepted, avoiding an empty-input error from
    sklearn.
    """
    kept_true = [truth for truth, keep in zip(y_true, accepted) if keep]
    kept_pred = [pred for pred, keep in zip(y_pred, accepted) if keep]
    if not kept_true:
        return 0.0
    return float(accuracy_score(kept_true, kept_pred))


def evaluate_head_dataset(head_name: str, dataset_path: Path, suite_name: str, output_dir: Path) -> dict:
    """Evaluate one classifier head on one JSONL dataset and write artifacts.

    Writes a confusion-matrix CSV and a JSON report into ``output_dir`` and
    returns the report dict. If every row carries a ``difficulty`` field, a
    per-difficulty breakdown is included in the report.

    Args:
        head_name: Head key understood by ``get_head``.
        dataset_path: JSONL file with a ``text`` field plus the head's label field.
        suite_name: Split/suite tag embedded in artifact file names.
        output_dir: Directory that receives the CSV and JSON artifacts.
    """
    head = get_head(head_name)
    config = head.config
    rows = load_jsonl(dataset_path)
    predictions = head.predict_batch([row["text"] for row in rows])

    y_true = [row[config.label_field] for row in rows]
    y_pred = [prediction["label"] for prediction in predictions]
    accepted = [prediction["meets_confidence_threshold"] for prediction in predictions]

    confusion = confusion_matrix(y_true, y_pred, labels=list(config.labels))
    confusion_df = pd.DataFrame(confusion, index=config.labels, columns=config.labels)
    confusion_path = output_dir / f"{head_name}_{suite_name}_confusion_matrix.csv"
    confusion_df.to_csv(confusion_path)

    accepted_total_count = sum(accepted)
    report = classification_report(
        y_true,
        y_pred,
        labels=list(config.labels),
        output_dict=True,
        zero_division=0,
    )

    difficulty_breakdown = None
    if rows and all("difficulty" in row for row in rows):
        difficulty_breakdown = {}
        for difficulty in sorted({row["difficulty"] for row in rows}):
            # Per-difficulty slices reuse the same metric recipe as the
            # overall summary below.
            indices = [idx for idx, row in enumerate(rows) if row["difficulty"] == difficulty]
            difficulty_true = [y_true[idx] for idx in indices]
            difficulty_pred = [y_pred[idx] for idx in indices]
            difficulty_accepted = [accepted[idx] for idx in indices]
            difficulty_accepted_count = sum(difficulty_accepted)
            difficulty_breakdown[difficulty] = {
                "count": len(indices),
                "accuracy": round(float(accuracy_score(difficulty_true, difficulty_pred)), 4),
                "macro_f1": round(float(f1_score(difficulty_true, difficulty_pred, average="macro")), 4),
                "accepted_coverage": round(float(difficulty_accepted_count / len(indices)), 4),
                "accepted_accuracy": round(
                    _accepted_accuracy(difficulty_true, difficulty_pred, difficulty_accepted), 4
                ),
                "fallback_rate": round(float(1 - (difficulty_accepted_count / len(indices))), 4),
            }

    # Guard the denominator for empty datasets, mirroring the max(..., 1)
    # convention already used by evaluate_iab_dataset.
    row_count = max(len(rows), 1)
    summary = {
        "head": head_name,
        "suite": suite_name,
        "dataset_path": str(dataset_path),
        "count": len(rows),
        "accuracy": round(float(accuracy_score(y_true, y_pred)), 4),
        "macro_f1": round(float(f1_score(y_true, y_pred, average="macro")), 4),
        "accepted_coverage": round(float(accepted_total_count / row_count), 4),
        "accepted_accuracy": round(_accepted_accuracy(y_true, y_pred, accepted), 4),
        "fallback_rate": round(float(1 - (accepted_total_count / row_count)), 4),
        "per_class_metrics": report,
        "confusion_matrix_path": str(confusion_path),
    }
    if difficulty_breakdown is not None:
        summary["difficulty_breakdown"] = difficulty_breakdown
    write_json(output_dir / f"{head_name}_{suite_name}_report.json", summary)
    return summary
|
|
|
|
def evaluate_iab_dataset(dataset_path: Path, suite_name: str, output_dir: Path) -> dict:
    """Evaluate the IAB content classifier on one JSONL dataset.

    Unlike the generic head evaluation, labels here are hierarchical paths
    (e.g. "A > B > C"); accuracy is exact-path match and sklearn metrics are
    computed on the joined label strings. Writes a JSON report into
    ``output_dir`` and returns it.

    Args:
        dataset_path: JSONL file with ``text`` and ``iab_path`` fields.
        suite_name: Split/suite tag embedded in the report file name.
        output_dir: Directory receiving the JSON report.

    Raises:
        RuntimeError: If the classifier returned no prediction for any row,
            which indicates the trained artifacts are missing.
    """
    rows = load_jsonl(dataset_path)
    true_paths = [path_from_label(row["iab_path"]) for row in rows]
    true_labels = [row["iab_path"] for row in rows]
    predictions = predict_iab_content_classifier_batch([row["text"] for row in rows])
    # All-None output means the model artifacts were never built/calibrated.
    if not any(output is not None for output in predictions):
        raise RuntimeError(
            "IAB classifier artifacts are unavailable. Run `python3 training/train_iab.py` "
            "and `python3 training/calibrate_confidence.py --head iab_content` "
            "from the `agentic-intent-classifier` directory first."
        )

    # Missing individual predictions become empty paths / a sentinel label so
    # they count as misses rather than being dropped from the denominator.
    pred_paths = [
        tuple(output["path"]) if output is not None else tuple()
        for output in predictions
    ]
    accepted = [bool(output and output["meets_confidence_threshold"]) for output in predictions]
    # First non-None prediction determines the reported source backend.
    source = next((output["source"] for output in predictions if output is not None), "supervised_classifier")
    pred_labels = [" > ".join(path) if path else "__no_prediction__" for path in pred_paths]

    accepted_total_count = sum(accepted)
    # Exact-path accuracy over confidence-accepted rows only (0.0 if none accepted).
    accepted_accuracy = (
        sum(1 for truth, pred, keep in zip(true_paths, pred_paths, accepted) if keep and truth == pred) / accepted_total_count
        if accepted_total_count
        else 0.0
    )
    difficulty_breakdown = None
    # Per-difficulty breakdown only when every row is tagged with a difficulty.
    if rows and all("difficulty" in row for row in rows):
        difficulty_breakdown = {}
        for difficulty in sorted({row["difficulty"] for row in rows}):
            indices = [idx for idx, row in enumerate(rows) if row["difficulty"] == difficulty]
            difficulty_true_paths = [true_paths[idx] for idx in indices]
            difficulty_pred_paths = [pred_paths[idx] for idx in indices]
            difficulty_true_labels = [true_labels[idx] for idx in indices]
            difficulty_pred_labels = [pred_labels[idx] for idx in indices]
            difficulty_accepted = [accepted[idx] for idx in indices]
            difficulty_accepted_count = sum(difficulty_accepted)
            difficulty_accepted_accuracy = (
                sum(
                    1
                    for truth, pred, keep in zip(difficulty_true_paths, difficulty_pred_paths, difficulty_accepted)
                    if keep and truth == pred
                )
                / difficulty_accepted_count
                if difficulty_accepted_count
                else 0.0
            )
            difficulty_breakdown[difficulty] = {
                "count": len(indices),
                "accuracy": round(
                    float(sum(1 for truth, pred in zip(difficulty_true_paths, difficulty_pred_paths) if truth == pred) / max(len(indices), 1)),
                    4,
                ),
                "macro_f1": round(float(f1_score(difficulty_true_labels, difficulty_pred_labels, average="macro")), 4),
                "accepted_coverage": round(float(difficulty_accepted_count / max(len(indices), 1)), 4),
                "accepted_accuracy": round(float(difficulty_accepted_accuracy), 4),
                "fallback_rate": round(float(1 - (difficulty_accepted_count / max(len(indices), 1))), 4),
            }
    # max(..., 1) guards the denominators against empty datasets.
    summary = {
        "head": "iab_content",
        "suite": suite_name,
        "dataset_path": str(dataset_path),
        "count": len(rows),
        "accuracy": round(float(sum(1 for truth, pred in zip(true_paths, pred_paths) if truth == pred) / max(len(rows), 1)), 4),
        "macro_f1": round(float(f1_score(true_labels, pred_labels, average="macro")), 4),
        "accepted_coverage": round(float(accepted_total_count / max(len(rows), 1)), 4),
        "accepted_accuracy": round(float(accepted_accuracy), 4),
        "fallback_rate": round(float(1 - (accepted_total_count / max(len(rows), 1))), 4),
        "primary_source": source,
        "tier_metrics": compute_path_metrics(true_paths, pred_paths),
        "view_metrics": evaluate_iab_views(rows),
    }
    if difficulty_breakdown is not None:
        summary["difficulty_breakdown"] = difficulty_breakdown
    write_json(output_dir / f"iab_content_{suite_name}_report.json", summary)
    return summary
|
|
|
|
def evaluate_combined_benchmark(path: Path, output_dir: Path) -> dict:
    """Run the combined classifier over the demo benchmark file.

    Each benchmark item is classified, validated against the response schema,
    and recorded (with its expected behavior) into
    ``combined_demo_benchmark.json`` under ``output_dir``. Returns a small
    summary including the observed fallback rate.
    """
    cases = json.loads(path.read_text(encoding="utf-8"))
    records = []
    fallback_count = 0
    for case in cases:
        response = validate_classify_response(classify_query(case["input"]))
        if response["model_output"].get("fallback"):
            fallback_count += 1
        record = {
            "input": case["input"],
            "expected_behavior": case["expected_behavior"],
            "response": response,
        }
        records.append(record)
    results_path = output_dir / "combined_demo_benchmark.json"
    write_json(results_path, records)
    fallback_rate = round(fallback_count / len(records), 4) if records else 0.0
    return {
        "benchmark_path": str(path),
        "count": len(records),
        "fallback_rate": fallback_rate,
        "output_path": str(results_path),
    }
|
|
|
|
def _compact_regression(result: dict) -> dict:
    """Project a regression-suite result down to its headline counters."""
    return {key: result[key] for key in ("count", "passed", "failed", "by_status")}


def main() -> None:
    """CLI entry point: evaluate all heads, the IAB head, and combined suites.

    Writes per-dataset artifacts plus a full ``summary.json`` into the chosen
    output directory, then prints a compact summary to stdout.
    """
    parser = argparse.ArgumentParser(description="Run repeatable evaluation for classifier heads and combined output.")
    parser.add_argument(
        "--output-dir",
        default=str(EVALUATION_ARTIFACTS_DIR / "latest"),
        help="Directory to write evaluation artifacts into.",
    )
    parser.add_argument(
        "--skip-iab-train-eval",
        action="store_true",
        help="Skip the IAB train split (largest JSONL). Use on low-RAM hosts (e.g. Colab free tier).",
    )
    args = parser.parse_args()

    ensure_artifact_dirs()
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    summary = {"heads": {}, "combined": {}}
    for head_name, config in HEAD_CONFIGS.items():
        if head_name == "iab_content":
            continue  # handled below via the path-aware IAB evaluator
        head_summary = {}
        for split_name, split_path in config.split_paths.items():
            head_summary[split_name] = evaluate_head_dataset(head_name, split_path, split_name, output_dir)
        for suite_name, suite_path in config.stress_suite_paths.items():
            head_summary[suite_name] = evaluate_head_dataset(head_name, suite_path, suite_name, output_dir)
        summary["heads"][head_name] = head_summary
        # Release per-head model memory before loading the next head.
        gc.collect()
        _maybe_free_cuda_memory()

    iab_summary = {}
    for split_name, split_path in IAB_HEAD_CONFIG.split_paths.items():
        if args.skip_iab_train_eval and split_name == "train":
            continue
        iab_summary[split_name] = evaluate_iab_dataset(split_path, split_name, output_dir)
        gc.collect()
        _maybe_free_cuda_memory()
    for suite_name, suite_path in IAB_HEAD_CONFIG.stress_suite_paths.items():
        iab_summary[suite_name] = evaluate_iab_dataset(suite_path, suite_name, output_dir)
        gc.collect()
        _maybe_free_cuda_memory()
    summary["heads"]["iab_content"] = iab_summary

    combined = summary["combined"]
    combined["demo_benchmark"] = evaluate_combined_benchmark(DEFAULT_BENCHMARK_PATH, output_dir)
    combined["known_failure_regression"] = evaluate_known_failure_cases(KNOWN_FAILURE_CASES_PATH, output_dir)
    combined["iab_behavior_lock_regression"] = evaluate_iab_behavior_lock_cases(
        IAB_BEHAVIOR_LOCK_CASES_PATH,
        output_dir,
    )
    combined["iab_cross_vertical_behavior_lock_regression"] = evaluate_iab_cross_vertical_behavior_lock_cases(
        IAB_CROSS_VERTICAL_BEHAVIOR_LOCK_CASES_PATH,
        output_dir,
    )
    combined["iab_quality_target_eval"] = evaluate_iab_quality_target_cases(
        IAB_QUALITY_TARGET_CASES_PATH,
        output_dir,
    )
    combined["iab_cross_vertical_quality_target_eval"] = evaluate_iab_cross_vertical_quality_target_cases(
        IAB_CROSS_VERTICAL_QUALITY_TARGET_CASES_PATH,
        output_dir,
    )
    write_json(output_dir / "summary.json", summary)

    # Compact stdout view: test-split headline metrics per head, plus the
    # headline counters of every combined suite.
    test_metric_keys = (
        "count",
        "accuracy",
        "macro_f1",
        "accepted_accuracy",
        "accepted_coverage",
        "fallback_rate",
    )
    compact_heads = {}
    for head_name, head_summary in summary["heads"].items():
        test_compact = {key: head_summary["test"][key] for key in test_metric_keys}
        # Only the IAB report carries hierarchical tier metrics.
        if "tier_metrics" in head_summary["test"]:
            test_compact["tier_metrics"] = head_summary["test"]["tier_metrics"]
        compact_heads[head_name] = {"test": test_compact}

    compact_combined = {"demo_benchmark": combined["demo_benchmark"]}
    for suite_key in (
        "known_failure_regression",
        "iab_behavior_lock_regression",
        "iab_cross_vertical_behavior_lock_regression",
        "iab_quality_target_eval",
        "iab_cross_vertical_quality_target_eval",
    ):
        compact_combined[suite_key] = _compact_regression(combined[suite_key])

    compact_summary = {
        "heads": compact_heads,
        "combined": compact_combined,
        "summary_path": str(output_dir / "summary.json"),
    }
    print(json.dumps(compact_summary, indent=2))
|
|
|
|
# Script entry point: run the full evaluation suite when executed directly.
if __name__ == "__main__":
    main()
|
|