Spaces:
Sleeping
Sleeping
| """ | |
| Mock Crypto Tools for Secure Reasoning MCP Server | |
| Provides mock implementations for development and testing. | |
| Replace with real implementations when connecting to an actual MCP server. | |
| """ | |
| import hashlib | |
| import json | |
| from datetime import datetime | |
| from typing import Optional, Dict, Any, List | |
| from schemas import ( | |
| HashRequest, HashResponse, | |
| MerkleUpdateRequest, MerkleUpdateResponse, | |
| WORMWriteRequest, WORMWriteResponse | |
| ) | |
class MockMerkleTree:
    """
    In-memory Merkle tree for mock operations.

    Leaves are strings supplied by the caller (normally hex digests). Each
    parent node is the SHA-256 hex digest of its two children concatenated in
    lexicographic order, so a proof is a plain list of sibling hashes with no
    left/right flags and can be checked by ``MockCryptoTools.verify_proof``,
    which folds pairs in the same order. A level with an odd number of nodes
    pairs its last node with itself.
    """

    def __init__(self):
        # Leaves in insertion order; a leaf's position is its proof index.
        self.leaves: List[str] = []
        # Root over all current leaves, or None while the tree is empty.
        self.root: Optional[str] = None

    @staticmethod
    def _hash_pair(a: str, b: str) -> str:
        """Hash two sibling nodes, smaller string first."""
        low, high = (a, b) if a < b else (b, a)
        return hashlib.sha256((low + high).encode()).hexdigest()

    @classmethod
    def _next_level(cls, level: List[str]) -> List[str]:
        """Build the parent level; an unpaired last node is paired with itself."""
        size = len(level)
        return [
            cls._hash_pair(level[i], level[i + 1] if i + 1 < size else level[i])
            for i in range(0, size, 2)
        ]

    def _calculate_root(self, leaves: List[str]) -> Optional[str]:
        """
        Calculate the Merkle root from a list of leaves.
        Args:
            leaves: Leaf hashes in tree order
        Returns:
            The root hash, the single leaf itself when there is only one,
            or None when the list is empty
        """
        if not leaves:
            return None
        level = list(leaves)
        while len(level) > 1:
            level = self._next_level(level)
        return level[0]

    def update(self, new_hash: str) -> str:
        """
        Add a new leaf and recalculate the Merkle root.
        Args:
            new_hash: Leaf hash to append
        Returns:
            The root over all leaves including the new one
        """
        self.leaves.append(new_hash)
        self.root = self._calculate_root(self.leaves)
        return self.root

    def get_proof(self, index: int) -> List[str]:
        """
        Generate the Merkle proof for the leaf at the given index.
        Args:
            index: Zero-based position of the leaf
        Returns:
            Sibling hashes from the leaf level up to (excluding) the root;
            an empty list when the index is out of range or the tree has a
            single leaf
        """
        if index < 0 or index >= len(self.leaves):
            return []
        proof: List[str] = []
        position = index
        level = list(self.leaves)
        while len(level) > 1:
            sibling = position ^ 1  # flip the low bit: the other node of the pair
            if sibling < len(level):
                proof.append(level[sibling])
            else:
                # Unpaired last node: the root calculation hashes it with
                # itself, so the proof must record it as its own sibling.
                proof.append(level[position])
            position //= 2
            level = self._next_level(level)
        return proof
