Spaces:
Runtime error
Runtime error
| """Check 4: Evidence sufficiency — verify evidence artifacts exist in containers.""" | |
| from __future__ import annotations | |
| import shlex | |
| from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec | |
| class EvidenceCheck: | |
| """Verify all ``evidence_spec`` items exist in the running containers.""" | |
| async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult: | |
| items = snapshot.evidence_spec | |
| if not items: | |
| return CheckResult( | |
| name="evidence", | |
| passed=True, | |
| details={"note": "no evidence_spec defined — vacuously true"}, | |
| ) | |
| missing: list[dict] = [] | |
| for item in items: | |
| loc = item.location | |
| pattern = item.pattern | |
| # Determine which container to check. Location format: | |
| # "container:/path/to/file" or just "/path" (defaults to siem). | |
| if ":" in loc: | |
| host, path = loc.split(":", 1) | |
| else: | |
| host, path = "siem", loc | |
| try: | |
| safe_path = shlex.quote(path) | |
| if item.type in ("log_entry", "alert"): | |
| if pattern: | |
| result = await containers.exec_run( | |
| host, | |
| f"grep -c {shlex.quote(pattern)} {safe_path}", | |
| ) | |
| output = result.stdout.strip() | |
| if result.exit_code != 0: | |
| missing.append({ | |
| "item": item.type, | |
| "location": loc, | |
| "pattern": pattern, | |
| "error": result.combined_output | |
| or f"evidence command failed (exit={result.exit_code})", | |
| }) | |
| elif output in ("0", ""): | |
| missing.append({ | |
| "item": item.type, | |
| "location": loc, | |
| "pattern": pattern, | |
| }) | |
| else: | |
| result = await containers.exec_run(host, f"test -f {safe_path}") | |
| if result.exit_code != 0: | |
| missing.append({ | |
| "item": item.type, | |
| "location": loc, | |
| "error": result.combined_output | |
| or f"missing evidence file (exit={result.exit_code})", | |
| }) | |
| else: | |
| result = await containers.exec_run(host, f"test -f {safe_path}") | |
| if result.exit_code != 0: | |
| missing.append({ | |
| "item": item.type, | |
| "location": loc, | |
| "error": result.combined_output | |
| or f"missing evidence file (exit={result.exit_code})", | |
| }) | |
| except Exception as exc: # noqa: BLE001 | |
| missing.append({"item": item.type, "location": loc, "error": str(exc)}) | |
| passed = len(missing) == 0 | |
| return CheckResult( | |
| name="evidence", | |
| passed=passed, | |
| details={"missing": missing, "total": len(items)}, | |
| error="" if passed else f"{len(missing)} evidence item(s) not found", | |
| ) | |