# File size: 21,888 Bytes
# 8a682b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
"""
Workflow Orchestration Engine
Provides workflow management using LangGraph for complex agent workflows
"""

import asyncio
import json
import logging
from typing import Dict, List, Any, Optional, Callable, Union
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import uuid
from contextlib import asynccontextmanager

from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolExecutor
from langchain_core.tools import BaseTool
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage

# Module-level logger, named after this module via __name__; used by the
# workflow engine below for registration, step and execution messages.
logger = logging.getLogger(__name__)

# =============================
# Workflow Types
# =============================

class WorkflowStatus(Enum):
    """Workflow execution status

    Lifecycle state stored on ``WorkflowExecution.status``. Member values are
    the lowercase string form of each state.
    """
    # Assigned when WorkflowEngine.execute_workflow creates the execution record.
    PENDING = "pending"
    # Assigned by execute_workflow just before the graph is built and run.
    RUNNING = "running"
    # Assigned by execute_workflow when the run finishes successfully.
    COMPLETED = "completed"
    # Assigned by execute_workflow when the run fails.
    FAILED = "failed"
    # Assigned by WorkflowEngine.cancel_execution to an execution that is RUNNING.
    CANCELLED = "cancelled"
    TIMEOUT = "timeout"

class WorkflowType(Enum):
    """Types of workflows

    Selects which edge layout WorkflowEngine._create_workflow_graph wires
    between the step nodes.
    """
    # Each step is connected to the next one; the last step goes to END.
    SEQUENTIAL = "sequential"
    # Every step is connected directly to END.
    PARALLEL = "parallel"
    # Steps with a ``condition`` get a conditional edge; the others get a plain edge.
    CONDITIONAL = "conditional"
    # Every step gets an edge back to itself.
    LOOP = "loop"
    # NOTE(review): _create_workflow_graph has no edge-building branch for the
    # two fan types below - confirm whether they are handled elsewhere.
    FAN_OUT = "fan_out"
    FAN_IN = "fan_in"

@dataclass
class WorkflowStep:
    """Represents a step in a workflow

    When the engine runs a step it uses the registered agent if ``agent_id``
    is set, otherwise the registered tool if ``tool_name`` is set, otherwise a
    placeholder "custom" step.
    """
    step_id: str  # Used as the graph node name and as the key in step_results
    name: str  # Human-readable name, used in log messages
    description: str
    agent_id: Optional[str] = None  # Key into WorkflowEngine.agents
    tool_name: Optional[str] = None  # Key into WorkflowEngine.tools
    # Maps step-input key -> source. "input.<key>" reads the workflow input,
    # "output.<step_id>.<key>" reads an earlier step's result, and any other
    # string is passed through as a literal value.
    input_mapping: Dict[str, str] = field(default_factory=dict)
    # Maps result key -> target. "output.<key>" writes into the workflow
    # output; any other target stores the value under this step's results.
    output_mapping: Dict[str, str] = field(default_factory=dict)
    timeout: Optional[float] = None  # NOTE(review): presumably seconds - confirm
    retry_count: int = 3  # Maximum number of retries after a failed attempt
    retry_delay: float = 1.0  # Seconds; multiplied by the attempt number before each retry
    dependencies: List[str] = field(default_factory=list)  # presumably step_ids - verify against callers
    # Python expression evaluated with eval() against a ``state`` variable to
    # choose the conditional route in CONDITIONAL workflows.
    condition: Optional[str] = None  # Expression for conditional execution
    parallel: bool = False

