blux-ca / ca /runtime /audit.py
~JADIS
Document repo standards and add quality tooling (#7)
f526878
from __future__ import annotations
import hashlib
import json
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Iterable, List
from doctrine.redaction import redact
@dataclass
class AuditRow:
trace_id: str
timestamp: str
decision: str
risk: int
summary: str
hash: str
prev_hash: str
event: dict[str, Any] | None = None
class AuditLedger:
"""Append-only audit ledger with hash chaining."""
def __init__(self, *, log_path: str | Path | None = None, redactor: Callable[[Any], Any] | None = None) -> None:
self.path = Path(log_path) if log_path else Path.home() / ".blux-ca" / "audit" / "runtime.jsonl"
self.path.parent.mkdir(parents=True, exist_ok=True)
self.redactor = redactor or redact
def _chain_hash(self, payload: dict[str, Any], prev_hash: str) -> str:
body = json.dumps({"payload": payload, "prev": prev_hash}, sort_keys=True).encode()
return hashlib.sha256(body).hexdigest()
def append(self, event: dict[str, Any]) -> AuditRow:
existing = self.tail(1)
prev_hash = existing[0].hash if existing else "0" * 64
payload = {
"trace_id": event.get("trace_id", str(uuid.uuid4())),
"timestamp": datetime.now(timezone.utc).isoformat(),
"decision": event.get("decision", "unknown"),
"risk": int(event.get("risk", 0)),
"summary": event.get("summary", ""),
"event": self.redactor(event),
}
digest = self._chain_hash(payload, prev_hash)
record = {**payload, "hash": digest, "prev_hash": prev_hash}
with self.path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(record) + "\n")
return AuditRow(**record)
def tail(self, count: int = 5) -> List[AuditRow]:
if not self.path.exists():
return []
with self.path.open("r", encoding="utf-8") as handle:
lines = handle.readlines()[-count:]
return [AuditRow(**json.loads(line)) for line in lines]
def iter_rows(self) -> Iterable[AuditRow]:
if not self.path.exists():
return []
with self.path.open("r", encoding="utf-8") as handle:
for line in handle:
yield AuditRow(**json.loads(line))
def write_audit(event: dict[str, Any], log_path: str | Path | None = None) -> Path:
ledger = AuditLedger(log_path=log_path)
row = ledger.append(event)
return ledger.path