# Hugging Face upload residue (kept for provenance, commented so the module parses):
# manikumargouni's picture
# Upload folder using huggingface_hub
# 0584798 verified
from __future__ import annotations
import os
# Quieter logs when TensorFlow/XLA are pulled in indirectly (common on Colab).
os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "2")
os.environ.setdefault("ABSL_MIN_LOG_LEVEL", "2")
import argparse
import gc
import json
import sys
from pathlib import Path
import pandas as pd
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score
BASE_DIR = Path(__file__).resolve().parent.parent
if str(BASE_DIR) not in sys.path:
sys.path.insert(0, str(BASE_DIR))
from combined_inference import classify_query
from config import (
DEFAULT_BENCHMARK_PATH,
EVALUATION_ARTIFACTS_DIR,
HEAD_CONFIGS,
IAB_HEAD_CONFIG,
IAB_BEHAVIOR_LOCK_CASES_PATH,
IAB_CROSS_VERTICAL_BEHAVIOR_LOCK_CASES_PATH,
IAB_CROSS_VERTICAL_QUALITY_TARGET_CASES_PATH,
IAB_QUALITY_TARGET_CASES_PATH,
KNOWN_FAILURE_CASES_PATH,
ensure_artifact_dirs,
)
from evaluation.regression_suite import (
evaluate_iab_behavior_lock_cases,
evaluate_iab_cross_vertical_behavior_lock_cases,
evaluate_iab_cross_vertical_quality_target_cases,
evaluate_iab_quality_target_cases,
evaluate_known_failure_cases,
)
from evaluation.iab_quality import compute_path_metrics, evaluate_iab_views, path_from_label
from iab_classifier import predict_iab_content_classifier_batch
from model_runtime import get_head
from schemas import validate_classify_response
def _maybe_free_cuda_memory() -> None:
try:
import torch
if torch.cuda.is_available():
torch.cuda.empty_cache()
except Exception:
pass
def load_jsonl(path: Path) -> list[dict]:
    """Parse a JSON Lines file: one JSON object per line, returned in order."""
    records: list[dict] = []
    with path.open("r", encoding="utf-8") as stream:
        for raw_line in stream:
            records.append(json.loads(raw_line))
    return records
