Guilherme34's picture
Upload folder using huggingface_hub
aa15bce verified
"""Trigger tool schemas and actions for the execution agent."""
from __future__ import annotations
import json
from functools import partial
from typing import Any, Callable, Dict, List, Optional
from server.services.execution import get_execution_agent_logs
from server.services.timezone_store import get_timezone_store
from server.services.triggers import TriggerRecord, get_trigger_service
_SCHEMAS: List[Dict[str, Any]] = [
{
"type": "function",
"function": {
"name": "createTrigger",
"description": "Create a reminder trigger for the current execution agent.",
"parameters": {
"type": "object",
"properties": {
"payload": {
"type": "string",
"description": "Raw instruction text that should run when the trigger fires.",
},
"recurrence_rule": {
"type": "string",
"description": "iCalendar RRULE string describing how often to fire (optional).",
},
"start_time": {
"type": "string",
"description": "ISO 8601 start time for the first firing. Defaults to now if omitted.",
},
"status": {
"type": "string",
"description": "Initial status; usually 'active' or 'paused'.",
},
},
"required": ["payload"],
"additionalProperties": False,
},
},
},
{
"type": "function",
"function": {
"name": "updateTrigger",
"description": "Update or pause an existing trigger owned by this execution agent.",
"parameters": {
"type": "object",
"properties": {
"trigger_id": {
"type": "integer",
"description": "Identifier returned when the trigger was created.",
},
"payload": {
"type": "string",
"description": "Replace the instruction payload (optional).",
},
"recurrence_rule": {
"type": "string",
"description": "New RRULE definition (optional).",
},
"start_time": {
"type": "string",
"description": "New ISO 8601 start time for the schedule (optional).",
},
"status": {
"type": "string",
"description": "Set trigger status to 'active', 'paused', or 'completed'.",
},
},
"required": ["trigger_id"],
"additionalProperties": False,
},
},
},
{
"type": "function",
"function": {
"name": "listTriggers",
"description": "List all triggers belonging to this execution agent.",
"parameters": {
"type": "object",
"properties": {},
"required": [],
"additionalProperties": False,
},
},
},
]
_LOG_STORE = get_execution_agent_logs()
_TRIGGER_SERVICE = get_trigger_service()
# Return trigger tool schemas
def get_schemas() -> List[Dict[str, Any]]:
"""Return trigger tool schemas."""
return _SCHEMAS
# Convert TriggerRecord to dictionary payload for API responses
def _trigger_record_to_payload(record: TriggerRecord) -> Dict[str, Any]:
return {
"id": record.id,
"payload": record.payload,
"start_time": record.start_time,
"next_trigger": record.next_trigger,
"recurrence_rule": record.recurrence_rule,
"timezone": record.timezone,
"status": record.status,
"last_error": record.last_error,
"created_at": record.created_at,
"updated_at": record.updated_at,
}
# Create a new trigger for the specified execution agent
def _create_trigger_tool(
*,
agent_name: str,
payload: str,
recurrence_rule: Optional[str] = None,
start_time: Optional[str] = None,
status: Optional[str] = None,
) -> Dict[str, Any]:
timezone_value = get_timezone_store().get_timezone()
summary_args = {
"recurrence_rule": recurrence_rule,
"start_time": start_time,
"timezone": timezone_value,
"status": status,
}
try:
record = _TRIGGER_SERVICE.create_trigger(
agent_name=agent_name,
payload=payload,
recurrence_rule=recurrence_rule,
start_time=start_time,
timezone_name=timezone_value,
status=status,
)
except Exception as exc: # pragma: no cover - defensive
_LOG_STORE.record_action(
agent_name,
description=f"createTrigger failed | details={json.dumps(summary_args, ensure_ascii=False)} | error={exc}",
)
return {"error": str(exc)}
_LOG_STORE.record_action(
agent_name,
description=f"createTrigger succeeded | trigger_id={record.id}",
)
return {
"trigger_id": record.id,
"status": record.status,
"next_trigger": record.next_trigger,
"start_time": record.start_time,
"timezone": record.timezone,
"recurrence_rule": record.recurrence_rule,
}
# Update or pause an existing trigger owned by this execution agent
def _update_trigger_tool(
*,
agent_name: str,
trigger_id: Any,
payload: Optional[str] = None,
recurrence_rule: Optional[str] = None,
start_time: Optional[str] = None,
status: Optional[str] = None,
) -> Dict[str, Any]:
try:
trigger_id_int = int(trigger_id)
except (TypeError, ValueError):
return {"error": "trigger_id must be an integer"}
try:
timezone_value = get_timezone_store().get_timezone()
record = _TRIGGER_SERVICE.update_trigger(
trigger_id_int,
agent_name=agent_name,
payload=payload,
recurrence_rule=recurrence_rule,
start_time=start_time,
timezone_name=timezone_value,
status=status,
)
except Exception as exc: # pragma: no cover - defensive
_LOG_STORE.record_action(
agent_name,
description=f"updateTrigger failed | id={trigger_id_int} | error={exc}",
)
return {"error": str(exc)}
if record is None:
return {"error": f"Trigger {trigger_id_int} not found"}
_LOG_STORE.record_action(
agent_name,
description=f"updateTrigger succeeded | trigger_id={trigger_id_int}",
)
return {
"trigger_id": record.id,
"status": record.status,
"next_trigger": record.next_trigger,
"start_time": record.start_time,
"timezone": record.timezone,
"recurrence_rule": record.recurrence_rule,
"last_error": record.last_error,
}
# List all triggers belonging to this execution agent
def _list_triggers_tool(*, agent_name: str) -> Dict[str, Any]:
try:
records = _TRIGGER_SERVICE.list_triggers(agent_name=agent_name)
except Exception as exc: # pragma: no cover - defensive
_LOG_STORE.record_action(
agent_name,
description=f"listTriggers failed | error={exc}",
)
return {"error": str(exc)}
_LOG_STORE.record_action(
agent_name,
description=f"listTriggers succeeded | count={len(records)}",
)
return {"triggers": [_trigger_record_to_payload(record) for record in records]}
# Return trigger tool callables bound to a specific agent
def build_registry(agent_name: str) -> Dict[str, Callable[..., Any]]:
"""Return trigger tool callables bound to a specific agent."""
return {
"createTrigger": partial(_create_trigger_tool, agent_name=agent_name),
"updateTrigger": partial(_update_trigger_tool, agent_name=agent_name),
"listTriggers": partial(_list_triggers_tool, agent_name=agent_name),
}
__all__ = [
"build_registry",
"get_schemas",
]