File size: 12,183 Bytes
a46811c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
524b287
 
a46811c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ffe4aa5
 
 
 
 
 
a46811c
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
"""
server/propagation.py — Queueing-theory cascade engine.

Computes how failures propagate through the service dependency graph using:
- Little's Law: L = λ × S for thread pool saturation (ρ = L/T)
- Retry amplification: E[attempts] = (1 - p^(R+1)) / (1 - p)
- Per-hop dampening (~0.7 with circuit breakers) vs amplification (~1.2-1.8×)
- 1-2 tick propagation delay (not instant)
- Circuit breaker state machine: CLOSED → OPEN → HALF_OPEN → CLOSED

Sources: Google SRE Book, Netflix Hystrix, Docs/DataResearch.md Answer 3.
"""

from __future__ import annotations

import random
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Tuple


# ---------------------------------------------------------------------------
# Circuit breaker state machine
# ---------------------------------------------------------------------------


class BreakerState(str, Enum):
    """Circuit breaker states; each member's value is its own name as a string.

    Mixing in ``str`` means members compare equal to the plain strings
    "CLOSED", "OPEN" and "HALF_OPEN".
    """

    # Normal operation.
    CLOSED = "CLOSED"
    # Tripped.
    OPEN = "OPEN"
    # Probing after having been OPEN.
    HALF_OPEN = "HALF_OPEN"


@dataclass
class CircuitBreaker:
    """Circuit breaker for one dependency edge, driven by a rolling error window.

    ``tick`` is the entry point: it records the latest error rate and then
    advances the CLOSED → OPEN → HALF_OPEN → CLOSED state machine by one step.
    """

    state: BreakerState = BreakerState.CLOSED

    # Config (tunable by agent via tune_config)
    error_threshold: float = 0.5      # Windowed mean error rate that trips the breaker
    cooldown_ticks: int = 3           # Ticks spent OPEN before moving to HALF_OPEN
    half_open_success_threshold: int = 2  # Good probes required to close again

    # Runtime state
    ticks_in_current_state: int = 0
    error_window: List[float] = field(default_factory=list)
    window_size: int = 5
    half_open_successes: int = 0

    def record_error_rate(self, error_rate: float) -> None:
        """Store one error-rate sample and age the current state by one tick.

        Only the newest ``window_size`` samples are retained. This method
        never changes ``state`` itself; transitions happen in ``tick``.
        """
        samples = self.error_window
        samples.append(error_rate)
        window_overflowed = len(samples) > self.window_size
        if window_overflowed:
            self.error_window = samples[-self.window_size:]
        self.ticks_in_current_state += 1

    def _enter(self, new_state: BreakerState, reset_probes: bool = True) -> None:
        """Switch to ``new_state``, restart the tick counter, optionally zero the probe count."""
        self.state = new_state
        self.ticks_in_current_state = 0
        if reset_probes:
            self.half_open_successes = 0

    def tick(self, current_error_rate: float, rng: random.Random) -> BreakerState:
        """Record ``current_error_rate``, apply at most one transition, return the state.

        ``rng`` is accepted for interface compatibility and is not read here.
        """
        self.record_error_rate(current_error_rate)
        samples = self.error_window
        mean_error = sum(samples) / len(samples) if samples else 0.0

        if self.state == BreakerState.CLOSED:
            # Trip once the windowed mean reaches the threshold.
            if mean_error >= self.error_threshold:
                self._enter(BreakerState.OPEN)
            return self.state

        if self.state == BreakerState.OPEN:
            # Stay open for the cooldown, then start probing.
            if self.ticks_in_current_state >= self.cooldown_ticks:
                self._enter(BreakerState.HALF_OPEN)
            return self.state

        if self.state == BreakerState.HALF_OPEN:
            probe_failed = not (current_error_rate < self.error_threshold * 0.5)
            if probe_failed:
                # Probe failed — go back to OPEN
                self._enter(BreakerState.OPEN)
            else:
                self.half_open_successes += 1
                if self.half_open_successes >= self.half_open_success_threshold:
                    # Enough good probes: close and forget the old error history.
                    self._enter(BreakerState.CLOSED, reset_probes=False)
                    self.error_window.clear()

        return self.state

    @property
    def dampening_factor(self) -> float:
        """Multiplier applied to propagated errors for the current state."""
        current = self.state
        if current == BreakerState.OPEN:
            return 0.05  # Fail-fast: nearly all errors blocked
        # HALF_OPEN lets some probe traffic through; CLOSED does not dampen.
        return 0.3 if current == BreakerState.HALF_OPEN else 1.0


