Spaces:
Paused
Paused
| import json | |
| from typing import Optional, Dict, Any, List | |
| from agentpress.tool import ToolResult, openapi_schema, usage_example | |
| from agentpress.thread_manager import ThreadManager | |
| from .base_tool import AgentBuilderBaseTool | |
| from utils.logger import logger | |
| from utils.config import config, EnvMode | |
| from datetime import datetime | |
| from services.supabase import DBConnection | |
| from triggers import get_trigger_service | |
| import os | |
| import httpx | |
| from composio_integration.composio_profile_service import ComposioProfileService | |
| from composio_integration.composio_trigger_service import ComposioTriggerService | |
class TriggerTool(AgentBuilderBaseTool):
    """Agent-builder tool for creating, listing, deleting and toggling scheduled triggers.

    Every operation is scoped to ``self.agent_id`` and reports its outcome via
    ``self.success_response`` / ``self.fail_response`` (presumably provided by
    the base tool class, together with ``self.db`` -- confirm against
    ``AgentBuilderBaseTool``).
    """

    def __init__(self, thread_manager: ThreadManager, db_connection, agent_id: str):
        super().__init__(thread_manager, db_connection, agent_id)

    # ===== SCHEDULED TRIGGERS =====

    async def create_scheduled_trigger(
        self,
        name: str,
        cron_expression: str,
        execution_type: str = "agent",
        description: Optional[str] = None,
        workflow_id: Optional[str] = None,
        workflow_input: Optional[Dict[str, Any]] = None,
        agent_prompt: Optional[str] = None
    ) -> ToolResult:
        """Create a cron-scheduled trigger for this agent.

        Args:
            name: Name given to the new trigger.
            cron_expression: Schedule stored in the trigger config.
            execution_type: ``"agent"`` (run a prompt) or ``"workflow"``.
            description: Optional description passed to the trigger service.
            workflow_id: Required when ``execution_type`` is ``"workflow"``;
                the workflow must belong to this agent and be active.
            workflow_input: Optional input stored alongside the workflow id.
            agent_prompt: Required when ``execution_type`` is ``"agent"``.

        Returns:
            A success result with a human-readable message and a summary of
            the created trigger, or a failure result describing what went
            wrong. Exceptions are converted into failure results.
        """
        try:
            if execution_type not in ("workflow", "agent"):
                return self.fail_response("execution_type must be either 'workflow' or 'agent'")

            runs_workflow = execution_type == "workflow"
            if runs_workflow and not workflow_id:
                return self.fail_response("workflow_id is required when execution_type is 'workflow'")
            if not runs_workflow and not agent_prompt:
                return self.fail_response("agent_prompt is required when execution_type is 'agent'")

            workflow_row = None
            if runs_workflow:
                # The workflow must exist, be owned by this agent and be active.
                db_client = await self.db.client
                lookup = await (
                    db_client.table('agent_workflows')
                    .select('*')
                    .eq('id', workflow_id)
                    .eq('agent_id', self.agent_id)
                    .execute()
                )
                if not lookup.data:
                    return self.fail_response(f"Workflow {workflow_id} not found or doesn't belong to this agent")
                workflow_row = lookup.data[0]
                if workflow_row['status'] != 'active':
                    return self.fail_response(f"Workflow '{workflow_row['name']}' is not active. Please activate it first.")

            schedule_config = {
                "cron_expression": cron_expression,
                "execution_type": execution_type,
                "provider_id": "schedule",
            }
            if runs_workflow:
                schedule_config["workflow_id"] = workflow_id
                if workflow_input:
                    schedule_config["workflow_input"] = workflow_input
            else:
                schedule_config["agent_prompt"] = agent_prompt

            service = get_trigger_service(self.db)
            try:
                created = await service.create_trigger(
                    agent_id=self.agent_id,
                    provider_id="schedule",
                    name=name,
                    config=schedule_config,
                    description=description,
                )

                # Assemble the user-facing summary piece by piece.
                parts = [
                    f"Scheduled trigger '{name}' created successfully!\n\n",
                    f"**Schedule**: {cron_expression}\n",
                    f"**Type**: {execution_type.capitalize()} execution\n",
                ]
                if runs_workflow:
                    parts.append(f"**Workflow**: {workflow_row['name']}\n")
                    if workflow_input:
                        parts.append(f"**Input Data**: {json.dumps(workflow_input, indent=2)}\n")
                else:
                    parts.append(f"**Prompt**: {agent_prompt}\n")
                parts.append("\nThe trigger is now active and will run according to the schedule.")

                trigger_summary = {
                    "id": created.trigger_id,
                    "name": created.name,
                    "description": created.description,
                    "cron_expression": cron_expression,
                    "execution_type": execution_type,
                    "is_active": created.is_active,
                    "created_at": created.created_at.isoformat(),
                }
                return self.success_response({
                    "message": "".join(parts),
                    "trigger": trigger_summary,
                })
            except ValueError as validation_err:
                return self.fail_response(f"Validation error: {str(validation_err)}")
            except Exception as service_err:
                detail = str(service_err)
                logger.error(f"Error creating trigger through manager: {detail}")
                return self.fail_response(f"Failed to create trigger: {detail}")
        except Exception as exc:
            detail = str(exc)
            logger.error(f"Error creating scheduled trigger: {detail}")
            return self.fail_response(f"Error creating scheduled trigger: {detail}")

    async def get_scheduled_triggers(self) -> ToolResult:
        """List this agent's schedule-type triggers.

        Workflow-backed triggers are annotated with the workflow's name
        (``"Unknown Workflow"`` when no matching row is found); agent-backed
        triggers carry their prompt instead.

        Returns:
            A success result with a count message and the formatted trigger
            list (empty when none exist), or a failure result on any error.
        """
        try:
            from triggers import TriggerType

            service = get_trigger_service(self.db)
            all_triggers = await service.get_agent_triggers(self.agent_id)
            scheduled = [item for item in all_triggers if item.trigger_type == TriggerType.SCHEDULE]
            if not scheduled:
                return self.success_response({
                    "message": "No scheduled triggers found for this agent.",
                    "triggers": [],
                })

            # First pass: resolve each referenced workflow id to its name once.
            db_client = await self.db.client
            workflow_names = {}
            for item in scheduled:
                cfg = item.config
                if cfg.get("execution_type") != "workflow" or not cfg.get("workflow_id"):
                    continue
                wf_id = cfg["workflow_id"]
                if wf_id in workflow_names:
                    continue
                rows = await db_client.table('agent_workflows').select('name').eq('id', wf_id).execute()
                if rows.data:
                    workflow_names[wf_id] = rows.data[0]['name']

            # Second pass: build the response entries.
            summaries = []
            for item in scheduled:
                cfg = item.config
                entry = {
                    "id": item.trigger_id,
                    "name": item.name,
                    "description": item.description,
                    "cron_expression": cfg.get("cron_expression"),
                    "execution_type": cfg.get("execution_type", "agent"),
                    "is_active": item.is_active,
                    "created_at": item.created_at.isoformat(),
                }
                if cfg.get("execution_type") == "workflow":
                    entry["workflow_name"] = workflow_names.get(cfg.get("workflow_id"), "Unknown Workflow")
                    entry["workflow_input"] = cfg.get("workflow_input")
                else:
                    entry["agent_prompt"] = cfg.get("agent_prompt")
                summaries.append(entry)

            return self.success_response({
                "message": f"Found {len(summaries)} scheduled trigger(s)",
                "triggers": summaries,
            })
        except Exception as exc:
            detail = str(exc)
            logger.error(f"Error getting scheduled triggers: {detail}")
            return self.fail_response(f"Error getting scheduled triggers: {detail}")

    async def delete_scheduled_trigger(self, trigger_id: str) -> ToolResult:
        """Delete a trigger after verifying it belongs to this agent.

        Args:
            trigger_id: Identifier of the trigger to delete.

        Returns:
            A success result naming the deleted trigger, or a failure result
            when it is missing, owned by another agent, or deletion fails.
        """
        try:
            service = get_trigger_service(self.db)
            existing = await service.get_trigger(trigger_id)
            if not existing:
                return self.fail_response("Trigger not found")
            if existing.agent_id != self.agent_id:
                return self.fail_response("This trigger doesn't belong to the current agent")

            deleted = await service.delete_trigger(trigger_id)
            if not deleted:
                return self.fail_response("Failed to delete trigger")
            return self.success_response({
                "message": f"Scheduled trigger '{existing.name}' deleted successfully",
                "trigger_id": trigger_id,
            })
        except Exception as exc:
            detail = str(exc)
            logger.error(f"Error deleting scheduled trigger: {detail}")
            return self.fail_response(f"Error deleting scheduled trigger: {detail}")

    async def toggle_scheduled_trigger(self, trigger_id: str, is_active: bool) -> ToolResult:
        """Enable or disable a trigger after verifying it belongs to this agent.

        Args:
            trigger_id: Identifier of the trigger to update.
            is_active: Desired active state.

        Returns:
            A success result with the updated id, name and active flag, or a
            failure result when the trigger is missing, owned by another
            agent, or the update fails.
        """
        try:
            service = get_trigger_service(self.db)
            existing = await service.get_trigger(trigger_id)
            if not existing:
                return self.fail_response("Trigger not found")
            if existing.agent_id != self.agent_id:
                return self.fail_response("This trigger doesn't belong to the current agent")

            updated = await service.update_trigger(
                trigger_id=trigger_id,
                is_active=is_active,
            )
            if not updated:
                return self.fail_response("Failed to update trigger")

            status = "enabled" if is_active else "disabled"
            return self.success_response({
                "message": f"Scheduled trigger '{updated.name}' has been {status}",
                "trigger": {
                    "id": updated.trigger_id,
                    "name": updated.name,
                    "is_active": updated.is_active,
                },
            })
        except Exception as exc:
            detail = str(exc)
            logger.error(f"Error toggling scheduled trigger: {detail}")
            return self.fail_response(f"Error toggling scheduled trigger: {detail}")
