# devops-incident-response/tasks/task_database.py
# Committed by Arijit-07
# Sync: all 7 tasks, openenv-core migration, training notebook, cleanup
# Commit 849b14a
from __future__ import annotations
import uuid
from typing import Dict, Any, List
from models import Action, ActionType
from tasks.base import BaseTask, InternalState, StepOutput, semantic_match
# Wall-clock start of the simulated incident (ISO-8601, UTC).
INCIDENT_TIME = "2026-04-12T14:22:00Z"

# Service call graph handed to InternalState as ``service_dependencies``.
# Each entry lists who the service calls and who calls it.
DEPENDENCIES = [
    {
        "service": "api-gateway",
        "calls": ["order-service", "user-service"],
        "called_by": [],
    },
    {
        "service": "order-service",
        "calls": ["postgres-primary"],
        "called_by": ["api-gateway"],
    },
    {
        "service": "analytics-service",
        "calls": ["postgres-primary"],
        "called_by": [],
    },
    {
        "service": "postgres-primary",
        "calls": [],
        "called_by": ["order-service", "analytics-service"],
    },
    {
        "service": "user-service",
        "calls": [],
        "called_by": ["api-gateway"],
    },
]

# Canned log lines per service. The postgres lines carry the decisive clue:
# sequential scans on orders.user_segment, a column with no index.
# NOTE(review): the migration id is dated 20260425 while INCIDENT_TIME is
# 2026-04-12 -- confirm whether the id is intentional.
POSTGRES_LOGS = [
    "[14:22:01] SLOW_QUERY 4281ms: SELECT * FROM orders WHERE user_segment='premium' LIMIT 100 [seq_scan: 18M rows]",
    "[14:22:03] SLOW_QUERY 4190ms: SELECT COUNT(*) FROM orders WHERE user_segment='standard' [seq_scan: 18M rows]",
    "[14:22:05] SLOW_QUERY 4350ms: SELECT order_id, total FROM orders WHERE user_segment='enterprise' [seq_scan: 18M rows]",
    "[14:22:07] INFO MISSING INDEX DETECTED: orders.user_segment has no index (added in migration 20260425_add_user_segment)",
    "[14:22:08] WARN Table scan count: 847/min (normal: 2/min) — index missing on hot column",
    "[14:22:09] SLOW_QUERY 4401ms: SELECT * FROM orders WHERE user_segment='premium' AND created_at > '2026-04-01' [seq_scan]",
]

# Downstream symptoms seen by order-service (timeouts, retries, breaker).
ORDER_LOGS = [
    "[14:22:01] WARN DB query timeout: getOrdersBySegment() exceeded 5000ms",
    "[14:22:02] ERROR Failed to fetch orders for dashboard: upstream DB timeout",
    "[14:22:05] WARN Retry 1/3: getOrdersBySegment() - 4300ms",
    "[14:22:09] ERROR Circuit breaker OPEN for postgres-primary read replica",
]

