File size: 34,529 Bytes
83ecd75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Simulation engine for the DevOps Pipeline Environment."""

import random

from devops_pipeline_env.models import (
    ActionType,
    AlertInfo,
    ConfigEdit,
    MigrationStatus,
    PipelineAction,
    PipelineStage,
    PipelineStatus,
    ServiceHealth,
    ServiceStatus,
)


class ServiceState:
    """State machine for a single simulated microservice.

    Holds the service's version, health, configuration, runtime metrics,
    deployment flags and an in-memory log, and implements the operations
    performed on it: staging deploy, production promotion, rollback and
    config edits. Every random draw goes through ``self._rng`` so a fixed
    seed reproduces the same sequence of outcomes; the order of draws inside
    each method is therefore significant.
    """

    def __init__(self, name, version, health, config, dependencies,
                 latency_ms, error_rate, cpu, memory, rng=None):
        """Create a service with its initial metrics.

        Args:
            name: Service identifier used in logs and result messages.
            version: Version recorded as currently running.
            health: Initial ``ServiceHealth`` value.
            config: Mapping of config keys to values. It is copied; ``None``
                (or any empty value) yields an empty config.
            dependencies: Iterable of dependency names. It is copied;
                ``None`` yields an empty list.
            latency_ms: Initial request latency.
            error_rate: Initial error rate.
            cpu: Initial CPU usage percentage.
            memory: Initial memory usage percentage.
            rng: Optional ``random.Random``; defaults to ``random.Random(0)``.
        """
        self.name = name
        self.current_version = version
        self.target_version = None
        self.health = health
        self.config = dict(config) if config else {}
        self.dependencies = list(dependencies) if dependencies else []
        self.latency_ms = latency_ms
        self.error_rate = error_rate
        self.cpu_percent = cpu
        self.memory_percent = memory
        self.active_connections = 100
        self.staging_deployed = False
        self.staging_verified = False
        self.prod_deployed = False
        self.last_deploy_timestamp = "2026-04-01T00:00:00Z"
        self.logs = []
        self._rng = rng or random.Random(0)
        # Task name is injected after construction by the owning engine;
        # None means "no special task handling".
        self._task_name = None
        # Staged health recovery: 0 = fully recovered, >0 = still recovering
        self._recovery_steps_remaining = 0
        self._recovery_target_latency = 0.0
        self._recovery_target_error_rate = 0.0

    def _is_clean_deploy(self) -> bool:
        """Return True when this service belongs to the "clean_deploy" task."""
        # getattr keeps instances built without __init__ working.
        return getattr(self, "_task_name", None) == "clean_deploy"

    def deploy_to_staging(self, version, scenario) -> str:
        """Deploy ``version`` to staging and return a result message.

        Three outcomes are possible, checked in this order:

        1. Transient failure (8% roll): only for tasks other than
           "clean_deploy", when staging is not yet verified and the service
           is currently HEALTHY. Staging stays unverified.
        2. Config error reported by ``scenario.check_config_error``: the
           service becomes DEGRADED with elevated error rate and latency.
        3. Success: the service becomes HEALTHY and staging is verified.
        """
        self.staging_deployed = True
        self.target_version = version

        # The roll is drawn unconditionally so the RNG stream stays aligned
        # across tasks, even when the result is ignored.
        transient_roll = self._rng.random()
        if (
            not self._is_clean_deploy()
            and not self.staging_verified
            and self.health == ServiceHealth.HEALTHY
            and transient_roll < 0.08
        ):
            # Deployed but not verified: a retry is expected to succeed.
            self.logs.append(
                f"[DEPLOY] Deployed {self.name} {version} to staging. "
                f"Transient failure: health check timed out. Retry should succeed."
            )
            return (
                f"Deployed {self.name} {version} to staging. "
                f"WARNING: Transient health check timeout. Try deploying again."
            )

        if scenario.check_config_error(self.name, self.config):
            self.health = ServiceHealth.DEGRADED
            lat_mult = self._rng.uniform(0.8, 1.2)
            err_mult = self._rng.uniform(0.9, 1.1)
            self.error_rate = round(12.0 * err_mult, 2)
            self.latency_ms = round(300.0 * lat_mult, 1)
            self.logs.append(
                f"[DEPLOY] Deployed {self.name} {version} to staging. "
                f"WARNING: Health check DEGRADED. Error rate elevated "
                f"({self.error_rate:.1f}/s, latency {self.latency_ms:.0f}ms)."
            )
            return (
                f"Deployed {self.name} {version} to staging. "
                f"WARNING: Health check degraded. Error rate elevated."
            )

        self.health = ServiceHealth.HEALTHY
        self.staging_verified = True
        lat_mult = self._rng.uniform(0.8, 1.2)
        self.error_rate = round(0.1 * self._rng.uniform(0.9, 1.1), 3)
        self.latency_ms = round(45.0 * lat_mult, 1)
        self.logs.append(
            f"[DEPLOY] Deployed {self.name} {version} to staging. Health check: PASSED."
        )
        return f"Deployed {self.name} {version} to STAGING. Staging verified. Deploy same service+version again to PROMOTE TO PRODUCTION."

    def deploy_to_production(self, version) -> str:
        """Promote ``version`` to production and return a result message.

        Without prior staging verification the service is marked DEGRADED
        with high error rate/latency, and neither ``prod_deployed`` nor
        ``current_version`` is updated. Otherwise the version is recorded, a
        1-3 step staged recovery is scheduled (progressed by
        ``tick_recovery``), and a warmup CPU/latency spike is applied.
        """
        if not self.staging_verified:
            self.health = ServiceHealth.DEGRADED
            lat_mult = self._rng.uniform(0.8, 1.2)
            err_mult = self._rng.uniform(0.9, 1.1)
            self.error_rate = round(25.0 * err_mult, 2)
            self.latency_ms = round(500.0 * lat_mult, 1)
            self.logs.append(
                f"[DEPLOY] Deployed {self.name} {version} to production "
                f"WITHOUT staging verification. High risk."
            )
            return (
                f"Deployed {self.name} {version} to production "
                f"WITHOUT staging verification. High risk."
            )

        self.prod_deployed = True
        self.current_version = version
        # Staged recovery: takes 1-3 steps to fully stabilize
        recovery_steps = self._rng.randint(1, 3)
        self._recovery_steps_remaining = recovery_steps
        base_latency = 45.0 * self._rng.uniform(0.8, 1.2)
        base_error_rate = 0.1 * self._rng.uniform(0.9, 1.1)

        # Non-linear deploy quality: same seed = same outcome
        quality_roll = self._rng.random()
        deploy_note = ""
        if quality_roll < 0.7:
            # Clean deploy — base values are already good.
            pass
        elif quality_roll < 0.9:
            # Minor issues — recovers to good but not perfect
            base_latency *= 1.5
            base_error_rate *= 3.0
            deploy_note = " Minor post-deploy issues detected."
            self.logs.append(
                f"[DEPLOY] {self.name}: Minor post-deploy issues detected. "
                f"Performance slightly below optimal."
            )
        else:
            # Unstable deploy — recovers poorly. The instability is carried
            # entirely by the recovery targets: self.error_rate is assigned
            # from base_error_rate below, so bumping it here has no effect.
            base_latency *= 2.5
            base_error_rate *= 8.0
            deploy_note = " Post-deploy instability detected."
            self.logs.append(
                f"[DEPLOY] {self.name}: Post-deploy instability detected. "
                f"Elevated error rate."
            )

        self._recovery_target_latency = round(base_latency, 1)
        self._recovery_target_error_rate = round(base_error_rate, 3)
        # Start at slightly elevated values during recovery
        self.health = ServiceHealth.HEALTHY
        self.latency_ms = round(base_latency * (1.0 + 0.3 * recovery_steps), 1)
        self.error_rate = round(base_error_rate * (1.0 + 0.5 * recovery_steps), 3)
        # Trade-off: deploy causes temporary CPU/latency spike (warmup load)
        # Clean deploy tasks get reduced spikes — they should be clean
        if self._is_clean_deploy():
            self.cpu_percent = min(self.cpu_percent + 3, 99)
            self.latency_ms += round(30 * self._rng.uniform(0.8, 1.2), 1)
        else:
            self.cpu_percent = min(self.cpu_percent + 15, 99)
            self.latency_ms += round(200 * self._rng.uniform(0.8, 1.2), 1)
        self.last_deploy_timestamp = "2026-04-01T12:00:00Z"
        self.logs.append(
            f"[DEPLOY] Promoted {self.name} {version} to production. Health: HEALTHY. "
            f"Stabilizing over ~{recovery_steps} step(s). CPU/latency spike from warmup."
        )
        return (
            f"Promoted {self.name} {version} to production. Health: HEALTHY. "
            f"Deployed successfully. Service under warmup load — temporary CPU/latency spike expected."
            f"{deploy_note}"
        )

    def tick_recovery(self) -> None:
        """Advance staged health recovery by one step.

        Does nothing when no recovery is in progress. On the final step the
        metrics snap to their recovery targets and a DEGRADED service whose
        error rate is below 5.0 becomes HEALTHY; on earlier steps the
        metrics move part of the way toward the targets.
        """
        if self._recovery_steps_remaining > 0:
            self._recovery_steps_remaining -= 1
            if self._recovery_steps_remaining == 0:
                # Fully recovered
                self.latency_ms = self._recovery_target_latency
                self.error_rate = self._recovery_target_error_rate
                if self.health == ServiceHealth.DEGRADED and self.error_rate < 5.0:
                    self.health = ServiceHealth.HEALTHY
            else:
                # Interpolate toward target: close 1/(remaining + 1) of the
                # gap that is still open.
                progress = 1.0 - (self._recovery_steps_remaining / (self._recovery_steps_remaining + 1))
                self.latency_ms = round(
                    self.latency_ms + (self._recovery_target_latency - self.latency_ms) * progress, 1
                )
                self.error_rate = round(
                    self.error_rate + (self._recovery_target_error_rate - self.error_rate) * progress, 3
                )

    def rollback(self) -> str:
        """Roll the service back and return a result message.

        Resets health and metrics to a healthy baseline, clears the staging
        flags and cancels any staged recovery. ``current_version`` is not
        modified here; messages report it as the rollback target. With 25%
        probability the rollback adds 3.0 to the error rate (regression).
        """
        self.health = ServiceHealth.HEALTHY
        lat_mult = self._rng.uniform(0.8, 1.2)
        err_mult = self._rng.uniform(0.9, 1.1)
        self.error_rate = round(0.5 * err_mult, 3)
        self.latency_ms = round(50.0 * lat_mult * 0.7, 1)
        self.staging_deployed = False
        self.staging_verified = False
        self.prod_deployed = True  # still in prod, just rolled back
        self._recovery_steps_remaining = 0
        # Trade-off: 25% chance rollback reintroduces a known bug
        regression = False
        if self._rng.random() < 0.25:
            self.error_rate = round(self.error_rate + 3.0, 2)
            regression = True
            self.logs.append(
                f"[ROLLBACK] Rolled back {self.name} to {self.current_version}. "
                f"Warning: rollback may have reintroduced known issue from previous version"
            )
        else:
            self.logs.append(
                f"[ROLLBACK] Rolled back {self.name} to {self.current_version}. Service healthy."
            )
        result = f"Rolled back {self.name} to {self.current_version}. Rolled back. Monitoring for regression..."
        if regression:
            result += f" WARNING: Error rate elevated ({self.error_rate:.1f}/s) — possible regression."
        return result

    def set_config(self, key, value) -> str:
        """Set config ``key`` to ``value`` and return a result message.

        The change also applies a restart penalty: latency rises by roughly
        100 (scaled by a random factor in [0.8, 1.2]) and CPU by 5 points,
        capped at 99.
        """
        old = self.config.get(key, "<not set>")
        self.config[key] = value
        # Trade-off: config change causes brief restart spike
        self.latency_ms += round(100 * self._rng.uniform(0.8, 1.2), 1)
        self.cpu_percent = min(self.cpu_percent + 5, 99)
        self.logs.append(f"[CONFIG] {self.name}: {key} changed from '{old}' to '{value}'. Service restarting.")
        return f"Config {self.name}: {key} changed from '{old}' to '{value}'. Config updated. Service restarting — brief latency spike."

    def get_config_snapshot(self) -> dict:
        """Return a shallow copy of the current config."""
        return dict(self.config)

    def get_logs(self) -> list:
        """Return a copy of the accumulated log lines."""
        return list(self.logs)

    def _get_health_pct(self) -> float:
        """Return a 0-100 health score derived from status and metrics.

        Starts at 100 (HEALTHY), 50 (DEGRADED) or 0 (DOWN), then subtracts
        up to 30 for error rate (2 points per unit) and up to 30 for latency
        above 200 (1 point per 10 over). The result is floored at 0.
        """
        h = 100.0
        if self.health == ServiceHealth.DOWN:
            h = 0.0
        elif self.health == ServiceHealth.DEGRADED:
            h = 50.0
        h -= min(self.error_rate * 2, 30)
        if self.latency_ms > 200:
            h -= min((self.latency_ms - 200) / 10, 30)
        return max(h, 0.0)

    def to_status(self):
        """Build a ``ServiceStatus`` snapshot from the current fields."""
        return ServiceStatus(
            name=self.name,
            health=self.health,
            current_version=self.current_version,
            cpu_percent=self.cpu_percent,
            memory_percent=self.memory_percent,
            error_rate=self.error_rate,
            request_latency_ms=self.latency_ms,
            active_connections=self.active_connections,
            last_deploy_timestamp=self.last_deploy_timestamp,
        )


class PipelineEngine:
    """Manages all services, pipeline state, migrations, alerts."""

    def __init__(self, scenario, seed):
        """Create an engine, let the scenario populate it, then share the RNG.

        Args:
            scenario: Object providing ``setup(engine)`` and ``task_name``.
            seed: Seed for the engine's ``random.Random`` instance.
        """
        self.scenario = scenario
        self._rng = random.Random(seed)

        # Live simulation state.
        self.services = {}
        self.alerts = []
        self.pipeline_stage = PipelineStage.IDLE
        self._time_pressure = False  # scenarios may switch this on in setup()

        # Migration bookkeeping.
        self.migrations_pending = []
        self.migrations_applied = []
        self.migration_errors = []

        # Default metadata for the pipeline run.
        self.commit_sha = "abc123"
        self.triggered_by = "deploy-bot"
        self.started_at = "2026-04-01T10:00:00Z"
        self.test_pass = 0
        self.test_fail = 0
        self.build_logs = ""

        # All defaults must exist before the scenario runs its setup against us.
        scenario.setup(self)

        # Every service the scenario registered draws from the engine's RNG and
        # is tagged with the scenario's task name.
        for service in self.services.values():
            service._rng = self._rng
            service._task_name = scenario.task_name

    def execute(self, action):
        """Run one simulation step for ``action`` and return its result text.

        The step has three phases, in this order: recovery ticks for every
        service, the requested action, then environmental effects (time
        pressure when enabled, cascades, metric compounding, tipping points).
        """
        # Phase 1: let services progress any recovery already under way.
        for service in self.services.values():
            service.tick_recovery()

        # Phase 2: route the action. Handlers are thunks so that only the
        # fields of the matching action type are read from ``action``.
        requested = action.action_type
        handlers = (
            (ActionType.VIEW_PIPELINE, lambda: self._view_pipeline()),
            (ActionType.VIEW_LOGS, lambda: self._view_logs(action.service_name)),
            (ActionType.VIEW_CONFIG, lambda: self._view_config(action.service_name)),
            (
                ActionType.EDIT_CONFIG,
                lambda: self._edit_config(action.service_name, action.config_edits),
            ),
            (
                ActionType.RUN_MIGRATION,
                lambda: self._run_migration(action.migration_name, action.migration_type),
            ),
            (
                ActionType.DEPLOY,
                lambda: self._deploy(action.service_name, action.target_version),
            ),
            (ActionType.ROLLBACK, lambda: self._rollback(action.service_name)),
            (ActionType.APPROVE, lambda: self._approve(action.reason)),
            (ActionType.ABORT, lambda: self._abort(action.reason)),
        )
        outcome = "Unknown action."
        for action_type, handler in handlers:
            if requested == action_type:
                outcome = handler()
                break

        # Phase 3: the environment reacts after the action has been applied.
        if self._time_pressure:
            self._apply_time_pressure()
        self._tick_cascading_effects()
        self._tick_metric_compounding()
        self._tick_tipping_points()

        return outcome

    # --- Cross-metric compounding ---------------------------------------------

    def _tick_metric_compounding(self):
        """Metrics compound on each other β€” creates realistic spirals and recovery."""
        if self.scenario.task_name == "clean_deploy":
            return
        for name, svc in self.services.items():
            # Degradation spirals (moderate β€” should not kill episodes in <5 steps)
            if svc.error_rate > 15.0:
                svc.cpu_percent = min(svc.cpu_percent + 3, 99)
            if svc.cpu_percent > 90:
                svc.latency_ms = round(min(svc.latency_ms + 100, 5000), 1)
            if svc.latency_ms > 3000:
                svc.error_rate = round(min(svc.error_rate + 1.0, 50.0), 2)

            # Natural recovery (when metrics are good, they help each other)
            if svc.error_rate < 2.0:
                svc.cpu_percent = max(svc.cpu_percent - 3, 10)
            if svc.cpu_percent < 50:
                svc.latency_ms = round(max(svc.latency_ms - 50, 20), 1)
            if svc.latency_ms < 200 and svc.error_rate < 1.0:
                svc.error_rate = round(max(svc.error_rate - 0.5, 0.0), 2)

    # --- Non-linear tipping points -------------------------------------------

    def _tick_tipping_points(self):
        """Non-linear tipping points β€” systems cliff instead of degrading linearly."""
        if self.scenario.task_name == "clean_deploy":
            return
        for name, svc in self.services.items():
            # CPU cliff: above 85% = exponential error growth
            if svc.cpu_percent > 85:
                overflow = svc.cpu_percent - 85
                svc.error_rate = round(min(svc.error_rate + overflow * 0.2, 50.0), 2)

            # Latency cliff: above 2000ms = rapid collapse
            if svc.latency_ms > 2000:
                svc.error_rate = round(min(svc.error_rate + 3.0, 50.0), 2)

            # Health cliff: below 30% health = accelerating death spiral
            base = 50.0 if svc.health == ServiceHealth.DEGRADED else (
                100.0 if svc.health == ServiceHealth.HEALTHY else 0.0
            )
            err_penalty = min(svc.error_rate * 2, 30)
            lat_penalty = min(max(0, svc.latency_ms - 200) / 10, 30)
            health_pct = max(0, base - err_penalty - lat_penalty)
            if health_pct < 30:
                svc.error_rate = round(min(svc.error_rate * 1.3, 50.0), 2)

            # Latency β†’ CPU feedback (high latency = retries = more CPU)
            if svc.latency_ms > 1500:
                svc.cpu_percent = min(svc.cpu_percent + 3, 99)

    # --- Cascading failures ---------------------------------------------------

    def _get_dependents(self, service_name):
        """Find all services that list service_name in their dependencies."""
        return [
            svc for svc in self.services.values()
            if service_name in svc.dependencies
        ]

    def _tick_cascading_effects(self):
        """Propagate unhealthiness, then recovery, from services to their dependents.

        Pass 1: every service whose health score is below 50 raises the error
        rate and latency of each service that lists it as a dependency.
        Pass 2: every HEALTHY service with error rate below 2.0 scales the
        error rate and latency of its DEGRADED dependents by 0.9.

        Services are mutated while iterating, so a dependent degraded early in
        pass 1 is scored with its updated metrics when its own turn comes.
        """
        for svc in self.services.values():
            health_pct = svc._get_health_pct()
            if health_pct >= 50.0:
                continue  # healthy enough, no cascade

            dependents = self._get_dependents(svc.name)
            for dep in dependents:
                if dep.health == ServiceHealth.DOWN:
                    continue  # already down, can't get worse from cascade

                # Determine cascade severity from the source's health score
                if health_pct < 20.0:
                    # Source score under 20: the stronger of the two cascades
                    err_increase = 1.5
                    lat_increase = 30.0
                else:
                    # Source score in [20, 50): lighter cascade
                    err_increase = 0.5
                    lat_increase = 10.0

                # Cascade caps (45.0 errors, 4500 ms) sit below the 50.0 / 5000
                # caps used by the other tick methods.
                old_err = dep.error_rate
                dep.error_rate = round(min(dep.error_rate + err_increase, 45.0), 2)
                dep.latency_ms = round(min(dep.latency_ms + lat_increase, 4500.0), 1)

                # If error rate gets high enough, mark as degraded
                if dep.error_rate > 5.0 and dep.health == ServiceHealth.HEALTHY:
                    dep.health = ServiceHealth.DEGRADED

                # Floor: when the dependent's score drops under 5, its error
                # rate is reset to max(old_err, new_rate - err_increase + 1.0).
                # The latency increase above is not undone.
                # NOTE(review): for the light cascade (err_increase 0.5) this
                # expression is larger than the rate just applied, so it does
                # not limit the damage in that case; confirm the intent.
                dep_health = dep._get_health_pct()
                if dep_health < 5.0:
                    dep.error_rate = round(max(old_err, dep.error_rate - err_increase + 1.0), 2)

                # Add a cascade alert unless one with the same key is already
                # stored. The check scans all of self.alerts, so each
                # source->dependent pair is alerted once for as long as that
                # alert remains in the list, not once per step.
                cascade_alert_key = f"cascade:{svc.name}->{dep.name}"
                existing = [a for a in self.alerts if cascade_alert_key in a.message]
                if not existing:
                    self.alerts.append(AlertInfo(
                        severity="warning",
                        message=(
                            f"Cascading: {svc.name} (health {health_pct:.0f}%) is degrading "
                            f"{dep.name} β€” error_rate +{err_increase}/s, latency +{lat_increase:.0f}ms "
                            f"[{cascade_alert_key}]"
                        ),
                        service_name=dep.name,
                        timestamp="2026-04-01T12:00:00Z",
                    ))

                # Unlike the alert, the log line is written on every cascade hit.
                dep.logs.append(
                    f"[CASCADE] Upstream {svc.name} unhealthy (health {health_pct:.0f}%) β€” "
                    f"{dep.name} error_rate now {dep.error_rate:.1f}/s, "
                    f"latency {dep.latency_ms:.0f}ms"
                )

        # Recovery propagation: healthy services help their dependents recover
        for name, svc in self.services.items():
            if svc.health == ServiceHealth.HEALTHY and svc.error_rate < 2.0:
                dependents = self._get_dependents(name)
                for dep in dependents:
                    # Only DEGRADED dependents are eased; their health state
                    # itself is not changed here.
                    if dep.health == ServiceHealth.DEGRADED:
                        dep.error_rate = round(dep.error_rate * 0.9, 2)
                        dep.latency_ms = round(dep.latency_ms * 0.9, 1)

    # --- Action handlers ------------------------------------------------------

    def _view_pipeline(self):
        services_summary = "\n".join(
            f"  {s.name}: {s.health.value} | v{s.current_version} -> "
            f"v{s.target_version or 'N/A'} | "
            f"latency={s.latency_ms:.0f}ms | errors={s.error_rate:.1f}/s"
            for s in self.services.values()
        )
        return (
            f"Pipeline Stage: {self.pipeline_stage.value}\n"
            f"Commit: {self.commit_sha}\n"
            f"Tests: {self.test_pass} passed, {self.test_fail} failed\n"
            f"Pending Migrations: {len(self.migrations_pending)}\n"
            f"Services:\n{services_summary}"
        )

    def _view_logs(self, service_name):
        svc = self.services.get(service_name)
        if not svc:
            return f"No service named '{service_name}'"
        logs = svc.get_logs()
        if not logs:
            return f"No logs available for {service_name}."
        return f"Logs for {service_name}:\n" + "\n".join(logs[-20:])

    def _view_config(self, service_name):
        svc = self.services.get(service_name)
        if not svc:
            return f"No service named '{service_name}'"
        config = svc.get_config_snapshot()
        lines = [f"  {k} = {v}" for k, v in config.items()]
        return f"Config for {service_name}:\n" + "\n".join(lines)

    def _edit_config(self, service_name, edits):
        svc = self.services.get(service_name)
        if not svc:
            return f"No service named '{service_name}'"
        results = []
        for edit in edits:
            result = svc.set_config(edit.key, edit.value)
            results.append(result)
        # If the config error is now fixed and service was degraded, start
        # staged recovery (2 steps) instead of instant heal
        if svc.health == ServiceHealth.DEGRADED and not self.scenario.check_config_error(service_name, svc.config):
            svc.staging_deployed = False
            svc.staging_verified = False
            # Immediate PARTIAL improvement
            svc.error_rate = round(svc.error_rate * 0.5, 2)
            svc.latency_ms = round(svc.latency_ms * 0.6, 1)
            # Set up 2-step recovery to full health (reuse tick_recovery pattern)
            svc._recovery_steps_remaining = 2
            svc._recovery_target_latency = round(50.0 * self._rng.uniform(0.8, 1.2), 1)
            svc._recovery_target_error_rate = round(0.1 * self._rng.uniform(0.9, 1.1), 3)
            # Don't set health to HEALTHY yet β€” let tick_recovery handle it
            # once error_rate drops below threshold on next steps
            results.append(f"Config fix detected for {service_name}. Service improving β€” full recovery in ~2 steps. Ready for re-deploy.")
        return "\n".join(results)

    def _run_migration(self, migration_name, migration_type):
        if migration_name not in self.migrations_pending:
            return (
                f"Migration '{migration_name}' not found in pending: "
                f"{self.migrations_pending}"
            )
        success = self.scenario.run_migration(self, migration_name)
        if success:
            self.migrations_pending.remove(migration_name)
            self.migrations_applied.append(migration_name)
            return f"Migration '{migration_name}' applied successfully."
        else:
            error = f"Migration '{migration_name}' FAILED."
            self.migration_errors.append(error)
            return error

    def _deploy(self, service_name, target_version):
        svc = self.services.get(service_name)
        if not svc:
            return f"No service named '{service_name}'"

        # Check migration dependencies
        if self.migrations_pending and self.scenario.migration_blocks_deploy(service_name):
            return (
                f"BLOCKED: Pending migrations must be applied before deploying "
                f"{service_name}. Pending: {self.migrations_pending}"
            )

        # Check if any dependency is unhealthy β€” 50% chance of deploy failure
        for dep_name in svc.dependencies:
            dep_svc = self.services.get(dep_name)
            if dep_svc and dep_svc._get_health_pct() < 50.0:
                if self._rng.random() < 0.5:
                    svc.logs.append(
                        f"[DEPLOY] Deploy {svc.name} {target_version} FAILED β€” "
                        f"dependency {dep_name} is unhealthy "
                        f"(health {dep_svc._get_health_pct():.0f}%). Retry may succeed."
                    )
                    return (
                        f"DEPLOY UNSTABLE: Dependency {dep_name} is unhealthy "
                        f"(health {dep_svc._get_health_pct():.0f}%). "
                        f"Deploy of {service_name} failed. Retry may succeed."
                    )

        # Determine target environment
        if not svc.staging_deployed:
            self.pipeline_stage = PipelineStage.STAGING
            return svc.deploy_to_staging(target_version, self.scenario)
        else:
            self.pipeline_stage = PipelineStage.DEPLOYING
            result = svc.deploy_to_production(target_version)
            # Notify scenario of deploy (for cascading effects)
            if hasattr(self.scenario, 'on_prod_deploy'):
                extra = self.scenario.on_prod_deploy(self, service_name, target_version)
                if extra:
                    result += "\n" + extra
            # Check if all target services deployed
            if all(s.prod_deployed for s in self.services.values() if s.target_version):
                self.pipeline_stage = PipelineStage.DEPLOYED
            return result

    def _rollback(self, service_name):
        svc = self.services.get(service_name)
        if not svc:
            return f"No service named '{service_name}'"
        self.pipeline_stage = PipelineStage.ROLLED_BACK

        # Check if dependents rely on current version's APIs
        old_version = svc.current_version
        dependents = self._get_dependents(service_name)
        result = svc.rollback()

        # Warn about dependent services and increase their error rates
        for dep in dependents:
            dep.error_rate = round(dep.error_rate + 5.0, 2)
            if dep.health == ServiceHealth.HEALTHY and dep.error_rate > 3.0:
                dep.health = ServiceHealth.DEGRADED
            self.alerts.append(AlertInfo(
                severity="warning",
                message=(
                    f"Rollback impact: {dep.name} depends on {service_name} "
                    f"{old_version}. Rollback may break {dep.name}. "
                    f"Error rate increased to {dep.error_rate:.1f}/s."
                ),
                service_name=dep.name,
                timestamp="2026-04-01T12:00:00Z",
            ))
            dep.logs.append(
                f"[ROLLBACK-IMPACT] {service_name} rolled back from {old_version} β€” "
                f"{dep.name} error_rate increased to {dep.error_rate:.1f}/s. "
                f"Dependency on {old_version} APIs may be broken."
            )

        if hasattr(self.scenario, 'on_rollback'):
            self.scenario.on_rollback(self, service_name)
        return result

    def _approve(self, reason):
        """Mark the pipeline DEPLOYED and return an approval message."""
        self.pipeline_stage = PipelineStage.DEPLOYED
        justification = reason or 'No reason given.'
        return f"Deployment APPROVED. Reason: {justification}"

    def _abort(self, reason):
        """Mark the pipeline FAILED and return an abort message."""
        self.pipeline_stage = PipelineStage.FAILED
        justification = reason or 'No reason given.'
        return f"Deployment ABORTED. Reason: {justification}"

    # --- State queries --------------------------------------------------------

    def snapshot(self):
        """Capture current state for reward calculation."""
        return {
            "services": {
                name: {
                    "health": s.health.value,
                    "error_rate": s.error_rate,
                    "latency_ms": s.latency_ms,
                    "prod_deployed": s.prod_deployed,
                    "staging_verified": s.staging_verified,
                    "config": dict(s.config),
                }
                for name, s in self.services.items()
            },
            "system_health": self.get_system_health(),
            "pipeline_stage": self.pipeline_stage.value,
            "migrations_pending": list(self.migrations_pending),
            "alerts": list(self.alerts),
        }

    def get_system_health(self):
        """Aggregate health 0-100."""
        if not self.services:
            return 100.0
        total = 0.0
        for svc in self.services.values():
            total += svc._get_health_pct()
        return total / len(self.services)

    def get_service_statuses(self):
        return [s.to_status() for s in self.services.values()]

    def get_pipeline_status(self):
        """Build a ``PipelineStatus`` from the engine's pipeline fields.

        Empty build logs are reported as ``None`` rather than an empty value.
        """
        fields = {
            "stage": self.pipeline_stage,
            "triggered_by": self.triggered_by,
            "started_at": self.started_at,
            "commit_sha": self.commit_sha,
            "build_logs_snippet": self.build_logs or None,
            "test_pass_count": self.test_pass,
            "test_fail_count": self.test_fail,
        }
        return PipelineStatus(**fields)

    def get_migration_status(self):
        """Build a ``MigrationStatus`` from the engine's migration bookkeeping.

        ``last_applied`` is the most recently applied migration or ``None``.
        ``migration_errors`` is the engine's own error list (not a copy), or
        ``None`` when it is empty.
        """
        applied = self.migrations_applied
        most_recent = applied[-1] if applied else None
        return MigrationStatus(
            pending_migrations=list(self.migrations_pending),
            last_applied=most_recent,
            migration_errors=self.migration_errors or None,
        )

    def get_alerts(self):
        return list(self.alerts)

    def get_service_names(self):
        return list(self.services.keys())

    def has_services(self):
        return len(self.services) > 0

    def has_pending_migrations(self):
        return len(self.migrations_pending) > 0

    def _apply_time_pressure(self):
        """Worsen selected services by one step, depending on the scenario task.

        Which services degrade, and under what condition, is chosen by
        ``self.scenario.task_name``. Tasks not matched by a branch below are
        left untouched by the code shown here.
        """
        task = self.scenario.task_name

        if task == "judgment_call":
            # api-gateway keeps getting worse for as long as it is DEGRADED.
            api_gw = self.services.get("api-gateway")
            if api_gw and api_gw.health == ServiceHealth.DEGRADED:
                # Jittered increments: about 80 ms latency and 0.8 errors/s per step.
                degrade_lat = 80 * self._rng.uniform(0.8, 1.2)
                degrade_err = 0.8 * self._rng.uniform(0.9, 1.1)
                api_gw.latency_ms = round(min(api_gw.latency_ms + degrade_lat, 5000), 1)
                api_gw.error_rate = round(min(api_gw.error_rate + degrade_err, 50.0), 2)
                api_gw.cpu_percent = min(api_gw.cpu_percent + 1, 99)
                api_gw.logs.append(
                    f"[DEGRADING] api-gateway latency now {api_gw.latency_ms:.0f}ms, "
                    f"errors {api_gw.error_rate:.1f}/s β€” situation worsening"
                )

        elif task == "broken_pipeline":
            # cache-service degrades while the scenario still reports a config
            # error for it.
            cache = self.services.get("cache-service")
            if cache and self.scenario.check_config_error("cache-service", cache.config):
                health_drop = 3.0 * self._rng.uniform(0.8, 1.2)
                # Only half of health_drop is added to the error rate; this
                # branch caps at 25.0 errors and 2000 ms, lower than the
                # 50.0 / 5000 caps used in the judgment_call branch.
                cache.error_rate = round(min(cache.error_rate + health_drop * 0.5, 25.0), 2)
                cache.latency_ms = round(min(cache.latency_ms + 30.0 * self._rng.uniform(0.8, 1.2), 2000.0), 1)
                if cache.error_rate > 3.0 and cache.health == ServiceHealth.HEALTHY:
                    cache.health = ServiceHealth.DEGRADED
                cache.logs.append(
                    f"[DEGRADING] cache-service using staging Redis β€” "
                    f"error_rate now {cache.error_rate:.1f}/s, "
                    f"latency {cache.latency_ms:.0f}ms"
                )

            # api-gateway latency climbs while the "add_index_users_email"
            # migration is still pending.
            api_gw = self.services.get("api-gateway")
            if api_gw and "add_index_users_email" in self.migrations_pending:
                lat_increase = 50.0 * self._rng.uniform(0.8, 1.2)
                api_gw.latency_ms = round(min(api_gw.latency_ms + lat_increase, 2000.0), 1)
                api_gw.logs.append(
                    f"[DEGRADING] api-gateway missing index β€” "
                    f"user query latency now {api_gw.latency_ms:.0f}ms"
                )

        elif task == "capacity_crisis":
            db = self.services.get("database-primary")
            api_gw = self.services.get("api-gateway")
            # database-primary degrades only while the scenario reports a
            # config error for it. Its latency has no upper cap here, unlike
            # the other branches.
            if db and self.scenario.check_config_error("database-primary", db.config):
                db.cpu_percent = min(db.cpu_percent + 2, 99)
                db.latency_ms = round(db.latency_ms + 15, 1)
            # api-gateway degrades only if it is DEGRADED and the database
            # config error above is still present.
            if (api_gw and api_gw.health == ServiceHealth.DEGRADED
                    and db and self.scenario.check_config_error("database-primary", db.config)):
                api_gw.latency_ms = round(min(api_gw.latency_ms + 30, 5000), 1)
                api_gw.error_rate = round(min(api_gw.error_rate + 0.5, 50.0), 2)

        elif task == "random_incident":
            # The scenario may name the failing service via an optional
            # ``failing_service`` attribute; without it nothing degrades.
            failing = getattr(self.scenario, 'failing_service', None)
            if failing:
                svc = self.services.get(failing)
                if svc and svc.health == ServiceHealth.DEGRADED:
                    svc.error_rate = round(min(svc.error_rate + 0.5, 50.0), 2)
                    svc.latency_ms = round(min(svc.latency_ms + 30, 5000), 1)