File size: 17,139 Bytes
8a682b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
"""
Parallel Executor for concurrent task execution

This module provides parallel execution capabilities for tools and agents,
enabling efficient concurrent processing of multiple tasks.
"""

import asyncio
import logging
from typing import Dict, Any, List, Optional, Tuple, Callable
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

from src.unified_architecture.core import IUnifiedAgent, UnifiedTask, TaskResult


@dataclass
class ExecutionResult:
    """Result of parallel execution"""
    success: bool
    result: Any
    execution_time: float
    error: Optional[str] = None
    metadata: Dict[str, Any] = None


class ParallelExecutor:
    """
    Parallel execution engine for tools and agents.
    
    This class provides efficient concurrent execution of multiple tasks,
    with support for both async and sync operations, resource management,
    and error handling.
    """
    
    def __init__(self, max_workers: int = 4, max_concurrent: int = 10):
        self.max_workers = max_workers
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.thread_pool = ThreadPoolExecutor(max_workers=max_workers)
        self.active_tasks: Dict[str, asyncio.Task] = {}
        self.logger = logging.getLogger(__name__)
        
    async def execute_tools_parallel(
        self, 
        tools: List[Callable], 
        inputs: List[Dict[str, Any]]
    ) -> List[Tuple[bool, Any]]:
        """
        Execute tools in parallel.
        
        Args:
            tools: List of tool functions to execute
            inputs: List of input dictionaries for each tool
            
        Returns:
            List of (success, result) tuples
        """
        if len(tools) != len(inputs):
            raise ValueError("Number of tools must match number of inputs")
        
        async def execute_single_tool(tool: Callable, input_data: Dict[str, Any]) -> Tuple[bool, Any]:
            async with self.semaphore:
                start_time = time.time()
                try:
                    if asyncio.iscoroutinefunction(tool):
                        result = await tool(**input_data)
                    else:
                        # Run sync function in thread pool
                        loop = asyncio.get_event_loop()
                        result = await loop.run_in_executor(self.thread_pool, tool, **input_data)
                    
                    execution_time = time.time() - start_time
                    self.logger.debug(f"Tool executed successfully in {execution_time:.3f}s")
                    return True, result
                    
                except Exception as e:
                    execution_time = time.time() - start_time
                    self.logger.error(f"Tool execution failed: {e}")
                    return False, str(e)
        
        # Create tasks for all tools
        tasks = [execute_single_tool(tool, input_data) for tool, input_data in zip(tools, inputs)]
        
        # Execute all tasks concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Process results
        processed_results = []
        for result in results:
            if isinstance(result, Exception):
                processed_results.append((False, str(result)))
            else:
                processed_results.append(result)
        
        return processed_results
    
    async def execute_agents_parallel(
        self, 
        agents: List[IUnifiedAgent], 
        tasks: List[UnifiedTask], 
        max_concurrent: Optional[int] = None
    ) -> List[Tuple[str, Dict[str, Any]]]:
        """
        Execute agents in parallel.
        
        Args:
            agents: List of agents to execute
            tasks: List of tasks to execute
            max_concurrent: Maximum concurrent executions (overrides default)
            
        Returns:
            List of (agent_id, result) tuples
        """
        if len(agents) != len(tasks):
            raise ValueError("Number of agents must match number of tasks")
        
        # Use provided max_concurrent or default
        semaphore = asyncio.Semaphore(max_concurrent or self.max_concurrent)
        
        async def execute_single_agent(agent: IUnifiedAgent, task: UnifiedTask) -> Tuple[str, Dict[str, Any]]:
            async with semaphore:
                start_time = time.time()
                try:
                    result = await agent.execute(task)
                    execution_time = time.time() - start_time
                    
                    # Convert result to dict if it's a TaskResult
                    if hasattr(result, '__dict__'):
                        result_dict = result.__dict__
                    else:
                        result_dict = {"result": result}
                    
                    result_dict["execution_time"] = execution_time
                    result_dict["agent_id"] = agent.agent_id
                    
                    self.logger.debug(f"Agent {agent.agent_id} executed task {task.task_id} in {execution_time:.3f}s")
                    return agent.agent_id, result_dict
                    
                except Exception as e:
                    execution_time = time.time() - start_time
                    self.logger.error(f"Agent {agent.agent_id} failed to execute task {task.task_id}: {e}")
                    return agent.agent_id, {
                        "error": str(e),
                        "execution_time": execution_time,
                        "agent_id": agent.agent_id
                    }
        
        # Create tasks for all agents
        tasks = [execute_single_agent(agent, task) for agent, task in zip(agents, tasks)]
        
        # Execute all tasks concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Process results
        processed_results = []
        for result in results:
            if isinstance(result, Exception):
                processed_results.append(("unknown", {"error": str(result)}))
            else:
                processed_results.append(result)
        
        return processed_results
    
    async def map_reduce(
        self, 
        map_func: Callable, 
        reduce_func: Callable, 
        items: List[Any]
    ) -> Any:
        """
        Execute map-reduce pattern.
        
        Args:
            map_func: Function to apply to each item
            reduce_func: Function to combine results
            items: List of items to process
            
        Returns:
            Reduced result
        """
        async def map_item(item: Any) -> Any:
            async with self.semaphore:
                try:
                    if asyncio.iscoroutinefunction(map_func):
                        return await map_func(item)
                    else:
                        # Run sync function in thread pool
                        loop = asyncio.get_event_loop()
                        return await loop.run_in_executor(self.thread_pool, map_func, item)
                except Exception as e:
                    self.logger.error(f"Map function failed for item {item}: {e}")
                    raise
        
        # Map phase - execute map function on all items
        map_tasks = [map_item(item) for item in items]
        map_results = await asyncio.gather(*map_tasks, return_exceptions=True)
        
        # Filter out exceptions
        valid_results = []
        for result in map_results:
            if isinstance(result, Exception):
                self.logger.warning(f"Map operation failed: {result}")
            else:
                valid_results.append(result)
        
        # Reduce phase - combine results
        if not valid_results:
            raise ValueError("No valid results from map phase")
        
        return reduce_func(valid_results)
    
    async def execute_with_timeout(
        self, 
        func: Callable, 
        timeout: float, 
        *args, 
        **kwargs
    ) -> ExecutionResult:
        """
        Execute a function with timeout.
        
        Args:
            func: Function to execute
            timeout: Timeout in seconds
            *args: Function arguments
            **kwargs: Function keyword arguments
            
        Returns:
            ExecutionResult with success status and result
        """
        start_time = time.time()
        
        try:
            if asyncio.iscoroutinefunction(func):
                result = await asyncio.wait_for(func(*args, **kwargs), timeout=timeout)
            else:
                # Run sync function in thread pool with timeout
                loop = asyncio.get_event_loop()
                result = await asyncio.wait_for(
                    loop.run_in_executor(self.thread_pool, func, *args, **kwargs),
                    timeout=timeout
                )
            
            execution_time = time.time() - start_time
            return ExecutionResult(
                success=True,
                result=result,
                execution_time=execution_time
            )
            
        except asyncio.TimeoutError:
            execution_time = time.time() - start_time
            return ExecutionResult(
                success=False,
                result=None,
                execution_time=execution_time,
                error=f"Execution timed out after {timeout}s"
            )
        except Exception as e:
            execution_time = time.time() - start_time
            return ExecutionResult(
                success=False,
                result=None,
                execution_time=execution_time,
                error=str(e)
            )
    
    async def batch_execute(
        self, 
        func: Callable, 
        items: List[Any], 
        batch_size: int = 10,
        timeout: Optional[float] = None
    ) -> List[ExecutionResult]:
        """
        Execute function on items in batches.
        
        Args:
            func: Function to execute
            items: List of items to process
            batch_size: Number of items to process concurrently
            timeout: Timeout per execution
            
        Returns:
            List of ExecutionResult objects
        """
        results = []
        
        # Process items in batches
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            
            # Create tasks for batch
            tasks = []
            for item in batch:
                if timeout:
                    task = self.execute_with_timeout(func, timeout, item)
                else:
                    task = self.execute_single_item(func, item)
                tasks.append(task)
            
            # Execute batch
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Process batch results
            for result in batch_results:
                if isinstance(result, Exception):
                    results.append(ExecutionResult(
                        success=False,
                        result=None,
                        execution_time=0.0,
                        error=str(result)
                    ))
                else:
                    results.append(result)
        
        return results
    
    async def execute_single_item(self, func: Callable, item: Any) -> ExecutionResult:
        """Execute function on a single item."""
        start_time = time.time()
        
        try:
            if asyncio.iscoroutinefunction(func):
                result = await func(item)
            else:
                loop = asyncio.get_event_loop()
                result = await loop.run_in_executor(self.thread_pool, func, item)
            
            execution_time = time.time() - start_time
            return ExecutionResult(
                success=True,
                result=result,
                execution_time=execution_time
            )
            
        except Exception as e:
            execution_time = time.time() - start_time
            return ExecutionResult(
                success=False,
                result=None,
                execution_time=execution_time,
                error=str(e)
            )
    
    def get_stats(self) -> Dict[str, Any]:
        """Get execution statistics."""
        return {
            "max_workers": self.max_workers,
            "max_concurrent": self.max_concurrent,
            "active_tasks": len(self.active_tasks),
            "semaphore_value": self.semaphore._value,
            "thread_pool_active": len(self.thread_pool._threads)
        }
    
    def shutdown(self, wait: bool = True):
        """Shutdown the executor."""
        # Cancel any remaining tasks
        for task in self.active_tasks.values():
            if not task.done():
                task.cancel()
        
        # Shutdown thread pool
        self.thread_pool.shutdown(wait=wait)
        
        self.logger.info("ParallelExecutor shutdown complete")
    
    async def __aenter__(self):
        """Async context manager entry."""
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        self.shutdown()
    
    def __enter__(self):
        """Context manager entry."""
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        self.shutdown()


