Spaces:
Runtime error
Runtime error
File size: 3,536 Bytes
"""Check 4: Evidence sufficiency — verify evidence artifacts exist in containers."""
from __future__ import annotations
import shlex
from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
class EvidenceCheck:
    """Verify all ``evidence_spec`` items exist in the running containers.

    Each evidence item carries a ``type``, a ``location`` and a ``pattern``.
    Items of type ``log_entry`` / ``alert`` that have a pattern are checked
    with ``grep -c`` inside the target container; every other item is checked
    for plain file existence with ``test -f``.
    """

    async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
        """Run the evidence check for *snapshot* against *containers*.

        Returns a ``CheckResult`` named ``"evidence"``. It passes when no
        evidence is specified or when every item was found; otherwise
        ``details["missing"]`` lists one dict per item that could not be
        confirmed and ``error`` summarises how many items are missing.
        """
        items = snapshot.evidence_spec
        if not items:
            return CheckResult(
                name="evidence",
                passed=True,
                details={"note": "no evidence_spec defined — vacuously true"},
            )

        missing: list[dict] = []
        for item in items:
            try:
                entry = await self._check_item(item, containers)
            except Exception as exc:  # noqa: BLE001
                # Best effort: one broken item (or an exec failure) must not
                # abort the whole check; record it as missing with the reason.
                entry = {"item": item.type, "location": item.location, "error": str(exc)}
            if entry is not None:
                missing.append(entry)

        passed = not missing
        return CheckResult(
            name="evidence",
            passed=passed,
            details={"missing": missing, "total": len(items)},
            error="" if passed else f"{len(missing)} evidence item(s) not found",
        )

    async def _check_item(self, item, containers: ContainerSet) -> dict | None:
        """Check one evidence item; return a "missing" record or ``None`` if found."""
        loc = item.location
        pattern = item.pattern
        host, path = self._split_location(loc)
        safe_path = shlex.quote(path)

        if item.type in ("log_entry", "alert") and pattern:
            # ``-e`` keeps a pattern that starts with "-" from being parsed as
            # a grep option, and ``--`` does the same for the file operand.
            result = await containers.exec_run(
                host,
                f"grep -c -e {shlex.quote(pattern)} -- {safe_path}",
            )
            return self._grep_miss(item.type, loc, pattern, result)

        # No pattern (or a non-log item): the file merely has to exist.
        result = await containers.exec_run(host, f"test -f {safe_path}")
        if result.exit_code == 0:
            return None
        return {
            "item": item.type,
            "location": loc,
            "error": result.combined_output
            or f"missing evidence file (exit={result.exit_code})",
        }

    @staticmethod
    def _split_location(loc: str) -> tuple[str, str]:
        """Split ``"container:/path"`` into ``(container, path)``.

        A location without a container prefix defaults to the ``siem``
        container. The prefix is only treated as a container name when it
        contains no ``/``, so a bare path that happens to contain a colon
        (``/var/log/a:b.log``) is kept intact instead of being cut in two.
        """
        host, sep, path = loc.partition(":")
        if sep and "/" not in host:
            return host, path
        return "siem", loc

    @staticmethod
    def _grep_miss(item_type, loc, pattern, result) -> dict | None:
        """Interpret a ``grep -c`` result; return a "missing" record or ``None``.

        ``grep -c`` prints the match count and exits 0 when at least one line
        matched, exits 1 (printing ``0``) when nothing matched, and exits with
        a status greater than 1 on a real error. Only the last case — or any
        other unexpected combination — is reported with an ``error`` field.
        """
        count = result.stdout.strip()
        if result.exit_code == 0 and count not in ("0", ""):
            return None

        entry = {"item": item_type, "location": loc, "pattern": pattern}
        plain_miss = result.exit_code == 0 or (result.exit_code == 1 and count == "0")
        if not plain_miss:
            entry["error"] = (
                result.combined_output
                or f"evidence command failed (exit={result.exit_code})"
            )
        return entry
|