class MockCryptoTools:
    """
    Mock implementations of cryptographic tools.
    Provides in-memory versions of hash, Merkle tree, and WORM storage.

    NOTE(review): timestamps come from ``datetime.utcnow()``, which returns a
    naive datetime and is deprecated since Python 3.12. It is kept so the
    timestamp values handed to callers do not change shape.
    """

    def __init__(self):
        self.merkle_tree = MockMerkleTree()
        # entry_id -> stored record with keys "data", "merkle_root",
        # "verification_hash", "storage_path" and "timestamp".
        self.worm_storage: Dict[str, Dict[str, Any]] = {}
        # Number of successful writes; embedded in every storage path.
        self.storage_counter = 0

    @staticmethod
    def _snapshot(value: Any) -> Any:
        """Return an independent deep copy of ``value``."""
        import copy  # function-scope import: the module header does not import copy

        return copy.deepcopy(value)

    def hash_tool(self, request: HashRequest) -> HashResponse:
        """
        Hash data using SHA-256.

        The digest is always SHA-256; ``request.algorithm`` is echoed back in
        the response but not used to select the hash function.
        Args:
            request: HashRequest with data and algorithm
        Returns:
            HashResponse with the hex digest, the requested algorithm and a
            UTC timestamp
        """
        data = request.data
        # Ensure deterministic serialization: dict keys are sorted, anything
        # else is hashed via its str() form.
        if isinstance(data, dict):
            data_str = json.dumps(data, sort_keys=True)
        else:
            data_str = str(data)
        # Compute hash (only SHA-256 implemented for mock)
        hash_value = hashlib.sha256(data_str.encode()).hexdigest()
        return HashResponse(
            hash=hash_value,
            algorithm=request.algorithm,
            timestamp=datetime.utcnow(),
        )

    def merkle_update_tool(self, request: MerkleUpdateRequest) -> MerkleUpdateResponse:
        """
        Add a hash to the Merkle tree and return the updated root.
        Args:
            request: MerkleUpdateRequest with leaf hash
        Returns:
            MerkleUpdateResponse with the new root, the index of the added
            leaf, its proof and the tree size
        """
        # Add leaf and get new root
        new_root = self.merkle_tree.update(request.leaf_hash)
        tree_size = len(self.merkle_tree.leaves)
        leaf_index = tree_size - 1  # the leaf was appended, so it is last
        # Generate proof for the new leaf
        proof = self.merkle_tree.get_proof(leaf_index)
        return MerkleUpdateResponse(
            merkle_root=new_root,
            leaf_index=leaf_index,
            proof=proof,
            tree_size=tree_size,
        )

    def worm_write_tool(self, request: WORMWriteRequest) -> WORMWriteResponse:
        """
        Write data to WORM (Write Once, Read Many) storage.

        An entry ID can be written only once; a second write with the same ID
        is rejected and leaves the stored entry untouched. The stored data is
        a deep copy, so later mutation of ``request.data`` by the caller does
        not alter the stored entry.
        Args:
            request: WORMWriteRequest with entry ID, data and Merkle root
        Returns:
            WORMWriteResponse with storage confirmation; ``success`` is False
            and ``verification_hash`` is empty when the entry already exists
        Raises:
            TypeError: if ``request.data`` is not JSON-serializable
        """
        # One timestamp per call so the stored record and the response agree.
        now = datetime.utcnow()
        # Check if entry already exists (WORM = no overwrites)
        if request.entry_id in self.worm_storage:
            return WORMWriteResponse(
                success=False,
                storage_path=f"worm://{request.entry_id}",
                verification_hash="",
                timestamp=now,
            )
        # Compute the verification hash first: serialization can raise, and a
        # failed write must not consume a storage counter value.
        verification_hash = hashlib.sha256(
            json.dumps(request.data, sort_keys=True).encode()
        ).hexdigest()
        self.storage_counter += 1
        storage_path = f"worm://mock/{self.storage_counter}/{request.entry_id}"
        # Store immutably: keep a private copy, not the caller's object.
        self.worm_storage[request.entry_id] = {
            "data": self._snapshot(request.data),
            "merkle_root": request.merkle_root,
            "verification_hash": verification_hash,
            "storage_path": storage_path,
            "timestamp": now.isoformat(),
        }
        return WORMWriteResponse(
            success=True,
            storage_path=storage_path,
            verification_hash=verification_hash,
            timestamp=now,
        )

    def worm_read_tool(self, entry_id: str) -> Optional[Dict[str, Any]]:
        """
        Read data from WORM storage (for verification).
        Args:
            entry_id: The ID of the entry to read
        Returns:
            A deep copy of the stored record, or None if not found. Changing
            the returned dict does not change what is stored.
        """
        record = self.worm_storage.get(entry_id)
        if record is None:
            return None
        return self._snapshot(record)

    def verify_proof(self, target_hash: str, proof: List[str], root: str) -> bool:
        """
        Verify a Merkle proof.

        Starting from ``target_hash``, each sibling is combined with the
        running hash (smaller string first) and hashed with SHA-256; the proof
        is valid when the final value equals ``root``. Only proofs built with
        this same sorted-pair convention will verify.
        NOTE(review): confirm the tree that produced the proof hashes sibling
        pairs in the same order.
        Args:
            target_hash: The hash to verify
            proof: List of sibling hashes, leaf level first
            root: Expected Merkle root
        Returns:
            True if proof is valid, False otherwise
        """
        current_hash = target_hash
        for sibling_hash in proof:
            # Combine in lexicographic order for consistency
            if current_hash < sibling_hash:
                combined = current_hash + sibling_hash
            else:
                combined = sibling_hash + current_hash
            current_hash = hashlib.sha256(combined.encode()).hexdigest()
        return current_hash == root