atlasops / agents /audit.py
Harikishanth R
fix: skip-kubectl + scroll + health — HF Space ready
7e9a520
"""Append-only audit trail with hash-chain integrity checks."""
from __future__ import annotations
import hashlib
import hmac
import json
import os
import time
from pathlib import Path
from typing import Any
class AuditLog:
def __init__(self, secret_key: str, log_path: Path):
self.secret_key = secret_key.encode("utf-8")
self.log_path = log_path
self.log_path.parent.mkdir(parents=True, exist_ok=True)
self._last_hash = self._load_last_hash()
def _load_last_hash(self) -> str:
if not self.log_path.exists():
return ""
lines = self.log_path.read_text(encoding="utf-8").strip().splitlines()
if not lines:
return ""
try:
last = json.loads(lines[-1])
return str(last.get("entry_hash", ""))
except json.JSONDecodeError:
return ""
@staticmethod
def _sha256_text(value: str) -> str:
return hashlib.sha256(value.encode("utf-8")).hexdigest()
def record(
self,
incident_id: str,
agent_role: str,
action_type: str,
tool_name: str | None = None,
tool_args: dict[str, Any] | None = None,
result_summary: str = "",
approved_by: str = "",
policy_check: str = "allowed",
) -> dict[str, Any]:
tool_args = tool_args or {}
entry = {
"ts": round(time.time(), 3),
"incident_id": incident_id,
"agent_role": agent_role,
"action_type": action_type,
"tool_name": tool_name or "",
"tool_args_hash": self._sha256_text(json.dumps(tool_args, sort_keys=True)),
"result_summary": result_summary[:300],
"approved_by": approved_by,
"policy_check": policy_check,
"prev_hash": self._last_hash,
}
canonical = json.dumps(entry, sort_keys=True)
signature = hmac.new(self.secret_key, canonical.encode("utf-8"), hashlib.sha256).hexdigest()
entry_hash = self._sha256_text(canonical + signature)
entry["signature"] = signature
entry["entry_hash"] = entry_hash
with self.log_path.open("a", encoding="utf-8") as f:
f.write(json.dumps(entry) + "\n")
self._last_hash = entry_hash
return entry
def tail(self, limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
if not self.log_path.exists():
return []
lines = self.log_path.read_text(encoding="utf-8").strip().splitlines()
if not lines:
return []
entries = [json.loads(line) for line in lines]
if offset:
entries = entries[offset:]
return entries[-limit:]
def verify_integrity(self) -> dict[str, Any]:
if not self.log_path.exists():
return {"ok": True, "entries": 0}
lines = self.log_path.read_text(encoding="utf-8").strip().splitlines()
prev_hash = ""
for idx, line in enumerate(lines):
try:
entry = json.loads(line)
except json.JSONDecodeError:
return {"ok": False, "error": f"invalid_json_at_line_{idx + 1}"}
expected_prev = entry.get("prev_hash", "")
if expected_prev != prev_hash:
return {"ok": False, "error": f"hash_chain_mismatch_at_line_{idx + 1}"}
check = dict(entry)
signature = str(check.pop("signature", ""))
entry_hash = str(check.pop("entry_hash", ""))
canonical = json.dumps(check, sort_keys=True)
expected_sig = hmac.new(self.secret_key, canonical.encode("utf-8"), hashlib.sha256).hexdigest()
if expected_sig != signature:
return {"ok": False, "error": f"signature_mismatch_at_line_{idx + 1}"}
expected_hash = self._sha256_text(canonical + signature)
if expected_hash != entry_hash:
return {"ok": False, "error": f"entry_hash_mismatch_at_line_{idx + 1}"}
prev_hash = entry_hash
return {"ok": True, "entries": len(lines)}
_AUDIT_SECRET = os.getenv("ATLASOPS_AUDIT_SECRET", "")
if not _AUDIT_SECRET:
import warnings
warnings.warn(
"ATLASOPS_AUDIT_SECRET is not set — using insecure dev fallback. "
"Set this env var in production to guarantee audit integrity.",
stacklevel=1,
)
_AUDIT_SECRET = "atlasops-dev-audit-secret"
_AUDIT_PATH = Path(os.getenv("ATLASOPS_AUDIT_LOG", "data/audit_log.jsonl"))
audit_log = AuditLog(secret_key=_AUDIT_SECRET, log_path=_AUDIT_PATH)