# worker-universal/shared/fault_tolerance.py
"""
Fault Tolerance System for SACCP Network
Handles node failures, retries, task redistribution, and network resilience
"""
import time
import threading
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from enum import Enum
import random
import asyncio
class FailureType(Enum):
    """Categories of failure that the fault-tolerance layer records."""

    NODE_DISCONNECTED = "node_disconnected"    # node dropped off the network
    TASK_TIMEOUT = "task_timeout"              # task did not finish in time
    HEARTBEAT_FAILED = "heartbeat_failed"      # node missed its heartbeat window
    NETWORK_ERROR = "network_error"            # transport-level communication error
    RESOURCE_EXHAUSTED = "resource_exhausted"  # node ran out of capacity
class RecoveryStrategy(Enum):
    """Actions the manager can recommend after a task failure."""

    RETRY = "retry"                # re-run on the same node
    REDISTRIBUTE = "redistribute"  # move the task to a different node
    FAIL_OVER = "fail_over"        # switch to a backup node
    DROP_TASK = "drop_task"        # give up on the task entirely
class NodeStatus(Enum):
    """Health states a registered node can be in."""

    HEALTHY = "healthy"            # heartbeats arriving normally
    UNRESPONSIVE = "unresponsive"  # missed the heartbeat timeout
    FAILED = "failed"              # too many consecutive failures
    RECOVERING = "recovering"      # reconnection in progress
