File size: 10,396 Bytes
8a682b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
"""
Agent executor implementation for executing AI agents.
"""

import asyncio
import logging
from typing import Dict, Any, Optional
from uuid import UUID, uuid4
from datetime import datetime

from src.core.interfaces.agent_executor import AgentExecutor
from src.core.entities.agent import Agent, AgentType
from src.core.entities.message import Message
from src.shared.exceptions import DomainException


class AgentExecutorImpl(AgentExecutor):
    """
    Implementation of the agent executor interface.

    Dispatches an agent to the handler matching its ``agent_type`` and keeps
    an in-memory record of each execution, keyed by a generated UUID, so that
    its status can be queried or flagged as cancelled afterwards.

    NOTE(review): nothing in this class removes entries from
    ``_active_executions``, so the mapping grows for the lifetime of the
    instance.
    NOTE(review): the per-type ``_execute_*`` handlers are placeholders that
    sleep briefly and return a canned response.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # execution_id -> bookkeeping record (agent_id, message_id,
        # start_time, status and, once known, execution_time / error /
        # end_time).
        self._active_executions: Dict[UUID, Dict[str, Any]] = {}

    async def execute(self, agent: Agent, message: Message) -> Dict[str, Any]:
        """
        Execute an agent with a given message.

        The agent is validated first, then handed to the handler matching its
        ``agent_type``. The handler's result dictionary is returned with
        ``execution_id`` and ``execution_time`` (seconds) added to it.

        Args:
            agent: The agent to execute
            message: The message to process

        Returns:
            Dictionary containing the execution result

        Raises:
            DomainException: If validation fails, the agent type is not
                supported, or the handler raises. The original exception is
                chained as ``__cause__``.
            asyncio.CancelledError: Re-raised unchanged when the awaiting task
                is cancelled; the execution record is marked "cancelled".
        """
        execution_id = uuid4()
        start_time = datetime.now()

        try:
            # Register execution
            self._active_executions[execution_id] = {
                "agent_id": agent.id,
                "message_id": message.id,
                "start_time": start_time,
                "status": "running"
            }

            self.logger.info("Starting execution %s for agent %s", execution_id, agent.id)

            # Validate agent
            validation_result = await self.validate_agent(agent)
            if not validation_result.get("valid", False):
                raise DomainException(f"Agent validation failed: {validation_result.get('errors', [])}")

            # Execute based on agent type
            handler = self._select_handler(agent.agent_type)
            if handler is None:
                raise DomainException(f"Unsupported agent type: {agent.agent_type}")
            result = await handler(agent, message)

            # Update execution status
            execution_time = self._close_record(execution_id, start_time, "completed")

            # Add execution metadata
            result["execution_id"] = str(execution_id)
            result["execution_time"] = execution_time

            self.logger.info(
                "Execution %s completed successfully in %.2fs", execution_id, execution_time
            )

            return result

        except asyncio.CancelledError:
            # Task cancellation is not a failure: record it and let it
            # propagate so the event loop can finish cancelling the task.
            self._close_record(execution_id, start_time, "cancelled")
            self.logger.info("Execution %s cancelled", execution_id)
            raise
        except Exception as e:
            self._close_record(execution_id, start_time, "failed", error=str(e))

            self.logger.error("Execution %s failed: %s", execution_id, e)
            raise DomainException(f"Agent execution failed: {str(e)}") from e

    def _select_handler(self, agent_type: Any) -> Optional[Any]:
        """Return the coroutine method handling ``agent_type``, or None if unsupported."""
        handlers = (
            (AgentType.FSM_REACT, self._execute_fsm_react_agent),
            (AgentType.NEXT_GEN, self._execute_next_gen_agent),
            (AgentType.CREW, self._execute_crew_agent),
            (AgentType.SPECIALIZED, self._execute_specialized_agent),
        )
        # Compare with ``==`` (rather than a dict lookup) so matching follows
        # the same equality rules as an if/elif chain over the enum members.
        for supported_type, handler in handlers:
            if agent_type == supported_type:
                return handler
        return None

    def _close_record(
        self,
        execution_id: UUID,
        start_time: datetime,
        status: str,
        error: Optional[str] = None,
    ) -> float:
        """Store the outcome of an execution and return its elapsed seconds."""
        execution_time = (datetime.now() - start_time).total_seconds()
        # The record may be missing if registration itself raised; in that
        # case there is nothing to update and the original error must not be
        # masked by a KeyError here.
        record = self._active_executions.get(execution_id)
        if record is not None:
            # A status of "cancelled" set through cancel_execution() is kept
            # rather than being overwritten by a later outcome.
            if record["status"] != "cancelled":
                record["status"] = status
            if error is not None:
                record["error"] = error
            record["execution_time"] = execution_time
        return execution_time

    async def validate_agent(self, agent: Agent) -> Dict[str, Any]:
        """
        Validate an agent before execution.

        An unavailable agent or an unknown agent type is an error; a missing
        ``config`` or ``model_config`` only produces a warning.

        Args:
            agent: The agent to validate

        Returns:
            Dictionary with ``valid`` (True when there are no errors),
            ``errors`` and ``warnings``
        """
        errors = []
        warnings = []

        # Check if agent is available
        if not agent.is_available:
            errors.append("Agent is not available for execution")

        # Check agent configuration
        if not agent.config:
            warnings.append("Agent has no configuration")

        # Check model configuration
        if not agent.model_config:
            warnings.append("Agent has no model configuration")

        # Validate agent type
        try:
            type_is_known = agent.agent_type in AgentType
        except TypeError:
            # On some Python versions (3.8-3.11) an Enum containment test
            # raises TypeError for operands that are not enum members; treat
            # that as an invalid type instead of crashing validation.
            type_is_known = False
        if not type_is_known:
            errors.append(f"Invalid agent type: {agent.agent_type}")

        return {
            "valid": len(errors) == 0,
            "errors": errors,
            "warnings": warnings
        }

    async def get_agent_capabilities(self, agent: Agent) -> Dict[str, Any]:
        """
        Get agent capabilities and supported operations.

        Only ``agent_type`` and ``supported_operations`` are filled in;
        ``tools_available`` and ``model_info`` are returned empty.

        Args:
            agent: The agent to query

        Returns:
            Dictionary containing agent capabilities
        """
        capabilities = {
            "agent_type": agent.agent_type.value,
            "supported_operations": [],
            "tools_available": [],
            "model_info": {}
        }

        # Operations per agent type; the lists are built on every call so the
        # returned dictionary never shares state between callers.
        operations_by_type = (
            (AgentType.FSM_REACT, [
                "text_processing",
                "tool_execution",
                "state_management",
                "reasoning"
            ]),
            (AgentType.NEXT_GEN, [
                "advanced_reasoning",
                "parallel_processing",
                "multi_modal_processing",
                "learning"
            ]),
            (AgentType.CREW, [
                "multi_agent_coordination",
                "task_delegation",
                "collaborative_reasoning",
                "workflow_management"
            ]),
            (AgentType.SPECIALIZED, [
                "domain_specific_processing",
                "expert_knowledge",
                "specialized_tools"
            ]),
        )
        for known_type, operations in operations_by_type:
            if agent.agent_type == known_type:
                capabilities["supported_operations"] = operations
                break

        return capabilities

    async def cancel_execution(self, execution_id: UUID) -> bool:
        """
        Cancel a running execution.

        This only flags the execution record as "cancelled" and stamps an
        ``end_time``; it does not interrupt the coroutine doing the work.

        Args:
            execution_id: The execution to cancel

        Returns:
            True if cancellation was successful, False otherwise (unknown id
            or the execution is no longer running)
        """
        execution = self._active_executions.get(execution_id)
        if execution is None or execution["status"] != "running":
            return False

        execution["status"] = "cancelled"
        execution["end_time"] = datetime.now()
        self.logger.info("Execution %s cancelled", execution_id)
        return True

    async def get_execution_status(self, execution_id: UUID) -> Dict[str, Any]:
        """
        Get the status of an execution.

        Args:
            execution_id: The execution to query

        Returns:
            Dictionary containing execution status, or
            ``{"error": "Execution not found"}`` for an unknown id.
            ``execution_time``, ``error`` and ``end_time`` (ISO 8601) are
            included only when they have been recorded.
        """
        execution = self._active_executions.get(execution_id)
        if execution is None:
            return {"error": "Execution not found"}

        status = {
            "execution_id": str(execution_id),
            "status": execution["status"],
            "agent_id": str(execution["agent_id"]),
            "message_id": str(execution["message_id"]),
            "start_time": execution["start_time"].isoformat()
        }

        if "execution_time" in execution:
            status["execution_time"] = execution["execution_time"]

        if "error" in execution:
            status["error"] = execution["error"]

        # Recorded by cancel_execution(); reported in the same ISO format as
        # start_time.
        if "end_time" in execution:
            status["end_time"] = execution["end_time"].isoformat()

        return status

    async def _execute_fsm_react_agent(self, agent: Agent, message: Message) -> Dict[str, Any]:
        """Execute an FSM React agent (placeholder returning a mock response)."""
        # This would integrate with the existing FSM agent implementation
        # For now, return a mock response
        await asyncio.sleep(0.1)  # Simulate processing time

        return {
            "response": f"FSM React agent processed: {message.content}",
            "confidence": 0.85,
            "tools_used": ["text_processor", "reasoning_engine"],
            "metadata": {
                "agent_type": "fsm_react",
                "processing_steps": 3
            }
        }

    async def _execute_next_gen_agent(self, agent: Agent, message: Message) -> Dict[str, Any]:
        """Execute a Next Gen agent (placeholder returning a mock response)."""
        await asyncio.sleep(0.2)  # Simulate processing time

        return {
            "response": f"Next Gen agent processed: {message.content}",
            "confidence": 0.92,
            "tools_used": ["advanced_reasoning", "parallel_processor"],
            "metadata": {
                "agent_type": "next_gen",
                "processing_steps": 5
            }
        }

    async def _execute_crew_agent(self, agent: Agent, message: Message) -> Dict[str, Any]:
        """Execute a Crew agent (placeholder returning a mock response)."""
        await asyncio.sleep(0.3)  # Simulate processing time

        return {
            "response": f"Crew agent processed: {message.content}",
            "confidence": 0.88,
            "tools_used": ["coordinator", "researcher", "executor"],
            "metadata": {
                "agent_type": "crew",
                "crew_size": 3,
                "processing_steps": 7
            }
        }

    async def _execute_specialized_agent(self, agent: Agent, message: Message) -> Dict[str, Any]:
        """Execute a Specialized agent (placeholder returning a mock response)."""
        await asyncio.sleep(0.15)  # Simulate processing time

        return {
            "response": f"Specialized agent processed: {message.content}",
            "confidence": 0.95,
            "tools_used": ["domain_expert", "specialized_tool"],
            "metadata": {
                "agent_type": "specialized",
                "domain": "expert",
                "processing_steps": 2
            }
        }


# Alias for backward compatibility.
# NOTE: this rebinds the module-level name ``AgentExecutor`` (imported at the
# top of this file as the interface that AgentExecutorImpl subclasses) to the
# implementation class, so importing ``AgentExecutor`` from this module yields
# AgentExecutorImpl rather than the interface.
AgentExecutor = AgentExecutorImpl