# MedSearchPro / utils / parallel_executor.py
# Author: paulhemb
# Initial Backend Deployment
# Commit: 1367957
# utils/parallel_executor.py
"""
Parallel execution engine for agent components
Significantly speeds up multi-agent workflows
"""
import concurrent.futures
import asyncio
import time
from typing import List, Dict, Any, Callable, Optional
from dataclasses import dataclass
from enum import Enum
class ExecutionMode(Enum):
    """Strategies for running a batch of agent tasks (see ParallelExecutor)."""
    SEQUENTIAL = "sequential"  # one task at a time (baseline)
    THREADED = "threaded"      # thread pool; recommended for I/O-bound tasks
    PROCESS = "process"        # process pool; used for CPU-bound, picklable tasks
    ASYNC = "async"            # asyncio event loop; chosen when tasks are coroutines
@dataclass
class TaskResult:
    """Result container for parallel tasks"""
    task_id: str              # matches the task dict's 'id'
    success: bool             # True when the task function returned without raising
    result: Any               # return value on success, None on failure
    error: Optional[str]      # str(exception) on failure, None on success
    execution_time: float     # wall-clock seconds spent on this task
    agent_name: str           # human-readable agent label from the task dict
class ParallelExecutor:
    """
    Advanced parallel execution engine for agent workflows.

    A task is a dict with keys:
        'id'         -- unique identifier for the task (str)
        'function'   -- callable to execute
        'args'       -- positional arguments (list, optional)
        'kwargs'     -- keyword arguments (dict, optional)
        'agent_name' -- human-readable label used in log output (str)
    """

    def __init__(self, max_workers: int = 4, mode: ExecutionMode = ExecutionMode.THREADED):
        self.max_workers = max_workers
        self.mode = mode
        # Cumulative counters across every execute_parallel() call.
        self.execution_stats = {
            'total_tasks': 0,
            'successful_tasks': 0,
            'failed_tasks': 0,
            'total_execution_time': 0.0
        }

    def execute_parallel(self, tasks: List[Dict[str, Any]]) -> Dict[str, TaskResult]:
        """
        Execute multiple tasks using the configured execution mode.

        Returns a mapping of task id -> TaskResult. Task failures are
        captured as TaskResult(success=False), not raised.
        """
        if not tasks:
            return {}
        print(f"🚀 Executing {len(tasks)} tasks in {self.mode.value} mode")
        start_time = time.time()
        results = {}
        if self.mode == ExecutionMode.SEQUENTIAL:
            results = self._execute_sequential(tasks)
        elif self.mode == ExecutionMode.THREADED:
            results = self._execute_threaded(tasks)
        elif self.mode == ExecutionMode.PROCESS:
            results = self._execute_process(tasks)
        elif self.mode == ExecutionMode.ASYNC:
            results = asyncio.run(self._execute_async(tasks))
        total_time = time.time() - start_time
        self.execution_stats['total_tasks'] += len(tasks)
        self.execution_stats['successful_tasks'] += sum(1 for r in results.values() if r.success)
        self.execution_stats['failed_tasks'] += sum(1 for r in results.values() if not r.success)
        self.execution_stats['total_execution_time'] += total_time
        print(f"✅ Parallel execution completed in {total_time:.2f}s")
        return results

    def _run_task(self, task: Dict) -> TaskResult:
        """Run a single task synchronously and wrap the outcome in a TaskResult.

        Shared by the sequential and threaded modes (previously duplicated).
        """
        task_start = time.time()
        try:
            result = task['function'](*task.get('args', []), **task.get('kwargs', {}))
            return TaskResult(
                task_id=task['id'],
                success=True,
                result=result,
                error=None,
                execution_time=time.time() - task_start,
                agent_name=task['agent_name']
            )
        except Exception as e:
            return TaskResult(
                task_id=task['id'],
                success=False,
                result=None,
                error=str(e),
                execution_time=time.time() - task_start,
                agent_name=task['agent_name']
            )

    @staticmethod
    def _report(result: TaskResult) -> None:
        """Print a one-line status for a finished task."""
        if result.success:
            print(f" ✅ {result.agent_name} completed")
        else:
            print(f" ❌ {result.agent_name} failed: {result.error}")

    def _execute_sequential(self, tasks: List[Dict]) -> Dict[str, TaskResult]:
        """Execute tasks one at a time (baseline)."""
        results = {}
        for task in tasks:
            outcome = self._run_task(task)
            results[task['id']] = outcome
            self._report(outcome)
        return results

    def _execute_threaded(self, tasks: List[Dict]) -> Dict[str, TaskResult]:
        """Execute tasks concurrently in a thread pool (suits I/O-bound work)."""
        results = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_id = {
                executor.submit(self._run_task, task): task['id']
                for task in tasks
            }
            for future in concurrent.futures.as_completed(future_to_id):
                task_id = future_to_id[future]
                try:
                    results[task_id] = future.result()
                    self._report(results[task_id])
                except Exception as e:
                    # _run_task already catches task errors, so this only
                    # fires on executor-level failures (e.g. pool shutdown).
                    print(f" 💥 Task {task_id} execution failed: {e}")
        return results

    def _execute_process(self, tasks: List[Dict]) -> Dict[str, TaskResult]:
        """
        Execute tasks in a process pool (for CPU-bound tasks).

        Bug fix: the previous implementation submitted a locally-defined
        wrapper function to ProcessPoolExecutor; local functions cannot be
        pickled, so every task failed at submission time. The task's own
        function is now submitted directly. 'function', 'args' and 'kwargs'
        must therefore be picklable.
        """
        results = {}
        with concurrent.futures.ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_task = {}
            submitted_at = {}
            for task in tasks:
                submitted_at[task['id']] = time.time()
                future = executor.submit(
                    task['function'], *task.get('args', []), **task.get('kwargs', {})
                )
                future_to_task[future] = task
            for future in concurrent.futures.as_completed(future_to_task):
                task = future_to_task[future]
                # Timed from submission in the parent, so queueing time is included.
                elapsed = time.time() - submitted_at[task['id']]
                try:
                    value = future.result()
                    outcome = TaskResult(
                        task_id=task['id'],
                        success=True,
                        result=value,
                        error=None,
                        execution_time=elapsed,
                        agent_name=task['agent_name']
                    )
                except Exception as e:
                    # Worker-side exceptions surface here via future.result().
                    outcome = TaskResult(
                        task_id=task['id'],
                        success=False,
                        result=None,
                        error=str(e),
                        execution_time=elapsed,
                        agent_name=task['agent_name']
                    )
                results[task['id']] = outcome
                self._report(outcome)
        return results

    async def _execute_async(self, tasks: List[Dict]) -> Dict[str, TaskResult]:
        """Execute tasks on the asyncio event loop: coroutine functions run
        natively, sync functions run in the default thread pool."""
        results = {}

        async def execute_task(task):
            task_start = time.time()
            try:
                func = task['function']
                args = task.get('args', [])
                kwargs = task.get('kwargs', {})
                if asyncio.iscoroutinefunction(func):
                    result = await func(*args, **kwargs)
                else:
                    loop = asyncio.get_running_loop()
                    # Bug fix: run_in_executor only forwards positional args
                    # (passing **kwargs raised TypeError), so both args and
                    # kwargs are bound in a closure instead.
                    result = await loop.run_in_executor(
                        None, lambda: func(*args, **kwargs)
                    )
                return TaskResult(
                    task_id=task['id'],
                    success=True,
                    result=result,
                    error=None,
                    execution_time=time.time() - task_start,
                    agent_name=task['agent_name']
                )
            except Exception as e:
                return TaskResult(
                    task_id=task['id'],
                    success=False,
                    result=None,
                    error=str(e),
                    execution_time=time.time() - task_start,
                    agent_name=task['agent_name']
                )

        # Run all wrappers concurrently; return_exceptions keeps one
        # wrapper-level crash from cancelling the rest.
        task_results = await asyncio.gather(
            *(execute_task(task) for task in tasks), return_exceptions=True
        )
        for task, result in zip(tasks, task_results):
            if isinstance(result, Exception):
                print(f" 💥 Task {task['id']} failed: {result}")
                results[task['id']] = TaskResult(
                    task_id=task['id'],
                    success=False,
                    result=None,
                    error=str(result),
                    execution_time=0.0,
                    agent_name=task['agent_name']
                )
            else:
                results[result.task_id] = result
                self._report(result)
        return results

    def get_execution_stats(self) -> Dict[str, Any]:
        """Return a snapshot of cumulative stats; derived rates are only
        present once at least one task has run."""
        stats = self.execution_stats.copy()
        if stats['total_tasks'] > 0:
            stats['success_rate'] = (stats['successful_tasks'] / stats['total_tasks']) * 100
            stats['average_time_per_task'] = stats['total_execution_time'] / stats['total_tasks']
        return stats

    def recommend_execution_mode(self, tasks: List[Dict]) -> ExecutionMode:
        """Recommend an execution mode from task hints ('io_bound',
        'cpu_bound') and whether any task function is a coroutine."""
        if len(tasks) <= 1:
            return ExecutionMode.SEQUENTIAL
        # 'io_bound' defaults to True: agent tasks are LLM calls by default.
        has_io_bound = any(task.get('io_bound', True) for task in tasks)
        has_cpu_bound = any(task.get('cpu_bound', False) for task in tasks)
        has_async_func = any(asyncio.iscoroutinefunction(task['function']) for task in tasks)
        if has_async_func:
            return ExecutionMode.ASYNC
        elif has_cpu_bound and len(tasks) > 1:
            return ExecutionMode.PROCESS
        elif has_io_bound:
            return ExecutionMode.THREADED
        else:
            return ExecutionMode.SEQUENTIAL
