| """ |
| WorkflowAgent — n8n workflow generation, validation, deployment (Phase 7) |
| Workflow Factor OS merged into God Agent ecosystem |
| """ |
| import json |
| from typing import Dict, List |
| import structlog |
| from .base_agent import BaseAgent |
|
|
| log = structlog.get_logger() |
|
|
| WORKFLOW_SYSTEM = """You are an expert n8n workflow architect and automation engineer. |
| You design production-grade automation workflows for: |
| - Telegram/Discord bots with AI responses |
| - Data pipelines and ETL processes |
| - API integrations and webhooks |
| - Scheduled tasks and cron jobs |
| - Email/Slack/notification systems |
| - GitHub CI/CD triggers |
| - AI-powered automation chains |
| |
| Generate valid n8n workflow JSON that can be directly imported. |
| Think about error handling, retry logic, and edge cases. |
| """ |
|
|
|
|
| class WorkflowAgent(BaseAgent): |
| def __init__(self, ws_manager=None, ai_router=None): |
| super().__init__("WorkflowAgent", ws_manager, ai_router) |
|
|
| async def run(self, task: str, context: Dict = {}, **kwargs) -> str: |
| session_id = kwargs.get("session_id", "") |
| task_id = kwargs.get("task_id", "") |
|
|
| await self.emit(task_id, "agent_start", { |
| "agent": "WorkflowAgent", |
| "task": task[:80], |
| }, session_id) |
|
|
| task_lower = task.lower() |
|
|
| if "telegram" in task_lower: |
| return await self._telegram_workflow(task, task_id, session_id) |
| elif "discord" in task_lower: |
| return await self._discord_workflow(task, task_id, session_id) |
| elif "schedule" in task_lower or "cron" in task_lower: |
| return await self._scheduled_workflow(task, task_id, session_id) |
| elif "github" in task_lower and ("webhook" in task_lower or "trigger" in task_lower): |
| return await self._github_workflow(task, task_id, session_id) |
| else: |
| return await self._generic_workflow(task, task_id, session_id) |
|
|
| async def _telegram_workflow(self, task: str, task_id: str, session_id: str) -> str: |
| messages = [ |
| {"role": "system", "content": WORKFLOW_SYSTEM}, |
| {"role": "user", "content": ( |
| f"Create a complete n8n workflow for: {task}\n\n" |
| f"Include:\n" |
| f"1. Telegram Trigger node (webhook)\n" |
| f"2. AI processing node (HTTP Request to API)\n" |
| f"3. Telegram Send Message node\n" |
| f"4. Error handling branch\n\n" |
| f"Also provide:\n" |
| f"- Setup instructions\n" |
| f"- Required environment variables\n" |
| f"- Testing steps\n" |
| f"- The n8n workflow JSON" |
| )}, |
| ] |
| result = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.2, max_tokens=6000) |
| await self.emit(task_id, "workflow_generated", { |
| "type": "telegram_bot", |
| "agent": "WorkflowAgent", |
| }, session_id) |
| return result |
|
|
| async def _discord_workflow(self, task: str, task_id: str, session_id: str) -> str: |
| messages = [ |
| {"role": "system", "content": WORKFLOW_SYSTEM}, |
| {"role": "user", "content": ( |
| f"Create a complete n8n workflow for Discord automation: {task}\n\n" |
| f"Include Discord webhook integration, message processing, and response handling." |
| )}, |
| ] |
| result = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.2, max_tokens=5000) |
| await self.emit(task_id, "workflow_generated", {"type": "discord", "agent": "WorkflowAgent"}, session_id) |
| return result |
|
|
| async def _scheduled_workflow(self, task: str, task_id: str, session_id: str) -> str: |
| messages = [ |
| {"role": "system", "content": WORKFLOW_SYSTEM}, |
| {"role": "user", "content": ( |
| f"Create a scheduled n8n workflow for: {task}\n\n" |
| f"Include cron trigger configuration, processing steps, and notification on completion/failure." |
| )}, |
| ] |
| result = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.2) |
| await self.emit(task_id, "workflow_generated", {"type": "scheduled", "agent": "WorkflowAgent"}, session_id) |
| return result |
|
|
| async def _github_workflow(self, task: str, task_id: str, session_id: str) -> str: |
| messages = [ |
| {"role": "system", "content": WORKFLOW_SYSTEM}, |
| {"role": "user", "content": ( |
| f"Create a GitHub webhook n8n workflow for: {task}\n\n" |
| f"Include GitHub webhook trigger, event processing, and automated responses/actions." |
| )}, |
| ] |
| result = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.2) |
| await self.emit(task_id, "workflow_generated", {"type": "github_webhook", "agent": "WorkflowAgent"}, session_id) |
| return result |
|
|
| async def _generic_workflow(self, task: str, task_id: str, session_id: str) -> str: |
| messages = [ |
| {"role": "system", "content": WORKFLOW_SYSTEM}, |
| {"role": "user", "content": ( |
| f"Design a complete automation workflow for: {task}\n\n" |
| f"Provide:\n" |
| f"1. Workflow architecture diagram (text)\n" |
| f"2. n8n node configuration\n" |
| f"3. Step-by-step setup guide\n" |
| f"4. Import-ready n8n JSON\n" |
| f"5. Testing checklist" |
| )}, |
| ] |
| result = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.3, max_tokens=6000) |
| await self.emit(task_id, "workflow_generated", {"type": "custom", "agent": "WorkflowAgent"}, session_id) |
| return result |
|
|
| async def validate_workflow(self, workflow_json: str) -> Dict: |
| """Validate n8n workflow JSON structure.""" |
| try: |
| workflow = json.loads(workflow_json) |
| nodes = workflow.get("nodes", []) |
| connections = workflow.get("connections", {}) |
| has_trigger = any( |
| n.get("type", "").startswith("n8n-nodes-base.") and |
| ("Trigger" in n.get("type", "") or n.get("type", "").endswith("trigger")) |
| for n in nodes |
| ) |
| return { |
| "valid": True, |
| "node_count": len(nodes), |
| "has_trigger": has_trigger, |
| "connection_count": len(connections), |
| } |
| except json.JSONDecodeError as e: |
| return {"valid": False, "error": f"Invalid JSON: {e}"} |
| except Exception as e: |
| return {"valid": False, "error": str(e)} |
|
|
| async def simulate_workflow(self, workflow_json: str, test_data: Dict = {}) -> str: |
| """Simulate workflow execution with test data.""" |
| messages = [ |
| {"role": "system", "content": WORKFLOW_SYSTEM}, |
| {"role": "user", "content": ( |
| f"Simulate this n8n workflow execution with test data:\n\n" |
| f"Workflow: {workflow_json[:1000]}\n\n" |
| f"Test data: {json.dumps(test_data)}\n\n" |
| f"Walk through each node's execution and expected output." |
| )}, |
| ] |
| return await self.llm(messages, temperature=0.3) |
|
|