| """ |
| server/propagation.py — Queueing-theory cascade engine. |
| |
| Computes how failures propagate through the service dependency graph using: |
| - Little's Law: L = λ × S for thread pool saturation (ρ = L/T) |
| - Retry amplification: E[attempts] = (1 - p^(R+1)) / (1 - p) |
| - Per-hop dampening (~0.7 with circuit breakers) vs amplification (~1.2-1.8×) |
| - 1-2 tick propagation delay (not instant) |
| - Circuit breaker state machine: CLOSED → OPEN → HALF_OPEN → CLOSED |
| |
| Sources: Google SRE Book, Netflix Hystrix, Docs/DataResearch.md Answer 3. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import random |
| from dataclasses import dataclass, field |
| from enum import Enum |
| from typing import Dict, List, Optional, Tuple |
|
|
|
|
| |
| |
| |
|
|
|
|
class BreakerState(str, Enum):
    """Lifecycle states of a circuit breaker (CLOSED → OPEN → HALF_OPEN → CLOSED)."""

    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"
|
|
|
|
@dataclass
class CircuitBreaker:
    """Per-edge circuit breaker with rolling error window."""

    state: BreakerState = BreakerState.CLOSED

    # Tuning knobs.
    error_threshold: float = 0.5
    cooldown_ticks: int = 3
    half_open_success_threshold: int = 2

    # Internal bookkeeping.
    ticks_in_current_state: int = 0
    error_window: List[float] = field(default_factory=list)
    window_size: int = 5
    half_open_successes: int = 0

    def record_error_rate(self, error_rate: float) -> None:
        """Append one error-rate observation to the rolling window and age the state timer."""
        window = self.error_window
        window.append(error_rate)
        # Keep only the newest `window_size` observations.
        del window[:-self.window_size]
        self.ticks_in_current_state += 1

    def tick(self, current_error_rate: float, rng: random.Random) -> BreakerState:
        """Advance the breaker state machine by one tick and return the new state.

        Note: ``rng`` is accepted for interface stability but is not used by
        the current (deterministic) transition rules.
        """
        self.record_error_rate(current_error_rate)
        window = self.error_window
        avg_error = (sum(window) / len(window)) if window else 0.0

        if self.state == BreakerState.CLOSED:
            self._tick_closed(avg_error)
        elif self.state == BreakerState.OPEN:
            self._tick_open()
        elif self.state == BreakerState.HALF_OPEN:
            self._tick_half_open(current_error_rate)

        return self.state

    def _tick_closed(self, avg_error: float) -> None:
        # Trip to OPEN once the windowed average error crosses the threshold.
        if avg_error >= self.error_threshold:
            self._reset_into(BreakerState.OPEN)

    def _tick_open(self) -> None:
        # After the cooldown period, allow probe traffic through.
        if self.ticks_in_current_state >= self.cooldown_ticks:
            self._reset_into(BreakerState.HALF_OPEN)

    def _tick_half_open(self, current_error_rate: float) -> None:
        # A probe "succeeds" when the instantaneous error rate is well below
        # the trip threshold; enough successes close the breaker again.
        if current_error_rate < self.error_threshold * 0.5:
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_success_threshold:
                self.state = BreakerState.CLOSED
                self.ticks_in_current_state = 0
                self.error_window.clear()
        else:
            # Any bad probe re-opens the breaker immediately.
            self._reset_into(BreakerState.OPEN)

    def _reset_into(self, new_state: BreakerState) -> None:
        # Shared transition bookkeeping for CLOSED→OPEN, OPEN→HALF_OPEN and
        # HALF_OPEN→OPEN (the HALF_OPEN→CLOSED path keeps its success count).
        self.state = new_state
        self.ticks_in_current_state = 0
        self.half_open_successes = 0

    @property
    def dampening_factor(self) -> float:
        """Multiplier applied to downstream error propagation (lower = more blocking)."""
        per_state = {
            BreakerState.OPEN: 0.05,
            BreakerState.HALF_OPEN: 0.3,
        }
        return per_state.get(self.state, 1.0)
|
|
|
|
| |
| |
| |
|
|
|
|
def compute_utilisation(
    arrival_rate: float,
    service_time: float,
    thread_pool_size: int,
) -> float:
    """
    Estimate thread-pool utilisation via Little's Law.

    The mean number of in-flight requests is L = arrival_rate × service_time;
    dividing by the pool size T gives occupancy ρ = L / T, capped at 1.0.
    As ρ approaches 1.0, latency grows nonlinearly (M/M/c queueing).
    """
    in_flight = arrival_rate * service_time
    pool = max(1, thread_pool_size)  # guard against a zero-sized pool
    return min(in_flight / pool, 1.0)
|
|
|
|
def compute_queueing_latency_multiplier(rho: float) -> float:
    """
    Map utilisation ρ to an approximate M/M/1 latency multiplier.

    Uses the 1/(1-ρ) growth curve in the mid-range, stepped caps in the
    hot bands so the multiplier never diverges to infinity, and a flat
    1.0 floor for near-idle systems.
    """
    # (utilisation floor, capped multiplier), checked hottest band first.
    capped_bands = (
        (0.99, 50.0),
        (0.95, 20.0),
        (0.90, 10.0),
        (0.80, 5.0),
    )
    for floor, multiplier in capped_bands:
        if rho >= floor:
            return multiplier
    if rho < 0.01:
        return 1.0
    return 1.0 / (1.0 - rho)
|
|
|
|
def compute_retry_amplification(
    failure_probability: float,
    max_retries: int,
) -> float:
    """
    Expected number of attempts for a call with up to ``max_retries`` retries.

    Geometric-series closed form:
        E[attempts] = (1 - p^(R+1)) / (1 - p)
    where p is the per-attempt failure probability and R the retry budget.

    Args:
        failure_probability: Per-attempt failure probability; clamped to [0, 1].
        max_retries: Retry budget R; negative values are treated as 0.

    Returns:
        Expected attempt count, always in [1.0, R + 1].
    """
    p = max(0.0, min(1.0, failure_probability))
    # Clamp R BEFORE the early returns: the old code returned
    # float(max_retries + 1) unclamped when p ~ 1, so a negative retry
    # budget produced an impossible E[attempts] < 1.
    R = max(0, max_retries)
    if p < 0.001:
        # Effectively no failures: one attempt suffices.
        return 1.0
    if p > 0.999:
        # Effectively certain failure: the whole retry budget is consumed.
        # (Also avoids division by zero in the closed form at p == 1.)
        return float(R + 1)
    return (1.0 - p ** (R + 1)) / (1.0 - p)
|
|
|
|
| |
| |
| |
|
|
|
|
@dataclass
class ServiceRuntimeState:
    """Mutable runtime state for one service during simulation."""

    service_id: str

    # Observable metrics.
    error_rate: float = 0.0
    latency_p50_ms: float = 20.0
    latency_p95_ms: float = 50.0
    latency_p99_ms: float = 100.0
    throughput_rps: float = 100.0
    cpu_pct: float = 15.0
    memory_pct: float = 30.0
    connection_pool_usage_pct: float = 10.0

    # Queueing-model inputs (Little's Law).
    arrival_rate: float = 100.0
    service_time_local: float = 0.05
    thread_pool_size: int = 50
    utilisation: float = 0.0

    # Deployment facts.
    replicas: int = 2
    version: str = "v1.0.0"
    previous_version: Optional[str] = None
    status: str = "healthy"

    # Client-side resilience configuration.
    timeout_ms: int = 5000
    retry_max: int = 3
    retry_backoff: bool = False
    pool_size: int = 20

    # One breaker per downstream dependency, keyed by downstream service id.
    circuit_breakers: Dict[str, CircuitBreaker] = field(default_factory=dict)

    # Failure-injection bookkeeping.
    has_active_failure: bool = False
    failure_ticks: int = 0
    propagation_error_rate: float = 0.0

    def compute_status(self) -> str:
        """Derive a coarse health label from error rate and tail latency."""
        err = self.error_rate
        p99 = self.latency_p99_ms
        if err >= 0.90:
            return "down"
        if err >= 0.30 or p99 >= 5000:
            return "critical"
        if err >= 0.05 or p99 >= 1000:
            return "degraded"
        return "healthy"

    def update_latency_percentiles(self, base_p99: float, multiplier: float, rng: random.Random) -> None:
        """Refresh p50/p95/p99 from a base p99 and a multiplier, with jitter.

        The RNG draw order (jitter, p95 ratio, p50 ratio) is fixed so that
        seeded simulation runs stay reproducible.
        """
        jitter = rng.uniform(0.95, 1.05)
        p99 = max(1.0, base_p99 * multiplier * jitter)
        p95 = p99 * rng.uniform(0.60, 0.85)
        p50 = p95 * rng.uniform(0.30, 0.50)
        self.latency_p99_ms = p99
        self.latency_p95_ms = p95
        self.latency_p50_ms = p50
|
|
|
|
def _spread_failure_to_callers(
    service_id: str,
    state: ServiceRuntimeState,
    services: Dict[str, ServiceRuntimeState],
    reverse_adjacency: Dict[str, List[str]],
    edge_activation: Dict[Tuple[str, str], float],
    rng: random.Random,
) -> None:
    """Phase 1 helper: push one failing service's pressure onto its callers."""
    for caller_id in reverse_adjacency.get(service_id, []):
        caller = services.get(caller_id)
        if caller is None:
            continue

        # Probabilistic edge activation: not every caller exercises the
        # failing dependency on every tick (edges default to always-on).
        activation_prob = edge_activation.get((caller_id, service_id), 1.0)
        if rng.random() > activation_prob:
            continue

        # Lazily create the caller-side breaker for this edge.
        if service_id not in caller.circuit_breakers:
            caller.circuit_breakers[service_id] = CircuitBreaker()
        breaker = caller.circuit_breakers[service_id]

        # Advance the breaker; its state decides how much error leaks through.
        breaker.tick(state.error_rate, rng)
        dampening = breaker.dampening_factor

        # Keep the worst propagated-error contribution seen this tick.
        propagated_error = state.error_rate * dampening * rng.uniform(0.5, 0.9)
        caller.propagation_error_rate = max(
            caller.propagation_error_rate,
            propagated_error,
        )

        # Retry amplification inflates the caller's arrival rate (skipped
        # when the breaker is effectively open, i.e. dampening <= 0.1).
        if dampening > 0.1:
            retry_mult = compute_retry_amplification(
                state.error_rate * dampening,
                caller.retry_max,
            )
            caller.arrival_rate *= min(retry_mult, 3.0)

        # Slow downstream responses tie up caller threads, raising the
        # caller's effective local service time.
        if state.latency_p99_ms > 500 and dampening > 0.1:
            downstream_wait = state.latency_p99_ms * dampening * 0.001
            caller.service_time_local += downstream_wait * 0.5


