Spaces:
Paused
Paused
| """ | |
| Batch Processing Optimization for Felix Framework on ZeroGPU. | |
| This module provides intelligent batch processing capabilities specifically optimized | |
| for ZeroGPU deployment, maximizing GPU utilization while respecting memory constraints | |
| and maintaining the helix-based coordination architecture. | |
| Key Features: | |
| - Agent task batching with helix-aware grouping | |
| - Dynamic batch size adjustment based on GPU memory | |
| - Parallel processing with efficient GPU resource sharing | |
| - Priority-based scheduling for different agent types | |
| - Memory-aware batch composition and execution | |
| - Real-time performance optimization and monitoring | |
| """ | |
| import asyncio | |
| import logging | |
| import time | |
| import gc | |
| from typing import Dict, List, Optional, Any, Callable, Tuple, Union | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from collections import deque, defaultdict | |
| from contextlib import asynccontextmanager | |
| import heapq | |
| import threading | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| logger = logging.getLogger(__name__) | |
class BatchStrategy(Enum):
    """Batch processing strategies for different scenarios.

    ADAPTIVE defers the choice to runtime conditions (see
    ZeroGPUBatchOptimizer._select_batch_strategy); the other members force a
    fixed processing path.
    """
    MEMORY_OPTIMIZED = "memory_optimized"          # Prioritize memory efficiency
    THROUGHPUT_OPTIMIZED = "throughput_optimized"  # Maximize processing speed
    LATENCY_OPTIMIZED = "latency_optimized"        # Minimize response time
    ADAPTIVE = "adaptive"                          # Adapt based on current conditions
class AgentPriority(Enum):
    """Priority levels for agent batch processing.

    Higher numeric value means higher scheduling priority; HIGH and CRITICAL
    tasks are routed through the optimizer's priority heap ahead of
    NORMAL/LOW tasks (see ZeroGPUBatchOptimizer.submit_task).
    """
    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4
@dataclass
class BatchTask:
    """Individual task within a batch.

    Carries everything needed to schedule and execute one agent request:
    identity, the prompt payload, priority, and the resource estimates the
    batch assembler uses for memory/token budgeting.

    Note: the ``@dataclass`` decorator is required — without it the
    ``field(default_factory=...)`` defaults stay as ``Field`` sentinels and
    keyword construction (as done in ``submit_task``) raises ``TypeError``.
    """
    task_id: str                             # Unique task identifier
    agent_id: str                            # Agent that submitted the task
    agent_type: str                          # e.g. research, analysis, synthesis, critic
    prompt: str                              # Task prompt/input text
    priority: AgentPriority                  # Scheduling priority level
    estimated_tokens: int                    # Token estimate for resource planning
    max_tokens: Optional[int] = None         # Hard generation cap, if any
    temperature: float = 0.7                 # Sampling temperature
    model_preference: Optional[str] = None   # Preferred model id, if any
    timeout: float = 30.0                    # Per-task timeout in seconds
    callback: Optional[Callable] = None      # Optional completion callback
    metadata: Dict[str, Any] = field(default_factory=dict)
    created_at: float = field(default_factory=time.time)
@dataclass
class BatchRequest:
    """Batch of tasks for GPU processing.

    Produced by ``ZeroGPUBatchOptimizer._create_batch_request`` and consumed
    by the strategy-specific ``_process_*`` methods.

    Note: the ``@dataclass`` decorator is required — without it the
    ``field(default_factory=...)`` default stays a ``Field`` sentinel and
    keyword construction raises ``TypeError``.
    """
    batch_id: str                    # Unique batch identifier
    tasks: List[BatchTask]           # Tasks selected for this batch
    total_estimated_tokens: int      # Sum of per-task token estimates
    max_memory_mb: float             # Estimated peak GPU memory for the batch
    strategy: BatchStrategy          # Processing strategy chosen for the batch
    priority_score: float            # Average of task priority values
    created_at: float = field(default_factory=time.time)
    deadline: Optional[float] = None  # Optional absolute deadline (epoch secs)
