File size: 2,715 Bytes
8c486a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""Validator pipeline orchestrator — runs checks in sequence, fail-fast on mechanical."""

from __future__ import annotations

import logging
import time

from pydantic import BaseModel, Field

from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec, ValidatorCheck

logger = logging.getLogger(__name__)

# Check classes whose failures are advisory (logged, may trigger retry,
# but never block admission on their own).
# Membership is tested against ``type(check).__name__`` in
# ``ValidatorGate.validate``, so entries must be exact class names; a
# subclass with a different name is not matched.
_ADVISORY_CHECK_CLASSES = {"NPCConsistencyCheck", "RealismReviewCheck"}


class ValidationResult(BaseModel):
    """Aggregate result of all validator checks.

    The field notes below describe how ``ValidatorGate.validate`` populates
    an instance; a bare ``ValidationResult()`` carries only the defaults.
    """

    # True when every recorded non-advisory check passed.
    passed: bool = False
    # Per-check results in execution order; checks that were never run
    # (because the pipeline stopped early) are absent.
    checks: list[CheckResult] = Field(default_factory=list)
    # Sum of the recorded checks' ``time_s`` values, in seconds.
    total_time_s: float = 0.0


class ValidatorGate:
    """Run a list of :class:`ValidatorCheck` instances in order.

    Mechanical checks (``advisory=False``) fail-fast: the first failure stops
    the pipeline.  Advisory checks (``advisory=True``) are always recorded but
    never prevent an overall pass.

    A check counts as advisory when the result it returns has
    ``advisory=True`` *or* when its class name is listed in
    ``_ADVISORY_CHECK_CLASSES``.
    """

    def __init__(self, checks: list[ValidatorCheck]) -> None:
        """Store the checks to run.

        Args:
            checks: Checks to execute, in the order they should run.
        """
        # Shallow copy: later mutation of the caller's list does not alter
        # the pipeline.
        self.checks = list(checks)

    async def validate(
        self,
        snapshot: SnapshotSpec,
        containers: ContainerSet,
    ) -> ValidationResult:
        """Run every check in order and aggregate the outcome.

        Each check is awaited with ``snapshot`` and ``containers``.  An
        exception raised by a check is converted into a failed
        :class:`CheckResult` instead of propagating.  The first failing
        non-advisory check stops the pipeline; later checks are not run and
        do not appear in the result.

        Args:
            snapshot: Snapshot specification forwarded to each check.
            containers: Container set forwarded to each check.

        Returns:
            A :class:`ValidationResult` whose ``passed`` is true when no
            recorded non-advisory check failed (trivially true when no
            checks are configured), whose ``checks`` holds the results in
            execution order, and whose ``total_time_s`` is the sum of the
            per-check wall-clock durations.
        """
        results: list[CheckResult] = []
        for check in self.checks:
            check_name = type(check).__name__
            is_advisory = check_name in _ADVISORY_CHECK_CLASSES
            t0 = time.monotonic()
            try:
                result = await check.check(snapshot, containers)
            except Exception as exc:  # noqa: BLE001
                # Keep the traceback available for diagnosis; the result
                # below only retains the exception's string form.
                logger.debug(
                    "Check %s raised an unhandled exception",
                    check_name,
                    exc_info=True,
                )
                result = CheckResult(
                    name=check_name,
                    passed=False,
                    advisory=is_advisory,
                    error=f"unhandled: {exc}",
                )
            result.time_s = time.monotonic() - t0

            # Known advisory classes must never block, even when the check
            # itself returned a result without the advisory flag set.
            if is_advisory and not result.advisory:
                result.advisory = True

            results.append(result)

            if not result.passed:
                if result.advisory:
                    logger.info(
                        "Advisory check %s failed: %s (non-blocking)",
                        result.name,
                        result.error,
                    )
                else:
                    # Fail-fast on mechanical (non-advisory) failures.
                    logger.warning(
                        "Mechanical check %s failed: %s — stopping pipeline",
                        result.name,
                        result.error,
                    )
                    break

        # Advisory failures are excluded from the overall verdict.
        passed = all(r.passed for r in results if not r.advisory)
        return ValidationResult(
            passed=passed,
            checks=results,
            total_time_s=sum(r.time_s for r in results),
        )