| |
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
| ALLOWED_STATUS = { |
| "scheduled", |
| "waiting_checkpoint", |
| "completed", |
| "needs_replan", |
| "in_progress", |
| "failed", |
| } |
|
|
|
|
| def _as_mapping(value: Any) -> dict[str, Any]: |
| if isinstance(value, dict): |
| return {str(key): item for key, item in value.items()} |
| return {} |
|
|
|
|
| def _as_non_negative_int(value: Any) -> int | None: |
| try: |
| parsed = int(value) |
| except (TypeError, ValueError): |
| return None |
| if parsed < 0: |
| return None |
| return parsed |
|
|
|
|
| def _validate_cycle_payload(cycle: dict[str, Any]) -> list[str]: |
| errors: list[str] = [] |
| for key in ( |
| "due_count", |
| "executed_count", |
| "blocked_count", |
| "progressed_step_count", |
| "retry_scheduled_count", |
| "verification_failure_count", |
| "replan_count", |
| ): |
| if key not in cycle: |
| continue |
| if _as_non_negative_int(cycle.get(key)) is None: |
| errors.append(f"invalid_cycle_{key}") |
| return errors |
|
|
|
|
| def _validate_status_payload(status: dict[str, Any]) -> list[str]: |
| errors: list[str] = [] |
| for key in ( |
| "autonomy_task_count", |
| "needs_replan_count", |
| "retry_pending_count", |
| "backlog_step_count", |
| ): |
| if key not in status: |
| continue |
| if _as_non_negative_int(status.get(key)) is None: |
| errors.append(f"invalid_status_{key}") |
|
|
| failure_taxonomy = status.get("failure_taxonomy") |
| if failure_taxonomy is not None: |
| if not isinstance(failure_taxonomy, dict): |
| errors.append("invalid_status_failure_taxonomy") |
| else: |
| for key, value in failure_taxonomy.items(): |
| reason_code = str(key).strip().lower() |
| if not reason_code: |
| errors.append("invalid_status_failure_taxonomy_reason") |
| continue |
| if _as_non_negative_int(value) is None: |
| errors.append("invalid_status_failure_taxonomy_value") |
| return errors |
|
|
|
|
| def _compare_expected( |
| *, |
| label: str, |
| actual: dict[str, Any], |
| expected: dict[str, Any], |
| ) -> list[str]: |
| mismatches: list[str] = [] |
| for key, expected_value in expected.items(): |
| actual_value = actual.get(key) |
| if actual_value != expected_value: |
| mismatches.append( |
| f"{label}.{key}: expected={expected_value!r} actual={actual_value!r}" |
| ) |
| return mismatches |
|
|
|
|
| def _evaluate_case(case: dict[str, Any]) -> dict[str, Any]: |
| case_id = str(case.get("id", "case")) |
| actual_cycle = _as_mapping(case.get("actual_cycle")) |
| actual_status = _as_mapping(case.get("actual_status")) |
| expected_cycle = _as_mapping(case.get("expected_cycle")) |
| expected_status = _as_mapping(case.get("expected_status")) |
|
|
| validation_errors = _validate_cycle_payload(actual_cycle) |
| validation_errors.extend(_validate_status_payload(actual_status)) |
|
|
| mismatches = _compare_expected( |
| label="cycle", |
| actual=actual_cycle, |
| expected=expected_cycle, |
| ) |
| mismatches.extend( |
| _compare_expected( |
| label="status", |
| actual=actual_status, |
| expected=expected_status, |
| ) |
| ) |
|
|
| for status_key in ("min_replan_count", "max_replan_count"): |
| if status_key not in case: |
| continue |
| threshold = _as_non_negative_int(case.get(status_key)) |
| actual_value = _as_non_negative_int(actual_cycle.get("replan_count")) |
| if threshold is None: |
| mismatches.append(f"{status_key}: invalid threshold") |
| continue |
| if actual_value is None: |
| mismatches.append("cycle.replan_count missing/invalid") |
| continue |
| if status_key.startswith("min") and actual_value < threshold: |
| mismatches.append( |
| f"cycle.replan_count below min ({actual_value} < {threshold})" |
| ) |
| if status_key.startswith("max") and actual_value > threshold: |
| mismatches.append( |
| f"cycle.replan_count above max ({actual_value} > {threshold})" |
| ) |
|
|
| for status_key, actual_key in ( |
| ("min_retry_pending_count", "retry_pending_count"), |
| ("min_needs_replan_count", "needs_replan_count"), |
| ("min_backlog_step_count", "backlog_step_count"), |
| ): |
| if status_key not in case: |
| continue |
| threshold = _as_non_negative_int(case.get(status_key)) |
| actual_value = _as_non_negative_int(actual_status.get(actual_key)) |
| if threshold is None: |
| mismatches.append(f"{status_key}: invalid threshold") |
| continue |
| if actual_value is None: |
| mismatches.append(f"status.{actual_key} missing/invalid") |
| continue |
| if actual_value < threshold: |
| mismatches.append( |
| f"status.{actual_key} below min ({actual_value} < {threshold})" |
| ) |
|
|
| if "required_statuses" in case: |
| required_statuses = case.get("required_statuses") |
| status_counts = _as_mapping(actual_status.get("status_counts")) |
| if not isinstance(required_statuses, list): |
| mismatches.append("required_statuses must be a list") |
| else: |
| for item in required_statuses: |
| status_name = str(item).strip().lower() |
| if status_name not in ALLOWED_STATUS: |
| mismatches.append(f"invalid required status: {status_name!r}") |
| continue |
| count = _as_non_negative_int(status_counts.get(status_name)) |
| if count is None or count <= 0: |
| mismatches.append(f"required status missing: {status_name}") |
|
|
| if "min_failure_taxonomy_total" in case: |
| threshold = _as_non_negative_int(case.get("min_failure_taxonomy_total")) |
| failure_taxonomy = _as_mapping(actual_status.get("failure_taxonomy")) |
| total = 0 |
| for value in failure_taxonomy.values(): |
| parsed = _as_non_negative_int(value) |
| if parsed is not None: |
| total += parsed |
| if threshold is None: |
| mismatches.append("min_failure_taxonomy_total: invalid threshold") |
| elif total < threshold: |
| mismatches.append( |
| f"status.failure_taxonomy total below min ({total} < {threshold})" |
| ) |
|
|
| passed = not validation_errors and not mismatches |
| return { |
| "id": case_id, |
| "passed": passed, |
| "validation_errors": validation_errors, |
| "mismatches": mismatches, |
| } |
|
|
|
|
| def _evaluate_results( |
| *, |
| dataset_path: Path, |
| results: list[dict[str, Any]], |
| strict: bool, |
| min_pass_rate: float | None, |
| max_failed: int | None, |
| min_cases: int | None, |
| duplicate_ids: list[str], |
| ) -> dict[str, Any]: |
| passed = sum(1 for row in results if bool(row.get("passed"))) |
| failed = len(results) - passed |
| pass_rate = (passed / len(results)) if results else 0.0 |
| accepted = (failed == 0) if strict else (passed >= failed) |
|
|
| failure_reasons: list[str] = [] |
| if strict and failed > 0: |
| failure_reasons.append("strict_failed_cases") |
| if not strict and passed < failed: |
| failure_reasons.append("non_strict_majority_failed") |
| if min_pass_rate is not None and pass_rate < min_pass_rate: |
| accepted = False |
| failure_reasons.append("pass_rate_below_threshold") |
| if max_failed is not None and failed > max_failed: |
| accepted = False |
| failure_reasons.append("failed_count_above_threshold") |
| if min_cases is not None and len(results) < min_cases: |
| accepted = False |
| failure_reasons.append("insufficient_case_count") |
| if duplicate_ids: |
| accepted = False |
| failure_reasons.append("duplicate_case_ids") |
|
|
| return { |
| "dataset": str(dataset_path), |
| "strict": strict, |
| "thresholds": { |
| "min_pass_rate": min_pass_rate, |
| "max_failed": max_failed, |
| "min_cases": min_cases, |
| }, |
| "case_count": len(results), |
| "passed": passed, |
| "failed": failed, |
| "pass_rate": pass_rate, |
| "accepted": accepted, |
| "failure_reasons": failure_reasons, |
| "duplicate_ids": duplicate_ids, |
| "results": results, |
| } |
|
|
|
|
| def main() -> int: |
| parser = argparse.ArgumentParser( |
| description="Run deterministic planner autonomy-cycle contract checks." |
| ) |
| parser.add_argument("dataset", help="Path to autonomy cycle contract dataset JSON") |
| parser.add_argument("--output", default="") |
| parser.add_argument("--strict", action="store_true") |
| parser.add_argument( |
| "--min-pass-rate", |
| type=float, |
| default=None, |
| help="Optional minimum pass-rate acceptance threshold in [0.0, 1.0].", |
| ) |
| parser.add_argument( |
| "--max-failed", |
| type=int, |
| default=None, |
| help="Optional maximum failed-case acceptance threshold (>= 0).", |
| ) |
| parser.add_argument( |
| "--min-cases", |
| type=int, |
| default=None, |
| help="Optional minimum number of evaluation cases required.", |
| ) |
| parser.add_argument( |
| "--require-unique-ids", |
| action="store_true", |
| help="Fail if case IDs are duplicated.", |
| ) |
| args = parser.parse_args() |
|
|
| dataset_path = Path(args.dataset) |
| if args.min_pass_rate is not None and (args.min_pass_rate < 0.0 or args.min_pass_rate > 1.0): |
| raise SystemExit("--min-pass-rate must be between 0.0 and 1.0.") |
| if args.max_failed is not None and args.max_failed < 0: |
| raise SystemExit("--max-failed must be >= 0.") |
| if args.min_cases is not None and args.min_cases < 0: |
| raise SystemExit("--min-cases must be >= 0.") |
|
|
| payload = json.loads(dataset_path.read_text(encoding="utf-8")) |
| cases = payload.get("cases", []) if isinstance(payload, dict) else [] |
| if not isinstance(cases, list): |
| raise SystemExit("Dataset format error: expected top-level object with 'cases' list.") |
|
|
| case_rows = [case for case in cases if isinstance(case, dict)] |
| results = [_evaluate_case(case) for case in case_rows] |
| case_ids = [str(case.get("id", "")).strip() for case in case_rows] |
| id_counts: dict[str, int] = {} |
| for case_id in case_ids: |
| if not case_id: |
| continue |
| id_counts[case_id] = id_counts.get(case_id, 0) + 1 |
| duplicate_ids = sorted(case_id for case_id, count in id_counts.items() if count > 1) |
| if not args.require_unique_ids: |
| duplicate_ids = [] |
|
|
| summary = _evaluate_results( |
| dataset_path=dataset_path, |
| results=results, |
| strict=bool(args.strict), |
| min_pass_rate=args.min_pass_rate, |
| max_failed=args.max_failed, |
| min_cases=args.min_cases, |
| duplicate_ids=duplicate_ids, |
| ) |
|
|
| text = json.dumps(summary, indent=2) |
| print(text) |
| if args.output: |
| out_path = Path(args.output) |
| out_path.parent.mkdir(parents=True, exist_ok=True) |
| out_path.write_text(text, encoding="utf-8") |
|
|
| return 0 if summary["accepted"] else 1 |
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main()) |
|
|