"""
Hashing Agent

Generates cryptographic hashes for content verification.
"""
|
|
|
|
|
from typing import Dict, Any |
|
|
import hashlib |
|
|
from datetime import datetime, timezone |
|
|
|
|
|
from core.agent_base import Agent |
|
|
from core.errors import HashingError |
|
|
from config.settings import settings |
|
|
|
|
|
|
|
|
class HashingAgent(Agent):
    """
    Generates a cryptographic hash for file or text content.

    The digest algorithm is read from ``settings.HASH_ALGORITHM`` on every
    call; it is not hard-coded here (SHA-256 is presumably the configured
    default -- confirm in ``config.settings``).
    """

    def execute(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Generate cryptographic hash.

        Expected input_data:
            {
                "content": bytes,
                ...other fields...
            }

        Returns:
            A shallow copy of ``input_data`` with these keys added or
            overwritten:
            {
                "content_hash": str,     # hexadecimal digest of "content"
                "hash_algorithm": str,   # value of settings.HASH_ALGORITHM
                "hash_timestamp": str,   # ISO 8601 timestamp in UTC
                ...passes through input_data...
            }

        Raises:
            HashingError: If "content" is absent or None, is not ``bytes``,
                or the hash cannot be computed.
        """
        content = input_data.get("content")

        # Test for None rather than falsiness: empty bytes are valid content
        # with a well-defined digest, and a falsy non-bytes value (e.g. "")
        # should be reported as a type problem, not as a missing field.
        if content is None:
            raise HashingError("Missing 'content' field")

        if not isinstance(content, bytes):
            raise HashingError("Content must be bytes")

        algorithm = settings.HASH_ALGORITHM
        hash_value = self._compute_hash(content, algorithm)

        # Copy so the caller's dictionary is not mutated.
        result = input_data.copy()
        result.update({
            "content_hash": hash_value,
            "hash_algorithm": algorithm,
            "hash_timestamp": datetime.now(timezone.utc).isoformat(),
        })

        return result

    def _compute_hash(self, content: bytes, algorithm: str) -> str:
        """
        Compute hash using specified algorithm.

        Args:
            content: Content bytes to hash
            algorithm: Hash algorithm name (e.g., 'sha256')

        Returns:
            Hexadecimal hash string

        Raises:
            HashingError: If the algorithm name is rejected by ``hashlib``
                or any other error occurs while hashing. The original
                exception is chained as the cause.
        """
        try:
            hasher = hashlib.new(algorithm)
            hasher.update(content)
            return hasher.hexdigest()
        except ValueError as e:
            # hashlib.new raises ValueError for an unknown algorithm name.
            raise HashingError(f"Unsupported hash algorithm: {algorithm}") from e
        except Exception as e:
            # Deliberately broad: any other failure (e.g. a non-string
            # algorithm name) is surfaced through the domain error type.
            raise HashingError(f"Hash computation failed: {e}") from e