Spaces:
Sleeping
Sleeping
| """ | |
| Parallel Agent Executor | |
| ======================== | |
| Provides parallel execution of multiple agents using asyncio. | |
| """ | |
| import asyncio | |
| from typing import Any, Callable, Awaitable | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timezone | |
| import logging | |
| from agents.base import BaseAgent | |
| from ledger.merkle import hash_leaf | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger("ParallelExecutor") | |
| class AgentTask: | |
| """Represents a task to be executed by an agent.""" | |
| agent: BaseAgent | |
| input_data: dict[str, Any] | |
| task_id: str = "" | |
| def __post_init__(self): | |
| if not self.task_id: | |
| self.task_id = f"task_{hash_leaf(str(self.input_data))[:8]}" | |
| class AgentResult: | |
| """Represents the result of an agent execution.""" | |
| task_id: str | |
| agent_role: str | |
| input_data: dict[str, Any] | |
| output_data: dict[str, Any] | |
| success: bool | |
| error: str = "" | |
| execution_time_ms: float = 0 | |
| timestamp: str = "" | |
| def __post_init__(self): | |
| if not self.timestamp: | |
| self.timestamp = datetime.now(timezone.utc).isoformat() | |
| class ParallelExecutor: | |
| """ | |
| Executes multiple agents in parallel using asyncio.gather. | |
| Features: | |
| - Concurrent agent execution | |
| - Error handling per agent | |
| - Result aggregation | |
| - Execution timing | |
| """ | |
| def __init__(self, max_concurrency: int = 10): | |
| """ | |
| Initialize the parallel executor. | |
| Args: | |
| max_concurrency: Maximum number of concurrent agent executions | |
| """ | |
| self.max_concurrency = max_concurrency | |
| self.semaphore = asyncio.Semaphore(max_concurrency) | |
| async def execute_single(self, task: AgentTask) -> AgentResult: | |
| """ | |
| Execute a single agent task with semaphore control. | |
| Args: | |
| task: The agent task to execute | |
| Returns: | |
| AgentResult with execution details | |
| """ | |
| async with self.semaphore: | |
| start_time = asyncio.get_event_loop().time() | |
| try: | |
| output = await task.agent.run(task.input_data) | |
| execution_time = (asyncio.get_event_loop().time() - start_time) * 1000 | |
| return AgentResult( | |
| task_id=task.task_id, | |
| agent_role=task.agent.role, | |
| input_data=task.input_data, | |
| output_data=output, | |
| success=True, | |
| execution_time_ms=execution_time | |
| ) | |
| except Exception as e: | |
| execution_time = (asyncio.get_event_loop().time() - start_time) * 1000 | |
| logger.error(f"Agent {task.agent.role} failed: {e}") | |
| return AgentResult( | |
| task_id=task.task_id, | |
| agent_role=task.agent.role, | |
| input_data=task.input_data, | |
| output_data={}, | |
| success=False, | |
| error=str(e), | |
| execution_time_ms=execution_time | |
| ) | |
| async def execute_parallel( | |
| self, | |
| tasks: list[AgentTask], | |
| return_exceptions: bool = False | |
| ) -> list[AgentResult]: | |
| """ | |
| Execute multiple agent tasks in parallel. | |
| Args: | |
| tasks: List of AgentTask objects to execute | |
| return_exceptions: If True, exceptions are returned as results | |
| Returns: | |
| List of AgentResult objects | |
| """ | |
| logger.info(f"Executing {len(tasks)} tasks in parallel (max concurrency: {self.max_concurrency})") | |
| # Create coroutines for all tasks | |
| coroutines = [self.execute_single(task) for task in tasks] | |
| # Execute all in parallel | |
| results = await asyncio.gather(*coroutines, return_exceptions=return_exceptions) | |
| # Handle any unexpected exceptions | |
| processed_results = [] | |
| for i, result in enumerate(results): | |
| if isinstance(result, Exception): | |
| processed_results.append(AgentResult( | |
| task_id=tasks[i].task_id, | |
| agent_role=tasks[i].agent.role, | |
| input_data=tasks[i].input_data, | |
| output_data={}, | |
| success=False, | |
| error=str(result) | |
| )) | |
| else: | |
| processed_results.append(result) | |
| successful = sum(1 for r in processed_results if r.success) | |
| logger.info(f"Parallel execution complete: {successful}/{len(tasks)} successful") | |
| return processed_results | |
| async def execute_with_dependencies( | |
| self, | |
| task_groups: list[list[AgentTask]] | |
| ) -> list[list[AgentResult]]: | |
| """ | |
| Execute task groups sequentially, with tasks within each group running in parallel. | |
| Args: | |
| task_groups: List of task groups. Each group runs after the previous completes. | |
| Returns: | |
| List of result groups corresponding to task groups | |
| """ | |
| all_results = [] | |
| for i, group in enumerate(task_groups): | |
| logger.info(f"Executing task group {i + 1}/{len(task_groups)}") | |
| group_results = await self.execute_parallel(group) | |
| all_results.append(group_results) | |
| return all_results | |
| def merge_results(results: list[AgentResult]) -> dict[str, Any]: | |
| """ | |
| Merge multiple agent results into a combined output. | |
| Args: | |
| results: List of AgentResult objects | |
| Returns: | |
| Merged result dictionary | |
| """ | |
| merged = { | |
| "total_agents": len(results), | |
| "successful": sum(1 for r in results if r.success), | |
| "failed": sum(1 for r in results if not r.success), | |
| "total_execution_time_ms": sum(r.execution_time_ms for r in results), | |
| "results": [], | |
| "errors": [] | |
| } | |
| for result in results: | |
| if result.success: | |
| merged["results"].append({ | |
| "agent": result.agent_role, | |
| "task_id": result.task_id, | |
| "output": result.output_data, | |
| "execution_time_ms": result.execution_time_ms | |
| }) | |
| else: | |
| merged["errors"].append({ | |
| "agent": result.agent_role, | |
| "task_id": result.task_id, | |
| "error": result.error | |
| }) | |
| return merged | |