def _recompute_service_metrics(state: ServiceRuntimeState, rng: random.Random) -> None:
    """Phase 2 helper: derive one service's metrics from accumulated pressure."""
    # Little's Law per replica: utilisation of each replica's thread pool.
    state.utilisation = compute_utilisation(
        state.arrival_rate / max(1, state.replicas),
        state.service_time_local,
        state.thread_pool_size,
    )

    # Queueing delay inflates tail latency once utilisation matters.
    q_mult = compute_queueing_latency_multiplier(state.utilisation)
    if q_mult > 1.1:
        base_p99 = 100.0
        state.update_latency_percentiles(base_p99, q_mult, rng)

    # Combine locally-injected failure error with propagated error; when no
    # local failure is active, propagated error alone drives the rate.
    if state.has_active_failure:
        combined_error = max(state.error_rate, state.propagation_error_rate)
    else:
        combined_error = state.propagation_error_rate
    state.error_rate = min(1.0, combined_error)

    # NOTE(review): this is throughput per replica (divided by replica count)
    # while arrival_rate appears to be fleet-wide — confirm intended units.
    state.throughput_rps = state.arrival_rate * (1.0 - state.error_rate) / max(1, state.replicas)

    state.status = state.compute_status()

    # Reset the per-tick propagation accumulator for the next tick.
    state.propagation_error_rate = 0.0


