Spaces:
Running
Running
# utils/parallel_executor.py
"""
Parallel execution engine for agent components
Significantly speeds up multi-agent workflows
"""
import asyncio
import concurrent.futures
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
class ExecutionMode(Enum):
    """Strategies the executor can use to run a batch of tasks."""

    SEQUENTIAL = "sequential"  # one task at a time (baseline)
    THREADED = "threaded"      # thread pool: best for I/O-bound work
    PROCESS = "process"        # process pool: best for CPU-bound work
    ASYNC = "async"            # asyncio event loop


@dataclass
class TaskResult:
    """Result container for one parallel task.

    Exactly one of ``result`` / ``error`` is meaningful, depending on
    ``success``.  (The ``@dataclass`` decorator is required: the executor
    constructs TaskResult with keyword arguments everywhere.)
    """

    task_id: str            # the task dict's 'id' key
    success: bool           # True when the task function returned normally
    result: Any             # return value on success, else None
    error: Optional[str]    # str(exception) on failure, else None
    execution_time: float   # wall-clock seconds spent running the task
    agent_name: str         # human-readable agent label used in logging


class ParallelExecutor:
    """
    Advanced parallel execution engine for agent workflows.

    Dispatches a batch of task dicts ({'id', 'function', 'args', 'kwargs',
    'agent_name'}) with the strategy selected by ``mode`` and returns one
    TaskResult per task, keyed by task id.
    """

    def __init__(self, max_workers: int = 4, mode: ExecutionMode = ExecutionMode.THREADED):
        self.max_workers = max_workers
        self.mode = mode
        # Cumulative counters across all execute_parallel() calls.
        self.execution_stats = {
            'total_tasks': 0,
            'successful_tasks': 0,
            'failed_tasks': 0,
            'total_execution_time': 0.0
        }

    def execute_parallel(self, tasks: List[Dict[str, Any]]) -> Dict[str, TaskResult]:
        """
        Execute multiple tasks according to ``self.mode``.

        tasks: list of dicts with 'id', 'function', 'args', 'kwargs', 'agent_name'
        Returns: mapping of task id -> TaskResult.
        """
        if not tasks:
            return {}
        print(f"🚀 Executing {len(tasks)} tasks in {self.mode.value} mode")
        start_time = time.time()
        results: Dict[str, TaskResult] = {}
        if self.mode == ExecutionMode.SEQUENTIAL:
            results = self._execute_sequential(tasks)
        elif self.mode == ExecutionMode.THREADED:
            results = self._execute_threaded(tasks)
        elif self.mode == ExecutionMode.PROCESS:
            results = self._execute_process(tasks)
        elif self.mode == ExecutionMode.ASYNC:
            results = asyncio.run(self._execute_async(tasks))
        total_time = time.time() - start_time
        self.execution_stats['total_tasks'] += len(tasks)
        self.execution_stats['successful_tasks'] += sum(1 for r in results.values() if r.success)
        self.execution_stats['failed_tasks'] += sum(1 for r in results.values() if not r.success)
        self.execution_stats['total_execution_time'] += total_time
        print(f"✅ Parallel execution completed in {total_time:.2f}s")
        return results

    @staticmethod
    def _run_task(task: Dict[str, Any]) -> TaskResult:
        """Run one task dict and wrap the outcome in a TaskResult.

        Defined as a @staticmethod (not a function nested inside a method) so
        it is picklable and can be shipped to ProcessPoolExecutor workers;
        the original nested closure could not be pickled, so every process-
        mode submit failed.
        """
        task_start = time.time()
        try:
            result = task['function'](*task.get('args', []), **task.get('kwargs', {}))
            return TaskResult(
                task_id=task['id'],
                success=True,
                result=result,
                error=None,
                execution_time=time.time() - task_start,
                agent_name=task['agent_name']
            )
        except Exception as e:
            return TaskResult(
                task_id=task['id'],
                success=False,
                result=None,
                error=str(e),
                execution_time=time.time() - task_start,
                agent_name=task['agent_name']
            )

    @staticmethod
    def _report(result: TaskResult) -> None:
        """Print the per-task success/failure line shared by all modes."""
        if result.success:
            print(f" ✅ {result.agent_name} completed")
        else:
            print(f" ❌ {result.agent_name} failed: {result.error}")

    def _execute_sequential(self, tasks: List[Dict]) -> Dict[str, TaskResult]:
        """Execute tasks one at a time (baseline for speedup comparison)."""
        results = {}
        for task in tasks:
            outcome = self._run_task(task)
            results[task['id']] = outcome
            self._report(outcome)
        return results

    def _execute_threaded(self, tasks: List[Dict]) -> Dict[str, TaskResult]:
        """Execute tasks using a thread pool (suits I/O-bound agent calls)."""
        results = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_task = {
                executor.submit(self._run_task, task): task['id']
                for task in tasks
            }
            for future in concurrent.futures.as_completed(future_to_task):
                task_id = future_to_task[future]
                try:
                    results[task_id] = future.result()
                    self._report(results[task_id])
                except Exception as e:
                    # _run_task catches task errors itself; this guards the
                    # future machinery (e.g. a cancelled/broken executor).
                    print(f" 💥 Task {task_id} execution failed: {e}")
        return results

    def _execute_process(self, tasks: List[Dict]) -> Dict[str, TaskResult]:
        """Execute tasks using a process pool (for CPU-bound tasks).

        Note: the task dicts — including 'function' — must be picklable.
        The worker is the picklable staticmethod ``_run_task``.
        """
        results = {}
        with concurrent.futures.ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_task = {
                executor.submit(ParallelExecutor._run_task, task): task['id']
                for task in tasks
            }
            for future in concurrent.futures.as_completed(future_to_task):
                task_id = future_to_task[future]
                try:
                    results[task_id] = future.result()
                    self._report(results[task_id])
                except Exception as e:
                    print(f" 💥 Task {task_id} execution failed: {e}")
        return results

    async def _execute_async(self, tasks: List[Dict]) -> Dict[str, TaskResult]:
        """Execute tasks concurrently on the asyncio event loop."""

        async def execute_task(task):
            task_start = time.time()
            try:
                func = task['function']
                args = task.get('args', [])
                kwargs = task.get('kwargs', {})
                if asyncio.iscoroutinefunction(func):
                    result = await func(*args, **kwargs)
                else:
                    # run_in_executor() accepts no **kwargs, so bind the call
                    # in a zero-arg closure (the original passed kwargs
                    # straight through and raised TypeError).
                    loop = asyncio.get_running_loop()
                    result = await loop.run_in_executor(
                        None, lambda: func(*args, **kwargs)
                    )
                return TaskResult(
                    task_id=task['id'],
                    success=True,
                    result=result,
                    error=None,
                    execution_time=time.time() - task_start,
                    agent_name=task['agent_name']
                )
            except Exception as e:
                return TaskResult(
                    task_id=task['id'],
                    success=False,
                    result=None,
                    error=str(e),
                    execution_time=time.time() - task_start,
                    agent_name=task['agent_name']
                )

        # Launch every task at once and gather; exceptions are kept as
        # values so one crash cannot lose the other results.
        task_results = await asyncio.gather(
            *(execute_task(task) for task in tasks), return_exceptions=True
        )
        results = {}
        for task, outcome in zip(tasks, task_results):
            if isinstance(outcome, Exception):
                print(f" 💥 Task {task['id']} failed: {outcome}")
                results[task['id']] = TaskResult(
                    task_id=task['id'],
                    success=False,
                    result=None,
                    error=str(outcome),
                    execution_time=0.0,
                    agent_name=task['agent_name']
                )
            else:
                results[outcome.task_id] = outcome
                self._report(outcome)
        return results

    def get_execution_stats(self) -> Dict[str, Any]:
        """Return cumulative statistics plus derived rate/average fields."""
        stats = self.execution_stats.copy()
        if stats['total_tasks'] > 0:
            stats['success_rate'] = (stats['successful_tasks'] / stats['total_tasks']) * 100
            stats['average_time_per_task'] = stats['total_execution_time'] / stats['total_tasks']
        else:
            # Keep the derived keys present even before any tasks have run.
            stats['success_rate'] = 0.0
            stats['average_time_per_task'] = 0.0
        return stats

    def recommend_execution_mode(self, tasks: List[Dict]) -> ExecutionMode:
        """Recommend the optimal execution mode for the given tasks."""
        if len(tasks) <= 1:
            return ExecutionMode.SEQUENTIAL
        # Analyze task characteristics (tasks default to I/O-bound).
        has_io_bound = any(task.get('io_bound', True) for task in tasks)
        has_cpu_bound = any(task.get('cpu_bound', False) for task in tasks)
        has_async_func = any(asyncio.iscoroutinefunction(task['function']) for task in tasks)
        if has_async_func:
            return ExecutionMode.ASYNC
        if has_cpu_bound:
            return ExecutionMode.PROCESS
        if has_io_bound:
            return ExecutionMode.THREADED
        return ExecutionMode.SEQUENTIAL
