linvest21's picture
download
raw
22.3 kB
from __future__ import annotations
from pathlib import Path
from typing import Any
from n21.config import load_structured
from n21.settings import CONFIG_ROOT
# Default thresholds file, read by load_model_quality_thresholds() when no path is supplied.
DEFAULT_CONFIG = CONFIG_ROOT / "thresholds" / "model_quality.yaml"
def load_model_quality_thresholds(path: Path | None = None) -> dict[str, Any]:
    """Load the model-quality threshold configuration.

    Args:
        path: Location of the thresholds file. When it is falsy (for example
            ``None``) the module-level ``DEFAULT_CONFIG`` is used instead.

    Returns:
        Whatever ``load_structured`` produces for the chosen file.
    """
    source = path if path else DEFAULT_CONFIG
    return load_structured(source)
def _number(value: Any, default: float = 0.0) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default
def _bool(value: Any) -> bool:
return bool(value) is True
def _add(checks: dict[str, Any], errors: list[str], name: str, ok: bool, detail: str) -> None:
checks[name] = {"ok": ok, "detail": detail}
if not ok:
errors.append(f"{name}: {detail}")
def _section(mapping: dict[str, Any], key: str) -> dict[str, Any]:
    """Return the nested mapping stored under *key*, or ``{}`` when it is missing or falsy.

    A key that is present but null (an empty YAML section, a JSON ``null``)
    would otherwise come back as ``None`` and break the ``.get`` calls that follow.
    """
    return mapping.get(key) or {}


def _check_float_bound(
    checks: dict[str, Any],
    errors: list[str],
    name: str,
    value: float,
    section: dict[str, Any],
    key: str,
    default: float,
    *,
    upper: bool = False,
) -> None:
    """Record a check comparing a measured float against a configured bound.

    The bound is ``section[key]`` coerced with ``_number`` (falling back to
    *default*); the detail string shows the raw configured value.  With
    ``upper=True`` the bound is a maximum (``<=``), otherwise a minimum (``>=``).
    """
    bound = _number(section.get(key), default)
    ok = value <= bound if upper else value >= bound
    symbol = "<=" if upper else ">="
    _add(checks, errors, name, ok, f"{value:.4f} {symbol} {section.get(key, default)}")


def _check_int_bound(
    checks: dict[str, Any],
    errors: list[str],
    name: str,
    value: int,
    section: dict[str, Any],
    key: str,
    default: int,
    *,
    upper: bool = False,
) -> None:
    """Record a check comparing a measured count against a configured integer bound.

    The bound is ``int(section.get(key, default))``; a non-integer configured
    value therefore raises rather than being silently replaced.
    """
    bound = int(section.get(key, default))
    ok = value <= bound if upper else value >= bound
    symbol = "<=" if upper else ">="
    _add(checks, errors, name, ok, f"{value} {symbol} {section.get(key, default)}")


