# devops-pipeline-env/server/pipeline_engine.py
# Uploaded by yashash045 via huggingface_hub (commit 83ecd75, verified).
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""Simulation engine for the DevOps Pipeline Environment."""
import random
from devops_pipeline_env.models import (
ActionType,
AlertInfo,
ConfigEdit,
MigrationStatus,
PipelineAction,
PipelineStage,
PipelineStatus,
ServiceHealth,
ServiceStatus,
)
class ServiceState:
    """State machine for a single simulated microservice.

    Holds the service's versions, health, runtime metrics, configuration and
    log lines, and implements the staging -> production deploy flow, rollback,
    config edits and staged post-deploy recovery.  Every random draw goes
    through ``self._rng`` so that a fixed seed replays the same episode.
    """

    def __init__(self, name, version, health, config, dependencies,
                 latency_ms, error_rate, cpu, memory, rng=None):
        """Initialise a service.

        Args:
            name: Service name.
            version: Initial ``current_version``.
            health: Initial ``ServiceHealth`` value.
            config: Config mapping; copied so later edits never touch the
                caller's object.
            dependencies: Names of services this one depends on, or a falsy
                value for none.
            latency_ms: Initial request latency in milliseconds.
            error_rate: Initial error rate (reported as errors per second).
            cpu: Initial CPU utilisation percentage.
            memory: Initial memory utilisation percentage.
            rng: ``random.Random`` used for all stochastic outcomes; defaults
                to ``random.Random(0)``.
        """
        self.name = name
        self.current_version = version
        # Version that was current before the last successful production
        # promotion; rollback() restores it. None = nothing to roll back to.
        self.previous_version = None
        self.target_version = None
        self.health = health
        self.config = dict(config)
        self.dependencies = list(dependencies) if dependencies else []
        self.latency_ms = latency_ms
        self.error_rate = error_rate
        self.cpu_percent = cpu
        self.memory_percent = memory
        self.active_connections = 100
        self.staging_deployed = False
        self.staging_verified = False
        self.prod_deployed = False
        self.last_deploy_timestamp = "2026-04-01T00:00:00Z"
        self.logs = []
        self._rng = rng or random.Random(0)
        # Task name is injected by the engine after scenario setup; defined
        # here so the attribute always exists.
        self._task_name = None
        # Staged health recovery: 0 = fully recovered, >0 = still recovering
        self._recovery_steps_remaining = 0
        self._recovery_target_latency = 0.0
        self._recovery_target_error_rate = 0.0

    def _is_clean_deploy(self):
        """Return True when this service belongs to the ``clean_deploy`` task."""
        # getattr keeps instances created without __init__ working.
        return getattr(self, "_task_name", None) == "clean_deploy"

    def deploy_to_staging(self, version, scenario):
        """Deploy ``version`` to staging and return a human-readable result.

        Outcomes, checked in order:

        * transient failure (8% roll; skipped for the clean_deploy task, for
          services already staging-verified, and for services that are not
          HEALTHY): nothing is verified and staging is left un-deployed so a
          retry goes through staging again;
        * ``scenario.check_config_error`` reports an error: the service turns
          DEGRADED with elevated error rate/latency and is not verified;
        * otherwise: the service is HEALTHY and ``staging_verified`` is set,
          so the next deploy promotes to production.
        """
        self.staging_deployed = True
        self.target_version = version
        # Always consume one RNG draw so the random stream does not depend on
        # which branch is taken below.
        transient_roll = self._rng.random()
        if (not self._is_clean_deploy()
                and not self.staging_verified
                and self.health == ServiceHealth.HEALTHY
                and transient_roll < 0.08):
            # Not verified. Clear staging_deployed so the retry the message
            # asks for is a staging deploy again; leaving it set would route
            # the retry to a production promotion WITHOUT staging verification.
            self.staging_deployed = False
            self.logs.append(
                f"[DEPLOY] Deployed {self.name} {version} to staging. "
                f"Transient failure: health check timed out. Retry should succeed."
            )
            return (
                f"Deployed {self.name} {version} to staging. "
                f"WARNING: Transient health check timeout. Try deploying again."
            )
        if scenario.check_config_error(self.name, self.config):
            self.health = ServiceHealth.DEGRADED
            lat_mult = self._rng.uniform(0.8, 1.2)
            err_mult = self._rng.uniform(0.9, 1.1)
            self.error_rate = round(12.0 * err_mult, 2)
            self.latency_ms = round(300.0 * lat_mult, 1)
            self.logs.append(
                f"[DEPLOY] Deployed {self.name} {version} to staging. "
                f"WARNING: Health check DEGRADED. Error rate elevated "
                f"({self.error_rate:.1f}/s, latency {self.latency_ms:.0f}ms)."
            )
            return (
                f"Deployed {self.name} {version} to staging. "
                f"WARNING: Health check degraded. Error rate elevated."
            )
        self.health = ServiceHealth.HEALTHY
        self.staging_verified = True
        lat_mult = self._rng.uniform(0.8, 1.2)
        self.error_rate = round(0.1 * self._rng.uniform(0.9, 1.1), 3)
        self.latency_ms = round(45.0 * lat_mult, 1)
        self.logs.append(
            f"[DEPLOY] Deployed {self.name} {version} to staging. Health check: PASSED."
        )
        return f"Deployed {self.name} {version} to STAGING. Staging verified. Deploy same service+version again to PROMOTE TO PRODUCTION."

    def deploy_to_production(self, version):
        """Promote ``version`` to production and return a result message.

        Without prior staging verification the version is NOT made current;
        the service turns DEGRADED with high error rate and latency.
        Otherwise the version goes live, the previous version is remembered
        for rollback, and a staged recovery of 1-3 steps (advanced by
        ``tick_recovery``) is scheduled.  A quality roll picks clean (70%),
        minor-issue (20%) or unstable (10%) recovery targets, and a warmup
        CPU/latency spike is applied on top (smaller for clean_deploy).
        """
        if not self.staging_verified:
            self.health = ServiceHealth.DEGRADED
            lat_mult = self._rng.uniform(0.8, 1.2)
            err_mult = self._rng.uniform(0.9, 1.1)
            self.error_rate = round(25.0 * err_mult, 2)
            self.latency_ms = round(500.0 * lat_mult, 1)
            self.logs.append(
                f"[DEPLOY] Deployed {self.name} {version} to production "
                f"WITHOUT staging verification. High risk."
            )
            return (
                f"Deployed {self.name} {version} to production "
                f"WITHOUT staging verification. High risk."
            )
        # NOTE(review): ``version`` is not compared with ``target_version``,
        # so a version other than the one verified in staging is promoted as
        # if it had been verified.
        self.prod_deployed = True
        if version != self.current_version:
            self.previous_version = self.current_version
        self.current_version = version
        # Staged recovery: takes 1-3 steps to fully stabilize
        recovery_steps = self._rng.randint(1, 3)
        self._recovery_steps_remaining = recovery_steps
        base_latency = 45.0 * self._rng.uniform(0.8, 1.2)
        base_error_rate = 0.1 * self._rng.uniform(0.9, 1.1)
        # Non-linear deploy quality: same seed = same outcome
        quality_roll = self._rng.random()
        deploy_note = ""
        if quality_roll < 0.7:
            pass  # clean deploy: base values are already good
        elif quality_roll < 0.9:
            # Minor issues: recovers to good but not perfect
            base_latency *= 1.5
            base_error_rate *= 3.0
            deploy_note = " Minor post-deploy issues detected."
            self.logs.append(
                f"[DEPLOY] {self.name}: Minor post-deploy issues detected. "
                f"Performance slightly below optimal."
            )
        else:
            # Unstable deploy: the elevated error rate comes from the scaled
            # base value, which error_rate is derived from below.
            base_latency *= 2.5
            base_error_rate *= 8.0
            deploy_note = " Post-deploy instability detected."
            self.logs.append(
                f"[DEPLOY] {self.name}: Post-deploy instability detected. "
                f"Elevated error rate."
            )
        self._recovery_target_latency = round(base_latency, 1)
        self._recovery_target_error_rate = round(base_error_rate, 3)
        # Start at elevated values that tick_recovery walks down to the targets
        self.health = ServiceHealth.HEALTHY
        self.latency_ms = round(base_latency * (1.0 + 0.3 * recovery_steps), 1)
        self.error_rate = round(base_error_rate * (1.0 + 0.5 * recovery_steps), 3)
        # Trade-off: deploy causes temporary CPU/latency spike (warmup load);
        # clean deploy tasks get reduced spikes.
        if self._is_clean_deploy():
            self.cpu_percent = min(self.cpu_percent + 3, 99)
            self.latency_ms += round(30 * self._rng.uniform(0.8, 1.2), 1)
        else:
            self.cpu_percent = min(self.cpu_percent + 15, 99)
            self.latency_ms += round(200 * self._rng.uniform(0.8, 1.2), 1)
        self.last_deploy_timestamp = "2026-04-01T12:00:00Z"
        self.logs.append(
            f"[DEPLOY] Promoted {self.name} {version} to production. Health: HEALTHY. "
            f"Stabilizing over ~{recovery_steps} step(s). CPU/latency spike from warmup."
        )
        return (
            f"Promoted {self.name} {version} to production. Health: HEALTHY. "
            f"Deployed successfully. Service under warmup load — temporary CPU/latency spike expected."
            f"{deploy_note}"
        )

    def tick_recovery(self):
        """Advance staged health recovery by one step.

        While steps remain, latency and error rate move linearly toward their
        recovery targets; on the final step they snap to the targets and a
        DEGRADED service whose error rate is below 5 becomes HEALTHY.
        """
        if self._recovery_steps_remaining > 0:
            self._recovery_steps_remaining -= 1
            if self._recovery_steps_remaining == 0:
                # Fully recovered
                self.latency_ms = self._recovery_target_latency
                self.error_rate = self._recovery_target_error_rate
                if self.health == ServiceHealth.DEGRADED and self.error_rate < 5.0:
                    self.health = ServiceHealth.HEALTHY
            else:
                # Close 1/(remaining+1) of the gap so the last step lands exactly.
                progress = 1.0 - (self._recovery_steps_remaining / (self._recovery_steps_remaining + 1))
                self.latency_ms = round(
                    self.latency_ms + (self._recovery_target_latency - self.latency_ms) * progress, 1
                )
                self.error_rate = round(
                    self.error_rate + (self._recovery_target_error_rate - self.error_rate) * progress, 3
                )

    def rollback(self):
        """Roll back to the previous version and return a result message.

        Restores ``previous_version`` when a production promotion recorded one
        (otherwise the version is left unchanged), resets health and metrics
        to a healthy baseline, clears the staging flags and cancels any staged
        recovery.  With 25% probability the rollback reintroduces a known
        issue (+3.0 error rate) and the message carries a warning.
        """
        if self.previous_version is not None:
            self.current_version = self.previous_version
            self.previous_version = None
        self.health = ServiceHealth.HEALTHY
        lat_mult = self._rng.uniform(0.8, 1.2)
        err_mult = self._rng.uniform(0.9, 1.1)
        self.error_rate = round(0.5 * err_mult, 3)
        self.latency_ms = round(50.0 * lat_mult * 0.7, 1)
        self.staging_deployed = False
        self.staging_verified = False
        self.prod_deployed = True  # still in prod, just rolled back
        self._recovery_steps_remaining = 0
        # Trade-off: 25% chance rollback reintroduces a known bug
        regression = False
        if self._rng.random() < 0.25:
            self.error_rate = round(self.error_rate + 3.0, 2)
            regression = True
            self.logs.append(
                f"[ROLLBACK] Rolled back {self.name} to {self.current_version}. "
                f"Warning: rollback may have reintroduced known issue from previous version"
            )
        else:
            self.logs.append(
                f"[ROLLBACK] Rolled back {self.name} to {self.current_version}. Service healthy."
            )
        result = f"Rolled back {self.name} to {self.current_version}. Rolled back. Monitoring for regression..."
        if regression:
            result += f" WARNING: Error rate elevated ({self.error_rate:.1f}/s) — possible regression."
        return result

    def set_config(self, key, value):
        """Set config ``key`` to ``value`` and return a result message.

        The simulated restart adds a latency spike and +5 CPU (capped at 99).
        """
        old = self.config.get(key, "<not set>")
        self.config[key] = value
        # Trade-off: config change causes brief restart spike
        self.latency_ms += round(100 * self._rng.uniform(0.8, 1.2), 1)
        self.cpu_percent = min(self.cpu_percent + 5, 99)
        self.logs.append(f"[CONFIG] {self.name}: {key} changed from '{old}' to '{value}'. Service restarting.")
        return f"Config {self.name}: {key} changed from '{old}' to '{value}'. Config updated. Service restarting — brief latency spike."

    def get_config_snapshot(self):
        """Return a shallow copy of the current config."""
        return dict(self.config)

    def get_logs(self):
        """Return a copy of all log lines, oldest first."""
        return list(self.logs)

    def _get_health_pct(self):
        """Return a 0-100 health score.

        Base is 0 for DOWN, 50 for DEGRADED and 100 otherwise, minus up to 30
        for error rate (2 points per unit) and up to 30 for latency above
        200 ms (1 point per 10 ms); never below 0.
        """
        h = 100.0
        if self.health == ServiceHealth.DOWN:
            h = 0.0
        elif self.health == ServiceHealth.DEGRADED:
            h = 50.0
        h -= min(self.error_rate * 2, 30)
        if self.latency_ms > 200:
            h -= min((self.latency_ms - 200) / 10, 30)
        return max(h, 0.0)

    def to_status(self):
        """Build the ``ServiceStatus`` observation for this service."""
        return ServiceStatus(
            name=self.name,
            health=self.health,
            current_version=self.current_version,
            cpu_percent=self.cpu_percent,
            memory_percent=self.memory_percent,
            error_rate=self.error_rate,
            request_latency_ms=self.latency_ms,
            active_connections=self.active_connections,
            last_deploy_timestamp=self.last_deploy_timestamp,
        )
class PipelineEngine:
    """Manages all services, pipeline state, migrations and alerts.

    The scenario populates the engine in ``scenario.setup(engine)`` and is
    consulted for config errors, migrations and task-specific hooks.  Each
    ``execute`` call advances the simulation by one step.
    """

    def __init__(self, scenario, seed):
        """Build the engine and let ``scenario`` populate it.

        Args:
            scenario: Scenario object; used here via ``setup``, ``task_name``,
                ``check_config_error``, ``run_migration`` and
                ``migration_blocks_deploy`` (``on_prod_deploy``,
                ``on_rollback`` and ``failing_service`` are optional).
            seed: Seed for the engine RNG, which is also shared with every
                service so the whole episode is reproducible.
        """
        self.scenario = scenario
        self._rng = random.Random(seed)
        self.services = {}
        self.pipeline_stage = PipelineStage.IDLE
        self.migrations_pending = []
        self.migrations_applied = []
        self.migration_errors = []
        self.alerts = []
        self.commit_sha = "abc123"
        self.triggered_by = "deploy-bot"
        self.started_at = "2026-04-01T10:00:00Z"
        self.test_pass = 0
        self.test_fail = 0
        self.build_logs = ""
        self._time_pressure = False  # Set by scenario if needed
        # Initialize from scenario
        scenario.setup(self)
        # Inject the shared RNG and task name into all services created by the scenario
        for svc in self.services.values():
            svc._rng = self._rng
            svc._task_name = scenario.task_name

    def execute(self, action):
        """Run one simulation step for ``action`` and return a result string.

        Order: advance staged recovery for every service, apply the agent's
        action, then apply environmental effects (time pressure if enabled,
        cascades, metric compounding, tipping points) so the agent sees the
        consequences on the next observation.
        """
        # 1. Tick health recovery for all services (heal from previous deploys)
        for svc in self.services.values():
            svc.tick_recovery()
        # 2. Execute the agent's action FIRST
        if action.action_type == ActionType.VIEW_PIPELINE:
            result = self._view_pipeline()
        elif action.action_type == ActionType.VIEW_LOGS:
            result = self._view_logs(action.service_name)
        elif action.action_type == ActionType.VIEW_CONFIG:
            result = self._view_config(action.service_name)
        elif action.action_type == ActionType.EDIT_CONFIG:
            result = self._edit_config(action.service_name, action.config_edits)
        elif action.action_type == ActionType.RUN_MIGRATION:
            result = self._run_migration(action.migration_name, action.migration_type)
        elif action.action_type == ActionType.DEPLOY:
            result = self._deploy(action.service_name, action.target_version)
        elif action.action_type == ActionType.ROLLBACK:
            result = self._rollback(action.service_name)
        elif action.action_type == ActionType.APPROVE:
            result = self._approve(action.reason)
        elif action.action_type == ActionType.ABORT:
            result = self._abort(action.reason)
        else:
            result = "Unknown action."
        # 3. Environmental effects AFTER action (agent sees consequences)
        if self._time_pressure:
            self._apply_time_pressure()
        self._tick_cascading_effects()
        self._tick_metric_compounding()
        self._tick_tipping_points()
        return result

    # --- Cross-metric compounding ---------------------------------------------
    def _tick_metric_compounding(self):
        """Let metrics feed into each other (skipped for clean_deploy).

        Bad metrics push each other further (error rate -> CPU -> latency ->
        error rate); good metrics pull each other back toward healthy values.
        """
        if self.scenario.task_name == "clean_deploy":
            return
        for svc in self.services.values():
            # Degradation spirals (moderate — should not kill episodes in <5 steps)
            if svc.error_rate > 15.0:
                svc.cpu_percent = min(svc.cpu_percent + 3, 99)
            if svc.cpu_percent > 90:
                svc.latency_ms = round(min(svc.latency_ms + 100, 5000), 1)
            if svc.latency_ms > 3000:
                svc.error_rate = round(min(svc.error_rate + 1.0, 50.0), 2)
            # Natural recovery (when metrics are good, they help each other)
            if svc.error_rate < 2.0:
                svc.cpu_percent = max(svc.cpu_percent - 3, 10)
            if svc.cpu_percent < 50:
                svc.latency_ms = round(max(svc.latency_ms - 50, 20), 1)
            if svc.latency_ms < 200 and svc.error_rate < 1.0:
                svc.error_rate = round(max(svc.error_rate - 0.5, 0.0), 2)

    # --- Non-linear tipping points -------------------------------------------
    def _tick_tipping_points(self):
        """Apply threshold effects so services cliff instead of degrading linearly.

        Skipped for the clean_deploy task.
        """
        if self.scenario.task_name == "clean_deploy":
            return
        for svc in self.services.values():
            # CPU cliff: above 85% adds error rate proportional to the overflow
            if svc.cpu_percent > 85:
                overflow = svc.cpu_percent - 85
                svc.error_rate = round(min(svc.error_rate + overflow * 0.2, 50.0), 2)
            # Latency cliff: above 2000ms = rapid collapse
            if svc.latency_ms > 2000:
                svc.error_rate = round(min(svc.error_rate + 3.0, 50.0), 2)
            # Health cliff: below 30% health the error rate grows by 30% per step.
            # NOTE(review): unlike _get_health_pct(), any health state other than
            # HEALTHY/DEGRADED gets base 0 here.
            base = 50.0 if svc.health == ServiceHealth.DEGRADED else (
                100.0 if svc.health == ServiceHealth.HEALTHY else 0.0
            )
            err_penalty = min(svc.error_rate * 2, 30)
            lat_penalty = min(max(0, svc.latency_ms - 200) / 10, 30)
            health_pct = max(0, base - err_penalty - lat_penalty)
            if health_pct < 30:
                svc.error_rate = round(min(svc.error_rate * 1.3, 50.0), 2)
            # Latency -> CPU feedback (high latency = retries = more CPU)
            if svc.latency_ms > 1500:
                svc.cpu_percent = min(svc.cpu_percent + 3, 99)

    # --- Cascading failures ---------------------------------------------------
    def _get_dependents(self, service_name):
        """Return all services that list ``service_name`` in their dependencies."""
        return [
            svc for svc in self.services.values()
            if service_name in svc.dependencies
        ]

    def _tick_cascading_effects(self):
        """Propagate upstream health to dependents.

        A source below 50% health adds error rate and latency to every
        non-DOWN dependent (more when the source is below 20%), marks a
        HEALTHY dependent DEGRADED once its error rate exceeds 5, and raises a
        cascade alert for the source->dependent pair.  Afterwards every HEALTHY
        service with error rate < 2 shrinks the error rate and latency of its
        DEGRADED dependents by 10%.
        """
        for svc in self.services.values():
            health_pct = svc._get_health_pct()
            if health_pct >= 50.0:
                continue  # healthy enough, no cascade
            for dep in self._get_dependents(svc.name):
                if dep.health == ServiceHealth.DOWN:
                    continue  # already down, can't get worse from cascade
                # Determine cascade severity
                if health_pct < 20.0:
                    # Source is effectively down — moderate cascade
                    err_increase = 1.5
                    lat_increase = 30.0
                else:
                    # Source is degraded — lighter cascade
                    err_increase = 0.5
                    lat_increase = 10.0
                old_err = dep.error_rate
                dep.error_rate = round(min(dep.error_rate + err_increase, 45.0), 2)
                dep.latency_ms = round(min(dep.latency_ms + lat_increase, 4500.0), 1)
                # If error rate gets high enough, mark as degraded
                if dep.error_rate > 5.0 and dep.health == ServiceHealth.HEALTHY:
                    dep.health = ServiceHealth.DEGRADED
                # Floor: partially undo the error increase when the dependent
                # would drop below 5% health (prevents instant death spirals).
                if dep._get_health_pct() < 5.0:
                    dep.error_rate = round(max(old_err, dep.error_rate - err_increase + 1.0), 2)
                # Dedupe by key embedded in the message.  self.alerts is never
                # cleared by this engine, so unless a caller clears it each
                # source->dependent pair alerts at most once per episode.
                cascade_alert_key = f"cascade:{svc.name}->{dep.name}"
                if not any(cascade_alert_key in a.message for a in self.alerts):
                    self.alerts.append(AlertInfo(
                        severity="warning",
                        message=(
                            f"Cascading: {svc.name} (health {health_pct:.0f}%) is degrading "
                            f"{dep.name} — error_rate +{err_increase}/s, latency +{lat_increase:.0f}ms "
                            f"[{cascade_alert_key}]"
                        ),
                        service_name=dep.name,
                        timestamp="2026-04-01T12:00:00Z",
                    ))
                dep.logs.append(
                    f"[CASCADE] Upstream {svc.name} unhealthy (health {health_pct:.0f}%) — "
                    f"{dep.name} error_rate now {dep.error_rate:.1f}/s, "
                    f"latency {dep.latency_ms:.0f}ms"
                )
        # Recovery propagation: healthy services help their dependents recover
        for name, svc in self.services.items():
            if svc.health == ServiceHealth.HEALTHY and svc.error_rate < 2.0:
                for dep in self._get_dependents(name):
                    if dep.health == ServiceHealth.DEGRADED:
                        dep.error_rate = round(dep.error_rate * 0.9, 2)
                        dep.latency_ms = round(dep.latency_ms * 0.9, 1)

    # --- Action handlers ------------------------------------------------------
    def _view_pipeline(self):
        """Return a text summary of the pipeline and every service."""
        services_summary = "\n".join(
            f"  {s.name}: {s.health.value} | v{s.current_version} -> "
            f"v{s.target_version or 'N/A'} | "
            f"latency={s.latency_ms:.0f}ms | errors={s.error_rate:.1f}/s"
            for s in self.services.values()
        )
        return (
            f"Pipeline Stage: {self.pipeline_stage.value}\n"
            f"Commit: {self.commit_sha}\n"
            f"Tests: {self.test_pass} passed, {self.test_fail} failed\n"
            f"Pending Migrations: {len(self.migrations_pending)}\n"
            f"Services:\n{services_summary}"
        )

    def _view_logs(self, service_name):
        """Return the last 20 log lines of ``service_name``."""
        svc = self.services.get(service_name)
        if not svc:
            return f"No service named '{service_name}'"
        logs = svc.get_logs()
        if not logs:
            return f"No logs available for {service_name}."
        return f"Logs for {service_name}:\n" + "\n".join(logs[-20:])

    def _view_config(self, service_name):
        """Return the config of ``service_name`` as ``key = value`` lines."""
        svc = self.services.get(service_name)
        if not svc:
            return f"No service named '{service_name}'"
        config = svc.get_config_snapshot()
        lines = [f"  {k} = {v}" for k, v in config.items()]
        return f"Config for {service_name}:\n" + "\n".join(lines)

    def _edit_config(self, service_name, edits):
        """Apply ``edits`` (objects with ``key``/``value``) to a service's config.

        When the edits turn a config the scenario flags as erroneous into a
        correct one while the service is DEGRADED, the service gets an
        immediate partial improvement and a 2-step staged recovery, and its
        staging flags are reset so it must be re-verified in staging.
        Returns one result line per edit (plus the recovery notice), or a
        message when the service is unknown or no edits were given.
        """
        svc = self.services.get(service_name)
        if not svc:
            return f"No service named '{service_name}'"
        if not edits:
            return f"No config edits provided for {service_name}."
        # Only a real fix (config broken before, correct after) may start
        # recovery; otherwise editing any key would heal a service that is
        # degraded for reasons unrelated to its config.
        had_config_error = self.scenario.check_config_error(service_name, svc.config)
        results = [svc.set_config(edit.key, edit.value) for edit in edits]
        if (had_config_error
                and svc.health == ServiceHealth.DEGRADED
                and not self.scenario.check_config_error(service_name, svc.config)):
            svc.staging_deployed = False
            svc.staging_verified = False
            # Immediate PARTIAL improvement
            svc.error_rate = round(svc.error_rate * 0.5, 2)
            svc.latency_ms = round(svc.latency_ms * 0.6, 1)
            # 2-step recovery to full health via tick_recovery, which also
            # flips DEGRADED -> HEALTHY once the target error rate is reached.
            svc._recovery_steps_remaining = 2
            svc._recovery_target_latency = round(50.0 * self._rng.uniform(0.8, 1.2), 1)
            svc._recovery_target_error_rate = round(0.1 * self._rng.uniform(0.9, 1.1), 3)
            results.append(f"Config fix detected for {service_name}. Service improving — full recovery in ~2 steps. Ready for re-deploy.")
        return "\n".join(results)

    def _run_migration(self, migration_name, migration_type):
        """Run a pending migration via the scenario and return a result string.

        ``migration_type`` is accepted for the action interface but unused here.
        """
        if migration_name not in self.migrations_pending:
            return (
                f"Migration '{migration_name}' not found in pending: "
                f"{self.migrations_pending}"
            )
        if self.scenario.run_migration(self, migration_name):
            self.migrations_pending.remove(migration_name)
            self.migrations_applied.append(migration_name)
            return f"Migration '{migration_name}' applied successfully."
        error = f"Migration '{migration_name}' FAILED."
        self.migration_errors.append(error)
        return error

    def _deploy(self, service_name, target_version):
        """Deploy ``service_name`` at ``target_version`` one stage forward.

        Blocked while the scenario says pending migrations must run first.
        Each dependency below 50% health gives a 50% chance of failure.  A
        service not yet deployed to staging goes to staging; otherwise it is
        promoted to production, the scenario's optional ``on_prod_deploy``
        hook runs, and the stage becomes DEPLOYED once every service with a
        target version is in production.
        """
        svc = self.services.get(service_name)
        if not svc:
            return f"No service named '{service_name}'"
        # Check migration dependencies
        if self.migrations_pending and self.scenario.migration_blocks_deploy(service_name):
            return (
                f"BLOCKED: Pending migrations must be applied before deploying "
                f"{service_name}. Pending: {self.migrations_pending}"
            )
        # Unhealthy dependency: 50% chance of deploy failure (RNG only drawn then)
        for dep_name in svc.dependencies:
            dep_svc = self.services.get(dep_name)
            if not dep_svc:
                continue
            dep_health = dep_svc._get_health_pct()
            if dep_health < 50.0 and self._rng.random() < 0.5:
                svc.logs.append(
                    f"[DEPLOY] Deploy {svc.name} {target_version} FAILED — "
                    f"dependency {dep_name} is unhealthy "
                    f"(health {dep_health:.0f}%). Retry may succeed."
                )
                return (
                    f"DEPLOY UNSTABLE: Dependency {dep_name} is unhealthy "
                    f"(health {dep_health:.0f}%). "
                    f"Deploy of {service_name} failed. Retry may succeed."
                )
        # Determine target environment
        if not svc.staging_deployed:
            self.pipeline_stage = PipelineStage.STAGING
            return svc.deploy_to_staging(target_version, self.scenario)
        self.pipeline_stage = PipelineStage.DEPLOYING
        result = svc.deploy_to_production(target_version)
        # Notify scenario of deploy (for cascading effects)
        if hasattr(self.scenario, 'on_prod_deploy'):
            extra = self.scenario.on_prod_deploy(self, service_name, target_version)
            if extra:
                result += "\n" + extra
        # Check if all target services deployed
        if all(s.prod_deployed for s in self.services.values() if s.target_version):
            self.pipeline_stage = PipelineStage.DEPLOYED
        return result

    def _rollback(self, service_name):
        """Roll back ``service_name`` and penalise the services that depend on it.

        Every dependent gets +5 error rate, is marked DEGRADED if it was
        HEALTHY, and receives an alert and a log line; the scenario's optional
        ``on_rollback`` hook runs last.
        """
        svc = self.services.get(service_name)
        if not svc:
            return f"No service named '{service_name}'"
        self.pipeline_stage = PipelineStage.ROLLED_BACK
        # Captured before rollback: the version dependents were built against
        old_version = svc.current_version
        dependents = self._get_dependents(service_name)
        result = svc.rollback()
        for dep in dependents:
            dep.error_rate = round(dep.error_rate + 5.0, 2)
            if dep.health == ServiceHealth.HEALTHY and dep.error_rate > 3.0:
                dep.health = ServiceHealth.DEGRADED
            self.alerts.append(AlertInfo(
                severity="warning",
                message=(
                    f"Rollback impact: {dep.name} depends on {service_name} "
                    f"{old_version}. Rollback may break {dep.name}. "
                    f"Error rate increased to {dep.error_rate:.1f}/s."
                ),
                service_name=dep.name,
                timestamp="2026-04-01T12:00:00Z",
            ))
            dep.logs.append(
                f"[ROLLBACK-IMPACT] {service_name} rolled back from {old_version} — "
                f"{dep.name} error_rate increased to {dep.error_rate:.1f}/s. "
                f"Dependency on {old_version} APIs may be broken."
            )
        if hasattr(self.scenario, 'on_rollback'):
            self.scenario.on_rollback(self, service_name)
        return result

    def _approve(self, reason):
        """Mark the pipeline DEPLOYED."""
        self.pipeline_stage = PipelineStage.DEPLOYED
        return f"Deployment APPROVED. Reason: {reason or 'No reason given.'}"

    def _abort(self, reason):
        """Mark the pipeline FAILED."""
        self.pipeline_stage = PipelineStage.FAILED
        return f"Deployment ABORTED. Reason: {reason or 'No reason given.'}"

    # --- State queries --------------------------------------------------------
    def snapshot(self):
        """Capture current state (copied containers) for reward calculation."""
        return {
            "services": {
                name: {
                    "health": s.health.value,
                    "error_rate": s.error_rate,
                    "latency_ms": s.latency_ms,
                    "prod_deployed": s.prod_deployed,
                    "staging_verified": s.staging_verified,
                    "config": dict(s.config),
                }
                for name, s in self.services.items()
            },
            "system_health": self.get_system_health(),
            "pipeline_stage": self.pipeline_stage.value,
            "migrations_pending": list(self.migrations_pending),
            "alerts": list(self.alerts),
        }

    def get_system_health(self):
        """Return the mean per-service health (0-100); 100.0 with no services."""
        if not self.services:
            return 100.0
        total = sum(svc._get_health_pct() for svc in self.services.values())
        return total / len(self.services)

    def get_service_statuses(self):
        """Return a ``ServiceStatus`` for every service."""
        return [s.to_status() for s in self.services.values()]

    def get_pipeline_status(self):
        """Return the current ``PipelineStatus``."""
        return PipelineStatus(
            stage=self.pipeline_stage,
            triggered_by=self.triggered_by,
            started_at=self.started_at,
            commit_sha=self.commit_sha,
            build_logs_snippet=self.build_logs if self.build_logs else None,
            test_pass_count=self.test_pass,
            test_fail_count=self.test_fail,
        )

    def get_migration_status(self):
        """Return the current ``MigrationStatus`` built from copies of internal lists."""
        return MigrationStatus(
            pending_migrations=list(self.migrations_pending),
            last_applied=self.migrations_applied[-1] if self.migrations_applied else None,
            migration_errors=list(self.migration_errors) if self.migration_errors else None,
        )

    def get_alerts(self):
        """Return a copy of the alert list."""
        return list(self.alerts)

    def get_service_names(self):
        """Return the service names in insertion order."""
        return list(self.services.keys())

    def has_services(self):
        """Return True when at least one service exists."""
        return len(self.services) > 0

    def has_pending_migrations(self):
        """Return True when migrations are still pending."""
        return len(self.migrations_pending) > 0

    def _apply_time_pressure(self):
        """Worsen task-specific problems each step while they stay unresolved."""
        task = self.scenario.task_name
        if task == "judgment_call":
            api_gw = self.services.get("api-gateway")
            if api_gw and api_gw.health == ServiceHealth.DEGRADED:
                degrade_lat = 80 * self._rng.uniform(0.8, 1.2)
                degrade_err = 0.8 * self._rng.uniform(0.9, 1.1)
                api_gw.latency_ms = round(min(api_gw.latency_ms + degrade_lat, 5000), 1)
                api_gw.error_rate = round(min(api_gw.error_rate + degrade_err, 50.0), 2)
                api_gw.cpu_percent = min(api_gw.cpu_percent + 1, 99)
                api_gw.logs.append(
                    f"[DEGRADING] api-gateway latency now {api_gw.latency_ms:.0f}ms, "
                    f"errors {api_gw.error_rate:.1f}/s — situation worsening"
                )
        elif task == "broken_pipeline":
            # Cache-service degrades if config error persists
            cache = self.services.get("cache-service")
            if cache and self.scenario.check_config_error("cache-service", cache.config):
                health_drop = 3.0 * self._rng.uniform(0.8, 1.2)
                cache.error_rate = round(min(cache.error_rate + health_drop * 0.5, 25.0), 2)
                cache.latency_ms = round(min(cache.latency_ms + 30.0 * self._rng.uniform(0.8, 1.2), 2000.0), 1)
                if cache.error_rate > 3.0 and cache.health == ServiceHealth.HEALTHY:
                    cache.health = ServiceHealth.DEGRADED
                cache.logs.append(
                    f"[DEGRADING] cache-service using staging Redis — "
                    f"error_rate now {cache.error_rate:.1f}/s, "
                    f"latency {cache.latency_ms:.0f}ms"
                )
            # Api-gateway latency increases if migration not applied
            api_gw = self.services.get("api-gateway")
            if api_gw and "add_index_users_email" in self.migrations_pending:
                lat_increase = 50.0 * self._rng.uniform(0.8, 1.2)
                api_gw.latency_ms = round(min(api_gw.latency_ms + lat_increase, 2000.0), 1)
                api_gw.logs.append(
                    f"[DEGRADING] api-gateway missing index — "
                    f"user query latency now {api_gw.latency_ms:.0f}ms"
                )
        elif task == "capacity_crisis":
            db = self.services.get("database-primary")
            api_gw = self.services.get("api-gateway")
            # Time pressure only while connection pool bottleneck persists
            if db and self.scenario.check_config_error("database-primary", db.config):
                db.cpu_percent = min(db.cpu_percent + 2, 99)
                db.latency_ms = round(db.latency_ms + 15, 1)
            # api-gateway degrades only while db bottleneck persists
            if (api_gw and api_gw.health == ServiceHealth.DEGRADED
                    and db and self.scenario.check_config_error("database-primary", db.config)):
                api_gw.latency_ms = round(min(api_gw.latency_ms + 30, 5000), 1)
                api_gw.error_rate = round(min(api_gw.error_rate + 0.5, 50.0), 2)
        elif task == "random_incident":
            failing = getattr(self.scenario, 'failing_service', None)
            if failing:
                svc = self.services.get(failing)
                if svc and svc.health == ServiceHealth.DEGRADED:
                    svc.error_rate = round(min(svc.error_rate + 0.5, 50.0), 2)
                    svc.latency_ms = round(min(svc.latency_ms + 30, 5000), 1)