File size: 3,845 Bytes
aa15bce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Optional

from dateutil import parser as date_parser
from dateutil.rrule import rrulestr
from zoneinfo import ZoneInfo

from ...logging_config import logger


UTC = timezone.utc
DEFAULT_STATUS = "active"
VALID_STATUSES = {"active", "paused", "completed"}


def utc_now() -> datetime:
    """Return the current time in UTC."""

    return datetime.now(UTC)


def to_storage_timestamp(moment: datetime) -> str:
    """Normalize timestamps before writing to SQLite."""

    return moment.astimezone(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")


def resolve_timezone(timezone_name: Optional[str]) -> ZoneInfo:
    """Return a `ZoneInfo` instance, defaulting to UTC on errors."""

    if timezone_name:
        try:
            return ZoneInfo(timezone_name)
        except Exception:
            logger.warning(
                "unknown timezone provided; defaulting to UTC",
                extra={"timezone": timezone_name},
            )
    return ZoneInfo("UTC")


def normalize_status(status: Optional[str]) -> str:
    """Clamp trigger status to the known set."""

    if not status:
        return DEFAULT_STATUS
    normalized = status.lower()
    if normalized not in VALID_STATUSES:
        logger.warning(
            "invalid status supplied; defaulting to active",
            extra={"status": status},
        )
        return DEFAULT_STATUS
    return normalized


def parse_iso(timestamp: str) -> datetime:
    """Parse an ISO timestamp, defaulting to UTC when timezone is absent."""

    dt = date_parser.isoparse(timestamp)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=UTC)
    return dt


def parse_datetime(timestamp: str, tz: ZoneInfo) -> datetime:
    """Parse a timestamp string into the provided timezone."""

    dt = date_parser.isoparse(timestamp)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=tz)
    else:
        dt = dt.astimezone(tz)
    return dt


def coerce_start_datetime(
    start_time: Optional[str], tz: ZoneInfo, fallback: datetime
) -> datetime:
    """Return the desired start datetime in the agent's timezone."""

    if start_time:
        return parse_datetime(start_time, tz)
    return fallback.astimezone(tz)


def build_recurrence(
    recurrence_rule: Optional[str],
    start_dt_local: datetime,
    tz: ZoneInfo,
) -> Optional[str]:
    """Embed DTSTART metadata into the supplied RRULE text."""

    if not recurrence_rule:
        return None

    if start_dt_local.tzinfo is None:
        localized_start = start_dt_local.replace(tzinfo=tz)
    else:
        localized_start = start_dt_local.astimezone(tz)

    if localized_start.utcoffset() == timedelta(0):
        dt_line = f"DTSTART:{localized_start.astimezone(UTC).strftime('%Y%m%dT%H%M%SZ')}"
    else:
        tz_name = getattr(tz, "key", "UTC")
        dt_line = f"DTSTART;TZID={tz_name}:{localized_start.strftime('%Y%m%dT%H%M%S')}"

    lines = [segment.strip() for segment in recurrence_rule.strip().splitlines() if segment.strip()]
    filtered = [segment for segment in lines if not segment.upper().startswith("DTSTART")]
    if not filtered:
        raise ValueError("recurrence_rule must contain an RRULE definition")

    if not filtered[0].upper().startswith("RRULE"):
        filtered[0] = f"RRULE:{filtered[0]}"

    return "\n".join([dt_line, *filtered])


def load_rrule(recurrence_text: str):
    """Parse a stored recurrence string into a dateutil rule instance."""

    return rrulestr(recurrence_text)


__all__ = [
    "UTC",
    "DEFAULT_STATUS",
    "VALID_STATUSES",
    "build_recurrence",
    "coerce_start_datetime",
    "load_rrule",
    "normalize_status",
    "parse_datetime",
    "parse_iso",
    "resolve_timezone",
    "to_storage_timestamp",
    "utc_now",
]