def _gate_paired_eval(
    checks: dict[str, Any],
    errors: list[str],
    cfg: dict[str, Any],
    paired_eval: dict[str, Any] | None,
    baseline_proof_report: dict[str, Any] | None,
) -> None:
    """Check the paired baseline-vs-candidate evaluation report."""
    paired_cfg = _section(cfg, "paired_eval")
    if paired_eval is None:
        if paired_cfg.get("required", True):
            _add(checks, errors, "paired_eval_present", False, "missing eval/paired_eval_report.json")
        return

    baseline = _section(paired_eval, "baseline")
    candidate = _section(paired_eval, "candidate")
    improvement = _section(paired_eval, "improvement")
    sample_count = int(_number(paired_eval.get("sample_count") or candidate.get("sample_count")))
    candidate_aggregate = _number(candidate.get("aggregate"))
    baseline_aggregate = _number(baseline.get("aggregate"))
    candidate_critical = _number(candidate.get("critical_pass_rate"))
    aggregate_delta = _number(improvement.get("aggregate_abs"))
    critical_delta = _number(improvement.get("critical_pass_rate_abs"))
    loss_rate = _number(improvement.get("pairwise_loss_rate"))
    win_rate = _number(improvement.get("pairwise_win_rate"))
    aggregate_pct = improvement.get("aggregate_pct")

    _check_int_bound(checks, errors, "paired_eval_sample_count", sample_count, paired_cfg, "min_samples", 120)
    _check_float_bound(
        checks, errors, "candidate_aggregate_absolute", candidate_aggregate, paired_cfg, "min_candidate_aggregate", 0.60
    )
    _add(
        checks,
        errors,
        "candidate_beats_baseline",
        candidate_aggregate > baseline_aggregate,
        f"{candidate_aggregate:.4f} > {baseline_aggregate:.4f}",
    )
    _check_float_bound(
        checks, errors, "aggregate_delta_absolute", aggregate_delta, paired_cfg, "min_aggregate_delta_abs", 0.05
    )
    _check_float_bound(
        checks, errors, "critical_pass_absolute", candidate_critical, paired_cfg, "min_candidate_critical_pass_rate", 0.70
    )
    _check_float_bound(
        checks, errors, "critical_pass_not_regressed", critical_delta, paired_cfg, "min_critical_pass_delta_abs", 0.0
    )
    _check_float_bound(
        checks, errors, "pairwise_loss_rate", loss_rate, paired_cfg, "max_pairwise_loss_rate", 0.02, upper=True
    )
    _check_float_bound(checks, errors, "pairwise_win_rate", win_rate, paired_cfg, "min_pairwise_win_rate", 0.55)

    # An explicit proof report takes precedence over one embedded in the paired eval.
    proof = baseline_proof_report or paired_eval.get("baseline_proof") or {}
    absolute_only_proof = proof.get("proof_mode") == "absolute_only_cold_start"
    # With a zero baseline and no reported percentage, a relative improvement
    # cannot be stated; only an explicit absolute-only proof lets this pass.
    if baseline_aggregate == 0 and aggregate_pct is None and not paired_cfg.get("allow_zero_baseline_percent", False):
        if absolute_only_proof:
            detail = "baseline aggregate is zero; explicit absolute_only_cold_start proof is present"
        else:
            detail = "baseline aggregate is zero; relative improvement is undefined"
        _add(checks, errors, "nonzero_baseline_for_relative_proof", absolute_only_proof, detail)


def _gate_training_budget(
    checks: dict[str, Any],
    errors: list[str],
    warnings: list[str],
    cfg: dict[str, Any],
    training_plan: dict[str, Any] | None,
    training_result: dict[str, Any] | None,
) -> None:
    """Check record counts, step budget and trainer readiness from the training plan."""
    budget_cfg = _section(cfg, "training_budget")
    if training_plan is None:
        if budget_cfg.get("required", True):
            _add(checks, errors, "training_plan_present", False, "missing remote_artifacts/training_plan.json")
        return

    hp = _section(training_plan, "hyperparameters")
    readiness = _section(training_plan, "readiness")
    is_preference_training = str(training_plan.get("training_stage") or "") == "preference_dpo"
    loss_repair_cfg = _section(cfg, "loss_targeted_repair")
    train_records = int(_number(training_plan.get("train_records") or training_plan.get("train_pair_count")))
    valid_records = int(_number(training_plan.get("valid_records") or training_plan.get("valid_pair_count")))
    max_steps = int(_number(hp.get("max_steps")))

    # Preference (DPO) runs are measured in admitted pairs and use their own minimums.
    if is_preference_training:
        counted_records = int(_number(training_plan.get("admitted_pair_count")))
        min_train_records = int(loss_repair_cfg.get("min_preference_pairs", 50))
        min_valid_records = 1
        min_steps = 100
    else:
        counted_records = train_records
        min_train_records = int(budget_cfg.get("min_train_records", 100))
        min_valid_records = int(budget_cfg.get("min_valid_records", 10))
        min_steps = int(budget_cfg.get("min_max_steps", 300))

    _add(
        checks,
        errors,
        "training_records_minimum",
        counted_records >= min_train_records,
        f"{counted_records} >= {min_train_records}",
    )
    _add(
        checks,
        errors,
        "validation_records_minimum",
        valid_records >= min_valid_records,
        f"{valid_records} >= {min_valid_records}",
    )
    _add(checks, errors, "training_steps_minimum", max_steps >= min_steps, f"{max_steps} >= {min_steps}")

    if budget_cfg.get("require_production_candidate", True):
        production_candidate = readiness.get("production_candidate") is True
        # A completed preference run also counts as a production candidate.
        if is_preference_training and training_result is not None:
            production_candidate = production_candidate or training_result.get("status") == "completed"
        _add(
            checks,
            errors,
            "trainer_readiness_candidate",
            production_candidate,
            f"production_candidate={readiness.get('production_candidate')} training_stage={training_plan.get('training_stage')}",
        )
    # NOTE(review): readiness warnings are surfaced whether or not
    # require_production_candidate is set -- confirm this is the intended scope.
    for warning in readiness.get("warnings") or []:
        warnings.append(f"trainer readiness warning: {warning}")


