Spaces:
Runtime error
Runtime error
File size: 1,538 Bytes
"""Check 1: Build + boot — verify all containers are healthy."""
from __future__ import annotations
from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
class BuildBootCheck:
    """Verify every host declared in the topology is running and healthy."""

    async def check(
        self, snapshot: SnapshotSpec, containers: ContainerSet
    ) -> CheckResult:
        """Probe each host listed under ``snapshot.topology["hosts"]``.

        Host entries may be plain values (stringified and used as the name)
        or dicts carrying a ``"name"`` key.  Each named host is passed to
        ``containers.is_healthy``; a falsy result or a raised exception marks
        that host as unhealthy.  Entries that yield no usable name are
        reported as unhealthy under a positional placeholder instead of
        being probed with an empty name.

        Returns:
            A ``CheckResult`` named ``"build_boot"``.  It fails with
            ``"no hosts defined in topology"`` when no hosts are declared;
            otherwise ``details`` lists the ``"checked"`` and ``"unhealthy"``
            hosts and ``error`` is empty only when every host was healthy.
        """
        # ``or []`` also covers an explicit ``"hosts": None`` entry, which
        # ``.get("hosts", [])`` would hand back as None and crash the loop.
        declared = snapshot.topology.get("hosts") or []

        host_names: list[str] = []
        unnamed: set[str] = set()
        for index, entry in enumerate(declared):
            raw = entry.get("name") if isinstance(entry, dict) else entry
            # Coerce to str so the final ``", ".join`` cannot fail on
            # non-string names (e.g. ints or None coming from parsed YAML).
            name = "" if raw is None else str(raw)
            if not name:
                name = f"<unnamed host #{index}>"
                unnamed.add(name)
            host_names.append(name)

        if not host_names:
            return CheckResult(
                name="build_boot",
                passed=False,
                error="no hosts defined in topology",
            )

        unhealthy: list[str] = []
        for name in host_names:
            if name in unnamed:
                # Nothing meaningful to probe; surface the bad entry directly.
                unhealthy.append(name)
                continue
            try:
                ok = await containers.is_healthy(name)
            except Exception as exc:  # noqa: BLE001
                # Deliberately broad: any probe failure counts as unhealthy
                # and is reported rather than aborting the whole check.
                unhealthy.append(f"{name} ({exc})")
            else:
                if not ok:
                    unhealthy.append(name)

        passed = not unhealthy
        return CheckResult(
            name="build_boot",
            passed=passed,
            details={"unhealthy": unhealthy, "checked": host_names},
            error="" if passed else f"unhealthy containers: {', '.join(unhealthy)}",
        )
|