linvest21's picture
download
raw
3.38 kB
from __future__ import annotations
import hashlib
import json
from pathlib import Path
from typing import Any
from n21.config import load_structured, write_json
from n21.settings import REPO_ROOT, SHFT_WORKSPACE_ROOT
from observability.audit_log import utc_now
# Keys every eval row is expected to carry; validate_frozen_suite reports any
# that are absent from a row as "missing fields".
REQUIRED_EVAL_FIELDS = {"id", "task", "prompt", "expected_points", "critical_checks"}
def _load_jsonl(path: Path) -> list[dict[str, Any]]:
    """Parse a JSON Lines file into a list of JSON objects.

    Blank (whitespace-only) lines are skipped. Records are delimited by
    newlines only: the file is iterated line by line instead of being split
    with ``str.splitlines()``, because ``splitlines`` also breaks on
    characters such as U+2028, U+2029 and U+0085. JSON permits those
    unescaped inside string values, so splitting on them would cut a valid
    record in two and report it as malformed.

    Args:
        path: Location of the UTF-8 encoded JSONL file.

    Returns:
        One dict per non-blank line, in file order.

    Raises:
        ValueError: If a line is not valid JSON, or decodes to something
            other than a JSON object. The message includes ``path:line_no``.
    """
    rows: list[dict[str, Any]] = []
    with path.open(encoding="utf-8") as handle:
        # Text-mode iteration yields one item per newline-terminated line,
        # so line_no matches what an editor shows for the file.
        for line_no, line in enumerate(handle, start=1):
            if not line.strip():
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(f"invalid JSONL at {path}:{line_no}: {exc}") from exc
            if not isinstance(item, dict):
                raise ValueError(f"eval item must be an object at {path}:{line_no}")
            rows.append(item)
    return rows
def _sha256(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the bytes stored at *path*."""
    return hashlib.sha256(path.read_bytes()).hexdigest()
def validate_frozen_suite(manifest_path: Path) -> dict[str, Any]:
    """Validate a frozen eval suite against its manifest and write a report.

    The manifest is loaded with ``load_structured``; its ``path`` entry is
    resolved relative to ``REPO_ROOT`` to locate the JSONL suite file. Each
    row is checked for the required fields, a non-empty unique ``id``, a
    non-empty ``prompt``, and non-empty list values for ``expected_points``
    and ``critical_checks``. The row count is then compared with the
    manifest's ``sample_count``.

    A JSON ``null`` for ``id`` or ``prompt`` is treated as empty (it used to
    be stringified to ``"None"`` and slip through). A ``sample_count`` that
    cannot be converted to an integer is recorded as a validation error
    instead of aborting the run.

    Args:
        manifest_path: Path to the suite manifest file.

    Returns:
        The validation report: ``ok`` (True when no errors were found),
        ``errors``, manifest identifiers (``eval_suite_id``, ``version``,
        ``status``), ``sample_count`` (actual number of rows),
        ``task_counts``, ``sha256`` of the suite file (``None`` when the
        file does not exist), the manifest and suite paths, and
        ``created_at``.

    Raises:
        KeyError: If the manifest has no ``path`` entry (assuming
            ``load_structured`` returns a plain mapping).
        ValueError: Propagated from ``_load_jsonl`` when the suite file
            contains a malformed line.

    Side effects:
        Writes the report via ``write_json`` to
        ``SHFT_WORKSPACE_ROOT/registry/eval_suites/<eval_suite_id>_validation.json``.
    """
    manifest = load_structured(manifest_path)
    suite_path = REPO_ROOT / str(manifest["path"])
    errors: list[str] = []
    rows: list[dict[str, Any]] = []

    # Check existence once so loading and hashing agree on whether the file
    # was present.
    suite_exists = suite_path.exists()
    if suite_exists:
        rows = _load_jsonl(suite_path)
    else:
        errors.append(f"eval suite file does not exist: {suite_path}")

    ids: set[str] = set()
    task_counts: dict[str, int] = {}
    for idx, row in enumerate(rows, start=1):
        missing = sorted(REQUIRED_EVAL_FIELDS - set(row))
        if missing:
            errors.append(f"row {idx} missing fields: {', '.join(missing)}")

        # A null id must count as empty; str(None) would yield "None".
        raw_id = row.get("id")
        row_id = "" if raw_id is None else str(raw_id)
        if not row_id:
            errors.append(f"row {idx} has empty id")
        elif row_id in ids:
            errors.append(f"duplicate eval id: {row_id}")
        ids.add(row_id)

        # Same reasoning for a null prompt.
        raw_prompt = row.get("prompt")
        if raw_prompt is None or not str(raw_prompt).strip():
            errors.append(f"row {idx} has empty prompt")

        expected_points = row.get("expected_points")
        if not isinstance(expected_points, list) or not expected_points:
            errors.append(f"row {idx} expected_points must be a non-empty list")

        critical_checks = row.get("critical_checks")
        if not isinstance(critical_checks, list) or not critical_checks:
            errors.append(f"row {idx} critical_checks must be a non-empty list")

        task = str(row.get("task", "unknown"))
        task_counts[task] = task_counts.get(task, 0) + 1

    # A missing sample_count falls back to -1, which never matches and is
    # therefore reported as a mismatch. A present but non-integer value is
    # reported too, rather than raising out of the validator.
    raw_count = manifest.get("sample_count", -1)
    try:
        expected_count = int(raw_count)
    except (TypeError, ValueError):
        errors.append(f"manifest sample_count is not an integer: {raw_count!r}")
    else:
        if expected_count != len(rows):
            errors.append(
                f"manifest sample_count {expected_count} does not match actual {len(rows)}"
            )

    report = {
        "ok": not errors,
        "errors": errors,
        "eval_suite_id": manifest.get("eval_suite_id"),
        "version": manifest.get("version"),
        "status": manifest.get("status"),
        "sample_count": len(rows),
        "task_counts": task_counts,
        "sha256": _sha256(suite_path) if suite_exists else None,
        "manifest_path": str(manifest_path),
        "suite_path": str(suite_path),
        "created_at": utc_now(),
    }
    out = (
        SHFT_WORKSPACE_ROOT
        / "registry"
        / "eval_suites"
        / f"{manifest.get('eval_suite_id', 'unknown')}_validation.json"
    )
    write_json(out, report)
    return report

Xet Storage Details

Size:
3.38 kB
·
Xet hash:
b8dda4b1a64128c419e9164097ba3d08d9664b5fb2be5b6dce90b4615b760847

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.