# utils/parallel_executor.py
"""
Parallel execution engine for agent components.

Significantly speeds up multi-agent workflows.
"""

import asyncio
import concurrent.futures
import functools
import inspect
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional


class ExecutionMode(Enum):
    """Strategies ParallelExecutor can use to run a batch of tasks."""

    SEQUENTIAL = "sequential"
    THREADED = "threaded"
    PROCESS = "process"
    ASYNC = "async"


@dataclass
class TaskResult:
    """Result container for parallel tasks."""

    task_id: str
    success: bool
    result: Any
    error: Optional[str]
    execution_time: float
    agent_name: str


def _agent_name(task: Dict[str, Any]) -> str:
    """Return the task's display name, falling back to its id when absent."""
    return task.get('agent_name', task['id'])


def _success_result(task: Dict[str, Any], value: Any,
                    execution_time: float) -> TaskResult:
    """Build the TaskResult for a task whose function returned ``value``."""
    return TaskResult(
        task_id=task['id'],
        success=True,
        result=value,
        error=None,
        execution_time=execution_time,
        agent_name=_agent_name(task),
    )


def _failure_result(task: Dict[str, Any], error: BaseException,
                    execution_time: float = 0.0) -> TaskResult:
    """Build the TaskResult for a task that raised ``error``."""
    return TaskResult(
        task_id=task['id'],
        success=False,
        result=None,
        error=str(error),
        execution_time=execution_time,
        agent_name=_agent_name(task),
    )


def _run_task(task: Dict[str, Any]) -> TaskResult:
    """Call the task's function and wrap the outcome in a TaskResult.

    This lives at module level (not as a closure) because
    ProcessPoolExecutor pickles the callable it is given, and locally
    defined functions cannot be pickled.
    """
    started = time.time()
    try:
        value = task['function'](*task.get('args', []),
                                 **task.get('kwargs', {}))
    except Exception as exc:
        return _failure_result(task, exc, time.time() - started)
    return _success_result(task, value, time.time() - started)


def _report(outcome: TaskResult) -> None:
    """Print the one-line completion/failure message for a finished task."""
    if outcome.success:
        print(f" ✅ {outcome.agent_name} completed")
    else:
        print(f" ❌ {outcome.agent_name} failed: {outcome.error}")


