tflux2011's picture
Upload 7 files
b2e0e38 verified
# ------------------------------------------------------------------------------
# Contextual Engineering Patterns
# Copyright (c) 2025 Tobi Lekan Adeosun
# Licensed under the MIT License.
#
# From Chapter 3: "The Disconnected Agent"
# ------------------------------------------------------------------------------
import uuid
import time
import json
import hashlib
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration Constants
DEFAULT_TTL_HOURS = 24
DEFAULT_MIN_BATTERY_LEVEL = 15
SECONDS_PER_HOUR = 3600
class ActionStatus(Enum):
PENDING = "PENDING"
SYNCED = "SYNCED"
FAILED = "FAILED"
EXPIRED = "EXPIRED"
def generate_hash(data: str) -> str:
"""Helper to create a deterministic hash for idempotency."""
return hashlib.sha256(data.encode('utf-8')).hexdigest()
@dataclass
class OfflineAction:
"""Represents an action to be synced when connectivity is restored."""
action_type: str
payload: Dict[str, Any]
priority: int = 1
ttl_hours: int = DEFAULT_TTL_HOURS
min_battery_level: int = DEFAULT_MIN_BATTERY_LEVEL
# Auto-generated fields
id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: float = field(default_factory=time.time)
status: ActionStatus = field(default=ActionStatus.PENDING)
idempotency_key: str = field(default="", init=False)
def __post_init__(self):
"""Generate idempotency key after initialization."""
# CRITICAL: The Idempotency Key ensures that if the
# network flutters and sends the request twice,
# the server only executes it once.
key_data = f"{json.dumps(self.payload, sort_keys=True)}:{self.timestamp}"
self.idempotency_key = generate_hash(key_data)
@property
def expiry_timestamp(self) -> float:
"""Calculate the expiry timestamp based on TTL."""
return self.timestamp + (self.ttl_hours * SECONDS_PER_HOUR)
def is_expired(self) -> bool:
"""Check if the action has exceeded its TTL."""
return time.time() > self.expiry_timestamp
def can_sync(self, current_battery_level: int) -> bool:
"""Check if conditions allow syncing this action."""
if self.is_expired():
logger.warning(f"Action {self.id} has expired")
return False
if current_battery_level < self.min_battery_level:
logger.warning(
f"Battery too low ({current_battery_level}%) to sync action {self.id}"
)
return False
return True
def mark_synced(self) -> None:
"""Mark the action as successfully synced."""
self.status = ActionStatus.SYNCED
logger.info(f"Action {self.id} marked as SYNCED")
def mark_failed(self, reason: Optional[str] = None) -> None:
"""Mark the action as failed."""
self.status = ActionStatus.FAILED
logger.error(f"Action {self.id} marked as FAILED: {reason}")
def mark_expired(self) -> None:
"""Mark the action as expired."""
self.status = ActionStatus.EXPIRED
logger.warning(f"Action {self.id} marked as EXPIRED")
def serialize(self) -> str:
"""Convert to JSON for storage in local SQLite."""
data = {
"id": self.id,
"action_type": self.action_type,
"payload": self.payload,
"priority": self.priority,
"timestamp": self.timestamp,
"status": self.status.value,
"ttl_hours": self.ttl_hours,
"expiry_timestamp": self.expiry_timestamp
# Note: idempotency_key intentionally excluded from serialization
# to prevent exposure; regenerate on deserialization
}
return json.dumps(data)
@classmethod
def deserialize(cls, json_str: str) -> "OfflineAction":
"""Create an OfflineAction from JSON string."""
try:
data = json.loads(json_str)
action = cls(
action_type=data["action_type"],
payload=data["payload"],
priority=data.get("priority", 1),
ttl_hours=data.get("ttl_hours", DEFAULT_TTL_HOURS)
)
action.id = data["id"]
action.timestamp = data["timestamp"]
action.status = ActionStatus(data.get("status", "PENDING"))
return action
except (json.JSONDecodeError, KeyError) as e:
logger.exception("Failed to deserialize OfflineAction")
raise ValueError(f"Invalid JSON data for OfflineAction: {str(e)}")