AgentMask / crypto_engine.py
b2230765034
Initial commit - Secure Reasoning MCP Server
af6094d
import hashlib
import json
import time
import os
def hash_tool(data):
"""
Hash tool: Convert input to deterministic string and return SHA-256 hex digest.
"""
if isinstance(data, dict):
data_str = json.dumps(data, sort_keys=True)
else:
data_str = str(data)
return hashlib.sha256(data_str.encode()).hexdigest()
class MerkleTree:
"""
Merkle Tree: Maintains a list of leaves and recalculates root on each update.
"""
def __init__(self):
self.leaves = []
self.root = None
def _calculate_root(self, leaves):
"""
Calculate Merkle root from a list of leaves.
If empty, return None. If single leaf, return it. Otherwise, build tree bottom-up.
"""
if not leaves:
return None
if len(leaves) == 1:
return leaves[0]
current_level = leaves[:]
while len(current_level) > 1:
next_level = []
for i in range(0, len(current_level), 2):
left = current_level[i]
right = current_level[i + 1] if i + 1 < len(current_level) else left
combined = hashlib.sha256((left + right).encode()).hexdigest()
next_level.append(combined)
current_level = next_level
return current_level[0]
def update(self, new_hash):
"""
Add a new leaf and recalculate the Merkle root.
Returns the new root.
"""
self.leaves.append(new_hash)
self.root = self._calculate_root(self.leaves)
return self.root
def get_proof(self, index):
"""
Generate Merkle proof for a leaf at given index.
Returns a list of (sibling_hash, position) tuples where position is 'left' or 'right'.
"""
if index < 0 or index >= len(self.leaves):
return None
proof = []
current_index = index
current_level = self.leaves[:]
while len(current_level) > 1:
# Determine if current_index is odd (right) or even (left)
is_right = current_index % 2 == 1
sibling_index = current_index - 1 if is_right else current_index + 1
# Get sibling hash if it exists
if sibling_index < len(current_level):
sibling_hash = current_level[sibling_index]
position = "left" if is_right else "right"
proof.append({"hash": sibling_hash, "position": position})
# Move to next level
current_index = current_index // 2
next_level = []
for i in range(0, len(current_level), 2):
left = current_level[i]
right = current_level[i + 1] if i + 1 < len(current_level) else left
combined = hashlib.sha256((left + right).encode()).hexdigest()
next_level.append(combined)
current_level = next_level
return proof
def worm_write_tool(step_data, hash_value, merkle_root, filename="worm_log.jsonl"):
"""
WORM (Write Once, Read Many) storage: Append a JSON record to JSONL file.
Returns the record written.
"""
# Determine the next ID by counting existing lines
next_id = 0
if os.path.exists(filename):
with open(filename, "r") as f:
next_id = sum(1 for _ in f)
record = {
"id": next_id,
"timestamp": time.time(),
"step": step_data,
"hash": hash_value,
"root": merkle_root
}
with open(filename, "a") as f:
f.write(json.dumps(record) + "\n")
return record
def proof_generate_tool(record_id, filename="worm_log.jsonl"):
"""
Generate a Merkle proof for a specific record in the WORM log.
Rehydrates the Merkle Tree from the log to ensure proof is against current state.
Returns a JSON proof containing hash, merkle_proof, root, and timestamp.
"""
if not os.path.exists(filename):
print(f"[PROOF] Error: {filename} does not exist.")
return None
# Read all records from WORM log
records = []
hashes = []
target_record = None
with open(filename, "r") as f:
for line in f:
record = json.loads(line.strip())
records.append(record)
hashes.append(record["hash"])
if record["id"] == record_id:
target_record = record
if target_record is None:
print(f"[PROOF] Error: Record with ID {record_id} not found.")
return None
# Rehydrate Merkle Tree from hashes
tree = MerkleTree()
for h in hashes:
tree.update(h)
# Get proof for the target record index
proof = tree.get_proof(record_id)
proof_result = {
"record_id": record_id,
"hash": target_record["hash"],
"merkle_proof": proof,
"merkle_root": tree.root,
"timestamp": target_record["timestamp"],
"step_details": target_record["step"]
}
return proof_result
def verify_proof_tool(target_hash, merkle_proof, merkle_root):
"""
Verify if a target_hash belongs to the merkle_root using the merkle_proof.
Logic:
- Start with current_hash = target_hash
- Loop through proof items (sibling hashes with positions)
- Reconstruct the path up to the root
- Compare final calculated root with provided merkle_root
Returns True if valid, False otherwise.
"""
if merkle_proof is None:
return False
current_hash = target_hash
# Traverse the proof path
for proof_item in merkle_proof:
sibling_hash = proof_item["hash"]
position = proof_item["position"]
# Combine hashes based on position
if position == "left":
# Sibling is on the left, so: hash(sibling + current)
combined_str = sibling_hash + current_hash
elif position == "right":
# Sibling is on the right, so: hash(current + sibling)
combined_str = current_hash + sibling_hash
else:
return False
# Calculate the next level hash
current_hash = hashlib.sha256(combined_str.encode()).hexdigest()
# Final check: does calculated root match provided root?
return current_hash == merkle_root
def secure_agent_action(action_type, details, merkle_tree):
"""
Gatekeeper Logic: Cite-Before-Act mechanism.
- READ: Auto-approve
- WRITE/MUTATE: Require human approval via CLI
All actions (approved or denied) are logged to WORM storage.
"""
action_type = action_type.upper()
if action_type == "READ":
# Auto-approve READ actions
print(f"\n[GATEKEEPER] READ action detected: {details}")
print("[GATEKEEPER] Auto-approving READ action.")
step_data = {
"action_type": action_type,
"details": details,
"status": "APPROVED"
}
step_hash = hash_tool(step_data)
merkle_root = merkle_tree.update(step_hash)
worm_write_tool(step_data, step_hash, merkle_root)
print(f"[GATEKEEPER] Merkle Root: {merkle_root}")
print(f"[GATEKEEPER] Action logged.\n")
return True
elif action_type in ["WRITE", "MUTATE", "DELETE"]:
# Require approval for mutation actions
print(f"\n[GATEKEEPER] ⚠️ WRITE/MUTATE action detected: {details}")
print("[GATEKEEPER] This action requires human approval.")
approval = input("[GATEKEEPER] Approve this action? (y/n): ").strip().lower()
if approval == "y":
print("[GATEKEEPER] ✓ Action APPROVED by user.")
status = "APPROVED"
result = True
else:
print("[GATEKEEPER] ✗ Action DENIED by user.")
status = "DENIED"
result = False
# Log the action (approved or denied) to maintain audit trail
step_data = {
"action_type": action_type,
"details": details,
"status": status
}
step_hash = hash_tool(step_data)
merkle_root = merkle_tree.update(step_hash)
worm_write_tool(step_data, step_hash, merkle_root)
print(f"[GATEKEEPER] Merkle Root: {merkle_root}")
print(f"[GATEKEEPER] Audit logged.\n")
return result
else:
print(f"\n[GATEKEEPER] Unknown action type: {action_type}\n")
return False
if __name__ == "__main__":
print("=" * 70)
print("SECURE REASONING MCP SERVER - TEST SCENARIO")
print("=" * 70)
# Initialize Merkle Tree
mt = MerkleTree()
# Test 1: READ action (auto-approved)
print("\n[TEST 1] Simulating READ action...")
secure_agent_action("READ", "Query user database for profile info", mt)
# Test 2: WRITE action (user approval - simulate "y")
print("[TEST 2] Simulating WRITE action (approve with 'y')...")
secure_agent_action("WRITE", "Update user profile with new email address", mt)
# Test 3: WRITE action (user denial - simulate "n")
print("[TEST 3] Simulating WRITE action (deny with 'n')...")
secure_agent_action("WRITE", "Delete user account permanently", mt)
print("=" * 70)
print("TEST SCENARIO COMPLETE")
print("=" * 70)
print("\nWORM Log saved to: worm_log.jsonl")
print("Review the file to verify all actions are logged with hashes and Merkle roots.\n")
# Test 4: Generate proof for record_id=1
print("=" * 70)
print("PROOF GENERATION TEST")
print("=" * 70)
print("\n[TEST 4] Generating Merkle proof for record_id=1...")
proof = proof_generate_tool(1)
if proof:
print("\n[PROOF] Generated Merkle Proof:")
print(json.dumps(proof, indent=2))
else:
proof = None
print()
# Test 5: Verify the proof (positive case)
print("=" * 70)
print("PROOF VERIFICATION TEST")
print("=" * 70)
if proof:
print("\n[TEST 5a] Verifying proof with correct hash and root...")
is_valid = verify_proof_tool(proof["hash"], proof["merkle_proof"], proof["merkle_root"])
print(f"[VERIFY] Verification Result (POSITIVE): {is_valid}")
# Test 5b: Verify with tampered hash (negative case)
print("\n[TEST 5b] Verifying proof with tampered hash (should fail)...")
tampered_hash = proof["hash"][:-2] + "XX" # Change last 2 characters
is_valid_tampered = verify_proof_tool(tampered_hash, proof["merkle_proof"], proof["merkle_root"])
print(f"[VERIFY] Verification Result (NEGATIVE - tampered hash): {is_valid_tampered}")
# Test 5c: Verify with tampered root (negative case)
print("\n[TEST 5c] Verifying proof with tampered root (should fail)...")
tampered_root = proof["merkle_root"][:-2] + "XX" # Change last 2 characters
is_valid_tampered_root = verify_proof_tool(proof["hash"], proof["merkle_proof"], tampered_root)
print(f"[VERIFY] Verification Result (NEGATIVE - tampered root): {is_valid_tampered_root}")
print("\n" + "=" * 70)
print("ALL TESTS COMPLETE - SECURE REASONING MCP SERVER OPERATIONAL")
print("=" * 70 + "\n")