Spaces:
Runtime error
Runtime error
| """ | |
| Dynamic Load Balancer for SACCP Network | |
| Distributes tasks across different node types based on availability, capacity, and performance | |
| """ | |
| import time | |
| import heapq | |
| from typing import Dict, List, Optional, Any, Tuple | |
| from enum import Enum | |
| from dataclasses import dataclass | |
| from datetime import datetime, timedelta | |
| import threading | |
| import random | |
class TaskPriority(Enum):
    """Scheduling priority of a Task; a higher numeric value outranks a lower one."""
    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4
class NodeType(Enum):
    """Kinds of nodes the balancer can dispatch to; values are string identifiers
    (matched against ``preferred_node_type`` / ``compatible_node_types`` in task
    resource requirements)."""
    HEAD = "head"
    RAM = "ram"
    DISK = "disk"
    COMPUTE = "compute"
    GPU = "gpu"
    TPU = "tpu"
    NPU = "npu"
@dataclass
class Task:
    """A unit of work to be distributed to a node.

    Plain data container: ``@dataclass`` generates ``__init__``/``__repr__``/
    ``__eq__`` from the field annotations below.  (Without the decorator the
    annotations are class-level only and ``Task(...)`` would reject every
    keyword argument the balancer passes.)
    """
    task_id: str
    task_type: str
    priority: TaskPriority  # scheduling priority (higher value wins)
    resource_requirements: Dict[str, Any]  # CPU, memory, etc.
    estimated_duration: float  # in seconds
    created_at: float  # epoch timestamp of task creation
    assigned_node: Optional[str] = None  # node_id once assigned, else None
    assigned_at: Optional[float] = None  # epoch timestamp of assignment, else None
@dataclass
class Node:
    """A worker node known to the load balancer.

    Plain data container: ``@dataclass`` generates the keyword constructor
    that ``LoadBalancer.register_node`` relies on.  (Without the decorator,
    ``Node(node_id=..., ...)`` raises TypeError.)
    """
    node_id: str
    node_type: NodeType
    capabilities: Dict[str, Any]  # CPU, memory, etc.
    current_load: float  # 0.0-1.0 estimated utilisation
    tasks_queued: int
    tasks_completed: int
    tasks_failed: int
    last_heartbeat: float  # epoch timestamp of last heartbeat
    performance_score: float  # 0.0-1.0 based on historical performance
    is_available: bool = True
    max_concurrent_tasks: int = 10
    current_tasks: int = 0  # number of tasks currently assigned