@dataclass
class WorkflowDefinition:
    """Complete workflow definition

    Static description of a workflow: its identity, its edge layout
    (``workflow_type``) and the ordered list of steps. Registered with
    WorkflowEngine.register_workflow under ``workflow_id``.
    """
    workflow_id: str  # Registry key in WorkflowEngine.workflows
    name: str
    description: str
    workflow_type: WorkflowType  # Chooses how edges are added between steps
    steps: List[WorkflowStep]  # The first step becomes the graph entry point
    input_schema: Dict[str, Any] = field(default_factory=dict)
    output_schema: Dict[str, Any] = field(default_factory=dict)
    timeout: Optional[float] = None  # Whole-workflow limit, in seconds
    max_retries: int = 3
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class WorkflowExecution:
    """Workflow execution instance

    Record of a single run of a workflow, stored in
    WorkflowEngine.executions under ``execution_id`` and returned by
    WorkflowEngine.execute_workflow.
    """
    execution_id: str  # Caller-supplied id, or a generated uuid4 string
    workflow_id: str
    status: WorkflowStatus
    input_data: Dict[str, Any]  # Input passed to execute_workflow
    output_data: Optional[Dict[str, Any]] = None  # Filled from the final workflow state
    step_results: Dict[str, Any] = field(default_factory=dict)  # Keyed by step_id
    error_message: Optional[str] = None  # str() of the error that failed the run
    start_time: datetime = field(default_factory=datetime.now)  # Naive local time at creation
    end_time: Optional[datetime] = None  # Set when the run finishes or is cancelled
    metadata: Dict[str, Any] = field(default_factory=dict)

# =============================
# Workflow State
# =============================

@dataclass
class WorkflowState:
    """State passed between workflow steps

    Used as the state schema of the LangGraph ``StateGraph``; each step node
    receives the state, mutates it in place and returns it.
    """
    execution_id: str
    workflow_id: str
    current_step: str  # "start" initially, then the step_id of the running step
    # Keyed by step_id; each entry holds "status", "timestamp" and either
    # "result" (success) or "error" (failure).
    step_results: Dict[str, Any] = field(default_factory=dict)
    input_data: Dict[str, Any] = field(default_factory=dict)  # Workflow input
    output_data: Dict[str, Any] = field(default_factory=dict)  # Filled via "output.<key>" mappings
    error: Optional[str] = None  # str() of the exception once a step exhausts its retries
    metadata: Dict[str, Any] = field(default_factory=dict)
    step_history: List[str] = field(default_factory=list)  # step_ids in execution order
    retry_count: Dict[str, int] = field(default_factory=dict)  # Retries used so far, per step_id

# =============================
# Workflow Engine
# =============================

