# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Simulation engine for the DevOps Pipeline Environment."""

import random

from devops_pipeline_env.models import (
    ActionType,
    AlertInfo,
    ConfigEdit,
    MigrationStatus,
    PipelineAction,
    PipelineStage,
    PipelineStatus,
    ServiceHealth,
    ServiceStatus,
)


class ServiceState:
    """State machine for a single microservice.

    Holds the service's version, health, config, dependency names and live
    metrics, the staging -> production promotion flags, and a staged
    post-deploy recovery that ``tick_recovery`` advances once per step.
    """

    def __init__(self, name, version, health, config, dependencies,
                 latency_ms, error_rate, cpu, memory, rng=None):
        """Create a service.

        Args:
            name: Service name.
            version: Version currently running in production.
            health: Initial ``ServiceHealth``.
            config: Config mapping; copied so callers' dicts are not mutated.
            dependencies: Names of services this one depends on; copied
                (``None``/empty becomes ``[]``).
            latency_ms: Initial request latency in milliseconds.
            error_rate: Initial error rate (errors per second in messages).
            cpu: Initial CPU percentage.
            memory: Initial memory percentage.
            rng: ``random.Random`` to draw from; defaults to ``Random(0)``.
                ``PipelineEngine.__init__`` overwrites it with its shared RNG.
        """
        self.name = name
        self.current_version = version
        self.target_version = None
        self.health = health
        self.config = dict(config)
        self.dependencies = list(dependencies) if dependencies else []
        self.latency_ms = latency_ms
        self.error_rate = error_rate
        self.cpu_percent = cpu
        self.memory_percent = memory
        self.active_connections = 100
        self.staging_deployed = False
        self.staging_verified = False
        self.prod_deployed = False
        self.last_deploy_timestamp = "2026-04-01T00:00:00Z"
        self.logs = []
        self._rng = rng or random.Random(0)
        # Staged health recovery: 0 = fully recovered, >0 = still recovering
        self._recovery_steps_remaining = 0
        self._recovery_target_latency = 0.0
        self._recovery_target_error_rate = 0.0
        # Set once the one-off transient staging failure has fired, so the
        # retry that failure asks for cannot hit it again.
        self._transient_failure_seen = False

    def _is_clean_deploy(self):
        """Return True if this service was tagged with the ``clean_deploy`` task.

        ``_task_name`` is injected by ``PipelineEngine.__init__`` after the
        scenario creates the service, so it may not exist yet.
        """
        return getattr(self, "_task_name", None) == "clean_deploy"

    def deploy_to_staging(self, version, scenario):
        """Deploy ``version`` to staging and return human-readable result text.

        Outcomes, checked in order:

        1. Transient health-check timeout: 8% roll, at most once per service,
           only for HEALTHY, not-yet-verified services outside the
           ``clean_deploy`` task. Nothing is verified and ``staging_deployed``
           is cleared so the next deploy retries staging.
        2. ``scenario.check_config_error`` is truthy: the service turns
           DEGRADED with elevated metrics and stays unverified, so a further
           deploy through ``PipelineEngine._deploy`` is an unverified
           production promotion.
        3. Otherwise staging passes: HEALTHY and ``staging_verified``.
        """
        self.staging_deployed = True
        self.target_version = version
        # 8% chance of transient staging failure on first attempt.
        # Skip for clean_deploy (easy task) and during incidents (health
        # already degraded/down).
        transient_roll = self._rng.random()  # always consume RNG for determinism
        transient_failure = (
            not self._is_clean_deploy()
            and not self.staging_verified
            and not self._transient_failure_seen
            and self.health == ServiceHealth.HEALTHY
            and transient_roll < 0.08
        )
        if transient_failure:
            self._transient_failure_seen = True
            # Clear the flag so the retry the message asks for re-runs staging.
            # Leaving it True made PipelineEngine._deploy promote the
            # unverified build straight to production instead.
            self.staging_deployed = False
            self.logs.append(
                f"[DEPLOY] Deployed {self.name} {version} to staging. "
                f"Transient failure: health check timed out. Retry should succeed."
            )
            return (
                f"Deployed {self.name} {version} to staging. "
                f"WARNING: Transient health check timeout. Try deploying again."
            )
        if scenario.check_config_error(self.name, self.config):
            self.health = ServiceHealth.DEGRADED
            lat_mult = self._rng.uniform(0.8, 1.2)
            err_mult = self._rng.uniform(0.9, 1.1)
            self.error_rate = round(12.0 * err_mult, 2)
            self.latency_ms = round(300.0 * lat_mult, 1)
            self.logs.append(
                f"[DEPLOY] Deployed {self.name} {version} to staging. "
                f"WARNING: Health check DEGRADED. Error rate elevated "
                f"({self.error_rate:.1f}/s, latency {self.latency_ms:.0f}ms)."
            )
            return (
                f"Deployed {self.name} {version} to staging. "
                f"WARNING: Health check degraded. Error rate elevated."
            )
        self.health = ServiceHealth.HEALTHY
        self.staging_verified = True
        lat_mult = self._rng.uniform(0.8, 1.2)
        self.error_rate = round(0.1 * self._rng.uniform(0.9, 1.1), 3)
        self.latency_ms = round(45.0 * lat_mult, 1)
        self.logs.append(
            f"[DEPLOY] Deployed {self.name} {version} to staging. Health check: PASSED."
        )
        return f"Deployed {self.name} {version} to STAGING. Staging verified. Deploy same service+version again to PROMOTE TO PRODUCTION."

    def deploy_to_production(self, version):
        """Promote ``version`` to production and return result text.

        Without prior staging verification the service turns DEGRADED with
        high error rate/latency, and no version or ``prod_deployed`` change is
        recorded. Otherwise the version goes live, and a seeded quality roll
        picks a clean (70%), minor-issue (20%) or unstable (10%) outcome.
        Metrics start elevated and converge on the outcome's targets over
        1-3 ``tick_recovery`` steps. A warmup CPU/latency spike is added on
        top (smaller for the ``clean_deploy`` task).
        """
        if not self.staging_verified:
            self.health = ServiceHealth.DEGRADED
            lat_mult = self._rng.uniform(0.8, 1.2)
            err_mult = self._rng.uniform(0.9, 1.1)
            self.error_rate = round(25.0 * err_mult, 2)
            self.latency_ms = round(500.0 * lat_mult, 1)
            self.logs.append(
                f"[DEPLOY] Deployed {self.name} {version} to production "
                f"WITHOUT staging verification. High risk."
            )
            return (
                f"Deployed {self.name} {version} to production "
                f"WITHOUT staging verification. High risk."
            )
        self.prod_deployed = True
        self.current_version = version
        # Staged recovery: takes 1-3 steps to fully stabilize
        recovery_steps = self._rng.randint(1, 3)
        self._recovery_steps_remaining = recovery_steps
        base_latency = 45.0 * self._rng.uniform(0.8, 1.2)
        base_error_rate = 0.1 * self._rng.uniform(0.9, 1.1)
        # Non-linear deploy quality: same seed = same outcome
        quality_roll = self._rng.random()
        deploy_note = ""
        instability_penalty = 0.0
        if quality_roll < 0.7:
            # Clean deploy — recovers to near-perfect
            pass  # base values are already good
        elif quality_roll < 0.9:
            # Minor issues — recovers to good but not perfect
            base_latency *= 1.5
            base_error_rate *= 3.0
            deploy_note = " Minor post-deploy issues detected."
            self.logs.append(
                f"[DEPLOY] {self.name}: Minor post-deploy issues detected. "
                f"Performance slightly below optimal."
            )
        else:
            # Unstable deploy — recovers poorly
            base_latency *= 2.5
            base_error_rate *= 8.0
            # Extra immediate error-rate bump. It is added to the starting
            # value below; adding it to self.error_rate here would be
            # overwritten before it ever took effect.
            instability_penalty = 1.5
            deploy_note = " Post-deploy instability detected."
            self.logs.append(
                f"[DEPLOY] {self.name}: Post-deploy instability detected. "
                f"Elevated error rate."
            )
        self._recovery_target_latency = round(base_latency, 1)
        self._recovery_target_error_rate = round(base_error_rate, 3)
        # Start at slightly elevated values during recovery
        self.health = ServiceHealth.HEALTHY
        self.latency_ms = round(base_latency * (1.0 + 0.3 * recovery_steps), 1)
        self.error_rate = round(
            base_error_rate * (1.0 + 0.5 * recovery_steps) + instability_penalty, 3
        )
        # Trade-off: deploy causes temporary CPU/latency spike (warmup load).
        # Clean deploy tasks get reduced spikes — they should be clean.
        if self._is_clean_deploy():
            self.cpu_percent = min(self.cpu_percent + 3, 99)
            self.latency_ms += round(30 * self._rng.uniform(0.8, 1.2), 1)
        else:
            self.cpu_percent = min(self.cpu_percent + 15, 99)
            self.latency_ms += round(200 * self._rng.uniform(0.8, 1.2), 1)
        self.last_deploy_timestamp = "2026-04-01T12:00:00Z"
        self.logs.append(
            f"[DEPLOY] Promoted {self.name} {version} to production. Health: HEALTHY. "
            f"Stabilizing over ~{recovery_steps} step(s). CPU/latency spike from warmup."
        )
        return (
            f"Promoted {self.name} {version} to production. Health: HEALTHY. "
            f"Deployed successfully. Service under warmup load — temporary CPU/latency spike expected."
            f"{deploy_note}"
        )

    def tick_recovery(self):
        """Advance staged health recovery by one step (called every engine step).

        On the final step, latency and error rate snap to their recovery
        targets, and a DEGRADED service whose error rate is then below 5.0
        becomes HEALTHY. On earlier steps both metrics move part-way toward
        the target (1 / (steps_left + 1) of the remaining gap).
        """
        if self._recovery_steps_remaining > 0:
            self._recovery_steps_remaining -= 1
            if self._recovery_steps_remaining == 0:
                # Fully recovered
                self.latency_ms = self._recovery_target_latency
                self.error_rate = self._recovery_target_error_rate
                if self.health == ServiceHealth.DEGRADED and self.error_rate < 5.0:
                    self.health = ServiceHealth.HEALTHY
            else:
                # Interpolate toward target
                progress = 1.0 - (
                    self._recovery_steps_remaining / (self._recovery_steps_remaining + 1)
                )
                self.latency_ms = round(
                    self.latency_ms
                    + (self._recovery_target_latency - self.latency_ms) * progress,
                    1,
                )
                self.error_rate = round(
                    self.error_rate
                    + (self._recovery_target_error_rate - self.error_rate) * progress,
                    3,
                )

    def rollback(self):
        """Roll the service back and return result text.

        Resets health to HEALTHY with low metrics, clears the staging flags
        and any in-progress recovery, and marks the service as in production.
        There is a 25% chance of a regression that adds 3.0 to the error rate.

        NOTE(review): ``current_version`` is not changed and no previous
        version is tracked in this class, so the messages name the version
        that was just rolled back from. Confirm whether that is intended.
        """
        self.health = ServiceHealth.HEALTHY
        lat_mult = self._rng.uniform(0.8, 1.2)
        err_mult = self._rng.uniform(0.9, 1.1)
        self.error_rate = round(0.5 * err_mult, 3)
        self.latency_ms = round(50.0 * lat_mult * 0.7, 1)
        self.staging_deployed = False
        self.staging_verified = False
        self.prod_deployed = True  # still in prod, just rolled back
        self._recovery_steps_remaining = 0
        # Trade-off: 25% chance rollback reintroduces a known bug
        regression = False
        if self._rng.random() < 0.25:
            self.error_rate = round(self.error_rate + 3.0, 2)
            regression = True
            self.logs.append(
                f"[ROLLBACK] Rolled back {self.name} to {self.current_version}. "
                f"Warning: rollback may have reintroduced known issue from previous version"
            )
        else:
            self.logs.append(
                f"[ROLLBACK] Rolled back {self.name} to {self.current_version}. Service healthy."
            )
        result = f"Rolled back {self.name} to {self.current_version}. Rolled back. Monitoring for regression..."
        if regression:
            result += f" WARNING: Error rate elevated ({self.error_rate:.1f}/s) — possible regression."
        return result

    def set_config(self, key, value):
        """Set config ``key`` to ``value`` and return result text.

        Models a restart: latency rises by about 100 ms and CPU by 5 points
        (capped at 99).
        """
        old = self.config.get(key, "")
        self.config[key] = value
        # Trade-off: config change causes brief restart spike
        self.latency_ms += round(100 * self._rng.uniform(0.8, 1.2), 1)
        self.cpu_percent = min(self.cpu_percent + 5, 99)
        self.logs.append(f"[CONFIG] {self.name}: {key} changed from '{old}' to '{value}'. Service restarting.")
        return f"Config {self.name}: {key} changed from '{old}' to '{value}'. Config updated. Service restarting — brief latency spike."

    def get_config_snapshot(self):
        """Return a shallow copy of the config mapping."""
        return dict(self.config)

    def get_logs(self):
        """Return a copy of this service's log lines."""
        return list(self.logs)

    def _get_health_pct(self):
        """Return a 0-100 health score for this service.

        The base is 0 for DOWN, 50 for DEGRADED and 100 otherwise. It loses
        up to 30 points for error rate (2 per unit) and up to 30 for latency
        above 200 ms (1 per 10 ms), and is floored at 0.
        """
        h = 100.0
        if self.health == ServiceHealth.DOWN:
            h = 0.0
        elif self.health == ServiceHealth.DEGRADED:
            h = 50.0
        h -= min(self.error_rate * 2, 30)
        if self.latency_ms > 200:
            h -= min((self.latency_ms - 200) / 10, 30)
        return max(h, 0.0)

    def to_status(self):
        """Build the public ``ServiceStatus`` model for this service."""
        return ServiceStatus(
            name=self.name,
            health=self.health,
            current_version=self.current_version,
            cpu_percent=self.cpu_percent,
            memory_percent=self.memory_percent,
            error_rate=self.error_rate,
            request_latency_ms=self.latency_ms,
            active_connections=self.active_connections,
            last_deploy_timestamp=self.last_deploy_timestamp,
        )


