File size: 6,607 Bytes
631d977
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
"""
Parallel Agent Executor
========================
Provides parallel execution of multiple agents using asyncio.
"""

import asyncio
from typing import Any, Callable, Awaitable
from dataclasses import dataclass, field
from datetime import datetime, timezone
import logging

from agents.base import BaseAgent
from ledger.merkle import hash_leaf

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ParallelExecutor")


@dataclass
class AgentTask:
    """Represents a task to be executed by an agent."""
    agent: BaseAgent
    input_data: dict[str, Any]
    task_id: str = ""
    
    def __post_init__(self):
        if not self.task_id:
            self.task_id = f"task_{hash_leaf(str(self.input_data))[:8]}"


@dataclass
class AgentResult:
    """Represents the result of an agent execution."""
    task_id: str
    agent_role: str
    input_data: dict[str, Any]
    output_data: dict[str, Any]
    success: bool
    error: str = ""
    execution_time_ms: float = 0
    timestamp: str = ""
    
    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.now(timezone.utc).isoformat()


class ParallelExecutor:
    """
    Executes multiple agents in parallel using asyncio.gather.
    
    Features:
    - Concurrent agent execution
    - Error handling per agent
    - Result aggregation
    - Execution timing
    """
    
    def __init__(self, max_concurrency: int = 10):
        """
        Initialize the parallel executor.
        
        Args:
            max_concurrency: Maximum number of concurrent agent executions
        """
        self.max_concurrency = max_concurrency
        self.semaphore = asyncio.Semaphore(max_concurrency)
    
    async def execute_single(self, task: AgentTask) -> AgentResult:
        """
        Execute a single agent task with semaphore control.
        
        Args:
            task: The agent task to execute
            
        Returns:
            AgentResult with execution details
        """
        async with self.semaphore:
            start_time = asyncio.get_event_loop().time()
            
            try:
                output = await task.agent.run(task.input_data)
                execution_time = (asyncio.get_event_loop().time() - start_time) * 1000
                
                return AgentResult(
                    task_id=task.task_id,
                    agent_role=task.agent.role,
                    input_data=task.input_data,
                    output_data=output,
                    success=True,
                    execution_time_ms=execution_time
                )
            except Exception as e:
                execution_time = (asyncio.get_event_loop().time() - start_time) * 1000
                logger.error(f"Agent {task.agent.role} failed: {e}")
                
                return AgentResult(
                    task_id=task.task_id,
                    agent_role=task.agent.role,
                    input_data=task.input_data,
                    output_data={},
                    success=False,
                    error=str(e),
                    execution_time_ms=execution_time
                )
    
    async def execute_parallel(
        self, 
        tasks: list[AgentTask],
        return_exceptions: bool = False
    ) -> list[AgentResult]:
        """
        Execute multiple agent tasks in parallel.
        
        Args:
            tasks: List of AgentTask objects to execute
            return_exceptions: If True, exceptions are returned as results
            
        Returns:
            List of AgentResult objects
        """
        logger.info(f"Executing {len(tasks)} tasks in parallel (max concurrency: {self.max_concurrency})")
        
        # Create coroutines for all tasks
        coroutines = [self.execute_single(task) for task in tasks]
        
        # Execute all in parallel
        results = await asyncio.gather(*coroutines, return_exceptions=return_exceptions)
        
        # Handle any unexpected exceptions
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append(AgentResult(
                    task_id=tasks[i].task_id,
                    agent_role=tasks[i].agent.role,
                    input_data=tasks[i].input_data,
                    output_data={},
                    success=False,
                    error=str(result)
                ))
            else:
                processed_results.append(result)
        
        successful = sum(1 for r in processed_results if r.success)
        logger.info(f"Parallel execution complete: {successful}/{len(tasks)} successful")
        
        return processed_results
    
    async def execute_with_dependencies(
        self,
        task_groups: list[list[AgentTask]]
    ) -> list[list[AgentResult]]:
        """
        Execute task groups sequentially, with tasks within each group running in parallel.
        
        Args:
            task_groups: List of task groups. Each group runs after the previous completes.
            
        Returns:
            List of result groups corresponding to task groups
        """
        all_results = []
        
        for i, group in enumerate(task_groups):
            logger.info(f"Executing task group {i + 1}/{len(task_groups)}")
            group_results = await self.execute_parallel(group)
            all_results.append(group_results)
        
        return all_results


def merge_results(results: list[AgentResult]) -> dict[str, Any]:
    """
    Merge multiple agent results into a combined output.
    
    Args:
        results: List of AgentResult objects
        
    Returns:
        Merged result dictionary
    """
    merged = {
        "total_agents": len(results),
        "successful": sum(1 for r in results if r.success),
        "failed": sum(1 for r in results if not r.success),
        "total_execution_time_ms": sum(r.execution_time_ms for r in results),
        "results": [],
        "errors": []
    }
    
    for result in results:
        if result.success:
            merged["results"].append({
                "agent": result.agent_role,
                "task_id": result.task_id,
                "output": result.output_data,
                "execution_time_ms": result.execution_time_ms
            })
        else:
            merged["errors"].append({
                "agent": result.agent_role,
                "task_id": result.task_id,
                "error": result.error
            })
    
    return merged