# ------------------------------------------------------------------------------
# Contextual Engineering Patterns
# Copyright (c) 2025 Tobi Lekan Adeosun
# Licensed under the MIT License.
#
# From Chapter 3: "The Disconnected Agent"
# ------------------------------------------------------------------------------
import uuid
import time
import json
import hashlib
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration Constants
DEFAULT_TTL_HOURS = 24
DEFAULT_MIN_BATTERY_LEVEL = 15
SECONDS_PER_HOUR = 3600


class ActionStatus(Enum):
    """Lifecycle states of an OfflineAction."""

    PENDING = "PENDING"
    SYNCED = "SYNCED"
    FAILED = "FAILED"
    EXPIRED = "EXPIRED"


def generate_hash(data: str) -> str:
    """Return the SHA-256 hex digest of *data* (UTF-8 encoded).

    Deterministic: the same input string always yields the same digest,
    which is what makes it usable as an idempotency key.
    """
    return hashlib.sha256(data.encode('utf-8')).hexdigest()


@dataclass
class OfflineAction:
    """Represents an action to be synced when connectivity is restored.

    Attributes set by the caller:
        action_type: Name of the action to perform.
        payload: JSON-serializable data for the action.
        priority: Caller-defined priority (default 1).
        ttl_hours: Hours after ``timestamp`` at which the action expires.
        min_battery_level: Minimum battery percentage required to sync.

    Auto-generated attributes (may be supplied explicitly, e.g. when
    restoring a stored action):
        id: Random UUID4 string.
        timestamp: Creation time in seconds since the epoch.
        status: Current ``ActionStatus``; starts as ``PENDING``.
        idempotency_key: SHA-256 of the payload and timestamp; always
            derived in ``__post_init__`` and never accepted as an argument.
    """

    action_type: str
    payload: Dict[str, Any]
    priority: int = 1
    ttl_hours: int = DEFAULT_TTL_HOURS
    min_battery_level: int = DEFAULT_MIN_BATTERY_LEVEL

    # Auto-generated fields
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)
    status: ActionStatus = field(default=ActionStatus.PENDING)
    idempotency_key: str = field(default="", init=False)

    def __post_init__(self):
        """Generate idempotency key after initialization."""
        # CRITICAL: The Idempotency Key ensures that if the
        # network flutters and sends the request twice,
        # the server only executes it once.
        # sort_keys=True makes the hash independent of dict insertion order.
        key_data = f"{json.dumps(self.payload, sort_keys=True)}:{self.timestamp}"
        self.idempotency_key = generate_hash(key_data)

    @property
    def expiry_timestamp(self) -> float:
        """Calculate the expiry timestamp based on TTL."""
        return self.timestamp + (self.ttl_hours * SECONDS_PER_HOUR)

    def is_expired(self) -> bool:
        """Check if the action has exceeded its TTL."""
        return time.time() > self.expiry_timestamp

    def can_sync(self, current_battery_level: int) -> bool:
        """Check if conditions allow syncing this action.

        Args:
            current_battery_level: Current battery charge, in percent.

        Returns:
            False (after logging a warning) when the action has expired or
            the battery is below ``min_battery_level``; True otherwise.
            The action's status is not modified.
        """
        if self.is_expired():
            logger.warning("Action %s has expired", self.id)
            return False

        if current_battery_level < self.min_battery_level:
            logger.warning(
                "Battery too low (%s%%) to sync action %s",
                current_battery_level,
                self.id,
            )
            return False

        return True

    def mark_synced(self) -> None:
        """Mark the action as successfully synced."""
        self.status = ActionStatus.SYNCED
        logger.info("Action %s marked as SYNCED", self.id)

    def mark_failed(self, reason: Optional[str] = None) -> None:
        """Mark the action as failed.

        Args:
            reason: Optional description of the failure, included in the log.
        """
        self.status = ActionStatus.FAILED
        logger.error("Action %s marked as FAILED: %s", self.id, reason)

    def mark_expired(self) -> None:
        """Mark the action as expired."""
        self.status = ActionStatus.EXPIRED
        logger.warning("Action %s marked as EXPIRED", self.id)

    def serialize(self) -> str:
        """Convert to JSON for storage in local SQLite.

        Returns:
            A JSON object string holding every field needed to rebuild the
            action with ``deserialize``.
        """
        data = {
            "id": self.id,
            "action_type": self.action_type,
            "payload": self.payload,
            "priority": self.priority,
            "timestamp": self.timestamp,
            "status": self.status.value,
            "ttl_hours": self.ttl_hours,
            # Persisted so a custom battery threshold survives a round trip.
            "min_battery_level": self.min_battery_level,
            "expiry_timestamp": self.expiry_timestamp
            # Note: idempotency_key intentionally excluded from serialization
            # to prevent exposure; regenerate on deserialization
        }
        return json.dumps(data)

    @classmethod
    def deserialize(cls, json_str: str) -> "OfflineAction":
        """Create an OfflineAction from JSON string.

        The stored ``id``, ``timestamp`` and ``status`` are passed to the
        constructor, so ``__post_init__`` derives the idempotency key from
        the ORIGINAL timestamp and the restored action keeps the same key
        it had before it was serialized.

        Args:
            json_str: A JSON object as produced by ``serialize``.
                ``action_type``, ``payload``, ``id`` and ``timestamp`` are
                required; the remaining keys fall back to their defaults.

        Returns:
            The reconstructed action.

        Raises:
            ValueError: If the input is not valid JSON, is not a JSON
                object, lacks a required key, or has an unknown status.
        """
        try:
            data = json.loads(json_str)
            return cls(
                action_type=data["action_type"],
                payload=data["payload"],
                priority=data.get("priority", 1),
                ttl_hours=data.get("ttl_hours", DEFAULT_TTL_HOURS),
                min_battery_level=data.get(
                    "min_battery_level", DEFAULT_MIN_BATTERY_LEVEL
                ),
                id=data["id"],
                timestamp=data["timestamp"],
                status=ActionStatus(data.get("status", "PENDING")),
            )
        except (ValueError, KeyError, TypeError) as e:
            # ValueError covers json.JSONDecodeError and unknown statuses;
            # TypeError covers non-string input and non-object JSON.
            logger.exception("Failed to deserialize OfflineAction")
            raise ValueError(
                f"Invalid JSON data for OfflineAction: {str(e)}"
            ) from e