def _gate_trainer_overfit(
    checks: dict[str, Any],
    errors: list[str],
    cfg: dict[str, Any],
    selected_checkpoint: dict[str, Any] | None,
    trainer_metrics_summary: dict[str, Any] | None,
    training_result: dict[str, Any] | None,
) -> None:
    """Check checkpoint selection and overfit metrics; a no-op unless ``trainer_overfit.required``."""
    overfit_cfg = _section(cfg, "trainer_overfit")
    # NOTE(review): every check below, including the training_result one, is
    # only evaluated when trainer_overfit.required is set -- confirm.
    if not overfit_cfg.get("required", False):
        return

    if selected_checkpoint is None:
        _add(checks, errors, "selected_checkpoint_present", False, "missing remote_artifacts/selected_checkpoint.json")
    else:
        selection_metric = selected_checkpoint.get("selection_metric")
        # A None default distinguishes "missing/unparseable" from a real 0.0.
        selected_value = _number(selected_checkpoint.get("selection_metric_value"), None)
        selected_path = str(selected_checkpoint.get("selected_checkpoint") or selected_checkpoint.get("checkpoint_path") or "")
        candidate_adapter = str(selected_checkpoint.get("candidate_adapter_dir") or "")
        _add(
            checks,
            errors,
            "selected_checkpoint_metric",
            selection_metric == overfit_cfg.get("selection_metric", "eval_loss"),
            f"{selection_metric} == {overfit_cfg.get('selection_metric', 'eval_loss')}",
        )
        _add(
            checks,
            errors,
            "selected_checkpoint_metric_value_present",
            selected_value is not None,
            f"selection_metric_value={selected_checkpoint.get('selection_metric_value')}",
        )
        _add(
            checks,
            errors,
            "selected_checkpoint_path_present",
            bool(selected_path or candidate_adapter),
            f"selected_checkpoint={selected_path} candidate_adapter_dir={candidate_adapter}",
        )

    if trainer_metrics_summary is None:
        _add(
            checks,
            errors,
            "trainer_metrics_summary_present",
            False,
            "missing remote_artifacts/trainer_metrics_summary.json",
        )
    else:
        eval_count = int(_number(trainer_metrics_summary.get("eval_row_count"), 0) or 0)
        train_eval_gap = _number(trainer_metrics_summary.get("train_eval_loss_gap"), 0.0)
        late_regression = _number(trainer_metrics_summary.get("late_eval_loss_regression"), 0.0)
        overfit_detected = trainer_metrics_summary.get("overfit_detected") is True
        _check_int_bound(
            checks, errors, "trainer_eval_checkpoint_count", eval_count, overfit_cfg, "min_checkpoint_count", 3
        )
        _check_float_bound(
            checks,
            errors,
            "trainer_train_eval_loss_gap",
            train_eval_gap,
            overfit_cfg,
            "max_train_eval_loss_gap",
            0.75,
            upper=True,
        )
        _check_float_bound(
            checks,
            errors,
            "trainer_late_eval_loss_regression",
            late_regression,
            overfit_cfg,
            "max_late_eval_loss_regression",
            0.10,
            upper=True,
        )
        _add(
            checks,
            errors,
            "trainer_overfit_not_detected",
            not overfit_detected,
            f"overfit_detected={overfit_detected} flags={trainer_metrics_summary.get('overfit_flags')}",
        )

    if training_result is not None:
        # Either the training result embeds the selection or the standalone artifact exists.
        selected_checkpoint_recorded = (
            isinstance(training_result.get("selected_checkpoint"), dict) or selected_checkpoint is not None
        )
        _add(
            checks,
            errors,
            "training_result_selected_checkpoint_recorded",
            selected_checkpoint_recorded,
            "training_result.selected_checkpoint or remote selected_checkpoint.json is present",
        )