class FaultToleranceManager:
    """
    Manages fault tolerance across the SACCP network.

    Responsibilities visible in this class:
      * track registered nodes and their heartbeat-derived health status;
      * record task assignments, completions, and failures;
      * decide a ``RecoveryStrategy`` when a task fails;
      * run a background daemon thread that detects missed heartbeats and
        executes deferred recovery actions from ``recovery_queue``.

    Thread-safety: all shared state is guarded by ``self.lock``.  The lock
    must be re-entrant (see ``__init__``) because public entry points call
    internal helpers that re-acquire it.
    """

    def __init__(self):
        # node_id -> node record (status, last heartbeat, failure counters).
        self.nodes: Dict[str, Dict[str, Any]] = {}
        # task_id -> in-flight task record (assigned node, retry_count, details).
        self.active_tasks: Dict[str, Dict[str, Any]] = {}
        # Task records dropped after exhausting their retry budget.
        self.failed_tasks: List[Dict[str, Any]] = []
        # Append-only log of every failure event recorded.
        self.failure_history: List[Dict[str, Any]] = []
        # Deferred recovery actions; each item's "timestamp" is a not-before time.
        self.recovery_queue: List[Dict[str, Any]] = []
        # BUGFIX: must be an RLock.  remove_node() holds the lock while calling
        # _reassign_node_tasks() -> _attempt_task_redistribution() ->
        # _find_alternative_node(), each of which acquires the lock again;
        # with a plain threading.Lock those paths deadlocked.
        self.lock = threading.RLock()

        # Configuration
        self.heartbeat_interval = 30  # seconds between expected heartbeats
        self.heartbeat_timeout = 60   # seconds of silence before a node is unresponsive
        self.max_retries = 3          # per-task retry budget
        self.retry_delay = 5          # seconds to wait before a recovery attempt
        self.network_monitoring_enabled = True

        # Start monitoring thread (daemon: never blocks interpreter exit).
        # NOTE(review): this runs at construction time, so creating the
        # module-level singleton spawns a thread on import.
        self.monitoring_thread = threading.Thread(
            target=self._network_monitoring_loop, daemon=True
        )
        self.monitoring_thread.start()

    def register_node(self, node_id: str, node_type: str,
                      capabilities: Dict[str, Any]) -> bool:
        """Register (or re-register) a node with the fault tolerance system.

        Args:
            node_id: Unique identifier for the node.
            node_type: Free-form node category, matched against a task's
                ``compatible_node_types`` requirement.
            capabilities: Resource description; ``memory_gb`` is the only key
                this class inspects.

        Returns:
            Always ``True``.  Re-registering an existing id resets its record.
        """
        with self.lock:
            self.nodes[node_id] = {
                "node_id": node_id,
                "node_type": node_type,
                "capabilities": capabilities,
                "status": NodeStatus.HEALTHY,
                "last_heartbeat": time.time(),
                "failure_count": 0,
                "consecutive_failures": 0,
                "tasks_processed": 0,
                "tasks_failed": 0
            }
            return True

    def remove_node(self, node_id: str) -> bool:
        """Remove a node from the system (when permanently offline).

        Any tasks still assigned to the node are redistributed to healthy
        alternatives where possible.

        Returns:
            ``True`` if the node existed and was removed, ``False`` otherwise.
        """
        with self.lock:
            if node_id in self.nodes:
                del self.nodes[node_id]
                # Re-entrant call chain: this helper (and the helpers it calls)
                # re-acquire self.lock, which is why the lock is an RLock.
                self._reassign_node_tasks(node_id)
                return True
            return False

    def heartbeat(self, node_id: str) -> bool:
        """Process a heartbeat from a node.

        A successful heartbeat marks the node healthy again and clears its
        consecutive-failure counter.

        Returns:
            ``False`` if the node is unknown, ``True`` otherwise.
        """
        with self.lock:
            if node_id not in self.nodes:
                return False
            node = self.nodes[node_id]
            node["last_heartbeat"] = time.time()
            node["status"] = NodeStatus.HEALTHY
            node["consecutive_failures"] = 0  # Reset on successful heartbeat
            return True

    def record_task_assignment(self, task_id: str, node_id: str,
                               task_details: Dict[str, Any]) -> bool:
        """Record that a task was assigned to a node.

        Overwrites any existing record for the same ``task_id``.
        Returns ``True`` unconditionally.
        """
        with self.lock:
            self.active_tasks[task_id] = {
                "task_id": task_id,
                "node_id": node_id,
                "assignment_time": time.time(),
                "task_details": task_details,
                "retry_count": 0,
                "status": "assigned"
            }
            return True

    def record_task_completion(self, task_id: str, node_id: str) -> bool:
        """Record successful task completion.

        Removes the task from the active set and credits the node's
        ``tasks_processed`` counter.

        Returns:
            ``True`` if the task was active, ``False`` if it was unknown.
        """
        with self.lock:
            if task_id in self.active_tasks:
                del self.active_tasks[task_id]
                # Update node statistics
                if node_id in self.nodes:
                    self.nodes[node_id]["tasks_processed"] += 1
                return True
            return False

    def record_task_failure(self, task_id: str, node_id: str,
                            failure_type: FailureType,
                            error_details: Optional[str] = None) -> RecoveryStrategy:
        """Record a task failure and determine the recovery strategy.

        Side effects: appends to ``failure_history``, updates the node's
        failure counters (marking it FAILED after 3 consecutive failures),
        and consumes one unit of the task's retry budget.

        Returns:
            REDISTRIBUTE for timeouts, FAIL_OVER for disconnections, RETRY for
            everything else — or DROP_TASK when the task is unknown or its
            retry budget is exhausted.
        """
        with self.lock:
            # Record the failure
            failure_record = {
                "task_id": task_id,
                "node_id": node_id,
                "failure_type": failure_type.value,
                "error_details": error_details,
                "timestamp": time.time()
            }
            self.failure_history.append(failure_record)

            # Update node failure statistics
            if node_id in self.nodes:
                node = self.nodes[node_id]
                node["tasks_failed"] += 1
                node["failure_count"] += 1
                node["consecutive_failures"] += 1
                # Check if node should be marked as failed
                if node["consecutive_failures"] >= 3:  # 3 consecutive failures
                    node["status"] = NodeStatus.FAILED

            # Get the task record
            task_record = self.active_tasks.get(task_id)
            if not task_record:
                return RecoveryStrategy.DROP_TASK

            # Determine recovery strategy based on failure type and retry count
            if task_record["retry_count"] < self.max_retries:
                # BUGFIX: consume one retry here.  Previously retry_count was
                # only bumped inside _attempt_task_redistribution, so RETRY and
                # FAIL_OVER strategies were returned forever and max_retries
                # never bounded them.
                task_record["retry_count"] += 1
                # For timeout failures, try redistributing to a different node
                if failure_type == FailureType.TASK_TIMEOUT:
                    return RecoveryStrategy.REDISTRIBUTE
                # For node disconnections, try fail-over to another node
                elif failure_type == FailureType.NODE_DISCONNECTED:
                    return RecoveryStrategy.FAIL_OVER
                # For other failures, try retrying on the same node
                else:
                    return RecoveryStrategy.RETRY
            else:
                # Max retries reached, drop the task
                if task_id in self.active_tasks:
                    del self.active_tasks[task_id]
                    self.failed_tasks.append(task_record)
                return RecoveryStrategy.DROP_TASK

    def _reassign_node_tasks(self, failed_node_id: str):
        """Reassign all tasks currently assigned to ``failed_node_id``."""
        with self.lock:
            # Snapshot ids first so redistribution can mutate active_tasks.
            tasks_to_reassign = [
                task_id
                for task_id, task_record in self.active_tasks.items()
                if task_record["node_id"] == failed_node_id
            ]
            for task_id in tasks_to_reassign:
                self._attempt_task_redistribution(task_id)

    def _attempt_task_redistribution(self, task_id: str) -> bool:
        """Attempt to move a task to a different healthy node.

        Returns:
            ``True`` on reassignment, ``False`` if the task is unknown or no
            suitable alternative node exists (the task stays where it was).
        """
        with self.lock:
            if task_id not in self.active_tasks:
                return False
            task_record = self.active_tasks[task_id]

            # Find a healthy alternative node (re-acquires the RLock).
            new_node = self._find_alternative_node(task_record["task_details"])
            if not new_node:
                # No alternative node available, retry later
                return False

            # Update task assignment
            old_node_id = task_record["node_id"]
            task_record["node_id"] = new_node["node_id"]
            # NOTE(review): redistribution also consumes a retry slot, so a
            # failure that goes through record_task_failure + redistribution
            # counts twice; this only makes retrying more conservative.
            task_record["retry_count"] += 1
            task_record["assignment_time"] = time.time()

            # Update node stats
            # NOTE(review): crediting tasks_processed at assignment time (before
            # completion) looks premature and double-counts with
            # record_task_completion — preserved as-is, verify with callers.
            if old_node_id in self.nodes:
                self.nodes[old_node_id]["tasks_failed"] += 1
            if new_node["node_id"] in self.nodes:
                self.nodes[new_node["node_id"]]["tasks_processed"] += 1
            return True

    def _find_alternative_node(self, task_requirements: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Return the first healthy node meeting ``task_requirements``, or None."""
        with self.lock:
            for node_id, node in self.nodes.items():
                if node["status"] == NodeStatus.HEALTHY:
                    # Check if node meets task requirements
                    if self._node_meets_requirements(node, task_requirements):
                        return node
            return None

    def _node_meets_requirements(self, node: Dict[str, Any],
                                 requirements: Dict[str, Any]) -> bool:
        """Check whether a node satisfies a task's resource/type requirements.

        Recognized requirement keys: ``memory_required`` (compared against the
        node's ``memory_gb`` capability) and ``compatible_node_types`` (a list
        the node's type must appear in, when non-empty).
        """
        capabilities = node["capabilities"]

        # Memory check: missing keys default to 0, i.e. "no requirement" /
        # "no declared memory".
        required_memory = requirements.get("memory_required", 0)
        available_memory = capabilities.get("memory_gb", 0)
        if required_memory > available_memory:
            return False

        # Node-type compatibility check (empty list means any type is fine).
        required_node_types = requirements.get("compatible_node_types", [])
        if required_node_types and node["node_type"] not in required_node_types:
            return False

        return True

    def _network_monitoring_loop(self):
        """Background daemon loop: health sweeps and recovery processing."""
        ticks = 0
        while self.network_monitoring_enabled:
            time.sleep(1)  # base cadence: one tick per second
            ticks += 1
            # BUGFIX: the old gate (int(time.time()) % 10 == 0) depended on
            # wall-clock alignment and sleep drift, so a sweep could run zero
            # or multiple times per 10 s window.  A tick counter guarantees
            # exactly one sweep every 10 iterations.
            if ticks % 10 == 0:
                self._check_node_health()
            # Process recovery queue every tick.
            self._process_recovery_queue()

    def _check_node_health(self):
        """Mark nodes that missed their heartbeat window and queue recovery."""
        current_time = time.time()
        with self.lock:
            for node_id, node in self.nodes.items():
                time_since_heartbeat = current_time - node["last_heartbeat"]
                if time_since_heartbeat > self.heartbeat_timeout:
                    # BUGFIX: act only on the transition into UNRESPONSIVE.
                    # Previously every sweep while a node stayed unresponsive
                    # appended another failure record and recovery item,
                    # flooding failure_history and recovery_queue.
                    if node["status"] in (NodeStatus.HEALTHY, NodeStatus.RECOVERING):
                        node["status"] = NodeStatus.UNRESPONSIVE
                        # Record the failure
                        failure_record = {
                            "node_id": node_id,
                            "failure_type": FailureType.HEARTBEAT_FAILED.value,
                            "timestamp": current_time,
                            "details": f"Node {node_id} missed heartbeat for {time_since_heartbeat}s"
                        }
                        self.failure_history.append(failure_record)
                        # Schedule a reconnection attempt after retry_delay.
                        self.recovery_queue.append({
                            "type": "node_recovery",
                            "node_id": node_id,
                            "action": "reconnect",
                            "timestamp": current_time + self.retry_delay
                        })

    def _process_recovery_queue(self):
        """Execute recovery items whose not-before timestamp has passed."""
        current_time = time.time()
        with self.lock:
            # Snapshot due items under the lock; execute outside it so a slow
            # recovery action does not block heartbeats and task bookkeeping.
            items_to_process = [
                item for item in self.recovery_queue
                if current_time >= item["timestamp"]
            ]
        for item in items_to_process:
            self._execute_recovery_action(item)
            # Remove processed item from queue
            with self.lock:
                if item in self.recovery_queue:
                    self.recovery_queue.remove(item)

    def _execute_recovery_action(self, recovery_item: Dict[str, Any]):
        """Execute a single recovery action from the queue.

        Supported item types: ``node_recovery`` (action ``reconnect``) and
        ``task_redistribution``.  Unknown types are ignored.
        """
        action_type = recovery_item["type"]
        if action_type == "node_recovery":
            node_id = recovery_item["node_id"]
            if recovery_item["action"] == "reconnect":
                # In a real implementation this would try to reestablish the
                # connection; here we optimistically reset the node to HEALTHY.
                with self.lock:
                    if node_id in self.nodes:
                        node = self.nodes[node_id]
                        if node["status"] in [NodeStatus.UNRESPONSIVE, NodeStatus.FAILED]:
                            node["status"] = NodeStatus.HEALTHY
                            node["consecutive_failures"] = 0
        elif action_type == "task_redistribution":
            task_id = recovery_item["task_id"]
            # Attempt to redistribute the task
            self._attempt_task_redistribution(task_id)

    def get_network_health(self) -> Dict[str, Any]:
        """Return a snapshot of overall network health statistics."""
        with self.lock:
            healthy_nodes = 0
            unresponsive_nodes = 0
            failed_nodes = 0
            for node in self.nodes.values():
                if node["status"] == NodeStatus.HEALTHY:
                    healthy_nodes += 1
                elif node["status"] == NodeStatus.UNRESPONSIVE:
                    unresponsive_nodes += 1
                elif node["status"] == NodeStatus.FAILED:
                    failed_nodes += 1

            return {
                "total_nodes": len(self.nodes),
                "healthy_nodes": healthy_nodes,
                "unresponsive_nodes": unresponsive_nodes,
                "failed_nodes": failed_nodes,
                "active_tasks": len(self.active_tasks),
                "failed_tasks": len(self.failed_tasks),
                "total_tasks_processed": sum(node["tasks_processed"] for node in self.nodes.values()),
                "total_tasks_failed": sum(node["tasks_failed"] for node in self.nodes.values()),
                "recovery_attempts": len(self.recovery_queue)
            }

    def get_failed_nodes(self) -> List[Dict[str, Any]]:
        """Return the currently FAILED nodes.

        BUGFIX: returns shallow copies instead of the live internal dicts, so
        callers cannot mutate manager state outside the lock.
        """
        with self.lock:
            return [dict(node) for node in self.nodes.values()
                    if node["status"] == NodeStatus.FAILED]
# Module-level singleton shared by the whole process.
# NOTE: constructing this starts the background monitoring daemon thread as an
# import-time side effect of this module.
fault_tolerance_manager = FaultToleranceManager()