File size: 4,708 Bytes
b2e0e38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# ------------------------------------------------------------------------------
# Contextual Engineering Patterns
# Copyright (c) 2025 Tobi Lekan Adeosun
# Licensed under the MIT License.
#
# From Chapter 3: "The Disconnected Agent"
# ------------------------------------------------------------------------------

import uuid
import time
import json
import hashlib
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional
import logging

# Configure the root logger at INFO level as an import-time side effect.
# NOTE(review): calling basicConfig from a library module changes the host
# application's logging setup — consider moving it to the program entry
# point; confirm no caller relies on it before changing.
logging.basicConfig(level=logging.INFO)
# Module-level logger used for the sync and status diagnostics in this file.
logger = logging.getLogger(__name__)


# Configuration Constants
# Default lifetime of a queued action, in hours, before it counts as expired.
DEFAULT_TTL_HOURS = 24
# Default minimum battery percentage required before an action may sync.
DEFAULT_MIN_BATTERY_LEVEL = 15
# Conversion factor for turning a TTL expressed in hours into seconds.
SECONDS_PER_HOUR = 60 * 60


class ActionStatus(Enum):
    """Lifecycle states of an OfflineAction.

    Each member's value is the string stored under the "status" key by
    OfflineAction.serialize and parsed back by OfflineAction.deserialize.
    """

    # Initial state assigned to every newly created action.
    PENDING = "PENDING"
    # Set by OfflineAction.mark_synced.
    SYNCED = "SYNCED"
    # Set by OfflineAction.mark_failed.
    FAILED = "FAILED"
    # Set by OfflineAction.mark_expired.
    EXPIRED = "EXPIRED"


def generate_hash(data: str) -> str:
    """Return the SHA-256 hex digest of ``data`` encoded as UTF-8.

    The result is deterministic: the same input string always yields the
    same 64-character lowercase hex string, which is what makes it usable
    as an idempotency key.
    """
    hasher = hashlib.sha256()
    hasher.update(data.encode("utf-8"))
    return hasher.hexdigest()


@dataclass
class OfflineAction:
    """An action queued while offline, to be synced when connectivity returns.

    Attributes:
        action_type: Caller-defined label for the kind of action.
        payload: JSON-serializable data for the action; it is hashed into
            the idempotency key.
        priority: Caller-defined priority (default 1); not interpreted by
            this class.
        ttl_hours: Hours after ``timestamp`` at which the action expires.
        min_battery_level: Minimum battery percentage required by
            ``can_sync``.
        id: Unique identifier; a random UUID4 string unless supplied.
        timestamp: Creation time in seconds since the epoch; defaults to
            ``time.time()`` at construction.
        status: Current lifecycle state; starts as ``ActionStatus.PENDING``.
        idempotency_key: Hash of the payload and timestamp, computed in
            ``__post_init__``; it is not a constructor argument.
    """

    action_type: str
    payload: Dict[str, Any]
    priority: int = 1
    ttl_hours: int = DEFAULT_TTL_HOURS
    min_battery_level: int = DEFAULT_MIN_BATTERY_LEVEL

    # Auto-generated fields
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)
    status: ActionStatus = field(default=ActionStatus.PENDING)
    idempotency_key: str = field(default="", init=False)

    def __post_init__(self):
        """Generate idempotency key after initialization."""
        # CRITICAL: The Idempotency Key ensures that if the
        # network flutters and sends the request twice,
        # the server only executes it once.
        # sort_keys makes the payload encoding independent of dict insertion
        # order, so equal payloads always hash to the same key.
        key_data = f"{json.dumps(self.payload, sort_keys=True)}:{self.timestamp}"
        self.idempotency_key = generate_hash(key_data)

    @property
    def expiry_timestamp(self) -> float:
        """Return the epoch time (seconds) at which this action expires."""
        return self.timestamp + (self.ttl_hours * SECONDS_PER_HOUR)

    def is_expired(self) -> bool:
        """Return True if the current time is past ``expiry_timestamp``."""
        return time.time() > self.expiry_timestamp

    def can_sync(self, current_battery_level: int) -> bool:
        """Check if conditions allow syncing this action.

        Args:
            current_battery_level: Current battery charge, in percent.

        Returns:
            False (with a logged warning) if the action has expired or the
            battery is below ``min_battery_level``; True otherwise.
        """
        if self.is_expired():
            logger.warning(f"Action {self.id} has expired")
            return False

        if current_battery_level < self.min_battery_level:
            logger.warning(
                f"Battery too low ({current_battery_level}%) to sync action {self.id}"
            )
            return False

        return True

    def mark_synced(self) -> None:
        """Mark the action as successfully synced."""
        self.status = ActionStatus.SYNCED
        logger.info(f"Action {self.id} marked as SYNCED")

    def mark_failed(self, reason: Optional[str] = None) -> None:
        """Mark the action as failed, logging the optional ``reason``."""
        self.status = ActionStatus.FAILED
        logger.error(f"Action {self.id} marked as FAILED: {reason}")

    def mark_expired(self) -> None:
        """Mark the action as expired."""
        self.status = ActionStatus.EXPIRED
        logger.warning(f"Action {self.id} marked as EXPIRED")

    def serialize(self) -> str:
        """Convert to JSON for storage in local SQLite.

        Returns:
            A JSON object string holding every field needed to rebuild the
            action with ``deserialize``, plus the derived
            ``expiry_timestamp`` for convenience.
        """
        data = {
            "id": self.id,
            "action_type": self.action_type,
            "payload": self.payload,
            "priority": self.priority,
            "timestamp": self.timestamp,
            "status": self.status.value,
            "ttl_hours": self.ttl_hours,
            # Persisted so a custom battery threshold survives a round trip.
            "min_battery_level": self.min_battery_level,
            "expiry_timestamp": self.expiry_timestamp,
            # Note: idempotency_key intentionally excluded from serialization
            # to prevent exposure; it is regenerated on deserialization from
            # the stored payload and timestamp.
        }
        return json.dumps(data)

    @classmethod
    def deserialize(cls, json_str: str) -> "OfflineAction":
        """Create an OfflineAction from a JSON string made by ``serialize``.

        The stored ``id``, ``timestamp`` and ``status`` are passed to the
        constructor, so ``__post_init__`` derives the idempotency key from
        the original timestamp and the restored key matches the one the
        action had before it was stored.

        Args:
            json_str: JSON object text. ``action_type``, ``payload``, ``id``
                and ``timestamp`` are required; ``priority``, ``ttl_hours``,
                ``min_battery_level`` and ``status`` fall back to defaults.

        Returns:
            The reconstructed action.

        Raises:
            ValueError: If the text is not valid JSON, is not a JSON object,
                lacks a required key, or carries an unknown status.
        """
        try:
            data = json.loads(json_str)
            return cls(
                action_type=data["action_type"],
                payload=data["payload"],
                priority=data.get("priority", 1),
                ttl_hours=data.get("ttl_hours", DEFAULT_TTL_HOURS),
                # Records written before this field was persisted lack it.
                min_battery_level=data.get(
                    "min_battery_level", DEFAULT_MIN_BATTERY_LEVEL
                ),
                id=data["id"],
                timestamp=data["timestamp"],
                status=ActionStatus(data.get("status", "PENDING")),
            )
        except (KeyError, TypeError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass; TypeError covers
            # JSON whose top level is not an object (e.g. a list or null).
            logger.exception("Failed to deserialize OfflineAction")
            raise ValueError(f"Invalid JSON data for OfflineAction: {e}") from e