| """ |
| AntiAtropos Core Simulation Physics. |
| |
| A discrete-time fluid-queue model simulating a 5-node microservice cluster. |
| Each node has stateful queues, capacities, and failure probabilities. |
| Dynamic traffic is injected per tick, and management actions shift capacity |
| and routing parameters. |
| """ |

import random
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


# --- Core physics constants ---
DEFAULT_CAPACITY: float = 3.0
MAX_CAPACITY: float = 999.0
MAX_SCALING_STEP: int = 3
BOOT_DELAY_TICKS: int = 5
BASE_LATENCY_MS: float = 20.0
OVERLOAD_THRESHOLD: int = 80
LATENCY_STEEPNESS: float = 2.0
FATAL_FAIL_THRESHOLD: int = 200
CASCADE_WINDOW_TICKS: int = 3
CASCADE_QUEUE_MULTIPLIER: float = 1.2
NODE_RECOVERY_TICKS: int = 20
BACKPRESSURE_THRESHOLD: float = 60.0
BACKPRESSURE_MAX_FACTOR: float = 0.4

# --- Stochastic noise ---
SENSOR_DROPOUT_PROB: float = 0.05
NODE_FAILURE_PROB: float = 0.00

# --- Cost model ---
COST_PER_CAPACITY_UNIT_PER_HOUR: float = 0.05
OVERPROVISION_COST_PER_UNIT: float = 1.0
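# Worked example: holding 10 capacity units for one hour costs
# 10 * 0.05 = 0.50. Neither constant is consumed in this module; they are
# presumably read by a scoring/cost layer elsewhere (assumed, not confirmed
# here).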

# --- Traffic scenarios (per task) ---
# Task 1: steady linear ramp.
T1_INITIAL_LAMBDA: float = 86.0
T1_RAMP_SLOPE: float = 0.5

# Task 2: scripted node failure under sustained load.
T2_INITIAL_LAMBDA: float = 95.0
T2_FAIL_TICK: int = 20

# Task 3: periodic surge injected at mid-tier nodes.
T3_INITIAL_LAMBDA: float = 60.0
T3_SURGE_CYCLE: int = 60
T3_SURGE_BASE_START: int = 30
T3_SURGE_BASE_END: int = 40
T3_SURGE_JITTER: int = 10
T3_SURGE_MAGNITUDE: float = 60.0

# Nodes that must never be load-shed; SHED_LOAD on these counts as an
# invalid action.
CRITICAL_NODES: list[str] = ["node-0"]

# VIP nodes carry a higher importance weight on their NodeState.
VIP_NODE_WEIGHTS: dict[str, float] = {
    "node-0": 2.0,
}

# --- Cluster topology (DAG: parent -> children) ---
CLUSTER_TOPOLOGY: dict[str, list[str]] = {
    "node-0": ["node-1", "node-2"],
    "node-1": [],
    "node-2": ["node-3"],
    "node-3": [],
    "node-4": [],
}

# Nodes that receive external traffic directly.
EXTERNAL_TRAFFIC_NODES: set[str] = {"node-0", "node-4"}

# Fraction of total external lambda sent to node-0; the remainder goes
# to node-4.
EXTERNAL_LAMBDA_FRACTION: float = 0.5

# Default downstream routing split for each parent's processed outflow.
DEFAULT_ROUTING_SPLIT: dict[str, dict[str, float]] = {
    "node-0": {"node-1": 0.5, "node-2": 0.5},
    "node-2": {"node-3": 1.0},
}

# Parents precede children, so each node's inflow is final before it is split.
_TOPOLOGICAL_ORDER: tuple[str, ...] = ("node-0", "node-4", "node-1", "node-2", "node-3")
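# Worked example of the DAG distribution (illustrative numbers, not from any
# task script): with total_lambda = 120, node-0 and node-4 each receive 60.
# If node-0's service rate is 45 (the default 3.0 * 15.0), its outflow is
# min(60, 45) = 45, so node-1 and node-2 each receive 22.5; node-2 then
# forwards min(22.5, mu_2) on to node-3.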


class NodeStatus(str, Enum):
    HEALTHY = "HEALTHY"
    DEGRADED = "DEGRADED"
    FAILED = "FAILED"


@dataclass
class NodeState:
    node_id: str
    status: NodeStatus = NodeStatus.HEALTHY
    is_vip: bool = False

    # Live telemetry.
    capacity: float = DEFAULT_CAPACITY
    importance_weight: float = 1.0
    queue_depth: float = 0.0
    latency_ms: float = BASE_LATENCY_MS
    incoming_request_rate: float = 0.0
    cpu_utilization: float = 0.0

    # Bookkeeping.
    dropped_requests: float = 0.0
    shed_fraction: float = 0.0
    pending_capacity_queue: list[int] = field(default_factory=list)
    recovery_timer: int = 0
    is_scripted_failure: bool = False
    outflow_rate: float = 0.0

    @property
    def service_rate(self) -> float:
        """The total service capacity (μ) in requests per tick."""
        if self.status == NodeStatus.FAILED:
            return 0.0
        # One capacity unit serves 15 requests/tick; default μ = 3.0 * 15 = 45.
        return self.capacity * 15.0

    def to_dict(self) -> dict:
        return {
            "node_id": self.node_id,
            "status": self.status,
            "is_vip": self.is_vip,
            "capacity": self.capacity,
            "importance_weight": self.importance_weight,
            "queue_depth": int(self.queue_depth),
            "latency_ms": round(self.latency_ms, 2),
            "incoming_request_rate": round(self.incoming_request_rate, 2),
            "cpu_utilization": round(min(1.0, self.cpu_utilization), 4),
            "dropped_requests": int(self.dropped_requests),
            "shed_fraction": round(self.shed_fraction, 4),
            "capacity_units": int(self.capacity),
            "pending_capacity_units": int(len(self.pending_capacity_queue)),
            "recovery_timer": self.recovery_timer,
            "is_scripted_failure": self.is_scripted_failure,
            "outflow_rate": round(self.outflow_rate, 2),
        }


class ClusterSimulator:
    """
    Multi-node fluid-queue simulator.

    Operates in discrete ticks; each tick proceeds in three phases:
    1. Action: control-plane actions (scaling/routing/shedding) are applied.
    2. Physics: queues are updated from λ (incoming rate) and μ (service rate).
    3. Failure logic: queue overflows trigger status degradation or node death.
    """

    def __init__(self, n_nodes: int = 5, task_id: str = "task-1", seed: Optional[int] = None):
        self._n_nodes = n_nodes
        self._task_id = task_id

        self._seed: Optional[int] = seed
        self._rng = random.Random(seed)
        self._tick_count: int = 0
        self._failed_node_id: Optional[str] = None
        self._t1_ramp_slope: float = T1_RAMP_SLOPE
        self._t1_init_lambda: float = T1_INITIAL_LAMBDA
        self._t2_fail_tick: int = T2_FAIL_TICK
        self._t2_init_lambda: float = T2_INITIAL_LAMBDA

        self._t3_surge_start: int = T3_SURGE_BASE_START
        self._t3_surge_end: int = T3_SURGE_BASE_END

        self._reroute_weights: dict[str, float] = {}
        self._cascade_tick: int = 0
        self._cascade_triggered: bool = False
        self._nodes: list[NodeState] = []
        self.invalid_action_count: int = 0
        self._randomize_domain()
        self._reset_nodes()

    def _randomize_domain(self) -> None:
        """Apply domain randomization for RL robustness across tasks."""
        self._t1_ramp_slope = self._rng.uniform(0.8, 2.0)

        # Scale initial lambdas to total ingress service capacity so episode
        # difficulty stays consistent if node counts or defaults change.
        n_ingress = len(EXTERNAL_TRAFFIC_NODES)
        ingress_mu_total = n_ingress * DEFAULT_CAPACITY * 15.0
        # Task 1 starts just below saturation (92–99% of ingress μ).
        self._t1_init_lambda = self._rng.uniform(
            ingress_mu_total * 0.92, ingress_mu_total * 0.99
        )
        self._t2_fail_tick = self._rng.randint(10, 40)

        # Task 2 starts at or slightly above saturation (100–110% of ingress μ).
        self._t2_init_lambda = self._rng.uniform(
            ingress_mu_total * 1.00, ingress_mu_total * 1.10
        )

        jitter = self._rng.randint(-T3_SURGE_JITTER, T3_SURGE_JITTER)
        self._t3_surge_start = T3_SURGE_BASE_START + jitter
        self._t3_surge_end = T3_SURGE_BASE_END + jitter

        self._reroute_weights = {}

    def _reset_nodes(self) -> None:
        self._nodes = [
            NodeState(
                node_id=f"node-{i}",
                is_vip=f"node-{i}" in VIP_NODE_WEIGHTS,
                importance_weight=VIP_NODE_WEIGHTS.get(f"node-{i}", 1.0),
                is_scripted_failure=False,
            )
            for i in range(self._n_nodes)
        ]

    def reset(self, task_id: str = "task-1", seed: Optional[int] = None) -> None:
        """Restart the simulator for a fresh episode."""
        if seed is not None:
            self._seed = seed

        # Re-seed from the stored seed so construction and reset agree.
        self._rng = random.Random(self._seed)
        self._task_id = task_id
        self._tick_count = 0
        self._failed_node_id = None
        self._reroute_weights = {}
        self._cascade_tick = 0
        self._cascade_triggered = False
        self.invalid_action_count = 0
        self._randomize_domain()
        self._reset_nodes()

    def state(self, for_agent: bool = True) -> list[dict]:
        """Return current per-node state as a list of plain dicts."""
        state_list = []
        for n in self._nodes:
            d = n.to_dict()
            # Simulate sensor dropout: the agent occasionally sees a blanked reading.
            if for_agent and self._rng.random() < SENSOR_DROPOUT_PROB:
                d["queue_depth"] = 0
                d["latency_ms"] = -1.0
            state_list.append(d)
        return state_list

    def apply_action(self, action_model) -> bool:
        """Update simulator world-state parameters based on an agent command.

        Returns:
            True if the action had a measurable effect on state, False if it
            was silently discarded (e.g. SCALE_UP at the capacity cap).
        """
        def _field(name: str):
            # Accept either an object with attributes or a plain dict.
            return getattr(action_model, name) if hasattr(action_model, name) else action_model[name]

        at = _field("action_type")
        node_id = _field("target_node_id")
        param = _field("parameter")

        if at == "NO_OP":
            return True

        target = next((n for n in self._nodes if n.node_id == node_id), None)
        if not target:
            return False

        if at == "SCALE_UP":
            # Discard scale-ups that would exceed the hard capacity cap.
            if target.capacity + len(target.pending_capacity_queue) >= MAX_CAPACITY:
                return False
            delta = max(1, int(param * MAX_SCALING_STEP))
            for _ in range(delta):
                target.pending_capacity_queue.append(BOOT_DELAY_TICKS)

            # Scaling a degraded node counts as an operator intervention.
            if target.status == NodeStatus.DEGRADED:
                target.status = NodeStatus.HEALTHY

            return True

        elif at == "SCALE_DOWN":
            delta = max(1, int(param * MAX_SCALING_STEP))

            # First cancel pending (not yet booted) capacity, then shed live units.
            cancelled = 0
            while cancelled < delta and target.pending_capacity_queue:
                target.pending_capacity_queue.pop()
                cancelled += 1

            remaining = delta - cancelled
            old_capacity = target.capacity
            if remaining > 0:
                target.capacity = max(1.0, target.capacity - remaining)

            return cancelled > 0 or target.capacity != old_capacity

        elif at == "REROUTE_TRAFFIC":
            frac = min(1.0, max(0.0, float(param)))
            self._reroute_weights[node_id] = frac
            return True

        elif at == "SHED_LOAD":
            # Shedding on a critical node is forbidden and counted as invalid.
            if node_id in CRITICAL_NODES:
                self.invalid_action_count += 1
                return False

            frac = min(1.0, max(0.0, float(param)))
            target.shed_fraction = frac
            return True

        # Unknown action types have no effect and are counted as invalid.
        self.invalid_action_count += 1
        return False
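    # Example (hypothetical payload shape; any object or dict exposing these
    # three keys works with the duck-typed access above):
    #   sim.apply_action({"action_type": "SCALE_UP",
    #                     "target_node_id": "node-2", "parameter": 1.0})
    # enqueues max(1, int(1.0 * MAX_SCALING_STEP)) = 3 capacity units, each
    # coming online after BOOT_DELAY_TICKS = 5 ticks.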

    def tick(self) -> None:
        """
        Advance simulated time by one step.
        Evaluates physics: capacity growth, traffic injection, queue processing.
        """
        self._tick_count += 1
        self._update_capacity()
        self._inject_traffic()

        # Backpressure temporarily throttles parent capacity; snapshot and
        # restore it so the reduction never persists across ticks.
        saved_capacities = {n.node_id: n.capacity for n in self._nodes}
        self._apply_backpressure()

        for node in self._nodes:
            node.dropped_requests = 0.0
        self._update_queues()

        for n in self._nodes:
            n.capacity = saved_capacities.get(n.node_id, n.capacity)
        self._update_derived_metrics()
        self._update_statuses()
        self._cascade_failures()
        self._process_recovery()

        # Load shedding decays by half each tick and zeroes out below 5%,
        # so SHED_LOAD must be re-issued to be sustained.
        for node in self._nodes:
            node.shed_fraction *= 0.5
            if node.shed_fraction < 0.05:
                node.shed_fraction = 0.0

    def _update_capacity(self) -> None:
        """Process pending capacity from SCALE_UP actions."""
        for node in self._nodes:
            for i in range(len(node.pending_capacity_queue)):
                node.pending_capacity_queue[i] -= 1

            # Units whose boot delay has elapsed come online this tick.
            ready = sum(1 for delay in node.pending_capacity_queue if delay <= 0)
            node.capacity = min(MAX_CAPACITY, node.capacity + ready)
            node.pending_capacity_queue = [delay for delay in node.pending_capacity_queue if delay > 0]

    def _inject_traffic(self) -> None:
        """
        Distribute traffic through the cluster DAG in three phases.

        Phase 1 — Task lambda + scripted events:
            Compute total external λ for this tick and apply any task-specific
            mutations (node failure scripting, surge flags). No early returns;
            all branches fall through to the shared DAG in Phase 2.

        Phase 2 — Topological DAG distribution:
            Traverse _TOPOLOGICAL_ORDER (roots first). Each parent's
            processed outflow (min(incoming, service_rate)) is split across
            its children via DEFAULT_ROUTING_SPLIT. A FAILED node has
            service_rate=0, so outflow=0 and its children are naturally
            starved — this is the causal failure chain the RL agent must
            learn to route around.

        Phase 3 — Reroute weight correction:
            Apply REROUTE_TRAFFIC weight adjustments post-DAG, then decay
            weights. Keeps reroute semantics identical to pre-DAG behaviour.
        """
        # Phase 1: task-specific external load.
        total_lambda: float = 0.0
        # Extra load injected directly at specific nodes, bypassing the ingress split.
        direct_injections: dict[str, float] = {}

        if self._task_id == "task-1":
            # Linear ramp.
            total_lambda = self._t1_init_lambda + (self._t1_ramp_slope * self._tick_count)

        elif self._task_id == "task-2":
            total_lambda = self._t2_init_lambda
            # At the scripted tick, permanently fail one random node other than node-0.
            if self._tick_count >= self._t2_fail_tick and not self._failed_node_id:
                self._failed_node_id = self._rng.choice(
                    [n.node_id for n in self._nodes if n.node_id != "node-0"]
                )
                target = next((n for n in self._nodes if n.node_id == self._failed_node_id), None)
                if target:
                    target.is_scripted_failure = True

        elif self._task_id == "task-3":
            total_lambda = T3_INITIAL_LAMBDA
            phase = self._tick_count % T3_SURGE_CYCLE
            if self._t3_surge_start <= phase <= self._t3_surge_end:
                # During the surge window, inject extra load directly at the
                # mid-tier nodes rather than through the ingress split.
                for nid in ["node-1", "node-2"]:
                    direct_injections[nid] = T3_SURGE_MAGNITUDE

        # Phase 2: distribute through the DAG in topological order.
        node_incoming: dict[str, float] = {n.node_id: 0.0 for n in self._nodes}
        node_map: dict[str, NodeState] = {n.node_id: n for n in self._nodes}

        # External ingress split.
        node_incoming["node-0"] = total_lambda * EXTERNAL_LAMBDA_FRACTION
        node_incoming["node-4"] = total_lambda * (1.0 - EXTERNAL_LAMBDA_FRACTION)

        for nid, extra in direct_injections.items():
            node_incoming[nid] = node_incoming.get(nid, 0.0) + extra

        for parent_id in _TOPOLOGICAL_ORDER:
            parent = node_map.get(parent_id)
            if parent is None:
                continue
            parent.incoming_request_rate = node_incoming[parent_id]

            outflow = min(parent.incoming_request_rate, parent.service_rate)
            parent.outflow_rate = outflow
            for child_id, split in DEFAULT_ROUTING_SPLIT.get(parent_id, {}).items():
                node_incoming[child_id] = node_incoming.get(child_id, 0.0) + outflow * split

        # Phase 3: apply and decay reroute weights.
        self._apply_reroute_weights()

        # Recompute outflow after rerouting changed the incoming rates.
        for n in self._nodes:
            n.outflow_rate = min(n.incoming_request_rate, n.service_rate)

    def _apply_reroute_weights(self) -> None:
        """
        Apply REROUTE_TRAFFIC adjustments.

        For each node with a reroute weight w, reduce its incoming_request_rate
        by (w × its current rate) and distribute that offloaded load equally
        across healthy peer nodes.

        Important: rerouting must still work when the target is FAILED (Task 2).
        We therefore allow offload FROM failed targets. Only absorber nodes are
        constrained to non-failed nodes.

        Decays each reroute weight by 50% per tick so the effect must be
        re-issued to be maintained (forces the agent to act).
        """
        if not self._reroute_weights:
            return

        total_overflow: float = 0.0
        rerouted_ids = set(self._reroute_weights.keys())

        for n in self._nodes:
            w = self._reroute_weights.get(n.node_id, 0.0)
            if w <= 0.0:
                continue

            offload = n.incoming_request_rate * w
            if offload <= 0.0:
                continue
            n.incoming_request_rate -= offload
            total_overflow += offload

        if total_overflow > 0:
            # Prefer non-failed nodes that are not themselves being rerouted;
            # if none exist, fall back to any non-failed node.
            absorbers = [
                n for n in self._nodes
                if n.node_id not in rerouted_ids
                and n.status != NodeStatus.FAILED
            ]
            if not absorbers:
                absorbers = [n for n in self._nodes if n.status != NodeStatus.FAILED]

            if absorbers:
                share = total_overflow / len(absorbers)
                for n in absorbers:
                    n.incoming_request_rate += share

        # Decay weights; prune once negligible.
        for nid in list(self._reroute_weights.keys()):
            self._reroute_weights[nid] *= 0.5
            if self._reroute_weights[nid] < 0.05:
                del self._reroute_weights[nid]

    def _apply_backpressure(self) -> None:
        """Reduce parent service rate when children are overloaded."""
        for parent_id, children in CLUSTER_TOPOLOGY.items():
            parent = next((n for n in self._nodes if n.node_id == parent_id), None)
            if not children or not parent or parent.status == NodeStatus.FAILED:
                continue

            # Pressure grows with each child's queue excess over the threshold.
            total_pressure = 0.0
            for child_id in children:
                child = next((n for n in self._nodes if n.node_id == child_id), None)
                if child:
                    excess = max(0.0, child.queue_depth - BACKPRESSURE_THRESHOLD)
                    total_pressure += excess / FATAL_FAIL_THRESHOLD

            # Throttle the parent, capped at BACKPRESSURE_MAX_FACTOR.
            pressure_factor = min(BACKPRESSURE_MAX_FACTOR, total_pressure * 0.6)
            parent.capacity = max(1.0, parent.capacity * (1.0 - pressure_factor))

    def _update_queues(self) -> None:
        """
        Fluid-queue update for all nodes.

            Q_i(t+1) = max(Q_i(t) + λ_eff_i(t) − μ_i(t), 0)

        where λ_eff = λ_incoming * (1 - shed_fraction).
        """
        for node in self._nodes:
            if node.status == NodeStatus.FAILED:
                # A failed node drops everything and holds no queue.
                node.queue_depth = 0.0
                node.dropped_requests = node.incoming_request_rate
                continue

            # Shed requests count as drops.
            lambda_eff = node.incoming_request_rate * (1.0 - node.shed_fraction)
            node.dropped_requests = node.incoming_request_rate - lambda_eff

            excess = lambda_eff - node.service_rate
            node.queue_depth = max(0.0, node.queue_depth + excess)

    def _update_derived_metrics(self) -> None:
        """Compute CPU and latency based on current queue and status."""
        for n in self._nodes:
            if n.status == NodeStatus.FAILED:
                n.cpu_utilization = 0.0
                n.latency_ms = float("inf")
                continue

            service_rate = n.service_rate
            n.cpu_utilization = n.incoming_request_rate / service_rate if service_rate > 0 else 1.0

            # M/M/1-style latency, clamped below saturation, plus a linear
            # penalty for queued backlog.
            utilization = min(0.99, n.cpu_utilization)
            mm1_latency = BASE_LATENCY_MS / (1.0 - utilization)
            backlog_latency = n.queue_depth * LATENCY_STEEPNESS
            n.latency_ms = mm1_latency + backlog_latency

    def _update_statuses(self) -> None:
        """Transition node health based on queue boundaries.

        Recovery rules:
        - Scripted failures (Task 2 forced node kill): permanent, never auto-recover.
          Marked by is_scripted_failure=True, recovery_timer=0.
        - Overload failures (queue > FATAL_FAIL_THRESHOLD): auto-recover after
          NODE_RECOVERY_TICKS. The agent can learn to reroute away and let the
          node heal.
        """
        for n in self._nodes:
            if n.is_scripted_failure:
                n.status = NodeStatus.FAILED
                n.recovery_timer = 0
                continue

            if n.queue_depth > FATAL_FAIL_THRESHOLD:
                if n.status != NodeStatus.FAILED:
                    # New overload failure: start the recovery clock, gut capacity.
                    n.status = NodeStatus.FAILED
                    n.recovery_timer = NODE_RECOVERY_TICKS
                    n.capacity = 0.5
                    self._cascade_triggered = True
            elif n.queue_depth > OVERLOAD_THRESHOLD:
                n.status = NodeStatus.DEGRADED
            elif n.status == NodeStatus.DEGRADED and n.queue_depth < (OVERLOAD_THRESHOLD / 2):
                # Hysteresis: return to HEALTHY only once the queue has halved.
                n.status = NodeStatus.HEALTHY

    def _cascade_failures(self) -> None:
        """Detect cascading failure: if a peer node's queue exceeds a heightened
        threshold within CASCADE_WINDOW_TICKS of a *new* failure, degrade it.

        Guardrails:
        - Only triggers when a NEW failure occurred this tick (not any failed node).
        - Graph-bounded: cascade only propagates along edges (parents or children).
        - Scripted failures (Task 2) do not trigger cascades.
        """
        if not self._cascade_triggered:
            self._cascade_tick = 0
            return

        self._cascade_tick += 1
        if self._cascade_tick > CASCADE_WINDOW_TICKS:
            self._cascade_triggered = False
            self._cascade_tick = 0
            return

        failed_ids = {n.node_id for n in self._nodes if n.status == NodeStatus.FAILED}

        # Collect graph neighbours (children and parents) of every failed node.
        at_risk = set()
        for failed_id in failed_ids:
            at_risk.update(CLUSTER_TOPOLOGY.get(failed_id, []))
            for parent_id, children in CLUSTER_TOPOLOGY.items():
                if failed_id in children:
                    at_risk.add(parent_id)

        cascade_threshold = FATAL_FAIL_THRESHOLD * CASCADE_QUEUE_MULTIPLIER
        for n in self._nodes:
            if n.node_id not in at_risk:
                continue
            if n.status == NodeStatus.FAILED or n.is_scripted_failure:
                continue
            if n.queue_depth > cascade_threshold:
                n.status = NodeStatus.DEGRADED

    def _process_recovery(self) -> None:
        """Count down recovery timers and bring FAILED nodes back online.

        Only overload-failed nodes (recovery_timer > 0) can recover.
        Scripted failures (is_scripted_failure=True) are excluded.
        """
        RECOVERY_RAMP_PER_TICK: float = 0.5

        for n in self._nodes:
            if n.is_scripted_failure:
                continue

            if n.recovery_timer > 0:
                n.recovery_timer -= 1
                if n.recovery_timer <= 0:
                    # Fully recovered: reset telemetry to a clean baseline.
                    n.status = NodeStatus.HEALTHY
                    n.queue_depth = 0.0
                    n.latency_ms = BASE_LATENCY_MS
                    n.cpu_utilization = 0.0
                else:
                    # Still recovering: ramp capacity back while serving degraded.
                    n.capacity = min(DEFAULT_CAPACITY, n.capacity + RECOVERY_RAMP_PER_TICK)
                    n.status = NodeStatus.DEGRADED

    def reconcile_state(self, telemetry_map: dict) -> None:
        """
        Reconcile internal simulator state with external telemetry signals.
        Used in 'hybrid' or 'live' modes to align the physics engine with reality.

        telemetry_map: node_id -> TelemetryRecord (or dict)
        """
        for node in self._nodes:
            if node.node_id in telemetry_map:
                record = telemetry_map[node.node_id]

                if hasattr(record, "queue_depth"):
                    q_ext = float(record.queue_depth)
                    r_ext = float(record.request_rate)
                    c_ext = float(record.cpu_utilization)
                    e_ext = float(record.error_rate)
                    l_ext = float(record.latency_ms)
                else:
                    q_ext = float(record.get("queue_depth", node.queue_depth))
                    r_ext = float(record.get("request_rate", node.incoming_request_rate))
                    c_ext = float(record.get("cpu_utilization", node.cpu_utilization))
                    e_ext = float(record.get("error_rate", 0.0))
                    l_ext = float(record.get("latency_ms", node.latency_ms))

                # Blend toward external telemetry (70% external, 30% internal).
                node.queue_depth = (node.queue_depth * 0.3) + (q_ext * 0.7)
                node.incoming_request_rate = (node.incoming_request_rate * 0.3) + (r_ext * 0.7)
                node.cpu_utilization = (node.cpu_utilization * 0.3) + (c_ext * 0.7)
                node.latency_ms = (node.latency_ms * 0.3) + (l_ext * 0.7)

                # External error rate overrides internal status heuristics.
                if e_ext > 0.5:
                    node.status = NodeStatus.FAILED
                elif e_ext > 0.1 and node.status == NodeStatus.HEALTHY:
                    node.status = NodeStatus.DEGRADED
                elif e_ext <= 0.05 and node.status == NodeStatus.DEGRADED:
                    node.status = NodeStatus.HEALTHY

        # Re-derive statuses from the blended queue depths.
        self._update_statuses()
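

# A minimal smoke-test driver: a sketch, not part of the simulator API proper.
# It runs a short task-1 episode with a fixed seed and scales up whichever
# node reports DEGRADED. The dict payload shape relies on apply_action's
# duck-typed field access above; the tick count and threshold here are
# arbitrary illustration values.
if __name__ == "__main__":
    sim = ClusterSimulator(task_id="task-1", seed=7)
    for _ in range(10):
        sim.tick()
        snapshot = sim.state(for_agent=False)
        # Scale up the first degraded node, if any; otherwise do nothing.
        degraded = [s for s in snapshot if s["status"] == NodeStatus.DEGRADED]
        if degraded:
            sim.apply_action({
                "action_type": "SCALE_UP",
                "target_node_id": degraded[0]["node_id"],
                "parameter": 1.0,
            })
        worst = max(snapshot, key=lambda s: s["queue_depth"])
        print(f"worst={worst['node_id']} queue={worst['queue_depth']} latency={worst['latency_ms']}ms")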
|