DIME / server /tasks.py
Naseer-010's picture
added canonical evaluation harness, unified DIME index, deterministic replay guarantees
54da37b
"""
Graded tasks with curriculum-based difficulty levels for the
Distributed Infrastructure Management Environment.
Each task provides:
- setup(env, rng): configure initial node states and scenario parameters
- grade(env): return float in (0.0, 1.0) with partial credit
- is_done(env): termination condition check
- hint: natural language task description for the agent
Curriculum Levels
-----------------
Level 1 Warm Start — Identify the failing node from logs (high success rate)
Level 2 Single Fix — One node fails, agent must restart it
Level 3 Stochastic — Gaussian traffic spikes, multi-step interventions
Level 4 Expert — Brutal cascading failures with tight budgets
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import random
from server.environment import DistributedInfraEnvironment
# ============================================================================
# Level 1 — Warm Start: Read Logs & Identify Failing Node
# ============================================================================
def _setup_level_1(env: "DistributedInfraEnvironment", rng: "random.Random"):
    """Prepare the Level 1 warm-start scenario with a single pre-failed node.

    Caps the episode at 15 steps, keeps traffic at the baseline request rate,
    and marks one node (index drawn from ``rng``) as failed with its CPU
    utilisation and queue length zeroed.
    """
    sim = env.sim
    sim.max_steps = 15
    # A multiplier of 1.0 leaves traffic at the baseline level.
    sim.current_request_rate = sim.base_request_rate * 1.0

    # Draw the victim index once, then work on that node object directly.
    victim = sim.nodes[rng.randint(0, len(sim.nodes) - 1)]
    victim.is_failed = True
    victim.cpu_util = 0.0
    victim.queue_length = 0
def _grade_level_1(env: "DistributedInfraEnvironment") -> float:
    """Grade Level 1 as 70% recovery plus 30% speed.

    Recovery earns full credit when no node is marked failed at grading time
    and 0.2 otherwise. Speed is the unused fraction of the step budget,
    floored at zero. The result is clamped to [0.01, 0.99] and rounded to
    four decimal places.
    """
    sim = env.sim

    any_failed = any(node.is_failed for node in sim.nodes)
    recovery = 0.2 if any_failed else 1.0

    # Fewer steps consumed means a larger speed bonus.
    speed = max(0.0, 1.0 - sim.step_count / sim.max_steps)

    raw = 0.70 * recovery + 0.30 * speed
    clamped = min(0.99, max(0.01, raw))
    return round(clamped, 4)
def _is_done_level_1(env: "DistributedInfraEnvironment") -> bool:
    """Report whether the Level 1 episode has finished.

    The episode ends as soon as every node is both un-failed and has a
    ``restart_countdown`` of zero, or once the step budget is used up.
    """
    sim = env.sim
    for node in sim.nodes:
        if node.is_failed or node.restart_countdown != 0:
            # At least one node is still down or restarting, so only the
            # step budget can end the episode.
            return sim.step_count >= sim.max_steps
    return True
# ============================================================================
# Level 2 / Task 1 — Traffic Spike Recovery
# ============================================================================
def _setup_traffic_spike(env: "DistributedInfraEnvironment", rng: "random.Random"):
    """Configure the traffic-spike scenario: 3x baseline traffic for 30 steps.

    Every node starts moderately loaded, with CPU utilisation jittered around
    0.45 and a queue length between 5 and 15 (inclusive).
    """
    sim = env.sim
    sim.max_steps = 30
    sim.current_request_rate = sim.base_request_rate * 3.0

    for node in sim.nodes:
        # Per node, the CPU jitter is drawn before the queue length.
        cpu_jitter = rng.uniform(-0.05, 0.1)
        node.cpu_util = 0.45 + cpu_jitter
        node.queue_length = rng.randint(5, 15)
def _grade_traffic_spike(env: "DistributedInfraEnvironment") -> float:
    """Grade the traffic-spike task.

    Weights: 50% share of steps with latency under 50 ms, 30% mean uptime,
    20% action efficiency. Returns 0.01 outright when no latency samples were
    recorded; otherwise the score is clamped to [0.01, 0.99] and rounded to
    four decimal places.
    """
    sim = env.sim
    latencies = sim.latency_history
    if not latencies:
        return 0.01

    # Latency: share of recorded steps that stayed under the target.
    target_ms = 50.0
    within_target = len([lat for lat in latencies if lat < target_ms])
    latency_score = within_target / len(latencies)

    # Uptime: mean of the recorded ratios; a missing history counts as perfect.
    uptimes = sim.uptime_history
    if uptimes:
        avg_uptime = sum(uptimes) / len(uptimes)
    else:
        avg_uptime = 1.0

    # Efficiency: falls linearly to zero once actions reach half the step budget.
    action_allowance = sim.max_steps * 0.5
    efficiency = max(0.0, 1.0 - sim.actions_taken / max(1, action_allowance))

    raw = 0.50 * latency_score + 0.30 * avg_uptime + 0.20 * efficiency
    return round(min(0.99, max(0.01, raw)), 4)
def _is_done_traffic_spike(env: "DistributedInfraEnvironment") -> bool:
    """Return True once the step budget is exhausted; there is no early exit."""
    sim = env.sim
    return sim.step_count >= sim.max_steps
# ============================================================================
# Level 2 / Task 2 — Single Node Failure
# ============================================================================
def _setup_node_failure(env: "DistributedInfraEnvironment", rng: "random.Random"):
    """Configure the single-node-failure scenario (40 steps, 1.5x traffic).

    Pre-loads node 3 so that it is already busy when ``_is_done_node_failure``
    injects its failure at step 5. ``rng`` is accepted for interface
    uniformity with the other setup functions but is not used.

    The node is only touched when the cluster actually has an index 3, the
    same guard ``_is_done_node_failure`` applies before injecting the failure.
    Without it, setup raised IndexError on clusters with fewer than four nodes.
    """
    sim = env.sim
    sim.max_steps = 40
    sim.current_request_rate = sim.base_request_rate * 1.5

    target_idx = 3
    if target_idx < len(sim.nodes):
        target = sim.nodes[target_idx]
        target.cpu_util = 0.60
        target.queue_length = 20
def _grade_node_failure(env: "DistributedInfraEnvironment") -> float:
    """Grade the node-failure task.

    Weights: 40% time-to-repair, 40% share of steps with uptime >= 0.80,
    20% restart economy. Returns 0.01 when no uptime samples exist; otherwise
    the score is clamped to [0.01, 0.99] and rounded to four decimal places.
    """
    sim = env.sim
    if not sim.uptime_history:
        return 0.01

    # Time-to-repair: length of the first contiguous run of degraded
    # (uptime < 1.0) steps. Later outages do not extend it.
    samples = iter(sim.uptime_history)
    outage_steps = 0
    for uptime in samples:
        if uptime < 1.0:
            outage_steps = 1
            break
    for uptime in samples:
        if not uptime < 1.0:
            break
        outage_steps += 1
    repair_window = 10
    mttr_score = max(0.0, 1.0 - outage_steps / repair_window)

    # Uptime: measured over the whole episode, not just the outage.
    healthy_steps = len([u for u in sim.uptime_history if u >= 0.80])
    uptime_score = healthy_steps / len(sim.uptime_history)

    # Restart economy: the first restart is free, each further one costs 0.2.
    extra_restarts = max(0, sim.restart_count - 1)
    restart_score = max(0.0, 1.0 - extra_restarts / 5)

    raw = 0.40 * mttr_score + 0.40 * uptime_score + 0.20 * restart_score
    return round(min(0.99, max(0.01, raw)), 4)
def _is_done_node_failure(env: "DistributedInfraEnvironment") -> bool:
    """Termination check for the node-failure task; also injects the failure.

    Side effect: when called at step 5 on a cluster that has a node 3 which
    is not already failed, that node is marked failed, its CPU and queue are
    zeroed, and ``env._redistribute_from_node(3)`` is invoked.

    Returns True once the step budget is exhausted.
    """
    sim = env.sim

    if sim.step_count == 5 and len(sim.nodes) > 3:
        doomed = sim.nodes[3]
        if not doomed.is_failed:
            doomed.is_failed = True
            doomed.cpu_util = 0.0
            doomed.queue_length = 0
            # NOTE(review): the queue is already zeroed when the helper runs;
            # confirm the helper does not need the pre-failure queue length.
            env._redistribute_from_node(3)

    return sim.step_count >= sim.max_steps
# ============================================================================
# Level 2 — Alias: Single Fix (same as node_failure)
# ============================================================================
# Level 2 is a pure alias: the node-failure setup, grader and termination
# check are bound under the curriculum-level names without modification.
_setup_level_2 = _setup_node_failure
_grade_level_2 = _grade_node_failure
_is_done_level_2 = _is_done_node_failure
# ============================================================================
# Level 3 / Task 3 — Cascading Failure Prevention (Stochastic)
# ============================================================================
def _setup_cascading_failure(env: "DistributedInfraEnvironment", rng: "random.Random"):
    """Configure the cascading-failure scenario (50 steps, 2x traffic).

    Nodes 1 and 4 start just below critical CPU with an existing high-CPU
    streak; every other node gets an elevated, randomly jittered load.
    """
    sim = env.sim
    sim.max_steps = 50
    sim.current_request_rate = sim.base_request_rate * 2.0

    # index -> (cpu_util, queue_length, high_cpu_streak) for the two hot nodes.
    hot_presets = {1: (0.88, 30, 2), 4: (0.86, 25, 1)}
    for idx, (cpu, queue, streak) in hot_presets.items():
        hot = sim.nodes[idx]
        hot.cpu_util = cpu
        hot.queue_length = queue
        hot.high_cpu_streak = streak

    # Remaining nodes: raised base load; CPU jitter is drawn before the queue.
    for idx, node in enumerate(sim.nodes):
        if idx in hot_presets:
            continue
        node.cpu_util = 0.55 + rng.uniform(-0.05, 0.1)
        node.queue_length = rng.randint(8, 20)
def _grade_cascading_failure(env: "DistributedInfraEnvironment") -> float:
    """Grade the cascading-failure task.

    Weights: 50% cascade avoidance (1.0 if no cascade occurred, else 0.3),
    30% share of nodes currently alive with CPU under 0.85, 20% action
    efficiency. The CPU share is scored as 0 when no uptime history has been
    recorded. The result is clamped to [0.01, 0.99] and rounded to four
    decimal places.
    """
    sim = env.sim

    cascade_score = 0.3 if sim.cascade_occurred else 1.0

    cpu_score = 0.0
    if sim.uptime_history:
        node_total = len(sim.nodes)
        if node_total > 0:
            cool_nodes = len(
                [n for n in sim.nodes if not n.is_failed and n.cpu_util < 0.85]
            )
            cpu_score = cool_nodes / node_total

    # Efficiency reaches zero once actions hit 40% of the step budget.
    action_allowance = sim.max_steps * 0.4
    efficiency = max(0.0, 1.0 - sim.actions_taken / max(1, action_allowance))

    raw = 0.50 * cascade_score + 0.30 * cpu_score + 0.20 * efficiency
    return round(min(0.99, max(0.01, raw)), 4)
def _is_done_cascading_failure(env: "DistributedInfraEnvironment") -> bool:
    """End the episode when over half the nodes have failed or time is up.

    "Over half" means the failed count strictly exceeds ``len(nodes) // 2``.
    """
    sim = env.sim
    down = len([node for node in sim.nodes if node.is_failed])
    majority_down = down > len(sim.nodes) // 2
    return majority_down or sim.step_count >= sim.max_steps
# ============================================================================
# Level 3 — Alias: Stochastic (enhanced version of cascading_failure)
# ============================================================================
def _setup_level_3(env: "DistributedInfraEnvironment", rng: "random.Random"):
    """Configure Level 3: the cascading-failure scenario with randomised traffic.

    Runs ``_setup_cascading_failure`` first, then overrides two of its
    settings: the initial request rate becomes ``base * (2.0 + N(0, 0.5))``
    and the episode is shortened to 45 steps.

    The multiplier is floored at 0.0. A normal draw is unbounded below, so
    without the floor a rare sample more than four standard deviations under
    the mean gave a negative request rate.
    """
    _setup_cascading_failure(env, rng)
    sim = env.sim

    # Only the starting rate is randomised here. Any per-step noise would have
    # to come from the simulator itself, which is not visible in this module.
    traffic_multiplier = max(0.0, 2.0 + rng.gauss(0, 0.5))
    sim.current_request_rate = sim.base_request_rate * traffic_multiplier
    sim.max_steps = 45
# Level 3 only customises setup; grading and termination are the
# cascading-failure functions bound under the curriculum-level names.
_grade_level_3 = _grade_cascading_failure
_is_done_level_3 = _is_done_cascading_failure
# ============================================================================
# Level 4 / Task 4 — Expert: Flash Crowd
# ============================================================================
def _setup_flash_crowd(env: "DistributedInfraEnvironment", rng: "random.Random"):
    """Configure the flash-crowd scenario: 5x baseline traffic for 40 steps.

    Every node starts heavily loaded, with CPU utilisation jittered around
    0.60 and a queue length between 15 and 30 (inclusive).
    """
    sim = env.sim
    sim.max_steps = 40
    sim.current_request_rate = sim.base_request_rate * 5.0

    for node in sim.nodes:
        # Per node, the CPU jitter is drawn before the queue length.
        cpu_jitter = rng.uniform(-0.05, 0.1)
        node.cpu_util = 0.60 + cpu_jitter
        node.queue_length = rng.randint(15, 30)
def _grade_flash_crowd(env: "DistributedInfraEnvironment") -> float:
    """Grade the flash-crowd task.

    Half of the score is mean uptime and half is the share of steps with
    latency under 100 ms; a flat 0.4 is subtracted if a cascade occurred.
    Missing histories score 0 for their component. The result is clamped to
    [0.01, 0.99] and rounded to four decimal places.
    """
    sim = env.sim

    uptimes = sim.uptime_history
    if uptimes:
        avg_uptime = sum(uptimes) / len(uptimes)
    else:
        avg_uptime = 0.0

    # The latency target is looser here than in the 3x traffic-spike task.
    target_ms = 100.0
    latencies = sim.latency_history
    within_target = len([lat for lat in latencies if lat < target_ms])
    if latencies:
        latency_score = within_target / len(latencies)
    else:
        latency_score = 0.0

    cascade_penalty = 0.0
    if sim.cascade_occurred:
        cascade_penalty = 0.4

    raw = 0.50 * avg_uptime + 0.50 * latency_score - cascade_penalty
    return round(min(0.99, max(0.01, raw)), 4)
def _is_done_flash_crowd(env: "DistributedInfraEnvironment") -> bool:
    """End the episode when over 60% of nodes have failed or time is up."""
    sim = env.sim
    dead = sum(1 for node in sim.nodes if node.is_failed)
    collapse_threshold = len(sim.nodes) * 0.6
    if dead > collapse_threshold:
        return True
    return sim.step_count >= sim.max_steps
# ============================================================================
# Level 4 — Alias: Expert (same as flash_crowd)
# ============================================================================
# Level 4 is a pure alias: the flash-crowd setup, grader and termination
# check are bound under the curriculum-level names without modification.
_setup_level_4 = _setup_flash_crowd
_grade_level_4 = _grade_flash_crowd
_is_done_level_4 = _is_done_flash_crowd
# ============================================================================
# Level 5 — Alibaba Trace Replay (Real-World Production Traffic)
# ============================================================================
def _setup_alibaba_trace(env: "DistributedInfraEnvironment", rng: "random.Random"):
    """Configure a trace-replay episode (60 steps, cloud budget of 8).

    Loads the default trace via ``server.trace_loader.load_default_trace``.
    When a trace is available it is attached to the simulator and, unless the
    offset has been locked by the caller, a random start offset is drawn.
    When no trace is available the scenario falls back to synthetic 2x
    traffic. App-server and database nodes are then given a light,
    randomly jittered initial load; nodes with any other role are untouched.

    The latest allowed start offset is ``len(trace) - max_steps``, floored
    at 0. The previous floor of 1 allowed offset 1 for a trace no longer
    than the episode, which pushes the replay window past the end of the
    trace.
    """
    # Imported lazily so the module can be loaded without the trace loader.
    from server.trace_loader import load_default_trace

    sim = env.sim
    sim.max_steps = 60  # ~30 minutes of real time at 30s intervals (or longer trace window)
    sim.cloud_budget = 8  # tight budget
    sim.error_budget = 100.0
    sim.scenario = sim.task_id

    trace = load_default_trace()
    if trace is not None:
        sim.trace_replay = trace
        # Use the caller's deterministic benchmark offset when it is locked;
        # otherwise keep the stochastic per-episode variation.
        if not sim.trace_offset_locked:
            latest_start = max(0, len(trace) - sim.max_steps)
            sim.trace_offset = rng.randint(0, latest_start)
        sim.current_request_rate = trace.get_step(0, offset=sim.trace_offset).request_rate
    else:
        # Fallback: synthetic 2x traffic if the trace has not been generated.
        sim.current_request_rate = sim.base_request_rate * 2.0

    # Pre-stress the cluster slightly; CPU jitter is drawn before queue length.
    for node in sim.nodes:
        if node.role == "app_server":
            node.cpu_util = 0.40 + rng.uniform(-0.05, 0.1)
            node.queue_length = rng.randint(3, 12)
        elif node.role == "database":
            node.cpu_util = 0.35 + rng.uniform(-0.03, 0.05)
            node.queue_length = rng.randint(2, 8)
def _grade_alibaba_trace(env: "DistributedInfraEnvironment") -> float:
    """Grade a trace-replay episode.

    Weights: 35% mean uptime, 30% share of steps with latency under 80 ms,
    20% throughput (served/received, full marks at a 60% ratio), and 15%
    budget conservation. Missing histories score 0 for their component. The
    result is clamped to [0.01, 0.99] and rounded to four decimal places.

    Budget conservation is bounded to [0, 1]. It was previously only floored,
    so a ``cloud_budget`` above the initial 8 credits pushed the term past
    its 15% weight.
    """
    sim = env.sim

    # Uptime
    uptimes = sim.uptime_history
    avg_uptime = sum(uptimes) / len(uptimes) if uptimes else 0.0

    # Latency: fraction of steps below 80 ms.
    target_ms = 80.0
    latencies = sim.latency_history
    below_target = sum(1 for lat in latencies if lat < target_ms)
    latency_score = below_target / len(latencies) if latencies else 0.0

    # Throughput: a 60% served/received ratio already earns full marks.
    throughput_ratio = sim.total_requests_served / max(1, sim.total_requests_received)
    throughput_score = min(1.0, throughput_ratio / 0.6)

    # Efficiency: share of the initial budget left unspent, bounded to [0, 1].
    initial_budget = 8  # matches the budget assigned by _setup_alibaba_trace
    budget_used = initial_budget - sim.cloud_budget
    efficiency_score = min(1.0, max(0.0, 1.0 - budget_used / initial_budget))

    score = (
        0.35 * avg_uptime
        + 0.30 * latency_score
        + 0.20 * throughput_score
        + 0.15 * efficiency_score
    )
    return round(min(0.99, max(0.01, score)), 4)
def _is_done_alibaba_trace(env: "DistributedInfraEnvironment") -> bool:
    """End the episode when over 70% of nodes have failed or time is up."""
    sim = env.sim
    dead = len([node for node in sim.nodes if node.is_failed])
    collapse_threshold = len(sim.nodes) * 0.7
    return dead > collapse_threshold or sim.step_count >= sim.max_steps
# ============================================================================
# Task Registry
# ============================================================================
# Registry mapping a task id to its scenario callbacks and agent-facing hint.
# Every entry carries the same four keys:
#   "setup"   - callable(env, rng) that configures the scenario
#   "grade"   - callable(env) returning the episode score
#   "is_done" - callable(env) returning whether the episode has ended
#   "hint"    - natural-language task description for the agent
# Hints are plain strings (no %-formatting is applied in this module), so a
# percent sign is written as a single "%".
TASKS = {
    # --- Curriculum levels ---
    "level_1_read_logs": {
        "setup": _setup_level_1,
        "grade": _grade_level_1,
        "is_done": _is_done_level_1,
        "hint": (
            "WARM START (Level 1): One node in your cluster has silently failed. "
            "Use 'query_logs' to investigate nodes with telemetry dropouts and "
            "identify the failing node. Then restart it. "
            "This is a diagnostic exercise — focus on observation before action."
        ),
    },
    "level_2_single_fix": {
        "setup": _setup_level_2,
        "grade": _grade_level_2,
        "is_done": _is_done_level_2,
        "hint": (
            "SINGLE FIX (Level 2): A node failure will occur during this episode. "
            "Detect the failure, restart the affected node, and maintain uptime "
            "above 80%. Minimise unnecessary restarts."
        ),
    },
    "level_3_stochastic": {
        "setup": _setup_level_3,
        "grade": _grade_level_3,
        "is_done": _is_done_level_3,
        "hint": (
            "STOCHASTIC SPIKES (Level 3): Traffic follows a noisy Gaussian pattern. "
            "Multiple nodes are near critical CPU. Proactively reroute traffic and "
            "scale up before cascading failures occur. Telemetry may be spotty — "
            "use query_logs to investigate timeouts."
        ),
    },
    "level_4_expert": {
        "setup": _setup_level_4,
        "grade": _grade_level_4,
        "is_done": _is_done_level_4,
        "hint": (
            "EXPERT MODE (Level 4): A brutal 5x flash crowd with tight cloud budget. "
            "You MUST aggressively scale up AND throttle to survive. Budget is limited — "
            "every scale_up costs 1 unit. If you exhaust your budget, you cannot add "
            "more capacity. Plan wisely."
        ),
    },
    # --- Original task aliases (backward-compatible) ---
    "traffic_spike": {
        "setup": _setup_traffic_spike,
        "grade": _grade_traffic_spike,
        "is_done": _is_done_traffic_spike,
        "hint": (
            "TRAFFIC SPIKE: The system is experiencing 3x normal request volume. "
            "Your goal is to keep latency below 50ms while maintaining full uptime. "
            "Consider rerouting traffic from overloaded nodes, scaling up capacity, "
            "or throttling incoming requests. Minimize unnecessary actions."
        ),
    },
    "node_failure": {
        "setup": _setup_node_failure,
        "grade": _grade_node_failure,
        "is_done": _is_done_node_failure,
        "hint": (
            "NODE FAILURE: A node failure will occur during this episode. "
            "You must detect the failure, restart the affected node, and maintain "
            "system uptime above 80%. React quickly — Mean Time To Repair matters. "
            "Avoid unnecessary restarts of healthy nodes."
        ),
    },
    "cascading_failure": {
        "setup": _setup_cascading_failure,
        "grade": _grade_cascading_failure,
        "is_done": _is_done_cascading_failure,
        "hint": (
            "CASCADING FAILURE PREVENTION: Two nodes are near critical CPU load "
            "(>85%). If they reach 90% for 3 consecutive steps, they will fail "
            "and their load will cascade to neighbors, potentially triggering a "
            "chain reaction. ACT PROACTIVELY: reroute traffic away from hot nodes "
            "BEFORE they fail. Scaling up can help absorb excess load. "
            "Prevention is rewarded more than recovery."
        ),
    },
    "flash_crowd": {
        "setup": _setup_flash_crowd,
        "grade": _grade_flash_crowd,
        "is_done": _is_done_flash_crowd,
        "hint": (
            "FLASH CROWD: The system is facing an unprecedented 5x traffic surge. "
            "Your objective is pure survival. You MUST aggressively use 'scale_up' "
            "to add capacity AND use 'throttle' to drop excess traffic. "
            "If you do not shed load, the cluster will collapse."
        ),
    },
    # --- Level 5: Alibaba Trace Replay ---
    "level_5_alibaba_trace": {
        "setup": _setup_alibaba_trace,
        "grade": _grade_alibaba_trace,
        "is_done": _is_done_alibaba_trace,
        "hint": (
            "ALIBABA TRACE REPLAY (Level 5): You are operating on REAL production "
            "traffic from Alibaba's microservices cluster (2021 trace data). "
            "Traffic has multimodal spikes, micro-bursts, and silent maintenance windows. "
            "Node 0 is the DATABASE (single point of failure). Nodes 1-7 are app servers. "
            "New nodes have a 3-step cold start. Budget is tight (8 credits). "
            "Read Prometheus metrics carefully — they follow production scrape format."
        ),
    },
    # --- Level 5 Trace-backed chaos scenarios (aliases for evaluation loops) ---
    # All entries below share the trace-replay callbacks; only the hint differs.
    "thundering_herd": {
        "setup": _setup_alibaba_trace,
        "grade": _grade_alibaba_trace,
        "is_done": _is_done_alibaba_trace,
        "hint": (
            "THUNDERING HERD: Sudden heavy-tailed traffic spikes drive tail latency and "
            "retry amplification. Use throttling judiciously to prevent runaway retries "
            "while protecting the DB (node 0)."
        ),
    },
    "zombie_node": {
        "setup": _setup_alibaba_trace,
        "grade": _grade_alibaba_trace,
        "is_done": _is_done_alibaba_trace,
        "hint": (
            "ZOMBIE NODE: A worker may appear underutilized while contributing to severe "
            "tail latency. Consider rerouting away from suspicious nodes and restarting "
            "if needed."
        ),
    },
    "memory_leak_slow_burn": {
        "setup": _setup_alibaba_trace,
        "grade": _grade_alibaba_trace,
        "is_done": _is_done_alibaba_trace,
        "hint": (
            "MEMORY LEAK (SLOW BURN): One worker's memory creeps upward toward an OOM cliff. "
            "Scaling does not fix leaks—restart the leaking pod before it crosses the cliff."
        ),
    },
    "split_brain_io_bottleneck": {
        "setup": _setup_alibaba_trace,
        "grade": _grade_alibaba_trace,
        "is_done": _is_done_alibaba_trace,
        "hint": (
            "SPLIT-BRAIN / IO BOTTLENECK: The database disk (node 0) can saturate (high io_wait). "
            "Avoid scaling when DB I/O is pegged; shed load to protect the SPOF."
        ),
    },
    "black_swan_az_failure": {
        "setup": _setup_alibaba_trace,
        "grade": _grade_alibaba_trace,
        "is_done": _is_done_alibaba_trace,
        "hint": (
            "BLACK SWAN: Multi-node failures can cascade. Prioritize survivor protection "
            "and graceful load shedding while recovering failed capacity."
        ),
    },
    "retry_storm": {
        "setup": _setup_alibaba_trace,
        "grade": _grade_alibaba_trace,
        "is_done": _is_done_alibaba_trace,
        "hint": (
            "RETRY STORM: Tail latency spikes can induce exponential retries. Break the loop "
            "with traffic throttling before the DB collapses."
        ),
    },
    "hot_shard_skew": {
        "setup": _setup_alibaba_trace,
        "grade": _grade_alibaba_trace,
        "is_done": _is_done_alibaba_trace,
        "hint": (
            "HOT SHARD / SKEW: One worker may run hot while others remain cool. Prefer traffic "
            "shifts (reroute) over scaling when the cluster has spare headroom."
        ),
    },
    "connection_pool_deadlock": {
        "setup": _setup_alibaba_trace,
        "grade": _grade_alibaba_trace,
        "is_done": _is_done_alibaba_trace,
        "hint": (
            "CONNECTION POOL DEADLOCK: Symptoms can look like low CPU with high latency. "
            "Reroute away from the stuck node or restart it to clear deadlocks."
        ),
    },
    "autoscaler_flapping_trap": {
        "setup": _setup_alibaba_trace,
        "grade": _grade_alibaba_trace,
        "is_done": _is_done_alibaba_trace,
        "hint": (
            "AUTOSCALER FLAPPING TRAP: Avoid overreacting to small oscillations. Prefer no_op "
            "when stable, and make only high-confidence interventions."
        ),
    },
}