File size: 7,297 Bytes
02117ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
WorkflowAgent — n8n workflow generation, validation, deployment (Phase 7)
Workflow Factor OS merged into God Agent ecosystem
"""
import json
from typing import Dict, List, Optional

import structlog

from .base_agent import BaseAgent

# Module-level structlog logger. It is not referenced in the visible part of
# this module.
log = structlog.get_logger()

# System prompt sent as the first ("system") message of every LLM conversation
# that WorkflowAgent builds. It sets the n8n-architect persona, lists the kinds
# of automation in scope, and asks for import-ready n8n workflow JSON.
WORKFLOW_SYSTEM = """You are an expert n8n workflow architect and automation engineer.
You design production-grade automation workflows for:
- Telegram/Discord bots with AI responses
- Data pipelines and ETL processes
- API integrations and webhooks
- Scheduled tasks and cron jobs
- Email/Slack/notification systems
- GitHub CI/CD triggers
- AI-powered automation chains

Generate valid n8n workflow JSON that can be directly imported.
Think about error handling, retry logic, and edge cases.
"""


class WorkflowAgent(BaseAgent):
    """Agent that asks the LLM to design n8n automation workflows.

    ``run`` inspects the task text for platform keywords and forwards it to a
    platform-specific prompt builder. Every builder sends ``WORKFLOW_SYSTEM``
    plus a user prompt through the inherited ``llm`` helper and then announces
    completion with a ``workflow_generated`` event through the inherited
    ``emit`` helper. ``validate_workflow`` and ``simulate_workflow`` are
    standalone utilities that operate on workflow JSON text.
    """

    def __init__(self, ws_manager=None, ai_router=None):
        """Register the agent under the name ``"WorkflowAgent"``.

        Both arguments are forwarded unchanged to ``BaseAgent.__init__``.
        """
        super().__init__("WorkflowAgent", ws_manager, ai_router)

    async def run(self, task: str, context: Optional[Dict] = None, **kwargs) -> str:
        """Generate a workflow for ``task``, choosing a prompt by keyword.

        Args:
            task: Free-text description of the automation to build.
            context: Accepted for interface compatibility; not used by this
                agent. Defaults to ``None`` rather than a shared mutable dict.
            **kwargs: ``session_id`` and ``task_id`` are read (both default to
                an empty string); any other keys are ignored.

        Returns:
            Whatever the selected prompt builder returns (the LLM output).
        """
        session_id = kwargs.get("session_id", "")
        task_id = kwargs.get("task_id", "")

        await self.emit(task_id, "agent_start", {
            "agent": "WorkflowAgent",
            "task": task[:80],  # only a short preview of the task is emitted
        }, session_id)

        task_lower = task.lower()

        # Order matters: the first matching keyword wins, so a task mentioning
        # both "telegram" and "cron" is treated as a Telegram workflow.
        if "telegram" in task_lower:
            return await self._telegram_workflow(task, task_id, session_id)
        if "discord" in task_lower:
            return await self._discord_workflow(task, task_id, session_id)
        if "schedule" in task_lower or "cron" in task_lower:
            return await self._scheduled_workflow(task, task_id, session_id)
        if "github" in task_lower and ("webhook" in task_lower or "trigger" in task_lower):
            return await self._github_workflow(task, task_id, session_id)
        return await self._generic_workflow(task, task_id, session_id)

    async def _generate(
        self,
        prompt: str,
        workflow_type: str,
        task_id: str,
        session_id: str,
        **llm_options,
    ) -> str:
        """Send ``prompt`` to the LLM and emit a ``workflow_generated`` event.

        Shared tail of every prompt builder. ``llm_options`` (for example
        ``temperature`` or ``max_tokens``) are passed straight to ``self.llm``;
        ``workflow_type`` becomes the ``"type"`` field of the emitted event.
        """
        messages = [
            {"role": "system", "content": WORKFLOW_SYSTEM},
            {"role": "user", "content": prompt},
        ]
        result = await self.llm(messages, task_id=task_id, session_id=session_id, **llm_options)
        await self.emit(
            task_id,
            "workflow_generated",
            {"type": workflow_type, "agent": "WorkflowAgent"},
            session_id,
        )
        return result

    async def _telegram_workflow(self, task: str, task_id: str, session_id: str) -> str:
        """Request a Telegram bot workflow (trigger, AI call, reply, errors)."""
        prompt = (
            f"Create a complete n8n workflow for: {task}\n\n"
            "Include:\n"
            "1. Telegram Trigger node (webhook)\n"
            "2. AI processing node (HTTP Request to API)\n"
            "3. Telegram Send Message node\n"
            "4. Error handling branch\n\n"
            "Also provide:\n"
            "- Setup instructions\n"
            "- Required environment variables\n"
            "- Testing steps\n"
            "- The n8n workflow JSON"
        )
        return await self._generate(
            prompt, "telegram_bot", task_id, session_id, temperature=0.2, max_tokens=6000
        )

    async def _discord_workflow(self, task: str, task_id: str, session_id: str) -> str:
        """Request a Discord automation workflow."""
        prompt = (
            f"Create a complete n8n workflow for Discord automation: {task}\n\n"
            "Include Discord webhook integration, message processing, and response handling."
        )
        return await self._generate(
            prompt, "discord", task_id, session_id, temperature=0.2, max_tokens=5000
        )

    async def _scheduled_workflow(self, task: str, task_id: str, session_id: str) -> str:
        """Request a cron/schedule-driven workflow."""
        prompt = (
            f"Create a scheduled n8n workflow for: {task}\n\n"
            "Include cron trigger configuration, processing steps, and notification on completion/failure."
        )
        return await self._generate(prompt, "scheduled", task_id, session_id, temperature=0.2)

    async def _github_workflow(self, task: str, task_id: str, session_id: str) -> str:
        """Request a GitHub-webhook-driven workflow."""
        prompt = (
            f"Create a GitHub webhook n8n workflow for: {task}\n\n"
            "Include GitHub webhook trigger, event processing, and automated responses/actions."
        )
        return await self._generate(prompt, "github_webhook", task_id, session_id, temperature=0.2)

    async def _generic_workflow(self, task: str, task_id: str, session_id: str) -> str:
        """Request a workflow design when no platform keyword matched."""
        prompt = (
            f"Design a complete automation workflow for: {task}\n\n"
            "Provide:\n"
            "1. Workflow architecture diagram (text)\n"
            "2. n8n node configuration\n"
            "3. Step-by-step setup guide\n"
            "4. Import-ready n8n JSON\n"
            "5. Testing checklist"
        )
        return await self._generate(
            prompt, "custom", task_id, session_id, temperature=0.3, max_tokens=6000
        )

    @staticmethod
    def _is_trigger_type(node_type: str) -> bool:
        """Return True for ``n8n-nodes-base.*`` types that look like triggers.

        A type counts as a trigger when it contains ``"Trigger"`` or ends with
        ``"trigger"``.
        """
        return node_type.startswith("n8n-nodes-base.") and (
            "Trigger" in node_type or node_type.endswith("trigger")
        )

    async def validate_workflow(self, workflow_json: str) -> Dict:
        """Validate n8n workflow JSON structure.

        Args:
            workflow_json: Workflow definition as JSON text.

        Returns:
            On success ``{"valid": True, "node_count": int, "has_trigger":
            bool, "connection_count": int}``. On failure ``{"valid": False,
            "error": str}``. This method never raises for malformed input.
        """
        # Keep the try body minimal: only parsing can raise here.
        try:
            workflow = json.loads(workflow_json)
        except json.JSONDecodeError as exc:
            return {"valid": False, "error": f"Invalid JSON: {exc}"}
        except (TypeError, ValueError, RecursionError) as exc:
            # Non-text input, undecodable bytes, or pathologically deep nesting.
            return {"valid": False, "error": str(exc)}

        # Explicit shape checks give callers a readable reason instead of an
        # AttributeError message leaking from a failed ``.get`` call.
        if not isinstance(workflow, dict):
            return {"valid": False, "error": "Workflow must be a JSON object"}

        nodes = workflow.get("nodes", [])
        if not isinstance(nodes, list):
            return {"valid": False, "error": '"nodes" must be a list'}

        connections = workflow.get("connections", {})
        if not isinstance(connections, dict):
            return {"valid": False, "error": '"connections" must be an object'}

        # Check every node up front so a malformed entry is reported even when
        # an earlier node already qualifies as a trigger.
        node_types: List[str] = []
        for index, node in enumerate(nodes):
            if not isinstance(node, dict):
                return {"valid": False, "error": f"Node {index} must be an object"}
            node_type = node.get("type", "")
            if not isinstance(node_type, str):
                return {"valid": False, "error": f'Node {index} "type" must be a string'}
            node_types.append(node_type)

        return {
            "valid": True,
            "node_count": len(nodes),
            "has_trigger": any(self._is_trigger_type(t) for t in node_types),
            "connection_count": len(connections),
        }

    async def simulate_workflow(self, workflow_json: str, test_data: Optional[Dict] = None) -> str:
        """Simulate workflow execution with test data.

        Asks the LLM to walk through the workflow node by node; nothing is
        actually executed.

        Args:
            workflow_json: Workflow definition as JSON text. Only the first
                1000 characters are included in the prompt.
            test_data: Sample input for the simulated run. ``None`` (the
                default) is treated as an empty dict.

        Returns:
            The LLM's narrated walkthrough.
        """
        payload = {} if test_data is None else test_data
        # default=str: the dump is prompt text only, so stringifying values
        # that JSON cannot encode (e.g. datetimes) is preferable to raising.
        rendered_data = json.dumps(payload, default=str)
        messages = [
            {"role": "system", "content": WORKFLOW_SYSTEM},
            {"role": "user", "content": (
                "Simulate this n8n workflow execution with test data:\n\n"
                f"Workflow: {workflow_json[:1000]}\n\n"
                f"Test data: {rendered_data}\n\n"
                "Walk through each node's execution and expected output."
            )},
        ]
        return await self.llm(messages, temperature=0.3)