def _gate_corpus_coverage(
    checks: dict[str, Any],
    errors: list[str],
    cfg: dict[str, Any],
    dataset_manifest: dict[str, Any] | None,
    training_plan: dict[str, Any] | None,
) -> None:
    """Check corpus retention, split ratios and agreement with the training plan."""
    coverage_cfg = _section(cfg, "corpus_coverage")
    if dataset_manifest is None:
        if coverage_cfg.get("required", True):
            _add(checks, errors, "dataset_manifest_present", False, "missing dataset_snapshot/dataset_manifest.json")
        return

    quality = _section(dataset_manifest, "quality")
    split_counts = _section(dataset_manifest, "split_counts")
    source_records = int(_number(quality.get("record_count")))
    train_count = int(_number(split_counts.get("train")))
    valid_count = int(_number(split_counts.get("valid")))
    test_count = int(_number(split_counts.get("test")))
    split_total = train_count + valid_count + test_count
    # Guard every ratio against a zero denominator.
    retention = (split_total / source_records) if source_records else 0.0
    train_ratio = (train_count / split_total) if split_total else 0.0
    valid_ratio = (valid_count / split_total) if split_total else 0.0
    test_ratio = (test_count / split_total) if split_total else 0.0

    _check_float_bound(
        checks, errors, "corpus_record_retention", retention, coverage_cfg, "min_total_record_retention", 0.95
    )
    if coverage_cfg.get("require_train_valid_test", True):
        _add(
            checks,
            errors,
            "train_valid_test_present",
            train_count > 0 and valid_count > 0 and test_count > 0,
            f"{split_counts}",
        )
    _check_float_bound(checks, errors, "train_split_ratio", train_ratio, coverage_cfg, "min_train_split_ratio", 0.70)
    _check_float_bound(checks, errors, "valid_split_ratio", valid_ratio, coverage_cfg, "min_valid_split_ratio", 0.05)
    _check_float_bound(checks, errors, "test_split_ratio", test_ratio, coverage_cfg, "min_test_split_ratio", 0.05)

    if training_plan is None:
        return

    is_preference_training = str(training_plan.get("training_stage") or "") == "preference_dpo"
    train_records = int(_number(training_plan.get("train_records")))
    valid_records = int(_number(training_plan.get("valid_records")))
    training_provenance = training_plan.get("dataset_provenance", {})
    if is_preference_training:
        # Preference runs are not compared against the manifest split counts;
        # the check is recorded as passing with the source run id as detail.
        _add(
            checks,
            errors,
            "preference_inherits_source_dataset_manifest",
            True,
            f"source_run_id={training_plan.get('source_run_id')}",
        )
    else:
        _add(
            checks,
            errors,
            "remote_train_records_match_dataset_manifest",
            train_records == train_count,
            f"{train_records} == {train_count}",
        )
        _add(
            checks,
            errors,
            "remote_valid_records_match_dataset_manifest",
            valid_records == valid_count,
            f"{valid_records} == {valid_count}",
        )
    # NOTE(review): the provenance gate is applied to both supervised and
    # preference plans whenever the plan carries dataset_provenance -- confirm.
    if training_provenance:
        _add(
            checks,
            errors,
            "trainer_dataset_provenance_gate",
            training_provenance.get("ok") is True,
            f"ok={training_provenance.get('ok')} errors={training_provenance.get('errors')}",
        )


