# sevzero / server /propagation.py
# Mist-ic's picture
# Fix Phase 2: add [START]/[STEP]/[END] structured output + simulation fixes
# 524b287
"""
server/propagation.py — Queueing-theory cascade engine.
Computes how failures propagate through the service dependency graph using:
- Little's Law: L = λ × S for thread pool saturation (ρ = L/T)
- Retry amplification: E[attempts] = (1 - p^(R+1)) / (1 - p)
- Per-hop dampening (~0.7 with circuit breakers) vs amplification (~1.2-1.8×)
- 1-2 tick propagation delay (not instant)
- Circuit breaker state machine: CLOSED → OPEN → HALF_OPEN → CLOSED
Sources: Google SRE Book, Netflix Hystrix, Docs/DataResearch.md Answer 3.
"""
from __future__ import annotations
import random
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Tuple
# ---------------------------------------------------------------------------
# Circuit breaker state machine
# ---------------------------------------------------------------------------
class BreakerState(str, Enum):
    """States of the circuit breaker state machine.

    Because the enum mixes in ``str``, each member is also a string equal to
    its value (e.g. ``BreakerState.OPEN == "OPEN"``).
    """

    # Normal operation: the breaker is not tripped.
    CLOSED = "CLOSED"
    # Tripped: the breaker stays here until its cooldown has elapsed.
    OPEN = "OPEN"
    # Probing: the breaker is deciding whether to close again or re-open.
    HALF_OPEN = "HALF_OPEN"
@dataclass
class CircuitBreaker:
    """Per-edge circuit breaker driven by a rolling window of error rates.

    Transitions, evaluated once per call to :meth:`tick`:

    * CLOSED -> OPEN when the mean of the rolling window reaches
      ``error_threshold``.
    * OPEN -> HALF_OPEN once ``cooldown_ticks`` ticks have been counted in
      the OPEN state.
    * HALF_OPEN -> CLOSED after ``half_open_success_threshold`` probes whose
      error rate is below half of ``error_threshold``; any probe at or above
      that level sends the breaker straight back to OPEN.
    """

    state: BreakerState = BreakerState.CLOSED
    # Config (tunable by agent via tune_config)
    error_threshold: float = 0.5  # Error rate to trip OPEN
    cooldown_ticks: int = 3  # Ticks to stay OPEN before half-open
    half_open_success_threshold: int = 2  # Successes needed to close
    # Runtime state
    ticks_in_current_state: int = 0
    error_window: List[float] = field(default_factory=list)
    window_size: int = 5
    half_open_successes: int = 0

    def record_error_rate(self, error_rate: float) -> None:
        """Append one observation to the rolling window and count the tick.

        This method never changes ``state``; transitions are decided in
        :meth:`tick`, which calls this first.
        """
        self.error_window.append(error_rate)
        # Keep at least one sample. With a window size of 0 the slice
        # ``window[-0:]`` is the *whole* list, so the window would never be
        # trimmed and would grow without bound.
        keep = max(1, self.window_size)
        if len(self.error_window) > keep:
            self.error_window = self.error_window[-keep:]
        self.ticks_in_current_state += 1

    def _enter(self, new_state: BreakerState) -> None:
        """Switch to ``new_state`` and restart the per-state tick counter."""
        self.state = new_state
        self.ticks_in_current_state = 0

    def tick(self, current_error_rate: float, rng: random.Random) -> BreakerState:
        """Advance the circuit breaker state machine by one tick.

        Args:
            current_error_rate: Error rate observed on this edge this tick.
            rng: Accepted for interface compatibility; not read here.

        Returns:
            The breaker state after any transition made this tick.
        """
        self.record_error_rate(current_error_rate)
        window = self.error_window
        avg_error = sum(window) / len(window) if window else 0.0

        if self.state == BreakerState.CLOSED:
            # Trip on the windowed mean, not on a single bad sample.
            if avg_error >= self.error_threshold:
                self._enter(BreakerState.OPEN)
                self.half_open_successes = 0
        elif self.state == BreakerState.OPEN:
            if self.ticks_in_current_state >= self.cooldown_ticks:
                self._enter(BreakerState.HALF_OPEN)
                self.half_open_successes = 0
        elif self.state == BreakerState.HALF_OPEN:
            # Probes are judged on the instantaneous rate against a stricter
            # bar (half the trip threshold).
            if current_error_rate < self.error_threshold * 0.5:
                self.half_open_successes += 1
                if self.half_open_successes >= self.half_open_success_threshold:
                    self._enter(BreakerState.CLOSED)
                    # Start the closed state with a clean history so old
                    # failures cannot immediately re-trip the breaker.
                    self.error_window.clear()
            else:
                # Probe failed — go back to OPEN
                self._enter(BreakerState.OPEN)
                self.half_open_successes = 0
        return self.state

    @property
    def dampening_factor(self) -> float:
        """Multiplier applied to impact crossing this edge.

        Returns 0.05 while OPEN (fail-fast, nearly everything blocked), 0.3
        while HALF_OPEN (some probe traffic gets through) and 1.0 otherwise
        (no dampening).
        """
        if self.state == BreakerState.OPEN:
            return 0.05
        if self.state == BreakerState.HALF_OPEN:
            return 0.3
        return 1.0
