"""
Ledger Store Module
====================
Append-only ledger storage with merkle root computation.
Provides an immutable audit trail for agent actions.
"""
| import json | |
| import os | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| from dataclasses import dataclass, field, asdict | |
| from .merkle import compute_merkle_root, hash_leaf | |
@dataclass
class LedgerEntry:
    """Represents a single entry in the ledger.

    ``entry_hash`` is derived automatically when it is left empty. Pass an
    explicit value (for example when rebuilding an entry from its
    ``to_dict`` output) to keep that value unchanged.
    """

    id: str
    timestamp: str
    action_type: str
    agent: str
    input_hash: str
    output_hash: str
    data: dict[str, Any]
    previous_hash: str
    entry_hash: str = ""  # computed in __post_init__ when left empty

    def __post_init__(self):
        """Fill in ``entry_hash`` unless the caller supplied one."""
        if not self.entry_hash:
            self.entry_hash = self._compute_hash()

    def _compute_hash(self) -> str:
        """Compute hash of this entry.

        The digest is taken over id, timestamp, action_type, agent and
        previous_hash, joined with ':'.

        NOTE(review): input_hash, output_hash and data are not part of the
        hashed content, so changes to them do not alter entry_hash. Confirm
        this is intentional before relying on entry_hash to protect payloads.
        """
        content = f"{self.id}:{self.timestamp}:{self.action_type}:{self.agent}:{self.previous_hash}"
        return hash_leaf(content)

    def to_dict(self) -> dict[str, Any]:
        """Return all fields of the entry as a plain dictionary."""
        return asdict(self)
@dataclass
class LedgerBlock:
    """Represents a block of ledger entries.

    ``merkle_root`` and ``block_hash`` are derived automatically when left
    empty. Explicit values (for example from a persisted ledger) are kept
    unchanged.
    """

    block_id: int
    timestamp: str
    entries: "list[LedgerEntry]"
    previous_block_hash: str
    merkle_root: str = ""
    block_hash: str = ""
    signature: str = ""  # Will be filled by ECDSA signing

    def __post_init__(self):
        """Derive ``merkle_root`` and then ``block_hash`` when not supplied."""
        # Order matters: the block hash is computed over the merkle root.
        if not self.merkle_root:
            self.merkle_root = self._compute_merkle_root()
        if not self.block_hash:
            self.block_hash = self._compute_hash()

    def _compute_merkle_root(self) -> str:
        """Compute merkle root from entry hashes.

        A block without entries uses the hash of the fixed marker string
        "empty_block".
        """
        if not self.entries:
            return hash_leaf("empty_block")
        entry_hashes = [entry.entry_hash for entry in self.entries]
        return compute_merkle_root(entry_hashes)

    def _compute_hash(self) -> str:
        """Compute hash of this block.

        The digest is taken over block_id, timestamp, merkle_root and
        previous_block_hash, joined with ':'. The signature is not included.
        """
        content = f"{self.block_id}:{self.timestamp}:{self.merkle_root}:{self.previous_block_hash}"
        return hash_leaf(content)

    def to_dict(self) -> dict[str, Any]:
        """Return the block, including its entries, as a plain dictionary."""
        return {
            "block_id": self.block_id,
            "timestamp": self.timestamp,
            "entries": [e.to_dict() for e in self.entries],
            "previous_block_hash": self.previous_block_hash,
            "merkle_root": self.merkle_root,
            "block_hash": self.block_hash,
            "signature": self.signature,
        }