# Enhanced RAG Engine with Parallel Execution
class ParallelRAGEngine:
    """RAG Engine with parallel execution capabilities."""

    def __init__(self, rag_engine, parallel_executor: ParallelExecutor):
        self.rag_engine = rag_engine
        self.parallel_executor = parallel_executor

    def answer_research_question_parallel(self, query: str, domain: str, max_papers: int = 15) -> Dict[str, Any]:
        """Answer research question with parallel agent execution."""
        print(f"🚀 Executing parallel RAG pipeline for: {query}")
        engine = self.rag_engine
        # Step 1: retrieval stays sequential — every downstream task needs it.
        papers = engine._retrieve_relevant_papers(query, domain, max_papers)
        if not papers:
            return engine._create_no_results_response(query, domain)
        # Steps 2-3: build the agent tasks and fan them out in parallel.
        query_type = engine._classify_query_type(query)
        task_results = self.parallel_executor.execute_parallel(
            self._prepare_parallel_tasks(query, domain, query_type, papers)
        )
        # Step 4: merge agent outputs and synthesize the final answer.
        analysis_results = self._synthesize_parallel_results(task_results, query_type)
        final_answer = engine._synthesize_final_answer(
            query, domain, query_type, analysis_results, papers
        )
        # Attach parallel execution stats for observability.
        final_answer['parallel_stats'] = self.parallel_executor.get_execution_stats()
        return final_answer

    def _prepare_parallel_tasks(self, query: str, domain: str, query_type: str, papers: List[Dict]) -> List[Dict]:
        """Prepare tasks for parallel execution."""
        engine = self.rag_engine

        def make_task(task_id, function, args, agent_name):
            # All agent calls go through the LLM, so they are I/O bound.
            return {
                'id': task_id,
                'function': function,
                'args': args,
                'kwargs': {},
                'agent_name': agent_name,
                'io_bound': True,
            }

        # The summarizer always runs.
        tasks = [make_task('summary', engine.summarizer.summarize_research,
                           [papers, query, domain], 'summarizer')]
        # Query-type-specific agent task.
        if query_type == "comparison":
            targets = engine._extract_comparison_targets(query)
            if targets and len(targets) >= 2:
                tasks.append(make_task('comparison', engine.comparator.compare_methods,
                                       [papers, targets[0], targets[1], domain], 'comparator'))
        elif query_type == "gaps":
            tasks.append(make_task('gap_analysis', engine.gap_analyzer.analyze_gaps,
                                   [papers, domain], 'gap_analyzer'))
        elif query_type == "methodology":
            tasks.append(make_task('methodology', engine.reasoning_engine.analyze_methodology,
                                   [papers, query, domain], 'reasoning_engine'))
        elif query_type == "clinical":
            tasks.append(make_task('clinical', engine.reasoning_engine.analyze_clinical_implications,
                                   [papers, domain], 'reasoning_engine'))
        return tasks

    def _synthesize_parallel_results(self, task_results: Dict[str, TaskResult], query_type: str) -> Dict[str, Any]:
        """Synthesize results from parallel execution."""
        analysis = {
            "query_type": query_type,
            "papers_analyzed": 0,  # filled from the summary result below
            "domain": ""           # filled from the summary result below
        }
        for task_id, outcome in task_results.items():
            if not outcome.success:
                analysis[f"{task_id}_error"] = outcome.error
                continue
            if task_id == 'summary':
                analysis["summary"] = outcome.result
                analysis["papers_analyzed"] = outcome.result.get('papers_analyzed', 0)
                analysis["domain"] = outcome.result.get('domain', '')
            else:
                analysis[task_id] = outcome.result
        return analysis
