""" Parallel Agent Executor ======================== Provides parallel execution of multiple agents using asyncio. """ import asyncio from typing import Any, Callable, Awaitable from dataclasses import dataclass, field from datetime import datetime, timezone import logging from agents.base import BaseAgent from ledger.merkle import hash_leaf logging.basicConfig(level=logging.INFO) logger = logging.getLogger("ParallelExecutor") @dataclass class AgentTask: """Represents a task to be executed by an agent.""" agent: BaseAgent input_data: dict[str, Any] task_id: str = "" def __post_init__(self): if not self.task_id: self.task_id = f"task_{hash_leaf(str(self.input_data))[:8]}" @dataclass class AgentResult: """Represents the result of an agent execution.""" task_id: str agent_role: str input_data: dict[str, Any] output_data: dict[str, Any] success: bool error: str = "" execution_time_ms: float = 0 timestamp: str = "" def __post_init__(self): if not self.timestamp: self.timestamp = datetime.now(timezone.utc).isoformat() class ParallelExecutor: """ Executes multiple agents in parallel using asyncio.gather. Features: - Concurrent agent execution - Error handling per agent - Result aggregation - Execution timing """ def __init__(self, max_concurrency: int = 10): """ Initialize the parallel executor. Args: max_concurrency: Maximum number of concurrent agent executions """ self.max_concurrency = max_concurrency self.semaphore = asyncio.Semaphore(max_concurrency) async def execute_single(self, task: AgentTask) -> AgentResult: """ Execute a single agent task with semaphore control. Args: task: The agent task to execute Returns: AgentResult with execution details """ async with self.semaphore: start_time = asyncio.get_event_loop().time() try: output = await task.agent.run(task.input_data) execution_time = (asyncio.get_event_loop().time() - start_time) * 1000 return AgentResult( task_id=task.task_id, agent_role=task.agent.role, input_data=task.input_data, output_data=output, success=True, execution_time_ms=execution_time ) except Exception as e: execution_time = (asyncio.get_event_loop().time() - start_time) * 1000 logger.error(f"Agent {task.agent.role} failed: {e}") return AgentResult( task_id=task.task_id, agent_role=task.agent.role, input_data=task.input_data, output_data={}, success=False, error=str(e), execution_time_ms=execution_time ) async def execute_parallel( self, tasks: list[AgentTask], return_exceptions: bool = False ) -> list[AgentResult]: """ Execute multiple agent tasks in parallel. Args: tasks: List of AgentTask objects to execute return_exceptions: If True, exceptions are returned as results Returns: List of AgentResult objects """ logger.info(f"Executing {len(tasks)} tasks in parallel (max concurrency: {self.max_concurrency})") # Create coroutines for all tasks coroutines = [self.execute_single(task) for task in tasks] # Execute all in parallel results = await asyncio.gather(*coroutines, return_exceptions=return_exceptions) # Handle any unexpected exceptions processed_results = [] for i, result in enumerate(results): if isinstance(result, Exception): processed_results.append(AgentResult( task_id=tasks[i].task_id, agent_role=tasks[i].agent.role, input_data=tasks[i].input_data, output_data={}, success=False, error=str(result) )) else: processed_results.append(result) successful = sum(1 for r in processed_results if r.success) logger.info(f"Parallel execution complete: {successful}/{len(tasks)} successful") return processed_results async def execute_with_dependencies( self, task_groups: list[list[AgentTask]] ) -> list[list[AgentResult]]: """ Execute task groups sequentially, with tasks within each group running in parallel. Args: task_groups: List of task groups. Each group runs after the previous completes. Returns: List of result groups corresponding to task groups """ all_results = [] for i, group in enumerate(task_groups): logger.info(f"Executing task group {i + 1}/{len(task_groups)}") group_results = await self.execute_parallel(group) all_results.append(group_results) return all_results def merge_results(results: list[AgentResult]) -> dict[str, Any]: """ Merge multiple agent results into a combined output. Args: results: List of AgentResult objects Returns: Merged result dictionary """ merged = { "total_agents": len(results), "successful": sum(1 for r in results if r.success), "failed": sum(1 for r in results if not r.success), "total_execution_time_ms": sum(r.execution_time_ms for r in results), "results": [], "errors": [] } for result in results: if result.success: merged["results"].append({ "agent": result.agent_role, "task_id": result.task_id, "output": result.output_data, "execution_time_ms": result.execution_time_ms }) else: merged["errors"].append({ "agent": result.agent_role, "task_id": result.task_id, "error": result.error }) return merged