# ---------------------------------------------------------------------------
# Queueing theory functions
# ---------------------------------------------------------------------------


def compute_utilisation(
    arrival_rate: float,
    service_time: float,
    thread_pool_size: int,
) -> float:
    """
    Return thread-pool utilisation ρ, capped at 1.0 (saturated).

    Little's Law gives the average number of requests in the system as
    L = λ × S (arrival rate × service time). Utilisation is ρ = L / T,
    where T is the thread pool size, treated as at least 1 to avoid
    dividing by zero.
    """
    requests_in_system = arrival_rate * service_time
    pool_capacity = max(1, thread_pool_size)
    load = requests_in_system / pool_capacity
    return 1.0 if load > 1.0 else load


def compute_queueing_latency_multiplier(rho: float) -> float:
    """
    Return the latency multiplier for utilisation ``rho``.

    Below 0.80 this follows the 1/(1-ρ) queueing approximation (with
    utilisation under 0.01 treated as no queueing at all). From 0.80 upward
    the result is a fixed step per band, so the multiplier stays finite as
    ρ → 1.
    """
    # (lower bound of band, multiplier), checked from the most saturated down.
    saturation_bands = (
        (0.99, 50.0),  # effectively down
        (0.95, 20.0),
        (0.90, 10.0),
        (0.80, 5.0),
    )
    for lower_bound, multiplier in saturation_bands:
        if rho >= lower_bound:
            return multiplier

    if rho < 0.01:
        return 1.0  # No queueing
    return 1.0 / (1.0 - rho)


def compute_retry_amplification(
    failure_probability: float,
    max_retries: int,
) -> float:
    """
    Expected number of attempts per request when failures are retried.

    E[attempts] = (1 - p^(R+1)) / (1 - p)
    where p = failure probability, R = max retries.

    Args:
        failure_probability: Chance that a single attempt fails; clamped
            into [0, 1].
        max_retries: Retry budget per request; negative values are treated
            as 0 (a single attempt, no retries).

    Returns:
        A multiplier in [1, R + 1]. It is never below 1.0, because every
        request is attempted at least once.
    """
    p = max(0.0, min(1.0, failure_probability))
    # Clamp the retry budget before any branch uses it, so the "always fails"
    # shortcut below cannot return 0 or a negative multiplier.
    retries = max(0, max_retries)

    if p < 0.001:
        return 1.0  # No failures, no retries
    if p > 0.999:
        # Every attempt fails; this is the limit of the formula as p -> 1
        # and avoids dividing by (1 - p) ~ 0.
        return float(retries + 1)

    return (1.0 - p ** (retries + 1)) / (1.0 - p)


# ---------------------------------------------------------------------------
# Propagation engine
# ---------------------------------------------------------------------------


