File size: 2,339 Bytes
05a686e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""
NodeFailureCondition - Simulates node outages and scheduling disruption
"""

from typing import Dict, List, Any, Optional
from ..coenv_environment import World


class NodeFailureCondition:
    """Injects node failures into the cluster.

    A failed node is patched to ``NotReady`` with zeroed CPU/memory usage,
    every pod placed on it is detached and set back to ``Pending``, and a
    ``NodeNotReady`` warning event is appended to the world's event list.
    """

    def __init__(self, world: "World", config: Dict[str, Any]):
        """Store the world to mutate and the condition's settings.

        Args:
            world: Cluster state object. This class uses its ``get_nodes``,
                ``get_pods``, ``apply_patch``, ``rng`` and ``events`` members.
            config: Settings dict; ``node_failure_rate`` supplies the default
                failure probability for :meth:`inject`.
        """
        self.world = world
        self.config = config

    def inject(self, target_node: Optional[str] = None, failure_rate: Optional[float] = None):
        """Inject node failures.

        Args:
            target_node: Name of the node to fail. When falsy, nodes are
                picked at random, each with probability ``failure_rate``.
            failure_rate: Probability of a node failing (0.0-1.0). When
                ``None``, ``config["node_failure_rate"]`` is used, falling
                back to 0.3 if the key is absent. Ignored for node selection
                when ``target_node`` is given.

        Raises:
            ValueError, TypeError: If the rate that is actually used cannot
                be converted to ``float``.
        """
        if failure_rate is not None:
            rate = float(failure_rate)
        else:
            rate = self.config.get("node_failure_rate", 0.3)

        nodes = self.world.get_nodes()

        if target_node:
            failed_nodes = [n for n in nodes if n.name == target_node]
        elif rate is None:
            # A config entry explicitly set to None disables random failures;
            # no random numbers are drawn in that case.
            failed_nodes = []
        else:
            # Config values get the same float coercion as an explicit
            # argument, so e.g. a string "0.3" read from a settings file
            # works instead of raising on the comparison below.
            rate = float(rate)
            failed_nodes = [
                n for n in nodes if float(self.world.rng.random()) < rate
            ]

        for node in failed_nodes:
            node_patch = {
                "status": "NotReady",
                "cpu_usage": 0.0,
                "mem_usage": 0.0,
            }
            self.world.apply_patch("node", node.name, node_patch)

            # Pods are re-read for every failed node, after that node's patch
            # has been applied.
            pod_patch = {
                "node": None,
                "status": "Pending",
            }
            for pod in self.world.get_pods():
                if pod.node == node.name:
                    self.world.apply_patch("pod", pod.name, pod_patch)

            self._add_node_failure_event(node.name)

    def _add_node_failure_event(self, node_name: str):
        """Append a ``NodeNotReady`` warning event for ``node_name``.

        The event id suffix is drawn from ``world.rng.integers(1000, 10000)``
        and the timestamp is ``datetime.now()`` in ISO format.
        """
        from ..models import ClusterEvent
        from datetime import datetime

        event = ClusterEvent(
            event_id=f"event-nodefail-{int(self.world.rng.integers(1000, 10000))}",
            timestamp=datetime.now().isoformat(),
            type="Warning",
            reason="NodeNotReady",
            message=f"Node {node_name} status is now: NodeNotReady",
            involved_object=node_name,
        )
        self.world.events.append(event)