"""Background scheduler that watches trigger definitions and executes them.""" from __future__ import annotations import asyncio from datetime import datetime, timezone from typing import Optional, Set from ..agents.execution_agent.batch_manager import ExecutionBatchManager from ..agents.execution_agent.runtime import ExecutionResult from ..logging_config import logger from .triggers import TriggerRecord, get_trigger_service UTC = timezone.utc def _utc_now() -> datetime: return datetime.now(UTC) def _isoformat(dt: datetime) -> str: return dt.astimezone(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") class TriggerScheduler: """Polls stored triggers and launches execution agents when due.""" def __init__(self, poll_interval_seconds: float = 10.0) -> None: self._poll_interval = poll_interval_seconds self._service = get_trigger_service() self._task: Optional[asyncio.Task[None]] = None self._running = False self._in_flight: Set[int] = set() self._lock = asyncio.Lock() async def start(self) -> None: async with self._lock: if self._task and not self._task.done(): return loop = asyncio.get_running_loop() self._running = True self._task = loop.create_task(self._run(), name="trigger-scheduler") logger.info("Trigger scheduler started", extra={"interval": self._poll_interval}) async def stop(self) -> None: async with self._lock: self._running = False if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass self._task = None logger.info("Trigger scheduler stopped") async def _run(self) -> None: try: while self._running: await self._poll_once() await asyncio.sleep(self._poll_interval) except asyncio.CancelledError: # pragma: no cover - shutdown path raise except Exception as exc: # pragma: no cover - defensive logger.exception("Trigger scheduler loop crashed", extra={"error": str(exc)}) async def _poll_once(self) -> None: now = _utc_now() due_triggers = self._service.get_due_triggers(before=now) if not due_triggers: return for trigger in due_triggers: if trigger.id in self._in_flight: continue self._in_flight.add(trigger.id) asyncio.create_task(self._execute_trigger(trigger), name=f"trigger-{trigger.id}") async def _execute_trigger(self, trigger: TriggerRecord) -> None: try: fired_at = _utc_now() instructions = self._format_instructions(trigger, fired_at) logger.info( "Dispatching trigger", extra={ "trigger_id": trigger.id, "agent": trigger.agent_name, "scheduled_for": trigger.next_trigger, }, ) execution_manager = ExecutionBatchManager() result = await execution_manager.execute_agent( trigger.agent_name, instructions, ) if result.success: self._handle_success(trigger, fired_at) else: error_text = result.error or result.response self._handle_failure(trigger, fired_at, error_text) except Exception as exc: # pragma: no cover - defensive self._handle_failure(trigger, _utc_now(), str(exc)) logger.exception( "Trigger execution failed unexpectedly", extra={"trigger_id": trigger.id, "agent": trigger.agent_name}, ) finally: self._in_flight.discard(trigger.id) def _handle_success(self, trigger: TriggerRecord, fired_at: datetime) -> None: logger.info( "Trigger completed", extra={"trigger_id": trigger.id, "agent": trigger.agent_name}, ) self._service.schedule_next_occurrence(trigger, fired_at=fired_at) def _handle_failure(self, trigger: TriggerRecord, fired_at: datetime, error: str) -> None: logger.warning( "Trigger execution failed", extra={ "trigger_id": trigger.id, "agent": trigger.agent_name, "error": error, }, ) self._service.record_failure(trigger, error) if trigger.recurrence_rule: self._service.schedule_next_occurrence(trigger, fired_at=fired_at) else: self._service.clear_next_fire(trigger.id, agent_name=trigger.agent_name) def _format_instructions(self, trigger: TriggerRecord, fired_at: datetime) -> str: scheduled_for = trigger.next_trigger or _isoformat(fired_at) metadata_lines = [f"Trigger ID: {trigger.id}"] if trigger.recurrence_rule: metadata_lines.append(f"Recurrence: {trigger.recurrence_rule}") if trigger.timezone: metadata_lines.append(f"Timezone: {trigger.timezone}") if trigger.start_time: metadata_lines.append(f"Start Time (UTC): {trigger.start_time}") metadata = "\n".join(f"- {line}" for line in metadata_lines) return ( f"Trigger fired at {_isoformat(fired_at)} (UTC).\n" f"Scheduled occurrence time: {scheduled_for}.\n\n" f"Metadata:\n{metadata}\n\n" f"Payload:\n{trigger.payload}" ) _scheduler_instance: Optional[TriggerScheduler] = None def get_trigger_scheduler() -> TriggerScheduler: global _scheduler_instance if _scheduler_instance is None: _scheduler_instance = TriggerScheduler() return _scheduler_instance __all__ = ["TriggerScheduler", "get_trigger_scheduler"]