import hashlib import json import time import os def hash_tool(data): """ Hash tool: Convert input to deterministic string and return SHA-256 hex digest. """ if isinstance(data, dict): data_str = json.dumps(data, sort_keys=True) else: data_str = str(data) return hashlib.sha256(data_str.encode()).hexdigest() class MerkleTree: """ Merkle Tree: Maintains a list of leaves and recalculates root on each update. """ def __init__(self): self.leaves = [] self.root = None def _calculate_root(self, leaves): """ Calculate Merkle root from a list of leaves. If empty, return None. If single leaf, return it. Otherwise, build tree bottom-up. """ if not leaves: return None if len(leaves) == 1: return leaves[0] current_level = leaves[:] while len(current_level) > 1: next_level = [] for i in range(0, len(current_level), 2): left = current_level[i] right = current_level[i + 1] if i + 1 < len(current_level) else left combined = hashlib.sha256((left + right).encode()).hexdigest() next_level.append(combined) current_level = next_level return current_level[0] def update(self, new_hash): """ Add a new leaf and recalculate the Merkle root. Returns the new root. """ self.leaves.append(new_hash) self.root = self._calculate_root(self.leaves) return self.root def get_proof(self, index): """ Generate Merkle proof for a leaf at given index. Returns a list of (sibling_hash, position) tuples where position is 'left' or 'right'. """ if index < 0 or index >= len(self.leaves): return None proof = [] current_index = index current_level = self.leaves[:] while len(current_level) > 1: # Determine if current_index is odd (right) or even (left) is_right = current_index % 2 == 1 sibling_index = current_index - 1 if is_right else current_index + 1 # Get sibling hash if it exists if sibling_index < len(current_level): sibling_hash = current_level[sibling_index] position = "left" if is_right else "right" proof.append({"hash": sibling_hash, "position": position}) # Move to next level current_index = current_index // 2 next_level = [] for i in range(0, len(current_level), 2): left = current_level[i] right = current_level[i + 1] if i + 1 < len(current_level) else left combined = hashlib.sha256((left + right).encode()).hexdigest() next_level.append(combined) current_level = next_level return proof def worm_write_tool(step_data, hash_value, merkle_root, filename="worm_log.jsonl"): """ WORM (Write Once, Read Many) storage: Append a JSON record to JSONL file. Returns the record written. """ # Determine the next ID by counting existing lines next_id = 0 if os.path.exists(filename): with open(filename, "r") as f: next_id = sum(1 for _ in f) record = { "id": next_id, "timestamp": time.time(), "step": step_data, "hash": hash_value, "root": merkle_root } with open(filename, "a") as f: f.write(json.dumps(record) + "\n") return record def proof_generate_tool(record_id, filename="worm_log.jsonl"): """ Generate a Merkle proof for a specific record in the WORM log. Rehydrates the Merkle Tree from the log to ensure proof is against current state. Returns a JSON proof containing hash, merkle_proof, root, and timestamp. """ if not os.path.exists(filename): print(f"[PROOF] Error: {filename} does not exist.") return None # Read all records from WORM log records = [] hashes = [] target_record = None with open(filename, "r") as f: for line in f: record = json.loads(line.strip()) records.append(record) hashes.append(record["hash"]) if record["id"] == record_id: target_record = record if target_record is None: print(f"[PROOF] Error: Record with ID {record_id} not found.") return None # Rehydrate Merkle Tree from hashes tree = MerkleTree() for h in hashes: tree.update(h) # Get proof for the target record index proof = tree.get_proof(record_id) proof_result = { "record_id": record_id, "hash": target_record["hash"], "merkle_proof": proof, "merkle_root": tree.root, "timestamp": target_record["timestamp"], "step_details": target_record["step"] } return proof_result def verify_proof_tool(target_hash, merkle_proof, merkle_root): """ Verify if a target_hash belongs to the merkle_root using the merkle_proof. Logic: - Start with current_hash = target_hash - Loop through proof items (sibling hashes with positions) - Reconstruct the path up to the root - Compare final calculated root with provided merkle_root Returns True if valid, False otherwise. """ if merkle_proof is None: return False current_hash = target_hash # Traverse the proof path for proof_item in merkle_proof: sibling_hash = proof_item["hash"] position = proof_item["position"] # Combine hashes based on position if position == "left": # Sibling is on the left, so: hash(sibling + current) combined_str = sibling_hash + current_hash elif position == "right": # Sibling is on the right, so: hash(current + sibling) combined_str = current_hash + sibling_hash else: return False # Calculate the next level hash current_hash = hashlib.sha256(combined_str.encode()).hexdigest() # Final check: does calculated root match provided root? return current_hash == merkle_root def secure_agent_action(action_type, details, merkle_tree): """ Gatekeeper Logic: Cite-Before-Act mechanism. - READ: Auto-approve - WRITE/MUTATE: Require human approval via CLI All actions (approved or denied) are logged to WORM storage. """ action_type = action_type.upper() if action_type == "READ": # Auto-approve READ actions print(f"\n[GATEKEEPER] READ action detected: {details}") print("[GATEKEEPER] Auto-approving READ action.") step_data = { "action_type": action_type, "details": details, "status": "APPROVED" } step_hash = hash_tool(step_data) merkle_root = merkle_tree.update(step_hash) worm_write_tool(step_data, step_hash, merkle_root) print(f"[GATEKEEPER] Merkle Root: {merkle_root}") print(f"[GATEKEEPER] Action logged.\n") return True elif action_type in ["WRITE", "MUTATE", "DELETE"]: # Require approval for mutation actions print(f"\n[GATEKEEPER] ⚠️ WRITE/MUTATE action detected: {details}") print("[GATEKEEPER] This action requires human approval.") approval = input("[GATEKEEPER] Approve this action? (y/n): ").strip().lower() if approval == "y": print("[GATEKEEPER] ✓ Action APPROVED by user.") status = "APPROVED" result = True else: print("[GATEKEEPER] ✗ Action DENIED by user.") status = "DENIED" result = False # Log the action (approved or denied) to maintain audit trail step_data = { "action_type": action_type, "details": details, "status": status } step_hash = hash_tool(step_data) merkle_root = merkle_tree.update(step_hash) worm_write_tool(step_data, step_hash, merkle_root) print(f"[GATEKEEPER] Merkle Root: {merkle_root}") print(f"[GATEKEEPER] Audit logged.\n") return result else: print(f"\n[GATEKEEPER] Unknown action type: {action_type}\n") return False if __name__ == "__main__": print("=" * 70) print("SECURE REASONING MCP SERVER - TEST SCENARIO") print("=" * 70) # Initialize Merkle Tree mt = MerkleTree() # Test 1: READ action (auto-approved) print("\n[TEST 1] Simulating READ action...") secure_agent_action("READ", "Query user database for profile info", mt) # Test 2: WRITE action (user approval - simulate "y") print("[TEST 2] Simulating WRITE action (approve with 'y')...") secure_agent_action("WRITE", "Update user profile with new email address", mt) # Test 3: WRITE action (user denial - simulate "n") print("[TEST 3] Simulating WRITE action (deny with 'n')...") secure_agent_action("WRITE", "Delete user account permanently", mt) print("=" * 70) print("TEST SCENARIO COMPLETE") print("=" * 70) print("\nWORM Log saved to: worm_log.jsonl") print("Review the file to verify all actions are logged with hashes and Merkle roots.\n") # Test 4: Generate proof for record_id=1 print("=" * 70) print("PROOF GENERATION TEST") print("=" * 70) print("\n[TEST 4] Generating Merkle proof for record_id=1...") proof = proof_generate_tool(1) if proof: print("\n[PROOF] Generated Merkle Proof:") print(json.dumps(proof, indent=2)) else: proof = None print() # Test 5: Verify the proof (positive case) print("=" * 70) print("PROOF VERIFICATION TEST") print("=" * 70) if proof: print("\n[TEST 5a] Verifying proof with correct hash and root...") is_valid = verify_proof_tool(proof["hash"], proof["merkle_proof"], proof["merkle_root"]) print(f"[VERIFY] Verification Result (POSITIVE): {is_valid}") # Test 5b: Verify with tampered hash (negative case) print("\n[TEST 5b] Verifying proof with tampered hash (should fail)...") tampered_hash = proof["hash"][:-2] + "XX" # Change last 2 characters is_valid_tampered = verify_proof_tool(tampered_hash, proof["merkle_proof"], proof["merkle_root"]) print(f"[VERIFY] Verification Result (NEGATIVE - tampered hash): {is_valid_tampered}") # Test 5c: Verify with tampered root (negative case) print("\n[TEST 5c] Verifying proof with tampered root (should fail)...") tampered_root = proof["merkle_root"][:-2] + "XX" # Change last 2 characters is_valid_tampered_root = verify_proof_tool(proof["hash"], proof["merkle_proof"], tampered_root) print(f"[VERIFY] Verification Result (NEGATIVE - tampered root): {is_valid_tampered_root}") print("\n" + "=" * 70) print("ALL TESTS COMPLETE - SECURE REASONING MCP SERVER OPERATIONAL") print("=" * 70 + "\n")