linvest21's picture
download
raw
6.56 kB
from __future__ import annotations

import json
import math
from pathlib import Path
from typing import Any

from n21.settings import SHFT_WORKSPACE_ROOT
from observability.audit_log import utc_now
# Acceptance thresholds for the "A+" grade. The table is echoed into the
# generated report under the "thresholds" key.
A_PLUS_THRESHOLDS: dict[str, Any] = {
    # Lower bound for the aggregate delta metric (compared with >=).
    "min_aggregate_delta_abs": 0.05,
    # Lower bound for the pairwise win rate (compared with >=).
    "min_pairwise_win_rate": 0.55,
    # Upper bound for the pairwise loss rate (compared with <=).
    "max_pairwise_loss_rate": 0.02,
    # Lower bound for the candidate's critical pass rate (compared with >=).
    "min_candidate_critical_pass_rate": 0.95,
    # Human spot-check requirements: the approval flag and the maximum
    # tolerated number of critical failures.
    "require_human_review_approved": True,
    "max_human_review_critical_failures": 0,
}
def _load(path: Path) -> dict[str, Any] | None:
if not path.exists():
return None
return json.loads(path.read_text(encoding="utf-8-sig"))
def _write(path: Path, payload: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
def _num(value: Any, default: float = 0.0) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default
def _metric_report(paired: dict[str, Any] | None) -> dict[str, Any]:
if not paired:
return {
"paired_eval_present": False,
"baseline_aggregate": 0.0,
"candidate_aggregate": 0.0,
"aggregate_delta_abs": 0.0,
"pairwise_win_rate": 0.0,
"pairwise_loss_rate": 1.0,
"candidate_critical_pass_rate": 0.0,
"sample_count": 0,
}
baseline = paired.get("baseline") or {}
candidate = paired.get("candidate") or {}
improvement = paired.get("improvement") or {}
baseline_aggregate = _num(baseline.get("aggregate"))
candidate_aggregate = _num(candidate.get("aggregate"))
delta = improvement.get("aggregate_abs")
if delta is None:
delta = candidate_aggregate - baseline_aggregate
return {
"paired_eval_present": True,
"baseline_aggregate": baseline_aggregate,
"candidate_aggregate": candidate_aggregate,
"aggregate_delta_abs": round(_num(delta), 6),
"pairwise_win_rate": _num(improvement.get("pairwise_win_rate")),
"pairwise_loss_rate": _num(improvement.get("pairwise_loss_rate"), 1.0),
"candidate_critical_pass_rate": _num(candidate.get("critical_pass_rate")),
"sample_count": int(_num(candidate.get("sample_count") or baseline.get("sample_count"))),
}
def _human_metrics(human: dict[str, Any] | None) -> dict[str, Any]:
if not human:
return {"present": False, "approved": False, "critical_failures": None}
approved = bool(human.get("approved") or human.get("human_review_approved"))
critical_failures = human.get("critical_failures", human.get("critical_failure_count"))
if critical_failures is None:
critical_failures = len(human.get("critical_failures_detail") or human.get("critical_failure_ids") or [])
return {"present": True, "approved": approved, "critical_failures": int(_num(critical_failures))}
def _threshold_check(value: float, limit: float, *, at_least: bool) -> dict[str, Any]:
    """Compare ``value`` with ``limit`` and describe the comparison."""
    symbol = ">=" if at_least else "<="
    passed = value >= limit if at_least else value <= limit
    return {"ok": passed, "detail": f"{value:.4f} {symbol} {limit}"}


def build_a_plus_report(
    *,
    run_id: str,
    source_run_id: str | None = None,
    workspace_root: Path = SHFT_WORKSPACE_ROOT,
    output_path: Path | None = None,
) -> dict[str, Any]:
    """Grade a run against the A+ thresholds and persist the report.

    Evaluation artifacts are read from ``<workspace_root>/runs/<run_id>``:
    the paired-eval report, the model quality gate and the human spot-check
    report from ``eval/``, plus the preference-training result from
    ``remote_artifacts/`` (falling back to the run directory itself).

    Args:
        run_id: Run whose artifacts are graded.
        source_run_id: Optional originating run, copied into the report.
        workspace_root: Root directory that contains ``runs/``.
        output_path: Where to write the report; defaults to
            ``eval/a_plus_upgrade_report.json`` inside the run directory.

    Returns:
        The report that was written. ``grade`` is ``"A+"`` only when every
        entry in ``checks`` passed; failed checks are listed in ``blockers``.
        ``quality_gate_ok`` is reported but is not one of the checks.
    """
    run_dir = workspace_root / "runs" / run_id
    eval_dir = run_dir / "eval"
    paired_path = eval_dir / "paired_eval_report.json"
    quality_gate_path = eval_dir / "model_quality_gate.json"
    human_path = eval_dir / "human_spot_check_report.json"
    preference_candidates = (
        run_dir / "remote_artifacts" / "preference_training_result.json",
        run_dir / "preference_training_result.json",
    )

    paired = _load(paired_path)
    quality_gate = _load(quality_gate_path)
    human = _load(human_path)

    # Use the first location that yields a non-empty result and remember it,
    # so the artifact listing names the file that was actually read.
    preference_result = None
    preference_path = preference_candidates[0]
    for candidate_path in preference_candidates:
        loaded = _load(candidate_path)
        if loaded:
            preference_result, preference_path = loaded, candidate_path
            break

    metrics = _metric_report(paired)
    human_metrics = _human_metrics(human)

    # Snapshot the table so the returned report does not alias (and cannot be
    # used to mutate) the module-level thresholds.
    thresholds = dict(A_PLUS_THRESHOLDS)
    approval_required = bool(thresholds["require_human_review_approved"])
    max_critical_failures = thresholds["max_human_review_critical_failures"]
    critical_failures = human_metrics["critical_failures"]

    preference_status = None if preference_result is None else preference_result.get("status")
    checks = {
        "preference_training_completed": {
            "ok": preference_status in {"completed", "dry_run_validated"},
            "detail": preference_status,
        },
        "paired_eval_present": {"ok": metrics["paired_eval_present"], "detail": str(paired_path)},
        "aggregate_delta_abs": _threshold_check(
            metrics["aggregate_delta_abs"], thresholds["min_aggregate_delta_abs"], at_least=True
        ),
        "pairwise_win_rate": _threshold_check(
            metrics["pairwise_win_rate"], thresholds["min_pairwise_win_rate"], at_least=True
        ),
        "pairwise_loss_rate": _threshold_check(
            metrics["pairwise_loss_rate"], thresholds["max_pairwise_loss_rate"], at_least=False
        ),
        "critical_pass_rate": _threshold_check(
            metrics["candidate_critical_pass_rate"],
            thresholds["min_candidate_critical_pass_rate"],
            at_least=True,
        ),
        "human_review_approved": {
            # Approval is only enforced when the thresholds table demands it.
            "ok": human_metrics["approved"] is True or not approval_required,
            "detail": f"approved={human_metrics['approved']}",
        },
        "human_review_critical_failures": {
            # An unknown (None) or negative count never satisfies the limit.
            "ok": critical_failures is not None and 0 <= critical_failures <= max_critical_failures,
            "detail": f"critical_failures={critical_failures}",
        },
    }
    blockers = [f"{name}: {check['detail']}" for name, check in checks.items() if not check["ok"]]

    report = {
        "schema_version": "shft_a_plus_upgrade_report_v1",
        "created_at": utc_now(),
        "run_id": run_id,
        "source_run_id": source_run_id,
        "grade": "A+" if not blockers else "not_a_plus",
        "ok": not blockers,
        "thresholds": thresholds,
        "metrics": metrics,
        "human_review": human_metrics,
        "quality_gate_ok": bool(quality_gate and quality_gate.get("ok") is True),
        "checks": checks,
        "blockers": blockers,
        "artifacts": {
            "paired_eval_report": str(paired_path),
            "model_quality_gate": str(quality_gate_path),
            "human_spot_check_report": str(human_path),
            "preference_training_result": str(preference_path),
        },
    }
    _write(output_path or eval_dir / "a_plus_upgrade_report.json", report)
    return report

Xet Storage Details

Size:
6.56 kB
·
Xet hash:
bf0d8bb7e13f9db4a7b0d1f4cd3c3d4eca08dd88ff818b38cc1aabad5c6b2d91

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.