Spaces:
Configuration error
Configuration error
| from __future__ import annotations | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Optional | |
| from dateutil import parser as date_parser | |
| from dateutil.rrule import rrulestr | |
| from zoneinfo import ZoneInfo | |
| from ...logging_config import logger | |
| UTC = timezone.utc | |
| DEFAULT_STATUS = "active" | |
| VALID_STATUSES = {"active", "paused", "completed"} | |
| def utc_now() -> datetime: | |
| """Return the current time in UTC.""" | |
| return datetime.now(UTC) | |
| def to_storage_timestamp(moment: datetime) -> str: | |
| """Normalize timestamps before writing to SQLite.""" | |
| return moment.astimezone(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") | |
| def resolve_timezone(timezone_name: Optional[str]) -> ZoneInfo: | |
| """Return a `ZoneInfo` instance, defaulting to UTC on errors.""" | |
| if timezone_name: | |
| try: | |
| return ZoneInfo(timezone_name) | |
| except Exception: | |
| logger.warning( | |
| "unknown timezone provided; defaulting to UTC", | |
| extra={"timezone": timezone_name}, | |
| ) | |
| return ZoneInfo("UTC") | |
| def normalize_status(status: Optional[str]) -> str: | |
| """Clamp trigger status to the known set.""" | |
| if not status: | |
| return DEFAULT_STATUS | |
| normalized = status.lower() | |
| if normalized not in VALID_STATUSES: | |
| logger.warning( | |
| "invalid status supplied; defaulting to active", | |
| extra={"status": status}, | |
| ) | |
| return DEFAULT_STATUS | |
| return normalized | |
| def parse_iso(timestamp: str) -> datetime: | |
| """Parse an ISO timestamp, defaulting to UTC when timezone is absent.""" | |
| dt = date_parser.isoparse(timestamp) | |
| if dt.tzinfo is None: | |
| dt = dt.replace(tzinfo=UTC) | |
| return dt | |
| def parse_datetime(timestamp: str, tz: ZoneInfo) -> datetime: | |
| """Parse a timestamp string into the provided timezone.""" | |
| dt = date_parser.isoparse(timestamp) | |
| if dt.tzinfo is None: | |
| dt = dt.replace(tzinfo=tz) | |
| else: | |
| dt = dt.astimezone(tz) | |
| return dt | |
| def coerce_start_datetime( | |
| start_time: Optional[str], tz: ZoneInfo, fallback: datetime | |
| ) -> datetime: | |
| """Return the desired start datetime in the agent's timezone.""" | |
| if start_time: | |
| return parse_datetime(start_time, tz) | |
| return fallback.astimezone(tz) | |
| def build_recurrence( | |
| recurrence_rule: Optional[str], | |
| start_dt_local: datetime, | |
| tz: ZoneInfo, | |
| ) -> Optional[str]: | |
| """Embed DTSTART metadata into the supplied RRULE text.""" | |
| if not recurrence_rule: | |
| return None | |
| if start_dt_local.tzinfo is None: | |
| localized_start = start_dt_local.replace(tzinfo=tz) | |
| else: | |
| localized_start = start_dt_local.astimezone(tz) | |
| if localized_start.utcoffset() == timedelta(0): | |
| dt_line = f"DTSTART:{localized_start.astimezone(UTC).strftime('%Y%m%dT%H%M%SZ')}" | |
| else: | |
| tz_name = getattr(tz, "key", "UTC") | |
| dt_line = f"DTSTART;TZID={tz_name}:{localized_start.strftime('%Y%m%dT%H%M%S')}" | |
| lines = [segment.strip() for segment in recurrence_rule.strip().splitlines() if segment.strip()] | |
| filtered = [segment for segment in lines if not segment.upper().startswith("DTSTART")] | |
| if not filtered: | |
| raise ValueError("recurrence_rule must contain an RRULE definition") | |
| if not filtered[0].upper().startswith("RRULE"): | |
| filtered[0] = f"RRULE:{filtered[0]}" | |
| return "\n".join([dt_line, *filtered]) | |
| def load_rrule(recurrence_text: str): | |
| """Parse a stored recurrence string into a dateutil rule instance.""" | |
| return rrulestr(recurrence_text) | |
| __all__ = [ | |
| "UTC", | |
| "DEFAULT_STATUS", | |
| "VALID_STATUSES", | |
| "build_recurrence", | |
| "coerce_start_datetime", | |
| "load_rrule", | |
| "normalize_status", | |
| "parse_datetime", | |
| "parse_iso", | |
| "resolve_timezone", | |
| "to_storage_timestamp", | |
| "utc_now", | |
| ] | |