File size: 12,183 Bytes
a46811c 524b287 a46811c ffe4aa5 a46811c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 | """
server/propagation.py — Queueing-theory cascade engine.
Computes how failures propagate through the service dependency graph using:
- Little's Law: L = λ × S for thread pool saturation (ρ = L/T)
- Retry amplification: E[attempts] = (1 - p^(R+1)) / (1 - p)
- Per-hop dampening (~0.7 with circuit breakers) vs amplification (~1.2-1.8×)
- 1-2 tick propagation delay (not instant)
- Circuit breaker state machine: CLOSED → OPEN → HALF_OPEN → CLOSED
Sources: Google SRE Book, Netflix Hystrix, Docs/DataResearch.md Answer 3.
"""
from __future__ import annotations
import random
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Tuple
# ---------------------------------------------------------------------------
# Circuit breaker state machine
# ---------------------------------------------------------------------------
class BreakerState(str, Enum):
    """States of a circuit breaker.

    Mixing in ``str`` makes each member compare equal to its own name,
    e.g. ``BreakerState.OPEN == "OPEN"``.
    """

    CLOSED = "CLOSED"  # normal operation, no dampening
    OPEN = "OPEN"  # tripped: calls fail fast
    HALF_OPEN = "HALF_OPEN"  # probing whether the dependency has recovered
@dataclass
class CircuitBreaker:
    """Per-edge circuit breaker driven by a rolling window of error-rate samples.

    Each call to :meth:`tick` records one sample and applies at most one
    transition:

    * CLOSED -> OPEN when the mean of the rolling window reaches
      ``error_threshold``.
    * OPEN -> HALF_OPEN once ``cooldown_ticks`` ticks have been recorded in OPEN.
    * HALF_OPEN -> CLOSED after ``half_open_success_threshold`` consecutive
      ticks whose error rate is below ``error_threshold * 0.5``; any other
      HALF_OPEN tick sends the breaker back to OPEN.
    """

    state: BreakerState = BreakerState.CLOSED

    # Config (tunable by agent via tune_config)
    error_threshold: float = 0.5  # Rolling-mean error rate that trips CLOSED -> OPEN
    cooldown_ticks: int = 3  # Ticks to stay OPEN before probing in HALF_OPEN
    half_open_success_threshold: int = 2  # Consecutive good probes needed to close

    # Runtime state
    ticks_in_current_state: int = 0
    error_window: List[float] = field(default_factory=list)  # Recent samples, oldest first
    window_size: int = 5  # Max samples kept in error_window (values < 1 act as 1)
    half_open_successes: int = 0

    def record_error_rate(self, error_rate: float) -> None:
        """Append one error-rate sample, trim the window, and count the tick.

        This does not change ``state``; :meth:`tick` performs the transitions.
        """
        self.error_window.append(error_rate)
        # Clamp to at least one sample: ``lst[-0:]`` is the whole list, so a
        # window_size of 0 would never trim (and a negative size would trim
        # from the wrong end).
        keep = max(1, self.window_size)
        if len(self.error_window) > keep:
            self.error_window = self.error_window[-keep:]
        self.ticks_in_current_state += 1

    def tick(self, current_error_rate: float, rng: random.Random) -> BreakerState:
        """Record ``current_error_rate`` and advance the state machine one step.

        Args:
            current_error_rate: Error rate observed on the guarded dependency
                this tick.
            rng: Accepted for interface compatibility; the transitions are
                deterministic and do not draw from it.

        Returns:
            The breaker state after this tick.
        """
        self.record_error_rate(current_error_rate)
        window = self.error_window
        avg_error = sum(window) / len(window) if window else 0.0

        if self.state == BreakerState.CLOSED:
            if avg_error >= self.error_threshold:
                self.state = BreakerState.OPEN
                self.ticks_in_current_state = 0
                self.half_open_successes = 0
        elif self.state == BreakerState.OPEN:
            # record_error_rate() has already counted this tick.
            if self.ticks_in_current_state >= self.cooldown_ticks:
                self.state = BreakerState.HALF_OPEN
                self.ticks_in_current_state = 0
                self.half_open_successes = 0
        elif self.state == BreakerState.HALF_OPEN:
            # Probes are judged on this tick's rate (not the rolling mean)
            # against a stricter bar: half the trip threshold.
            if current_error_rate < self.error_threshold * 0.5:
                self.half_open_successes += 1
                if self.half_open_successes >= self.half_open_success_threshold:
                    self.state = BreakerState.CLOSED
                    self.ticks_in_current_state = 0
                    # Start the CLOSED period with a clean window.
                    self.error_window.clear()
            else:
                # Probe failed — go back to OPEN
                self.state = BreakerState.OPEN
                self.ticks_in_current_state = 0
                self.half_open_successes = 0
        return self.state

    @property
    def dampening_factor(self) -> float:
        """Multiplier applied to error/retry/latency impact passed through this edge."""
        if self.state == BreakerState.OPEN:
            return 0.05  # Nearly all errors blocked (fail-fast)
        elif self.state == BreakerState.HALF_OPEN:
            return 0.3  # Some probe traffic gets through
        else:
            return 1.0  # No dampening
