# AgentMask/src/agents/parallel.py
# commit b2230765034 (631d977) — stage4: ledger persistence, signing, parallel agents, UI improvements
"""
Parallel Agent Executor
========================
Provides parallel execution of multiple agents using asyncio.
"""
import asyncio
from typing import Any, Callable, Awaitable
from dataclasses import dataclass, field
from datetime import datetime, timezone
import logging
from agents.base import BaseAgent
from ledger.merkle import hash_leaf
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ParallelExecutor")
@dataclass
class AgentTask:
    """A unit of work: one agent paired with the input it should process."""

    agent: BaseAgent             # the agent that will execute this task
    input_data: dict[str, Any]   # payload handed to the agent's run()
    task_id: str = ""            # auto-derived from the input hash when blank

    def __post_init__(self):
        # Derive a short, stable identifier from the input when the caller
        # did not supply one explicitly.
        if self.task_id:
            return
        digest = hash_leaf(str(self.input_data))
        self.task_id = f"task_{digest[:8]}"
@dataclass
class AgentResult:
    """Outcome of running a single agent task, successful or not."""

    task_id: str                  # identifier of the originating task
    agent_role: str               # role name of the agent that ran
    input_data: dict[str, Any]    # input the agent received
    output_data: dict[str, Any]   # agent output ({} when the run failed)
    success: bool                 # True when the agent ran without raising
    error: str = ""               # error message when success is False
    execution_time_ms: float = 0  # wall-clock duration in milliseconds
    timestamp: str = ""           # ISO-8601 UTC time; auto-filled when blank

    def __post_init__(self):
        # Stamp the creation time (UTC, ISO format) unless the caller
        # provided an explicit timestamp.
        self.timestamp = self.timestamp or datetime.now(timezone.utc).isoformat()
class ParallelExecutor:
    """
    Executes multiple agents in parallel using asyncio.gather.

    Features:
    - Concurrent agent execution (bounded by a semaphore)
    - Error handling per agent
    - Result aggregation
    - Execution timing
    """

    def __init__(self, max_concurrency: int = 10):
        """
        Initialize the parallel executor.

        Args:
            max_concurrency: Maximum number of concurrent agent executions
        """
        self.max_concurrency = max_concurrency
        # Gates execute_single() so at most max_concurrency agents run at once.
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def execute_single(self, task: AgentTask) -> AgentResult:
        """
        Execute a single agent task with semaphore control.

        Exceptions raised by the agent are captured and reported through the
        returned AgentResult (success=False) rather than propagated.

        Args:
            task: The agent task to execute

        Returns:
            AgentResult with output (or error) and timing details
        """
        async with self.semaphore:
            # FIX: asyncio.get_event_loop() is deprecated inside a coroutine
            # since Python 3.10; get_running_loop() is the supported API and
            # is guaranteed to return the loop this coroutine runs on.
            loop = asyncio.get_running_loop()
            start_time = loop.time()
            try:
                output = await task.agent.run(task.input_data)
                execution_time = (loop.time() - start_time) * 1000
                return AgentResult(
                    task_id=task.task_id,
                    agent_role=task.agent.role,
                    input_data=task.input_data,
                    output_data=output,
                    success=True,
                    execution_time_ms=execution_time
                )
            except Exception as e:
                # Failure path still records how long the agent ran.
                execution_time = (loop.time() - start_time) * 1000
                logger.error("Agent %s failed: %s", task.agent.role, e)
                return AgentResult(
                    task_id=task.task_id,
                    agent_role=task.agent.role,
                    input_data=task.input_data,
                    output_data={},
                    success=False,
                    error=str(e),
                    execution_time_ms=execution_time
                )

    async def execute_parallel(
        self,
        tasks: list[AgentTask],
        return_exceptions: bool = False
    ) -> list[AgentResult]:
        """
        Execute multiple agent tasks in parallel.

        Args:
            tasks: List of AgentTask objects to execute
            return_exceptions: If True, exceptions that escape execute_single
                are converted into failed AgentResult entries instead of
                propagating out of gather()

        Returns:
            List of AgentResult objects, one per task, in task order
        """
        logger.info(
            "Executing %d tasks in parallel (max concurrency: %d)",
            len(tasks), self.max_concurrency
        )
        # Create coroutines for all tasks; the semaphore inside
        # execute_single enforces the concurrency bound.
        coroutines = [self.execute_single(task) for task in tasks]
        results = await asyncio.gather(*coroutines, return_exceptions=return_exceptions)
        # execute_single already catches agent errors, but gather() may still
        # surface unexpected exceptions (e.g. CancelledError-adjacent cases)
        # when return_exceptions=True — normalize those into failed results.
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append(AgentResult(
                    task_id=tasks[i].task_id,
                    agent_role=tasks[i].agent.role,
                    input_data=tasks[i].input_data,
                    output_data={},
                    success=False,
                    error=str(result)
                ))
            else:
                processed_results.append(result)
        successful = sum(1 for r in processed_results if r.success)
        logger.info("Parallel execution complete: %d/%d successful", successful, len(tasks))
        return processed_results

    async def execute_with_dependencies(
        self,
        task_groups: list[list[AgentTask]]
    ) -> list[list[AgentResult]]:
        """
        Execute task groups sequentially, with tasks within each group
        running in parallel.

        Args:
            task_groups: List of task groups. Each group runs only after the
                previous group has completed entirely.

        Returns:
            List of result groups corresponding to task groups
        """
        all_results = []
        for i, group in enumerate(task_groups):
            logger.info("Executing task group %d/%d", i + 1, len(task_groups))
            group_results = await self.execute_parallel(group)
            all_results.append(group_results)
        return all_results
def merge_results(results: list[AgentResult]) -> dict[str, Any]:
    """
    Merge multiple agent results into a combined output.

    Args:
        results: List of AgentResult objects

    Returns:
        Merged result dictionary with agent counts, total timing,
        successful outputs under "results" and failures under "errors"
    """
    succeeded = [r for r in results if r.success]
    failed = [r for r in results if not r.success]
    return {
        "total_agents": len(results),
        "successful": len(succeeded),
        "failed": len(failed),
        "total_execution_time_ms": sum(r.execution_time_ms for r in results),
        "results": [
            {
                "agent": r.agent_role,
                "task_id": r.task_id,
                "output": r.output_data,
                "execution_time_ms": r.execution_time_ms,
            }
            for r in succeeded
        ],
        "errors": [
            {
                "agent": r.agent_role,
                "task_id": r.task_id,
                "error": r.error,
            }
            for r in failed
        ],
    }