@dataclass
class BatchResult:
    """Result from batch processing.

    Aggregates per-task results plus batch-level timing, memory, and error
    information.

    Note: the ``@dataclass`` decorator is required — without it the
    ``field(default_factory=...)`` defaults stay as ``Field`` sentinels and
    keyword construction (as done throughout ``_process_*``) raises
    ``TypeError``.
    """
    batch_id: str                    # Batch this result belongs to
    task_results: Dict[str, Any]     # task_id -> result
    processing_time: float           # Wall-clock seconds for the batch
    gpu_memory_used: float           # GPU memory (MB) observed during processing
    tokens_processed: int            # Total tokens processed across tasks
    success_rate: float              # successful_tasks / total_tasks, in [0, 1]
    errors: List[Dict[str, Any]] = field(default_factory=list)
    performance_metrics: Dict[str, Any] = field(default_factory=dict)
@dataclass
class GPUResourceState:
    """Current GPU resource utilization state.

    Note: the ``@dataclass`` decorator is required — the optimizer constructs
    this positionally (``GPUResourceState(0.0, 0.0, 0.0, 0.0, [])``), which
    only works with the generated ``__init__``.
    """
    memory_used_mb: float        # Reserved GPU memory in MB
    memory_total_mb: float       # Total GPU memory in MB
    utilization_percent: float   # GPU utilization, 0-100
    temperature: float           # GPU temperature; 0.0 when unavailable
    active_models: List[str]     # Identifiers of models currently loaded
    last_update: float = field(default_factory=time.time)
