File size: 7,297 Bytes
02117ee | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 | """
WorkflowAgent — n8n workflow generation, validation, deployment (Phase 7)
Workflow Factor OS merged into God Agent ecosystem
"""
import json
from typing import Dict, List
import structlog
from .base_agent import BaseAgent
log = structlog.get_logger()
WORKFLOW_SYSTEM = """You are an expert n8n workflow architect and automation engineer.
You design production-grade automation workflows for:
- Telegram/Discord bots with AI responses
- Data pipelines and ETL processes
- API integrations and webhooks
- Scheduled tasks and cron jobs
- Email/Slack/notification systems
- GitHub CI/CD triggers
- AI-powered automation chains
Generate valid n8n workflow JSON that can be directly imported.
Think about error handling, retry logic, and edge cases.
"""
class WorkflowAgent(BaseAgent):
def __init__(self, ws_manager=None, ai_router=None):
super().__init__("WorkflowAgent", ws_manager, ai_router)
async def run(self, task: str, context: Dict = {}, **kwargs) -> str:
session_id = kwargs.get("session_id", "")
task_id = kwargs.get("task_id", "")
await self.emit(task_id, "agent_start", {
"agent": "WorkflowAgent",
"task": task[:80],
}, session_id)
task_lower = task.lower()
if "telegram" in task_lower:
return await self._telegram_workflow(task, task_id, session_id)
elif "discord" in task_lower:
return await self._discord_workflow(task, task_id, session_id)
elif "schedule" in task_lower or "cron" in task_lower:
return await self._scheduled_workflow(task, task_id, session_id)
elif "github" in task_lower and ("webhook" in task_lower or "trigger" in task_lower):
return await self._github_workflow(task, task_id, session_id)
else:
return await self._generic_workflow(task, task_id, session_id)
async def _telegram_workflow(self, task: str, task_id: str, session_id: str) -> str:
messages = [
{"role": "system", "content": WORKFLOW_SYSTEM},
{"role": "user", "content": (
f"Create a complete n8n workflow for: {task}\n\n"
f"Include:\n"
f"1. Telegram Trigger node (webhook)\n"
f"2. AI processing node (HTTP Request to API)\n"
f"3. Telegram Send Message node\n"
f"4. Error handling branch\n\n"
f"Also provide:\n"
f"- Setup instructions\n"
f"- Required environment variables\n"
f"- Testing steps\n"
f"- The n8n workflow JSON"
)},
]
result = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.2, max_tokens=6000)
await self.emit(task_id, "workflow_generated", {
"type": "telegram_bot",
"agent": "WorkflowAgent",
}, session_id)
return result
async def _discord_workflow(self, task: str, task_id: str, session_id: str) -> str:
messages = [
{"role": "system", "content": WORKFLOW_SYSTEM},
{"role": "user", "content": (
f"Create a complete n8n workflow for Discord automation: {task}\n\n"
f"Include Discord webhook integration, message processing, and response handling."
)},
]
result = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.2, max_tokens=5000)
await self.emit(task_id, "workflow_generated", {"type": "discord", "agent": "WorkflowAgent"}, session_id)
return result
async def _scheduled_workflow(self, task: str, task_id: str, session_id: str) -> str:
messages = [
{"role": "system", "content": WORKFLOW_SYSTEM},
{"role": "user", "content": (
f"Create a scheduled n8n workflow for: {task}\n\n"
f"Include cron trigger configuration, processing steps, and notification on completion/failure."
)},
]
result = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.2)
await self.emit(task_id, "workflow_generated", {"type": "scheduled", "agent": "WorkflowAgent"}, session_id)
return result
async def _github_workflow(self, task: str, task_id: str, session_id: str) -> str:
messages = [
{"role": "system", "content": WORKFLOW_SYSTEM},
{"role": "user", "content": (
f"Create a GitHub webhook n8n workflow for: {task}\n\n"
f"Include GitHub webhook trigger, event processing, and automated responses/actions."
)},
]
result = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.2)
await self.emit(task_id, "workflow_generated", {"type": "github_webhook", "agent": "WorkflowAgent"}, session_id)
return result
async def _generic_workflow(self, task: str, task_id: str, session_id: str) -> str:
messages = [
{"role": "system", "content": WORKFLOW_SYSTEM},
{"role": "user", "content": (
f"Design a complete automation workflow for: {task}\n\n"
f"Provide:\n"
f"1. Workflow architecture diagram (text)\n"
f"2. n8n node configuration\n"
f"3. Step-by-step setup guide\n"
f"4. Import-ready n8n JSON\n"
f"5. Testing checklist"
)},
]
result = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.3, max_tokens=6000)
await self.emit(task_id, "workflow_generated", {"type": "custom", "agent": "WorkflowAgent"}, session_id)
return result
async def validate_workflow(self, workflow_json: str) -> Dict:
"""Validate n8n workflow JSON structure."""
try:
workflow = json.loads(workflow_json)
nodes = workflow.get("nodes", [])
connections = workflow.get("connections", {})
has_trigger = any(
n.get("type", "").startswith("n8n-nodes-base.") and
("Trigger" in n.get("type", "") or n.get("type", "").endswith("trigger"))
for n in nodes
)
return {
"valid": True,
"node_count": len(nodes),
"has_trigger": has_trigger,
"connection_count": len(connections),
}
except json.JSONDecodeError as e:
return {"valid": False, "error": f"Invalid JSON: {e}"}
except Exception as e:
return {"valid": False, "error": str(e)}
async def simulate_workflow(self, workflow_json: str, test_data: Dict = {}) -> str:
"""Simulate workflow execution with test data."""
messages = [
{"role": "system", "content": WORKFLOW_SYSTEM},
{"role": "user", "content": (
f"Simulate this n8n workflow execution with test data:\n\n"
f"Workflow: {workflow_json[:1000]}\n\n"
f"Test data: {json.dumps(test_data)}\n\n"
f"Walk through each node's execution and expected output."
)},
]
return await self.llm(messages, temperature=0.3)
|