"""
Infrastructure simulator for data center topology with fault injection.
Maintains per-session state (to be stored in gr.State).
"""
import random
from typing import Dict, List, Optional
class InfraSimulator:
    """Simulates a dynamic infrastructure with servers, switches, and services.

    The topology is fixed: one switch ("switch-1") connected to two servers
    ("server-1", "server-2"), with the "db" service on server-1 and the
    "web" service on server-2. Every component is a dict carrying at least
    a "type" and a "status" entry.
    """

    # Fault names accepted by set_fault().
    FAULTS = ("none", "switch_down", "server_overload", "cascade")

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        """Reset to healthy state.

        Rebuilds the component table from scratch (every status "up") and
        clears ``fault_mode`` to ``None``.
        """
        # A fresh literal is built on every call so that no nested dict or
        # list is shared with a previously returned state.
        self.components: Dict[str, dict] = {
            "switch-1": {
                "type": "switch",
                "status": "up",
                "connections": ["server-1", "server-2"],
            },
            "server-1": {
                "type": "server",
                "status": "up",
                "connections": ["switch-1"],
                "services": ["db"],
            },
            "server-2": {
                "type": "server",
                "status": "up",
                "connections": ["switch-1"],
                "services": ["web"],
            },
            "service-db": {
                "type": "service",
                "status": "up",
                "runs_on": "server-1",
            },
            "service-web": {
                "type": "service",
                "status": "up",
                "runs_on": "server-2",
            },
        }
        self.fault_mode: Optional[str] = None

    def set_fault(self, fault: Optional[str]) -> None:
        """Inject a fault, starting from a healthy topology.

        Options: 'none', 'switch_down', 'server_overload', 'cascade'.
        ``None`` is accepted as well and, like 'none', injects nothing.

        - 'switch_down': switch-1 goes "down"; every component listed in its
          connections becomes "degraded".
        - 'server_overload': server-1 becomes "overloaded" and service-db
          becomes "degraded".
        - 'cascade': every component goes "down".

        The value passed in is stored unchanged in ``fault_mode``.

        Raises:
            ValueError: if ``fault`` is not one of the options above. The
                current state is left untouched in that case.
        """
        # Validate before resetting: a misspelled fault name must not
        # silently wipe the current state and report a healthy topology
        # under a bogus fault_mode.
        if fault is not None and fault not in self.FAULTS:
            raise ValueError(
                f"Unknown fault {fault!r}; expected one of: "
                f"{', '.join(self.FAULTS)}"
            )

        self.reset()  # start from healthy
        self.fault_mode = fault

        if fault == "switch_down":
            switch = self.components["switch-1"]
            switch["status"] = "down"
            # Propagate to whatever the switch is connected to; names that
            # are not in the component table are skipped.
            for name in switch["connections"]:
                neighbour = self.components.get(name)
                if neighbour is not None:
                    neighbour["status"] = "degraded"
        elif fault == "server_overload":
            self.components["server-1"]["status"] = "overloaded"
            self.components["service-db"]["status"] = "degraded"
        elif fault == "cascade":
            # Total outage: every component in the topology is down.
            for component in self.components.values():
                component["status"] = "down"
        # 'none' (or None) leaves the freshly reset, healthy state as is.

    def read_state(self) -> Dict:
        """Return current component states.

        The returned mapping is the simulator's own ``components`` dict, not
        a copy, so changes made to it are visible to the simulator. reset()
        and set_fault() replace that dict with a new one, so a mapping
        obtained earlier does not reflect later resets.
        """
        return self.components