Spaces:
Configuration error
Configuration error
| """Trigger tool schemas and actions for the execution agent.""" | |
| from __future__ import annotations | |
| import json | |
| from functools import partial | |
| from typing import Any, Callable, Dict, List, Optional | |
| from server.services.execution import get_execution_agent_logs | |
| from server.services.timezone_store import get_timezone_store | |
| from server.services.triggers import TriggerRecord, get_trigger_service | |
| _SCHEMAS: List[Dict[str, Any]] = [ | |
| { | |
| "type": "function", | |
| "function": { | |
| "name": "createTrigger", | |
| "description": "Create a reminder trigger for the current execution agent.", | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "payload": { | |
| "type": "string", | |
| "description": "Raw instruction text that should run when the trigger fires.", | |
| }, | |
| "recurrence_rule": { | |
| "type": "string", | |
| "description": "iCalendar RRULE string describing how often to fire (optional).", | |
| }, | |
| "start_time": { | |
| "type": "string", | |
| "description": "ISO 8601 start time for the first firing. Defaults to now if omitted.", | |
| }, | |
| "status": { | |
| "type": "string", | |
| "description": "Initial status; usually 'active' or 'paused'.", | |
| }, | |
| }, | |
| "required": ["payload"], | |
| "additionalProperties": False, | |
| }, | |
| }, | |
| }, | |
| { | |
| "type": "function", | |
| "function": { | |
| "name": "updateTrigger", | |
| "description": "Update or pause an existing trigger owned by this execution agent.", | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "trigger_id": { | |
| "type": "integer", | |
| "description": "Identifier returned when the trigger was created.", | |
| }, | |
| "payload": { | |
| "type": "string", | |
| "description": "Replace the instruction payload (optional).", | |
| }, | |
| "recurrence_rule": { | |
| "type": "string", | |
| "description": "New RRULE definition (optional).", | |
| }, | |
| "start_time": { | |
| "type": "string", | |
| "description": "New ISO 8601 start time for the schedule (optional).", | |
| }, | |
| "status": { | |
| "type": "string", | |
| "description": "Set trigger status to 'active', 'paused', or 'completed'.", | |
| }, | |
| }, | |
| "required": ["trigger_id"], | |
| "additionalProperties": False, | |
| }, | |
| }, | |
| }, | |
| { | |
| "type": "function", | |
| "function": { | |
| "name": "listTriggers", | |
| "description": "List all triggers belonging to this execution agent.", | |
| "parameters": { | |
| "type": "object", | |
| "properties": {}, | |
| "required": [], | |
| "additionalProperties": False, | |
| }, | |
| }, | |
| }, | |
| ] | |
| _LOG_STORE = get_execution_agent_logs() | |
| _TRIGGER_SERVICE = get_trigger_service() | |
| # Return trigger tool schemas | |
| def get_schemas() -> List[Dict[str, Any]]: | |
| """Return trigger tool schemas.""" | |
| return _SCHEMAS | |
| # Convert TriggerRecord to dictionary payload for API responses | |
| def _trigger_record_to_payload(record: TriggerRecord) -> Dict[str, Any]: | |
| return { | |
| "id": record.id, | |
| "payload": record.payload, | |
| "start_time": record.start_time, | |
| "next_trigger": record.next_trigger, | |
| "recurrence_rule": record.recurrence_rule, | |
| "timezone": record.timezone, | |
| "status": record.status, | |
| "last_error": record.last_error, | |
| "created_at": record.created_at, | |
| "updated_at": record.updated_at, | |
| } | |
| # Create a new trigger for the specified execution agent | |
| def _create_trigger_tool( | |
| *, | |
| agent_name: str, | |
| payload: str, | |
| recurrence_rule: Optional[str] = None, | |
| start_time: Optional[str] = None, | |
| status: Optional[str] = None, | |
| ) -> Dict[str, Any]: | |
| timezone_value = get_timezone_store().get_timezone() | |
| summary_args = { | |
| "recurrence_rule": recurrence_rule, | |
| "start_time": start_time, | |
| "timezone": timezone_value, | |
| "status": status, | |
| } | |
| try: | |
| record = _TRIGGER_SERVICE.create_trigger( | |
| agent_name=agent_name, | |
| payload=payload, | |
| recurrence_rule=recurrence_rule, | |
| start_time=start_time, | |
| timezone_name=timezone_value, | |
| status=status, | |
| ) | |
| except Exception as exc: # pragma: no cover - defensive | |
| _LOG_STORE.record_action( | |
| agent_name, | |
| description=f"createTrigger failed | details={json.dumps(summary_args, ensure_ascii=False)} | error={exc}", | |
| ) | |
| return {"error": str(exc)} | |
| _LOG_STORE.record_action( | |
| agent_name, | |
| description=f"createTrigger succeeded | trigger_id={record.id}", | |
| ) | |
| return { | |
| "trigger_id": record.id, | |
| "status": record.status, | |
| "next_trigger": record.next_trigger, | |
| "start_time": record.start_time, | |
| "timezone": record.timezone, | |
| "recurrence_rule": record.recurrence_rule, | |
| } | |
| # Update or pause an existing trigger owned by this execution agent | |
| def _update_trigger_tool( | |
| *, | |
| agent_name: str, | |
| trigger_id: Any, | |
| payload: Optional[str] = None, | |
| recurrence_rule: Optional[str] = None, | |
| start_time: Optional[str] = None, | |
| status: Optional[str] = None, | |
| ) -> Dict[str, Any]: | |
| try: | |
| trigger_id_int = int(trigger_id) | |
| except (TypeError, ValueError): | |
| return {"error": "trigger_id must be an integer"} | |
| try: | |
| timezone_value = get_timezone_store().get_timezone() | |
| record = _TRIGGER_SERVICE.update_trigger( | |
| trigger_id_int, | |
| agent_name=agent_name, | |
| payload=payload, | |
| recurrence_rule=recurrence_rule, | |
| start_time=start_time, | |
| timezone_name=timezone_value, | |
| status=status, | |
| ) | |
| except Exception as exc: # pragma: no cover - defensive | |
| _LOG_STORE.record_action( | |
| agent_name, | |
| description=f"updateTrigger failed | id={trigger_id_int} | error={exc}", | |
| ) | |
| return {"error": str(exc)} | |
| if record is None: | |
| return {"error": f"Trigger {trigger_id_int} not found"} | |
| _LOG_STORE.record_action( | |
| agent_name, | |
| description=f"updateTrigger succeeded | trigger_id={trigger_id_int}", | |
| ) | |
| return { | |
| "trigger_id": record.id, | |
| "status": record.status, | |
| "next_trigger": record.next_trigger, | |
| "start_time": record.start_time, | |
| "timezone": record.timezone, | |
| "recurrence_rule": record.recurrence_rule, | |
| "last_error": record.last_error, | |
| } | |
| # List all triggers belonging to this execution agent | |
| def _list_triggers_tool(*, agent_name: str) -> Dict[str, Any]: | |
| try: | |
| records = _TRIGGER_SERVICE.list_triggers(agent_name=agent_name) | |
| except Exception as exc: # pragma: no cover - defensive | |
| _LOG_STORE.record_action( | |
| agent_name, | |
| description=f"listTriggers failed | error={exc}", | |
| ) | |
| return {"error": str(exc)} | |
| _LOG_STORE.record_action( | |
| agent_name, | |
| description=f"listTriggers succeeded | count={len(records)}", | |
| ) | |
| return {"triggers": [_trigger_record_to_payload(record) for record in records]} | |
| # Return trigger tool callables bound to a specific agent | |
| def build_registry(agent_name: str) -> Dict[str, Callable[..., Any]]: | |
| """Return trigger tool callables bound to a specific agent.""" | |
| return { | |
| "createTrigger": partial(_create_trigger_tool, agent_name=agent_name), | |
| "updateTrigger": partial(_update_trigger_tool, agent_name=agent_name), | |
| "listTriggers": partial(_list_triggers_tool, agent_name=agent_name), | |
| } | |
| __all__ = [ | |
| "build_registry", | |
| "get_schemas", | |
| ] | |