File size: 2,221 Bytes
2c41dce |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
"""
Hashing Agent
Generates cryptographic hashes for content verification.
"""
from typing import Dict, Any
import hashlib
from datetime import datetime, timezone
from core.agent_base import Agent
from core.errors import HashingError
from config.settings import settings
class HashingAgent(Agent):
"""
Generates SHA-256 hash for files or text content.
"""
def execute(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Generate cryptographic hash.
Expected input_data:
{
"content": bytes,
...other fields...
}
Returns:
{
"content_hash": str,
"hash_algorithm": str,
"hash_timestamp": str,
...passes through input_data...
}
"""
content = input_data.get("content")
if not content:
raise HashingError("Missing 'content' field")
if not isinstance(content, bytes):
raise HashingError("Content must be bytes")
# Generate hash
algorithm = settings.HASH_ALGORITHM
hash_value = self._compute_hash(content, algorithm)
# Add hash info to input data
result = input_data.copy()
result.update({
"content_hash": hash_value,
"hash_algorithm": algorithm,
"hash_timestamp": datetime.now(timezone.utc).isoformat(),
})
return result
def _compute_hash(self, content: bytes, algorithm: str) -> str:
"""
Compute hash using specified algorithm.
Args:
content: Content bytes to hash
algorithm: Hash algorithm name (e.g., 'sha256')
Returns:
Hexadecimal hash string
"""
try:
hasher = hashlib.new(algorithm)
hasher.update(content)
return hasher.hexdigest()
except ValueError as e:
raise HashingError(f"Unsupported hash algorithm: {algorithm}") from e
except Exception as e:
raise HashingError(f"Hash computation failed: {str(e)}") from e |