# ---------------------------------------------------------------------------
# Queueing theory functions
# ---------------------------------------------------------------------------
def compute_utilisation(
    arrival_rate: float,
    service_time: float,
    thread_pool_size: int,
) -> float:
    """
    Thread-pool utilisation ρ derived from Little's Law.

    The average number of requests in the system is L = λ × S; dividing by
    the pool size T gives ρ = L / T. A pool size below 1 is treated as 1, and
    the result is capped at 1.0 (fully saturated).

    Args:
        arrival_rate: λ, requests arriving per unit time.
        service_time: S, time spent on each request.
        thread_pool_size: T, maximum concurrent requests.

    Returns:
        ρ, never greater than 1.0.
    """
    workers = max(1, thread_pool_size)  # avoid dividing by zero
    in_system = arrival_rate * service_time
    return min(in_system / workers, 1.0)
def compute_queueing_latency_multiplier(rho: float) -> float:
    """
    Latency multiplier for a queue running at utilisation ``rho``.

    From ρ = 0.80 upwards the multiplier is a step function
    (≥0.80 → 5×, ≥0.90 → 10×, ≥0.95 → 20×, ≥0.99 → 50×), which keeps the
    result finite as ρ → 1. Below 0.01 there is no queueing penalty (1×).
    Everything in between uses the 1/(1-ρ) approximation.
    """
    # Highest tier first so the first match wins.
    tiers = (
        (0.99, 50.0),  # effectively down
        (0.95, 20.0),
        (0.90, 10.0),
        (0.80, 5.0),
    )
    for floor, multiplier in tiers:
        if rho >= floor:
            return multiplier
    if rho < 0.01:
        return 1.0
    return 1.0 / (1.0 - rho)
def compute_retry_amplification(
    failure_probability: float,
    max_retries: int,
) -> float:
    """
    Expected number of attempts per request when failures are retried.

    E[attempts] = (1 - p^(R+1)) / (1 - p)
    where p = failure probability, R = max retries.

    Args:
        failure_probability: p; values outside [0, 1] are clamped.
        max_retries: R; negative values are treated as 0 (no retries).

    Returns:
        A value in [1, R + 1]: 1.0 when nothing fails, R + 1 when every
        attempt fails.
    """
    p = max(0.0, min(1.0, failure_probability))
    # Clamp before any early return so every branch sees the same R; without
    # this a negative max_retries made the "always fails" branch return a
    # zero or negative multiplier.
    retries = max(0, max_retries)
    if p < 0.001:
        return 1.0  # No failures, no retries
    if p > 0.999:
        # Every attempt fails; also avoids dividing by (1 - p) ≈ 0.
        return float(retries + 1)
    return (1.0 - p ** (retries + 1)) / (1.0 - p)
# ---------------------------------------------------------------------------
# Propagation engine
# ---------------------------------------------------------------------------
@dataclass
class ServiceRuntimeState:
    """Mutable per-service record that the simulation updates in place."""

    service_id: str

    # --- Metrics, refreshed every tick ---
    error_rate: float = 0.0
    latency_p50_ms: float = 20.0
    latency_p95_ms: float = 50.0
    latency_p99_ms: float = 100.0
    throughput_rps: float = 100.0
    cpu_pct: float = 15.0
    memory_pct: float = 30.0
    connection_pool_usage_pct: float = 10.0

    # --- Inputs and output of the queueing model ---
    arrival_rate: float = 100.0  # λ — requests/tick
    service_time_local: float = 0.05  # S_local — seconds per request
    thread_pool_size: int = 50  # T — max concurrent
    utilisation: float = 0.0  # ρ = L/T

    # --- Deployment ---
    replicas: int = 2
    version: str = "v1.0.0"
    previous_version: Optional[str] = None
    status: str = "healthy"  # healthy | degraded | critical | down

    # --- Agent-tunable configuration ---
    timeout_ms: int = 5000
    retry_max: int = 3
    retry_backoff: bool = False
    pool_size: int = 20

    # --- One circuit breaker per dependency, keyed by dependency id ---
    circuit_breakers: Dict[str, CircuitBreaker] = field(default_factory=dict)

    # --- Failure bookkeeping ---
    has_active_failure: bool = False
    failure_ticks: int = 0
    propagation_error_rate: float = 0.0  # Error rate from upstream propagation

    def compute_status(self) -> str:
        """Classify health from the current error rate and p99 latency.

        Returns "down" at an error rate of 0.90 or more; "critical" at 0.30
        or more, or a p99 of at least 5000 ms; "degraded" at 0.05 or more, or
        a p99 of at least 1000 ms; otherwise "healthy".
        """
        errors = self.error_rate
        tail_ms = self.latency_p99_ms
        if errors >= 0.90:
            return "down"
        if errors >= 0.30 or tail_ms >= 5000:
            return "critical"
        if errors >= 0.05 or tail_ms >= 1000:
            return "degraded"
        return "healthy"

    def update_latency_percentiles(self, base_p99: float, multiplier: float, rng: random.Random) -> None:
        """Recompute p99, p95 and p50 from a baseline p99 and a multiplier.

        p99 is ``base_p99 * multiplier`` with ±5% noise, floored at 1 ms. p95
        is a random 60–85% of p99 and p50 a random 30–50% of p95. Exactly
        three values are drawn from ``rng``, in that order.
        """
        jitter = rng.uniform(0.95, 1.05)
        p99 = max(1.0, base_p99 * multiplier * jitter)
        p95 = p99 * rng.uniform(0.60, 0.85)
        p50 = p95 * rng.uniform(0.30, 0.50)
        self.latency_p99_ms = p99
        self.latency_p95_ms = p95
        self.latency_p50_ms = p50