@dataclass
class ServiceRuntimeState:
    """Per-service mutable state carried through the simulation."""

    service_id: str

    # --- Current metrics (updated each tick) ---
    error_rate: float = 0.0
    latency_p50_ms: float = 20.0
    latency_p95_ms: float = 50.0
    latency_p99_ms: float = 100.0
    throughput_rps: float = 100.0
    cpu_pct: float = 15.0
    memory_pct: float = 30.0
    connection_pool_usage_pct: float = 10.0

    # --- Queueing model state ---
    arrival_rate: float = 100.0       # λ — requests/tick
    service_time_local: float = 0.05  # S_local — seconds per request
    thread_pool_size: int = 50        # T — max concurrent
    utilisation: float = 0.0          # ρ = L/T

    # --- Deployment ---
    replicas: int = 2
    version: str = "v1.0.0"
    previous_version: Optional[str] = None
    status: str = "healthy"  # healthy | degraded | critical | down

    # --- Config (tunable by agent) ---
    timeout_ms: int = 5000
    retry_max: int = 3
    retry_backoff: bool = False
    pool_size: int = 20

    # --- Circuit breakers (per-dependency) ---
    circuit_breakers: Dict[str, CircuitBreaker] = field(default_factory=dict)

    # --- Failure state ---
    has_active_failure: bool = False
    failure_ticks: int = 0
    propagation_error_rate: float = 0.0  # Error rate from upstream propagation

    def compute_status(self) -> str:
        """Classify health from the current error rate and p99 latency.

        Returns one of "down", "critical", "degraded" or "healthy"; the most
        severe matching level wins. This does not modify ``status``.
        """
        errors = self.error_rate
        tail_latency_ms = self.latency_p99_ms

        if errors >= 0.90:
            return "down"
        if errors >= 0.30 or tail_latency_ms >= 5000:
            return "critical"
        if errors >= 0.05 or tail_latency_ms >= 1000:
            return "degraded"
        return "healthy"

    def update_latency_percentiles(self, base_p99: float, multiplier: float, rng: random.Random) -> None:
        """Overwrite p99/p95/p50 from ``base_p99 * multiplier`` plus random jitter.

        p99 gets ±5% noise and is floored at 1.0 ms. p95 is drawn as 60-85% of
        p99 and p50 as 30-50% of p95. ``rng`` is sampled three times, in that
        order.
        """
        jitter = rng.uniform(0.95, 1.05)
        p99 = max(1.0, base_p99 * multiplier * jitter)
        p95 = p99 * rng.uniform(0.60, 0.85)
        p50 = p95 * rng.uniform(0.30, 0.50)

        self.latency_p99_ms = p99
        self.latency_p95_ms = p95
        self.latency_p50_ms = p50