def write_json(path: Path, payload: dict | list) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
def evaluate_head_dataset(head_name: str, dataset_path: Path, suite_name: str, output_dir: Path) -> dict:
    """Evaluate one flat classifier head on one JSONL dataset and write artifacts.

    Writes ``<head>_<suite>_confusion_matrix.csv`` and
    ``<head>_<suite>_report.json`` into *output_dir* and returns the report
    payload: overall accuracy / macro-F1, confidence-accepted coverage and
    accuracy, and (when every row carries a ``difficulty`` field) the same
    metrics broken down per difficulty tier.
    """

    def _accepted_metrics(truths: list, preds: list, keeps: list) -> tuple[int, float]:
        # Accuracy restricted to predictions that met the confidence threshold.
        # Returns (accepted_count, accepted_accuracy); 0.0 accuracy when nothing
        # was accepted, matching the reporting convention used elsewhere.
        kept_count = sum(keeps)
        if not kept_count:
            return 0, 0.0
        acc = accuracy_score(
            [truth for truth, keep in zip(truths, keeps) if keep],
            [pred for pred, keep in zip(preds, keeps) if keep],
        )
        return kept_count, acc

    head = get_head(head_name)
    config = head.config
    rows = load_jsonl(dataset_path)
    predictions = head.predict_batch([row["text"] for row in rows])
    y_true = [row[config.label_field] for row in rows]
    y_pred = [prediction["label"] for prediction in predictions]
    accepted = [prediction["meets_confidence_threshold"] for prediction in predictions]

    confusion = confusion_matrix(y_true, y_pred, labels=list(config.labels))
    confusion_df = pd.DataFrame(confusion, index=config.labels, columns=config.labels)
    confusion_path = output_dir / f"{head_name}_{suite_name}_confusion_matrix.csv"
    confusion_df.to_csv(confusion_path)

    accepted_total_count, accepted_accuracy = _accepted_metrics(y_true, y_pred, accepted)
    report = classification_report(
        y_true,
        y_pred,
        labels=list(config.labels),
        output_dict=True,
        zero_division=0,
    )

    # Optional per-difficulty breakdown, only when every row is tagged.
    difficulty_breakdown = None
    if rows and all("difficulty" in row for row in rows):
        difficulty_breakdown = {}
        for difficulty in sorted({row["difficulty"] for row in rows}):
            indices = [idx for idx, row in enumerate(rows) if row["difficulty"] == difficulty]
            difficulty_true = [y_true[idx] for idx in indices]
            difficulty_pred = [y_pred[idx] for idx in indices]
            difficulty_accepted = [accepted[idx] for idx in indices]
            difficulty_accepted_count, difficulty_accepted_accuracy = _accepted_metrics(
                difficulty_true, difficulty_pred, difficulty_accepted
            )
            difficulty_breakdown[difficulty] = {
                "count": len(indices),
                "accuracy": round(float(accuracy_score(difficulty_true, difficulty_pred)), 4),
                "macro_f1": round(float(f1_score(difficulty_true, difficulty_pred, average="macro")), 4),
                "accepted_coverage": round(float(difficulty_accepted_count / len(indices)), 4),
                "accepted_accuracy": round(float(difficulty_accepted_accuracy), 4),
                "fallback_rate": round(float(1 - (difficulty_accepted_count / len(indices))), 4),
            }

    # max(..., 1) guards the coverage divisions against an empty dataset,
    # consistent with evaluate_iab_dataset.
    total = max(len(rows), 1)
    summary = {
        "head": head_name,
        "suite": suite_name,
        "dataset_path": str(dataset_path),
        "count": len(rows),
        "accuracy": round(float(accuracy_score(y_true, y_pred)), 4),
        "macro_f1": round(float(f1_score(y_true, y_pred, average="macro")), 4),
        "accepted_coverage": round(float(accepted_total_count / total), 4),
        "accepted_accuracy": round(float(accepted_accuracy), 4),
        "fallback_rate": round(float(1 - (accepted_total_count / total)), 4),
        "per_class_metrics": report,
        "confusion_matrix_path": str(confusion_path),
    }
    if difficulty_breakdown is not None:
        summary["difficulty_breakdown"] = difficulty_breakdown
    write_json(output_dir / f"{head_name}_{suite_name}_report.json", summary)
    return summary