class WorkflowEngine:
    """Main workflow orchestration engine.

    Keeps registries of workflow definitions, agents and tools, turns a
    ``WorkflowDefinition`` into a LangGraph ``StateGraph`` and records every
    run as a ``WorkflowExecution`` in ``self.executions``.
    """

    def __init__(self):
        """Create an engine with empty registries."""
        self.workflows: Dict[str, WorkflowDefinition] = {}
        self.executions: Dict[str, WorkflowExecution] = {}
        self.agents: Dict[str, Any] = {}  # Agent registry
        self.tools: Dict[str, BaseTool] = {}  # Tool registry
        # Serialises registration / removal of workflow definitions.
        self._lock = asyncio.Lock()

    async def register_workflow(self, workflow: WorkflowDefinition) -> bool:
        """Register a workflow definition under its ``workflow_id``.

        An existing definition with the same id is replaced. Always returns
        ``True``.
        """
        async with self._lock:
            self.workflows[workflow.workflow_id] = workflow
            logger.info("Registered workflow: %s (%s)", workflow.name, workflow.workflow_id)
            return True

    async def unregister_workflow(self, workflow_id: str) -> bool:
        """Remove a workflow definition.

        Returns ``True`` when a definition was removed and ``False`` when the
        id was not registered.
        """
        async with self._lock:
            if workflow_id in self.workflows:
                del self.workflows[workflow_id]
                logger.info("Unregistered workflow: %s", workflow_id)
                return True
            return False

    async def register_agent(self, agent_id: str, agent: Any) -> None:
        """Register an agent that steps can reference through ``agent_id``."""
        self.agents[agent_id] = agent

    async def register_tool(self, tool_name: str, tool: BaseTool) -> None:
        """Register a tool that steps can reference through ``tool_name``."""
        self.tools[tool_name] = tool

    async def execute_workflow(self, workflow_id: str, input_data: Dict[str, Any],
                             execution_id: Optional[str] = None) -> WorkflowExecution:
        """Run a registered workflow and return its execution record.

        Args:
            workflow_id: Id of a workflow registered with this engine.
            input_data: Workflow input, exposed to steps through
                ``"input.<key>"`` mappings.
            execution_id: Optional id for the run; a uuid4 string is generated
                when omitted.

        Returns:
            The ``WorkflowExecution`` (also stored in ``self.executions``).
            Failures do not propagate: they are reported through ``status``
            (``FAILED`` or ``TIMEOUT``) and ``error_message``.

        Raises:
            ValueError: If ``workflow_id`` is not registered.
        """
        if workflow_id not in self.workflows:
            raise ValueError(f"Workflow {workflow_id} not found")

        workflow = self.workflows[workflow_id]
        execution_id = execution_id or str(uuid.uuid4())

        # Create execution instance
        execution = WorkflowExecution(
            execution_id=execution_id,
            workflow_id=workflow_id,
            status=WorkflowStatus.PENDING,
            input_data=input_data
        )
        self.executions[execution_id] = execution

        try:
            execution.status = WorkflowStatus.RUNNING
            graph = await self._create_workflow_graph(workflow)
            initial_state = WorkflowState(
                execution_id=execution_id,
                workflow_id=workflow_id,
                current_step="start",
                input_data=input_data
            )
            final_state = await self._run_workflow(graph, initial_state, workflow)
        except TimeoutError as e:
            # Must precede the generic handler: TimeoutError is an Exception.
            self._finalize_execution(execution, WorkflowStatus.TIMEOUT, str(e))
            logger.error("Workflow execution timed out: %s, error: %s", execution_id, e)
            return execution
        except Exception as e:
            self._finalize_execution(execution, WorkflowStatus.FAILED, str(e))
            logger.error("Workflow execution failed: %s, error: %s", execution_id, e)
            return execution

        # Update execution with results
        execution.output_data = self._state_value(final_state, "output_data", {})
        execution.step_results = self._state_value(final_state, "step_results", {})

        # A step that exhausted its retries records the error on the state
        # instead of raising, so it has to be surfaced here.
        step_error = self._state_value(final_state, "error")
        if step_error:
            self._finalize_execution(execution, WorkflowStatus.FAILED, step_error)
            logger.error("Workflow execution failed: %s, error: %s", execution_id, step_error)
        else:
            self._finalize_execution(execution, WorkflowStatus.COMPLETED)
            logger.info("Workflow execution completed: %s", execution_id)

        return execution

    @staticmethod
    def _state_value(state: Any, key: str, default: Any = None) -> Any:
        """Read ``key`` from a final state given as a mapping or as an object.

        NOTE(review): LangGraph may hand back the final state as a dict of
        channel values rather than a ``WorkflowState`` instance, so both
        shapes are accepted - confirm against the installed version.
        """
        if isinstance(state, dict):
            return state.get(key, default)
        return getattr(state, key, default)

    @staticmethod
    def _finalize_execution(execution: WorkflowExecution, status: WorkflowStatus,
                            error_message: Optional[str] = None) -> None:
        """Stamp a terminal status on ``execution`` unless it was cancelled.

        ``cancel_execution`` may have marked the run CANCELLED while the graph
        was still running; that status and its ``end_time`` are kept.
        """
        if execution.status == WorkflowStatus.CANCELLED:
            return
        execution.status = status
        execution.error_message = error_message
        execution.end_time = datetime.now()

    async def _create_workflow_graph(self, workflow: WorkflowDefinition) -> StateGraph:
        """Build and compile the LangGraph graph for ``workflow``.

        One node is added per step, edges are added according to
        ``workflow.workflow_type`` and the first step becomes the entry point.
        Returns the result of ``StateGraph.compile()``.
        """
        workflow_graph = StateGraph(WorkflowState)

        # Add nodes for each step
        for step in workflow.steps:
            workflow_graph.add_node(step.step_id, self._create_step_node(step))

        # Add edges based on workflow type
        edge_builders = {
            WorkflowType.SEQUENTIAL: self._add_sequential_edges,
            WorkflowType.PARALLEL: self._add_parallel_edges,
            WorkflowType.CONDITIONAL: self._add_conditional_edges,
            WorkflowType.LOOP: self._add_loop_edges,
        }
        add_edges = edge_builders.get(workflow.workflow_type)
        if add_edges is not None:
            await add_edges(workflow_graph, workflow.steps)
        else:
            # FAN_OUT / FAN_IN have no edge layout yet; make that visible
            # instead of silently compiling an unconnected graph.
            logger.warning(
                "No edge builder for workflow type %s; steps are left unconnected",
                workflow.workflow_type,
            )

        # Set entry point
        if workflow.steps:
            workflow_graph.set_entry_point(workflow.steps[0].step_id)

        return workflow_graph.compile()

    def _create_step_node(self, step: WorkflowStep) -> Callable:
        """Create the async node function that runs ``step``.

        The node mutates and returns the state it receives. A failing attempt
        is retried up to ``step.retry_count`` times, sleeping
        ``step.retry_delay * attempt`` seconds before each retry. When the
        retries are exhausted the error is stored in ``state.error`` and in
        the step's ``step_results`` entry instead of being raised.
        """
        async def step_node(state: WorkflowState) -> WorkflowState:
            # Update current step; the history records the step once, however
            # many attempts it needs.
            state.current_step = step.step_id
            state.step_history.append(step.step_id)

            # Iterative retry loop (no recursion, so no growing call stack).
            while True:
                try:
                    logger.info("Executing step: %s (%s)", step.name, step.step_id)

                    step_input = self._prepare_step_input(step, state)
                    result = await self._run_step(step, step_input)

                    # Record the result first: output mappings that target the
                    # step's own results need this entry to exist and must not
                    # be overwritten afterwards.
                    state.step_results[step.step_id] = {
                        "status": "success",
                        "result": result,
                        "timestamp": datetime.now().isoformat()
                    }
                    self._process_step_output(step, result, state)
                    return state

                except Exception as e:
                    logger.error("Step execution failed: %s, error: %s", step.step_id, e)

                    retries_used = state.retry_count.get(step.step_id, 0)
                    if retries_used >= step.retry_count:
                        # Max retries exceeded
                        state.error = str(e)
                        state.step_results[step.step_id] = {
                            "status": "failed",
                            "error": str(e),
                            "timestamp": datetime.now().isoformat()
                        }
                        return state

                    state.retry_count[step.step_id] = retries_used + 1
                    # Linear back-off: delay grows with the attempt number.
                    await asyncio.sleep(step.retry_delay * (retries_used + 1))

        return step_node

    async def _run_step(self, step: WorkflowStep, step_input: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch ``step`` to its agent, tool or custom handler.

        ``step.timeout`` (seconds), when set, bounds a single attempt.

        Raises:
            TimeoutError: If the attempt exceeds ``step.timeout``.
        """
        if step.agent_id:
            attempt = self._execute_agent_step(step, step_input)
        elif step.tool_name:
            attempt = self._execute_tool_step(step, step_input)
        else:
            attempt = self._execute_custom_step(step, step_input)

        if not step.timeout:
            return await attempt
        try:
            return await asyncio.wait_for(attempt, timeout=step.timeout)
        except asyncio.TimeoutError:
            # asyncio's timeout carries no message; raise a descriptive one.
            raise TimeoutError(
                f"Step {step.step_id} timed out after {step.timeout} seconds"
            ) from None

    def _prepare_step_input(self, step: WorkflowStep, state: WorkflowState) -> Dict[str, Any]:
        """Resolve ``step.input_mapping`` against the current state.

        Source forms:
            ``"input.<key>"``: value of ``<key>`` in the workflow input
            (``None`` when absent).
            ``"output.<step_id>.<key>"``: value of ``<key>`` in the result of
            an earlier step; the input key is omitted when that step has no
            recorded result.
            Anything else is passed through unchanged as a literal value.

        Raises:
            ValueError: If an ``"output."`` source lacks the step id or key.
        """
        step_input = {}

        for input_key, source_path in step.input_mapping.items():
            if not isinstance(source_path, str):
                # Non-string values can only be literals.
                step_input[input_key] = source_path
            elif source_path.startswith("input."):
                # Map from workflow input
                step_input[input_key] = state.input_data.get(source_path.split(".", 1)[1])
            elif source_path.startswith("output."):
                # Map from previous step output
                parts = source_path.split(".", 2)
                if len(parts) != 3:
                    raise ValueError(
                        f"Invalid input mapping {source_path!r} for {input_key!r}: "
                        "expected 'output.<step_id>.<key>'"
                    )
                _, source_step_id, output_key = parts
                if source_step_id in state.step_results:
                    previous = state.step_results[source_step_id].get("result")
                    # Failed steps have no "result"; treat that as "no value".
                    step_input[input_key] = (
                        previous.get(output_key) if isinstance(previous, dict) else None
                    )
            else:
                # Direct value
                step_input[input_key] = source_path

        return step_input

    async def _execute_agent_step(self, step: WorkflowStep, step_input: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a step by invoking its registered agent.

        The step input is JSON-encoded into a single ``HumanMessage``. The
        result contains ``agent_id``, ``response`` (the reply's ``content``)
        and ``metadata`` (the reply's ``additional_kwargs``).

        Raises:
            ValueError: If ``step.agent_id`` is not registered.
        """
        if step.agent_id not in self.agents:
            raise ValueError(f"Agent {step.agent_id} not found")

        agent = self.agents[step.agent_id]

        # Create message for the agent
        message = HumanMessage(content=json.dumps(step_input))

        # Execute agent
        response = await agent.ainvoke([message])

        return {
            "agent_id": step.agent_id,
            "response": response.content,
            "metadata": response.additional_kwargs
        }

    async def _execute_tool_step(self, step: WorkflowStep, step_input: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a step by invoking its registered tool with the step input.

        The result contains ``tool_name``, ``result`` (the tool's return
        value) and an empty ``metadata`` dict.

        Raises:
            ValueError: If ``step.tool_name`` is not registered.
        """
        if step.tool_name not in self.tools:
            raise ValueError(f"Tool {step.tool_name} not found")

        tool = self.tools[step.tool_name]

        # Execute tool
        result = await tool.ainvoke(step_input)

        return {
            "tool_name": step.tool_name,
            "result": result,
            "metadata": {}
        }

    async def _execute_custom_step(self, step: WorkflowStep, step_input: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a custom step (placeholder for custom logic).

        Currently echoes the input and reports a fixed "completed" result.
        """
        # This would be implemented based on custom step types
        return {
            "step_type": "custom",
            "input": step_input,
            "result": {"status": "completed"}
        }

    def _process_step_output(self, step: WorkflowStep, result: Dict[str, Any], state: WorkflowState):
        """Copy values from a step result to the places named in ``output_mapping``.

        A target of ``"output.<key>"`` writes to the workflow output; any other
        target stores the value under the step's own ``step_results`` entry,
        which is created when it does not exist yet.
        """
        for output_key, target_path in step.output_mapping.items():
            value = result.get(output_key)
            if target_path.startswith("output."):
                # Map to workflow output
                state.output_data[target_path.split(".", 1)[1]] = value
            else:
                # Store in step results
                state.step_results.setdefault(step.step_id, {})[output_key] = value

    async def _add_sequential_edges(self, graph: StateGraph, steps: List[WorkflowStep]):
        """Chain the steps in list order and connect the last one to END."""
        for current_step, next_step in zip(steps, steps[1:]):
            graph.add_edge(current_step.step_id, next_step.step_id)

        # Add final edge to END
        if steps:
            graph.add_edge(steps[-1].step_id, END)

    async def _add_parallel_edges(self, graph: StateGraph, steps: List[WorkflowStep]):
        """Connect every step directly to END.

        NOTE(review): only the first step is made an entry point by
        ``_create_workflow_graph``, so the remaining steps have no incoming
        edge - confirm how parallel fan-out is meant to be wired.
        """
        for step in steps:
            graph.add_edge(step.step_id, END)

    async def _add_conditional_edges(self, graph: StateGraph, steps: List[WorkflowStep]):
        """Add edges for a conditional workflow.

        After a step with a ``condition`` the workflow continues to the next
        step in list order when the condition is truthy and ends otherwise.
        Steps without a condition always continue to the next step. The last
        step always leads to END.
        """
        for index, step in enumerate(steps):
            is_last = index + 1 >= len(steps)
            next_target = END if is_last else steps[index + 1].step_id
            if step.condition:
                # The condition function returns the labels "continue"/"end";
                # the path map translates them into real graph targets.
                graph.add_conditional_edges(
                    step.step_id,
                    self._create_condition_function(step.condition),
                    {"continue": next_target, "end": END},
                )
            else:
                graph.add_edge(step.step_id, next_target)

    async def _add_loop_edges(self, graph: StateGraph, steps: List[WorkflowStep]):
        """Add an edge from every step back to itself.

        NOTE(review): no exit edge is added here, so a loop workflow has no
        route to END - presumably placeholder logic; confirm intended exit.
        """
        for step in steps:
            graph.add_edge(step.step_id, step.step_id)  # Loop back to same step

    def _create_condition_function(self, condition: str) -> Callable:
        """Create the routing function for a conditional edge.

        The returned function evaluates ``condition`` with the workflow state
        bound to the name ``state`` and returns ``"continue"`` for a truthy
        result and ``"end"`` otherwise, including when evaluation fails.
        """
        def condition_func(state: WorkflowState) -> str:
            # SECURITY: eval() executes arbitrary Python. Conditions must come
            # from trusted workflow definitions only - never from user input.
            try:
                outcome = eval(condition, {"state": state})
            except Exception as exc:
                # Only ordinary errors are swallowed; KeyboardInterrupt,
                # SystemExit and task cancellation still propagate.
                logger.warning(
                    "Condition %r could not be evaluated (%s); routing to end",
                    condition, exc,
                )
                return "end"
            return "continue" if outcome else "end"
        return condition_func

    async def _run_workflow(self, graph: StateGraph, initial_state: WorkflowState,
                          workflow: WorkflowDefinition) -> WorkflowState:
        """Run the compiled graph, enforcing ``workflow.timeout`` when set.

        Returns whatever ``graph.ainvoke`` returns as the final state.

        Raises:
            TimeoutError: If the run takes longer than ``workflow.timeout``
                seconds; the run is cancelled at that point.
        """
        if not workflow.timeout:
            return await graph.ainvoke(initial_state)

        try:
            return await asyncio.wait_for(
                graph.ainvoke(initial_state), timeout=workflow.timeout
            )
        except asyncio.TimeoutError:
            raise TimeoutError(
                f"Workflow execution timed out after {workflow.timeout} seconds"
            ) from None

    async def get_execution_status(self, execution_id: str) -> Optional[WorkflowExecution]:
        """Return the execution record for ``execution_id``, or ``None``."""
        return self.executions.get(execution_id)

    async def cancel_execution(self, execution_id: str) -> bool:
        """Mark a running execution as cancelled.

        Only the record is updated (status and ``end_time``); the underlying
        graph run is not interrupted. Returns ``True`` when the execution was
        RUNNING, ``False`` otherwise.
        """
        execution = self.executions.get(execution_id)
        if execution is None or execution.status != WorkflowStatus.RUNNING:
            return False
        execution.status = WorkflowStatus.CANCELLED
        execution.end_time = datetime.now()
        return True

    async def get_workflow_definitions(self) -> List[WorkflowDefinition]:
        """Return all registered workflow definitions."""
        return list(self.workflows.values())

    async def get_execution_history(self, workflow_id: Optional[str] = None) -> List[WorkflowExecution]:
        """Return recorded executions, optionally limited to one workflow."""
        executions = list(self.executions.values())
        if workflow_id:
            executions = [e for e in executions if e.workflow_id == workflow_id]
        return executions

# =============================
# Workflow Builder
# =============================

class WorkflowBuilder:
    """Fluent builder for ``WorkflowDefinition`` objects.

    Every mutator returns the builder so calls can be chained. A fresh
    uuid4 ``workflow_id`` is generated per builder, and the workflow type
    defaults to ``WorkflowType.SEQUENTIAL``.
    """

    def __init__(self, name: str, description: str = ""):
        """Start a definition with the given name and optional description."""
        self.workflow_id = str(uuid.uuid4())
        self.name = name
        self.description = description
        self.workflow_type = WorkflowType.SEQUENTIAL
        self.steps: List[WorkflowStep] = []
        self.input_schema: Dict[str, Any] = {}
        self.output_schema: Dict[str, Any] = {}
        self.timeout: Optional[float] = None
        self.max_retries: int = 3
        self.metadata: Dict[str, Any] = {}

    def set_type(self, workflow_type: WorkflowType) -> 'WorkflowBuilder':
        """Set workflow type"""
        self.workflow_type = workflow_type
        return self

    def add_step(self, step: WorkflowStep) -> 'WorkflowBuilder':
        """Append a fully specified step to the workflow."""
        self.steps.append(step)
        return self

    def _add_bound_step(self, name: str, description: str,
                        input_mapping: Optional[Dict[str, str]],
                        output_mapping: Optional[Dict[str, str]],
                        **binding: Any) -> 'WorkflowBuilder':
        """Create a step with a generated id and append it.

        ``binding`` carries the executor field (``agent_id`` or
        ``tool_name``). The mappings are copied so that later changes to the
        caller's dicts do not leak into the step.
        """
        step = WorkflowStep(
            step_id=str(uuid.uuid4()),
            name=name,
            description=description,
            input_mapping=dict(input_mapping or {}),
            output_mapping=dict(output_mapping or {}),
            **binding
        )
        return self.add_step(step)

    def add_agent_step(self, name: str, agent_id: str, description: str = "",
                      input_mapping: Optional[Dict[str, str]] = None,
                      output_mapping: Optional[Dict[str, str]] = None) -> 'WorkflowBuilder':
        """Add a step executed by the agent registered as ``agent_id``.

        The step gets a generated uuid4 ``step_id`` and default retry
        settings; use ``add_step`` for full control over a step.
        """
        return self._add_bound_step(
            name, description, input_mapping, output_mapping, agent_id=agent_id
        )

    def add_tool_step(self, name: str, tool_name: str, description: str = "",
                     input_mapping: Optional[Dict[str, str]] = None,
                     output_mapping: Optional[Dict[str, str]] = None) -> 'WorkflowBuilder':
        """Add a step executed by the tool registered as ``tool_name``.

        The step gets a generated uuid4 ``step_id`` and default retry
        settings; use ``add_step`` for full control over a step.
        """
        return self._add_bound_step(
            name, description, input_mapping, output_mapping, tool_name=tool_name
        )

    def set_input_schema(self, schema: Dict[str, Any]) -> 'WorkflowBuilder':
        """Set input schema"""
        self.input_schema = schema
        return self

    def set_output_schema(self, schema: Dict[str, Any]) -> 'WorkflowBuilder':
        """Set output schema"""
        self.output_schema = schema
        return self

    def set_timeout(self, timeout: float) -> 'WorkflowBuilder':
        """Set workflow timeout"""
        self.timeout = timeout
        return self

    def set_max_retries(self, max_retries: int) -> 'WorkflowBuilder':
        """Set maximum retries"""
        self.max_retries = max_retries
        return self

    def add_metadata(self, key: str, value: Any) -> 'WorkflowBuilder':
        """Add (or overwrite) one metadata entry."""
        self.metadata[key] = value
        return self

    def build(self) -> WorkflowDefinition:
        """Build the workflow definition from the current builder state.

        The definition receives shallow copies of the builder's step list,
        schemas and metadata, so continuing to use the builder afterwards
        does not alter a definition that has already been built. The
        ``WorkflowStep`` objects themselves are shared, not copied.
        """
        return WorkflowDefinition(
            workflow_id=self.workflow_id,
            name=self.name,
            description=self.description,
            workflow_type=self.workflow_type,
            steps=list(self.steps),
            input_schema=dict(self.input_schema),
            output_schema=dict(self.output_schema),
            timeout=self.timeout,
            max_retries=self.max_retries,
            metadata=dict(self.metadata)
        )

# =============================
# Global Workflow Engine
# =============================

# Global workflow engine instance
# Created once at import time; the module-level helper functions below
# delegate to this shared instance.
workflow_engine = WorkflowEngine()

# =============================
# Utility Functions
# =============================

async def register_workflow(workflow: WorkflowDefinition) -> bool:
    """Add ``workflow`` to the shared module-level engine.

    Returns the engine's result for the registration.
    """
    registered = await workflow_engine.register_workflow(workflow)
    return registered

async def execute_workflow(workflow_id: str, input_data: Dict[str, Any],
                          execution_id: Optional[str] = None) -> WorkflowExecution:
    """Run a registered workflow on the shared module-level engine.

    The arguments are forwarded unchanged and the engine's execution record
    is returned.
    """
    execution = await workflow_engine.execute_workflow(
        workflow_id, input_data, execution_id
    )
    return execution

async def get_execution_status(execution_id: str) -> Optional[WorkflowExecution]:
    """Look up an execution record on the shared module-level engine.

    Returns whatever the engine returns for ``execution_id``.
    """
    record = await workflow_engine.get_execution_status(execution_id)
    return record

def create_workflow_builder(name: str, description: str = "") -> WorkflowBuilder:
    """Return a fresh ``WorkflowBuilder`` for a workflow called ``name``."""
    builder = WorkflowBuilder(name, description)
    return builder