def propagate_failures(
    services: Dict[str, ServiceRuntimeState],
    adjacency: Dict[str, List[str]],
    reverse_adjacency: Dict[str, List[str]],
    edge_activation: Dict[Tuple[str, str], float],
    rng: random.Random,
    propagation_delay: int = 1,
    current_tick: int = 0,
) -> None:
    """
    Propagate failure effects through the dependency graph for one tick.

    Pass 1: every service whose error rate is at least 0.01 pushes impact
    onto the services that call it:
    1. Caller's propagated error rate rises (dampened by the circuit breaker)
    2. Caller's arrival rate may spike (retry amplification, capped at 3x)
    3. Caller's service time increases (waiting on slow downstream)
    4. The caller's circuit breaker for that edge is ticked and may trip

    Pass 2: every service gets utilisation, latency percentiles, error rate,
    throughput and status recomputed.

    Pass 1 only reads ``error_rate`` and ``latency_p99_ms`` of the failing
    service, and those are only rewritten in pass 2, so an effect travels at
    most one hop per call.

    Args:
        services: All service states keyed by service id; modified in-place.
        adjacency: Not read by this function.
        reverse_adjacency: Maps a service id to the ids of services that
            call it.
        edge_activation: Probability, keyed by ``(caller_id, service_id)``,
            that the edge is exercised this tick; missing edges default
            to 1.0.
        rng: Random source. Draw order affects results: one ``random()`` per
            considered edge, one ``uniform()`` per active edge, then up to
            three draws per service whose latency is recomputed in pass 2.
        propagation_delay: Not read by this function.
        current_tick: Not read by this function.

    Returns:
        None. All results are written into the ``ServiceRuntimeState`` objects.
    """
    # Services are visited in dict iteration order; no topological sort is
    # performed here.
    # NOTE(review): if infra → business → edge ordering is intended, the caller
    # must supply ``services`` already ordered that way — confirm.
    for service_id, state in services.items():
        if state.error_rate < 0.01:
            # Healthy — no propagation from this service.
            # NOTE(review): callers' breakers for this service are not ticked
            # on this path, so an OPEN breaker does not progress while the
            # dependency stays healthy — confirm this is intended.
            continue

        # Who calls this service? (reverse edges = callers)
        callers = reverse_adjacency.get(service_id, [])

        for caller_id in callers:
            caller = services.get(caller_id)
            if caller is None:
                continue  # Caller listed in the graph but has no runtime state

            edge_key = (caller_id, service_id)
            activation_prob = edge_activation.get(edge_key, 1.0)

            # Is this edge active this tick?
            if rng.random() > activation_prob:
                continue  # Edge not active — this dependency not called

            # Get (or lazily create) the circuit breaker for this edge
            if service_id not in caller.circuit_breakers:
                caller.circuit_breakers[service_id] = CircuitBreaker()
            breaker = caller.circuit_breakers[service_id]

            # Update circuit breaker state, then read the post-transition factor
            breaker.tick(state.error_rate, rng)
            dampening = breaker.dampening_factor

            # --- Compute propagated impact ---

            # 1. Error propagation (dampened by circuit breaker).
            #    With several failing dependencies the worst one wins (max).
            propagated_error = state.error_rate * dampening * rng.uniform(0.5, 0.9)
            caller.propagation_error_rate = max(
                caller.propagation_error_rate,
                propagated_error,
            )

            # 2. Retry amplification (increases arrival rate)
            #    NOTE(review): arrival_rate is multiplied in place and is not
            #    reset in this function — presumably the caller restores a
            #    baseline between ticks; confirm, otherwise it compounds.
            if dampening > 0.1:  # Skipped when the breaker is OPEN (factor 0.05)
                retry_mult = compute_retry_amplification(
                    state.error_rate * dampening,
                    caller.retry_max,
                )
                caller.arrival_rate *= min(retry_mult, 3.0)  # Cap at 3x

            # 3. Latency propagation (waiting on slow downstream)
            #    NOTE(review): service_time_local is incremented in place and
            #    is not reset in this function — same caveat as arrival_rate.
            if state.latency_p99_ms > 500 and dampening > 0.1:
                downstream_wait = state.latency_p99_ms * dampening * 0.001  # ms → seconds
                caller.service_time_local += downstream_wait * 0.5  # Partial impact

    # --- After propagation: update utilisation and derived metrics ---
    for service_id, state in services.items():
        # Recompute utilisation
        state.utilisation = compute_utilisation(
            state.arrival_rate / max(1, state.replicas),  # Per-replica arrival rate
            state.service_time_local,
            state.thread_pool_size,
        )

        # Apply queueing delay to latency. Latency percentiles are left
        # untouched when the multiplier is at or below 1.1.
        q_mult = compute_queueing_latency_multiplier(state.utilisation)
        if q_mult > 1.1:
            base_p99 = 100.0  # Baseline p99 in ms
            state.update_latency_percentiles(base_p99, q_mult, rng)

        # Combine direct failure error rate with propagation error rate.
        # A service without an active failure has its error rate replaced by
        # the propagated value, so it drops to 0.0 once nothing propagates
        # to it.
        if state.has_active_failure:
            combined_error = max(state.error_rate, state.propagation_error_rate)
        else:
            combined_error = state.propagation_error_rate
        state.error_rate = min(1.0, combined_error)

        # Compute throughput: per-replica arrival rate scaled by success rate
        state.throughput_rps = state.arrival_rate * (1.0 - state.error_rate) / max(1, state.replicas)

        # Update status
        state.status = state.compute_status()

        # Reset per-tick propagation accumulator
        state.propagation_error_rate = 0.0