from __future__ import annotations import hashlib import json import uuid from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any, Callable, Iterable, List from doctrine.redaction import redact @dataclass class AuditRow: trace_id: str timestamp: str decision: str risk: int summary: str hash: str prev_hash: str event: dict[str, Any] | None = None class AuditLedger: """Append-only audit ledger with hash chaining.""" def __init__(self, *, log_path: str | Path | None = None, redactor: Callable[[Any], Any] | None = None) -> None: self.path = Path(log_path) if log_path else Path.home() / ".blux-ca" / "audit" / "runtime.jsonl" self.path.parent.mkdir(parents=True, exist_ok=True) self.redactor = redactor or redact def _chain_hash(self, payload: dict[str, Any], prev_hash: str) -> str: body = json.dumps({"payload": payload, "prev": prev_hash}, sort_keys=True).encode() return hashlib.sha256(body).hexdigest() def append(self, event: dict[str, Any]) -> AuditRow: existing = self.tail(1) prev_hash = existing[0].hash if existing else "0" * 64 payload = { "trace_id": event.get("trace_id", str(uuid.uuid4())), "timestamp": datetime.now(timezone.utc).isoformat(), "decision": event.get("decision", "unknown"), "risk": int(event.get("risk", 0)), "summary": event.get("summary", ""), "event": self.redactor(event), } digest = self._chain_hash(payload, prev_hash) record = {**payload, "hash": digest, "prev_hash": prev_hash} with self.path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(record) + "\n") return AuditRow(**record) def tail(self, count: int = 5) -> List[AuditRow]: if not self.path.exists(): return [] with self.path.open("r", encoding="utf-8") as handle: lines = handle.readlines()[-count:] return [AuditRow(**json.loads(line)) for line in lines] def iter_rows(self) -> Iterable[AuditRow]: if not self.path.exists(): return [] with self.path.open("r", encoding="utf-8") as handle: for line in handle: yield AuditRow(**json.loads(line)) def write_audit(event: dict[str, Any], log_path: str | Path | None = None) -> Path: ledger = AuditLedger(log_path=log_path) row = ledger.append(event) return ledger.path