def _gate_model_judge(
    checks: dict[str, Any],
    errors: list[str],
    cfg: dict[str, Any],
    model_judge_report: dict[str, Any] | None,
) -> None:
    """Check the model-as-judge report against the ``strong_scoring`` thresholds."""
    judge_cfg = _section(cfg, "strong_scoring")
    if model_judge_report is None:
        if judge_cfg.get("require_model_as_judge", True):
            _add(checks, errors, "model_as_judge_present", False, "missing eval/model_judge_report.json")
        return

    sample_count = int(_number(model_judge_report.get("sample_count")))
    mean_score = _number(model_judge_report.get("mean_score"))
    critical_rate = _number(model_judge_report.get("critical_pass_rate"))
    rubric = model_judge_report.get("rubric_version")
    _add(
        checks,
        errors,
        "model_judge_rubric_version",
        rubric == judge_cfg.get("required_rubric_version"),
        f"{rubric} == {judge_cfg.get('required_rubric_version')}",
    )
    _check_int_bound(checks, errors, "model_judge_sample_count", sample_count, judge_cfg, "min_judged_samples", 30)
    _check_float_bound(checks, errors, "model_judge_mean_score", mean_score, judge_cfg, "min_mean_score", 0.60)
    _check_float_bound(
        checks, errors, "model_judge_critical_pass_rate", critical_rate, judge_cfg, "min_critical_pass_rate", 0.70
    )


def _gate_human_review(
    checks: dict[str, Any],
    errors: list[str],
    cfg: dict[str, Any],
    human_review_report: dict[str, Any] | None,
) -> None:
    """Check the human spot-check report."""
    human_cfg = _section(cfg, "human_spot_check")
    if human_review_report is None:
        if human_cfg.get("required", True):
            _add(checks, errors, "human_spot_check_present", False, "missing eval/human_spot_check_report.json")
        return

    reviewed = int(_number(human_review_report.get("sample_count") or human_review_report.get("reviewed_samples")))
    critical_failures = int(_number(human_review_report.get("critical_failures")))
    approved = human_review_report.get("approved") is True or human_review_report.get("status") == "approved"
    _check_int_bound(checks, errors, "human_review_sample_count", reviewed, human_cfg, "min_reviewed_samples", 10)
    _check_int_bound(
        checks,
        errors,
        "human_review_critical_failures",
        critical_failures,
        human_cfg,
        "max_critical_failures",
        0,
        upper=True,
    )
    if human_cfg.get("require_approval", True):
        _add(checks, errors, "human_review_approved", approved, f"approved={approved}")


def _gate_loss_repair(
    checks: dict[str, Any],
    errors: list[str],
    cfg: dict[str, Any],
    paired_diagnostics_manifest: dict[str, Any] | None,
    preference_manifest: dict[str, Any] | None,
) -> None:
    """Check loss-targeted repair evidence: paired diagnostics and the preference manifest."""
    repair_cfg = _section(cfg, "loss_targeted_repair")
    repair_required = repair_cfg.get("required", False)

    if paired_diagnostics_manifest is None:
        if repair_required:
            _add(
                checks,
                errors,
                "paired_diagnostics_present",
                False,
                "missing diagnostics/paired_eval_diagnostics_manifest.json",
            )
    else:
        summary = _section(paired_diagnostics_manifest, "summary")
        coverage = _number(summary.get("latest_loss_coverage_ratio"), 0.0)
        repair_targets = int(_number(summary.get("accepted_repair_target_count"), 0))
        _check_float_bound(
            checks, errors, "latest_loss_repair_coverage", coverage, repair_cfg, "min_latest_loss_coverage", 0.90
        )
        _check_int_bound(
            checks,
            errors,
            "accepted_repair_targets_present",
            repair_targets,
            repair_cfg,
            "min_accepted_repair_targets",
            1,
        )

    if preference_manifest is None:
        if repair_required:
            _add(checks, errors, "preference_manifest_present", False, "missing preference_memory/preference_manifest.json")
    else:
        summary = _section(preference_manifest, "summary")
        pair_count = int(_number(summary.get("preference_pair_count"), 0))
        admitted_count = int(_number(summary.get("admitted_pair_count"), 0))
        # The configured minimum wins; otherwise use the manifest's own min_records, then 50.
        min_pairs = int(repair_cfg.get("min_preference_pairs", summary.get("min_records") or 50))
        _add(checks, errors, "preference_pair_count", pair_count >= min_pairs, f"{pair_count} >= {min_pairs}")
        _add(
            checks,
            errors,
            "preference_pairs_admitted",
            admitted_count == pair_count and pair_count > 0,
            f"admitted={admitted_count} pairs={pair_count}",
        )
        # Only an explicit False fails; a missing flag is accepted.
        _add(
            checks,
            errors,
            "preference_min_records_met",
            summary.get("min_records_met") is not False,
            f"min_records_met={summary.get('min_records_met')}",
        )