def propagate_failures(
    services: Dict[str, ServiceRuntimeState],
    adjacency: Dict[str, List[str]],
    reverse_adjacency: Dict[str, List[str]],
    edge_activation: Dict[Tuple[str, str], float],
    rng: random.Random,
    propagation_delay: int = 1,
    current_tick: int = 0,
) -> None:
    """
    Push failure effects one hop up the dependency graph for a single tick.

    Pass 1 visits every service whose error rate is at least 0.01 and, for
    each of its callers (looked up in ``reverse_adjacency``):
      1. ticks the caller's circuit breaker for that dependency,
      2. raises the caller's pending propagated error rate,
      3. multiplies the caller's arrival rate by the retry amplification
         (at most 3×), and
      4. lengthens the caller's service time when the dependency is slow.
    Steps 3 and 4 are skipped when the breaker's dampening factor is 0.1
    or lower.

    Pass 2 recomputes utilisation, latency, error rate, throughput and
    status for every service and clears the propagated-error accumulator.

    All ``ServiceRuntimeState`` objects are modified in place.

    Args:
        services: Runtime state keyed by service id.
        adjacency: Accepted but not read by this function.
        reverse_adjacency: Maps a service id to the ids of its callers.
        edge_activation: Probability that a (caller, callee) edge is used
            this tick; edges that are missing default to 1.0.
        rng: Random source for edge activation and noise.
        propagation_delay: Accepted but not read by this function.
        current_tick: Accepted but not read by this function.
    """
    # ---- Pass 1: accumulate impact on callers ----
    # Services are visited in dict iteration order. error_rate is only
    # rewritten in pass 2, so an error travels at most one hop per call.
    for callee_id, callee in services.items():
        if callee.error_rate < 0.01:
            continue  # Healthy — nothing to propagate from this service

        for caller_id in reverse_adjacency.get(callee_id, []):
            caller = services.get(caller_id)
            if caller is None:
                continue

            # One random draw per edge decides whether the call happens.
            if rng.random() > edge_activation.get((caller_id, callee_id), 1.0):
                continue

            # Breakers are created lazily, the first time an edge is hit.
            breaker = caller.circuit_breakers.setdefault(callee_id, CircuitBreaker())
            breaker.tick(callee.error_rate, rng)
            pass_through = breaker.dampening_factor
            breaker_lets_traffic = pass_through > 0.1

            # Errors: keep the worst propagated rate seen this tick.
            leaked_error = callee.error_rate * pass_through * rng.uniform(0.5, 0.9)
            if leaked_error > caller.propagation_error_rate:
                caller.propagation_error_rate = leaked_error

            # Retries: extra attempts inflate the caller's arrival rate.
            if breaker_lets_traffic:
                amplification = compute_retry_amplification(
                    callee.error_rate * pass_through,
                    caller.retry_max,
                )
                caller.arrival_rate *= min(amplification, 3.0)

            # Latency: threads held while waiting on a slow dependency.
            if callee.latency_p99_ms > 500 and breaker_lets_traffic:
                wait_seconds = callee.latency_p99_ms * pass_through * 0.001
                caller.service_time_local += wait_seconds * 0.5  # partial impact

    # ---- Pass 2: derive per-service metrics ----
    for svc in services.values():
        replica_count = max(1, svc.replicas)

        svc.utilisation = compute_utilisation(
            svc.arrival_rate / replica_count,  # load is split across replicas
            svc.service_time_local,
            svc.thread_pool_size,
        )

        # Latency is only rewritten when queueing is noticeable, from a
        # fixed 100 ms baseline p99.
        latency_mult = compute_queueing_latency_multiplier(svc.utilisation)
        if latency_mult > 1.1:
            svc.update_latency_percentiles(100.0, latency_mult, rng)

        # A service with its own active failure keeps the worse of its own
        # and the propagated rate; any other service takes the propagated
        # rate alone, so it recovers once its dependencies do.
        inherited = svc.propagation_error_rate
        if svc.has_active_failure:
            combined = max(svc.error_rate, inherited)
        else:
            combined = inherited
        svc.error_rate = min(1.0, combined)

        # Successful share of the arrival rate, per replica.
        svc.throughput_rps = svc.arrival_rate * (1.0 - svc.error_rate) / replica_count

        svc.status = svc.compute_status()
        svc.propagation_error_rate = 0.0  # reset accumulator for the next tick