|
|
"""Append-only audit log for BLUX-cA decisions.""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import hashlib |
|
|
import json |
|
|
from dataclasses import dataclass, asdict |
|
|
from datetime import datetime, timezone |
|
|
from pathlib import Path |
|
|
from typing import Iterable, Iterator, List, Optional |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class AuditRecord: |
|
|
timestamp: str |
|
|
input_hash: str |
|
|
verdict: str |
|
|
doctrine_refs: List[str] |
|
|
rationale: str |
|
|
prev_hash: Optional[str] = None |
|
|
chain_hash: Optional[str] = None |
|
|
|
|
|
def to_dict(self) -> dict: |
|
|
return asdict(self) |
|
|
|
|
|
@classmethod |
|
|
def from_dict(cls, payload: dict) -> "AuditRecord": |
|
|
return cls( |
|
|
timestamp=payload["timestamp"], |
|
|
input_hash=payload["input_hash"], |
|
|
verdict=payload["verdict"], |
|
|
doctrine_refs=list(payload.get("doctrine_refs", [])), |
|
|
rationale=payload["rationale"], |
|
|
prev_hash=payload.get("prev_hash"), |
|
|
chain_hash=payload.get("chain_hash"), |
|
|
) |
|
|
|
|
|
|
|
|
class AuditLog: |
|
|
"""Append-only JSONL audit log with hash chaining.""" |
|
|
|
|
|
def __init__(self, path: Path | None = None, *, hash_alg: str = "sha256") -> None: |
|
|
self.path = path or Path.home() / ".config" / "blux-ca" / "audit" / "decisions.jsonl" |
|
|
self.path.parent.mkdir(parents=True, exist_ok=True) |
|
|
self._hash_alg = hash_alg |
|
|
self._last_hash = self._load_tail_hash() |
|
|
|
|
|
def _hash(self, payload: dict) -> str: |
|
|
canonical = json.dumps(payload, sort_keys=True, ensure_ascii=False).encode("utf-8") |
|
|
return hashlib.new(self._hash_alg, canonical).hexdigest() |
|
|
|
|
|
def _load_tail_hash(self) -> Optional[str]: |
|
|
if not self.path.exists(): |
|
|
return None |
|
|
last_line = "" |
|
|
with self.path.open("r", encoding="utf-8") as handle: |
|
|
for line in handle: |
|
|
if line.strip(): |
|
|
last_line = line |
|
|
if not last_line: |
|
|
return None |
|
|
payload = json.loads(last_line) |
|
|
return payload.get("chain_hash") |
|
|
|
|
|
def append(self, record: AuditRecord) -> AuditRecord: |
|
|
payload = record.to_dict() |
|
|
payload["prev_hash"] = self._last_hash |
|
|
payload["chain_hash"] = self._hash({key: payload[key] for key in payload if key != "chain_hash"}) |
|
|
self._last_hash = payload["chain_hash"] |
|
|
with self.path.open("a", encoding="utf-8") as handle: |
|
|
handle.write(json.dumps(payload, ensure_ascii=False) + "\n") |
|
|
return AuditRecord.from_dict(payload) |
|
|
|
|
|
def create_record( |
|
|
self, |
|
|
*, |
|
|
input_hash: str, |
|
|
verdict: str, |
|
|
doctrine_refs: Iterable[str], |
|
|
rationale: str, |
|
|
) -> AuditRecord: |
|
|
return AuditRecord( |
|
|
timestamp=datetime.now(timezone.utc).isoformat(), |
|
|
input_hash=input_hash, |
|
|
verdict=verdict, |
|
|
doctrine_refs=list(doctrine_refs), |
|
|
rationale=rationale, |
|
|
prev_hash=self._last_hash, |
|
|
) |
|
|
|
|
|
def playback(self, *, limit: int | None = None) -> List[AuditRecord]: |
|
|
records = [] |
|
|
for index, record in enumerate(self._iter_records()): |
|
|
records.append(record) |
|
|
if limit is not None and index + 1 >= limit: |
|
|
break |
|
|
return records |
|
|
|
|
|
def verify_chain(self) -> bool: |
|
|
previous: Optional[str] = None |
|
|
for record in self._iter_records(): |
|
|
expected = self._hash( |
|
|
{ |
|
|
key: getattr(record, key) |
|
|
for key in ( |
|
|
"timestamp", |
|
|
"input_hash", |
|
|
"verdict", |
|
|
"doctrine_refs", |
|
|
"rationale", |
|
|
"prev_hash", |
|
|
) |
|
|
} |
|
|
) |
|
|
if record.prev_hash != previous or record.chain_hash != expected: |
|
|
return False |
|
|
previous = record.chain_hash |
|
|
return True |
|
|
|
|
|
def _iter_records(self) -> Iterator[AuditRecord]: |
|
|
if not self.path.exists(): |
|
|
return iter(()) |
|
|
|
|
|
def generator() -> Iterator[AuditRecord]: |
|
|
with self.path.open("r", encoding="utf-8") as handle: |
|
|
for line in handle: |
|
|
if not line.strip(): |
|
|
continue |
|
|
yield AuditRecord.from_dict(json.loads(line)) |
|
|
|
|
|
return generator() |
|
|
|
|
|
|
|
|
__all__ = ["AuditLog", "AuditRecord"] |
|
|
|