class LoadBalancer:
    """
    Dynamic load balancer that distributes tasks across node types.

    Queued tasks live in a priority heap ordered by (priority desc,
    created_at asc).  All public methods take ``self.lock``; the lock is
    re-entrant because public methods call helpers that lock again
    (e.g. ``submit_task`` -> ``_find_suitable_node``).
    """

    def __init__(self):
        self.nodes: Dict[str, Node] = {}
        # Heap entries are (-priority, created_at, seq, task).  ``seq`` is a
        # strictly increasing tiebreaker so heapq never has to compare two
        # Task objects directly (Task is not orderable -> TypeError on ties).
        self.task_queue: List[Tuple[int, float, int, Task]] = []
        self.assigned_tasks: Dict[str, str] = {}  # task_id -> node_id
        self.node_stats: Dict[str, Dict[str, Any]] = {}
        # Registry of every live task, so assigned tasks (which are removed
        # from the heap) can still be found by id.
        self._tasks: Dict[str, Task] = {}
        self._seq = 0  # monotonic counter feeding the heap tiebreaker
        # RLock, not Lock: a plain Lock deadlocks when a locked public method
        # calls another locking method on the same thread (submit_task ->
        # _find_suitable_node, complete_task -> _attempt_task_assignments,
        # deregister_node -> _reassign_node_tasks -> submit_task).
        self.lock = threading.RLock()
        # Configuration
        self.heartbeat_timeout = 90  # seconds without a heartbeat -> unavailable
        self.task_timeout = 300  # seconds a queued task remains eligible
        self.load_balancing_algorithm = "weighted_least_connections"

    def register_node(self, node_id: str, node_type: NodeType, capabilities: Dict[str, Any]) -> bool:
        """Register (or replace) a node with the load balancer."""
        with self.lock:
            self.nodes[node_id] = Node(
                node_id=node_id,
                node_type=node_type,
                capabilities=capabilities,
                current_load=0.0,
                tasks_queued=0,
                tasks_completed=0,
                tasks_failed=0,
                last_heartbeat=time.time(),
                performance_score=0.8,  # neutral default until stats accrue
                max_concurrent_tasks=capabilities.get("max_concurrent_tasks", 10),
            )
            # Fresh stats; consumed by _update_node_performance_score.
            self.node_stats[node_id] = {
                "avg_task_duration": 0,
                "success_rate": 1.0,
                "response_time_avg": 0.1,
            }
            return True

    def heartbeat_node(self, node_id: str) -> bool:
        """Record a heartbeat for one node; False if the node is unknown."""
        with self.lock:
            node = self.nodes.get(node_id)
            if node is None:
                return False
            node.last_heartbeat = time.time()
            node.is_available = True
            return True

    def heartbeat_batch_nodes(self, node_ids: List[str]) -> int:
        """Heartbeat several nodes; returns how many were known."""
        return sum(1 for node_id in node_ids if self.heartbeat_node(node_id))

    def deregister_node(self, node_id: str) -> bool:
        """Remove a node, requeueing any tasks it was running."""
        with self.lock:
            if node_id not in self.nodes:
                return False
            # Remove the node BEFORE reassigning, so reassignment cannot
            # pick the very node being removed.
            del self.nodes[node_id]
            self.node_stats.pop(node_id, None)
            self._reassign_node_tasks(node_id)
            return True

    def submit_task(self, task: Task) -> Optional[str]:
        """Submit a task; returns the assigned node_id, or None if queued."""
        with self.lock:
            self._tasks[task.task_id] = task
            # Try an immediate assignment and only queue on failure.
            # (Queueing an already-assigned task would let the background
            # drain in _attempt_task_assignments assign it a second time.)
            node_id = self._find_suitable_node(task)
            if node_id and self._assign_task_to_node(task.task_id, node_id):
                return node_id
            self._enqueue(task)
            return None  # Task queued but not yet assigned

    def _enqueue(self, task: Task) -> None:
        """Push a task onto the heap: higher priority first, then FIFO."""
        self._seq += 1
        heapq.heappush(
            self.task_queue,
            (-task.priority.value, task.created_at, self._seq, task),
        )

    def get_task_assignment(self, task_id: str) -> Optional[str]:
        """Return the node a task is assigned to, or None."""
        with self.lock:
            return self.assigned_tasks.get(task_id)

    def complete_task(self, task_id: str, node_id: str, success: bool = True, duration: float = 0) -> bool:
        """Mark a task finished (successfully or not) and refresh node stats."""
        with self.lock:
            node = self.nodes.get(node_id)
            if node is not None:
                if success:
                    node.tasks_completed += 1
                else:
                    node.tasks_failed += 1
                # Clamp at zero so duplicate / unknown completions cannot
                # drive the counters negative.
                node.current_tasks = max(0, node.current_tasks - 1)
                node.tasks_queued = max(0, node.tasks_queued - 1)

                stats = self.node_stats.get(node_id)
                if stats is not None:
                    if success and duration > 0:
                        # Exponential moving average of task duration.
                        if stats["avg_task_duration"] == 0:
                            stats["avg_task_duration"] = duration
                        else:
                            stats["avg_task_duration"] = (
                                stats["avg_task_duration"] * 0.7 + duration * 0.3
                            )
                    total_tasks = node.tasks_completed + node.tasks_failed
                    if total_tasks > 0:
                        stats["success_rate"] = node.tasks_completed / total_tasks
                    self._update_node_performance_score(node_id)

            self.assigned_tasks.pop(task_id, None)
            self._tasks.pop(task_id, None)
            # Capacity was freed; see whether any queued task now fits.
            self._attempt_task_assignments()
            return True

    def _find_suitable_node(self, task: Task) -> Optional[str]:
        """Pick the best node for ``task`` per the configured algorithm."""
        with self.lock:
            available_nodes = [
                node for node in self.nodes.values()
                if self._is_node_suitable(node, task)
            ]
            if not available_nodes:
                return None

            if self.load_balancing_algorithm == "weighted_least_connections":
                # Lowest load factor first, performance as tiebreaker.
                available_nodes.sort(key=lambda n: (
                    n.current_tasks / n.max_concurrent_tasks,
                    -n.performance_score,
                ))
            elif self.load_balancing_algorithm == "weighted_response_time":
                # Best historical performance first, load as tiebreaker.
                available_nodes.sort(key=lambda n: (
                    -n.performance_score,
                    n.current_tasks / n.max_concurrent_tasks,
                ))
            elif self.load_balancing_algorithm == "node_type_priority":
                # Preferred node type first, then load, then performance.
                preferred_type = task.resource_requirements.get("preferred_node_type")
                available_nodes.sort(key=lambda n: (
                    0 if n.node_type.value == preferred_type else 1,
                    n.current_tasks / n.max_concurrent_tasks,
                    -n.performance_score,
                ))
            else:
                # Default: least connections with performance consideration.
                available_nodes.sort(key=lambda n: (
                    n.current_tasks / n.max_concurrent_tasks,
                    -n.performance_score,
                ))
            return available_nodes[0].node_id

    def _is_node_suitable(self, node: Node, task: Task) -> bool:
        """Check availability, freshness, type, resources and capacity."""
        if not node.is_available:
            return False
        # Stale heartbeat -> mark unavailable as a side effect.
        if time.time() - node.last_heartbeat > self.heartbeat_timeout:
            node.is_available = False
            return False
        # Node type compatibility (empty list means "any type").
        required_types = task.resource_requirements.get("compatible_node_types", [])
        if required_types and node.node_type.value not in required_types:
            return False
        reqs = task.resource_requirements
        caps = node.capabilities
        if reqs.get("memory_required", 0) > caps.get("memory_gb", 0):
            return False
        if reqs.get("needs_gpu", False) and not caps.get("gpu_available", False):
            return False
        if node.current_tasks >= node.max_concurrent_tasks:
            return False
        if node.current_load > 0.9:  # node is over 90% loaded
            return False
        return True

    def _assign_task_to_node(self, task_id: str, node_id: str) -> bool:
        """Record the assignment of ``task_id`` onto ``node_id``."""
        with self.lock:
            node = self.nodes.get(node_id)
            task = self._get_task_by_id(task_id)
            if node is None or task is None:
                return False
            node.current_tasks += 1
            node.tasks_queued += 1
            self.assigned_tasks[task_id] = node_id
            task.assigned_node = node_id
            task.assigned_at = time.time()
            # Bump estimated load: capped at +0.2 per task and 1.0 overall.
            estimated_load = min(0.2, task.estimated_duration / 3600.0)
            node.current_load = min(1.0, node.current_load + estimated_load)
            return True

    def _get_task_by_id(self, task_id: str) -> Optional[Task]:
        """O(1) registry lookup; covers queued AND assigned tasks."""
        return self._tasks.get(task_id)

    def _reassign_node_tasks(self, node_id: str):
        """Requeue every task currently assigned to ``node_id``."""
        orphaned = [tid for tid, nid in self.assigned_tasks.items()
                    if nid == node_id]
        for task_id in orphaned:
            # Clear the stale mapping BEFORE resubmitting; otherwise a
            # successful reassignment inside submit_task would be wiped out.
            del self.assigned_tasks[task_id]
            task = self._get_task_by_id(task_id)
            if task:
                task.assigned_node = None
                task.assigned_at = None
                self.submit_task(task)

    def _attempt_task_assignments(self):
        """Drain the queue, assigning whatever now fits; requeue the rest."""
        with self.lock:
            tasks_to_retry = []
            while self.task_queue:
                entry = heapq.heappop(self.task_queue)
                task = entry[-1]
                if time.time() - task.created_at > self.task_timeout:
                    # Expired: drop it entirely (queue and registry).
                    self._tasks.pop(task.task_id, None)
                    continue
                node_id = self._find_suitable_node(task)
                if node_id and self._assign_task_to_node(task.task_id, node_id):
                    continue  # successfully assigned, do not requeue
                tasks_to_retry.append(entry)
            for entry in tasks_to_retry:
                heapq.heappush(self.task_queue, entry)

    def _update_node_performance_score(self, node_id: str):
        """Recompute a node's 0.0-1.0 performance score from its stats."""
        node = self.nodes.get(node_id)
        stats = self.node_stats.get(node_id)
        if node is None or stats is None:
            return
        # Weighted blend: success rate 60%, response time 25%, idle capacity 15%.
        success_score = stats["success_rate"]
        avg_duration = stats["avg_task_duration"]
        response_score = 1.0 / (1.0 + avg_duration / 100.0)  # shorter -> closer to 1.0
        load_score = 1.0 - min(1.0, node.current_load)  # idle -> closer to 1.0
        performance_score = (
            success_score * 0.6 +
            response_score * 0.25 +
            load_score * 0.15
        )
        node.performance_score = min(1.0, max(0.0, performance_score))

    def get_node_loads(self) -> Dict[str, float]:
        """Current estimated load (0.0-1.0) per node."""
        with self.lock:
            return {node_id: node.current_load for node_id, node in self.nodes.items()}

    def get_node_status(self) -> List[Dict[str, Any]]:
        """Comprehensive status snapshot of all nodes."""
        with self.lock:
            status_list = []
            for node in self.nodes.values():
                # Refresh availability from heartbeat age as we report.
                is_active = time.time() - node.last_heartbeat < self.heartbeat_timeout
                node.is_available = is_active
                status_list.append({
                    "node_id": node.node_id,
                    "node_type": node.node_type.value,
                    "is_available": is_active,
                    "current_load": node.current_load,
                    "current_tasks": node.current_tasks,
                    "tasks_queued": node.tasks_queued,
                    "tasks_completed": node.tasks_completed,
                    "tasks_failed": node.tasks_failed,
                    "performance_score": node.performance_score,
                    "max_concurrent_tasks": node.max_concurrent_tasks,
                    "capabilities": node.capabilities,
                    "last_heartbeat": node.last_heartbeat,
                })
            return status_list

    def get_task_queue_status(self) -> Dict[str, Any]:
        """Summarize the queue: size, per-priority counts, average wait."""
        with self.lock:
            counts = {priority: 0 for priority in TaskPriority}
            for *_, task in self.task_queue:
                counts[task.priority] += 1
            return {
                "total_queued_tasks": len(self.task_queue),
                "priority_distribution": {
                    "critical": counts[TaskPriority.CRITICAL],
                    "high": counts[TaskPriority.HIGH],
                    "normal": counts[TaskPriority.NORMAL],
                    "low": counts[TaskPriority.LOW],
                },
                "average_wait_time": self._calculate_avg_wait_time(),
            }

    def _calculate_avg_wait_time(self) -> float:
        """Average seconds queued tasks have been waiting (0 if queue empty)."""
        if not self.task_queue:
            return 0
        now = time.time()
        total_wait = sum(now - task.created_at for *_, task in self.task_queue)
        return total_wait / len(self.task_queue)
# Global instance: module-level singleton shared by importers of this module.
load_balancer = LoadBalancer()