class ParallelExecutor:
    """
    Parallel execution engine for agent workflows.

    Runs a batch of task dicts sequentially, in a thread pool, in a process
    pool, or on an asyncio event loop, and keeps cumulative statistics.
    """

    def __init__(self, max_workers: int = 4,
                 mode: ExecutionMode = ExecutionMode.THREADED):
        """
        Args:
            max_workers: Pool size for THREADED and PROCESS modes.
            mode: Execution strategy used by ``execute_parallel``.
        """
        self.max_workers = max_workers
        self.mode = mode
        # Cumulative across every execute_parallel() call on this instance.
        self.execution_stats = {
            'total_tasks': 0,
            'successful_tasks': 0,
            'failed_tasks': 0,
            'total_execution_time': 0.0,
        }

    def execute_parallel(self, tasks: List[Dict[str, Any]]) -> Dict[str, TaskResult]:
        """
        Execute multiple tasks using the configured mode.

        Args:
            tasks: List of dicts with 'id', 'function', 'args', 'kwargs',
                'agent_name'. 'args' and 'kwargs' default to empty and
                'agent_name' defaults to the task id.

        Returns:
            Mapping of task id to its TaskResult. Every submitted task has
            an entry; a task that raised is reported with success=False.

        Note:
            ASYNC mode calls ``asyncio.run`` and therefore cannot be used
            from code that is already running inside an event loop.
        """
        if not tasks:
            return {}

        print(f"🚀 Executing {len(tasks)} tasks in {self.mode.value} mode")
        start_time = time.time()

        results: Dict[str, TaskResult] = {}
        if self.mode == ExecutionMode.SEQUENTIAL:
            results = self._execute_sequential(tasks)
        elif self.mode == ExecutionMode.THREADED:
            results = self._execute_threaded(tasks)
        elif self.mode == ExecutionMode.PROCESS:
            results = self._execute_process(tasks)
        elif self.mode == ExecutionMode.ASYNC:
            results = asyncio.run(self._execute_async(tasks))

        total_time = time.time() - start_time
        succeeded = sum(1 for r in results.values() if r.success)

        self.execution_stats['total_tasks'] += len(tasks)
        self.execution_stats['successful_tasks'] += succeeded
        self.execution_stats['failed_tasks'] += len(results) - succeeded
        self.execution_stats['total_execution_time'] += total_time

        print(f"✅ Parallel execution completed in {total_time:.2f}s")
        return results

    def _execute_sequential(self, tasks: List[Dict]) -> Dict[str, TaskResult]:
        """Execute tasks one after another in the calling thread (baseline)."""
        results = {}
        for task in tasks:
            outcome = _run_task(task)
            _report(outcome)
            results[task['id']] = outcome
        return results

    def _execute_with_pool(self, pool_factory: Callable[..., Any],
                           tasks: List[Dict]) -> Dict[str, TaskResult]:
        """Run tasks on a concurrent.futures pool and collect the results.

        Args:
            pool_factory: ThreadPoolExecutor or ProcessPoolExecutor class.
            tasks: Task dicts as accepted by ``execute_parallel``.
        """
        results = {}
        with pool_factory(max_workers=self.max_workers) as executor:
            future_to_task = {
                executor.submit(_run_task, task): task for task in tasks
            }
            for future in concurrent.futures.as_completed(future_to_task):
                task = future_to_task[future]
                try:
                    outcome = future.result()
                except Exception as exc:
                    # _run_task already converts task errors into results, so
                    # reaching here means the pool itself failed (e.g. the
                    # task could not be pickled or a worker died). Record the
                    # failure instead of silently dropping the task.
                    print(f" 💥 Task {task['id']} execution failed: {exc}")
                    outcome = _failure_result(task, exc)
                else:
                    _report(outcome)
                results[task['id']] = outcome
        return results

    def _execute_threaded(self, tasks: List[Dict]) -> Dict[str, TaskResult]:
        """Execute tasks using a thread pool."""
        return self._execute_with_pool(
            concurrent.futures.ThreadPoolExecutor, tasks
        )

    def _execute_process(self, tasks: List[Dict]) -> Dict[str, TaskResult]:
        """Execute tasks using a process pool (for CPU-bound tasks).

        Each task dict, including its 'function', arguments and return
        value, must be picklable. Tasks that are not are reported as failed.
        """
        return self._execute_with_pool(
            concurrent.futures.ProcessPoolExecutor, tasks
        )

    async def _execute_async(self, tasks: List[Dict]) -> Dict[str, TaskResult]:
        """Execute tasks concurrently on the running event loop.

        Coroutine functions are awaited directly; plain functions are run
        in the loop's default executor.
        """

        async def run_one(task: Dict[str, Any]) -> TaskResult:
            started = time.time()
            try:
                func = task['function']
                args = task.get('args', [])
                kwargs = task.get('kwargs', {})
                if inspect.iscoroutinefunction(func):
                    value = await func(*args, **kwargs)
                else:
                    # run_in_executor forwards positional arguments only, so
                    # bind everything (including kwargs) with partial first.
                    loop = asyncio.get_running_loop()
                    value = await loop.run_in_executor(
                        None, functools.partial(func, *args, **kwargs)
                    )
            except Exception as exc:
                return _failure_result(task, exc, time.time() - started)
            return _success_result(task, value, time.time() - started)

        outcomes = await asyncio.gather(
            *(run_one(task) for task in tasks), return_exceptions=True
        )

        results = {}
        for task, outcome in zip(tasks, outcomes):
            # BaseException (not Exception) so that a CancelledError returned
            # by gather() is reported rather than treated as a TaskResult.
            if isinstance(outcome, BaseException):
                print(f" 💥 Task {task['id']} failed: {outcome}")
                results[task['id']] = _failure_result(task, outcome)
            else:
                results[outcome.task_id] = outcome
                _report(outcome)
        return results

    def get_execution_stats(self) -> Dict[str, Any]:
        """Return a copy of the cumulative statistics.

        When at least one task has been executed, 'success_rate' (percent)
        and 'average_time_per_task' (wall-clock seconds per task) are added.
        """
        stats = self.execution_stats.copy()
        total = stats['total_tasks']
        if total > 0:
            stats['success_rate'] = (stats['successful_tasks'] / total) * 100
            stats['average_time_per_task'] = stats['total_execution_time'] / total
        return stats

    def recommend_execution_mode(self, tasks: List[Dict]) -> ExecutionMode:
        """Recommend an execution mode based on task characteristics.

        Precedence: any coroutine function -> ASYNC; any task flagged
        'cpu_bound' -> PROCESS; any task that is 'io_bound' (assumed True
        when the flag is absent) -> THREADED; otherwise SEQUENTIAL.
        Zero or one task is always SEQUENTIAL.
        """
        if len(tasks) <= 1:
            return ExecutionMode.SEQUENTIAL

        if any(inspect.iscoroutinefunction(task['function']) for task in tasks):
            return ExecutionMode.ASYNC
        if any(task.get('cpu_bound', False) for task in tasks):
            return ExecutionMode.PROCESS
        if any(task.get('io_bound', True) for task in tasks):
            return ExecutionMode.THREADED
        return ExecutionMode.SEQUENTIAL


