AgentMask / mock_tools.py
b2230765034
Initial commit - Secure Reasoning MCP Server
af6094d
"""
Mock Crypto Tools for Secure Reasoning MCP Server
Provides mock implementations for development and testing.
Replace with real implementations when connecting to actual MCP server.
"""
import hashlib
import json
from datetime import datetime
from typing import Optional, Dict, Any, List
from schemas import (
HashRequest, HashResponse,
MerkleUpdateRequest, MerkleUpdateResponse,
WORMWriteRequest, WORMWriteResponse
)
class MockMerkleTree:
"""
In-memory Merkle Tree for mock operations.
"""
def __init__(self):
self.leaves: List[str] = []
self.root: Optional[str] = None
def _calculate_root(self, leaves: List[str]) -> Optional[str]:
"""Calculate Merkle root from a list of leaves."""
if not leaves:
return None
if len(leaves) == 1:
return leaves[0]
current_level = leaves[:]
while len(current_level) > 1:
next_level = []
for i in range(0, len(current_level), 2):
left = current_level[i]
right = current_level[i + 1] if i + 1 < len(current_level) else left
combined = hashlib.sha256((left + right).encode()).hexdigest()
next_level.append(combined)
current_level = next_level
return current_level[0]
def update(self, new_hash: str) -> str:
"""Add a new leaf and recalculate the Merkle root."""
self.leaves.append(new_hash)
self.root = self._calculate_root(self.leaves)
return self.root
def get_proof(self, index: int) -> List[str]:
"""Generate Merkle proof for a leaf at given index."""
if index < 0 or index >= len(self.leaves):
return []
proof = []
current_index = index
current_level = self.leaves[:]
while len(current_level) > 1:
is_right = current_index % 2 == 1
sibling_index = current_index - 1 if is_right else current_index + 1
if sibling_index < len(current_level):
proof.append(current_level[sibling_index])
current_index = current_index // 2
next_level = []
for i in range(0, len(current_level), 2):
left = current_level[i]
right = current_level[i + 1] if i + 1 < len(current_level) else left
combined = hashlib.sha256((left + right).encode()).hexdigest()
next_level.append(combined)
current_level = next_level
return proof
class MockCryptoTools:
"""
Mock implementations of cryptographic tools.
Provides in-memory versions of hash, Merkle tree, and WORM storage.
"""
def __init__(self):
self.merkle_tree = MockMerkleTree()
self.worm_storage: Dict[str, Dict[str, Any]] = {}
self.storage_counter = 0
def hash_tool(self, request: HashRequest) -> HashResponse:
"""
Hash data using SHA-256 (or specified algorithm).
Args:
request: HashRequest with data and algorithm
Returns:
HashResponse with the hash digest
"""
data = request.data
# Ensure deterministic serialization
if isinstance(data, dict):
data_str = json.dumps(data, sort_keys=True)
else:
data_str = str(data)
# Compute hash (only SHA-256 implemented for mock)
hash_value = hashlib.sha256(data_str.encode()).hexdigest()
return HashResponse(
hash=hash_value,
algorithm=request.algorithm,
timestamp=datetime.utcnow()
)
def merkle_update_tool(self, request: MerkleUpdateRequest) -> MerkleUpdateResponse:
"""
Add a hash to the Merkle tree and return updated root.
Args:
request: MerkleUpdateRequest with leaf hash
Returns:
MerkleUpdateResponse with new root and proof
"""
# Add leaf and get new root
new_root = self.merkle_tree.update(request.leaf_hash)
leaf_index = len(self.merkle_tree.leaves) - 1
# Generate proof for the new leaf
proof = self.merkle_tree.get_proof(leaf_index)
return MerkleUpdateResponse(
merkle_root=new_root,
leaf_index=leaf_index,
proof=proof,
tree_size=len(self.merkle_tree.leaves)
)
def worm_write_tool(self, request: WORMWriteRequest) -> WORMWriteResponse:
"""
Write data to WORM (Write Once, Read Many) storage.
Args:
request: WORMWriteRequest with entry ID and data
Returns:
WORMWriteResponse with storage confirmation
"""
# Check if entry already exists (WORM = no overwrites)
if request.entry_id in self.worm_storage:
return WORMWriteResponse(
success=False,
storage_path=f"worm://{request.entry_id}",
verification_hash="",
timestamp=datetime.utcnow()
)
# Store the data
self.storage_counter += 1
storage_path = f"worm://mock/{self.storage_counter}/{request.entry_id}"
# Compute verification hash
verification_hash = hashlib.sha256(
json.dumps(request.data, sort_keys=True).encode()
).hexdigest()
# Store immutably
self.worm_storage[request.entry_id] = {
"data": request.data,
"merkle_root": request.merkle_root,
"verification_hash": verification_hash,
"storage_path": storage_path,
"timestamp": datetime.utcnow().isoformat()
}
return WORMWriteResponse(
success=True,
storage_path=storage_path,
verification_hash=verification_hash,
timestamp=datetime.utcnow()
)
def worm_read_tool(self, entry_id: str) -> Optional[Dict[str, Any]]:
"""
Read data from WORM storage (for verification).
Args:
entry_id: The ID of the entry to read
Returns:
The stored data or None if not found
"""
return self.worm_storage.get(entry_id)
def verify_proof(self, target_hash: str, proof: List[str], root: str) -> bool:
"""
Verify a Merkle proof.
Args:
target_hash: The hash to verify
proof: List of sibling hashes
root: Expected Merkle root
Returns:
True if proof is valid, False otherwise
"""
current_hash = target_hash
for sibling_hash in proof:
# Combine in lexicographic order for consistency
if current_hash < sibling_hash:
combined = current_hash + sibling_hash
else:
combined = sibling_hash + current_hash
current_hash = hashlib.sha256(combined.encode()).hexdigest()
return current_hash == root