# Downstream symptoms seen by analytics-service (aggregation job timing out).
ANALYTICS_LOGS = [
    "[14:22:00] INFO Starting hourly aggregation job: orders by user_segment",
    "[14:22:04] WARN Aggregation query running slow: 4100ms elapsed (expected: 80ms)",
    "[14:22:08] ERROR Aggregation job timed out after 300s — will retry in 60min",
    "[14:22:09] INFO Root cause likely: orders table scan (no index on user_segment)",
]
class DatabaseTask(BaseTask):
    """Incident scenario: slow queries caused by a missing index.

    The simulated ``orders.user_segment`` column has no index, so
    ``postgres-primary`` runs sequential scans and the services that call it
    are degraded. The episode ends when the agent creates an index on
    ``orders.user_segment``, rolls back ``postgres-primary``, or reaches
    ``max_steps``.
    """

    def initialize(self) -> InternalState:
        """Build the starting ``InternalState`` for a fresh episode.

        Returns:
            A new state with a random episode id, five services (four
            degraded, ``user-service`` healthy), four unacknowledged alerts
            and per-service copies of the canned log lines.
        """

        def service(
            name: str,
            status: str,
            *,
            cpu: float,
            memory: float,
            error_rate: float,
            p99_ms: float,
            replicas: int,
            version: str,
            deployed: str,
            degraded_min: int,
        ) -> Dict[str, Any]:
            # Every service starts fully scaled (running == desired) and
            # without an SLA breach.
            return {
                "name": name,
                "status": status,
                "cpu_percent": cpu,
                "memory_percent": memory,
                "error_rate": error_rate,
                "latency_p99_ms": p99_ms,
                "replicas_running": replicas,
                "replicas_desired": replicas,
                "current_version": version,
                "last_deployed": deployed,
                "minutes_degraded": degraded_min,
                "sla_breach": False,
            }

        def alert(
            alert_id: str, severity: str, target: str, message: str, timestamp: str
        ) -> Dict[str, Any]:
            # All alerts start unacknowledged.
            return {
                "id": alert_id,
                "severity": severity,
                "service": target,
                "message": message,
                "timestamp": timestamp,
                "acknowledged": False,
            }

        service_specs = [
            service(
                "postgres-primary", "degraded",
                cpu=94.0, memory=65.0, error_rate=0.0, p99_ms=4401.0,
                replicas=1, version="v14.1",
                deployed="2025-01-01T00:00:00Z", degraded_min=15,
            ),
            service(
                "order-service", "degraded",
                cpu=35.0, memory=45.0, error_rate=2.5, p99_ms=4800.0,
                replicas=3, version="v2.1.0",
                deployed="2026-03-20T08:00:00Z", degraded_min=15,
            ),
            service(
                "analytics-service", "degraded",
                cpu=25.0, memory=30.0, error_rate=5.0, p99_ms=300000.0,
                replicas=1, version="v1.5.0",
                deployed="2026-04-10T11:00:00Z", degraded_min=15,
            ),
            service(
                "api-gateway", "degraded",
                cpu=45.0, memory=45.0, error_rate=1.5, p99_ms=4600.0,
                replicas=5, version="v3.1.0",
                deployed="2026-03-20T08:00:00Z", degraded_min=15,
            ),
            service(
                "user-service", "healthy",
                cpu=15.0, memory=30.0, error_rate=0.0, p99_ms=25.0,
                replicas=2, version="v1.1.2",
                deployed="2026-03-01T00:00:00Z", degraded_min=0,
            ),
        ]
        # Keyed by service name, preserving the declaration order above.
        services = {spec["name"]: spec for spec in service_specs}

        alerts = [
            alert(
                "D001", "critical", "order-service",
                "P99 latency 4800ms (threshold: 500ms)",
                "2026-04-12T14:22:05Z",
            ),
            alert(
                "D002", "critical", "analytics-service",
                "Hourly aggregation job timed out",
                "2026-04-12T14:22:08Z",
            ),
            alert(
                "D003", "warning", "postgres-primary",
                "CPU 94% sustained 15min, high sequential scan rate",
                "2026-04-12T14:22:07Z",
            ),
            alert(
                "D004", "warning", "api-gateway",
                "Upstream order-service latency 4600ms",
                "2026-04-12T14:22:09Z",
            ),
        ]

        # Copy the module-level lists so each episode gets its own log lists.
        logs = {
            "postgres-primary": list(POSTGRES_LOGS),
            "order-service": list(ORDER_LOGS),
            "analytics-service": list(ANALYTICS_LOGS),
            "api-gateway": ["[14:22:05] WARN Upstream order-service latency 4600ms"],
            "user-service": ["[14:22:00] INFO Service normal"],
        }

        return InternalState(
            episode_id=str(uuid.uuid4()),
            task_id="database",
            step=0,
            max_steps=20,
            services=services,
            alerts=alerts,
            logs=logs,
            action_history=[],
            total_reward=0.0,
            incident_resolved=False,
            ground_truth_root_cause="missing_index_orders_user_segment_column_migration",
            ground_truth_fix="create index on orders.user_segment OR rollback migration",
            incident_start_time=INCIDENT_TIME,
            healthy_services=["user-service"],
            service_dependencies=DEPENDENCIES,
        )

    def step(self, state: InternalState, action: Action) -> StepOutput:
        """Apply one agent action, score it, and advance the episode.

        Scoring visible in this method:
          * one-time rewards for inspecting postgres-primary or
            analytics-service, reading any runbook, and a correct diagnosis;
          * +0.30 for an index on ``orders.user_segment`` and +0.20 for a
            rollback of ``postgres-primary``; either one resolves the
            incident and ends the episode;
          * penalties for a wrong index, restarts, scale-ups, inapplicable
            actions and NOOPs after step 5.

        Args:
            state: Mutable episode state; updated in place.
            action: The agent's action for this step.

        Returns:
            ``StepOutput`` carrying the same state object, this step's reward
            rounded to four decimals, the ``done`` flag and an ``info`` dict.
        """
        state.step += 1
        state._apply_sla_degradation()

        kind = action.action_type
        target = action.service or ""
        reward = 0.0
        done = False
        info: Dict[str, Any] = {}

        def claim(tag: str, amount: float) -> float:
            """Return ``amount`` the first time ``tag`` is claimed, else 0.0."""
            if tag in state.rewards_given:
                return 0.0
            state.rewards_given.add(tag)
            return amount

        def resolve_incident() -> None:
            """Bring postgres-primary back to normal and flag the incident resolved."""
            postgres = state.services["postgres-primary"]
            postgres["cpu_percent"] = 18.0
            postgres["latency_p99_ms"] = 12.0
            state.incident_resolved = True
            info["resolution"] = "incident_resolved"

        # Shared handling first; the task-specific branches below may
        # override the texts it produced.
        result_text, error_text = self._apply_action_to_logs(state, action)

        # Custom read_metrics response for postgres-primary
        if kind == ActionType.READ_METRICS and target == "postgres-primary":
            metrics = state.services[target]
            report_lines = [
                "=== Metrics: postgres-primary ===",
                f"Status: {metrics['status'].upper()}",
                f"CPU: {metrics['cpu_percent']:.1f}% (normal: 15%)",
                f"Memory: {metrics['memory_percent']:.1f}%",
                "Sequential scans/min: 847 (normal: 2)",
                "Index scans/min: 12 (normal: 890)",
                "Active queries: 48 (normal: 8)",
                f"Longest running query: {metrics['latency_p99_ms']:.0f}ms",
                "Last migration: 20260425_add_user_segment (14:07:00, 15 min ago)",
            ]
            # Every line, including the last, is newline-terminated.
            result_text = "\n".join(report_lines) + "\n"
            state.evidence_log.append({
                "step": state.step,
                "source": f"metrics:{target}",
                "summary": "postgres-primary: cpu=94%, seq_scans=847/min, normal=2/min",
                "raw": result_text,
            })

        # Evidence gathering: (action, service) -> (one-time tag, reward).
        # read_logs and search_logs share a tag, so only one of them pays out.
        gather_rewards = {
            ("read_logs", "postgres-primary"): ("rl_pg", 0.10),
            ("search_logs", "postgres-primary"): ("rl_pg", 0.10),
            ("read_metrics", "postgres-primary"): ("rm_pg", 0.10),
            ("read_logs", "analytics-service"): ("rl_ana", 0.05),
            ("search_logs", "analytics-service"): ("rl_ana", 0.05),
        }
        gathered = gather_rewards.get((kind.value, target))
        if gathered is not None:
            reward += claim(*gathered)

        if kind == ActionType.READ_RUNBOOK:
            reward += claim("runbook_any", 0.05)
        elif kind == ActionType.DIAGNOSE:
            diagnosis = action.root_cause or ""
            keywords = ["index", "migration", "user_segment", "seq_scan", "table scan"]
            if semantic_match(diagnosis, keywords):
                reward += claim("diagnose_correct", 0.20)
                result_text = f"Diagnosis recorded: {diagnosis}"
        elif kind == ActionType.CREATE_INDEX:
            table = (action.table or "").lower()
            column = (action.column or "").lower()
            if table == "orders" and "user_segment" in column:
                reward += claim("fix_index", 0.30)
                result_text = f"Successfully created index on {table}.{column}. Sequential scans dropped. Query latency normalizing."
                resolve_incident()
                done = True
            else:
                # An index anywhere else is penalized on every attempt.
                reward -= 0.10
                result_text = f"Created index on {table}.{column}, but it had no effect on the ongoing sequential scans."
        elif kind == ActionType.ROLLBACK:
            # Only a rollback of postgres-primary is handled by this task.
            if target == "postgres-primary":
                # Shares the "fix_index" tag with CREATE_INDEX but pays less.
                reward += claim("fix_index", 0.20)
                result_text = "Migration rolled back. user_segment column removed. Service queries failing back to old schema, but database CPU returning to normal."
                resolve_incident()
                done = True
        elif kind == ActionType.RESTART_SERVICE:
            reward -= 0.10
            result_text = f"Restarted {target}. Connection pool dropped but immediately overwhelmed again by slow queries missing index."
        elif kind == ActionType.SCALE_UP:
            reward -= 0.08
            result_text = f"Scaled up {target}. More workers are now hitting the database, worsening the CPU starvation."
        elif kind == ActionType.NOOP:
            # Idling is free for the first five steps only.
            if state.step > 5:
                reward -= 0.03
        elif kind in (ActionType.BLOCK_IP_RANGE, ActionType.FAILOVER):
            reward -= 0.10
            error_text = f"Action {kind.value} is not applicable to this incident."

        state.total_reward = self._clamp(state.total_reward + reward)

        if not done and state.step >= state.max_steps:
            done = True
            info["reason"] = "max_steps_reached"

        # NOTE(review): the built observation is not used in this method;
        # the call stays here, before the history append, in case it has
        # side effects on ``state`` -- confirm against tasks.base.
        state._build_observation(
            last_action_result=result_text,
            last_action_error=error_text,
        )

        step_reward = round(reward, 4)
        state.action_history.append({
            "step": state.step,
            "action": action.model_dump(),
            "reward": step_reward,
        })
        return StepOutput(next_state=state, reward=step_reward, done=done, info=info)