class PipelineEngine:
    """Manages all services, pipeline state, migrations, alerts."""

    def __init__(self, scenario, seed):
        """Build engine state, let ``scenario.setup`` populate it, then share the RNG.

        Args:
            scenario: Scenario object. Its ``setup``, ``task_name``,
                ``check_config_error``, ``run_migration`` and
                ``migration_blocks_deploy`` are used by this engine, plus the
                optional ``on_prod_deploy`` / ``on_rollback`` /
                ``failing_service`` hooks.
            seed: Seed for the single ``random.Random`` used by the engine
                and every service.
        """
        self.scenario = scenario
        self._rng = random.Random(seed)
        self.services = {}
        self.pipeline_stage = PipelineStage.IDLE
        self.migrations_pending = []
        self.migrations_applied = []
        self.migration_errors = []
        self.alerts = []
        self.commit_sha = "abc123"
        self.triggered_by = "deploy-bot"
        self.started_at = "2026-04-01T10:00:00Z"
        self.test_pass = 0
        self.test_fail = 0
        self.build_logs = ""
        self._time_pressure = False  # Set by scenario if needed
        # Initialize from scenario
        scenario.setup(self)
        # Inject the shared RNG and task name into all services created by
        # the scenario, so one seed drives the whole simulation.
        for svc in self.services.values():
            svc._rng = self._rng
            svc._task_name = scenario.task_name

    def execute(self, action):
        """Execute one agent action and advance the simulation one step.

        Order: (1) tick staged recovery on every service, (2) run the action
        handler, (3) apply environmental effects — time pressure (if
        enabled), cascading failures, metric compounding, tipping points.
        The agent therefore sees the consequences of its action next step.

        Returns:
            The handler's human-readable result string.
        """
        # 1. Tick health recovery for all services (heal from previous deploys)
        for svc in self.services.values():
            svc.tick_recovery()

        # 2. Execute the agent's action before any environmental effects
        if action.action_type == ActionType.VIEW_PIPELINE:
            result = self._view_pipeline()
        elif action.action_type == ActionType.VIEW_LOGS:
            result = self._view_logs(action.service_name)
        elif action.action_type == ActionType.VIEW_CONFIG:
            result = self._view_config(action.service_name)
        elif action.action_type == ActionType.EDIT_CONFIG:
            result = self._edit_config(action.service_name, action.config_edits)
        elif action.action_type == ActionType.RUN_MIGRATION:
            result = self._run_migration(action.migration_name, action.migration_type)
        elif action.action_type == ActionType.DEPLOY:
            result = self._deploy(action.service_name, action.target_version)
        elif action.action_type == ActionType.ROLLBACK:
            result = self._rollback(action.service_name)
        elif action.action_type == ActionType.APPROVE:
            result = self._approve(action.reason)
        elif action.action_type == ActionType.ABORT:
            result = self._abort(action.reason)
        else:
            result = "Unknown action."

        # 3. Environmental effects AFTER action (agent sees consequences)
        if self._time_pressure:
            self._apply_time_pressure()
        self._tick_cascading_effects()
        self._tick_metric_compounding()
        self._tick_tipping_points()

        return result

    # --- Cross-metric compounding ---------------------------------------------

    def _tick_metric_compounding(self):
        """Metrics compound on each other — creates realistic spirals and recovery.

        Skipped for the ``clean_deploy`` task.
        """
        if self.scenario.task_name == "clean_deploy":
            return
        for svc in self.services.values():
            # Degradation spirals (moderate — should not kill episodes in <5 steps)
            if svc.error_rate > 15.0:
                svc.cpu_percent = min(svc.cpu_percent + 3, 99)
            if svc.cpu_percent > 90:
                svc.latency_ms = round(min(svc.latency_ms + 100, 5000), 1)
            if svc.latency_ms > 3000:
                svc.error_rate = round(min(svc.error_rate + 1.0, 50.0), 2)
            # Natural recovery (when metrics are good, they help each other)
            if svc.error_rate < 2.0:
                svc.cpu_percent = max(svc.cpu_percent - 3, 10)
            if svc.cpu_percent < 50:
                svc.latency_ms = round(max(svc.latency_ms - 50, 20), 1)
            if svc.latency_ms < 200 and svc.error_rate < 1.0:
                svc.error_rate = round(max(svc.error_rate - 0.5, 0.0), 2)

    # --- Non-linear tipping points -------------------------------------------

    def _tick_tipping_points(self):
        """Non-linear tipping points — systems cliff instead of degrading linearly.

        Skipped for the ``clean_deploy`` task.
        """
        if self.scenario.task_name == "clean_deploy":
            return
        for svc in self.services.values():
            # CPU cliff: above 85% = exponential error growth
            if svc.cpu_percent > 85:
                overflow = svc.cpu_percent - 85
                svc.error_rate = round(min(svc.error_rate + overflow * 0.2, 50.0), 2)
            # Latency cliff: above 2000ms = rapid collapse
            if svc.latency_ms > 2000:
                svc.error_rate = round(min(svc.error_rate + 3.0, 50.0), 2)
            # Health cliff: below 30% health = accelerating death spiral.
            # Same scoring as ServiceState._get_health_pct for
            # HEALTHY/DEGRADED/DOWN, except that any other health value
            # scores 0 here rather than 100.
            base = 50.0 if svc.health == ServiceHealth.DEGRADED else (
                100.0 if svc.health == ServiceHealth.HEALTHY else 0.0
            )
            err_penalty = min(svc.error_rate * 2, 30)
            lat_penalty = min(max(0, svc.latency_ms - 200) / 10, 30)
            health_pct = max(0, base - err_penalty - lat_penalty)
            if health_pct < 30:
                svc.error_rate = round(min(svc.error_rate * 1.3, 50.0), 2)
            # Latency → CPU feedback (high latency = retries = more CPU)
            if svc.latency_ms > 1500:
                svc.cpu_percent = min(svc.cpu_percent + 3, 99)

    # --- Cascading failures ---------------------------------------------------

    def _get_dependents(self, service_name):
        """Find all services that list service_name in their dependencies."""
        return [
            svc for svc in self.services.values()
            if service_name in svc.dependencies
        ]

    def _tick_cascading_effects(self):
        """Unhealthy services degrade their dependents each step.

        Any service whose health is below 50% raises the error rate and
        latency of each non-DOWN dependent. The bump is larger when the
        source is below 20%. Afterwards, every HEALTHY service with an error
        rate below 2.0 shrinks the error rate and latency of its DEGRADED
        dependents by 10%.
        """
        for svc in self.services.values():
            health_pct = svc._get_health_pct()
            if health_pct >= 50.0:
                continue  # healthy enough, no cascade
            for dep in self._get_dependents(svc.name):
                if dep.health == ServiceHealth.DOWN:
                    continue  # already down, can't get worse from cascade
                # Determine cascade severity
                if health_pct < 20.0:
                    # Source is effectively down — moderate cascade
                    err_increase = 1.5
                    lat_increase = 30.0
                else:
                    # Source is degraded — lighter cascade
                    err_increase = 0.5
                    lat_increase = 10.0
                old_err = dep.error_rate
                dep.error_rate = round(min(dep.error_rate + err_increase, 45.0), 2)
                dep.latency_ms = round(min(dep.latency_ms + lat_increase, 4500.0), 1)
                # If error rate gets high enough, mark as degraded
                if dep.error_rate > 5.0 and dep.health == ServiceHealth.HEALTHY:
                    dep.health = ServiceHealth.DEGRADED
                # Floor: once the dependent is below 5% health, soften the
                # cascade (prevent instant death spirals). The outer min()
                # guarantees the softening never *adds* error rate, which the
                # bare max() did for the light (+0.5) cascade.
                if dep._get_health_pct() < 5.0:
                    dep.error_rate = round(
                        min(dep.error_rate,
                            max(old_err, dep.error_rate - err_increase + 1.0)),
                        2,
                    )
                # Add a cascade alert only if none with this source->dependent
                # key exists yet (alerts are never cleared in this class).
                cascade_alert_key = f"cascade:{svc.name}->{dep.name}"
                existing = [a for a in self.alerts if cascade_alert_key in a.message]
                if not existing:
                    self.alerts.append(AlertInfo(
                        severity="warning",
                        message=(
                            f"Cascading: {svc.name} (health {health_pct:.0f}%) is degrading "
                            f"{dep.name} — error_rate +{err_increase}/s, latency +{lat_increase:.0f}ms "
                            f"[{cascade_alert_key}]"
                        ),
                        service_name=dep.name,
                        timestamp="2026-04-01T12:00:00Z",
                    ))
                dep.logs.append(
                    f"[CASCADE] Upstream {svc.name} unhealthy (health {health_pct:.0f}%) — "
                    f"{dep.name} error_rate now {dep.error_rate:.1f}/s, "
                    f"latency {dep.latency_ms:.0f}ms"
                )

        # Recovery propagation: healthy services help their dependents recover
        for name, svc in self.services.items():
            if svc.health == ServiceHealth.HEALTHY and svc.error_rate < 2.0:
                for dep in self._get_dependents(name):
                    if dep.health == ServiceHealth.DEGRADED:
                        dep.error_rate = round(dep.error_rate * 0.9, 2)
                        dep.latency_ms = round(dep.latency_ms * 0.9, 1)

    # --- Action handlers ------------------------------------------------------

    def _view_pipeline(self):
        """Return a text summary of pipeline stage, tests, migrations and services."""
        services_summary = "\n".join(
            f" {s.name}: {s.health.value} | v{s.current_version} -> "
            f"v{s.target_version or 'N/A'} | "
            f"latency={s.latency_ms:.0f}ms | errors={s.error_rate:.1f}/s"
            for s in self.services.values()
        )
        return (
            f"Pipeline Stage: {self.pipeline_stage.value}\n"
            f"Commit: {self.commit_sha}\n"
            f"Tests: {self.test_pass} passed, {self.test_fail} failed\n"
            f"Pending Migrations: {len(self.migrations_pending)}\n"
            f"Services:\n{services_summary}"
        )

    def _view_logs(self, service_name):
        """Return the last 20 log lines of ``service_name``."""
        svc = self.services.get(service_name)
        if not svc:
            return f"No service named '{service_name}'"
        logs = svc.get_logs()
        if not logs:
            return f"No logs available for {service_name}."
        return f"Logs for {service_name}:\n" + "\n".join(logs[-20:])

    def _view_config(self, service_name):
        """Return the config of ``service_name`` as ``key = value`` lines."""
        svc = self.services.get(service_name)
        if not svc:
            return f"No service named '{service_name}'"
        config = svc.get_config_snapshot()
        lines = [f" {k} = {v}" for k, v in config.items()]
        return f"Config for {service_name}:\n" + "\n".join(lines)

    def _edit_config(self, service_name, edits):
        """Apply config ``edits`` (objects with ``key``/``value``) to a service.

        If the service is DEGRADED and the scenario no longer reports a config
        error afterwards, a 2-step staged recovery starts and the staging
        flags are reset for a re-deploy. ``edits`` may be ``None`` or empty.

        NOTE(review): the recovery check does not require that a config error
        existed before these edits, so any edit on a DEGRADED service whose
        config the scenario considers fine triggers it. Confirm intended.
        """
        svc = self.services.get(service_name)
        if not svc:
            return f"No service named '{service_name}'"
        results = [svc.set_config(edit.key, edit.value) for edit in (edits or ())]
        # If the config error is now fixed and service was degraded, start
        # staged recovery (2 steps) instead of instant heal
        if svc.health == ServiceHealth.DEGRADED and not self.scenario.check_config_error(service_name, svc.config):
            svc.staging_deployed = False
            svc.staging_verified = False
            # Immediate PARTIAL improvement
            svc.error_rate = round(svc.error_rate * 0.5, 2)
            svc.latency_ms = round(svc.latency_ms * 0.6, 1)
            # Set up 2-step recovery to full health (reuse tick_recovery pattern)
            svc._recovery_steps_remaining = 2
            svc._recovery_target_latency = round(50.0 * self._rng.uniform(0.8, 1.2), 1)
            svc._recovery_target_error_rate = round(0.1 * self._rng.uniform(0.9, 1.1), 3)
            # Don't set health to HEALTHY yet — tick_recovery does it on the
            # final recovery step if the error rate is below its threshold.
            results.append(f"Config fix detected for {service_name}. Service improving — full recovery in ~2 steps. Ready for re-deploy.")
        return "\n".join(results)

    def _run_migration(self, migration_name, migration_type):
        """Run a pending migration via the scenario and record the outcome.

        ``migration_type`` is accepted for the action interface but unused here.
        """
        if migration_name not in self.migrations_pending:
            return (
                f"Migration '{migration_name}' not found in pending: "
                f"{self.migrations_pending}"
            )
        success = self.scenario.run_migration(self, migration_name)
        if success:
            self.migrations_pending.remove(migration_name)
            self.migrations_applied.append(migration_name)
            return f"Migration '{migration_name}' applied successfully."
        error = f"Migration '{migration_name}' FAILED."
        self.migration_errors.append(error)
        return error

    def _deploy(self, service_name, target_version):
        """Deploy a service: to staging first, to production on the next call.

        Blocked while the scenario says pending migrations block this service.
        Each unhealthy dependency (health below 50%) gives a 50% chance that
        the deploy fails outright.
        """
        svc = self.services.get(service_name)
        if not svc:
            return f"No service named '{service_name}'"

        # Check migration dependencies
        if self.migrations_pending and self.scenario.migration_blocks_deploy(service_name):
            return (
                f"BLOCKED: Pending migrations must be applied before deploying "
                f"{service_name}. Pending: {self.migrations_pending}"
            )

        # Check if any dependency is unhealthy — 50% chance of deploy failure
        for dep_name in svc.dependencies:
            dep_svc = self.services.get(dep_name)
            if dep_svc and dep_svc._get_health_pct() < 50.0:
                if self._rng.random() < 0.5:
                    svc.logs.append(
                        f"[DEPLOY] Deploy {svc.name} {target_version} FAILED — "
                        f"dependency {dep_name} is unhealthy "
                        f"(health {dep_svc._get_health_pct():.0f}%). Retry may succeed."
                    )
                    return (
                        f"DEPLOY UNSTABLE: Dependency {dep_name} is unhealthy "
                        f"(health {dep_svc._get_health_pct():.0f}%). "
                        f"Deploy of {service_name} failed. Retry may succeed."
                    )

        # Determine target environment
        if not svc.staging_deployed:
            self.pipeline_stage = PipelineStage.STAGING
            return svc.deploy_to_staging(target_version, self.scenario)

        self.pipeline_stage = PipelineStage.DEPLOYING
        result = svc.deploy_to_production(target_version)
        # Notify scenario of deploy (for cascading effects)
        if hasattr(self.scenario, 'on_prod_deploy'):
            extra = self.scenario.on_prod_deploy(self, service_name, target_version)
            if extra:
                result += "\n" + extra
        # Check if all target services deployed
        if all(s.prod_deployed for s in self.services.values() if s.target_version):
            self.pipeline_stage = PipelineStage.DEPLOYED
        return result

    def _rollback(self, service_name):
        """Roll back a service and penalize every service that depends on it.

        Each dependent gains +5.0 error rate (a HEALTHY one becomes DEGRADED),
        gets an alert and a log line. The scenario's optional ``on_rollback``
        hook runs last.
        """
        svc = self.services.get(service_name)
        if not svc:
            return f"No service named '{service_name}'"
        self.pipeline_stage = PipelineStage.ROLLED_BACK

        # Check if dependents rely on current version's APIs
        old_version = svc.current_version
        dependents = self._get_dependents(service_name)

        result = svc.rollback()

        # Warn about dependent services and increase their error rates
        for dep in dependents:
            dep.error_rate = round(dep.error_rate + 5.0, 2)
            if dep.health == ServiceHealth.HEALTHY and dep.error_rate > 3.0:
                dep.health = ServiceHealth.DEGRADED
            self.alerts.append(AlertInfo(
                severity="warning",
                message=(
                    f"Rollback impact: {dep.name} depends on {service_name} "
                    f"{old_version}. Rollback may break {dep.name}. "
                    f"Error rate increased to {dep.error_rate:.1f}/s."
                ),
                service_name=dep.name,
                timestamp="2026-04-01T12:00:00Z",
            ))
            dep.logs.append(
                f"[ROLLBACK-IMPACT] {service_name} rolled back from {old_version} — "
                f"{dep.name} error_rate increased to {dep.error_rate:.1f}/s. "
                f"Dependency on {old_version} APIs may be broken."
            )

        if hasattr(self.scenario, 'on_rollback'):
            self.scenario.on_rollback(self, service_name)
        return result

    def _approve(self, reason):
        """Mark the pipeline DEPLOYED (unconditionally) and echo the reason."""
        self.pipeline_stage = PipelineStage.DEPLOYED
        return f"Deployment APPROVED. Reason: {reason or 'No reason given.'}"

    def _abort(self, reason):
        """Mark the pipeline FAILED and echo the reason."""
        self.pipeline_stage = PipelineStage.FAILED
        return f"Deployment ABORTED. Reason: {reason or 'No reason given.'}"

    # --- State queries --------------------------------------------------------

    def snapshot(self):
        """Capture current state for reward calculation."""
        return {
            "services": {
                name: {
                    "health": s.health.value,
                    "error_rate": s.error_rate,
                    "latency_ms": s.latency_ms,
                    "prod_deployed": s.prod_deployed,
                    "staging_verified": s.staging_verified,
                    "config": dict(s.config),
                }
                for name, s in self.services.items()
            },
            "system_health": self.get_system_health(),
            "pipeline_stage": self.pipeline_stage.value,
            "migrations_pending": list(self.migrations_pending),
            "alerts": list(self.alerts),
        }

    def get_system_health(self):
        """Aggregate health 0-100: mean of per-service health (100 if no services)."""
        if not self.services:
            return 100.0
        total = sum(svc._get_health_pct() for svc in self.services.values())
        return total / len(self.services)

    def get_service_statuses(self):
        """Return a ``ServiceStatus`` for every service."""
        return [s.to_status() for s in self.services.values()]

    def get_pipeline_status(self):
        """Return the public ``PipelineStatus`` model."""
        return PipelineStatus(
            stage=self.pipeline_stage,
            triggered_by=self.triggered_by,
            started_at=self.started_at,
            commit_sha=self.commit_sha,
            build_logs_snippet=self.build_logs if self.build_logs else None,
            test_pass_count=self.test_pass,
            test_fail_count=self.test_fail,
        )

    def get_migration_status(self):
        """Return the public ``MigrationStatus`` model (copies of internal lists)."""
        return MigrationStatus(
            pending_migrations=list(self.migrations_pending),
            last_applied=self.migrations_applied[-1] if self.migrations_applied else None,
            migration_errors=list(self.migration_errors) if self.migration_errors else None,
        )

    def get_alerts(self):
        """Return a copy of the alert list."""
        return list(self.alerts)

    def get_service_names(self):
        """Return service names in insertion order."""
        return list(self.services.keys())

    def has_services(self):
        """Return True if the scenario registered any services."""
        return len(self.services) > 0

    def has_pending_migrations(self):
        """Return True if any migration is still pending."""
        return len(self.migrations_pending) > 0

    def _apply_time_pressure(self):
        """During incidents, degraded services get worse each step.

        Task-specific: ``judgment_call``, ``broken_pipeline``,
        ``capacity_crisis`` and ``random_incident``; other tasks are
        unaffected.
        """
        task = self.scenario.task_name
        if task == "judgment_call":
            api_gw = self.services.get("api-gateway")
            if api_gw and api_gw.health == ServiceHealth.DEGRADED:
                degrade_lat = 80 * self._rng.uniform(0.8, 1.2)
                degrade_err = 0.8 * self._rng.uniform(0.9, 1.1)
                api_gw.latency_ms = round(min(api_gw.latency_ms + degrade_lat, 5000), 1)
                api_gw.error_rate = round(min(api_gw.error_rate + degrade_err, 50.0), 2)
                api_gw.cpu_percent = min(api_gw.cpu_percent + 1, 99)
                api_gw.logs.append(
                    f"[DEGRADING] api-gateway latency now {api_gw.latency_ms:.0f}ms, "
                    f"errors {api_gw.error_rate:.1f}/s — situation worsening"
                )
        elif task == "broken_pipeline":
            # Cache-service degrades if config error persists
            cache = self.services.get("cache-service")
            if cache and self.scenario.check_config_error("cache-service", cache.config):
                health_drop = 3.0 * self._rng.uniform(0.8, 1.2)
                cache.error_rate = round(min(cache.error_rate + health_drop * 0.5, 25.0), 2)
                cache.latency_ms = round(min(cache.latency_ms + 30.0 * self._rng.uniform(0.8, 1.2), 2000.0), 1)
                if cache.error_rate > 3.0 and cache.health == ServiceHealth.HEALTHY:
                    cache.health = ServiceHealth.DEGRADED
                cache.logs.append(
                    f"[DEGRADING] cache-service using staging Redis — "
                    f"error_rate now {cache.error_rate:.1f}/s, "
                    f"latency {cache.latency_ms:.0f}ms"
                )
            # Api-gateway latency increases if migration not applied
            api_gw = self.services.get("api-gateway")
            if api_gw and "add_index_users_email" in self.migrations_pending:
                lat_increase = 50.0 * self._rng.uniform(0.8, 1.2)
                api_gw.latency_ms = round(min(api_gw.latency_ms + lat_increase, 2000.0), 1)
                api_gw.logs.append(
                    f"[DEGRADING] api-gateway missing index — "
                    f"user query latency now {api_gw.latency_ms:.0f}ms"
                )
        elif task == "capacity_crisis":
            db = self.services.get("database-primary")
            api_gw = self.services.get("api-gateway")
            # Time pressure only while connection pool bottleneck persists
            if db and self.scenario.check_config_error("database-primary", db.config):
                db.cpu_percent = min(db.cpu_percent + 2, 99)
                db.latency_ms = round(db.latency_ms + 15, 1)
            # api-gateway degrades only while db bottleneck persists
            if (api_gw and api_gw.health == ServiceHealth.DEGRADED
                    and db and self.scenario.check_config_error("database-primary", db.config)):
                api_gw.latency_ms = round(min(api_gw.latency_ms + 30, 5000), 1)
                api_gw.error_rate = round(min(api_gw.error_rate + 0.5, 50.0), 2)
        elif task == "random_incident":
            failing = getattr(self.scenario, 'failing_service', None)
            if failing:
                svc = self.services.get(failing)
                if svc and svc.health == ServiceHealth.DEGRADED:
                    svc.error_rate = round(min(svc.error_rate + 0.5, 50.0), 2)
                    svc.latency_ms = round(min(svc.latency_ms + 30, 5000), 1)