# ---------------------------------------------------------------------------
# Queueing theory functions
# ---------------------------------------------------------------------------
def compute_utilisation(
    arrival_rate: float,
    service_time: float,
    thread_pool_size: int,
) -> float:
    """Return thread-pool utilisation rho, capped at 1.0.

    Little's Law gives the mean number of requests in flight,
    ``L = arrival_rate * service_time`` (both must use the same time unit).
    Dividing by the pool size ``T`` gives ``rho = L / T``. ``T`` is floored
    at 1 so an empty or negative pool cannot divide by zero. Anything at or
    above saturation is reported as exactly 1.0. As rho approaches 1.0,
    queueing latency grows nonlinearly (M/M/c behaviour).
    """
    in_flight = arrival_rate * service_time
    workers = thread_pool_size if thread_pool_size > 1 else 1
    return min(in_flight / workers, 1.0)
def compute_queueing_latency_multiplier(rho: float) -> float:
    """Map utilisation ``rho`` to a latency multiplier (piecewise M/M/1 model).

    The M/M/1 response-time factor is ``1 / (1 - rho)``, which diverges as
    rho -> 1. This function uses that formula only for ``0.01 <= rho < 0.80``
    and otherwise returns fixed plateaus:

    * ``rho >= 0.99`` -> 50.0 (effectively down)
    * ``rho >= 0.95`` -> 20.0
    * ``rho >= 0.90`` -> 10.0
    * ``rho >= 0.80`` -> 5.0
    * ``rho <  0.01`` -> 1.0 (no queueing)
    """
    # Checked highest-first; the first floor that rho reaches wins.
    plateaus = ((0.99, 50.0), (0.95, 20.0), (0.90, 10.0), (0.80, 5.0))
    for floor, multiplier in plateaus:
        if rho >= floor:
            return multiplier
    return 1.0 if rho < 0.01 else 1.0 / (1.0 - rho)
def compute_retry_amplification(
    failure_probability: float,
    max_retries: int,
) -> float:
    """Return the expected number of attempts per request when failures are retried.

    With per-attempt failure probability ``p`` and at most ``R`` retries
    (so up to ``R + 1`` attempts), the expected attempt count is the
    truncated geometric series::

        1 + p + ... + p**R = (1 - p**(R + 1)) / (1 - p)

    Args:
        failure_probability: Per-attempt failure probability; clamped to [0, 1].
        max_retries: Maximum number of retries; negative values are treated as 0.

    Returns:
        A value in ``[1, R + 1]``.
    """
    p = max(0.0, min(1.0, failure_probability))
    # Clamp before any branch uses it, so the saturated branch can never
    # return fewer than one attempt for a negative max_retries.
    retries = max(0, max_retries)
    if p < 0.001:
        return 1.0  # Effectively no failures, so no retries
    if p > 0.999:
        # Every attempt fails: limit of the series as p -> 1 (avoids 0/0).
        return float(retries + 1)
    return (1.0 - p ** (retries + 1)) / (1.0 - p)
