|
|
""" |
|
|
Verification Agent |
|
|
Verifies proof authenticity by recomputing and comparing hashes. |
|
|
""" |
|
|
|
|
|
from typing import Dict, Any |
|
|
import hashlib |
|
|
from datetime import datetime, timezone |
|
|
|
|
|
from core.agent_base import Agent |
|
|
from core.errors import VerificationError |
|
|
from models.proof import VerificationResult, Proof |
|
|
from agents.storage_agent import StorageAgent |
|
|
|
|
|
|
|
|
class VerificationAgent(Agent): |
|
|
""" |
|
|
Verifies proof by fetching from storage and recomputing hash. |
|
|
""" |
|
|
|
|
|
def __init__(self, storage_agent: StorageAgent): |
|
|
super().__init__() |
|
|
self.storage_agent = storage_agent |
|
|
|
|
|
def execute(self, input_data: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Verify proof authenticity. |
|
|
|
|
|
Expected input_data: |
|
|
{ |
|
|
"proof_id": str, |
|
|
"content": bytes # Original content to verify |
|
|
} |
|
|
|
|
|
Returns: |
|
|
{ |
|
|
"verification_result": VerificationResult |
|
|
} |
|
|
""" |
|
|
proof_id = input_data.get("proof_id") |
|
|
content = input_data.get("content") |
|
|
|
|
|
if not proof_id: |
|
|
raise VerificationError("Missing 'proof_id'") |
|
|
|
|
|
if not content: |
|
|
raise VerificationError("Missing 'content' to verify") |
|
|
|
|
|
|
|
|
proof = self.storage_agent.get_proof(proof_id) |
|
|
|
|
|
if not proof: |
|
|
raise VerificationError(f"Proof not found: {proof_id}") |
|
|
|
|
|
|
|
|
computed_hash = self._compute_hash(content, proof.hash_algorithm) |
|
|
|
|
|
|
|
|
is_valid = computed_hash == proof.content_hash |
|
|
|
|
|
result = VerificationResult( |
|
|
proof_id=proof_id, |
|
|
is_valid=is_valid, |
|
|
original_hash=proof.content_hash, |
|
|
computed_hash=computed_hash, |
|
|
timestamp=datetime.now(timezone.utc).isoformat(), |
|
|
message="Hash match: proof is valid" if is_valid else "Hash mismatch: proof is invalid" |
|
|
) |
|
|
|
|
|
return { |
|
|
"verification_result": result |
|
|
} |
|
|
|
|
|
def _compute_hash(self, content: bytes, algorithm: str) -> str: |
|
|
""" |
|
|
Compute hash using specified algorithm. |
|
|
|
|
|
Args: |
|
|
content: Content bytes to hash |
|
|
algorithm: Hash algorithm name |
|
|
|
|
|
Returns: |
|
|
Hexadecimal hash string |
|
|
""" |
|
|
try: |
|
|
hasher = hashlib.new(algorithm) |
|
|
hasher.update(content) |
|
|
return hasher.hexdigest() |
|
|
except Exception as e: |
|
|
raise VerificationError(f"Hash computation failed: {str(e)}") from e |