petter2025 commited on
Commit
c2524ca
·
verified ·
1 Parent(s): 79db0ae

Create infra_simulator.py

Browse files
Files changed (1) hide show
  1. infra_simulator.py +52 -12
infra_simulator.py CHANGED
@@ -1,34 +1,74 @@
1
- # infra_simulator.py
 
 
 
2
  import random
3
  from typing import Dict, List, Optional
4
 
5
  class InfraSimulator:
6
- """Simulates a dynamic infrastructure. Maintains per‑session state."""
 
7
  def __init__(self):
8
  self.reset()
9
-
10
  def reset(self):
 
11
  self.components = {
12
- "switch-1": {"type": "switch", "status": "up", "connections": ["server-1", "server-2"]},
13
- "server-1": {"type": "server", "status": "up", "connections": ["switch-1"], "services": ["db"]},
14
- "server-2": {"type": "server", "status": "up", "connections": ["switch-1"], "services": ["web"]},
15
- "service-db": {"type": "service", "status": "up", "runs_on": "server-1"},
16
- "service-web": {"type": "service", "status": "up", "runs_on": "server-2"},
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
17
  }
18
  self.fault_mode: Optional[str] = None
19
-
20
  def set_fault(self, fault: str):
21
- self.reset() # start fresh
 
 
 
 
22
  self.fault_mode = fault
 
23
  if fault == "switch_down":
24
  self.components["switch-1"]["status"] = "down"
 
 
 
 
25
  elif fault == "server_overload":
26
  self.components["server-1"]["status"] = "overloaded"
 
27
  elif fault == "cascade":
28
  self.components["switch-1"]["status"] = "down"
29
  self.components["server-1"]["status"] = "down"
 
30
  self.components["service-db"]["status"] = "down"
31
- # etc.
32
-
 
33
  def read_state(self) -> Dict:
 
34
  return self.components
 
1
+ """
2
+ Infrastructure simulator for data center topology with fault injection.
3
+ Maintains per‑session state (to be stored in gr.State).
4
+ """
5
  import random
6
  from typing import Dict, List, Optional
7
 
8
  class InfraSimulator:
9
+ """Simulates a dynamic infrastructure with servers, switches, and services."""
10
+
11
  def __init__(self):
12
  self.reset()
13
+
14
  def reset(self):
15
+ """Reset to healthy state."""
16
  self.components = {
17
+ "switch-1": {
18
+ "type": "switch",
19
+ "status": "up",
20
+ "connections": ["server-1", "server-2"]
21
+ },
22
+ "server-1": {
23
+ "type": "server",
24
+ "status": "up",
25
+ "connections": ["switch-1"],
26
+ "services": ["db"]
27
+ },
28
+ "server-2": {
29
+ "type": "server",
30
+ "status": "up",
31
+ "connections": ["switch-1"],
32
+ "services": ["web"]
33
+ },
34
+ "service-db": {
35
+ "type": "service",
36
+ "status": "up",
37
+ "runs_on": "server-1"
38
+ },
39
+ "service-web": {
40
+ "type": "service",
41
+ "status": "up",
42
+ "runs_on": "server-2"
43
+ }
44
  }
45
  self.fault_mode: Optional[str] = None
46
+
47
  def set_fault(self, fault: str):
48
+ """
49
+ Inject a fault.
50
+ Options: 'none', 'switch_down', 'server_overload', 'cascade'
51
+ """
52
+ self.reset() # start from healthy
53
  self.fault_mode = fault
54
+
55
  if fault == "switch_down":
56
  self.components["switch-1"]["status"] = "down"
57
+ # Propagate to connected servers (optional)
58
+ for conn in self.components["switch-1"]["connections"]:
59
+ if conn in self.components:
60
+ self.components[conn]["status"] = "degraded"
61
  elif fault == "server_overload":
62
  self.components["server-1"]["status"] = "overloaded"
63
+ self.components["service-db"]["status"] = "degraded"
64
  elif fault == "cascade":
65
  self.components["switch-1"]["status"] = "down"
66
  self.components["server-1"]["status"] = "down"
67
+ self.components["server-2"]["status"] = "down"
68
  self.components["service-db"]["status"] = "down"
69
+ self.components["service-web"]["status"] = "down"
70
+ # 'none' does nothing
71
+
72
  def read_state(self) -> Dict:
73
+ """Return current component states."""
74
  return self.components