Guilherme34's picture
Upload folder using huggingface_hub
aa15bce verified
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from zoneinfo import ZoneInfo
from ...logging_config import logger
from .models import TriggerRecord
from .store import TriggerStore
from .utils import (
build_recurrence,
coerce_start_datetime,
load_rrule,
normalize_status,
parse_iso,
resolve_timezone,
to_storage_timestamp,
utc_now,
)
MISSED_TRIGGER_GRACE_PERIOD = timedelta(minutes=5)
class TriggerService:
"""High-level trigger management with recurrence awareness."""
def __init__(self, store: TriggerStore):
self._store = store
def create_trigger(
self,
*,
agent_name: str,
payload: str,
recurrence_rule: Optional[str] = None,
start_time: Optional[str] = None,
timezone_name: Optional[str] = None,
status: Optional[str] = None,
) -> TriggerRecord:
tz = resolve_timezone(timezone_name)
now = utc_now()
start_dt_local = coerce_start_datetime(start_time, tz, now)
stored_recurrence = build_recurrence(recurrence_rule, start_dt_local, tz)
next_fire = self._compute_next_fire(
stored_recurrence=stored_recurrence,
start_dt_local=start_dt_local,
tz=tz,
now=now,
)
timestamp = to_storage_timestamp(now)
record: Dict[str, Any] = {
"agent_name": agent_name,
"payload": payload,
"start_time": to_storage_timestamp(start_dt_local),
"next_trigger": to_storage_timestamp(next_fire) if next_fire else None,
"recurrence_rule": stored_recurrence,
"timezone": getattr(tz, "key", "UTC"),
"status": normalize_status(status),
"last_error": None,
"created_at": timestamp,
"updated_at": timestamp,
}
trigger_id = self._store.insert(record)
created = self._store.fetch_one(trigger_id, agent_name)
if not created: # pragma: no cover - defensive
raise RuntimeError("Failed to load trigger after insert")
return created
def update_trigger(
self,
trigger_id: int,
*,
agent_name: str,
payload: Optional[str] = None,
recurrence_rule: Optional[str] = None,
start_time: Optional[str] = None,
timezone_name: Optional[str] = None,
status: Optional[str] = None,
last_error: Optional[str] = None,
clear_error: bool = False,
) -> Optional[TriggerRecord]:
existing = self._store.fetch_one(trigger_id, agent_name)
if existing is None:
return None
tz = resolve_timezone(timezone_name or existing.timezone)
start_reference = (
parse_iso(existing.start_time)
if existing.start_time
else utc_now()
)
start_dt_local = coerce_start_datetime(start_time, tz, start_reference)
fields: Dict[str, Any] = {}
if payload is not None:
fields["payload"] = payload
normalized_status = None
status_changed_to_active = False
if status is not None:
normalized_status = normalize_status(status)
fields["status"] = normalized_status
status_changed_to_active = (
normalized_status == "active" and existing.status != "active"
)
else:
normalized_status = existing.status
if start_time is not None:
fields["start_time"] = to_storage_timestamp(start_dt_local.astimezone(tz))
if timezone_name is not None:
fields["timezone"] = getattr(tz, "key", "UTC")
schedule_inputs_changed = any(
value is not None for value in (recurrence_rule, start_time, timezone_name)
)
recurrence_source = (
recurrence_rule if recurrence_rule is not None else existing.recurrence_rule
)
if schedule_inputs_changed:
stored_recurrence = (
build_recurrence(recurrence_source, start_dt_local, tz)
if recurrence_source
else None
)
else:
stored_recurrence = recurrence_source
next_trigger_dt = (
parse_iso(existing.next_trigger)
if existing.next_trigger
else None
)
now = utc_now()
should_recompute_schedule = schedule_inputs_changed
if status_changed_to_active:
if next_trigger_dt is None:
should_recompute_schedule = True
else:
missed_duration = now - next_trigger_dt
if missed_duration > MISSED_TRIGGER_GRACE_PERIOD:
should_recompute_schedule = True
if should_recompute_schedule:
next_fire = self._compute_next_fire(
stored_recurrence=stored_recurrence,
start_dt_local=start_dt_local,
tz=tz,
now=now,
)
if (
stored_recurrence is None
and recurrence_rule is None
and start_time is None
and status_changed_to_active
and next_fire is not None
and next_fire <= now
):
next_fire = now
fields["next_trigger"] = (
to_storage_timestamp(next_fire) if next_fire else None
)
if schedule_inputs_changed:
fields["recurrence_rule"] = stored_recurrence
elif schedule_inputs_changed:
fields["recurrence_rule"] = stored_recurrence
if clear_error:
fields["last_error"] = None
elif last_error is not None:
fields["last_error"] = last_error
if not fields:
return existing
updated = self._store.update(trigger_id, agent_name, fields)
return self._store.fetch_one(trigger_id, agent_name) if updated else existing
def list_triggers(self, *, agent_name: str) -> List[TriggerRecord]:
return self._store.list_for_agent(agent_name)
def get_due_triggers(
self, *, before: datetime, agent_name: Optional[str] = None
) -> List[TriggerRecord]:
iso_cutoff = to_storage_timestamp(before)
return self._store.fetch_due(agent_name, iso_cutoff)
def mark_as_completed(self, trigger_id: int, *, agent_name: str) -> None:
self._store.update(
trigger_id,
agent_name,
{
"status": "completed",
"next_trigger": None,
"last_error": None,
},
)
def schedule_next_occurrence(
self,
trigger: TriggerRecord,
*,
fired_at: datetime,
) -> Optional[TriggerRecord]:
if not trigger.recurrence_rule:
self.mark_as_completed(trigger.id, agent_name=trigger.agent_name)
return self._store.fetch_one(trigger.id, trigger.agent_name)
tz = resolve_timezone(trigger.timezone)
next_fire = self._compute_next_after(trigger.recurrence_rule, fired_at, tz)
fields: Dict[str, Any] = {
"next_trigger": to_storage_timestamp(next_fire) if next_fire else None,
"last_error": None,
}
if next_fire is None:
fields["status"] = "completed"
self._store.update(trigger.id, trigger.agent_name, fields)
return self._store.fetch_one(trigger.id, trigger.agent_name)
def record_failure(self, trigger: TriggerRecord, error: str) -> None:
self._store.update(
trigger.id,
trigger.agent_name,
{
"last_error": error,
},
)
def clear_next_fire(self, trigger_id: int, *, agent_name: str) -> Optional[TriggerRecord]:
self._store.update(
trigger_id,
agent_name,
{
"next_trigger": None,
},
)
return self._store.fetch_one(trigger_id, agent_name)
def clear_all(self) -> None:
self._store.clear_all()
def _compute_next_fire(
self,
*,
stored_recurrence: Optional[str],
start_dt_local: datetime,
tz: ZoneInfo,
now: datetime,
) -> Optional[datetime]:
if stored_recurrence:
rule = load_rrule(stored_recurrence)
next_occurrence = rule.after(now.astimezone(tz), inc=True)
if next_occurrence is None:
return None
if next_occurrence.tzinfo is None:
next_occurrence = next_occurrence.replace(tzinfo=tz)
return next_occurrence.astimezone(tz)
if start_dt_local < now.astimezone(tz):
logger.warning(
"start_time in the past; trigger will fire immediately",
extra={"start_time": start_dt_local.isoformat()},
)
return start_dt_local
def _compute_next_after(
self,
stored_recurrence: str,
fired_at: datetime,
tz: ZoneInfo,
) -> Optional[datetime]:
rule = load_rrule(stored_recurrence)
next_occurrence = rule.after(fired_at.astimezone(tz), inc=False)
if next_occurrence is None:
return None
if next_occurrence.tzinfo is None:
next_occurrence = next_occurrence.replace(tzinfo=tz)
return next_occurrence.astimezone(tz)
__all__ = ["TriggerService", "MISSED_TRIGGER_GRACE_PERIOD"]