# Enhanced FSM Agent with parallel tool execution
class ParallelFSMReactAgent:
    """FSM React Agent with parallel tool execution capabilities"""
    
    def __init__(self, tools: List[Any], max_parallel_tools: int = 5):
        self.tools = tools
        self.parallel_executor = ParallelExecutor(max_workers=max_parallel_tools)
        self.logger = logging.getLogger(__name__)
    
    async def execute_tools_parallel(
        self,
        tool_calls: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Execute multiple tool calls in parallel
        
        Args:
            tool_calls: List of dicts with 'tool_name' and 'arguments'
        
        Returns:
            List of results
        """
        
        # Group tools and inputs
        tools = []
        inputs = []
        
        for call in tool_calls:
            tool_name = call['tool_name']
            arguments = call.get('arguments', {})
            
            # Find tool by name
            tool = next((t for t in self.tools if t.name == tool_name), None)
            if not tool:
                self.logger.warning(f"Tool {tool_name} not found")
                continue
            
            tools.append(tool.func)
            inputs.append(arguments)
        
        if not tools:
            return []
        
        # Execute in parallel
        results = await self.parallel_executor.execute_tools_parallel(
            tools, inputs, timeout=30.0
        )
        
        # Format results
        formatted_results = []
        for i, (success, result) in enumerate(results):
            formatted_results.append({
                "tool_name": tool_calls[i]['tool_name'],
                "success": success,
                "result": result if success else None,
                "error": result if not success else None
            })
        
        return formatted_results


# Example usage
async def example_parallel_execution():
    """Example of parallel tool execution"""
    
    # Create parallel executor
    executor = ParallelExecutor(max_workers=10)
    
    # Define some mock tools
    async def web_search(query: str) -> str:
        await asyncio.sleep(1)  # Simulate API call
        return f"Search results for: {query}"
    
    async def calculate(expression: str) -> float:
        await asyncio.sleep(0.5)  # Simulate calculation
        return eval(expression)  # Note: unsafe in production
    
    async def analyze_text(text: str) -> Dict[str, Any]:
        await asyncio.sleep(2)  # Simulate analysis
        return {"length": len(text), "words": len(text.split())}
    
    # Execute tools in parallel
    tools = [web_search, calculate, analyze_text]
    inputs = [
        {"query": "parallel execution python"},
        {"expression": "2 + 2 * 3"},
        {"text": "This is a sample text for analysis"}
    ]
    
    results = await executor.execute_tools_parallel(tools, inputs)
    
    for (success, result) in results:
        if success:
            print(f"Result: {result}")
        else:
            print(f"Error: {result}")
    
    # Map-reduce example
    async def process_item(item: int) -> int:
        await asyncio.sleep(0.1)
        return item * item
    
    def sum_results(results: List[int]) -> int:
        return sum(results)
    
    items = list(range(100))
    final_result = await executor.map_reduce(
        process_item, sum_results, items
    )
    print(f"Sum of squares: {final_result}")
    
    # Cleanup
    executor.shutdown()