# Enhanced RAG Engine with Parallel Execution
class ParallelRAGEngine:
    """RAG Engine with parallel execution capabilities.

    Wraps an existing RAG engine and fans its per-agent analysis steps out
    through a ParallelExecutor instead of running them back to back.
    """

    def __init__(self, rag_engine, parallel_executor: ParallelExecutor):
        self.rag_engine = rag_engine
        self.parallel_executor = parallel_executor

    def answer_research_question_parallel(self, query: str, domain: str, max_papers: int = 15) -> Dict[str, Any]:
        """Answer a research question, running the analysis agents in parallel."""
        print(f"🚀 Executing parallel RAG pipeline for: {query}")
        engine = self.rag_engine
        # Retrieval runs first: every downstream agent consumes its output.
        papers = engine._retrieve_relevant_papers(query, domain, max_papers)
        if not papers:
            return engine._create_no_results_response(query, domain)
        # Fan the query-type-specific agents out through the executor.
        query_type = engine._classify_query_type(query)
        task_specs = self._prepare_parallel_tasks(query, domain, query_type, papers)
        outcomes = self.parallel_executor.execute_parallel(task_specs)
        # Merge agent outputs and hand them to the usual synthesis step.
        analysis = self._synthesize_parallel_results(outcomes, query_type)
        answer = engine._synthesize_final_answer(
            query, domain, query_type, analysis, papers
        )
        answer['parallel_stats'] = self.parallel_executor.get_execution_stats()
        return answer

    def _prepare_parallel_tasks(self, query: str, domain: str, query_type: str, papers: List[Dict]) -> List[Dict]:
        """Build the executor task list for this query type."""

        def spec(task_id, func, arg_list, agent):
            # Every agent call is an LLM round-trip, hence io_bound=True.
            return {
                'id': task_id,
                'function': func,
                'args': arg_list,
                'kwargs': {},
                'agent_name': agent,
                'io_bound': True,
            }

        engine = self.rag_engine
        # The summarizer runs for every query type.
        task_list = [
            spec('summary', engine.summarizer.summarize_research,
                 [papers, query, domain], 'summarizer')
        ]
        if query_type == "comparison":
            targets = engine._extract_comparison_targets(query)
            # A comparison needs at least two extracted targets to be meaningful.
            if targets and len(targets) >= 2:
                task_list.append(spec('comparison', engine.comparator.compare_methods,
                                      [papers, targets[0], targets[1], domain], 'comparator'))
        elif query_type == "gaps":
            task_list.append(spec('gap_analysis', engine.gap_analyzer.analyze_gaps,
                                  [papers, domain], 'gap_analyzer'))
        elif query_type == "methodology":
            task_list.append(spec('methodology', engine.reasoning_engine.analyze_methodology,
                                  [papers, query, domain], 'reasoning_engine'))
        elif query_type == "clinical":
            task_list.append(spec('clinical', engine.reasoning_engine.analyze_clinical_implications,
                                  [papers, domain], 'reasoning_engine'))
        return task_list

    def _synthesize_parallel_results(self, task_results: Dict[str, TaskResult], query_type: str) -> Dict[str, Any]:
        """Merge per-task results into a single analysis dict."""
        merged = {
            "query_type": query_type,
            "papers_analyzed": 0,  # filled in from the summary result below
            "domain": ""           # filled in from the summary result below
        }
        for task_id, outcome in task_results.items():
            if not outcome.success:
                merged[f"{task_id}_error"] = outcome.error
                continue
            if task_id == 'summary':
                merged["summary"] = outcome.result
                merged["papers_analyzed"] = outcome.result.get('papers_analyzed', 0)
                merged["domain"] = outcome.result.get('domain', '')
            else:
                merged[task_id] = outcome.result
        return merged
