# File size: 4,708 Bytes | b2e0e38
# ------------------------------------------------------------------------------
# Contextual Engineering Patterns
# Copyright (c) 2025 Tobi Lekan Adeosun
# Licensed under the MIT License.
#
# From Chapter 3: "The Disconnected Agent"
# ------------------------------------------------------------------------------
import uuid
import time
import json
import hashlib
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional
import logging
# Set up logging with INFO as the root level and create this module's logger.
# NOTE(review): basicConfig runs at import time, so importing this module
# touches process-wide logging configuration — confirm that is intended.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration Constants
# Default time-to-live of a queued action, in hours (default for
# OfflineAction.ttl_hours).
DEFAULT_TTL_HOURS = 24
# Default battery threshold below which OfflineAction.can_sync refuses to
# sync; the log message there formats the level as a percentage.
DEFAULT_MIN_BATTERY_LEVEL = 15
# Conversion factor used to turn ttl_hours into seconds for expiry math.
SECONDS_PER_HOUR = 3600
class ActionStatus(Enum):
    """Lifecycle states of an offline action.

    Each member's value is its own name as a string, which is the form
    written by ``OfflineAction.serialize`` and parsed back by
    ``OfflineAction.deserialize``.
    """

    # Initial state of every newly created action.
    PENDING = "PENDING"
    # Set by OfflineAction.mark_synced.
    SYNCED = "SYNCED"
    # Set by OfflineAction.mark_failed.
    FAILED = "FAILED"
    # Set by OfflineAction.mark_expired.
    EXPIRED = "EXPIRED"
def generate_hash(data: str) -> str:
    """Return the SHA-256 hex digest of *data*.

    The string is encoded as UTF-8 before hashing, so equal inputs always
    produce the same 64-character lowercase hex string. This determinism is
    what makes the result usable as an idempotency key.

    Args:
        data: Text to hash.

    Returns:
        The hexadecimal SHA-256 digest of the UTF-8 encoded input.
    """
    return hashlib.sha256(data.encode('utf-8')).hexdigest()
@dataclass
class OfflineAction:
    """Represents an action to be synced when connectivity is restored.

    An action carries a JSON-serializable payload plus the conditions under
    which it may be synced (time-to-live and minimum battery level). On
    construction an idempotency key is derived from the payload and the
    creation timestamp.
    """

    # Caller-supplied fields.
    action_type: str
    payload: Dict[str, Any]
    priority: int = 1
    ttl_hours: int = DEFAULT_TTL_HOURS
    min_battery_level: int = DEFAULT_MIN_BATTERY_LEVEL

    # Auto-generated fields
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)
    status: ActionStatus = field(default=ActionStatus.PENDING)
    # Derived in __post_init__; cannot be passed to the constructor.
    idempotency_key: str = field(default="", init=False)

    def __post_init__(self):
        """Generate idempotency key after initialization."""
        # CRITICAL: The Idempotency Key ensures that if the
        # network flutters and sends the request twice,
        # the server only executes it once.
        # The key is a pure function of (payload, timestamp), so rebuilding
        # an action with the same two values reproduces the same key.
        key_data = f"{json.dumps(self.payload, sort_keys=True)}:{self.timestamp}"
        self.idempotency_key = generate_hash(key_data)

    @property
    def expiry_timestamp(self) -> float:
        """Return the epoch time at which this action expires.

        Computed as ``timestamp + ttl_hours * SECONDS_PER_HOUR``.
        """
        return self.timestamp + (self.ttl_hours * SECONDS_PER_HOUR)

    def is_expired(self) -> bool:
        """Return True if the current time is strictly past the expiry time."""
        return time.time() > self.expiry_timestamp

    def can_sync(self, current_battery_level: int) -> bool:
        """Check if conditions allow syncing this action.

        Args:
            current_battery_level: Current battery level, compared against
                ``min_battery_level``.

        Returns:
            False if the action has expired or the battery level is below
            ``min_battery_level``; True otherwise. A warning is logged for
            each refusal. ``status`` is not modified here.
        """
        if self.is_expired():
            logger.warning(f"Action {self.id} has expired")
            return False

        if current_battery_level < self.min_battery_level:
            logger.warning(
                f"Battery too low ({current_battery_level}%) to sync action {self.id}"
            )
            return False

        return True

    def mark_synced(self) -> None:
        """Mark the action as successfully synced."""
        self.status = ActionStatus.SYNCED
        logger.info(f"Action {self.id} marked as SYNCED")

    def mark_failed(self, reason: Optional[str] = None) -> None:
        """Mark the action as failed.

        Args:
            reason: Optional description of the failure; only logged, not
                stored on the action.
        """
        self.status = ActionStatus.FAILED
        logger.error(f"Action {self.id} marked as FAILED: {reason}")

    def mark_expired(self) -> None:
        """Mark the action as expired."""
        self.status = ActionStatus.EXPIRED
        logger.warning(f"Action {self.id} marked as EXPIRED")

    def serialize(self) -> str:
        """Convert to JSON for storage in local SQLite.

        Returns:
            A JSON object string with every field needed to rebuild the
            action. ``expiry_timestamp`` is included for readers of the
            stored record; ``deserialize`` ignores it because it is derived.
        """
        data = {
            "id": self.id,
            "action_type": self.action_type,
            "payload": self.payload,
            "priority": self.priority,
            "timestamp": self.timestamp,
            "status": self.status.value,
            "ttl_hours": self.ttl_hours,
            # Persisted so a custom battery threshold survives a round trip.
            "min_battery_level": self.min_battery_level,
            "expiry_timestamp": self.expiry_timestamp,
            # Note: idempotency_key intentionally excluded from serialization
            # to prevent exposure; regenerate on deserialization
        }
        return json.dumps(data)

    @classmethod
    def deserialize(cls, json_str: str) -> "OfflineAction":
        """Create an OfflineAction from JSON string.

        The stored ``id``, ``timestamp`` and ``status`` are passed straight
        to the constructor so that ``__post_init__`` derives the idempotency
        key from the ORIGINAL timestamp. Assigning the timestamp after
        construction would leave a key computed from the current time.

        Args:
            json_str: A JSON object as produced by ``serialize``. Records
                without ``priority``, ``ttl_hours``, ``min_battery_level``
                or ``status`` fall back to the defaults.

        Returns:
            The reconstructed action.

        Raises:
            ValueError: If the input is not valid JSON, is not a JSON
                object, lacks a required key, or has an unknown status.
        """
        try:
            data = json.loads(json_str)
            return cls(
                action_type=data["action_type"],
                payload=data["payload"],
                priority=data.get("priority", 1),
                ttl_hours=data.get("ttl_hours", DEFAULT_TTL_HOURS),
                min_battery_level=data.get(
                    "min_battery_level", DEFAULT_MIN_BATTERY_LEVEL
                ),
                id=data["id"],
                timestamp=data["timestamp"],
                status=ActionStatus(data.get("status", "PENDING")),
            )
        # json.JSONDecodeError is a ValueError subclass; TypeError covers
        # non-string input and JSON that is not an object.
        except (KeyError, TypeError, ValueError) as e:
            logger.exception("Failed to deserialize OfflineAction")
            raise ValueError(f"Invalid JSON data for OfflineAction: {e}") from e
|