zenith-backend / core /immutable_audit.py
teoat
deploy: sync from main Sun Jan 11 18:43:53 WIT 2026
4a2ab42
import hashlib
import json
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class MerkleNode:
def __init__(self, left=None, right=None, data=None):
self.left: MerkleNode | None = left
self.right: MerkleNode | None = right
self.data = data
self.hash = self.calculate_hash()
def calculate_hash(self) -> str:
if self.data:
# Leaf node
return hashlib.sha256(self.data.encode()).hexdigest()
# Internal node
left_hash = self.left.hash if self.left else ""
right_hash = self.right.hash if self.right else ""
return hashlib.sha256((left_hash + right_hash).encode()).hexdigest()
class ImmutableAuditLog:
"""
Implements a Merkle Tree backed audit log for cryptographic integrity.
Ensures that past log entries cannot be tampered with without invalidating the root hash.
"""
def __init__(self):
self._entries: list[str] = []
self._root: MerkleNode | None = None
self._root_hash: str = ""
def add_entry(self, data: dict) -> str:
"""Add an entry and re-calculate the tree root"""
# Canonical JSON representation
entry_str = json.dumps(data, sort_keys=True)
timestamp = datetime.utcnow().isoformat()
signed_entry = f"{timestamp}|{entry_str}"
self._entries.append(signed_entry)
self._rebuild_tree()
return self._root_hash
def _rebuild_tree(self):
"""Rebuild the Merkle tree from current entries"""
if not self._entries:
self._root = None
self._root_hash = ""
return
nodes = [MerkleNode(data=e) for e in self._entries]
while len(nodes) > 1:
temp_nodes = []
for i in range(0, len(nodes), 2):
left = nodes[i]
right = nodes[i + 1] if i + 1 < len(nodes) else None
temp_nodes.append(MerkleNode(left=left, right=right))
nodes = temp_nodes
self._root = nodes[0]
self._root_hash = self._root.hash
logger.debug(f"[Audit] New Root Hash: {self._root_hash}")
def verify_integrity(self, entries: list[str], expected_root: str) -> bool:
"""Verify if a list of entries matches a known root hash"""
# Reconstruct tree mechanism (simplified for this class)
temp_log = ImmutableAuditLog()
temp_log._entries = entries
temp_log._rebuild_tree()
return temp_log._root_hash == expected_root
def get_latest_hash(self) -> str:
return self._root_hash
# Singleton
immutable_audit = ImmutableAuditLog()