File size: 2,570 Bytes
6364e69 6d88ccb 6364e69 6d88ccb 6364e69 f526878 6364e69 6d88ccb f526878 6d88ccb f526878 6d88ccb f526878 6d88ccb 6364e69 6d88ccb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
from __future__ import annotations
import hashlib
import json
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Iterable, List
from doctrine.redaction import redact
@dataclass
class AuditRow:
    """One record of the audit ledger, as stored on a single JSONL line.

    Field names mirror the keys of the JSON object written by
    ``AuditLedger.append``, so a parsed line can be expanded directly with
    ``AuditRow(**json.loads(line))``.
    """

    # Identifier taken from the event's "trace_id", or a fresh UUID4 string
    # when the event does not carry one.
    trace_id: str
    # ISO-8601 timestamp (UTC) captured when the record was appended.
    timestamp: str
    # Event's "decision" value; "unknown" when the event has none.
    decision: str
    # Event's "risk" value coerced with int(); 0 when the event has none.
    risk: int
    # Event's "summary" value; empty string when the event has none.
    summary: str
    # SHA-256 hex digest over this record's payload and ``prev_hash``.
    hash: str
    # ``hash`` of the preceding record; 64 zeros for the first record.
    prev_hash: str
    # The event after it has been passed through the ledger's redactor.
    event: dict[str, Any] | None = None
class AuditLedger:
    """Append-only audit ledger with hash chaining.

    Records are stored one JSON object per line (JSONL). Each record's
    ``hash`` is a SHA-256 digest over its own payload and the ``hash`` of the
    record before it, so every record depends on its predecessor.
    """

    def __init__(
        self,
        *,
        log_path: str | Path | None = None,
        redactor: Callable[[Any], Any] | None = None,
    ) -> None:
        """Set up the ledger location and make sure its directory exists.

        Args:
            log_path: Path of the JSONL ledger file. When omitted (or falsy)
                ``~/.blux-ca/audit/runtime.jsonl`` is used.
            redactor: Callable applied to each event before it is stored.
                Falls back to the module-level ``redact`` when not given.
        """
        if log_path:
            self.path = Path(log_path)
        else:
            self.path = Path.home() / ".blux-ca" / "audit" / "runtime.jsonl"
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.redactor = redactor or redact

    def _chain_hash(self, payload: dict[str, Any], prev_hash: str) -> str:
        """Return the SHA-256 hex digest linking *payload* to *prev_hash*.

        Keys are sorted before serialisation so that the digest does not
        depend on dict insertion order.
        """
        body = json.dumps({"payload": payload, "prev": prev_hash}, sort_keys=True).encode()
        return hashlib.sha256(body).hexdigest()

    def append(self, event: dict[str, Any]) -> AuditRow:
        """Append *event* to the ledger and return the stored row.

        Args:
            event: Mapping describing the audited action. The optional keys
                ``trace_id``, ``decision``, ``risk`` and ``summary`` are
                lifted into top-level columns; the whole event is stored in
                redacted form under ``event``.

        Returns:
            The ``AuditRow`` that was written, including its chain hashes.
        """
        latest = self.tail(1)
        # The very first record chains to an all-zero "genesis" hash.
        prev_hash = latest[0].hash if latest else "0" * 64
        payload = {
            "trace_id": event.get("trace_id", str(uuid.uuid4())),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "decision": event.get("decision", "unknown"),
            "risk": int(event.get("risk", 0)),
            "summary": event.get("summary", ""),
            "event": self.redactor(event),
        }
        digest = self._chain_hash(payload, prev_hash)
        record = {**payload, "hash": digest, "prev_hash": prev_hash}
        with self.path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(record) + "\n")
        return AuditRow(**record)

    def tail(self, count: int = 5) -> List[AuditRow]:
        """Return up to the last *count* rows, oldest first.

        Args:
            count: Maximum number of rows to return. Zero or a negative
                number yields an empty list.

        Returns:
            The most recent rows; an empty list when the ledger file does
            not exist or contains no records.
        """
        # Guard explicitly: ``lines[-0:]`` would be the *whole* list.
        if count <= 0:
            return []
        from collections import deque

        try:
            with self.path.open("r", encoding="utf-8") as handle:
                # A bounded deque keeps only the newest ``count`` lines in
                # memory while streaming; blank lines are not records.
                recent = deque((line for line in handle if line.strip()), maxlen=count)
        except FileNotFoundError:
            return []
        return [AuditRow(**json.loads(line)) for line in recent]

    def iter_rows(self) -> Iterable[AuditRow]:
        """Yield every row in the ledger in file order.

        Yields nothing when the ledger file does not exist. Blank lines are
        skipped.
        """
        try:
            handle = self.path.open("r", encoding="utf-8")
        except FileNotFoundError:
            return
        with handle:
            for line in handle:
                if line.strip():
                    yield AuditRow(**json.loads(line))
def write_audit(event: dict[str, Any], log_path: str | Path | None = None) -> Path:
    """Append *event* to an audit ledger and return the ledger file path.

    Args:
        event: Mapping describing the audited action; passed unchanged to
            ``AuditLedger.append``.
        log_path: Ledger file to write to. When omitted, ``AuditLedger``
            picks its default location.

    Returns:
        The path of the ledger file the event was written to.
    """
    ledger = AuditLedger(log_path=log_path)
    # The returned row is not needed here; callers only get the file path.
    ledger.append(event)
    return ledger.path
|