File size: 2,570 Bytes
6364e69
 
6d88ccb
6364e69
 
6d88ccb
 
6364e69
f526878
6364e69
 
 
 
6d88ccb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f526878
6d88ccb
 
f526878
6d88ccb
 
 
 
 
 
 
 
 
 
 
 
 
 
f526878
6d88ccb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6364e69
6d88ccb
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from __future__ import annotations

import hashlib
import json
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Iterable, List

from doctrine.redaction import redact


@dataclass
class AuditRow:
    trace_id: str
    timestamp: str
    decision: str
    risk: int
    summary: str
    hash: str
    prev_hash: str
    event: dict[str, Any] | None = None


class AuditLedger:
    """Append-only audit ledger with hash chaining."""

    def __init__(self, *, log_path: str | Path | None = None, redactor: Callable[[Any], Any] | None = None) -> None:
        self.path = Path(log_path) if log_path else Path.home() / ".blux-ca" / "audit" / "runtime.jsonl"
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.redactor = redactor or redact

    def _chain_hash(self, payload: dict[str, Any], prev_hash: str) -> str:
        body = json.dumps({"payload": payload, "prev": prev_hash}, sort_keys=True).encode()
        return hashlib.sha256(body).hexdigest()

    def append(self, event: dict[str, Any]) -> AuditRow:
        existing = self.tail(1)
        prev_hash = existing[0].hash if existing else "0" * 64
        payload = {
            "trace_id": event.get("trace_id", str(uuid.uuid4())),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "decision": event.get("decision", "unknown"),
            "risk": int(event.get("risk", 0)),
            "summary": event.get("summary", ""),
            "event": self.redactor(event),
        }
        digest = self._chain_hash(payload, prev_hash)
        record = {**payload, "hash": digest, "prev_hash": prev_hash}
        with self.path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(record) + "\n")
        return AuditRow(**record)

    def tail(self, count: int = 5) -> List[AuditRow]:
        if not self.path.exists():
            return []
        with self.path.open("r", encoding="utf-8") as handle:
            lines = handle.readlines()[-count:]
        return [AuditRow(**json.loads(line)) for line in lines]

    def iter_rows(self) -> Iterable[AuditRow]:
        if not self.path.exists():
            return []
        with self.path.open("r", encoding="utf-8") as handle:
            for line in handle:
                yield AuditRow(**json.loads(line))


def write_audit(event: dict[str, Any], log_path: str | Path | None = None) -> Path:
    ledger = AuditLedger(log_path=log_path)
    row = ledger.append(event)
    return ledger.path