"""
Fault Tolerance System for SACCP Network
Handles node failures, retries, task redistribution, and network resilience
"""

import time
import threading
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from enum import Enum
import random
import asyncio


class FailureType(Enum):
    """Kinds of failure that can be recorded for a task or node."""

    NODE_DISCONNECTED = "node_disconnected"
    TASK_TIMEOUT = "task_timeout"
    HEARTBEAT_FAILED = "heartbeat_failed"
    NETWORK_ERROR = "network_error"
    RESOURCE_EXHAUSTED = "resource_exhausted"


class RecoveryStrategy(Enum):
    """Recovery decisions returned by ``record_task_failure``."""

    RETRY = "retry"
    REDISTRIBUTE = "redistribute"
    FAIL_OVER = "fail_over"
    DROP_TASK = "drop_task"


class NodeStatus(Enum):
    """Health states tracked for each registered node."""

    HEALTHY = "healthy"
    UNRESPONSIVE = "unresponsive"
    FAILED = "failed"
    RECOVERING = "recovering"


class FaultToleranceManager:
    """
    Manages fault tolerance across the SACCP network

    Tracks registered nodes and active tasks in plain dictionaries, records
    failures, and decides how a failed task should be recovered. Constructing
    an instance starts a daemon thread that periodically checks heartbeats
    and drains the recovery queue.

    All shared state is guarded by ``self.lock``.
    """

    def __init__(self):
        self.nodes: Dict[str, Dict[str, Any]] = {}
        self.active_tasks: Dict[str, Dict[str, Any]] = {}
        self.failed_tasks: List[Dict[str, Any]] = []
        self.failure_history: List[Dict[str, Any]] = []
        self.recovery_queue: List[Dict[str, Any]] = []
        # Must be re-entrant: methods that hold the lock call helpers that
        # acquire it again (remove_node -> _reassign_node_tasks ->
        # _attempt_task_redistribution -> _find_alternative_node). With a
        # plain Lock those paths block forever on themselves.
        self.lock = threading.RLock()

        # Configuration
        self.heartbeat_interval = 30  # seconds
        self.heartbeat_timeout = 60  # seconds
        self.max_retries = 3
        self.retry_delay = 5  # seconds
        self.health_check_interval = 10  # seconds between heartbeat sweeps
        self.max_consecutive_failures = 3  # failures before a node is FAILED
        self.network_monitoring_enabled = True

        # Start monitoring thread (after all configuration is in place,
        # because the loop reads it).
        self.monitoring_thread = threading.Thread(
            target=self._network_monitoring_loop, daemon=True
        )
        self.monitoring_thread.start()

    def register_node(self, node_id: str, node_type: str,
                      capabilities: Dict[str, Any]) -> bool:
        """Register a node with the fault tolerance system.

        An existing entry with the same ``node_id`` is overwritten and its
        statistics reset.

        Args:
            node_id: Unique identifier of the node.
            node_type: Node type, matched against a task's
                ``compatible_node_types`` requirement.
            capabilities: Resource description; ``memory_gb`` is read when
                matching tasks to nodes.

        Returns:
            Always ``True``.
        """
        with self.lock:
            self.nodes[node_id] = {
                "node_id": node_id,
                "node_type": node_type,
                "capabilities": capabilities,
                "status": NodeStatus.HEALTHY,
                "last_heartbeat": time.time(),
                "failure_count": 0,
                "consecutive_failures": 0,
                "tasks_processed": 0,
                "tasks_failed": 0,
            }
            return True

    def remove_node(self, node_id: str) -> bool:
        """Remove a node from the system (when permanently offline).

        Tasks still assigned to the node are handed to
        ``_reassign_node_tasks``.

        Returns:
            ``True`` if the node was known and removed, ``False`` otherwise.
        """
        with self.lock:
            if node_id not in self.nodes:
                return False
            del self.nodes[node_id]
            # Reassign tasks assigned to this node
            self._reassign_node_tasks(node_id)
            return True

    def heartbeat(self, node_id: str) -> bool:
        """Process heartbeat from a node.

        Marks the node HEALTHY, refreshes its heartbeat timestamp and clears
        its consecutive-failure counter.

        Returns:
            ``False`` if the node is not registered, ``True`` otherwise.
        """
        with self.lock:
            node = self.nodes.get(node_id)
            if node is None:
                return False
            node["last_heartbeat"] = time.time()
            node["status"] = NodeStatus.HEALTHY
            node["consecutive_failures"] = 0  # Reset on successful heartbeat
            return True

    def record_task_assignment(self, task_id: str, node_id: str,
                               task_details: Dict[str, Any]) -> bool:
        """Record that a task was assigned to a node.

        Any existing record for ``task_id`` is replaced, which also resets
        its retry counter.

        Returns:
            Always ``True``.
        """
        with self.lock:
            self.active_tasks[task_id] = {
                "task_id": task_id,
                "node_id": node_id,
                "assignment_time": time.time(),
                "task_details": task_details,
                "retry_count": 0,
                "status": "assigned",
            }
            return True

    def record_task_completion(self, task_id: str, node_id: str) -> bool:
        """Record successful task completion.

        Returns:
            ``True`` if the task was active (it is removed and the node's
            ``tasks_processed`` counter is incremented when the node is
            known), ``False`` if the task was not active.
        """
        with self.lock:
            if task_id not in self.active_tasks:
                return False
            del self.active_tasks[task_id]
            # Update node statistics
            if node_id in self.nodes:
                self.nodes[node_id]["tasks_processed"] += 1
            return True

    def record_task_failure(self, task_id: str, node_id: str,
                            failure_type: FailureType,
                            error_details: Optional[str] = None) -> RecoveryStrategy:
        """Record task failure and determine recovery strategy.

        The failure is appended to ``failure_history`` and the node's failure
        counters are updated; a node reaching ``max_consecutive_failures`` is
        marked FAILED.

        Each call that hands out a recovery strategy consumes one retry, so a
        task is dropped once it has used ``max_retries`` of them.

        Args:
            task_id: Identifier of the failed task.
            node_id: Node the task was running on.
            failure_type: What went wrong.
            error_details: Optional free-form description stored with the
                failure record.

        Returns:
            ``DROP_TASK`` if the task is unknown or out of retries (in the
            latter case it is moved to ``failed_tasks``); ``REDISTRIBUTE``
            for timeouts; ``FAIL_OVER`` for node disconnections; ``RETRY``
            for every other failure type.
        """
        with self.lock:
            # Record the failure
            self.failure_history.append({
                "task_id": task_id,
                "node_id": node_id,
                "failure_type": failure_type.value,
                "error_details": error_details,
                "timestamp": time.time(),
            })

            # Update node failure statistics
            node = self.nodes.get(node_id)
            if node is not None:
                node["tasks_failed"] += 1
                node["failure_count"] += 1
                node["consecutive_failures"] += 1
                if node["consecutive_failures"] >= self.max_consecutive_failures:
                    node["status"] = NodeStatus.FAILED

            task_record = self.active_tasks.get(task_id)
            if task_record is None:
                return RecoveryStrategy.DROP_TASK

            if task_record["retry_count"] >= self.max_retries:
                # Max retries reached, drop the task
                del self.active_tasks[task_id]
                self.failed_tasks.append(task_record)
                return RecoveryStrategy.DROP_TASK

            # Consume one retry. Without this the counter never advances on
            # this path and max_retries is never reached.
            task_record["retry_count"] += 1

            if failure_type == FailureType.TASK_TIMEOUT:
                # For timeout failures, try a different node
                return RecoveryStrategy.REDISTRIBUTE
            if failure_type == FailureType.NODE_DISCONNECTED:
                # For node disconnections, fail over to another node
                return RecoveryStrategy.FAIL_OVER
            # For other failures, retry on the same node
            return RecoveryStrategy.RETRY

    def _reassign_node_tasks(self, failed_node_id: str):
        """Reassign tasks from a failed node to healthy nodes.

        Every active task whose ``node_id`` equals ``failed_node_id`` is
        passed to ``_attempt_task_redistribution``.
        """
        with self.lock:
            # Snapshot the ids first; redistribution mutates the records.
            stranded = [
                task_id
                for task_id, record in self.active_tasks.items()
                if record["node_id"] == failed_node_id
            ]
            for task_id in stranded:
                self._attempt_task_redistribution(task_id)

    def _has_pending_redistribution(self, task_id: str) -> bool:
        """Return True if a redistribution retry for ``task_id`` is queued.

        Caller must hold ``self.lock``.
        """
        return any(
            item.get("type") == "task_redistribution"
            and item.get("task_id") == task_id
            for item in self.recovery_queue
        )

    def _attempt_task_redistribution(self, task_id: str) -> bool:
        """Attempt to redistribute a task to a different node.

        On success the task record is pointed at the new node, its retry
        counter is incremented and its assignment time refreshed; the old
        node's ``tasks_failed`` counter is incremented if it is still
        registered. The new node's ``tasks_processed`` is left untouched,
        because ``record_task_completion`` counts it on completion.

        When no other suitable node exists, a ``task_redistribution`` item is
        queued ``retry_delay`` seconds ahead so the attempt is repeated by
        the recovery queue (at most one pending item per task).

        Returns:
            ``True`` if the task was moved, ``False`` if the task is not
            active or no alternative node is currently available.
        """
        with self.lock:
            task_record = self.active_tasks.get(task_id)
            if task_record is None:
                return False

            old_node_id = task_record["node_id"]

            # Find a healthy alternative node, never the one it is already on
            new_node = self._find_alternative_node(
                task_record["task_details"], exclude_node_id=old_node_id
            )
            if new_node is None:
                # No alternative node available, retry later
                if not self._has_pending_redistribution(task_id):
                    self.recovery_queue.append({
                        "type": "task_redistribution",
                        "task_id": task_id,
                        "timestamp": time.time() + self.retry_delay,
                    })
                return False

            # Update task assignment
            task_record["node_id"] = new_node["node_id"]
            task_record["retry_count"] += 1
            task_record["assignment_time"] = time.time()

            # Update node stats
            if old_node_id in self.nodes:
                self.nodes[old_node_id]["tasks_failed"] += 1

            return True

    def _find_alternative_node(self, task_requirements: Dict[str, Any],
                               exclude_node_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Find an alternative healthy node that can handle the task.

        Args:
            task_requirements: Requirements dict passed to
                ``_node_meets_requirements``.
            exclude_node_id: Node id to skip (typically the node the task is
                currently on). ``None`` excludes nothing.

        Returns:
            The first HEALTHY node record, in registration order, that meets
            the requirements, or ``None`` if there is none.
        """
        with self.lock:
            for node_id, node in self.nodes.items():
                if node_id == exclude_node_id:
                    continue
                if node["status"] != NodeStatus.HEALTHY:
                    continue
                # Check if node meets task requirements
                if self._node_meets_requirements(node, task_requirements):
                    return node
            return None

    def _node_meets_requirements(self, node: Dict[str, Any],
                                 requirements: Dict[str, Any]) -> bool:
        """Check if a node meets specific requirements for a task.

        Two requirement keys are understood: ``memory_required`` (compared
        with the node capability ``memory_gb``; both default to 0) and
        ``compatible_node_types`` (an empty or missing list accepts any node
        type).
        """
        capabilities = node["capabilities"]

        # Check if the node has enough memory for the task
        required_memory = requirements.get("memory_required", 0)
        available_memory = capabilities.get("memory_gb", 0)
        if required_memory > available_memory:
            return False

        # Check if node type is compatible with task type
        required_node_types = requirements.get("compatible_node_types", [])
        if required_node_types and node["node_type"] not in required_node_types:
            return False

        return True

    def _network_monitoring_loop(self):
        """Background thread to monitor network health and handle failures.

        Wakes once per second, drains the recovery queue on every wake-up and
        runs a heartbeat sweep every ``health_check_interval`` seconds. The
        loop ends once ``network_monitoring_enabled`` is false.
        """
        # Elapsed monotonic time instead of testing the wall-clock second:
        # sleep(1) plus processing time drifts, so a specific second can be
        # skipped, and the wall clock may be adjusted.
        last_health_check = time.monotonic()

        while self.network_monitoring_enabled:
            time.sleep(1)  # Check every second

            now = time.monotonic()
            if now - last_health_check >= self.health_check_interval:
                last_health_check = now
                self._check_node_health()

            # Process recovery queue
            self._process_recovery_queue()

    def _check_node_health(self):
        """Check for nodes that have missed heartbeats.

        For every node whose last heartbeat is older than
        ``heartbeat_timeout``: mark it UNRESPONSIVE (unless already FAILED),
        append a HEARTBEAT_FAILED record to ``failure_history`` and queue a
        ``node_recovery`` / ``reconnect`` item ``retry_delay`` seconds ahead.
        """
        current_time = time.time()

        with self.lock:
            for node_id, node in self.nodes.items():
                time_since_heartbeat = current_time - node["last_heartbeat"]
                if time_since_heartbeat <= self.heartbeat_timeout:
                    continue

                # Node is unresponsive
                if node["status"] != NodeStatus.FAILED:
                    node["status"] = NodeStatus.UNRESPONSIVE

                # Record the failure
                self.failure_history.append({
                    "node_id": node_id,
                    "failure_type": FailureType.HEARTBEAT_FAILED.value,
                    "timestamp": current_time,
                    "details": f"Node {node_id} missed heartbeat for {time_since_heartbeat}s",
                })

                # Add to recovery queue
                self.recovery_queue.append({
                    "type": "node_recovery",
                    "node_id": node_id,
                    "action": "reconnect",
                    "timestamp": current_time + self.retry_delay,
                })

    def _process_recovery_queue(self):
        """Process items in the recovery queue.

        Items whose ``timestamp`` has been reached are removed from the queue
        and then executed. They are removed first so that an action which
        re-queues itself is not hidden by its own, still-listed, entry.
        """
        current_time = time.time()

        with self.lock:
            due_items = []
            pending_items = []
            for item in self.recovery_queue:
                if current_time >= item["timestamp"]:
                    due_items.append(item)
                else:
                    pending_items.append(item)
            if not due_items:
                return
            # Mutate in place so outside references to the list stay valid.
            self.recovery_queue[:] = pending_items

        # Run the actions without holding the lock across the whole batch
        for item in due_items:
            self._execute_recovery_action(item)

    def _execute_recovery_action(self, recovery_item: Dict[str, Any]):
        """Execute a specific recovery action.

        ``node_recovery`` items with action ``reconnect`` reset an
        UNRESPONSIVE or FAILED node to HEALTHY; ``task_redistribution`` items
        call ``_attempt_task_redistribution``. Other item types are ignored.
        """
        action_type = recovery_item["type"]

        if action_type == "node_recovery":
            node_id = recovery_item["node_id"]
            if recovery_item["action"] != "reconnect":
                return
            # In a real implementation, this would try to reestablish the
            # connection. For simulation, the node is simply reset to healthy.
            with self.lock:
                node = self.nodes.get(node_id)
                if node is None:
                    return
                if node["status"] in (NodeStatus.UNRESPONSIVE, NodeStatus.FAILED):
                    node["status"] = NodeStatus.HEALTHY
                    node["consecutive_failures"] = 0

        elif action_type == "task_redistribution":
            # Attempt to redistribute the task
            self._attempt_task_redistribution(recovery_item["task_id"])

    def get_network_health(self) -> Dict[str, Any]:
        """Get overall network health statistics.

        Returns:
            A dict with node counts per status, active and failed task
            counts, the summed per-node ``tasks_processed`` and
            ``tasks_failed`` counters, and the current recovery-queue length
            under ``recovery_attempts``.
        """
        with self.lock:
            statuses = [node["status"] for node in self.nodes.values()]
            return {
                "total_nodes": len(self.nodes),
                "healthy_nodes": statuses.count(NodeStatus.HEALTHY),
                "unresponsive_nodes": statuses.count(NodeStatus.UNRESPONSIVE),
                "failed_nodes": statuses.count(NodeStatus.FAILED),
                "active_tasks": len(self.active_tasks),
                "failed_tasks": len(self.failed_tasks),
                "total_tasks_processed": sum(
                    node["tasks_processed"] for node in self.nodes.values()
                ),
                "total_tasks_failed": sum(
                    node["tasks_failed"] for node in self.nodes.values()
                ),
                "recovery_attempts": len(self.recovery_queue),
            }

    def get_failed_nodes(self) -> List[Dict[str, Any]]:
        """Get list of currently failed nodes.

        Returns:
            The internal node records (not copies) whose status is FAILED.
        """
        with self.lock:
            return [
                node
                for node in self.nodes.values()
                if node["status"] == NodeStatus.FAILED
            ]


# Global instance
fault_tolerance_manager = FaultToleranceManager()