# Quick test
def test_parallel_executor():
    """Test parallel execution."""
    print("🧪 Testing Parallel Executor")
    print("=" * 50)

    def make_mock_agent(label):
        # Each mock sleeps 1s so the threaded run shows a real speedup.
        def mock_agent():
            time.sleep(1)
            return f"{label} result"
        return mock_agent

    tasks = [
        {'id': f'agent{n}', 'function': make_mock_agent(f'Agent {n}'),
         'args': [], 'kwargs': {}, 'agent_name': f'Mock Agent {n}'}
        for n in (1, 2, 3)
    ]

    def timed_run(mode):
        # Run the same batch under the given mode and return wall-clock time.
        runner = ParallelExecutor(mode=mode)
        started = time.time()
        runner.execute_parallel(tasks)
        return time.time() - started

    # Compare sequential baseline against threaded execution.
    sequential_time = timed_run(ExecutionMode.SEQUENTIAL)
    parallel_time = timed_run(ExecutionMode.THREADED)
    print(f"⏱️ Sequential time: {sequential_time:.2f}s")
    print(f"⏱️ Parallel time: {parallel_time:.2f}s")
    print(f"🚀 Speedup: {sequential_time / parallel_time:.2f}x")


if __name__ == "__main__":
    test_parallel_executor()