"""
Mock Crypto Tools for Secure Reasoning MCP Server

Provides mock implementations for development and testing.
Replace with real implementations when connecting to actual MCP server.
"""

import copy
import hashlib
import json
from datetime import datetime
from typing import Optional, Dict, Any, List

from schemas import (
    HashRequest,
    HashResponse,
    MerkleUpdateRequest,
    MerkleUpdateResponse,
    WORMWriteRequest,
    WORMWriteResponse
)


class MockMerkleTree:
    """
    In-memory Merkle Tree for mock operations.

    Leaves are stored as strings (hex digests in practice). A parent node is
    the SHA-256 hex digest of the string concatenation ``left + right``.
    When a level holds an odd number of nodes, the last node is paired with
    itself.
    """

    def __init__(self):
        self.leaves: List[str] = []
        self.root: Optional[str] = None

    @staticmethod
    def _hash_pair(left: str, right: str) -> str:
        """Return the parent hash of two sibling nodes (order matters)."""
        return hashlib.sha256((left + right).encode()).hexdigest()

    @classmethod
    def _next_level(cls, level: List[str]) -> List[str]:
        """Build the parent level, pairing a trailing odd node with itself."""
        parents = []
        for i in range(0, len(level), 2):
            left = level[i]
            right = level[i + 1] if i + 1 < len(level) else left
            parents.append(cls._hash_pair(left, right))
        return parents

    def _calculate_root(self, leaves: List[str]) -> Optional[str]:
        """
        Calculate Merkle root from a list of leaves.

        Args:
            leaves: Leaf hashes in insertion order

        Returns:
            The root hash, the single leaf itself for a one-leaf tree,
            or None when there are no leaves
        """
        if not leaves:
            return None

        level = list(leaves)
        while len(level) > 1:
            level = self._next_level(level)
        return level[0]

    def update(self, new_hash: str) -> str:
        """
        Add a new leaf and recalculate the Merkle root.

        Args:
            new_hash: Leaf hash to append

        Returns:
            The Merkle root after the leaf was added
        """
        self.leaves.append(new_hash)
        self.root = self._calculate_root(self.leaves)
        return self.root

    def get_proof(self, index: int) -> List[str]:
        """
        Generate Merkle proof for a leaf at given index.

        The proof lists sibling hashes from the leaf level upwards. At a
        level where the node is the unpaired last node (and is therefore
        hashed with itself) no element is emitted, so the proof can be
        shorter than the tree height.

        Args:
            index: Zero-based position of the leaf

        Returns:
            List of sibling hashes, or an empty list if index is out of range
        """
        if index < 0 or index >= len(self.leaves):
            return []

        proof = []
        position = index
        level = list(self.leaves)

        while len(level) > 1:
            is_right = position % 2 == 1
            sibling_index = position - 1 if is_right else position + 1
            if sibling_index < len(level):
                proof.append(level[sibling_index])
            position //= 2
            level = self._next_level(level)

        return proof

    @classmethod
    def root_from_proof(
        cls,
        leaf_hash: str,
        proof: List[str],
        leaf_index: int,
        tree_size: int
    ) -> Optional[str]:
        """
        Recompute the root implied by a proof produced by ``get_proof``.

        Replays the same pairing rules the tree uses: left/right order is
        derived from the node position, and an unpaired last node is hashed
        with itself without consuming a proof element.

        Args:
            leaf_hash: The leaf the proof is for
            proof: Sibling hashes as returned by ``get_proof``
            leaf_index: Zero-based position of the leaf in the tree
            tree_size: Number of leaves in the tree when the proof was made

        Returns:
            The recomputed root, or None if the index is out of range or the
            proof has too few / too many elements for that position
        """
        if tree_size <= 0 or not 0 <= leaf_index < tree_size:
            return None

        current = leaf_hash
        position = leaf_index
        width = tree_size
        used = 0

        while width > 1:
            if position % 2 == 1:
                # Right child: the left sibling always exists.
                if used >= len(proof):
                    return None
                current = cls._hash_pair(proof[used], current)
                used += 1
            elif position + 1 < width:
                # Left child with a real right sibling.
                if used >= len(proof):
                    return None
                current = cls._hash_pair(current, proof[used])
                used += 1
            else:
                # Unpaired last node: the tree hashes it with itself.
                current = cls._hash_pair(current, current)
            position //= 2
            width = (width + 1) // 2

        # Leftover elements mean the proof does not belong to this position.
        if used != len(proof):
            return None
        return current


