""" Ledger Store Module ==================== Append-only ledger storage with merkle root computation. Provides immutable audit trail for agent actions. """ import json import os from datetime import datetime, timezone from pathlib import Path from typing import Any, Optional from dataclasses import dataclass, field, asdict from .merkle import compute_merkle_root, hash_leaf @dataclass class LedgerEntry: """Represents a single entry in the ledger.""" id: str timestamp: str action_type: str agent: str input_hash: str output_hash: str data: dict[str, Any] previous_hash: str entry_hash: str = "" def __post_init__(self): if not self.entry_hash: self.entry_hash = self._compute_hash() def _compute_hash(self) -> str: """Compute hash of this entry.""" content = f"{self.id}:{self.timestamp}:{self.action_type}:{self.agent}:{self.previous_hash}" return hash_leaf(content) def to_dict(self) -> dict[str, Any]: return asdict(self) @dataclass class LedgerBlock: """Represents a block of ledger entries.""" block_id: int timestamp: str entries: list[LedgerEntry] previous_block_hash: str merkle_root: str = "" block_hash: str = "" signature: str = "" # Will be filled by ECDSA signing def __post_init__(self): if not self.merkle_root: self.merkle_root = self._compute_merkle_root() if not self.block_hash: self.block_hash = self._compute_hash() def _compute_merkle_root(self) -> str: """Compute merkle root from entry hashes.""" if not self.entries: return hash_leaf("empty_block") entry_hashes = [entry.entry_hash for entry in self.entries] return compute_merkle_root(entry_hashes) def _compute_hash(self) -> str: """Compute hash of this block.""" content = f"{self.block_id}:{self.timestamp}:{self.merkle_root}:{self.previous_block_hash}" return hash_leaf(content) def to_dict(self) -> dict[str, Any]: return { "block_id": self.block_id, "timestamp": self.timestamp, "entries": [e.to_dict() for e in self.entries], "previous_block_hash": self.previous_block_hash, "merkle_root": self.merkle_root, "block_hash": self.block_hash, "signature": self.signature } class LedgerStore: """ Append-only ledger store. Provides: - Immutable storage of agent actions - Block-based organization with merkle roots - JSON persistence - Verification of chain integrity """ GENESIS_HASH = "0" * 64 # Genesis block previous hash def __init__(self, storage_path: Optional[str] = None): """ Initialize the ledger store. Args: storage_path: Path to store ledger JSON file. If None, uses in-memory only. """ self.storage_path = Path(storage_path) if storage_path else None self.blocks: list[LedgerBlock] = [] self.pending_entries: list[LedgerEntry] = [] self._entry_counter = 0 # Load existing ledger if available if self.storage_path and self.storage_path.exists(): self._load() def add_entry( self, action_type: str, agent: str, input_data: dict[str, Any], output_data: dict[str, Any] ) -> LedgerEntry: """ Add a new entry to the pending entries. Args: action_type: Type of action (e.g., "search", "summarize") agent: Name of the agent performing the action input_data: Input data for the action output_data: Output data from the action Returns: The created LedgerEntry """ self._entry_counter += 1 # Get previous hash (from last pending entry or last block) if self.pending_entries: previous_hash = self.pending_entries[-1].entry_hash elif self.blocks: previous_hash = self.blocks[-1].block_hash else: previous_hash = self.GENESIS_HASH entry = LedgerEntry( id=f"entry_{self._entry_counter}", timestamp=datetime.now(timezone.utc).isoformat(), action_type=action_type, agent=agent, input_hash=hash_leaf(json.dumps(input_data, sort_keys=True)), output_hash=hash_leaf(json.dumps(output_data, sort_keys=True)), data={"input": input_data, "output": output_data}, previous_hash=previous_hash ) self.pending_entries.append(entry) return entry def create_block(self, signer=None) -> LedgerBlock: """ Create a new block from pending entries. Args: signer: Optional signer object with sign() method for ECDSA signing Returns: The created LedgerBlock """ if not self.pending_entries: raise ValueError("No pending entries to create block") # Get previous block hash if self.blocks: previous_block_hash = self.blocks[-1].block_hash else: previous_block_hash = self.GENESIS_HASH block = LedgerBlock( block_id=len(self.blocks) + 1, timestamp=datetime.now(timezone.utc).isoformat(), entries=self.pending_entries.copy(), previous_block_hash=previous_block_hash ) # Sign the block if signer is provided if signer: block.signature = signer.sign(block.block_hash) self.blocks.append(block) self.pending_entries = [] # Persist to storage if self.storage_path: self._save() return block def get_block(self, block_id: int) -> Optional[LedgerBlock]: """Get a block by ID.""" for block in self.blocks: if block.block_id == block_id: return block return None def get_latest_block(self) -> Optional[LedgerBlock]: """Get the most recent block.""" return self.blocks[-1] if self.blocks else None def verify_chain(self) -> tuple[bool, str]: """ Verify the integrity of the entire blockchain. Returns: Tuple of (is_valid, message) """ if not self.blocks: return True, "Empty chain is valid" # Check genesis block if self.blocks[0].previous_block_hash != self.GENESIS_HASH: return False, "Genesis block has invalid previous hash" # Check each block's link for i in range(1, len(self.blocks)): current = self.blocks[i] previous = self.blocks[i - 1] if current.previous_block_hash != previous.block_hash: return False, f"Block {current.block_id} has invalid previous hash" # Verify merkle root computed_root = current._compute_merkle_root() if computed_root != current.merkle_root: return False, f"Block {current.block_id} has invalid merkle root" return True, "Chain is valid" def get_full_ledger(self) -> dict[str, Any]: """Get the complete ledger as a dictionary.""" return { "blocks": [block.to_dict() for block in self.blocks], "pending_entries": [entry.to_dict() for entry in self.pending_entries], "total_blocks": len(self.blocks), "total_entries": sum(len(b.entries) for b in self.blocks) + len(self.pending_entries) } def _save(self): """Save ledger to JSON file.""" if not self.storage_path: return # Ensure directory exists self.storage_path.parent.mkdir(parents=True, exist_ok=True) data = self.get_full_ledger() with open(self.storage_path, 'w') as f: json.dump(data, f, indent=2) def _load(self): """Load ledger from JSON file.""" if not self.storage_path or not self.storage_path.exists(): return with open(self.storage_path, 'r') as f: data = json.load(f) # Reconstruct blocks self.blocks = [] for block_data in data.get("blocks", []): entries = [ LedgerEntry(**entry_data) for entry_data in block_data.get("entries", []) ] block = LedgerBlock( block_id=block_data["block_id"], timestamp=block_data["timestamp"], entries=entries, previous_block_hash=block_data["previous_block_hash"], merkle_root=block_data["merkle_root"], block_hash=block_data["block_hash"], signature=block_data.get("signature", "") ) self.blocks.append(block) # Update entry counter total_entries = sum(len(b.entries) for b in self.blocks) self._entry_counter = total_entries