def evaluate_iab_dataset(dataset_path: Path, suite_name: str, output_dir: Path) -> dict:
    """Evaluate the IAB content classifier on one JSONL dataset and write a report.

    Unlike the flat heads, IAB labels are hierarchical paths; "accuracy" here
    means exact-path match, while macro-F1 is computed over the joined label
    strings. Returns the payload also written to
    ``<output_dir>/iab_content_<suite_name>_report.json``.
    """
    rows = load_jsonl(dataset_path)
    # Ground truth kept in two forms: tuple paths for exact-match comparison,
    # raw label strings for sklearn's f1_score.
    true_paths = [path_from_label(row["iab_path"]) for row in rows]
    true_labels = [row["iab_path"] for row in rows]
    predictions = predict_iab_content_classifier_batch([row["text"] for row in rows])
    # Every output being None indicates the trained artifacts are missing entirely.
    if not any(output is not None for output in predictions):
        raise RuntimeError(
            "IAB classifier artifacts are unavailable. Run `python3 training/train_iab.py` "
            "and `python3 training/calibrate_confidence.py --head iab_content` "
            "from the `agentic-intent-classifier` directory first."
        )
    # Missing per-row predictions become the empty tuple so comparisons stay valid.
    pred_paths = [
        tuple(output["path"]) if output is not None else tuple()
        for output in predictions
    ]
    accepted = [bool(output and output["meets_confidence_threshold"]) for output in predictions]
    # Report the source of the first non-None prediction as the primary source.
    source = next((output["source"] for output in predictions if output is not None), "supervised_classifier")
    # Sentinel label keeps f1_score well-defined for rows with no prediction.
    pred_labels = [" > ".join(path) if path else "__no_prediction__" for path in pred_paths]
    accepted_total_count = sum(accepted)
    # Exact-path accuracy restricted to confidence-accepted predictions.
    accepted_accuracy = (
        sum(1 for truth, pred, keep in zip(true_paths, pred_paths, accepted) if keep and truth == pred) / accepted_total_count
        if accepted_total_count
        else 0.0
    )
    # Optional per-difficulty breakdown, only when every row is tagged.
    difficulty_breakdown = None
    if rows and all("difficulty" in row for row in rows):
        difficulty_breakdown = {}
        for difficulty in sorted({row["difficulty"] for row in rows}):
            indices = [idx for idx, row in enumerate(rows) if row["difficulty"] == difficulty]
            difficulty_true_paths = [true_paths[idx] for idx in indices]
            difficulty_pred_paths = [pred_paths[idx] for idx in indices]
            difficulty_true_labels = [true_labels[idx] for idx in indices]
            difficulty_pred_labels = [pred_labels[idx] for idx in indices]
            difficulty_accepted = [accepted[idx] for idx in indices]
            difficulty_accepted_count = sum(difficulty_accepted)
            difficulty_accepted_accuracy = (
                sum(
                    1
                    for truth, pred, keep in zip(difficulty_true_paths, difficulty_pred_paths, difficulty_accepted)
                    if keep and truth == pred
                )
                / difficulty_accepted_count
                if difficulty_accepted_count
                else 0.0
            )
            # max(..., 1) is defensive; indices is non-empty for every observed difficulty.
            difficulty_breakdown[difficulty] = {
                "count": len(indices),
                "accuracy": round(
                    float(sum(1 for truth, pred in zip(difficulty_true_paths, difficulty_pred_paths) if truth == pred) / max(len(indices), 1)),
                    4,
                ),
                "macro_f1": round(float(f1_score(difficulty_true_labels, difficulty_pred_labels, average="macro")), 4),
                "accepted_coverage": round(float(difficulty_accepted_count / max(len(indices), 1)), 4),
                "accepted_accuracy": round(float(difficulty_accepted_accuracy), 4),
                "fallback_rate": round(float(1 - (difficulty_accepted_count / max(len(indices), 1))), 4),
            }
    summary = {
        "head": "iab_content",
        "suite": suite_name,
        "dataset_path": str(dataset_path),
        "count": len(rows),
        "accuracy": round(float(sum(1 for truth, pred in zip(true_paths, pred_paths) if truth == pred) / max(len(rows), 1)), 4),
        "macro_f1": round(float(f1_score(true_labels, pred_labels, average="macro")), 4),
        "accepted_coverage": round(float(accepted_total_count / max(len(rows), 1)), 4),
        "accepted_accuracy": round(float(accepted_accuracy), 4),
        "fallback_rate": round(float(1 - (accepted_total_count / max(len(rows), 1))), 4),
        "primary_source": source,
        # Tier/view metrics come from evaluation.iab_quality helpers; assumes
        # they accept (true_paths, pred_paths) and raw rows respectively.
        "tier_metrics": compute_path_metrics(true_paths, pred_paths),
        "view_metrics": evaluate_iab_views(rows),
    }
    if difficulty_breakdown is not None:
        summary["difficulty_breakdown"] = difficulty_breakdown
    write_json(output_dir / f"iab_content_{suite_name}_report.json", summary)
    return summary
def evaluate_combined_benchmark(path: Path, output_dir: Path) -> dict:
    """Run the combined pipeline over the demo benchmark and record every response.

    Each benchmark item is classified, schema-validated, and written (with its
    expected behavior) to ``combined_demo_benchmark.json``; returns a small
    summary including the observed fallback rate.
    """
    benchmark = json.loads(path.read_text(encoding="utf-8"))
    records = []
    fallback_count = 0
    for case in benchmark:
        response = validate_classify_response(classify_query(case["input"]))
        if response["model_output"].get("fallback"):
            fallback_count += 1
        records.append(
            {
                "input": case["input"],
                "expected_behavior": case["expected_behavior"],
                "response": response,
            }
        )
    result_path = output_dir / "combined_demo_benchmark.json"
    write_json(result_path, records)
    fallback_rate = round(fallback_count / len(records), 4) if records else 0.0
    return {
        "benchmark_path": str(path),
        "count": len(records),
        "fallback_rate": fallback_rate,
        "output_path": str(result_path),
    }