class MockCryptoTools:
    """
    Mock implementations of cryptographic tools.

    Provides in-memory versions of hash, Merkle tree, and WORM storage.
    """

    def __init__(self):
        self.merkle_tree = MockMerkleTree()
        self.worm_storage: Dict[str, Dict[str, Any]] = {}
        self.storage_counter = 0

    def hash_tool(self, request: HashRequest) -> HashResponse:
        """
        Hash data using SHA-256.

        Args:
            request: HashRequest with data and algorithm

        Returns:
            HashResponse with the hash digest
        """
        data = request.data

        # Ensure deterministic serialization
        if isinstance(data, dict):
            data_str = json.dumps(data, sort_keys=True)
        else:
            data_str = str(data)

        # NOTE: only SHA-256 is implemented for the mock. The digest is
        # always SHA-256 even though the response echoes request.algorithm.
        hash_value = hashlib.sha256(data_str.encode()).hexdigest()

        return HashResponse(
            hash=hash_value,
            algorithm=request.algorithm,
            timestamp=datetime.utcnow()
        )

    def merkle_update_tool(self, request: MerkleUpdateRequest) -> MerkleUpdateResponse:
        """
        Add a hash to the Merkle tree and return updated root.

        Args:
            request: MerkleUpdateRequest with leaf hash

        Returns:
            MerkleUpdateResponse with new root, leaf index, proof and
            tree size
        """
        # Add leaf and get new root
        new_root = self.merkle_tree.update(request.leaf_hash)
        tree_size = len(self.merkle_tree.leaves)
        leaf_index = tree_size - 1

        # Generate proof for the new leaf
        proof = self.merkle_tree.get_proof(leaf_index)

        return MerkleUpdateResponse(
            merkle_root=new_root,
            leaf_index=leaf_index,
            proof=proof,
            tree_size=tree_size
        )

    def worm_write_tool(self, request: WORMWriteRequest) -> WORMWriteResponse:
        """
        Write data to WORM (Write Once, Read Many) storage.

        An existing entry ID is never overwritten; in that case a response
        with ``success=False`` and an empty verification hash is returned.

        Args:
            request: WORMWriteRequest with entry ID and data

        Returns:
            WORMWriteResponse with storage confirmation

        Raises:
            TypeError: If request.data is not JSON-serializable
                (raised by json.dumps before any state is modified)
        """
        # Check if entry already exists (WORM = no overwrites)
        if request.entry_id in self.worm_storage:
            return WORMWriteResponse(
                success=False,
                storage_path=f"worm://{request.entry_id}",
                verification_hash="",
                timestamp=datetime.utcnow()
            )

        # Serialize first so an unserializable payload fails before the
        # counter or the store are touched.
        serialized = json.dumps(request.data, sort_keys=True)
        verification_hash = hashlib.sha256(serialized.encode()).hexdigest()

        self.storage_counter += 1
        storage_path = f"worm://mock/{self.storage_counter}/{request.entry_id}"

        # One timestamp for both the stored record and the response.
        written_at = datetime.utcnow()

        # Store a private copy so later mutation of the caller's object
        # cannot alter the write-once record.
        self.worm_storage[request.entry_id] = {
            "data": copy.deepcopy(request.data),
            "merkle_root": request.merkle_root,
            "verification_hash": verification_hash,
            "storage_path": storage_path,
            "timestamp": written_at.isoformat()
        }

        return WORMWriteResponse(
            success=True,
            storage_path=storage_path,
            verification_hash=verification_hash,
            timestamp=written_at
        )

    def worm_read_tool(self, entry_id: str) -> Optional[Dict[str, Any]]:
        """
        Read data from WORM storage (for verification).

        Args:
            entry_id: The ID of the entry to read

        Returns:
            A copy of the stored record or None if not found
        """
        # Return a copy so readers cannot modify the stored record.
        return copy.deepcopy(self.worm_storage.get(entry_id))

    def verify_proof(
        self,
        target_hash: str,
        proof: List[str],
        root: str,
        *,
        leaf_index: Optional[int] = None,
        tree_size: Optional[int] = None
    ) -> bool:
        """
        Verify a Merkle proof.

        Proofs produced by ``merkle_update_tool`` come from a tree that
        hashes siblings in positional (left, right) order, so verifying them
        requires ``leaf_index`` and ``tree_size`` (both are part of the
        update response). When neither is given, the original
        order-independent check is used: each pair is combined in
        lexicographic order, which only matches trees built the same way.

        Args:
            target_hash: The hash to verify
            proof: List of sibling hashes
            root: Expected Merkle root
            leaf_index: Position of the leaf in the tree (optional)
            tree_size: Number of leaves when the proof was made (optional)

        Returns:
            True if proof is valid, False otherwise

        Raises:
            ValueError: If only one of leaf_index / tree_size is given
        """
        if (leaf_index is None) != (tree_size is None):
            raise ValueError("leaf_index and tree_size must be provided together")

        if leaf_index is not None:
            computed = MockMerkleTree.root_from_proof(
                target_hash, proof, leaf_index, tree_size
            )
            return computed is not None and computed == root

        current_hash = target_hash

        for sibling_hash in proof:
            # Combine in lexicographic order for consistency
            if current_hash < sibling_hash:
                combined = current_hash + sibling_hash
            else:
                combined = sibling_hash + current_hash

            current_hash = hashlib.sha256(combined.encode()).hexdigest()

        return current_hash == root