|
|
import json |
|
|
from typing import Optional, Dict, Any, List |
|
|
from agentpress.tool import ToolResult, openapi_schema, xml_schema |
|
|
from agentpress.thread_manager import ThreadManager |
|
|
from .base_tool import AgentBuilderBaseTool |
|
|
from utils.logger import logger |
|
|
from datetime import datetime |
|
|
from services.supabase import DBConnection |
|
|
from triggers.core import TriggerManager |
|
|
|
|
|
|
|
|
class TriggerTool(AgentBuilderBaseTool): |
|
|
def __init__(self, thread_manager: ThreadManager, db_connection, agent_id: str): |
|
|
super().__init__(thread_manager, db_connection, agent_id) |
|
|
|
|
|
@openapi_schema({ |
|
|
"type": "function", |
|
|
"function": { |
|
|
"name": "create_scheduled_trigger", |
|
|
"description": "Create a scheduled trigger for the agent to execute workflows or direct agent runs using cron expressions. This allows the agent to run automatically at specified times.", |
|
|
"parameters": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"name": { |
|
|
"type": "string", |
|
|
"description": "Name of the scheduled trigger. Should be descriptive of when/why it runs." |
|
|
}, |
|
|
"description": { |
|
|
"type": "string", |
|
|
"description": "Description of what this trigger does and when it runs." |
|
|
}, |
|
|
"cron_expression": { |
|
|
"type": "string", |
|
|
"description": "Cron expression defining when to run (e.g., '0 9 * * *' for daily at 9am, '*/30 * * * *' for every 30 minutes)" |
|
|
}, |
|
|
"execution_type": { |
|
|
"type": "string", |
|
|
"enum": ["workflow", "agent"], |
|
|
"description": "Whether to execute a workflow or run the agent directly", |
|
|
"default": "agent" |
|
|
}, |
|
|
"workflow_id": { |
|
|
"type": "string", |
|
|
"description": "ID of the workflow to execute (required if execution_type is 'workflow')" |
|
|
}, |
|
|
"workflow_input": { |
|
|
"type": "object", |
|
|
"description": "Input data to pass to the workflow (optional, only for workflow execution)", |
|
|
"additionalProperties": True |
|
|
}, |
|
|
"agent_prompt": { |
|
|
"type": "string", |
|
|
"description": "Prompt to send to the agent when triggered (required if execution_type is 'agent')" |
|
|
} |
|
|
}, |
|
|
"required": ["name", "cron_expression", "execution_type"] |
|
|
} |
|
|
} |
|
|
}) |
|
|
@xml_schema( |
|
|
tag_name="create-scheduled-trigger", |
|
|
mappings=[ |
|
|
{"param_name": "name", "node_type": "attribute", "path": ".", "required": True}, |
|
|
{"param_name": "description", "node_type": "element", "path": "description", "required": False}, |
|
|
{"param_name": "cron_expression", "node_type": "attribute", "path": ".", "required": True}, |
|
|
{"param_name": "execution_type", "node_type": "attribute", "path": ".", "required": True}, |
|
|
{"param_name": "workflow_id", "node_type": "element", "path": "workflow_id", "required": False}, |
|
|
{"param_name": "workflow_input", "node_type": "element", "path": "workflow_input", "required": False}, |
|
|
{"param_name": "agent_prompt", "node_type": "element", "path": "agent_prompt", "required": False} |
|
|
], |
|
|
example=''' |
|
|
<function_calls> |
|
|
<invoke name="create_scheduled_trigger"> |
|
|
<parameter name="name">Daily Report Generation</parameter> |
|
|
<parameter name="description">Generates daily reports every morning at 9 AM</parameter> |
|
|
<parameter name="cron_expression">0 9 * * *</parameter> |
|
|
<parameter name="execution_type">workflow</parameter> |
|
|
<parameter name="workflow_id">workflow-123</parameter> |
|
|
<parameter name="workflow_input">{"report_type": "daily", "include_charts": true}</parameter> |
|
|
</invoke> |
|
|
</function_calls> |
|
|
''' |
|
|
) |
|
|
async def create_scheduled_trigger( |
|
|
self, |
|
|
name: str, |
|
|
cron_expression: str, |
|
|
execution_type: str = "agent", |
|
|
description: Optional[str] = None, |
|
|
workflow_id: Optional[str] = None, |
|
|
workflow_input: Optional[Dict[str, Any]] = None, |
|
|
agent_prompt: Optional[str] = None |
|
|
) -> ToolResult: |
|
|
try: |
|
|
if execution_type not in ["workflow", "agent"]: |
|
|
return self.fail_response("execution_type must be either 'workflow' or 'agent'") |
|
|
|
|
|
if execution_type == "workflow" and not workflow_id: |
|
|
return self.fail_response("workflow_id is required when execution_type is 'workflow'") |
|
|
|
|
|
if execution_type == "agent" and not agent_prompt: |
|
|
return self.fail_response("agent_prompt is required when execution_type is 'agent'") |
|
|
|
|
|
if execution_type == "workflow": |
|
|
client = await self.db.client |
|
|
workflow_result = await client.table('agent_workflows').select('*').eq('id', workflow_id).eq('agent_id', self.agent_id).execute() |
|
|
if not workflow_result.data: |
|
|
return self.fail_response(f"Workflow {workflow_id} not found or doesn't belong to this agent") |
|
|
|
|
|
workflow = workflow_result.data[0] |
|
|
if workflow['status'] != 'active': |
|
|
return self.fail_response(f"Workflow '{workflow['name']}' is not active. Please activate it first.") |
|
|
|
|
|
trigger_config = { |
|
|
"cron_expression": cron_expression, |
|
|
"execution_type": execution_type, |
|
|
"provider_id": "schedule" |
|
|
} |
|
|
|
|
|
if execution_type == "workflow": |
|
|
trigger_config["workflow_id"] = workflow_id |
|
|
if workflow_input: |
|
|
trigger_config["workflow_input"] = workflow_input |
|
|
else: |
|
|
trigger_config["agent_prompt"] = agent_prompt |
|
|
|
|
|
trigger_db = DBConnection() |
|
|
trigger_manager = TriggerManager(trigger_db) |
|
|
await trigger_manager.load_provider_definitions() |
|
|
|
|
|
try: |
|
|
trigger_config_obj = await trigger_manager.create_trigger( |
|
|
agent_id=self.agent_id, |
|
|
provider_id="schedule", |
|
|
name=name, |
|
|
config=trigger_config, |
|
|
description=description |
|
|
) |
|
|
|
|
|
result_message = f"Scheduled trigger '{name}' created successfully!\n\n" |
|
|
result_message += f"**Schedule**: {cron_expression}\n" |
|
|
result_message += f"**Type**: {execution_type.capitalize()} execution\n" |
|
|
|
|
|
if execution_type == "workflow": |
|
|
result_message += f"**Workflow**: {workflow['name']}\n" |
|
|
if workflow_input: |
|
|
result_message += f"**Input Data**: {json.dumps(workflow_input, indent=2)}\n" |
|
|
else: |
|
|
result_message += f"**Prompt**: {agent_prompt}\n" |
|
|
|
|
|
result_message += f"\nThe trigger is now active and will run according to the schedule." |
|
|
|
|
|
return self.success_response({ |
|
|
"message": result_message, |
|
|
"trigger": { |
|
|
"id": trigger_config_obj.trigger_id, |
|
|
"name": trigger_config_obj.name, |
|
|
"description": trigger_config_obj.description, |
|
|
"cron_expression": cron_expression, |
|
|
"execution_type": execution_type, |
|
|
"is_active": trigger_config_obj.is_active, |
|
|
"created_at": trigger_config_obj.created_at.isoformat() |
|
|
} |
|
|
}) |
|
|
except ValueError as ve: |
|
|
return self.fail_response(f"Validation error: {str(ve)}") |
|
|
except Exception as e: |
|
|
logger.error(f"Error creating trigger through manager: {str(e)}") |
|
|
return self.fail_response(f"Failed to create trigger: {str(e)}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error creating scheduled trigger: {str(e)}") |
|
|
return self.fail_response(f"Error creating scheduled trigger: {str(e)}") |
|
|
|
|
|
@openapi_schema({ |
|
|
"type": "function", |
|
|
"function": { |
|
|
"name": "get_scheduled_triggers", |
|
|
"description": "Get all scheduled triggers for the current agent. Shows when the agent will run automatically.", |
|
|
"parameters": { |
|
|
"type": "object", |
|
|
"properties": {}, |
|
|
"required": [] |
|
|
} |
|
|
} |
|
|
}) |
|
|
@xml_schema( |
|
|
tag_name="get-scheduled-triggers", |
|
|
mappings=[], |
|
|
example=''' |
|
|
<function_calls> |
|
|
<invoke name="get_scheduled_triggers"> |
|
|
</invoke> |
|
|
</function_calls> |
|
|
''' |
|
|
) |
|
|
async def get_scheduled_triggers(self) -> ToolResult: |
|
|
try: |
|
|
from triggers.core import TriggerType |
|
|
|
|
|
trigger_db = DBConnection() |
|
|
trigger_manager = TriggerManager(trigger_db) |
|
|
|
|
|
triggers = await trigger_manager.get_agent_triggers(self.agent_id) |
|
|
|
|
|
schedule_triggers = [t for t in triggers if t.trigger_type == TriggerType.SCHEDULE] |
|
|
|
|
|
if not schedule_triggers: |
|
|
return self.success_response({ |
|
|
"message": "No scheduled triggers found for this agent.", |
|
|
"triggers": [] |
|
|
}) |
|
|
|
|
|
client = await self.db.client |
|
|
workflows = {} |
|
|
for trigger in schedule_triggers: |
|
|
if trigger.config.get("execution_type") == "workflow" and trigger.config.get("workflow_id"): |
|
|
workflow_id = trigger.config["workflow_id"] |
|
|
if workflow_id not in workflows: |
|
|
workflow_result = await client.table('agent_workflows').select('name').eq('id', workflow_id).execute() |
|
|
if workflow_result.data: |
|
|
workflows[workflow_id] = workflow_result.data[0]['name'] |
|
|
|
|
|
formatted_triggers = [] |
|
|
for trigger in schedule_triggers: |
|
|
formatted = { |
|
|
"id": trigger.trigger_id, |
|
|
"name": trigger.name, |
|
|
"description": trigger.description, |
|
|
"cron_expression": trigger.config.get("cron_expression"), |
|
|
"execution_type": trigger.config.get("execution_type", "agent"), |
|
|
"is_active": trigger.is_active, |
|
|
"created_at": trigger.created_at.isoformat() |
|
|
} |
|
|
|
|
|
if trigger.config.get("execution_type") == "workflow": |
|
|
workflow_id = trigger.config.get("workflow_id") |
|
|
formatted["workflow_name"] = workflows.get(workflow_id, "Unknown Workflow") |
|
|
formatted["workflow_input"] = trigger.config.get("workflow_input") |
|
|
else: |
|
|
formatted["agent_prompt"] = trigger.config.get("agent_prompt") |
|
|
|
|
|
formatted_triggers.append(formatted) |
|
|
|
|
|
return self.success_response({ |
|
|
"message": f"Found {len(formatted_triggers)} scheduled trigger(s)", |
|
|
"triggers": formatted_triggers |
|
|
}) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error getting scheduled triggers: {str(e)}") |
|
|
return self.fail_response(f"Error getting scheduled triggers: {str(e)}") |
|
|
|
|
|
@openapi_schema({ |
|
|
"type": "function", |
|
|
"function": { |
|
|
"name": "delete_scheduled_trigger", |
|
|
"description": "Delete a scheduled trigger. The agent will no longer run automatically at the scheduled time.", |
|
|
"parameters": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"trigger_id": { |
|
|
"type": "string", |
|
|
"description": "ID of the trigger to delete" |
|
|
} |
|
|
}, |
|
|
"required": ["trigger_id"] |
|
|
} |
|
|
} |
|
|
}) |
|
|
@xml_schema( |
|
|
tag_name="delete-scheduled-trigger", |
|
|
mappings=[ |
|
|
{"param_name": "trigger_id", "node_type": "attribute", "path": ".", "required": True} |
|
|
], |
|
|
example=''' |
|
|
<function_calls> |
|
|
<invoke name="delete_scheduled_trigger"> |
|
|
<parameter name="trigger_id">trigger-123</parameter> |
|
|
</invoke> |
|
|
</function_calls> |
|
|
''' |
|
|
) |
|
|
async def delete_scheduled_trigger(self, trigger_id: str) -> ToolResult: |
|
|
try: |
|
|
trigger_db = DBConnection() |
|
|
trigger_manager = TriggerManager(trigger_db) |
|
|
|
|
|
trigger_config = await trigger_manager.get_trigger(trigger_id) |
|
|
|
|
|
if not trigger_config: |
|
|
return self.fail_response("Trigger not found") |
|
|
|
|
|
if trigger_config.agent_id != self.agent_id: |
|
|
return self.fail_response("This trigger doesn't belong to the current agent") |
|
|
|
|
|
success = await trigger_manager.delete_trigger(trigger_id) |
|
|
|
|
|
if success: |
|
|
return self.success_response({ |
|
|
"message": f"Scheduled trigger '{trigger_config.name}' deleted successfully", |
|
|
"trigger_id": trigger_id |
|
|
}) |
|
|
else: |
|
|
return self.fail_response("Failed to delete trigger") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error deleting scheduled trigger: {str(e)}") |
|
|
return self.fail_response(f"Error deleting scheduled trigger: {str(e)}") |
|
|
|
|
|
@openapi_schema({ |
|
|
"type": "function", |
|
|
"function": { |
|
|
"name": "toggle_scheduled_trigger", |
|
|
"description": "Enable or disable a scheduled trigger. Disabled triggers won't run until re-enabled.", |
|
|
"parameters": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"trigger_id": { |
|
|
"type": "string", |
|
|
"description": "ID of the trigger to toggle" |
|
|
}, |
|
|
"is_active": { |
|
|
"type": "boolean", |
|
|
"description": "Whether to enable (true) or disable (false) the trigger" |
|
|
} |
|
|
}, |
|
|
"required": ["trigger_id", "is_active"] |
|
|
} |
|
|
} |
|
|
}) |
|
|
@xml_schema( |
|
|
tag_name="toggle-scheduled-trigger", |
|
|
mappings=[ |
|
|
{"param_name": "trigger_id", "node_type": "attribute", "path": ".", "required": True}, |
|
|
{"param_name": "is_active", "node_type": "attribute", "path": ".", "required": True} |
|
|
], |
|
|
example=''' |
|
|
<function_calls> |
|
|
<invoke name="toggle_scheduled_trigger"> |
|
|
<parameter name="trigger_id">trigger-123</parameter> |
|
|
<parameter name="is_active">false</parameter> |
|
|
</invoke> |
|
|
</function_calls> |
|
|
''' |
|
|
) |
|
|
async def toggle_scheduled_trigger(self, trigger_id: str, is_active: bool) -> ToolResult: |
|
|
try: |
|
|
trigger_db = DBConnection() |
|
|
trigger_manager = TriggerManager(trigger_db) |
|
|
|
|
|
trigger_config = await trigger_manager.get_trigger(trigger_id) |
|
|
|
|
|
if not trigger_config: |
|
|
return self.fail_response("Trigger not found") |
|
|
|
|
|
if trigger_config.agent_id != self.agent_id: |
|
|
return self.fail_response("This trigger doesn't belong to the current agent") |
|
|
|
|
|
updated_config = await trigger_manager.update_trigger( |
|
|
trigger_id=trigger_id, |
|
|
is_active=is_active |
|
|
) |
|
|
|
|
|
if updated_config: |
|
|
status = "enabled" if is_active else "disabled" |
|
|
return self.success_response({ |
|
|
"message": f"Scheduled trigger '{updated_config.name}' has been {status}", |
|
|
"trigger": { |
|
|
"id": updated_config.trigger_id, |
|
|
"name": updated_config.name, |
|
|
"is_active": updated_config.is_active |
|
|
} |
|
|
}) |
|
|
else: |
|
|
return self.fail_response("Failed to update trigger") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error toggling scheduled trigger: {str(e)}") |
|
|
return self.fail_response(f"Error toggling scheduled trigger: {str(e)}") |
|
|
|