Spaces:
Build error
Build error
| """ | |
| Parallel Executor for concurrent task execution | |
| This module provides parallel execution capabilities for tools and agents, | |
| enabling efficient concurrent processing of multiple tasks. | |
| """ | |
import asyncio
import functools
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple

from src.unified_architecture.core import IUnifiedAgent, UnifiedTask, TaskResult
@dataclass
class ExecutionResult:
    """Result of a single parallel execution.

    The original class declared dataclass-style fields but was missing the
    @dataclass decorator, so keyword construction (used throughout this
    module) raised TypeError.
    """
    success: bool                             # True when the callable completed without raising
    result: Any                               # return value on success, None on failure
    execution_time: float                     # wall-clock duration in seconds
    error: Optional[str] = None               # error message when success is False
    metadata: Optional[Dict[str, Any]] = None # optional extra context; None by default
class ParallelExecutor:
    """
    Parallel execution engine for tools and agents.

    Runs many callables concurrently, supporting both async and sync
    operations (sync work is offloaded to a thread pool so the event loop
    is never blocked), with concurrency bounded by a semaphore, optional
    per-call timeouts, batching, and a simple map-reduce helper.
    """

    def __init__(self, max_workers: int = 4, max_concurrent: int = 10):
        """
        Args:
            max_workers: Size of the thread pool used for sync callables.
            max_concurrent: Default cap on concurrently running tasks.
        """
        self.max_workers = max_workers
        self.max_concurrent = max_concurrent
        # Bounds how many coroutines run their work section at once.
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # Executes synchronous callables off the event loop thread.
        self.thread_pool = ThreadPoolExecutor(max_workers=max_workers)
        self.active_tasks: Dict[str, asyncio.Task] = {}
        self.logger = logging.getLogger(__name__)

    async def _run_callable(self, func: Callable, *args, **kwargs) -> Any:
        """Await *func* if it is async; otherwise run it in the thread pool.

        run_in_executor forwards positional arguments only, so keyword
        arguments are bound with functools.partial first (the original code
        passed **kwargs straight to run_in_executor, which raises TypeError
        for every sync callable invoked with keyword arguments).
        """
        if asyncio.iscoroutinefunction(func):
            return await func(*args, **kwargs)
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.thread_pool, functools.partial(func, *args, **kwargs)
        )

    async def execute_tools_parallel(
        self,
        tools: List[Callable],
        inputs: List[Dict[str, Any]],
        timeout: Optional[float] = None
    ) -> List[Tuple[bool, Any]]:
        """
        Execute tools in parallel.

        Args:
            tools: List of tool functions (sync or async) to execute.
            inputs: List of keyword-argument dicts, one per tool.
            timeout: Optional per-tool timeout in seconds (defaults to no
                timeout, preserving the original behavior).

        Returns:
            List of (success, result) tuples in input order; on failure the
            result slot holds the error message string.

        Raises:
            ValueError: If tools and inputs differ in length.
        """
        if len(tools) != len(inputs):
            raise ValueError("Number of tools must match number of inputs")

        async def execute_single_tool(tool: Callable, input_data: Dict[str, Any]) -> Tuple[bool, Any]:
            async with self.semaphore:
                start_time = time.time()
                try:
                    call = self._run_callable(tool, **input_data)
                    if timeout is not None:
                        result = await asyncio.wait_for(call, timeout=timeout)
                    else:
                        result = await call
                    execution_time = time.time() - start_time
                    self.logger.debug(f"Tool executed successfully in {execution_time:.3f}s")
                    return True, result
                except Exception as e:
                    execution_time = time.time() - start_time
                    self.logger.error(f"Tool execution failed: {e}")
                    return False, str(e)

        coros = [execute_single_tool(tool, input_data) for tool, input_data in zip(tools, inputs)]
        results = await asyncio.gather(*coros, return_exceptions=True)

        # Per-tool failures are already mapped to (False, msg); this guards
        # against exceptions raised outside the try block above.
        processed_results: List[Tuple[bool, Any]] = []
        for result in results:
            if isinstance(result, Exception):
                processed_results.append((False, str(result)))
            else:
                processed_results.append(result)
        return processed_results

    async def execute_agents_parallel(
        self,
        agents: List[IUnifiedAgent],
        tasks: List[UnifiedTask],
        max_concurrent: Optional[int] = None
    ) -> List[Tuple[str, Dict[str, Any]]]:
        """
        Execute agents in parallel.

        Args:
            agents: List of agents to execute.
            tasks: List of tasks, one per agent.
            max_concurrent: Maximum concurrent executions (overrides default).

        Returns:
            List of (agent_id, result_dict) tuples; failed executions yield
            a dict containing an "error" key.

        Raises:
            ValueError: If agents and tasks differ in length.
        """
        if len(agents) != len(tasks):
            raise ValueError("Number of agents must match number of tasks")
        # Local semaphore so a per-call override does not affect other calls.
        semaphore = asyncio.Semaphore(max_concurrent or self.max_concurrent)

        async def execute_single_agent(agent: IUnifiedAgent, task: UnifiedTask) -> Tuple[str, Dict[str, Any]]:
            async with semaphore:
                start_time = time.time()
                try:
                    result = await agent.execute(task)
                    execution_time = time.time() - start_time
                    # Convert the result to a dict if it is an object (e.g. TaskResult).
                    if hasattr(result, '__dict__'):
                        result_dict = result.__dict__
                    else:
                        result_dict = {"result": result}
                    result_dict["execution_time"] = execution_time
                    result_dict["agent_id"] = agent.agent_id
                    self.logger.debug(f"Agent {agent.agent_id} executed task {task.task_id} in {execution_time:.3f}s")
                    return agent.agent_id, result_dict
                except Exception as e:
                    execution_time = time.time() - start_time
                    self.logger.error(f"Agent {agent.agent_id} failed to execute task {task.task_id}: {e}")
                    return agent.agent_id, {
                        "error": str(e),
                        "execution_time": execution_time,
                        "agent_id": agent.agent_id
                    }

        # Named "coros" rather than "tasks" to avoid shadowing the parameter.
        coros = [execute_single_agent(agent, task) for agent, task in zip(agents, tasks)]
        results = await asyncio.gather(*coros, return_exceptions=True)

        processed_results: List[Tuple[str, Dict[str, Any]]] = []
        for result in results:
            if isinstance(result, Exception):
                processed_results.append(("unknown", {"error": str(result)}))
            else:
                processed_results.append(result)
        return processed_results

    async def map_reduce(
        self,
        map_func: Callable,
        reduce_func: Callable,
        items: List[Any]
    ) -> Any:
        """
        Execute a map-reduce pattern.

        Args:
            map_func: Function (sync or async) applied to each item.
            reduce_func: Sync function combining the list of map results.
            items: Items to process.

        Returns:
            reduce_func applied to the list of successful map results.

        Raises:
            ValueError: If every map invocation failed.
        """
        async def map_item(item: Any) -> Any:
            async with self.semaphore:
                try:
                    return await self._run_callable(map_func, item)
                except Exception as e:
                    self.logger.error(f"Map function failed for item {item}: {e}")
                    raise

        map_results = await asyncio.gather(
            *(map_item(item) for item in items), return_exceptions=True
        )

        # Drop failed items; the reduce phase only sees successful results.
        valid_results = []
        for result in map_results:
            if isinstance(result, Exception):
                self.logger.warning(f"Map operation failed: {result}")
            else:
                valid_results.append(result)

        if not valid_results:
            raise ValueError("No valid results from map phase")
        return reduce_func(valid_results)

    async def execute_with_timeout(
        self,
        func: Callable,
        timeout: float,
        *args,
        **kwargs
    ) -> ExecutionResult:
        """
        Execute a function with a timeout.

        Args:
            func: Function (sync or async) to execute.
            timeout: Timeout in seconds.
            *args: Positional arguments for func.
            **kwargs: Keyword arguments for func (now forwarded correctly to
                sync callables via functools.partial; the original passed
                **kwargs to run_in_executor, which rejects them).

        Returns:
            ExecutionResult with success status, result/error and timing.
        """
        start_time = time.time()
        try:
            result = await asyncio.wait_for(
                self._run_callable(func, *args, **kwargs), timeout=timeout
            )
            return ExecutionResult(
                success=True,
                result=result,
                execution_time=time.time() - start_time
            )
        except asyncio.TimeoutError:
            return ExecutionResult(
                success=False,
                result=None,
                execution_time=time.time() - start_time,
                error=f"Execution timed out after {timeout}s"
            )
        except Exception as e:
            return ExecutionResult(
                success=False,
                result=None,
                execution_time=time.time() - start_time,
                error=str(e)
            )

    async def batch_execute(
        self,
        func: Callable,
        items: List[Any],
        batch_size: int = 10,
        timeout: Optional[float] = None
    ) -> List[ExecutionResult]:
        """
        Execute *func* on items in fixed-size concurrent batches.

        Args:
            func: Function (sync or async) taking a single item.
            items: Items to process.
            batch_size: Number of items executed concurrently per batch.
            timeout: Optional per-item timeout in seconds.

        Returns:
            One ExecutionResult per item, in input order.
        """
        results: List[ExecutionResult] = []
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            if timeout:
                coros = [self.execute_with_timeout(func, timeout, item) for item in batch]
            else:
                coros = [self.execute_single_item(func, item) for item in batch]
            batch_results = await asyncio.gather(*coros, return_exceptions=True)
            for result in batch_results:
                if isinstance(result, Exception):
                    # Failure outside the per-item handlers; timing unknown.
                    results.append(ExecutionResult(
                        success=False,
                        result=None,
                        execution_time=0.0,
                        error=str(result)
                    ))
                else:
                    results.append(result)
        return results

    async def execute_single_item(self, func: Callable, item: Any) -> ExecutionResult:
        """Execute func on a single item, capturing timing and errors."""
        start_time = time.time()
        try:
            result = await self._run_callable(func, item)
            return ExecutionResult(
                success=True,
                result=result,
                execution_time=time.time() - start_time
            )
        except Exception as e:
            return ExecutionResult(
                success=False,
                result=None,
                execution_time=time.time() - start_time,
                error=str(e)
            )

    def get_stats(self) -> Dict[str, Any]:
        """Get a snapshot of executor statistics.

        NOTE(review): reads private attributes (Semaphore._value,
        ThreadPoolExecutor._threads); these are CPython implementation
        details and may change between versions — confirm before relying
        on them.
        """
        return {
            "max_workers": self.max_workers,
            "max_concurrent": self.max_concurrent,
            "active_tasks": len(self.active_tasks),
            "semaphore_value": self.semaphore._value,
            "thread_pool_active": len(self.thread_pool._threads)
        }

    def shutdown(self, wait: bool = True):
        """Cancel any tracked tasks and shut down the thread pool.

        Args:
            wait: If True, block until pool threads finish.
        """
        for task in self.active_tasks.values():
            if not task.done():
                task.cancel()
        self.thread_pool.shutdown(wait=wait)
        self.logger.info("ParallelExecutor shutdown complete")

    async def __aenter__(self):
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: shut down the executor."""
        self.shutdown()

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: shut down the executor."""
        self.shutdown()