class LedgerStore:
    """
    Append-only ledger store.

    Provides:
    - Immutable storage of agent actions
    - Block-based organization with merkle roots
    - JSON persistence
    - Verification of chain integrity
    """

    GENESIS_HASH = "0" * 64  # Genesis block previous hash

    def __init__(self, storage_path: Optional[str] = None):
        """
        Initialize the ledger store.

        Args:
            storage_path: Path to store ledger JSON file. If None, uses in-memory only.
        """
        self.storage_path = Path(storage_path) if storage_path else None
        self.blocks: list[LedgerBlock] = []
        self.pending_entries: list[LedgerEntry] = []
        self._entry_counter = 0

        # Load existing ledger if available
        if self.storage_path and self.storage_path.exists():
            self._load()

    def add_entry(
        self,
        action_type: str,
        agent: str,
        input_data: dict[str, Any],
        output_data: dict[str, Any]
    ) -> "LedgerEntry":
        """
        Add a new entry to the pending entries.

        Args:
            action_type: Type of action (e.g., "search", "summarize")
            agent: Name of the agent performing the action
            input_data: Input data for the action
            output_data: Output data from the action

        Returns:
            The created LedgerEntry

        Raises:
            TypeError: If input_data or output_data cannot be serialized by
                json.dumps. No state is modified in that case.
        """
        # Hash the payloads before touching any state: json.dumps raises on
        # non-serializable data, and failing early keeps the entry counter
        # (and therefore the entry ids) free of gaps.
        input_hash = hash_leaf(json.dumps(input_data, sort_keys=True))
        output_hash = hash_leaf(json.dumps(output_data, sort_keys=True))

        # Get previous hash (from last pending entry or last block)
        if self.pending_entries:
            previous_hash = self.pending_entries[-1].entry_hash
        elif self.blocks:
            previous_hash = self.blocks[-1].block_hash
        else:
            previous_hash = self.GENESIS_HASH

        entry = LedgerEntry(
            id=f"entry_{self._entry_counter + 1}",
            timestamp=datetime.now(timezone.utc).isoformat(),
            action_type=action_type,
            agent=agent,
            input_hash=input_hash,
            output_hash=output_hash,
            data={"input": input_data, "output": output_data},
            previous_hash=previous_hash
        )

        # Commit only once the entry has been built successfully.
        self._entry_counter += 1
        self.pending_entries.append(entry)
        return entry

    def create_block(self, signer=None) -> "LedgerBlock":
        """
        Create a new block from pending entries.

        Args:
            signer: Optional signer object with sign() method for ECDSA signing

        Returns:
            The created LedgerBlock

        Raises:
            ValueError: If there are no pending entries.
        """
        if not self.pending_entries:
            raise ValueError("No pending entries to create block")

        # Get previous block hash
        if self.blocks:
            previous_block_hash = self.blocks[-1].block_hash
        else:
            previous_block_hash = self.GENESIS_HASH

        block = LedgerBlock(
            block_id=len(self.blocks) + 1,
            timestamp=datetime.now(timezone.utc).isoformat(),
            entries=self.pending_entries.copy(),
            previous_block_hash=previous_block_hash
        )

        # Sign the block if signer is provided
        if signer:
            block.signature = signer.sign(block.block_hash)

        self.blocks.append(block)
        self.pending_entries = []

        # Persist to storage
        if self.storage_path:
            self._save()
        return block

    def get_block(self, block_id: int) -> Optional["LedgerBlock"]:
        """Get a block by ID, or None when no block has that ID."""
        for block in self.blocks:
            if block.block_id == block_id:
                return block
        return None

    def get_latest_block(self) -> Optional["LedgerBlock"]:
        """Get the most recent block, or None when the chain is empty."""
        return self.blocks[-1] if self.blocks else None

    def verify_chain(self) -> tuple[bool, str]:
        """
        Verify the integrity of the entire blockchain.

        Every block, including the first one, is checked for:
        - a previous hash matching its predecessor (GENESIS_HASH for the
          first block),
        - a stored merkle root matching the one recomputed from its entries,
        - a stored block hash matching the one recomputed from its fields.

        Returns:
            Tuple of (is_valid, message)
        """
        if not self.blocks:
            return True, "Empty chain is valid"

        expected_previous = self.GENESIS_HASH
        for index, block in enumerate(self.blocks):
            # Check the block's link to its predecessor
            if block.previous_block_hash != expected_previous:
                if index == 0:
                    return False, "Genesis block has invalid previous hash"
                return False, f"Block {block.block_id} has invalid previous hash"

            # Verify merkle root (the first block must be covered as well)
            if block._compute_merkle_root() != block.merkle_root:
                return False, f"Block {block.block_id} has invalid merkle root"

            # Verify the block hash; otherwise entries and merkle root could
            # be rewritten together without the chain noticing.
            if block._compute_hash() != block.block_hash:
                return False, f"Block {block.block_id} has invalid block hash"

            expected_previous = block.block_hash

        return True, "Chain is valid"

    def get_full_ledger(self) -> dict[str, Any]:
        """Get the complete ledger as a dictionary."""
        return {
            "blocks": [block.to_dict() for block in self.blocks],
            "pending_entries": [entry.to_dict() for entry in self.pending_entries],
            "total_blocks": len(self.blocks),
            "total_entries": sum(len(b.entries) for b in self.blocks) + len(self.pending_entries)
        }

    def _save(self):
        """Save ledger to JSON file.

        The data is written to a temporary sibling file and then moved over
        the target with os.replace, so a failed write cannot truncate an
        existing ledger file.
        """
        if not self.storage_path:
            return

        # Ensure directory exists
        self.storage_path.parent.mkdir(parents=True, exist_ok=True)

        data = self.get_full_ledger()
        tmp_path = self.storage_path.with_name(self.storage_path.name + ".tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_path, self.storage_path)
        finally:
            # After a successful replace the temp file is gone; it only
            # remains here when writing or replacing failed.
            if tmp_path.exists():
                tmp_path.unlink()

    def _load(self):
        """Load ledger from JSON file.

        Restores blocks and pending entries, and sets the entry counter to
        the total number of restored entries so new ids do not collide.
        """
        if not self.storage_path or not self.storage_path.exists():
            return

        with open(self.storage_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        # Reconstruct blocks
        self.blocks = []
        for block_data in data.get("blocks", []):
            entries = [
                LedgerEntry(**entry_data)
                for entry_data in block_data.get("entries", [])
            ]
            block = LedgerBlock(
                block_id=block_data["block_id"],
                timestamp=block_data["timestamp"],
                entries=entries,
                previous_block_hash=block_data["previous_block_hash"],
                merkle_root=block_data["merkle_root"],
                block_hash=block_data["block_hash"],
                signature=block_data.get("signature", "")
            )
            self.blocks.append(block)

        # Reconstruct pending entries (they are part of the saved ledger)
        self.pending_entries = [
            LedgerEntry(**entry_data)
            for entry_data in data.get("pending_entries", [])
        ]

        # Update entry counter
        total_entries = sum(len(b.entries) for b in self.blocks)
        self._entry_counter = total_entries + len(self.pending_entries)