# ---------------------------------------------------------------------------
# Propagation engine
# ---------------------------------------------------------------------------
@dataclass
class ServiceRuntimeState:
    """Mutable per-service state that the simulation updates tick by tick.

    Groups one service's observable metrics, queueing-model inputs,
    deployment info, agent-tunable config, per-dependency circuit breakers,
    and failure bookkeeping.
    """

    service_id: str

    # Observable metrics (refreshed every tick).
    error_rate: float = 0.0
    latency_p50_ms: float = 20.0
    latency_p95_ms: float = 50.0
    latency_p99_ms: float = 100.0
    throughput_rps: float = 100.0
    cpu_pct: float = 15.0
    memory_pct: float = 30.0
    connection_pool_usage_pct: float = 10.0

    # Queueing-model inputs and output.
    arrival_rate: float = 100.0  # λ — requests/tick
    service_time_local: float = 0.05  # S_local — seconds per request
    thread_pool_size: int = 50  # T — max concurrent requests
    utilisation: float = 0.0  # ρ = L/T

    # Deployment.
    replicas: int = 2
    version: str = "v1.0.0"
    previous_version: Optional[str] = None
    status: str = "healthy"  # one of: healthy | degraded | critical | down

    # Config the agent may tune.
    timeout_ms: int = 5000
    retry_max: int = 3
    retry_backoff: bool = False
    pool_size: int = 20

    # Circuit breakers, keyed by the id of the dependency they guard.
    circuit_breakers: Dict[str, CircuitBreaker] = field(default_factory=dict)

    # Failure bookkeeping.
    has_active_failure: bool = False
    failure_ticks: int = 0
    propagation_error_rate: float = 0.0  # Worst error rate pushed in by dependencies

    def compute_status(self) -> str:
        """Classify health from error rate and p99 latency; the worst bucket wins."""
        if self.error_rate >= 0.90:
            return "down"
        # (label, error-rate floor, p99 floor in ms), checked worst-first.
        buckets = (("critical", 0.30, 5000), ("degraded", 0.05, 1000))
        for label, err_floor, p99_floor in buckets:
            if self.error_rate >= err_floor or self.latency_p99_ms >= p99_floor:
                return label
        return "healthy"

    def update_latency_percentiles(self, base_p99: float, multiplier: float, rng: random.Random) -> None:
        """Set p99/p95/p50 from ``base_p99 * multiplier`` with random jitter.

        p99 gets +/-5% noise and is floored at 1 ms. p95 is 60-85% of p99 and
        p50 is 30-50% of p95, so p50 <= p95 <= p99 always holds. Exactly three
        values are drawn from ``rng``, in that order.
        """
        jitter = rng.uniform(0.95, 1.05)
        p99 = max(1.0, base_p99 * multiplier * jitter)
        p95 = p99 * rng.uniform(0.60, 0.85)
        p50 = p95 * rng.uniform(0.30, 0.50)
        self.latency_p99_ms, self.latency_p95_ms, self.latency_p50_ms = p99, p95, p50
def _tick_breakers_for_healthy_dependency(
    service_id: str,
    state: ServiceRuntimeState,
    services: Dict[str, ServiceRuntimeState],
    reverse_adjacency: Dict[str, List[str]],
    rng: random.Random,
) -> None:
    """Advance callers' existing breakers on a dependency that is currently healthy.

    Failing dependencies get their breakers ticked during propagation. Ticking
    breakers here too lets an OPEN breaker cool down and close again once its
    dependency recovers, and lets its error window age out. Otherwise the
    breaker would stay OPEN into the next incident. Only breakers that already
    exist are ticked (none are created), and edge activation is not sampled,
    so no random numbers are drawn here.
    """
    for caller_id in reverse_adjacency.get(service_id, []):
        caller = services.get(caller_id)
        if caller is None:
            continue
        breaker = caller.circuit_breakers.get(service_id)
        if breaker is not None:
            breaker.tick(state.error_rate, rng)


def _refresh_derived_metrics(state: ServiceRuntimeState, rng: random.Random) -> None:
    """Recompute utilisation, latency, error rate, throughput and status for one service.

    Also resets ``propagation_error_rate`` so the next tick starts from zero.
    """
    # Each replica's thread pool sees its share of the arrival rate.
    state.utilisation = compute_utilisation(
        state.arrival_rate / max(1, state.replicas),
        state.service_time_local,
        state.thread_pool_size,
    )

    # Latencies are only overwritten under noticeable queueing; otherwise the
    # previous values are left untouched.
    q_mult = compute_queueing_latency_multiplier(state.utilisation)
    if q_mult > 1.1:
        base_p99 = 100.0  # Baseline p99 in ms
        state.update_latency_percentiles(base_p99, q_mult, rng)

    # A service with its own active failure keeps the worse of its direct and
    # propagated error rates; any other service reports only the propagated
    # rate, so it recovers as soon as its dependencies do.
    if state.has_active_failure:
        combined_error = max(state.error_rate, state.propagation_error_rate)
    else:
        combined_error = state.propagation_error_rate
    state.error_rate = min(1.0, combined_error)

    # Successful share of arrivals, divided across replicas.
    state.throughput_rps = state.arrival_rate * (1.0 - state.error_rate) / max(1, state.replicas)
    state.status = state.compute_status()

    # Reset per-tick propagation accumulator.
    state.propagation_error_rate = 0.0