def evaluate_model_quality_gate(
    *,
    paired_eval: dict[str, Any] | None = None,
    training_plan: dict[str, Any] | None = None,
    training_result: dict[str, Any] | None = None,
    trainer_metrics_summary: dict[str, Any] | None = None,
    selected_checkpoint: dict[str, Any] | None = None,
    dataset_manifest: dict[str, Any] | None = None,
    model_judge_report: dict[str, Any] | None = None,
    human_review_report: dict[str, Any] | None = None,
    baseline_proof_report: dict[str, Any] | None = None,
    paired_diagnostics_manifest: dict[str, Any] | None = None,
    preference_manifest: dict[str, Any] | None = None,
    thresholds: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Evaluate production model-quality gates from measured evidence.

    This deliberately treats fixture/orchestration cycle evidence as irrelevant.
    A model can pass only with paired model-vs-model measurement, adequate
    training budget, corpus-retention evidence, model-as-judge rubric evidence,
    and human spot-check evidence.

    Every argument is an already-parsed report (or ``None`` when the artifact
    is absent).  A missing artifact fails its gate only when the matching
    threshold section marks it as required.

    Args:
        paired_eval: Paired baseline/candidate evaluation report.
        training_plan: Training plan, including hyperparameters and readiness.
        training_result: Training result; consulted for completion status and
            for an embedded selected checkpoint.
        trainer_metrics_summary: Trainer metric summary used by the overfit gate.
        selected_checkpoint: Selected-checkpoint artifact used by the overfit gate.
        dataset_manifest: Dataset manifest with quality and split counts.
        model_judge_report: Model-as-judge report.
        human_review_report: Human spot-check report.
        baseline_proof_report: Optional baseline proof; takes precedence over
            ``paired_eval["baseline_proof"]``.
        paired_diagnostics_manifest: Paired-eval diagnostics manifest.
        preference_manifest: Preference-pair manifest.
        thresholds: Threshold configuration.  When falsy (``None`` or empty)
            the configuration is loaded via ``load_model_quality_thresholds()``.

    Returns:
        A dict with ``ok`` / ``eligible_for_promotion`` (true when no check
        failed), ``quality_signal``, the ``errors`` and ``warnings`` lists,
        the per-check ``checks`` mapping, and the ``thresholds`` that were used.
    """
    cfg = thresholds or load_model_quality_thresholds()
    errors: list[str] = []
    warnings: list[str] = []
    checks: dict[str, Any] = {}

    # Order matters: it fixes the ordering of ``checks`` and ``errors``.
    _gate_paired_eval(checks, errors, cfg, paired_eval, baseline_proof_report)
    _gate_training_budget(checks, errors, warnings, cfg, training_plan, training_result)
    _gate_trainer_overfit(checks, errors, cfg, selected_checkpoint, trainer_metrics_summary, training_result)
    _gate_corpus_coverage(checks, errors, cfg, dataset_manifest, training_plan)
    _gate_model_judge(checks, errors, cfg, model_judge_report)
    _gate_human_review(checks, errors, cfg, human_review_report)
    _gate_loss_repair(checks, errors, cfg, paired_diagnostics_manifest, preference_manifest)

    return {
        "ok": not errors,
        "eligible_for_promotion": not errors,
        "quality_signal": "measured_model_quality" if not errors else "blocked_model_quality",
        "errors": errors,
        "warnings": warnings,
        "checks": checks,
        "thresholds": cfg,
    }

Xet Storage Details

Size:
22.3 kB
·
Xet hash:
2460b2925d77b2e9ad6f149ecb34637ec65f67ef04634b73bd8916b051e0294a

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.