File size: 4,428 Bytes
0fcfe1c
 
 
 
a3bcd92
0fcfe1c
 
 
 
a3bcd92
0fcfe1c
 
 
 
 
 
 
 
 
a3bcd92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0fcfe1c
 
 
a3bcd92
0fcfe1c
a3bcd92
0fcfe1c
 
a3bcd92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0fcfe1c
a3bcd92
 
 
 
 
0fcfe1c
a3bcd92
 
0fcfe1c
 
a3bcd92
 
 
 
 
 
0fcfe1c
 
 
 
 
 
 
a3bcd92
0fcfe1c
 
a3bcd92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0fcfe1c
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""Append-only audit log for BLUX-cA decisions."""

from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable, Iterator, List, Optional


@dataclass
class AuditRecord:
    """One audit-log entry, plus the two fields that link it into the chain."""

    # Timestamp string stored verbatim with the record.
    timestamp: str
    # Hash identifying the input the decision was made on.
    input_hash: str
    # Decision outcome.
    verdict: str
    # Doctrine references attached to the decision.
    doctrine_refs: List[str]
    # Free-text explanation of the verdict.
    rationale: str
    # Chain hash of the preceding record, if any.
    prev_hash: Optional[str] = None
    # Chain hash of this record, if it has been computed.
    chain_hash: Optional[str] = None

    def to_dict(self) -> dict:
        """Return the record's fields as a plain dictionary."""
        as_mapping = asdict(self)
        return as_mapping

    @classmethod
    def from_dict(cls, payload: dict) -> "AuditRecord":
        """Build a record from a dictionary such as one produced by ``to_dict``.

        ``timestamp``, ``input_hash``, ``verdict`` and ``rationale`` are
        required (``KeyError`` when absent). ``doctrine_refs`` defaults to an
        empty list and is always copied into a new list; ``prev_hash`` and
        ``chain_hash`` default to ``None``.
        """
        fields = {}
        for name in ("timestamp", "input_hash", "verdict"):
            fields[name] = payload[name]
        fields["doctrine_refs"] = list(payload.get("doctrine_refs", []))
        fields["rationale"] = payload["rationale"]
        for name in ("prev_hash", "chain_hash"):
            fields[name] = payload.get(name)
        return cls(**fields)


class AuditLog:
    """Append-only JSONL audit log with hash chaining.

    Every stored line is one JSON object. Its ``chain_hash`` is the digest of
    the record's other fields (``prev_hash`` included), and ``prev_hash`` is
    the ``chain_hash`` of the line before it (``None`` for the first line).
    """

    def __init__(self, path: Path | str | None = None, *, hash_alg: str = "sha256") -> None:
        """Prepare the log file location and load the current chain head.

        Args:
            path: Location of the JSONL file. A string is converted to a
                ``Path``. When omitted, defaults to
                ``~/.config/blux-ca/audit/decisions.jsonl``.
            hash_alg: Algorithm name handed to ``hashlib.new``.

        The parent directory is created if it does not exist.
        """
        if path:
            self.path = path if isinstance(path, Path) else Path(path)
        else:
            self.path = Path.home() / ".config" / "blux-ca" / "audit" / "decisions.jsonl"
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._hash_alg = hash_alg
        self._last_hash = self._load_tail_hash()

    def _hash(self, payload: dict) -> str:
        """Return the hex digest of ``payload`` serialized as key-sorted JSON."""
        canonical = json.dumps(payload, sort_keys=True, ensure_ascii=False).encode("utf-8")
        return hashlib.new(self._hash_alg, canonical).hexdigest()

    def _load_tail_hash(self) -> Optional[str]:
        """Return the ``chain_hash`` of the last non-blank line, or ``None``.

        ``None`` is returned when the file is missing or holds no non-blank
        line. A last line that is not valid JSON raises ``json.JSONDecodeError``.
        """
        if not self.path.exists():
            return None
        last_line = ""
        with self.path.open("r", encoding="utf-8") as handle:
            for line in handle:
                if line.strip():
                    last_line = line
        if not last_line:
            return None
        return json.loads(last_line).get("chain_hash")

    def append(self, record: AuditRecord) -> AuditRecord:
        """Link ``record`` to the chain head, write it, and return the stored form.

        The incoming record's own ``prev_hash`` and ``chain_hash`` are ignored
        and recomputed from the log's current head.
        """
        payload = record.to_dict()
        payload["prev_hash"] = self._last_hash
        chain_hash = self._hash(
            {key: value for key, value in payload.items() if key != "chain_hash"}
        )
        payload["chain_hash"] = chain_hash
        serialized = json.dumps(payload, ensure_ascii=False) + "\n"
        with self.path.open("a", encoding="utf-8") as handle:
            handle.write(serialized)
        # Advance the in-memory head only once the write has completed, so a
        # failed write cannot make later records chain to an unstored hash.
        self._last_hash = chain_hash
        return AuditRecord.from_dict(payload)

    def create_record(
        self,
        *,
        input_hash: str,
        verdict: str,
        doctrine_refs: Iterable[str],
        rationale: str,
    ) -> AuditRecord:
        """Build an unsaved record stamped with the current UTC time.

        ``prev_hash`` is pre-filled with the log's current chain head; nothing
        is written until the record is passed to ``append``.
        """
        return AuditRecord(
            timestamp=datetime.now(timezone.utc).isoformat(),
            input_hash=input_hash,
            verdict=verdict,
            doctrine_refs=list(doctrine_refs),
            rationale=rationale,
            prev_hash=self._last_hash,
        )

    def playback(self, *, limit: int | None = None) -> List[AuditRecord]:
        """Return stored records in file order.

        Args:
            limit: Maximum number of records to return; ``None`` returns all.
                Zero or a negative value yields an empty list.
        """
        if limit is not None and limit <= 0:
            return []
        records: List[AuditRecord] = []
        for record in self._iter_records():
            records.append(record)
            if limit is not None and len(records) >= limit:
                break
        return records

    def verify_chain(self) -> bool:
        """Return ``True`` when every stored record links and hashes correctly.

        A line that cannot be decoded, is not valid JSON, is not a JSON
        object, or lacks a required field makes the chain invalid (``False``)
        rather than raising. A missing or empty log verifies as ``True``.
        """
        hashed_fields = (
            "timestamp",
            "input_hash",
            "verdict",
            "doctrine_refs",
            "rationale",
            "prev_hash",
        )
        previous: Optional[str] = None
        records = self._iter_records()
        while True:
            # Guard only the read/parse step so that configuration errors
            # (e.g. an unsupported hash algorithm) still surface from _hash.
            try:
                record = next(records)
            except StopIteration:
                return True
            except (ValueError, KeyError, TypeError):
                return False
            expected = self._hash({name: getattr(record, name) for name in hashed_fields})
            if record.prev_hash != previous or record.chain_hash != expected:
                return False
            previous = record.chain_hash

    def _iter_records(self) -> Iterator[AuditRecord]:
        """Yield one record per non-blank line; yield nothing if the file is missing."""
        if not self.path.exists():
            return
        with self.path.open("r", encoding="utf-8") as handle:
            for line in handle:
                if not line.strip():
                    continue
                payload = json.loads(line)
                yield AuditRecord.from_dict(payload)


# Public names exported by a star-import of this module.
__all__ = ["AuditLog", "AuditRecord"]