Spaces:
Runtime error
Runtime error
| """Validator pipeline orchestrator — runs checks in sequence, fail-fast on mechanical.""" | |
| from __future__ import annotations | |
| import logging | |
| import time | |
| from pydantic import BaseModel, Field | |
| from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec, ValidatorCheck | |
| logger = logging.getLogger(__name__) | |
| # Check classes whose failures are advisory (logged, may trigger retry, | |
| # but never block admission on their own). | |
| _ADVISORY_CHECK_CLASSES = {"NPCConsistencyCheck", "RealismReviewCheck"} | |
| class ValidationResult(BaseModel): | |
| """Aggregate result of all validator checks.""" | |
| passed: bool = False | |
| checks: list[CheckResult] = Field(default_factory=list) | |
| total_time_s: float = 0.0 | |
| class ValidatorGate: | |
| """Run a list of :class:`ValidatorCheck` instances in order. | |
| Mechanical checks (``advisory=False``) fail-fast: the first failure stops | |
| the pipeline. Advisory checks (``advisory=True``) are always recorded but | |
| never prevent an overall pass. | |
| """ | |
| def __init__(self, checks: list[ValidatorCheck]) -> None: | |
| self.checks = list(checks) | |
| async def validate( | |
| self, | |
| snapshot: SnapshotSpec, | |
| containers: ContainerSet, | |
| ) -> ValidationResult: | |
| results: list[CheckResult] = [] | |
| for check in self.checks: | |
| check_name = type(check).__name__ | |
| is_advisory = check_name in _ADVISORY_CHECK_CLASSES | |
| t0 = time.monotonic() | |
| try: | |
| result = await check.check(snapshot, containers) | |
| except Exception as exc: # noqa: BLE001 | |
| result = CheckResult( | |
| name=check_name, | |
| passed=False, | |
| advisory=is_advisory, | |
| error=f"unhandled: {exc}", | |
| ) | |
| result.time_s = time.monotonic() - t0 | |
| results.append(result) | |
| if not result.passed: | |
| if result.advisory: | |
| logger.info( | |
| "Advisory check %s failed: %s (non-blocking)", | |
| result.name, | |
| result.error, | |
| ) | |
| else: | |
| # Fail-fast on mechanical (non-advisory) failures. | |
| logger.warning( | |
| "Mechanical check %s failed: %s — stopping pipeline", | |
| result.name, | |
| result.error, | |
| ) | |
| break | |
| passed = all(r.passed for r in results if not r.advisory) | |
| return ValidationResult( | |
| passed=passed, | |
| checks=results, | |
| total_time_s=sum(r.time_s for r in results), | |
| ) | |