from __future__ import annotations from datetime import datetime, timedelta from typing import Any, Dict, List, Optional from zoneinfo import ZoneInfo from ...logging_config import logger from .models import TriggerRecord from .store import TriggerStore from .utils import ( build_recurrence, coerce_start_datetime, load_rrule, normalize_status, parse_iso, resolve_timezone, to_storage_timestamp, utc_now, ) MISSED_TRIGGER_GRACE_PERIOD = timedelta(minutes=5) class TriggerService: """High-level trigger management with recurrence awareness.""" def __init__(self, store: TriggerStore): self._store = store def create_trigger( self, *, agent_name: str, payload: str, recurrence_rule: Optional[str] = None, start_time: Optional[str] = None, timezone_name: Optional[str] = None, status: Optional[str] = None, ) -> TriggerRecord: tz = resolve_timezone(timezone_name) now = utc_now() start_dt_local = coerce_start_datetime(start_time, tz, now) stored_recurrence = build_recurrence(recurrence_rule, start_dt_local, tz) next_fire = self._compute_next_fire( stored_recurrence=stored_recurrence, start_dt_local=start_dt_local, tz=tz, now=now, ) timestamp = to_storage_timestamp(now) record: Dict[str, Any] = { "agent_name": agent_name, "payload": payload, "start_time": to_storage_timestamp(start_dt_local), "next_trigger": to_storage_timestamp(next_fire) if next_fire else None, "recurrence_rule": stored_recurrence, "timezone": getattr(tz, "key", "UTC"), "status": normalize_status(status), "last_error": None, "created_at": timestamp, "updated_at": timestamp, } trigger_id = self._store.insert(record) created = self._store.fetch_one(trigger_id, agent_name) if not created: # pragma: no cover - defensive raise RuntimeError("Failed to load trigger after insert") return created def update_trigger( self, trigger_id: int, *, agent_name: str, payload: Optional[str] = None, recurrence_rule: Optional[str] = None, start_time: Optional[str] = None, timezone_name: Optional[str] = None, status: Optional[str] = None, last_error: Optional[str] = None, clear_error: bool = False, ) -> Optional[TriggerRecord]: existing = self._store.fetch_one(trigger_id, agent_name) if existing is None: return None tz = resolve_timezone(timezone_name or existing.timezone) start_reference = ( parse_iso(existing.start_time) if existing.start_time else utc_now() ) start_dt_local = coerce_start_datetime(start_time, tz, start_reference) fields: Dict[str, Any] = {} if payload is not None: fields["payload"] = payload normalized_status = None status_changed_to_active = False if status is not None: normalized_status = normalize_status(status) fields["status"] = normalized_status status_changed_to_active = ( normalized_status == "active" and existing.status != "active" ) else: normalized_status = existing.status if start_time is not None: fields["start_time"] = to_storage_timestamp(start_dt_local.astimezone(tz)) if timezone_name is not None: fields["timezone"] = getattr(tz, "key", "UTC") schedule_inputs_changed = any( value is not None for value in (recurrence_rule, start_time, timezone_name) ) recurrence_source = ( recurrence_rule if recurrence_rule is not None else existing.recurrence_rule ) if schedule_inputs_changed: stored_recurrence = ( build_recurrence(recurrence_source, start_dt_local, tz) if recurrence_source else None ) else: stored_recurrence = recurrence_source next_trigger_dt = ( parse_iso(existing.next_trigger) if existing.next_trigger else None ) now = utc_now() should_recompute_schedule = schedule_inputs_changed if status_changed_to_active: if next_trigger_dt is None: should_recompute_schedule = True else: missed_duration = now - next_trigger_dt if missed_duration > MISSED_TRIGGER_GRACE_PERIOD: should_recompute_schedule = True if should_recompute_schedule: next_fire = self._compute_next_fire( stored_recurrence=stored_recurrence, start_dt_local=start_dt_local, tz=tz, now=now, ) if ( stored_recurrence is None and recurrence_rule is None and start_time is None and status_changed_to_active and next_fire is not None and next_fire <= now ): next_fire = now fields["next_trigger"] = ( to_storage_timestamp(next_fire) if next_fire else None ) if schedule_inputs_changed: fields["recurrence_rule"] = stored_recurrence elif schedule_inputs_changed: fields["recurrence_rule"] = stored_recurrence if clear_error: fields["last_error"] = None elif last_error is not None: fields["last_error"] = last_error if not fields: return existing updated = self._store.update(trigger_id, agent_name, fields) return self._store.fetch_one(trigger_id, agent_name) if updated else existing def list_triggers(self, *, agent_name: str) -> List[TriggerRecord]: return self._store.list_for_agent(agent_name) def get_due_triggers( self, *, before: datetime, agent_name: Optional[str] = None ) -> List[TriggerRecord]: iso_cutoff = to_storage_timestamp(before) return self._store.fetch_due(agent_name, iso_cutoff) def mark_as_completed(self, trigger_id: int, *, agent_name: str) -> None: self._store.update( trigger_id, agent_name, { "status": "completed", "next_trigger": None, "last_error": None, }, ) def schedule_next_occurrence( self, trigger: TriggerRecord, *, fired_at: datetime, ) -> Optional[TriggerRecord]: if not trigger.recurrence_rule: self.mark_as_completed(trigger.id, agent_name=trigger.agent_name) return self._store.fetch_one(trigger.id, trigger.agent_name) tz = resolve_timezone(trigger.timezone) next_fire = self._compute_next_after(trigger.recurrence_rule, fired_at, tz) fields: Dict[str, Any] = { "next_trigger": to_storage_timestamp(next_fire) if next_fire else None, "last_error": None, } if next_fire is None: fields["status"] = "completed" self._store.update(trigger.id, trigger.agent_name, fields) return self._store.fetch_one(trigger.id, trigger.agent_name) def record_failure(self, trigger: TriggerRecord, error: str) -> None: self._store.update( trigger.id, trigger.agent_name, { "last_error": error, }, ) def clear_next_fire(self, trigger_id: int, *, agent_name: str) -> Optional[TriggerRecord]: self._store.update( trigger_id, agent_name, { "next_trigger": None, }, ) return self._store.fetch_one(trigger_id, agent_name) def clear_all(self) -> None: self._store.clear_all() def _compute_next_fire( self, *, stored_recurrence: Optional[str], start_dt_local: datetime, tz: ZoneInfo, now: datetime, ) -> Optional[datetime]: if stored_recurrence: rule = load_rrule(stored_recurrence) next_occurrence = rule.after(now.astimezone(tz), inc=True) if next_occurrence is None: return None if next_occurrence.tzinfo is None: next_occurrence = next_occurrence.replace(tzinfo=tz) return next_occurrence.astimezone(tz) if start_dt_local < now.astimezone(tz): logger.warning( "start_time in the past; trigger will fire immediately", extra={"start_time": start_dt_local.isoformat()}, ) return start_dt_local def _compute_next_after( self, stored_recurrence: str, fired_at: datetime, tz: ZoneInfo, ) -> Optional[datetime]: rule = load_rrule(stored_recurrence) next_occurrence = rule.after(fired_at.astimezone(tz), inc=False) if next_occurrence is None: return None if next_occurrence.tzinfo is None: next_occurrence = next_occurrence.replace(tzinfo=tz) return next_occurrence.astimezone(tz) __all__ = ["TriggerService", "MISSED_TRIGGER_GRACE_PERIOD"]