Guilherme34's picture
Upload folder using huggingface_hub
aa15bce verified
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Optional
from dateutil import parser as date_parser
from dateutil.rrule import rrulestr
from zoneinfo import ZoneInfo
from ...logging_config import logger
UTC = timezone.utc
DEFAULT_STATUS = "active"
VALID_STATUSES = {"active", "paused", "completed"}
def utc_now() -> datetime:
"""Return the current time in UTC."""
return datetime.now(UTC)
def to_storage_timestamp(moment: datetime) -> str:
"""Normalize timestamps before writing to SQLite."""
return moment.astimezone(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
def resolve_timezone(timezone_name: Optional[str]) -> ZoneInfo:
"""Return a `ZoneInfo` instance, defaulting to UTC on errors."""
if timezone_name:
try:
return ZoneInfo(timezone_name)
except Exception:
logger.warning(
"unknown timezone provided; defaulting to UTC",
extra={"timezone": timezone_name},
)
return ZoneInfo("UTC")
def normalize_status(status: Optional[str]) -> str:
"""Clamp trigger status to the known set."""
if not status:
return DEFAULT_STATUS
normalized = status.lower()
if normalized not in VALID_STATUSES:
logger.warning(
"invalid status supplied; defaulting to active",
extra={"status": status},
)
return DEFAULT_STATUS
return normalized
def parse_iso(timestamp: str) -> datetime:
"""Parse an ISO timestamp, defaulting to UTC when timezone is absent."""
dt = date_parser.isoparse(timestamp)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=UTC)
return dt
def parse_datetime(timestamp: str, tz: ZoneInfo) -> datetime:
"""Parse a timestamp string into the provided timezone."""
dt = date_parser.isoparse(timestamp)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=tz)
else:
dt = dt.astimezone(tz)
return dt
def coerce_start_datetime(
start_time: Optional[str], tz: ZoneInfo, fallback: datetime
) -> datetime:
"""Return the desired start datetime in the agent's timezone."""
if start_time:
return parse_datetime(start_time, tz)
return fallback.astimezone(tz)
def build_recurrence(
recurrence_rule: Optional[str],
start_dt_local: datetime,
tz: ZoneInfo,
) -> Optional[str]:
"""Embed DTSTART metadata into the supplied RRULE text."""
if not recurrence_rule:
return None
if start_dt_local.tzinfo is None:
localized_start = start_dt_local.replace(tzinfo=tz)
else:
localized_start = start_dt_local.astimezone(tz)
if localized_start.utcoffset() == timedelta(0):
dt_line = f"DTSTART:{localized_start.astimezone(UTC).strftime('%Y%m%dT%H%M%SZ')}"
else:
tz_name = getattr(tz, "key", "UTC")
dt_line = f"DTSTART;TZID={tz_name}:{localized_start.strftime('%Y%m%dT%H%M%S')}"
lines = [segment.strip() for segment in recurrence_rule.strip().splitlines() if segment.strip()]
filtered = [segment for segment in lines if not segment.upper().startswith("DTSTART")]
if not filtered:
raise ValueError("recurrence_rule must contain an RRULE definition")
if not filtered[0].upper().startswith("RRULE"):
filtered[0] = f"RRULE:{filtered[0]}"
return "\n".join([dt_line, *filtered])
def load_rrule(recurrence_text: str):
"""Parse a stored recurrence string into a dateutil rule instance."""
return rrulestr(recurrence_text)
__all__ = [
"UTC",
"DEFAULT_STATUS",
"VALID_STATUSES",
"build_recurrence",
"coerce_start_datetime",
"load_rrule",
"normalize_status",
"parse_datetime",
"parse_iso",
"resolve_timezone",
"to_storage_timestamp",
"utc_now",
]