# Quick test
def test_parallel_executor():
    """Smoke-test the executor: run the same mock agents sequentially and
    threaded, and report the measured speedup."""
    print("🧪 Testing Parallel Executor")
    print("=" * 50)

    def make_agent(n):
        # Each mock agent simulates one second of I/O-bound work.
        def agent():
            time.sleep(1)
            return f"Agent {n} result"
        return agent

    tasks = [
        {'id': f'agent{n}', 'function': make_agent(n), 'args': [], 'kwargs': {},
         'agent_name': f'Mock Agent {n}'}
        for n in (1, 2, 3)
    ]

    def timed_run(mode):
        runner = ParallelExecutor(mode=mode)
        begin = time.time()
        outcome = runner.execute_parallel(tasks)
        return outcome, time.time() - begin

    # Baseline vs. threaded comparison over the identical task list.
    sequential_results, sequential_time = timed_run(ExecutionMode.SEQUENTIAL)
    parallel_results, parallel_time = timed_run(ExecutionMode.THREADED)

    print(f"⏱️ Sequential time: {sequential_time:.2f}s")
    print(f"⏱️ Parallel time: {parallel_time:.2f}s")
    print(f"🚀 Speedup: {sequential_time / parallel_time:.2f}x")
# Run the sequential-vs-threaded speed comparison when executed directly.
if __name__ == "__main__":
    test_parallel_executor()