# Enhanced RAG Engine with Parallel Execution
class ParallelRAGEngine:
    """RAG engine wrapper that runs its analysis agents through a ParallelExecutor."""

    def __init__(self, rag_engine, parallel_executor: ParallelExecutor):
        """
        Args:
            rag_engine: Underlying engine; this class calls its private
                helpers (``_retrieve_relevant_papers``, ``_classify_query_type``,
                ``_synthesize_final_answer``, ...) and its agent attributes
                (``summarizer``, ``comparator``, ``gap_analyzer``,
                ``reasoning_engine``).
            parallel_executor: Executor used to run the agent tasks.
        """
        self.rag_engine = rag_engine
        self.parallel_executor = parallel_executor

    def answer_research_question_parallel(self, query: str, domain: str,
                                          max_papers: int = 15) -> Dict[str, Any]:
        """Answer a research question, running the agent steps in parallel.

        Returns the engine's final answer with an extra 'parallel_stats'
        entry, or the engine's "no results" response when nothing is retrieved.
        """
        print(f"🚀 Executing parallel RAG pipeline for: {query}")

        # Step 1: Retrieve papers (sequential - every later step depends on it)
        papers = self.rag_engine._retrieve_relevant_papers(query, domain, max_papers)
        if not papers:
            return self.rag_engine._create_no_results_response(query, domain)

        # Step 2: Prepare parallel tasks
        query_type = self.rag_engine._classify_query_type(query)
        tasks = self._prepare_parallel_tasks(query, domain, query_type, papers)

        # Step 3: Execute tasks in parallel
        task_results = self.parallel_executor.execute_parallel(tasks)

        # Step 4: Synthesize results
        analysis_results = self._synthesize_parallel_results(task_results, query_type)
        final_answer = self.rag_engine._synthesize_final_answer(
            query, domain, query_type, analysis_results, papers
        )

        # Add parallel execution stats
        # NOTE(review): assumes the final answer supports item assignment
        # (e.g. a dict) - confirm against the engine implementation.
        final_answer['parallel_stats'] = self.parallel_executor.get_execution_stats()
        return final_answer

    def _prepare_parallel_tasks(self, query: str, domain: str, query_type: str,
                                papers: List[Dict]) -> List[Dict]:
        """Build the task dicts for one query.

        The summarizer task is always included; at most one additional task
        is added depending on ``query_type``.
        """
        # Always include summarizer
        tasks = [{
            'id': 'summary',
            'function': self.rag_engine.summarizer.summarize_research,
            'args': [papers, query, domain],
            'kwargs': {},
            'agent_name': 'summarizer',
            'io_bound': True,  # LLM calls are I/O bound
        }]

        # Add tasks based on query type
        if query_type == "comparison":
            targets = self.rag_engine._extract_comparison_targets(query)
            # A comparison needs two targets; otherwise only the summary runs.
            if targets and len(targets) >= 2:
                tasks.append({
                    'id': 'comparison',
                    'function': self.rag_engine.comparator.compare_methods,
                    'args': [papers, targets[0], targets[1], domain],
                    'kwargs': {},
                    'agent_name': 'comparator',
                    'io_bound': True,
                })
        elif query_type == "gaps":
            tasks.append({
                'id': 'gap_analysis',
                'function': self.rag_engine.gap_analyzer.analyze_gaps,
                'args': [papers, domain],
                'kwargs': {},
                'agent_name': 'gap_analyzer',
                'io_bound': True,
            })
        elif query_type == "methodology":
            tasks.append({
                'id': 'methodology',
                'function': self.rag_engine.reasoning_engine.analyze_methodology,
                'args': [papers, query, domain],
                'kwargs': {},
                'agent_name': 'reasoning_engine',
                'io_bound': True,
            })
        elif query_type == "clinical":
            tasks.append({
                'id': 'clinical',
                'function': self.rag_engine.reasoning_engine.analyze_clinical_implications,
                'args': [papers, domain],
                'kwargs': {},
                'agent_name': 'reasoning_engine',
                'io_bound': True,
            })

        return tasks

    def _synthesize_parallel_results(self, task_results: Dict[str, TaskResult],
                                     query_type: str) -> Dict[str, Any]:
        """Merge per-task results into a single analysis dict.

        Successful results are stored under their task id; failures are
        stored under '<task_id>_error'. 'papers_analyzed' and 'domain' are
        copied from the summary result when it exposes a ``get`` method.
        """
        analysis_results = {
            "query_type": query_type,
            "papers_analyzed": 0,  # Will be filled from summary result
            "domain": "",  # Will be filled from summary result
        }

        for task_id, result in task_results.items():
            if not result.success:
                analysis_results[f"{task_id}_error"] = result.error
                continue

            if task_id != 'summary':
                analysis_results[task_id] = result.result
                continue

            summary = result.result
            analysis_results["summary"] = summary
            # The summary is not guaranteed to be a mapping; keep the
            # defaults above rather than crashing on e.g. a plain string.
            getter = getattr(summary, 'get', None)
            if callable(getter):
                analysis_results["papers_analyzed"] = getter('papers_analyzed', 0)
                analysis_results["domain"] = getter('domain', '')

        return analysis_results


