| """Append-only audit trail with hash-chain integrity checks.""" |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import hmac |
| import json |
| import os |
| import time |
| from pathlib import Path |
| from typing import Any |
|
|
|
|
class AuditLog:
    """Append-only JSON-lines audit trail protected by an HMAC-signed hash chain.

    Every entry is stored as one JSON object per line and carries:

    * ``prev_hash``  -- the ``entry_hash`` of the entry written before it
      (``""`` for the first entry),
    * ``signature``  -- HMAC-SHA256, keyed with the secret, over the entry's
      canonical form (``json.dumps(..., sort_keys=True)`` of every field
      except ``signature`` and ``entry_hash``),
    * ``entry_hash`` -- SHA-256 of the canonical form concatenated with the
      signature.

    ``verify_integrity`` re-derives these values to detect edited, removed or
    reordered lines.
    """

    def __init__(self, secret_key: str, log_path: Path):
        """Open (or prepare) the log at *log_path*.

        Args:
            secret_key: HMAC key; stored UTF-8 encoded.
            log_path: Location of the JSON-lines file. Missing parent
                directories are created; the file itself is only created by
                the first ``record`` call.
        """
        self.secret_key = secret_key.encode("utf-8")
        self.log_path = log_path
        self.log_path.parent.mkdir(parents=True, exist_ok=True)
        self._last_hash = self._load_last_hash()

    def _read_lines(self) -> list[str]:
        """Return the log's lines, or ``[]`` when the file is missing or blank."""
        if not self.log_path.exists():
            return []
        return self.log_path.read_text(encoding="utf-8").strip().splitlines()

    def _load_last_hash(self) -> str:
        """Return the ``entry_hash`` of the final stored entry.

        Returns ``""`` when there is no log yet, the log is empty, or the last
        line is not a JSON object.
        """
        lines = self._read_lines()
        if not lines:
            return ""
        try:
            last = json.loads(lines[-1])
        except json.JSONDecodeError:
            return ""
        # Valid JSON that is not an object (e.g. ``[]`` or ``42``) has no
        # ``entry_hash`` to continue from; treat it like an unreadable line.
        if not isinstance(last, dict):
            return ""
        return str(last.get("entry_hash", ""))

    @staticmethod
    def _sha256_text(value: str) -> str:
        """Return the hex SHA-256 digest of *value* encoded as UTF-8."""
        return hashlib.sha256(value.encode("utf-8")).hexdigest()

    @staticmethod
    def _digests_match(expected: str, actual: str) -> bool:
        """Compare two digest strings in constant time.

        Both sides are compared as bytes because ``hmac.compare_digest``
        rejects non-ASCII ``str`` arguments; *actual* comes from the log file
        and may contain arbitrary text if the file was tampered with.
        """
        return hmac.compare_digest(
            expected.encode("utf-8"),
            actual.encode("utf-8", "replace"),
        )

    def _seal(self, payload: dict[str, Any]) -> tuple[str, str]:
        """Return ``(signature, entry_hash)`` for *payload*.

        *payload* is the entry without its ``signature`` and ``entry_hash``
        fields.
        """
        canonical = json.dumps(payload, sort_keys=True)
        signature = hmac.new(
            self.secret_key, canonical.encode("utf-8"), hashlib.sha256
        ).hexdigest()
        return signature, self._sha256_text(canonical + signature)

    def record(
        self,
        incident_id: str,
        agent_role: str,
        action_type: str,
        tool_name: str | None = None,
        tool_args: dict[str, Any] | None = None,
        result_summary: str = "",
        approved_by: str = "",
        policy_check: str = "allowed",
    ) -> dict[str, Any]:
        """Append one signed entry to the log and return it.

        Args:
            incident_id: Stored verbatim under ``incident_id``.
            agent_role: Stored verbatim under ``agent_role``.
            action_type: Stored verbatim under ``action_type``.
            tool_name: Stored under ``tool_name``; ``None`` becomes ``""``.
            tool_args: Not stored. Only the SHA-256 of its sorted-key JSON
                form is kept, as ``tool_args_hash``. ``None`` is treated as
                ``{}``.
            result_summary: Truncated to its first 300 characters.
            approved_by: Stored verbatim under ``approved_by``.
            policy_check: Stored verbatim under ``policy_check``.

        Returns:
            The entry dict as written, including ``ts``, ``prev_hash``,
            ``signature`` and ``entry_hash``.

        Raises:
            TypeError: If *tool_args* (or another field) is not
                JSON-serializable.
        """
        tool_args = tool_args or {}
        entry: dict[str, Any] = {
            "ts": round(time.time(), 3),
            "incident_id": incident_id,
            "agent_role": agent_role,
            "action_type": action_type,
            "tool_name": tool_name or "",
            "tool_args_hash": self._sha256_text(json.dumps(tool_args, sort_keys=True)),
            "result_summary": result_summary[:300],
            "approved_by": approved_by,
            "policy_check": policy_check,
            "prev_hash": self._last_hash,
        }
        # Seal before adding the two derived fields: they are not part of the
        # signed payload.
        signature, entry_hash = self._seal(entry)
        entry["signature"] = signature
        entry["entry_hash"] = entry_hash
        with self.log_path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(entry) + "\n")
        # Advance the chain only after the line has been written.
        self._last_hash = entry_hash
        return entry

    def tail(self, limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
        """Return up to *limit* of the most recent entries, oldest first.

        Args:
            limit: Maximum number of entries to return. Zero or a negative
                value yields ``[]``.
            offset: When non-zero, ``entries[offset:]`` is applied before the
                last *limit* entries are taken, i.e. a positive offset skips
                that many entries from the *oldest* end of the log.
                NOTE(review): this may not be the pagination callers expect
                from a "tail"; kept as-is for compatibility -- confirm.

        Raises:
            json.JSONDecodeError: If any line in the log is not valid JSON.
        """
        # ``entries[-0:]`` is the whole list, so a zero limit must be handled
        # explicitly rather than by slicing.
        if limit <= 0:
            return []
        entries = [json.loads(line) for line in self._read_lines()]
        if offset:
            entries = entries[offset:]
        return entries[-limit:]

    def verify_integrity(self) -> dict[str, Any]:
        """Re-check the whole log: hash chain, signatures and entry hashes.

        Returns:
            ``{"ok": True, "entries": <count>}`` when every line verifies (a
            missing or empty log counts as zero entries), otherwise
            ``{"ok": False, "error": "<reason>_at_line_<n>"}`` for the first
            failing line (1-based). ``<reason>`` is one of ``invalid_json``,
            ``invalid_entry`` (valid JSON but not an object),
            ``hash_chain_mismatch``, ``signature_mismatch`` or
            ``entry_hash_mismatch``.
        """
        lines = self._read_lines()
        prev_hash = ""
        for line_no, line in enumerate(lines, start=1):
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                return {"ok": False, "error": f"invalid_json_at_line_{line_no}"}
            if not isinstance(entry, dict):
                return {"ok": False, "error": f"invalid_entry_at_line_{line_no}"}
            if entry.get("prev_hash", "") != prev_hash:
                return {"ok": False, "error": f"hash_chain_mismatch_at_line_{line_no}"}
            payload = dict(entry)
            signature = str(payload.pop("signature", ""))
            entry_hash = str(payload.pop("entry_hash", ""))
            expected_sig, expected_hash = self._seal(payload)
            if not self._digests_match(expected_sig, signature):
                return {"ok": False, "error": f"signature_mismatch_at_line_{line_no}"}
            # The stored signature equals expected_sig here, so expected_hash
            # is the hash of the canonical payload plus the stored signature.
            if expected_hash != entry_hash:
                return {"ok": False, "error": f"entry_hash_mismatch_at_line_{line_no}"}
            prev_hash = entry_hash
        return {"ok": True, "entries": len(lines)}
|
|
|
|
# Module-level singleton setup: read the signing secret and log location from
# the environment, then build the shared ``audit_log`` instance at import time.
_AUDIT_SECRET = os.environ.get("ATLASOPS_AUDIT_SECRET", "")
if not _AUDIT_SECRET:
    # Unset or empty secret: warn, then fall back to a fixed development key
    # so the module can still be imported.
    import warnings

    warnings.warn(
        "ATLASOPS_AUDIT_SECRET is not set — using insecure dev fallback. "
        "Set this env var in production to guarantee audit integrity.",
        stacklevel=1,
    )
    _AUDIT_SECRET = "atlasops-dev-audit-secret"

# The log path is overridable via ATLASOPS_AUDIT_LOG; the default is a
# relative path.
_AUDIT_PATH = Path(os.environ.get("ATLASOPS_AUDIT_LOG", "data/audit_log.jsonl"))
audit_log = AuditLog(secret_key=_AUDIT_SECRET, log_path=_AUDIT_PATH)
|
|