def propagate_failures(
    services: Dict[str, ServiceRuntimeState],
    adjacency: Dict[str, List[str]],
    reverse_adjacency: Dict[str, List[str]],
    edge_activation: Dict[Tuple[str, str], float],
    rng: random.Random,
    propagation_delay: int = 1,
    current_tick: int = 0,
) -> None:
    """
    Propagate failure effects through the dependency graph for one tick.

    Phase 1 pushes each failing service's pressure upstream onto its callers:
    error leakage through circuit breakers, retry amplification of arrival
    rates, and thread-pool blocking via increased service time. Phase 2 then
    recomputes every service's derived metrics from the accumulated pressure.
    Modifies ServiceRuntimeState objects in place; RNG call order is fixed
    so seeded runs stay reproducible.

    Args:
        services: Runtime state per service id; mutated in place.
        adjacency: Caller → callees map. Currently unused; kept for API
            compatibility (TODO confirm whether forward edges are needed).
        reverse_adjacency: Callee → callers map used to find impacted callers.
        edge_activation: (caller_id, callee_id) → probability that the edge
            is exercised this tick; missing edges default to 1.0.
        rng: Seeded random source for activation rolls and jitter.
        propagation_delay: Currently unused — the module docstring promises a
            1-2 tick propagation delay, but effects apply on the same tick.
            TODO: implement or drop the claim.
        current_tick: Currently unused; kept for API compatibility.
    """
    # Phase 1: every service with a material error rate (>= 1%) exerts
    # pressure on its callers.
    for service_id, state in services.items():
        if state.error_rate < 0.01:
            continue
        _spread_failure_to_callers(
            service_id, state, services, reverse_adjacency, edge_activation, rng
        )

    # Phase 2: recompute each service's derived metrics for this tick.
    for state in services.values():
        _recompute_service_metrics(state, rng)
|
|