def main() -> None:
    """CLI entry point: evaluate every head and the combined pipeline, write artifacts.

    Writes per-suite reports plus ``summary.json`` into ``--output-dir`` and
    prints a compact JSON digest to stdout. ``--skip-iab-train-eval`` skips the
    large IAB train split on low-memory hosts.
    """
    parser = argparse.ArgumentParser(description="Run repeatable evaluation for classifier heads and combined output.")
    parser.add_argument(
        "--output-dir",
        default=str(EVALUATION_ARTIFACTS_DIR / "latest"),
        help="Directory to write evaluation artifacts into.",
    )
    parser.add_argument(
        "--skip-iab-train-eval",
        action="store_true",
        help="Skip the IAB train split (largest JSONL). Use on low-RAM hosts (e.g. Colab free tier).",
    )
    args = parser.parse_args()
    ensure_artifact_dirs()
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    summary = {"heads": {}, "combined": {}}

    # Flat heads; IAB is handled separately below because of its path labels.
    for head_name, config in HEAD_CONFIGS.items():
        if head_name == "iab_content":
            continue
        head_summary = {}
        for split_name, split_path in config.split_paths.items():
            head_summary[split_name] = evaluate_head_dataset(head_name, split_path, split_name, output_dir)
        for suite_name, suite_path in config.stress_suite_paths.items():
            head_summary[suite_name] = evaluate_head_dataset(head_name, suite_path, suite_name, output_dir)
        summary["heads"][head_name] = head_summary
        # Reclaim memory between heads (important on constrained hosts).
        gc.collect()
        _maybe_free_cuda_memory()

    iab_summary = {}
    for split_name, split_path in IAB_HEAD_CONFIG.split_paths.items():
        if args.skip_iab_train_eval and split_name == "train":
            continue
        iab_summary[split_name] = evaluate_iab_dataset(split_path, split_name, output_dir)
        gc.collect()
        _maybe_free_cuda_memory()
    for suite_name, suite_path in IAB_HEAD_CONFIG.stress_suite_paths.items():
        iab_summary[suite_name] = evaluate_iab_dataset(suite_path, suite_name, output_dir)
        gc.collect()
        _maybe_free_cuda_memory()
    summary["heads"]["iab_content"] = iab_summary

    summary["combined"]["demo_benchmark"] = evaluate_combined_benchmark(DEFAULT_BENCHMARK_PATH, output_dir)
    # Regression/quality suites: same call shape, so drive them from a table.
    # Insertion order matters — it fixes the key order in both summaries.
    regression_suites = {
        "known_failure_regression": (evaluate_known_failure_cases, KNOWN_FAILURE_CASES_PATH),
        "iab_behavior_lock_regression": (evaluate_iab_behavior_lock_cases, IAB_BEHAVIOR_LOCK_CASES_PATH),
        "iab_cross_vertical_behavior_lock_regression": (
            evaluate_iab_cross_vertical_behavior_lock_cases,
            IAB_CROSS_VERTICAL_BEHAVIOR_LOCK_CASES_PATH,
        ),
        "iab_quality_target_eval": (evaluate_iab_quality_target_cases, IAB_QUALITY_TARGET_CASES_PATH),
        "iab_cross_vertical_quality_target_eval": (
            evaluate_iab_cross_vertical_quality_target_cases,
            IAB_CROSS_VERTICAL_QUALITY_TARGET_CASES_PATH,
        ),
    }
    for suite_key, (evaluator, cases_path) in regression_suites.items():
        summary["combined"][suite_key] = evaluator(cases_path, output_dir)

    write_json(output_dir / "summary.json", summary)

    # Compact console digest: headline test-split metrics per head plus
    # pass/fail counts for each regression suite.
    headline_keys = (
        "count",
        "accuracy",
        "macro_f1",
        "accepted_accuracy",
        "accepted_coverage",
        "fallback_rate",
    )
    compact_heads = {}
    for head_name, head_summary in summary["heads"].items():
        test_report = head_summary["test"]
        compact_test = {key: test_report[key] for key in headline_keys}
        if "tier_metrics" in test_report:
            # Only the IAB head produces tier metrics.
            compact_test["tier_metrics"] = test_report["tier_metrics"]
        compact_heads[head_name] = {"test": compact_test}

    compact_combined = {"demo_benchmark": summary["combined"]["demo_benchmark"]}
    for suite_key in regression_suites:
        suite_report = summary["combined"][suite_key]
        compact_combined[suite_key] = {
            field: suite_report[field] for field in ("count", "passed", "failed", "by_status")
        }

    compact_summary = {
        "heads": compact_heads,
        "combined": compact_combined,
        "summary_path": str(output_dir / "summary.json"),
    }
    print(json.dumps(compact_summary, indent=2))


if __name__ == "__main__":
    main()