# Enhanced FSM Agent with parallel tool execution
class ParallelFSMReactAgent:
    """FSM React Agent with parallel tool execution capabilities."""

    def __init__(self, tools: List[Any], max_parallel_tools: int = 5):
        """
        Args:
            tools: Tool objects exposing .name and .func attributes.
            max_parallel_tools: Thread pool size for the internal executor.
        """
        self.tools = tools
        self.parallel_executor = ParallelExecutor(max_workers=max_parallel_tools)
        self.logger = logging.getLogger(__name__)

    async def execute_tools_parallel(
        self,
        tool_calls: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Execute multiple tool calls in parallel.

        Args:
            tool_calls: List of dicts with 'tool_name' and optional 'arguments'.

        Returns:
            One dict per resolved call with keys tool_name/success/result/error.
            Calls naming unknown tools are skipped with a warning.
        """
        tools = []
        inputs = []
        # Track resolved names so results stay aligned with their calls even
        # when unknown tools are skipped (the original indexed tool_calls[i],
        # which misattributed results after any skip).
        resolved_names = []
        for call in tool_calls:
            tool_name = call['tool_name']
            arguments = call.get('arguments', {})
            # Find tool by name
            tool = next((t for t in self.tools if t.name == tool_name), None)
            if not tool:
                self.logger.warning(f"Tool {tool_name} not found")
                continue
            tools.append(tool.func)
            inputs.append(arguments)
            resolved_names.append(tool_name)
        if not tools:
            return []
        # Execute in parallel. The executor's signature is (tools, inputs);
        # the original passed timeout=30.0, which raised TypeError.
        results = await self.parallel_executor.execute_tools_parallel(tools, inputs)
        # Format results, pairing each with its resolved tool name.
        formatted_results = []
        for name, (success, result) in zip(resolved_names, results):
            formatted_results.append({
                "tool_name": name,
                "success": success,
                "result": result if success else None,
                "error": result if not success else None
            })
        return formatted_results
# Example usage
async def example_parallel_execution():
    """Example of parallel tool execution"""
    # Build an executor with a generous worker count for the demo.
    runner = ParallelExecutor(max_workers=10)

    # Mock tools: each sleeps to simulate real latency.
    async def web_search(query: str) -> str:
        await asyncio.sleep(1)  # Simulate API call
        return f"Search results for: {query}"

    async def calculate(expression: str) -> float:
        await asyncio.sleep(0.5)  # Simulate calculation
        return eval(expression)  # Note: unsafe in production

    async def analyze_text(text: str) -> Dict[str, Any]:
        await asyncio.sleep(2)  # Simulate analysis
        return {"length": len(text), "words": len(text.split())}

    # Run all three tools concurrently and report each outcome.
    results = await runner.execute_tools_parallel(
        [web_search, calculate, analyze_text],
        [
            {"query": "parallel execution python"},
            {"expression": "2 + 2 * 3"},
            {"text": "This is a sample text for analysis"},
        ],
    )
    for success, result in results:
        print(f"Result: {result}" if success else f"Error: {result}")

    # Map-reduce example: sum of squares over 0..99.
    async def process_item(item: int) -> int:
        await asyncio.sleep(0.1)
        return item * item

    def sum_results(results: List[int]) -> int:
        return sum(results)

    final_result = await runner.map_reduce(process_item, sum_results, list(range(100)))
    print(f"Sum of squares: {final_result}")

    # Release the thread pool.
    runner.shutdown()