"""
Fault Tolerance System for SACCP Network
Handles node failures, retries, task redistribution, and network resilience
"""
| import time | |
| import threading | |
| from typing import Dict, List, Optional, Any | |
| from datetime import datetime, timedelta | |
| from enum import Enum | |
| import random | |
| import asyncio | |
| class FailureType(Enum): | |
| NODE_DISCONNECTED = "node_disconnected" | |
| TASK_TIMEOUT = "task_timeout" | |
| HEARTBEAT_FAILED = "heartbeat_failed" | |
| NETWORK_ERROR = "network_error" | |
| RESOURCE_EXHAUSTED = "resource_exhausted" | |
| class RecoveryStrategy(Enum): | |
| RETRY = "retry" | |
| REDISTRIBUTE = "redistribute" | |
| FAIL_OVER = "fail_over" | |
| DROP_TASK = "drop_task" | |
| class NodeStatus(Enum): | |
| HEALTHY = "healthy" | |
| UNRESPONSIVE = "unresponsive" | |
| FAILED = "failed" | |
| RECOVERING = "recovering" | |
| class FaultToleranceManager: | |
| """ | |
| Manages fault tolerance across the SACCP network | |
| """ | |
| def __init__(self): | |
| self.nodes: Dict[str, Dict[str, Any]] = {} | |
| self.active_tasks: Dict[str, Dict[str, Any]] = {} | |
| self.failed_tasks: List[Dict[str, Any]] = [] | |
| self.failure_history: List[Dict[str, Any]] = [] | |
| self.recovery_queue: List[Dict[str, Any]] = [] | |
| self.lock = threading.Lock() | |
| # Configuration | |
| self.heartbeat_interval = 30 # seconds | |
| self.heartbeat_timeout = 60 # seconds | |
| self.max_retries = 3 | |
| self.retry_delay = 5 # seconds | |
| self.network_monitoring_enabled = True | |
| # Start monitoring thread | |
| self.monitoring_thread = threading.Thread(target=self._network_monitoring_loop, daemon=True) | |
| self.monitoring_thread.start() | |
| def register_node(self, node_id: str, node_type: str, capabilities: Dict[str, Any]) -> bool: | |
| """Register a node with the fault tolerance system""" | |
| with self.lock: | |
| self.nodes[node_id] = { | |
| "node_id": node_id, | |
| "node_type": node_type, | |
| "capabilities": capabilities, | |
| "status": NodeStatus.HEALTHY, | |
| "last_heartbeat": time.time(), | |
| "failure_count": 0, | |
| "consecutive_failures": 0, | |
| "tasks_processed": 0, | |
| "tasks_failed": 0 | |
| } | |
| return True | |
| def remove_node(self, node_id: str) -> bool: | |
| """Remove a node from the system (when permanently offline)""" | |
| with self.lock: | |
| if node_id in self.nodes: | |
| del self.nodes[node_id] | |
| # Reassign tasks assigned to this node | |
| self._reassign_node_tasks(node_id) | |
| return True | |
| return False | |
| def heartbeat(self, node_id: str) -> bool: | |
| """Process heartbeat from a node""" | |
| with self.lock: | |
| if node_id not in self.nodes: | |
| return False | |
| node = self.nodes[node_id] | |
| node["last_heartbeat"] = time.time() | |
| node["status"] = NodeStatus.HEALTHY | |
| node["consecutive_failures"] = 0 # Reset on successful heartbeat | |
| return True | |
| def record_task_assignment(self, task_id: str, node_id: str, task_details: Dict[str, Any]) -> bool: | |
| """Record that a task was assigned to a node""" | |
| with self.lock: | |
| self.active_tasks[task_id] = { | |
| "task_id": task_id, | |
| "node_id": node_id, | |
| "assignment_time": time.time(), | |
| "task_details": task_details, | |
| "retry_count": 0, | |
| "status": "assigned" | |
| } | |
| return True | |
| def record_task_completion(self, task_id: str, node_id: str) -> bool: | |
| """Record successful task completion""" | |
| with self.lock: | |
| if task_id in self.active_tasks: | |
| del self.active_tasks[task_id] | |
| # Update node statistics | |
| if node_id in self.nodes: | |
| self.nodes[node_id]["tasks_processed"] += 1 | |
| return True | |
| return False | |
| def record_task_failure(self, task_id: str, node_id: str, failure_type: FailureType, | |
| error_details: Optional[str] = None) -> RecoveryStrategy: | |
| """Record task failure and determine recovery strategy""" | |
| with self.lock: | |
| # Record the failure | |
| failure_record = { | |
| "task_id": task_id, | |
| "node_id": node_id, | |
| "failure_type": failure_type.value, | |
| "error_details": error_details, | |
| "timestamp": time.time() | |
| } | |
| self.failure_history.append(failure_record) | |
| # Update node failure statistics | |
| if node_id in self.nodes: | |
| node = self.nodes[node_id] | |
| node["tasks_failed"] += 1 | |
| node["failure_count"] += 1 | |
| node["consecutive_failures"] += 1 | |
| # Check if node should be marked as failed | |
| if node["consecutive_failures"] >= 3: # 3 consecutive failures | |
| node["status"] = NodeStatus.FAILED | |
| # Get the task record | |
| task_record = self.active_tasks.get(task_id) | |
| if not task_record: | |
| return RecoveryStrategy.DROP_TASK | |
| # Determine recovery strategy based on failure type and retry count | |
| if task_record["retry_count"] < self.max_retries: | |
| # For timeout failures, try redistributing to a different node | |
| if failure_type == FailureType.TASK_TIMEOUT: | |
| return RecoveryStrategy.REDISTRIBUTE | |
| # For node disconnections, try fail-over to another node | |
| elif failure_type == FailureType.NODE_DISCONNECTED: | |
| return RecoveryStrategy.FAIL_OVER | |
| # For other failures, try retrying on the same node | |
| else: | |
| return RecoveryStrategy.RETRY | |
| else: | |
| # Max retries reached, drop the task | |
| if task_id in self.active_tasks: | |
| del self.active_tasks[task_id] | |
| self.failed_tasks.append(task_record) | |
| return RecoveryStrategy.DROP_TASK | |
| def _reassign_node_tasks(self, failed_node_id: str): | |
| """Reassign tasks from a failed node to healthy nodes""" | |
| tasks_to_reassign = [] | |
| with self.lock: | |
| # Find tasks assigned to the failed node | |
| for task_id, task_record in self.active_tasks.items(): | |
| if task_record["node_id"] == failed_node_id: | |
| tasks_to_reassign.append(task_id) | |
| # Reassign each task | |
| for task_id in tasks_to_reassign: | |
| self._attempt_task_redistribution(task_id) | |
| def _attempt_task_redistribution(self, task_id: str) -> bool: | |
| """Attempt to redistribute a task to a different node""" | |
| with self.lock: | |
| if task_id not in self.active_tasks: | |
| return False | |
| task_record = self.active_tasks[task_id] | |
| # Find a healthy alternative node | |
| new_node = self._find_alternative_node(task_record["task_details"]) | |
| if not new_node: | |
| # No alternative node available, retry later | |
| return False | |
| # Update task assignment | |
| old_node_id = task_record["node_id"] | |
| task_record["node_id"] = new_node["node_id"] | |
| task_record["retry_count"] += 1 | |
| task_record["assignment_time"] = time.time() | |
| # Update node stats | |
| if old_node_id in self.nodes: | |
| self.nodes[old_node_id]["tasks_failed"] += 1 | |
| if new_node["node_id"] in self.nodes: | |
| self.nodes[new_node["node_id"]]["tasks_processed"] += 1 | |
| return True | |
| def _find_alternative_node(self, task_requirements: Dict[str, Any]) -> Optional[Dict[str, Any]]: | |
| """Find an alternative healthy node that can handle the task""" | |
| with self.lock: | |
| for node_id, node in self.nodes.items(): | |
| if node["status"] == NodeStatus.HEALTHY: | |
| # Check if node meets task requirements | |
| if self._node_meets_requirements(node, task_requirements): | |
| return node | |
| return None | |
| def _node_meets_requirements(self, node: Dict[str, Any], requirements: Dict[str, Any]) -> bool: | |
| """Check if a node meets specific requirements for a task""" | |
| # Check if node has required resources | |
| capabilities = node["capabilities"] | |
| # Example: Check if the node has enough memory for the task | |
| required_memory = requirements.get("memory_required", 0) | |
| available_memory = capabilities.get("memory_gb", 0) | |
| if required_memory > available_memory: | |
| return False | |
| # Check if node type is compatible with task type | |
| required_node_types = requirements.get("compatible_node_types", []) | |
| if required_node_types and node["node_type"] not in required_node_types: | |
| return False | |
| return True | |
| def _network_monitoring_loop(self): | |
| """Background thread to monitor network health and handle failures""" | |
| while self.network_monitoring_enabled: | |
| time.sleep(1) # Check every second | |
| # Check for node timeouts | |
| if int(time.time()) % 10 == 0: # Every 10 seconds | |
| self._check_node_health() | |
| # Process recovery queue | |
| self._process_recovery_queue() | |
| def _check_node_health(self): | |
| """Check for nodes that have missed heartbeats""" | |
| current_time = time.time() | |
| with self.lock: | |
| for node_id, node in self.nodes.items(): | |
| time_since_heartbeat = current_time - node["last_heartbeat"] | |
| if time_since_heartbeat > self.heartbeat_timeout: | |
| # Node is unresponsive | |
| if node["status"] != NodeStatus.FAILED: | |
| node["status"] = NodeStatus.UNRESPONSIVE | |
| # Record the failure | |
| failure_record = { | |
| "node_id": node_id, | |
| "failure_type": FailureType.HEARTBEAT_FAILED.value, | |
| "timestamp": current_time, | |
| "details": f"Node {node_id} missed heartbeat for {time_since_heartbeat}s" | |
| } | |
| self.failure_history.append(failure_record) | |
| # Add to recovery queue | |
| self.recovery_queue.append({ | |
| "type": "node_recovery", | |
| "node_id": node_id, | |
| "action": "reconnect", | |
| "timestamp": current_time + self.retry_delay | |
| }) | |
| def _process_recovery_queue(self): | |
| """Process items in the recovery queue""" | |
| current_time = time.time() | |
| items_to_process = [] | |
| with self.lock: | |
| for item in self.recovery_queue[:]: # Copy list to avoid modification during iteration | |
| if current_time >= item["timestamp"]: | |
| items_to_process.append(item) | |
| # Process each item outside the lock to avoid blocking | |
| for item in items_to_process: | |
| self._execute_recovery_action(item) | |
| # Remove processed item from queue | |
| with self.lock: | |
| if item in self.recovery_queue: | |
| self.recovery_queue.remove(item) | |
| def _execute_recovery_action(self, recovery_item: Dict[str, Any]): | |
| """Execute a specific recovery action""" | |
| action_type = recovery_item["type"] | |
| if action_type == "node_recovery": | |
| node_id = recovery_item["node_id"] | |
| if recovery_item["action"] == "reconnect": | |
| # Try to reconnect by marking node as healthy | |
| # In a real implementation, this would try to reestablish connection | |
| with self.lock: | |
| if node_id in self.nodes: | |
| node = self.nodes[node_id] | |
| if node["status"] in [NodeStatus.UNRESPONSIVE, NodeStatus.FAILED]: | |
| # In a real system, we would attempt reconnection | |
| # For simulation, we'll just reset to healthy | |
| node["status"] = NodeStatus.HEALTHY | |
| node["consecutive_failures"] = 0 | |
| elif action_type == "task_redistribution": | |
| task_id = recovery_item["task_id"] | |
| # Attempt to redistribute the task | |
| self._attempt_task_redistribution(task_id) | |
| def get_network_health(self) -> Dict[str, Any]: | |
| """Get overall network health statistics""" | |
| with self.lock: | |
| healthy_nodes = 0 | |
| unresponsive_nodes = 0 | |
| failed_nodes = 0 | |
| for node in self.nodes.values(): | |
| if node["status"] == NodeStatus.HEALTHY: | |
| healthy_nodes += 1 | |
| elif node["status"] == NodeStatus.UNRESPONSIVE: | |
| unresponsive_nodes += 1 | |
| elif node["status"] == NodeStatus.FAILED: | |
| failed_nodes += 1 | |
| total_tasks = len(self.active_tasks) + len(self.failed_tasks) | |
| return { | |
| "total_nodes": len(self.nodes), | |
| "healthy_nodes": healthy_nodes, | |
| "unresponsive_nodes": unresponsive_nodes, | |
| "failed_nodes": failed_nodes, | |
| "active_tasks": len(self.active_tasks), | |
| "failed_tasks": len(self.failed_tasks), | |
| "total_tasks_processed": sum(node["tasks_processed"] for node in self.nodes.values()), | |
| "total_tasks_failed": sum(node["tasks_failed"] for node in self.nodes.values()), | |
| "recovery_attempts": len(self.recovery_queue) | |
| } | |
| def get_failed_nodes(self) -> List[Dict[str, Any]]: | |
| """Get list of currently failed nodes""" | |
| with self.lock: | |
| failed = [] | |
| for node in self.nodes.values(): | |
| if node["status"] == NodeStatus.FAILED: | |
| failed.append(node) | |
| return failed | |
| # Global instance | |
| fault_tolerance_manager = FaultToleranceManager() |