def propagate_failures(
    services: Dict[str, ServiceRuntimeState],
    adjacency: Dict[str, List[str]],
    reverse_adjacency: Dict[str, List[str]],
    edge_activation: Dict[Tuple[str, str], float],
    rng: random.Random,
    propagation_delay: int = 1,
    current_tick: int = 0,
) -> None:
    """
    Propagate failure effects through the dependency graph for one tick.

    Each service with ``error_rate >= 0.01`` affects every caller listed for it
    in ``reverse_adjacency`` whose edge fires this tick. The firing probability
    comes from ``edge_activation`` and defaults to 1.0. For each firing edge:

    1. The caller's circuit breaker for the edge is ticked (and created if
       missing); its ``dampening_factor`` scales everything below.
    2. A dampened, jittered share of the error rate is offered to the caller,
       which keeps the maximum over all its dependencies.
    3. Unless the breaker is OPEN, retries multiply the caller's arrival rate
       (capped at 3x per edge).
    4. Unless the breaker is OPEN, a slow dependency (p99 > 500 ms) adds to
       the caller's service time.

    Breakers on edges to currently healthy services are ticked as well, so
    they can recover. A second pass then recomputes every service's
    utilisation, latency, error rate, throughput and status.

    Errors move one hop per call. The first pass reads ``error_rate`` and
    writes ``propagation_error_rate``, and the two are merged only in the
    second pass. Apart from floating-point rounding, the iteration order of
    ``services`` therefore only changes which random draws go to which edge.

    NOTE(review): ``arrival_rate`` and ``service_time_local`` are only ever
    increased here. Presumably the caller resets them to baseline each tick;
    confirm this, otherwise they compound across ticks.

    ``adjacency``, ``propagation_delay`` and ``current_tick`` are not read by
    this function.

    This modifies ServiceRuntimeState objects (and their breakers) in place.
    """
    for service_id, state in services.items():
        if state.error_rate < 0.01:
            # Healthy — nothing to propagate, but let breakers on this edge recover.
            _tick_breakers_for_healthy_dependency(
                service_id, state, services, reverse_adjacency, rng
            )
            continue

        # Who calls this service? (reverse edges = callers)
        for caller_id in reverse_adjacency.get(service_id, []):
            caller = services.get(caller_id)
            if caller is None:
                continue

            # Skip edges that are not exercised this tick.
            activation_prob = edge_activation.get((caller_id, service_id), 1.0)
            if rng.random() > activation_prob:
                continue

            if service_id not in caller.circuit_breakers:
                caller.circuit_breakers[service_id] = CircuitBreaker()
            breaker = caller.circuit_breakers[service_id]
            breaker.tick(state.error_rate, rng)
            dampening = breaker.dampening_factor

            # 1. Error propagation, dampened by the breaker with 50-90% jitter.
            propagated_error = state.error_rate * dampening * rng.uniform(0.5, 0.9)
            caller.propagation_error_rate = max(
                caller.propagation_error_rate,
                propagated_error,
            )

            # An OPEN breaker (dampening 0.05) fails fast: no retries, no waiting.
            if dampening > 0.1:
                # 2. Retry amplification raises the caller's arrival rate.
                retry_mult = compute_retry_amplification(
                    state.error_rate * dampening,
                    caller.retry_max,
                )
                caller.arrival_rate *= min(retry_mult, 3.0)  # Cap at 3x

                # 3. Latency propagation: the caller waits on a slow dependency.
                if state.latency_p99_ms > 500:
                    downstream_wait = state.latency_p99_ms * dampening * 0.001  # ms → seconds
                    caller.service_time_local += downstream_wait * 0.5  # Partial impact

    # After propagation: update utilisation and derived metrics.
    for state in services.values():
        _refresh_derived_metrics(state, rng)
|