# Quick test
def test_parallel_executor():
    """Manual timing demo: run three 1-second mock agents sequentially and threaded."""
    print("🧪 Testing Parallel Executor")
    print("=" * 50)

    # Test functions
    def mock_agent_1():
        time.sleep(1)
        return "Agent 1 result"

    def mock_agent_2():
        time.sleep(1)
        return "Agent 2 result"

    def mock_agent_3():
        time.sleep(1)
        return "Agent 3 result"

    tasks = [
        {'id': 'agent1', 'function': mock_agent_1, 'args': [], 'kwargs': {}, 'agent_name': 'Mock Agent 1'},
        {'id': 'agent2', 'function': mock_agent_2, 'args': [], 'kwargs': {}, 'agent_name': 'Mock Agent 2'},
        {'id': 'agent3', 'function': mock_agent_3, 'args': [], 'kwargs': {}, 'agent_name': 'Mock Agent 3'},
    ]

    # Test sequential vs parallel
    executor = ParallelExecutor(mode=ExecutionMode.SEQUENTIAL)
    start_time = time.time()
    executor.execute_parallel(tasks)
    sequential_time = time.time() - start_time

    executor = ParallelExecutor(mode=ExecutionMode.THREADED)
    start_time = time.time()
    executor.execute_parallel(tasks)
    parallel_time = time.time() - start_time

    print(f"⏱️ Sequential time: {sequential_time:.2f}s")
    print(f"⏱️ Parallel time: {parallel_time:.2f}s")
    print(f"🚀 Speedup: {sequential_time / parallel_time:.2f}x")


# This is a multi-second manual demo, not a unit test. Opt out of pytest
# collection so it is not run when this module's names are imported into a
# test module.
test_parallel_executor.__test__ = False


if __name__ == "__main__":
    test_parallel_executor()