class ZeroGPUBatchOptimizer:
    """
    Intelligent batch processor for Felix Framework on ZeroGPU.

    Optimizes agent task processing through strategic batching, memory management,
    and parallel execution while maintaining helix-based coordination principles.
    """

    # Configuration constants
    DEFAULT_BATCH_SIZE = 4        # Conservative for ZeroGPU
    MAX_BATCH_SIZE = 8            # Maximum batch size for ZeroGPU
    MIN_BATCH_SIZE = 1            # Minimum viable batch
    MEMORY_SAFETY_MARGIN = 0.1    # 10% memory safety margin
    BATCH_TIMEOUT = 5.0           # Maximum wait time for batch assembly
    PRIORITY_BOOST_FACTOR = 1.5   # Priority multiplier for high-priority tasks

    def __init__(self,
                 gpu_monitor=None,
                 default_strategy: BatchStrategy = BatchStrategy.ADAPTIVE,
                 max_batch_size: int = MAX_BATCH_SIZE,
                 batch_timeout: float = BATCH_TIMEOUT,
                 memory_threshold: float = 0.8,
                 enable_dynamic_sizing: bool = True,
                 max_concurrent_batches: int = 2):
        """
        Initialize batch optimizer.

        Args:
            gpu_monitor: GPU monitoring instance for resource tracking
            default_strategy: Default batching strategy
            max_batch_size: Maximum tasks per batch
            batch_timeout: Maximum wait time for batch assembly
            memory_threshold: GPU memory threshold for batch sizing
            enable_dynamic_sizing: Enable dynamic batch size adjustment
            max_concurrent_batches: Maximum concurrent batch processing
        """
        self.gpu_monitor = gpu_monitor
        self.default_strategy = default_strategy
        self.max_batch_size = max_batch_size
        self.batch_timeout = batch_timeout
        self.memory_threshold = memory_threshold
        self.enable_dynamic_sizing = enable_dynamic_sizing
        self.max_concurrent_batches = max_concurrent_batches

        # Task and batch management
        self.pending_tasks: deque[BatchTask] = deque()
        # Priority heap of (-score, seq, task).  The monotonically increasing
        # sequence number breaks score ties so heapq never falls through to
        # comparing BatchTask objects (which are not orderable) — without it,
        # pushing two tasks with equal priority raises TypeError.
        self.priority_queue: List[Tuple[float, int, BatchTask]] = []
        self._task_sequence = 0
        self.active_batches: Dict[str, BatchRequest] = {}
        self.completed_batches: deque[BatchResult] = deque(maxlen=100)

        # Resource tracking
        self.gpu_state = GPUResourceState(0.0, 0.0, 0.0, 0.0, [])
        self.current_memory_usage = 0.0
        self.model_memory_estimates = defaultdict(lambda: 1000.0)  # MB per model

        # Performance metrics
        self.total_tasks_processed = 0
        self.total_batches_processed = 0
        self.average_batch_time = 0.0
        self.memory_efficiency_history = deque(maxlen=50)
        self.throughput_history = deque(maxlen=50)

        # Processing control
        self.is_processing = False
        self.processor_task: Optional[asyncio.Task] = None
        self.batch_semaphore = asyncio.Semaphore(max_concurrent_batches)
        # Strong references to fire-and-forget batch tasks so the event loop
        # cannot garbage-collect them mid-flight (asyncio only keeps weak refs).
        self._inflight_batches: set = set()

        # Thread pool for CPU-intensive operations
        self.thread_pool = ThreadPoolExecutor(max_workers=4)

        logger.info(f"ZeroGPU Batch Optimizer initialized - Strategy: {default_strategy.value}, "
                    f"Max Batch Size: {max_batch_size}")

    async def start_processing(self):
        """Start the batch processing engine."""
        if self.is_processing:
            return

        self.is_processing = True
        self.processor_task = asyncio.create_task(self._batch_processing_loop())
        logger.info("Batch processing started")

    async def stop_processing(self):
        """Stop the batch processing engine."""
        self.is_processing = False

        if self.processor_task:
            self.processor_task.cancel()
            try:
                await self.processor_task
            except asyncio.CancelledError:
                pass

        self.thread_pool.shutdown(wait=True)
        logger.info("Batch processing stopped")

    async def submit_task(self,
                          task_id: str,
                          agent_id: str,
                          agent_type: str,
                          prompt: str,
                          priority: AgentPriority = AgentPriority.NORMAL,
                          estimated_tokens: int = 100,
                          **kwargs) -> str:
        """
        Submit a task for batch processing.

        Args:
            task_id: Unique task identifier
            agent_id: Agent submitting the task
            agent_type: Type of agent (research, analysis, synthesis, critic)
            prompt: Task prompt/input
            priority: Task priority level
            estimated_tokens: Estimated token count for resource planning
            **kwargs: Additional task parameters

        Returns:
            Task ID for tracking
        """
        task = BatchTask(
            task_id=task_id,
            agent_id=agent_id,
            agent_type=agent_type,
            prompt=prompt,
            priority=priority,
            estimated_tokens=estimated_tokens,
            **kwargs
        )

        # Add to appropriate queue based on priority
        if priority in [AgentPriority.HIGH, AgentPriority.CRITICAL]:
            priority_score = priority.value * self.PRIORITY_BOOST_FACTOR
            # Negated score turns the min-heap into a max-heap; the sequence
            # number is a FIFO tie-breaker that also prevents heapq from
            # comparing BatchTask objects on equal scores.
            heapq.heappush(self.priority_queue,
                           (-priority_score, self._task_sequence, task))
            self._task_sequence += 1
        else:
            self.pending_tasks.append(task)

        logger.debug(f"Task {task_id} submitted for agent {agent_id} with priority {priority.value}")
        return task_id

    async def _batch_processing_loop(self):
        """Main batch processing loop."""
        while self.is_processing:
            try:
                # Update GPU resource state
                await self._update_gpu_state()

                # Assemble batch if conditions are met
                batch = await self._assemble_batch()
                if batch and len(batch.tasks) > 0:
                    # Process batch asynchronously; hold a strong reference so
                    # the task is not garbage-collected before completion.
                    runner = asyncio.create_task(self._process_batch(batch))
                    self._inflight_batches.add(runner)
                    runner.add_done_callback(self._inflight_batches.discard)

                # Small delay to prevent busy waiting
                await asyncio.sleep(0.1)

            except Exception as e:
                logger.error(f"Error in batch processing loop: {e}")
                await asyncio.sleep(1.0)

    async def _update_gpu_state(self):
        """Update current GPU resource state."""
        if self.gpu_monitor:
            try:
                status = self.gpu_monitor.get_resource_status()
                gpu_info = status.get("gpu", {})

                self.gpu_state = GPUResourceState(
                    memory_used_mb=gpu_info.get("memory_mb", {}).get("reserved", 0.0),
                    memory_total_mb=gpu_info.get("memory_mb", {}).get("total", 16000.0),  # Default assumption
                    utilization_percent=gpu_info.get("utilization_percent", 0.0),
                    temperature=0.0,  # Not typically available
                    active_models=status.get("active", {}).get("model_list", [])
                )
            except Exception as e:
                logger.warning(f"Failed to update GPU state: {e}")

    async def _assemble_batch(self) -> Optional[BatchRequest]:
        """Assemble tasks into an optimal batch."""
        if not self.pending_tasks and not self.priority_queue:
            return None

        # Start with high-priority tasks
        selected_tasks = []
        total_tokens = 0

        # Add priority tasks first
        while self.priority_queue and len(selected_tasks) < self.max_batch_size:
            entry = heapq.heappop(self.priority_queue)
            task = entry[2]
            if self._can_add_to_batch(task, selected_tasks, total_tokens):
                selected_tasks.append(task)
                total_tokens += task.estimated_tokens
            else:
                # Return the task to the heap instead of discarding it (a
                # pop-without-requeue here would silently lose the task).
                heapq.heappush(self.priority_queue, entry)
                break

        # Fill remaining slots with normal priority tasks
        while self.pending_tasks and len(selected_tasks) < self.max_batch_size:
            task = self.pending_tasks.popleft()
            if self._can_add_to_batch(task, selected_tasks, total_tokens):
                selected_tasks.append(task)
                total_tokens += task.estimated_tokens
            else:
                # Return task to queue if it doesn't fit
                self.pending_tasks.appendleft(task)
                break

        # Create batch if we have tasks
        if selected_tasks:
            return await self._create_batch_request(selected_tasks, total_tokens)

        return None

    def _can_add_to_batch(self, task: BatchTask, current_tasks: List[BatchTask], current_tokens: int) -> bool:
        """Check if task can be added to current batch."""
        # Check batch size limit
        if len(current_tasks) >= self.max_batch_size:
            return False

        # Check memory constraints
        estimated_memory = self._estimate_memory_usage(current_tasks + [task])
        available_memory = self.gpu_state.memory_total_mb * self.memory_threshold
        if estimated_memory > available_memory:
            return False

        # Check token budget
        total_tokens = current_tokens + task.estimated_tokens
        if total_tokens > 8000:  # Conservative token limit for batch
            return False

        # Check model compatibility (prefer same or compatible models)
        if current_tasks:
            current_types = set(t.agent_type for t in current_tasks)
            if len(current_types) > 2:  # Limit model diversity in batch
                return False

        return True

    def _estimate_memory_usage(self, tasks: List[BatchTask]) -> float:
        """Estimate GPU memory usage for a batch of tasks."""
        # Base memory overhead
        base_memory = 500.0  # MB

        # Model memory
        unique_models = set()
        for task in tasks:
            model_id = task.model_preference or f"default_{task.agent_type}"
            unique_models.add(model_id)

        model_memory = sum(self.model_memory_estimates[model] for model in unique_models)

        # Task processing memory (proportional to tokens)
        total_tokens = sum(task.estimated_tokens for task in tasks)
        task_memory = total_tokens * 0.1  # 0.1 MB per token estimate

        # Batch processing overhead
        batch_overhead = len(tasks) * 50.0  # 50 MB per task in batch

        return base_memory + model_memory + task_memory + batch_overhead

    async def _create_batch_request(self, tasks: List[BatchTask], total_tokens: int) -> BatchRequest:
        """Create a batch request from selected tasks."""
        batch_id = f"batch_{int(time.time() * 1000)}"

        # Calculate priority score (average of task priorities)
        avg_priority = sum(task.priority.value for task in tasks) / len(tasks)

        # Estimate memory requirements
        estimated_memory = self._estimate_memory_usage(tasks)

        # Determine strategy
        strategy = await self._select_batch_strategy(tasks, estimated_memory)

        return BatchRequest(
            batch_id=batch_id,
            tasks=tasks,
            total_estimated_tokens=total_tokens,
            max_memory_mb=estimated_memory,
            strategy=strategy,
            priority_score=avg_priority
        )

    async def _select_batch_strategy(self, tasks: List[BatchTask], estimated_memory: float) -> BatchStrategy:
        """Select optimal batch processing strategy."""
        if self.default_strategy != BatchStrategy.ADAPTIVE:
            return self.default_strategy

        # Adaptive strategy selection
        gpu_memory_ratio = estimated_memory / self.gpu_state.memory_total_mb
        gpu_utilization = self.gpu_state.utilization_percent / 100.0

        # High memory usage -> memory optimized
        if gpu_memory_ratio > 0.7:
            return BatchStrategy.MEMORY_OPTIMIZED

        # High priority tasks -> latency optimized
        if any(task.priority == AgentPriority.CRITICAL for task in tasks):
            return BatchStrategy.LATENCY_OPTIMIZED

        # High GPU utilization -> throughput optimized
        if gpu_utilization > 0.6:
            return BatchStrategy.THROUGHPUT_OPTIMIZED

        # Default to memory optimized for ZeroGPU
        return BatchStrategy.MEMORY_OPTIMIZED

    async def _process_batch(self, batch: BatchRequest):
        """Process a batch of tasks."""
        async with self.batch_semaphore:
            start_time = time.time()
            batch_id = batch.batch_id
            # Pre-bind so the finally block is safe even if a BaseException
            # (e.g. CancelledError) escapes before a result is produced.
            result: Optional[BatchResult] = None

            logger.info(f"Processing batch {batch_id} with {len(batch.tasks)} tasks "
                        f"(strategy: {batch.strategy.value})")

            try:
                # Add to active batches
                self.active_batches[batch_id] = batch

                # Apply pre-processing optimizations
                await self._optimize_batch_for_strategy(batch)

                # Process tasks based on strategy
                if batch.strategy == BatchStrategy.MEMORY_OPTIMIZED:
                    result = await self._process_memory_optimized(batch)
                elif batch.strategy == BatchStrategy.THROUGHPUT_OPTIMIZED:
                    result = await self._process_throughput_optimized(batch)
                elif batch.strategy == BatchStrategy.LATENCY_OPTIMIZED:
                    result = await self._process_latency_optimized(batch)
                else:
                    result = await self._process_default(batch)

                # Record performance metrics
                processing_time = time.time() - start_time
                await self._record_batch_performance(batch, result, processing_time)

                logger.info(f"Batch {batch_id} completed in {processing_time:.2f}s "
                            f"(success rate: {result.success_rate:.1%})")

            except Exception as e:
                logger.error(f"Batch {batch_id} processing failed: {e}")
                # Create error result
                result = BatchResult(
                    batch_id=batch_id,
                    task_results={},
                    processing_time=time.time() - start_time,
                    gpu_memory_used=0.0,
                    tokens_processed=0,
                    success_rate=0.0,
                    errors=[{"error": str(e), "timestamp": time.time()}]
                )
            finally:
                # Clean up
                self.active_batches.pop(batch_id, None)
                if result is not None:
                    self.completed_batches.append(result)

                # Trigger cleanup if memory is high
                if self.gpu_state.memory_used_mb > self.gpu_state.memory_total_mb * 0.8:
                    await self._cleanup_gpu_memory()

    async def _optimize_batch_for_strategy(self, batch: BatchRequest):
        """Apply strategy-specific optimizations."""
        if batch.strategy == BatchStrategy.MEMORY_OPTIMIZED:
            # Sort tasks by estimated memory usage (smallest first)
            batch.tasks.sort(key=lambda t: t.estimated_tokens)

        elif batch.strategy == BatchStrategy.LATENCY_OPTIMIZED:
            # Sort by priority (highest first)
            batch.tasks.sort(key=lambda t: t.priority.value, reverse=True)

        elif batch.strategy == BatchStrategy.THROUGHPUT_OPTIMIZED:
            # Group by agent type for model efficiency
            batch.tasks.sort(key=lambda t: t.agent_type)

    async def _process_memory_optimized(self, batch: BatchRequest) -> BatchResult:
        """Process batch with memory optimization priority."""
        results = {}
        total_tokens = 0
        successful_tasks = 0
        errors = []

        # Process tasks sequentially to minimize memory usage
        for task in batch.tasks:
            try:
                # Check memory before processing
                if self.gpu_state.memory_used_mb > self.gpu_state.memory_total_mb * 0.9:
                    await self._cleanup_gpu_memory()

                # Process single task
                result = await self._process_single_task(task)
                results[task.task_id] = result
                total_tokens += task.estimated_tokens
                successful_tasks += 1

                # Clear intermediate results to save memory
                if hasattr(result, 'intermediate_data'):
                    delattr(result, 'intermediate_data')

            except Exception as e:
                logger.error(f"Task {task.task_id} failed: {e}")
                errors.append({
                    "task_id": task.task_id,
                    "error": str(e),
                    "timestamp": time.time()
                })

        return BatchResult(
            batch_id=batch.batch_id,
            task_results=results,
            processing_time=0.0,  # Will be set by caller
            gpu_memory_used=self.gpu_state.memory_used_mb,
            tokens_processed=total_tokens,
            success_rate=successful_tasks / len(batch.tasks),
            errors=errors
        )

    async def _process_throughput_optimized(self, batch: BatchRequest) -> BatchResult:
        """Process batch with throughput optimization priority."""
        results = {}
        total_tokens = 0
        successful_tasks = 0
        errors = []

        # Group tasks by agent type for parallel processing
        task_groups = defaultdict(list)
        for task in batch.tasks:
            task_groups[task.agent_type].append(task)

        # Process groups concurrently
        group_tasks = []
        for agent_type, tasks in task_groups.items():
            group_task = asyncio.create_task(
                self._process_task_group(tasks, f"group_{agent_type}")
            )
            group_tasks.append(group_task)

        # Await all groups
        group_results = await asyncio.gather(*group_tasks, return_exceptions=True)

        # Aggregate results
        for group_result in group_results:
            if isinstance(group_result, Exception):
                errors.append({
                    "error": str(group_result),
                    "timestamp": time.time()
                })
            else:
                results.update(group_result.get("results", {}))
                total_tokens += group_result.get("tokens", 0)
                successful_tasks += group_result.get("successful", 0)

        return BatchResult(
            batch_id=batch.batch_id,
            task_results=results,
            processing_time=0.0,
            gpu_memory_used=self.gpu_state.memory_used_mb,
            tokens_processed=total_tokens,
            success_rate=successful_tasks / len(batch.tasks),
            errors=errors
        )

    async def _process_latency_optimized(self, batch: BatchRequest) -> BatchResult:
        """Process batch with latency optimization priority."""
        # Process highest priority tasks first, with immediate execution
        results = {}
        total_tokens = 0
        successful_tasks = 0
        errors = []

        # Sort by priority
        priority_tasks = sorted(batch.tasks, key=lambda t: t.priority.value, reverse=True)

        # Process with adaptive concurrency based on priority
        for i, task in enumerate(priority_tasks):
            try:
                # Higher priority tasks get immediate processing
                if task.priority in [AgentPriority.CRITICAL, AgentPriority.HIGH]:
                    result = await self._process_single_task(task)
                else:
                    # Lower priority tasks can be batched.  NOTE(review): only
                    # the first 3 of the remaining tasks are grouped before the
                    # loop breaks; tasks beyond that are intentionally deferred.
                    remaining_tasks = priority_tasks[i:]
                    if len(remaining_tasks) > 1:
                        group_result = await self._process_task_group(remaining_tasks[:3], "low_priority_group")
                        results.update(group_result.get("results", {}))
                        total_tokens += group_result.get("tokens", 0)
                        successful_tasks += group_result.get("successful", 0)
                        break
                    else:
                        result = await self._process_single_task(task)

                results[task.task_id] = result
                total_tokens += task.estimated_tokens
                successful_tasks += 1

            except Exception as e:
                logger.error(f"Task {task.task_id} failed: {e}")
                errors.append({
                    "task_id": task.task_id,
                    "error": str(e),
                    "timestamp": time.time()
                })

        return BatchResult(
            batch_id=batch.batch_id,
            task_results=results,
            processing_time=0.0,
            gpu_memory_used=self.gpu_state.memory_used_mb,
            tokens_processed=total_tokens,
            success_rate=successful_tasks / len(batch.tasks),
            errors=errors
        )

    async def _process_default(self, batch: BatchRequest) -> BatchResult:
        """Default batch processing strategy."""
        return await self._process_memory_optimized(batch)

    async def _process_single_task(self, task: BatchTask) -> Dict[str, Any]:
        """Process a single task."""
        # This would integrate with the actual LLM client
        # For now, return a mock result
        await asyncio.sleep(0.1)  # Simulate processing time

        return {
            "task_id": task.task_id,
            "content": f"Processed result for {task.agent_type} agent",
            "tokens_used": task.estimated_tokens,
            "success": True,
            "timestamp": time.time()
        }

    async def _process_task_group(self, tasks: List[BatchTask], group_name: str) -> Dict[str, Any]:
        """Process a group of tasks concurrently."""
        results = {}
        total_tokens = 0
        successful_tasks = 0

        # Process tasks concurrently within the group
        task_futures = [self._process_single_task(task) for task in tasks]
        task_results = await asyncio.gather(*task_futures, return_exceptions=True)

        for task, result in zip(tasks, task_results):
            if isinstance(result, Exception):
                logger.error(f"Task {task.task_id} in group {group_name} failed: {result}")
            else:
                results[task.task_id] = result
                total_tokens += task.estimated_tokens
                successful_tasks += 1

        return {
            "results": results,
            "tokens": total_tokens,
            "successful": successful_tasks
        }

    async def _cleanup_gpu_memory(self):
        """Clean up GPU memory."""
        if hasattr(self, 'gpu_monitor') and self.gpu_monitor:
            # Use monitor's cleanup if available
            logger.info("Triggering GPU memory cleanup")
            # Would call gpu_monitor._emergency_memory_cleanup()
        else:
            # Basic cleanup
            gc.collect()

    async def _record_batch_performance(self, batch: BatchRequest, result: BatchResult, processing_time: float):
        """Record batch performance metrics."""
        result.processing_time = processing_time

        # Update global statistics
        self.total_tasks_processed += len(batch.tasks)
        self.total_batches_processed += 1

        # Update average batch time (running mean)
        self.average_batch_time = (
            (self.average_batch_time * (self.total_batches_processed - 1) + processing_time)
            / self.total_batches_processed
        )

        # Memory efficiency
        memory_efficiency = result.tokens_processed / max(1.0, result.gpu_memory_used)
        self.memory_efficiency_history.append(memory_efficiency)

        # Throughput (guard against a zero-length interval)
        throughput = len(batch.tasks) / max(processing_time, 1e-6)
        self.throughput_history.append(throughput)

        # Log performance summary
        logger.info(f"Batch performance - Tasks: {len(batch.tasks)}, "
                    f"Time: {processing_time:.2f}s, "
                    f"Memory: {result.gpu_memory_used:.0f}MB, "
                    f"Throughput: {throughput:.1f} tasks/s")

    def get_performance_statistics(self) -> Dict[str, Any]:
        """Get comprehensive performance statistics."""
        return {
            "total_tasks_processed": self.total_tasks_processed,
            "total_batches_processed": self.total_batches_processed,
            "average_batch_time": self.average_batch_time,
            "current_queue_size": len(self.pending_tasks) + len(self.priority_queue),
            "active_batches": len(self.active_batches),
            "memory_efficiency": {
                "current": self.memory_efficiency_history[-1] if self.memory_efficiency_history else 0.0,
                "average": sum(self.memory_efficiency_history) / max(1, len(self.memory_efficiency_history)),
                "history_size": len(self.memory_efficiency_history)
            },
            "throughput": {
                "current": self.throughput_history[-1] if self.throughput_history else 0.0,
                "average": sum(self.throughput_history) / max(1, len(self.throughput_history)),
                "peak": max(self.throughput_history) if self.throughput_history else 0.0
            },
            "gpu_state": {
                "memory_used_mb": self.gpu_state.memory_used_mb,
                "memory_total_mb": self.gpu_state.memory_total_mb,
                "memory_utilization": self.gpu_state.memory_used_mb / max(1.0, self.gpu_state.memory_total_mb),
                "gpu_utilization": self.gpu_state.utilization_percent
            }
        }

    async def get_queue_status(self) -> Dict[str, Any]:
        """Get current queue status."""
        priority_tasks = len(self.priority_queue)
        normal_tasks = len(self.pending_tasks)

        return {
            "total_queued": priority_tasks + normal_tasks,
            "priority_queue": priority_tasks,
            "normal_queue": normal_tasks,
            "active_batches": len(self.active_batches),
            "processing": self.is_processing,
            "estimated_wait_time": self._estimate_wait_time()
        }

    def _estimate_wait_time(self) -> float:
        """Estimate wait time for new tasks."""
        if not self.throughput_history:
            return 10.0  # Default estimate

        avg_throughput = sum(self.throughput_history) / len(self.throughput_history)
        total_queued = len(self.pending_tasks) + len(self.priority_queue)

        return total_queued / max(0.1, avg_throughput)
| # Utility functions | |
def create_zerogpu_batch_optimizer(gpu_monitor=None,
                                   strategy: BatchStrategy = BatchStrategy.ADAPTIVE) -> ZeroGPUBatchOptimizer:
    """Create a ZeroGPU batch optimizer with optimal settings.

    Bundles the ZeroGPU-tuned configuration in one place so every deployment
    gets the same conservative defaults.
    """
    zerogpu_defaults = {
        "max_batch_size": 6,           # Conservative for ZeroGPU
        "batch_timeout": 3.0,          # Quick batching for responsiveness
        "memory_threshold": 0.75,      # Conservative memory usage
        "enable_dynamic_sizing": True,
        "max_concurrent_batches": 2,   # Limit concurrency for ZeroGPU
    }
    return ZeroGPUBatchOptimizer(gpu_monitor=gpu_monitor,
                                 default_strategy=strategy,
                                 **zerogpu_defaults)
# Export main classes and functions — the module's public API surface;
# names not listed here are internal implementation details.
__all__ = [
    'ZeroGPUBatchOptimizer',
    'BatchTask',
    'BatchRequest',
    'BatchResult',
    'BatchStrategy',
    'AgentPriority',
    'GPUResourceState',
    'create_zerogpu_batch_optimizer'
]