Spaces:
Configuration error
Configuration error
File size: 9,616 Bytes
aa15bce |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 |
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from zoneinfo import ZoneInfo
from ...logging_config import logger
from .models import TriggerRecord
from .store import TriggerStore
from .utils import (
build_recurrence,
coerce_start_datetime,
load_rrule,
normalize_status,
parse_iso,
resolve_timezone,
to_storage_timestamp,
utc_now,
)
# Tolerance applied by TriggerService.update_trigger when a trigger is switched
# back to "active": if its stored next_trigger lies further in the past than
# this, the schedule is recomputed instead of keeping the stale fire time.
MISSED_TRIGGER_GRACE_PERIOD = timedelta(minutes=5)
class TriggerService:
    """High-level trigger management with recurrence awareness.

    The service wraps a ``TriggerStore`` and layers scheduling decisions on
    top of it: computing the next fire time on create/update, advancing
    recurring triggers after they fire, and marking one-shot triggers as
    completed.
    """

    def __init__(self, store: TriggerStore) -> None:
        """Bind the service to the store used for every read and write."""
        self._store = store

    def create_trigger(
        self,
        *,
        agent_name: str,
        payload: str,
        recurrence_rule: Optional[str] = None,
        start_time: Optional[str] = None,
        timezone_name: Optional[str] = None,
        status: Optional[str] = None,
    ) -> TriggerRecord:
        """Insert a new trigger and return the stored record.

        Args:
            agent_name: Owner of the trigger; also used to scope the lookup
                of the freshly inserted row.
            payload: Opaque payload stored with the trigger.
            recurrence_rule: Optional recurrence definition, passed through
                ``build_recurrence`` before being stored.
            start_time: Optional start time string, resolved by
                ``coerce_start_datetime`` against the current time.
            timezone_name: Optional timezone name, resolved by
                ``resolve_timezone``.
            status: Optional status, passed through ``normalize_status``.

        Returns:
            The record as read back from the store after the insert.

        Raises:
            RuntimeError: If the record cannot be loaded after the insert.
        """
        tz = resolve_timezone(timezone_name)
        now = utc_now()
        start_dt_local = coerce_start_datetime(start_time, tz, now)
        stored_recurrence = build_recurrence(recurrence_rule, start_dt_local, tz)
        next_fire = self._compute_next_fire(
            stored_recurrence=stored_recurrence,
            start_dt_local=start_dt_local,
            tz=tz,
            now=now,
        )

        # created_at and updated_at share one timestamp on creation.
        timestamp = to_storage_timestamp(now)
        record: Dict[str, Any] = {
            "agent_name": agent_name,
            "payload": payload,
            "start_time": to_storage_timestamp(start_dt_local),
            "next_trigger": to_storage_timestamp(next_fire) if next_fire else None,
            "recurrence_rule": stored_recurrence,
            # Fall back to "UTC" for tz objects that expose no ``key``.
            "timezone": getattr(tz, "key", "UTC"),
            "status": normalize_status(status),
            "last_error": None,
            "created_at": timestamp,
            "updated_at": timestamp,
        }
        trigger_id = self._store.insert(record)
        created = self._store.fetch_one(trigger_id, agent_name)
        if not created:  # pragma: no cover - defensive
            raise RuntimeError("Failed to load trigger after insert")
        return created

    def update_trigger(
        self,
        trigger_id: int,
        *,
        agent_name: str,
        payload: Optional[str] = None,
        recurrence_rule: Optional[str] = None,
        start_time: Optional[str] = None,
        timezone_name: Optional[str] = None,
        status: Optional[str] = None,
        last_error: Optional[str] = None,
        clear_error: bool = False,
    ) -> Optional[TriggerRecord]:
        """Apply a partial update to a trigger, recomputing its schedule if needed.

        Only arguments that are not ``None`` are written. The next fire time
        is recomputed when any schedule input (``recurrence_rule``,
        ``start_time``, ``timezone_name``) is supplied, or when the trigger is
        switched to ``"active"`` and its stored ``next_trigger`` is missing or
        older than ``MISSED_TRIGGER_GRACE_PERIOD``.

        Args:
            trigger_id: Identifier of the trigger to update.
            agent_name: Owner used to scope the lookup and the update.
            payload: New payload, if given.
            recurrence_rule: New recurrence definition, if given.
            start_time: New start time string, if given.
            timezone_name: New timezone name, if given.
            status: New status, passed through ``normalize_status``.
            last_error: Error text to store; ignored when ``clear_error`` is
                true.
            clear_error: When true, ``last_error`` is reset to ``None``.

        Returns:
            ``None`` if the trigger does not exist. Otherwise the re-fetched
            record when the store reports an update, or the original record
            when there was nothing to write or the store reports no update.
        """
        existing = self._store.fetch_one(trigger_id, agent_name)
        if existing is None:
            return None

        tz = resolve_timezone(timezone_name or existing.timezone)
        # Reference handed to coerce_start_datetime: the stored start time,
        # or "now" when the record has none.
        start_reference = (
            parse_iso(existing.start_time) if existing.start_time else utc_now()
        )
        start_dt_local = coerce_start_datetime(start_time, tz, start_reference)

        # NOTE(review): "updated_at" is not written here; presumably the store
        # maintains it on update - confirm.
        fields: Dict[str, Any] = {}
        if payload is not None:
            fields["payload"] = payload

        status_changed_to_active = False
        if status is not None:
            normalized_status = normalize_status(status)
            fields["status"] = normalized_status
            status_changed_to_active = (
                normalized_status == "active" and existing.status != "active"
            )

        if start_time is not None:
            fields["start_time"] = to_storage_timestamp(start_dt_local.astimezone(tz))
        if timezone_name is not None:
            fields["timezone"] = getattr(tz, "key", "UTC")

        schedule_inputs_changed = any(
            value is not None for value in (recurrence_rule, start_time, timezone_name)
        )
        recurrence_source = (
            recurrence_rule if recurrence_rule is not None else existing.recurrence_rule
        )
        if schedule_inputs_changed:
            # Rebuild the stored rule so it reflects the new start/timezone.
            stored_recurrence = (
                build_recurrence(recurrence_source, start_dt_local, tz)
                if recurrence_source
                else None
            )
        else:
            stored_recurrence = recurrence_source

        next_trigger_dt = (
            parse_iso(existing.next_trigger) if existing.next_trigger else None
        )
        now = utc_now()

        should_recompute_schedule = schedule_inputs_changed
        if status_changed_to_active:
            # Re-activation: refresh the schedule when there is no pending
            # fire time or the pending one was missed by more than the grace
            # period.
            if next_trigger_dt is None:
                should_recompute_schedule = True
            elif now - next_trigger_dt > MISSED_TRIGGER_GRACE_PERIOD:
                should_recompute_schedule = True

        if should_recompute_schedule:
            next_fire = self._compute_next_fire(
                stored_recurrence=stored_recurrence,
                start_dt_local=start_dt_local,
                tz=tz,
                now=now,
            )
            # A re-activated one-shot trigger whose start time has already
            # passed is scheduled for "now" rather than its stale start time.
            if (
                stored_recurrence is None
                and recurrence_rule is None
                and start_time is None
                and status_changed_to_active
                and next_fire is not None
                and next_fire <= now
            ):
                next_fire = now
            fields["next_trigger"] = (
                to_storage_timestamp(next_fire) if next_fire else None
            )
            # schedule_inputs_changed implies should_recompute_schedule, so
            # this is the only place the recurrence rule needs to be written.
            if schedule_inputs_changed:
                fields["recurrence_rule"] = stored_recurrence

        if clear_error:
            fields["last_error"] = None
        elif last_error is not None:
            fields["last_error"] = last_error

        if not fields:
            return existing

        updated = self._store.update(trigger_id, agent_name, fields)
        return self._store.fetch_one(trigger_id, agent_name) if updated else existing

    def list_triggers(self, *, agent_name: str) -> List[TriggerRecord]:
        """Return the store's trigger list for ``agent_name``."""
        return self._store.list_for_agent(agent_name)

    def get_due_triggers(
        self, *, before: datetime, agent_name: Optional[str] = None
    ) -> List[TriggerRecord]:
        """Return triggers the store reports as due for the cutoff ``before``.

        Args:
            before: Cutoff time, converted with ``to_storage_timestamp``.
            agent_name: Optional owner filter, passed to the store as-is.
        """
        iso_cutoff = to_storage_timestamp(before)
        return self._store.fetch_due(agent_name, iso_cutoff)

    def mark_as_completed(self, trigger_id: int, *, agent_name: str) -> None:
        """Set status to ``"completed"`` and clear next fire time and error."""
        self._store.update(
            trigger_id,
            agent_name,
            {
                "status": "completed",
                "next_trigger": None,
                "last_error": None,
            },
        )

    def schedule_next_occurrence(
        self,
        trigger: TriggerRecord,
        *,
        fired_at: datetime,
    ) -> Optional[TriggerRecord]:
        """Advance a trigger after it fired and return the refreshed record.

        A trigger without a recurrence rule is marked completed. A recurring
        trigger gets its next occurrence strictly after ``fired_at``; when the
        rule yields no further occurrence it is marked completed as well. The
        stored ``last_error`` is cleared in every case.

        Args:
            trigger: The trigger that just fired.
            fired_at: Moment the trigger fired.

        Returns:
            The record re-read from the store (``None`` if the store no
            longer returns it).
        """
        if not trigger.recurrence_rule:
            self.mark_as_completed(trigger.id, agent_name=trigger.agent_name)
            return self._store.fetch_one(trigger.id, trigger.agent_name)

        tz = resolve_timezone(trigger.timezone)
        next_fire = self._compute_next_after(trigger.recurrence_rule, fired_at, tz)
        fields: Dict[str, Any] = {
            "next_trigger": to_storage_timestamp(next_fire) if next_fire else None,
            "last_error": None,
        }
        if next_fire is None:
            # The rule produced no further occurrence.
            fields["status"] = "completed"
        self._store.update(trigger.id, trigger.agent_name, fields)
        return self._store.fetch_one(trigger.id, trigger.agent_name)

    def record_failure(self, trigger: TriggerRecord, error: str) -> None:
        """Store ``error`` as the trigger's ``last_error``."""
        self._store.update(
            trigger.id,
            trigger.agent_name,
            {
                "last_error": error,
            },
        )

    def clear_next_fire(
        self, trigger_id: int, *, agent_name: str
    ) -> Optional[TriggerRecord]:
        """Reset ``next_trigger`` to ``None`` and return the re-read record."""
        self._store.update(
            trigger_id,
            agent_name,
            {
                "next_trigger": None,
            },
        )
        return self._store.fetch_one(trigger_id, agent_name)

    def clear_all(self) -> None:
        """Delegate to the store's ``clear_all``."""
        self._store.clear_all()

    @staticmethod
    def _localize(
        occurrence: Optional[datetime], tz: ZoneInfo
    ) -> Optional[datetime]:
        """Return ``occurrence`` as an aware datetime in ``tz``; ``None`` passes through.

        A naive value is tagged with ``tz`` unchanged; an aware value is
        converted to ``tz``.
        """
        if occurrence is None:
            return None
        if occurrence.tzinfo is None:
            occurrence = occurrence.replace(tzinfo=tz)
        return occurrence.astimezone(tz)

    def _compute_next_fire(
        self,
        *,
        stored_recurrence: Optional[str],
        start_dt_local: datetime,
        tz: ZoneInfo,
        now: datetime,
    ) -> Optional[datetime]:
        """Compute the first fire time for a new or rescheduled trigger.

        Args:
            stored_recurrence: Stored recurrence string, or a falsy value for
                a one-shot trigger.
            start_dt_local: Start time of the trigger.
            tz: Timezone the result is expressed in for recurring triggers.
            now: Current time used as the search reference.

        Returns:
            For a recurring trigger, the result of the rule's ``after(now,
            inc=True)`` normalised to ``tz``, or ``None`` when the rule yields
            nothing. For a one-shot trigger, ``start_dt_local`` itself.
        """
        if stored_recurrence:
            rule = load_rrule(stored_recurrence)
            return self._localize(rule.after(now.astimezone(tz), inc=True), tz)

        if start_dt_local < now.astimezone(tz):
            # A past start time is kept as-is so the trigger is already due.
            logger.warning(
                "start_time in the past; trigger will fire immediately",
                extra={"start_time": start_dt_local.isoformat()},
            )
        return start_dt_local

    def _compute_next_after(
        self,
        stored_recurrence: str,
        fired_at: datetime,
        tz: ZoneInfo,
    ) -> Optional[datetime]:
        """Return the rule's next occurrence after ``fired_at``, normalised to ``tz``.

        Uses ``inc=False`` so an occurrence equal to ``fired_at`` is not
        returned again. Returns ``None`` when the rule yields nothing.
        """
        rule = load_rrule(stored_recurrence)
        return self._localize(rule.after(fired_at.astimezone(tz), inc=False), tz)
# Names exported by a star-import of this module.
__all__ = ["TriggerService", "MISSED_TRIGGER_GRACE_PERIOD"]
|