Spaces:
Configuration error
Configuration error
File size: 6,005 Bytes
aa15bce |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
"""Background scheduler that watches trigger definitions and executes them."""
from __future__ import annotations
import asyncio
from datetime import datetime, timezone
from typing import Optional, Set
from ..agents.execution_agent.batch_manager import ExecutionBatchManager
from ..agents.execution_agent.runtime import ExecutionResult
from ..logging_config import logger
from .triggers import TriggerRecord, get_trigger_service
UTC = timezone.utc
def _utc_now() -> datetime:
return datetime.now(UTC)
def _isoformat(dt: datetime) -> str:
return dt.astimezone(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
class TriggerScheduler:
"""Polls stored triggers and launches execution agents when due."""
def __init__(self, poll_interval_seconds: float = 10.0) -> None:
self._poll_interval = poll_interval_seconds
self._service = get_trigger_service()
self._task: Optional[asyncio.Task[None]] = None
self._running = False
self._in_flight: Set[int] = set()
self._lock = asyncio.Lock()
async def start(self) -> None:
async with self._lock:
if self._task and not self._task.done():
return
loop = asyncio.get_running_loop()
self._running = True
self._task = loop.create_task(self._run(), name="trigger-scheduler")
logger.info("Trigger scheduler started", extra={"interval": self._poll_interval})
async def stop(self) -> None:
async with self._lock:
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
logger.info("Trigger scheduler stopped")
async def _run(self) -> None:
try:
while self._running:
await self._poll_once()
await asyncio.sleep(self._poll_interval)
except asyncio.CancelledError: # pragma: no cover - shutdown path
raise
except Exception as exc: # pragma: no cover - defensive
logger.exception("Trigger scheduler loop crashed", extra={"error": str(exc)})
async def _poll_once(self) -> None:
now = _utc_now()
due_triggers = self._service.get_due_triggers(before=now)
if not due_triggers:
return
for trigger in due_triggers:
if trigger.id in self._in_flight:
continue
self._in_flight.add(trigger.id)
asyncio.create_task(self._execute_trigger(trigger), name=f"trigger-{trigger.id}")
async def _execute_trigger(self, trigger: TriggerRecord) -> None:
try:
fired_at = _utc_now()
instructions = self._format_instructions(trigger, fired_at)
logger.info(
"Dispatching trigger",
extra={
"trigger_id": trigger.id,
"agent": trigger.agent_name,
"scheduled_for": trigger.next_trigger,
},
)
execution_manager = ExecutionBatchManager()
result = await execution_manager.execute_agent(
trigger.agent_name,
instructions,
)
if result.success:
self._handle_success(trigger, fired_at)
else:
error_text = result.error or result.response
self._handle_failure(trigger, fired_at, error_text)
except Exception as exc: # pragma: no cover - defensive
self._handle_failure(trigger, _utc_now(), str(exc))
logger.exception(
"Trigger execution failed unexpectedly",
extra={"trigger_id": trigger.id, "agent": trigger.agent_name},
)
finally:
self._in_flight.discard(trigger.id)
def _handle_success(self, trigger: TriggerRecord, fired_at: datetime) -> None:
logger.info(
"Trigger completed",
extra={"trigger_id": trigger.id, "agent": trigger.agent_name},
)
self._service.schedule_next_occurrence(trigger, fired_at=fired_at)
def _handle_failure(self, trigger: TriggerRecord, fired_at: datetime, error: str) -> None:
logger.warning(
"Trigger execution failed",
extra={
"trigger_id": trigger.id,
"agent": trigger.agent_name,
"error": error,
},
)
self._service.record_failure(trigger, error)
if trigger.recurrence_rule:
self._service.schedule_next_occurrence(trigger, fired_at=fired_at)
else:
self._service.clear_next_fire(trigger.id, agent_name=trigger.agent_name)
def _format_instructions(self, trigger: TriggerRecord, fired_at: datetime) -> str:
scheduled_for = trigger.next_trigger or _isoformat(fired_at)
metadata_lines = [f"Trigger ID: {trigger.id}"]
if trigger.recurrence_rule:
metadata_lines.append(f"Recurrence: {trigger.recurrence_rule}")
if trigger.timezone:
metadata_lines.append(f"Timezone: {trigger.timezone}")
if trigger.start_time:
metadata_lines.append(f"Start Time (UTC): {trigger.start_time}")
metadata = "\n".join(f"- {line}" for line in metadata_lines)
return (
f"Trigger fired at {_isoformat(fired_at)} (UTC).\n"
f"Scheduled occurrence time: {scheduled_for}.\n\n"
f"Metadata:\n{metadata}\n\n"
f"Payload:\n{trigger.payload}"
)
_scheduler_instance: Optional[TriggerScheduler] = None
def get_trigger_scheduler() -> TriggerScheduler:
global _scheduler_instance
if _scheduler_instance is None:
_scheduler_instance = TriggerScheduler()
return _scheduler_instance
__all__ = ["TriggerScheduler", "get_trigger_scheduler"]
|