Spaces:
Configuration error
Configuration error
File size: 3,845 Bytes
aa15bce |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Optional
from dateutil import parser as date_parser
from dateutil.rrule import rrulestr
from zoneinfo import ZoneInfo
from ...logging_config import logger
# Shared UTC tzinfo used by the timestamp helpers in this module.
UTC = timezone.utc
# Status that `normalize_status` falls back to when the input is empty or unrecognized.
DEFAULT_STATUS = "active"
# Lowercase status values that `normalize_status` accepts as-is.
VALID_STATUSES = {"active", "paused", "completed"}
def utc_now() -> datetime:
"""Return the current time in UTC."""
return datetime.now(UTC)
def to_storage_timestamp(moment: datetime) -> str:
"""Normalize timestamps before writing to SQLite."""
return moment.astimezone(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
def resolve_timezone(timezone_name: Optional[str]) -> ZoneInfo:
    """Look up a `ZoneInfo` by name, falling back to UTC.

    UTC is returned when no name is given, and also (after logging a
    warning) when constructing the zone raises any exception.
    """
    if not timezone_name:
        return ZoneInfo("UTC")

    try:
        zone = ZoneInfo(timezone_name)
    except Exception:
        # Best-effort lookup: a bad name must not break the caller.
        logger.warning(
            "unknown timezone provided; defaulting to UTC",
            extra={"timezone": timezone_name},
        )
        return ZoneInfo("UTC")
    return zone
def normalize_status(status: Optional[str]) -> str:
    """Map a trigger status onto one of the known values.

    The input is lower-cased and returned when it is a member of
    `VALID_STATUSES`. Empty or missing input yields `DEFAULT_STATUS`;
    an unrecognized value also yields `DEFAULT_STATUS` after a warning
    is logged.
    """
    if status:
        candidate = status.lower()
        if candidate in VALID_STATUSES:
            return candidate
        logger.warning(
            "invalid status supplied; defaulting to active",
            extra={"status": status},
        )
    return DEFAULT_STATUS
def parse_iso(timestamp: str) -> datetime:
    """Parse an ISO timestamp; a result without tzinfo is tagged as UTC."""
    parsed = date_parser.isoparse(timestamp)
    return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=UTC)
def parse_datetime(timestamp: str, tz: ZoneInfo) -> datetime:
    """Parse a timestamp string and express it in ``tz``.

    A parsed value that already carries tzinfo is converted to ``tz``;
    one without tzinfo is simply labelled with ``tz`` (wall-clock fields
    are kept unchanged).
    """
    parsed = date_parser.isoparse(timestamp)
    if parsed.tzinfo is not None:
        return parsed.astimezone(tz)
    return parsed.replace(tzinfo=tz)
def coerce_start_datetime(
    start_time: Optional[str], tz: ZoneInfo, fallback: datetime
) -> datetime:
    """Pick the start datetime, expressed in ``tz``.

    When ``start_time`` is empty or missing, ``fallback`` converted to
    ``tz`` is returned; otherwise ``start_time`` is parsed via
    ``parse_datetime``.
    """
    if not start_time:
        return fallback.astimezone(tz)
    return parse_datetime(start_time, tz)
def build_recurrence(
recurrence_rule: Optional[str],
start_dt_local: datetime,
tz: ZoneInfo,
) -> Optional[str]:
"""Embed DTSTART metadata into the supplied RRULE text."""
if not recurrence_rule:
return None
if start_dt_local.tzinfo is None:
localized_start = start_dt_local.replace(tzinfo=tz)
else:
localized_start = start_dt_local.astimezone(tz)
if localized_start.utcoffset() == timedelta(0):
dt_line = f"DTSTART:{localized_start.astimezone(UTC).strftime('%Y%m%dT%H%M%SZ')}"
else:
tz_name = getattr(tz, "key", "UTC")
dt_line = f"DTSTART;TZID={tz_name}:{localized_start.strftime('%Y%m%dT%H%M%S')}"
lines = [segment.strip() for segment in recurrence_rule.strip().splitlines() if segment.strip()]
filtered = [segment for segment in lines if not segment.upper().startswith("DTSTART")]
if not filtered:
raise ValueError("recurrence_rule must contain an RRULE definition")
if not filtered[0].upper().startswith("RRULE"):
filtered[0] = f"RRULE:{filtered[0]}"
return "\n".join([dt_line, *filtered])
def load_rrule(recurrence_text: str):
    """Turn stored recurrence text into whatever object `rrulestr` builds from it."""
    rule = rrulestr(recurrence_text)
    return rule
# Names exported when this module is star-imported: the shared constants
# plus every helper function defined above.
__all__ = [
"UTC",
"DEFAULT_STATUS",
"VALID_STATUSES",
"build_recurrence",
"coerce_start_datetime",
"load_rrule",
"normalize_status",
"parse_datetime",
"parse_iso",
"resolve_timezone",
"to_storage_timestamp",
"utc_now",
]
|