# ===== EVENT-BASED TRIGGERS (Non-Production Only) =====
# The event trigger methods below are defined at module level and attached to
# TriggerTool only when the configured environment is not production.
if config.ENV_MODE != EnvMode.PRODUCTION:

    async def list_event_trigger_apps(self) -> ToolResult:
        """List the apps that offer event triggers.

        Returns:
            A success result carrying the service response's ``items`` and
            ``total`` plus a summary message, or a failure result on error.
        """
        try:
            trigger_service = ComposioTriggerService()
            response = await trigger_service.list_apps_with_triggers()
            # Return exact same format as API
            return self.success_response({
                "message": f"Found {response['total']} apps with triggers",
                "items": response["items"],
                "total": response["total"],
            })
        except Exception as e:
            logger.error(f"Error listing event trigger apps: {e}")
            return self.fail_response(f"Error listing apps: {str(e)}")

    TriggerTool.list_event_trigger_apps = list_event_trigger_apps

    async def list_app_event_triggers(self, toolkit_slug: str) -> ToolResult:
        """List the event triggers available for one app.

        Args:
            toolkit_slug: Slug of the app/toolkit to query.

        Returns:
            A success result carrying the service response's ``items``,
            ``toolkit`` and ``total`` plus a summary message, or a failure
            result on error.
        """
        try:
            trigger_service = ComposioTriggerService()
            response = await trigger_service.list_triggers_for_app(toolkit_slug)
            # Return exact same format as API
            return self.success_response({
                "message": f"Found {response['total']} triggers for {toolkit_slug}",
                "items": response["items"],
                "toolkit": response["toolkit"],
                "total": response["total"],
            })
        except Exception as e:
            logger.error(f"Error listing triggers for app {toolkit_slug}: {e}")
            return self.fail_response(f"Error listing triggers: {str(e)}")

    TriggerTool.list_app_event_triggers = list_app_event_triggers

    async def list_event_profiles(self, toolkit_slug: str) -> ToolResult:
        """List the Composio profiles of this agent's account for one app.

        The owning account is resolved from the ``agents`` table via
        ``self.agent_id``.

        Args:
            toolkit_slug: Slug of the app/toolkit whose profiles are listed.

        Returns:
            A success result with one entry per profile (``profile_id``,
            ``display_name``, ``is_connected``, ``connected_account_id``) and
            a total, or a failure result when the agent is not found or an
            error occurs.
        """
        try:
            client = await self.db.client
            agent_rows = await client.table('agents').select('account_id').eq('agent_id', self.agent_id).execute()
            if not agent_rows.data:
                return self.fail_response("Agent not found")
            account_id = agent_rows.data[0]['account_id']

            profile_service = ComposioProfileService(self.db)
            profiles = await profile_service.get_profiles(account_id, toolkit_slug)
            items = [
                {
                    "profile_id": p.profile_id,
                    "display_name": p.display_name,
                    "is_connected": p.is_connected,
                    # Not every profile object is guaranteed to expose this attribute.
                    "connected_account_id": getattr(p, 'connected_account_id', None),
                }
                for p in profiles
            ]
            return self.success_response({
                "message": f"Found {len(items)} profile(s) for {toolkit_slug}",
                "items": items,
                "total": len(items),
            })
        except Exception as e:
            logger.error(f"Error listing event profiles: {e}")
            return self.fail_response(f"Error listing profiles: {str(e)}")

    TriggerTool.list_event_profiles = list_event_profiles

    def _coerce_config_value(value: Any, schema_type: Optional[str]) -> Any:
        """Coerce one config value to the JSON-schema type declared for it.

        Values that cannot be coerced sensibly are returned unchanged; a
        string that is not a valid number raises ``ValueError`` for the
        ``"number"`` type.
        """
        if schema_type == "array":
            return [value] if isinstance(value, str) else value
        if schema_type == "integer":
            if isinstance(value, str):
                text = value.strip()
                # Accept an optional sign so that e.g. "-5" is coerced as well.
                unsigned = text[1:] if text[:1] in ("+", "-") else text
                if unsigned.isdigit():
                    return int(text)
            return value
        if schema_type == "number":
            return float(value) if isinstance(value, str) else value
        if schema_type == "boolean":
            if isinstance(value, str):
                return value.lower() in ("true", "1", "yes")
            return value
        if schema_type == "string":
            if isinstance(value, (list, tuple)):
                return ",".join(str(x) for x in value)
            return value if isinstance(value, str) else str(value)
        return value

    def _coerce_trigger_config(raw_config: Dict[str, Any], properties: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``raw_config`` coerced per the schema ``properties``.

        Coercion is best-effort: a key that fails to coerce keeps its original
        value and the failure is logged.
        """
        coerced = dict(raw_config)
        for key, prop in properties.items():
            if key not in coerced:
                continue
            schema_type = prop.get("type") if isinstance(prop, dict) else None
            try:
                coerced[key] = _coerce_config_value(coerced[key], schema_type)
            except Exception as coerce_err:
                logger.warning(
                    f"Could not coerce trigger config key '{key}' to {schema_type}: {coerce_err}"
                )
        return coerced

    def _extract_composio_trigger_id(obj: Any) -> Optional[str]:
        """Find a trigger id in a Composio response, searching known nested keys."""
        if not isinstance(obj, dict):
            return None
        cand = (
            obj.get("id")
            or obj.get("trigger_id")
            or obj.get("triggerId")
            or obj.get("nano_id")
            or obj.get("nanoId")
            or obj.get("triggerNanoId")
        )
        if cand:
            return cand
        for k in ("trigger", "trigger_instance", "triggerInstance", "data", "result"):
            nested = obj.get(k)
            if isinstance(nested, list) and nested:
                # Only the first element of a list is inspected.
                nested = nested[0]
            nid = _extract_composio_trigger_id(nested)
            if nid:
                return nid
        return None

    async def create_event_trigger(
        self,
        slug: str,
        profile_id: str,
        trigger_config: Optional[Dict[str, Any]] = None,
        route: str = "agent",
        name: Optional[str] = None,
        agent_prompt: Optional[str] = None,
        workflow_id: Optional[str] = None,
        workflow_input: Optional[Dict[str, Any]] = None,
        connected_account_id: Optional[str] = None
    ) -> ToolResult:
        """Create a Composio event trigger and register it for this agent.

        Steps: validate the routing arguments, resolve the Composio user from
        the profile, coerce ``trigger_config`` to the trigger type's schema
        (best-effort), upsert the trigger instance at Composio with a webhook
        pointing at this backend, then store a matching trigger via the
        trigger service.

        Args:
            slug: Composio trigger type slug.
            profile_id: Composio profile used to resolve the user id and, when
                not given explicitly, the connected account id.
            trigger_config: Optional trigger-specific configuration.
            route: ``"agent"`` or ``"workflow"``.
            name: Optional trigger name; defaults to ``slug``.
            agent_prompt: Required when ``route`` is ``"agent"``.
            workflow_id: Required when ``route`` is ``"workflow"``; the
                workflow must belong to this agent and be active.
            workflow_input: Optional input stored with the workflow id.
            connected_account_id: Optional override for the connected account.

        Returns:
            A success result describing the stored trigger, or a failure
            result. Exceptions are converted into failure results.
        """
        try:
            if route not in ("agent", "workflow"):
                return self.fail_response("route must be either 'agent' or 'workflow'")
            if route == "workflow" and not workflow_id:
                return self.fail_response("workflow_id is required when route is 'workflow'")
            if route == "agent" and not agent_prompt:
                return self.fail_response("agent_prompt is required when route is 'agent'")

            if route == "workflow":
                # Same ownership/activity check as scheduled triggers, done before
                # anything is created remotely.
                client = await self.db.client
                workflow_result = await client.table('agent_workflows').select('*').eq('id', workflow_id).eq('agent_id', self.agent_id).execute()
                if not workflow_result.data:
                    return self.fail_response(f"Workflow {workflow_id} not found or doesn't belong to this agent")
                workflow = workflow_result.data[0]
                if workflow['status'] != 'active':
                    return self.fail_response(f"Workflow '{workflow['name']}' is not active. Please activate it first.")

            # Resolve composio user id and connected account id from profile
            profile_service = ComposioProfileService(self.db)
            profile_cfg = await profile_service.get_profile_config(profile_id)
            composio_user_id = profile_cfg.get("user_id") if profile_cfg else None
            if not composio_user_id:
                return self.fail_response("Composio profile is missing user_id")
            if not connected_account_id:
                connected_account_id = profile_cfg.get("connected_account_id")

            api_base = os.getenv("COMPOSIO_API_BASE", "https://backend.composio.dev").rstrip("/")
            api_key = os.getenv("COMPOSIO_API_KEY")
            if not api_key:
                return self.fail_response("COMPOSIO_API_KEY not configured")
            headers = {"x-api-key": api_key, "Content-Type": "application/json"}

            # Coerce config types per trigger schema. Best-effort: on any failure
            # the config is sent as provided.
            coerced_config = dict(trigger_config or {})
            try:
                type_url = f"{api_base}/api/v3/triggers_types/{slug}"
                async with httpx.AsyncClient(timeout=10) as http_client:
                    type_resp = await http_client.get(type_url, headers=headers)
                if type_resp.status_code == 200:
                    type_data = type_resp.json()
                    schema = type_data.get("config") if isinstance(type_data, dict) else None
                    properties = schema.get("properties") if isinstance(schema, dict) else None
                    if isinstance(properties, dict):
                        coerced_config = _coerce_trigger_config(coerced_config, properties)
                else:
                    logger.warning(
                        f"Trigger type lookup for '{slug}' returned HTTP {type_resp.status_code}; "
                        "sending trigger config without type coercion"
                    )
            except Exception as schema_err:
                logger.warning(
                    f"Could not load trigger type schema for '{slug}'; "
                    f"sending trigger config without type coercion: {schema_err}"
                )

            # Upsert trigger instance with webhook
            base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:8000").rstrip("/")
            secret = os.getenv("COMPOSIO_WEBHOOK_SECRET", "")
            webhook_headers: Dict[str, str] = {"X-Composio-Secret": secret} if secret else {}
            vercel_bypass = os.getenv("VERCEL_PROTECTION_BYPASS_KEY", "")
            if vercel_bypass:
                webhook_headers["X-Vercel-Protection-Bypass"] = vercel_bypass

            # Fields are sent in both snake_case and camelCase spellings
            # (presumably for API compatibility -- confirm against Composio docs).
            body: Dict[str, Any] = {
                "user_id": composio_user_id,
                "userId": composio_user_id,
                "trigger_config": coerced_config,
                "triggerConfig": coerced_config,
                "webhook": {
                    "url": f"{base_url}/api/composio/webhook",
                    "headers": webhook_headers,
                    "method": "POST",
                },
            }
            if connected_account_id:
                body["connectedAccountId"] = connected_account_id
                body["connected_account_id"] = connected_account_id
                body["connectedAccountIds"] = [connected_account_id]
                body["connected_account_ids"] = [connected_account_id]

            upsert_url = f"{api_base}/api/v3/trigger_instances/{slug}/upsert"
            async with httpx.AsyncClient(timeout=20) as http_client:
                resp = await http_client.post(upsert_url, headers=headers, json=body)
            try:
                resp.raise_for_status()
            except httpx.HTTPStatusError:
                ct = resp.headers.get("content-type", "")
                detail: Any = resp.text
                if "application/json" in ct:
                    try:
                        detail = resp.json()
                    except ValueError:
                        # Declared JSON but unparsable: keep the raw text.
                        pass
                return self.fail_response(f"Composio upsert error: {detail}")

            try:
                created = resp.json()
            except ValueError:
                # Unparsable success body: rely on the active-instances lookup below.
                logger.warning(f"Composio upsert for '{slug}' returned a non-JSON body")
                created = None

            composio_trigger_id = _extract_composio_trigger_id(created)
            if not composio_trigger_id:
                # Fallback to list active; takes the first match for this
                # slug/user (and connected account when known).
                try:
                    params_lookup: Dict[str, Any] = {"limit": 50, "slug": slug, "userId": composio_user_id}
                    if connected_account_id:
                        params_lookup["connectedAccountId"] = connected_account_id
                    list_url = f"{api_base}/api/v3/trigger_instances/active"
                    async with httpx.AsyncClient(timeout=15) as http_client:
                        lr = await http_client.get(list_url, headers=headers, params=params_lookup)
                    if lr.status_code == 200:
                        ldata = lr.json()
                        if isinstance(ldata, dict):
                            items = ldata.get("items")
                        else:
                            items = ldata
                        if isinstance(items, list) and items:
                            composio_trigger_id = _extract_composio_trigger_id(items[0])
                    else:
                        logger.warning(
                            f"Active trigger lookup for '{slug}' returned HTTP {lr.status_code}"
                        )
                except Exception as lookup_err:
                    logger.warning(f"Active trigger lookup for '{slug}' failed: {lookup_err}")

            if not composio_trigger_id:
                return self.fail_response("Failed to get Composio trigger id from response")

            # Build Suna trigger and save
            suna_config: Dict[str, Any] = {
                "composio_trigger_id": composio_trigger_id,
                "trigger_slug": slug,
                "execution_type": route,
                "profile_id": profile_id,
            }
            if route == "agent":
                if agent_prompt:
                    suna_config["agent_prompt"] = agent_prompt
            else:
                suna_config["workflow_id"] = workflow_id
                if workflow_input:
                    suna_config["workflow_input"] = workflow_input

            trigger_svc = get_trigger_service(self.db)
            trigger = await trigger_svc.create_trigger(
                agent_id=self.agent_id,
                provider_id="composio",
                name=name or slug,
                config=suna_config,
                description=f"Composio event: {slug}"
            )

            message = f"Event trigger '{trigger.name}' created successfully.\n"
            message += f"Route: {route}. "
            if route == "workflow":
                message += f"Workflow: {workflow_id}."
            else:
                message += "Agent execution configured."

            return self.success_response({
                "message": message,
                "trigger": {
                    "id": trigger.trigger_id,
                    "agent_id": trigger.agent_id,
                    "provider": "composio",
                    "slug": slug,
                    "config": trigger.config,
                    "is_active": trigger.is_active,
                    "created_at": trigger.created_at.isoformat()
                }
            })
        except Exception as e:
            logger.error(f"Error creating event trigger: {e}", exc_info=True)
            return self.fail_response(f"Error creating event trigger: {str(e)}")

    TriggerTool.create_event_trigger = create_event_trigger