"""
Parallel Agent Executor
========================
Provides parallel execution of multiple agents using asyncio.
"""
import asyncio
from typing import Any, Callable, Awaitable
from dataclasses import dataclass, field
from datetime import datetime, timezone
import logging
from agents.base import BaseAgent
from ledger.merkle import hash_leaf
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ParallelExecutor")


@dataclass
class AgentTask:
    """Represents a task to be executed by an agent."""
    agent: BaseAgent
    input_data: dict[str, Any]
    task_id: str = ""

    def __post_init__(self):
        # Derive a short, deterministic id from the input payload when
        # no explicit task_id is supplied.
        if not self.task_id:
            self.task_id = f"task_{hash_leaf(str(self.input_data))[:8]}"


@dataclass
class AgentResult:
    """Represents the result of an agent execution."""
    task_id: str
    agent_role: str
    input_data: dict[str, Any]
    output_data: dict[str, Any]
    success: bool
    error: str = ""
    execution_time_ms: float = 0.0
    timestamp: str = ""

    def __post_init__(self):
        # Stamp results with a UTC ISO-8601 timestamp when none is supplied.
        if not self.timestamp:
            self.timestamp = datetime.now(timezone.utc).isoformat()


class ParallelExecutor:
    """
    Executes multiple agents in parallel using asyncio.gather.

    Features:
    - Concurrent agent execution
    - Error handling per agent
    - Result aggregation
    - Execution timing
    """

    def __init__(self, max_concurrency: int = 10):
        """
        Initialize the parallel executor.

        Args:
            max_concurrency: Maximum number of concurrent agent executions
        """
        self.max_concurrency = max_concurrency
        # The semaphore caps how many agent coroutines run at once.
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def execute_single(self, task: AgentTask) -> AgentResult:
        """
        Execute a single agent task with semaphore control.

        Args:
            task: The agent task to execute

        Returns:
            AgentResult with execution details
        """
        async with self.semaphore:
            start_time = asyncio.get_running_loop().time()
            try:
                output = await task.agent.run(task.input_data)
                execution_time = (asyncio.get_running_loop().time() - start_time) * 1000
                return AgentResult(
                    task_id=task.task_id,
                    agent_role=task.agent.role,
                    input_data=task.input_data,
                    output_data=output,
                    success=True,
                    execution_time_ms=execution_time,
                )
            except Exception as e:
                # A failing agent is reported as an unsuccessful result
                # rather than crashing the whole batch.
                execution_time = (asyncio.get_running_loop().time() - start_time) * 1000
                logger.error(f"Agent {task.agent.role} failed: {e}")
                return AgentResult(
                    task_id=task.task_id,
                    agent_role=task.agent.role,
                    input_data=task.input_data,
                    output_data={},
                    success=False,
                    error=str(e),
                    execution_time_ms=execution_time,
                )

    async def execute_parallel(
        self,
        tasks: list[AgentTask],
        return_exceptions: bool = False,
    ) -> list[AgentResult]:
        """
        Execute multiple agent tasks in parallel.

        Args:
            tasks: List of AgentTask objects to execute
            return_exceptions: If True, exceptions are returned as results

        Returns:
            List of AgentResult objects
        """
        logger.info(f"Executing {len(tasks)} tasks in parallel (max concurrency: {self.max_concurrency})")
        # Create coroutines for all tasks
        coroutines = [self.execute_single(task) for task in tasks]
        # Execute all in parallel
        results = await asyncio.gather(*coroutines, return_exceptions=return_exceptions)
        # Convert any raw exceptions into failed AgentResults. These only
        # appear when return_exceptions=True, since execute_single already
        # catches agent errors itself.
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append(AgentResult(
                    task_id=tasks[i].task_id,
                    agent_role=tasks[i].agent.role,
                    input_data=tasks[i].input_data,
                    output_data={},
                    success=False,
                    error=str(result),
                ))
            else:
                processed_results.append(result)
        successful = sum(1 for r in processed_results if r.success)
        logger.info(f"Parallel execution complete: {successful}/{len(tasks)} successful")
        return processed_results

    async def execute_with_dependencies(
        self,
        task_groups: list[list[AgentTask]],
    ) -> list[list[AgentResult]]:
        """
        Execute task groups sequentially, with tasks within each group
        running in parallel.

        Args:
            task_groups: List of task groups. Each group runs after the
                previous completes.

        Returns:
            List of result groups corresponding to task groups
        """
        all_results = []
        for i, group in enumerate(task_groups):
            logger.info(f"Executing task group {i + 1}/{len(task_groups)}")
            group_results = await self.execute_parallel(group)
            all_results.append(group_results)
        return all_results


def merge_results(results: list[AgentResult]) -> dict[str, Any]:
    """
    Merge multiple agent results into a combined output.

    Args:
        results: List of AgentResult objects

    Returns:
        Merged result dictionary
    """
    merged = {
        "total_agents": len(results),
        "successful": sum(1 for r in results if r.success),
        "failed": sum(1 for r in results if not r.success),
        "total_execution_time_ms": sum(r.execution_time_ms for r in results),
        "results": [],
        "errors": [],
    }
    for result in results:
        if result.success:
            merged["results"].append({
                "agent": result.agent_role,
                "task_id": result.task_id,
                "output": result.output_data,
                "execution_time_ms": result.execution_time_ms,
            })
        else:
            merged["errors"].append({
                "agent": result.agent_role,
                "task_id": result.task_id,
                "error": result.error,
            })
    return merged
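

# ---------------------------------------------------------------------------
# Usage sketch. This demo is illustrative, not part of the executor itself.
# Assumption: ``_DemoAgent`` below is a hypothetical duck-typed stand-in for
# a BaseAgent subclass (BaseAgent's constructor is not shown in this module);
# the executor only relies on an awaitable ``run(input_data)`` and a ``role``
# attribute, so a plain class suffices for demonstration.
if __name__ == "__main__":
    class _DemoAgent:
        """Minimal stand-in exposing the interface the executor uses."""

        def __init__(self, role: str):
            self.role = role

        async def run(self, input_data: dict[str, Any]) -> dict[str, Any]:
            await asyncio.sleep(0.01)  # simulate I/O-bound agent work
            return {"echo": input_data}

    async def _demo() -> None:
        executor = ParallelExecutor(max_concurrency=4)

        # One flat batch: all tasks run concurrently, capped by the semaphore.
        batch = [
            AgentTask(agent=_DemoAgent(f"agent_{i}"), input_data={"n": i})
            for i in range(8)
        ]
        results = await executor.execute_parallel(batch)
        print(merge_results(results))

        # Two dependent stages: the second group starts only after the
        # first has fully completed.
        staged = await executor.execute_with_dependencies([batch[:4], batch[4:]])
        print([len(group) for group in staged])

    asyncio.run(_demo())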