Naseer-010 committed on
Commit
54da37b
·
1 Parent(s): c1ba9ce

added canonical evaluation harness, unified DIME index, deterministic replay guarantees

Browse files
agents/__init__.py ADDED
@@ -0,0 +1,8 @@
 
 
 
 
 
 
 
 
 
1
+ """Baseline agents for the DIME benchmark."""
2
+
3
+ from agents.base_agent import BaseAgent
4
+ from agents.heuristic_agent import HeuristicAgent
5
+ from agents.random_agent import RandomAgent
6
+ from agents.threshold_agent import ThresholdAgent
7
+
8
+ __all__ = ["BaseAgent", "RandomAgent", "HeuristicAgent", "ThresholdAgent"]
agents/base_agent.py ADDED
@@ -0,0 +1,18 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Canonical agent interface for DIME evaluation."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from abc import ABC, abstractmethod
6
+ from typing import Any
7
+
8
+
9
class BaseAgent(ABC):
    """Abstract inference-only agent contract used by the benchmark harness.

    Subclasses must implement :meth:`act`. :meth:`reset` is an optional hook
    whose default implementation does nothing.
    """

    def reset(self, seed: int | None = None, task_id: str | None = None) -> None:
        """Clear per-episode state; the base implementation is a no-op."""
        return None

    @abstractmethod
    def act(self, observation: Any) -> Any:
        """Produce the action to take for ``observation``."""
        raise NotImplementedError
agents/heuristic_agent.py ADDED
@@ -0,0 +1,79 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Canonical symbolic DIME baseline using the existing triage tree."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from typing import Any
6
+
7
+ from agents.base_agent import BaseAgent
8
+ from benchmark.utils import observation_to_dict
9
+ from server.command_parser import CommandParseError, parse_command
10
+ from server.models import InfraAction
11
+
12
+
13
def expected_triage_command(observation: Any) -> str:
    """Return the kubectl command mandated by the DIME triage tree.

    The observation is flattened with ``observation_to_dict`` and the rules
    below are evaluated in priority order; the first rule that fires wins.
    "Workers" are the entries of ``cpu_loads`` from index 1 onward whose load
    is non-negative; index 0 is handled separately as ``db_cpu``.

    1. Any node with memory utilisation above 0.92: delete that pod.
    2. Node 0 is listed in ``failed_nodes``: delete pod ``node-0``.
    3. ``io_wait`` above 0.80: throttle ingress to 0.5.
    4. A worker above 0.90 CPU while the worker average is below 0.60:
       shift its traffic to the least-loaded other non-failed worker.
    5. ``p99_latency`` above 100 and ``request_rate`` above 150: throttle
       ingress to 0.4.
    6. A worker below 0.10 CPU while ``p99_latency`` is above 100: shift its
       traffic to the first other non-failed worker above 0.2 CPU.
    7. Two or more failed nodes: throttle ingress to 0.3.
    8. Node 0 CPU above 0.80: throttle ingress to 0.7.
    9. Worker average above 0.75 and ``error_budget`` above 20: scale the
       frontend deployment to 10 replicas.

    Args:
        observation: Any object accepted by ``observation_to_dict``.

    Returns:
        The command string for the first matching rule, or ``"no_op"`` when
        no rule matches.
    """
    obs = observation_to_dict(observation)

    def _reading(key: str, default: float) -> float:
        # Fall back only when the reading is absent or None.  The previous
        # ``value or default`` form also replaced a genuine 0.0, which turned
        # an exhausted error budget (0.0) into a full one (100.0) and let the
        # scale-up rule fire with no budget left.
        value = obs.get(key)
        return default if value is None else float(value)

    # An explicit None is treated like a missing key; an empty list is kept.
    cpu = obs.get("cpu_loads")
    if cpu is None:
        cpu = [0.3] * 8
    mem = obs.get("mem_utilizations")
    if mem is None:
        mem = [0.2] * 8
    failed = set(obs.get("failed_nodes") or [])
    io_wait = _reading("io_wait", 0.0)
    p99 = _reading("p99_latency", 0.0)
    request_rate = _reading("request_rate", 100.0)
    error_budget = _reading("error_budget", 100.0)

    for idx, memory in enumerate(mem):
        if float(memory) > 0.92:
            return f"kubectl delete pod node-{idx}"

    if 0 in failed:
        return "kubectl delete pod node-0"

    if io_wait > 0.80:
        return "kubectl throttle ingress --rate=0.5"

    # Negative loads are excluded from the worker set.
    workers = [(idx, float(load)) for idx, load in enumerate(cpu[1:], 1) if float(load) >= 0.0]
    avg_worker_cpu = sum(load for _, load in workers) / len(workers) if workers else 0.0

    # The average is loop-invariant, so it is tested once up front.
    if avg_worker_cpu < 0.60:
        for idx, load in workers:
            if load > 0.90:
                candidates = [
                    candidate
                    for candidate, _ in workers
                    if candidate != idx and candidate not in failed
                ]
                if candidates:
                    dst = min(candidates, key=lambda node_idx: float(cpu[node_idx]))
                    return f"kubectl exec -it istio-proxy -- traffic shift --from={idx} --to={dst}"

    if p99 > 100.0 and request_rate > 150.0:
        return "kubectl throttle ingress --rate=0.4"

    for idx, load in workers:
        if 0.0 <= load < 0.10 and p99 > 100.0:
            dst = next(
                (
                    candidate
                    for candidate, candidate_load in workers
                    if candidate_load > 0.2 and candidate not in failed and candidate != idx
                ),
                None,
            )
            if dst is not None:
                return f"kubectl exec -it istio-proxy -- traffic shift --from={idx} --to={dst}"

    if len(failed) >= 2:
        return "kubectl throttle ingress --rate=0.3"

    db_cpu = float(cpu[0]) if cpu and float(cpu[0]) >= 0.0 else 0.0
    if db_cpu > 0.80:
        return "kubectl throttle ingress --rate=0.7"

    if workers and avg_worker_cpu > 0.75 and error_budget > 20.0:
        return "kubectl scale deployment frontend --replicas=10"

    return "no_op"
67
+
68
+
69
class HeuristicAgent(BaseAgent):
    """Symbolic baseline that executes whatever the triage tree prescribes."""

    def act(self, observation: Any) -> InfraAction:
        """Translate the triage command for ``observation`` into an action.

        A ``"no_op"`` command, or a command that ``parse_command`` rejects
        with ``CommandParseError``, yields a no-op action.
        """
        command = expected_triage_command(observation)
        if command != "no_op":
            try:
                return parse_command(command)
            except CommandParseError:
                # Unparseable command: fall through to the no-op below.
                pass
        return InfraAction(action_type="no_op")
agents/random_agent.py ADDED
@@ -0,0 +1,50 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Seeded random DIME baseline."""
2
+
3
+ from __future__ import annotations
4
+
5
+ import random
6
+ from typing import Any
7
+
8
+ from agents.base_agent import BaseAgent
9
+ from benchmark.utils import observation_to_dict
10
+ from server.models import InfraAction
11
+
12
+
13
class RandomAgent(BaseAgent):
    """Baseline that draws one of five action types uniformly from a seeded RNG."""

    def __init__(self, seed: int = 0) -> None:
        """Remember ``seed`` as the fallback seed and start a private RNG from it."""
        self._base_seed = seed
        self._rng = random.Random(seed)

    def reset(self, seed: int | None = None, task_id: str | None = None) -> None:
        """Re-create the RNG from ``seed``, or from the constructor seed if it is None."""
        effective_seed = seed if seed is not None else self._base_seed
        self._rng = random.Random(effective_seed)

    def act(self, observation: Any) -> InfraAction:
        """Sample an action; node indices are bounded by ``len(cpu_loads)`` (minimum 1)."""
        obs = observation_to_dict(observation)
        loads = obs.get("cpu_loads", []) or [0]
        node_count = max(1, len(loads))
        rng = self._rng

        kind = rng.choice(
            ["restart_node", "reroute_traffic", "throttle", "scale_up", "no_op"]
        )

        if kind == "restart_node":
            # Prefer a node reported as failed; otherwise pick any node index.
            failed = list(obs.get("failed_nodes", []) or [])
            if failed:
                target = int(rng.choice(failed))
            else:
                target = rng.randrange(node_count)
            return InfraAction(action_type="restart_node", target=target)

        if kind == "reroute_traffic" and node_count > 1:
            # The source is drawn first, then a destination different from it.
            source = rng.randrange(node_count)
            destination = rng.choice([node for node in range(node_count) if node != source])
            return InfraAction(
                action_type="reroute_traffic",
                from_node=source,
                to_node=destination,
            )

        if kind == "throttle":
            return InfraAction(action_type="throttle", rate=rng.choice([0.3, 0.5, 0.7, 0.9]))

        if kind == "scale_up":
            return InfraAction(action_type="scale_up")

        # "no_op", or "reroute_traffic" when there is only one node.
        return InfraAction(action_type="no_op")
agents/threshold_agent.py ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Classical threshold automation baseline for DIME."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from typing import Any
6
+
7
+ from agents.base_agent import BaseAgent
8
+ from benchmark.utils import observation_to_dict
9
+ from server.models import InfraAction
10
+
11
+
12
class ThresholdAgent(BaseAgent):
    """Reactive autoscaler-style baseline driven by three fixed thresholds."""

    def act(self, observation: Any) -> InfraAction:
        """Pick an action from the first threshold that is exceeded.

        Checks, in order: mean non-negative CPU load above 0.80 (scale up),
        ``latency_ms`` above 100 (throttle to 0.7), any failed node (restart
        the first one listed). Returns a no-op when none apply.
        """
        obs = observation_to_dict(observation)
        # ``or []`` guards an explicit None, matching the failed_nodes handling
        # below; negative loads are excluded from the average.
        raw_loads = obs.get("cpu_loads") or []
        cpu_loads = [load for load in map(float, raw_loads) if load >= 0.0]
        avg_cpu = sum(cpu_loads) / len(cpu_loads) if cpu_loads else 0.0
        latency = float(obs.get("latency_ms", 0.0) or 0.0)
        failed_nodes = list(obs.get("failed_nodes", []) or [])

        if avg_cpu > 0.80:
            return InfraAction(action_type="scale_up")
        if latency > 100.0:
            return InfraAction(action_type="throttle", rate=0.7)
        if failed_nodes:
            return InfraAction(action_type="restart_node", target=int(failed_nodes[0]))
        return InfraAction(action_type="no_op")
benchmark/__init__.py ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
+ """Canonical DIME benchmark package."""
2
+
3
+ from benchmark.benchmark_config import DIME_V1_CONFIG
4
+
5
+ __all__ = ["DIME_V1_CONFIG"]
benchmark/benchmark_config.py ADDED
@@ -0,0 +1,104 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Immutable DIME-v1.0 benchmark definition."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from dataclasses import dataclass
6
+ from types import MappingProxyType
7
+ from typing import Mapping
8
+
9
+ from benchmark.benchmark_registry import task_registry_snapshot
10
+
11
+
12
@dataclass(frozen=True)
class EvaluationProtocol:
    """Frozen record of how a benchmark version is evaluated."""

    # Number of episodes run for each task.
    episodes_per_task: int
    # Seeds used for evaluation episodes.
    seeds: tuple[int, ...]
    # Flag: evaluation is inference-only.
    inference_only: bool
    # Flag: online learning is disabled during evaluation.
    disable_online_learning: bool
20
+
21
+
22
@dataclass(frozen=True)
class DeterministicPolicy:
    """Frozen record of seeding and replay requirements."""

    # Names of the inputs that make up an episode's seed.
    seed_components: tuple[str, ...]
    # Flag: torch deterministic mode is required.
    torch_deterministic: bool
    # Textual rule describing how the trace index wraps around.
    trace_wraparound: str
    # Flag: replay validation must be performed.
    replay_validation_required: bool
30
+
31
+
32
@dataclass(frozen=True)
class TopologyConstraints:
    """Frozen description of the topology templates DIME-v1.0 allows."""

    # Total number of nodes in the topology.
    node_count: int
    # Index of the database node.
    database_node: int
    # Names of the permitted topology templates.
    templates: tuple[str, ...]
    # Indices of the application nodes.
    app_nodes: tuple[int, ...]
40
+
41
+
42
@dataclass(frozen=True)
class BenchmarkConfig:
    """Frozen bundle of every benchmark-critical setting for one version."""

    # Benchmark family name.
    benchmark_name: str
    # Version label of this configuration.
    benchmark_version: str
    # Split name -> registry ids of the tasks in that split.
    task_registry: Mapping[str, tuple[str, ...]]
    # How evaluation episodes are run.
    evaluation_protocol: EvaluationProtocol
    # Metric name -> weight used when combining normalized metrics.
    metric_weights: Mapping[str, float]
    # Normalization settings keyed by name.
    normalization_method: Mapping[str, object]
    # Seeding and replay requirements.
    deterministic_policy: DeterministicPolicy
    # Allowed topology templates and node layout.
    topology_constraints: TopologyConstraints
54
+
55
+
56
+ _METRIC_WEIGHTS = MappingProxyType(
57
+ {
58
+ "uptime": 0.35,
59
+ "latency_score": 0.25,
60
+ "throughput": 0.20,
61
+ "recovery_speed": 0.10,
62
+ "cost_efficiency": 0.10,
63
+ }
64
+ )
65
+
66
+ _NORMALIZATION_METHOD = MappingProxyType(
67
+ {
68
+ "latency": "auto",
69
+ "latency_candidates": ("inverse_minmax", "smooth_exponential"),
70
+ "target_latency_ms": 50.0,
71
+ "max_latency_ms": 500.0,
72
+ "latency_scale_ms": 100.0,
73
+ "max_allowed_recovery_time": 10.0,
74
+ "max_budget": "episode_initial_cloud_budget",
75
+ "selection_persistence": "run_config_snapshot",
76
+ }
77
+ )
78
+
79
+
80
# The single DIME-v1.0 configuration instance exported by this module.
DIME_V1_CONFIG = BenchmarkConfig(
    benchmark_name="DIME",
    benchmark_version="DIME-v1.0",
    # Copy the registry snapshot (hidden split included) behind a read-only view.
    task_registry=MappingProxyType(
        dict(task_registry_snapshot(include_hidden=True))
    ),
    evaluation_protocol=EvaluationProtocol(
        episodes_per_task=100,
        seeds=tuple(range(100)),  # seeds 0..99
        inference_only=True,
        disable_online_learning=True,
    ),
    metric_weights=_METRIC_WEIGHTS,
    normalization_method=_NORMALIZATION_METHOD,
    deterministic_policy=DeterministicPolicy(
        seed_components=(
            "seed",
            "task",
            "topology_template",
            "trace_offset",
        ),
        torch_deterministic=True,
        trace_wraparound="(step + trace_offset) % trace_length",
        replay_validation_required=True,
    ),
    topology_constraints=TopologyConstraints(
        node_count=8,
        database_node=0,
        templates=(
            "default",
            "app_ring",
            "dense_mesh",
            "sampled_mesh",
        ),
        app_nodes=(1, 2, 3, 4, 5, 6, 7),
    ),
)
benchmark/benchmark_registry.py ADDED
@@ -0,0 +1,105 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Frozen task split registry for DIME-v1.0."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from dataclasses import dataclass, field
6
+ from enum import Enum
7
+ from typing import Iterable, Mapping
8
+
9
+
10
class Split(str, Enum):
    """String-valued enumeration of the three benchmark splits."""

    TRAIN = "train"
    VALIDATION = "validation"
    HIDDEN_EVAL = "hidden_eval"
16
+
17
+
18
@dataclass(frozen=True)
class TaskSpec:
    """Frozen description of a single registered benchmark task."""

    # Unique id of this registry entry.
    registry_id: str
    # Task name passed to the environment as ``task``.
    task_id: str
    # Split this entry belongs to.
    split: Split
    curriculum_level: int
    topology_template: str = "default"
    trace_offset: int = 0
    # Free-form labels; empty by default.
    tags: tuple[str, ...] = field(default_factory=tuple)

    @property
    def reset_kwargs(self) -> dict[str, object]:
        """Build a fresh dict of the four reset keyword arguments for this task."""
        return dict(
            task=self.task_id,
            curriculum_level=self.curriculum_level,
            topology_template=self.topology_template,
            trace_offset=self.trace_offset,
        )
38
+
39
+
40
# Tasks that may be used for training.
TRAIN_TASKS: tuple[TaskSpec, ...] = (
    TaskSpec(registry_id="train.level_1_read_logs", task_id="level_1_read_logs", split=Split.TRAIN, curriculum_level=1),
    TaskSpec(registry_id="train.traffic_spike", task_id="traffic_spike", split=Split.TRAIN, curriculum_level=2),
    TaskSpec(registry_id="train.node_failure", task_id="node_failure", split=Split.TRAIN, curriculum_level=2),
    TaskSpec(registry_id="train.cascading_failure", task_id="cascading_failure", split=Split.TRAIN, curriculum_level=3),
)

# Public validation tasks.
VALIDATION_TASKS: tuple[TaskSpec, ...] = (
    TaskSpec(registry_id="validation.flash_crowd", task_id="flash_crowd", split=Split.VALIDATION, curriculum_level=4),
    TaskSpec(
        registry_id="validation.thundering_herd",
        task_id="thundering_herd",
        split=Split.VALIDATION,
        curriculum_level=5,
        trace_offset=17,
    ),
    TaskSpec(
        registry_id="validation.zombie_node",
        task_id="zombie_node",
        split=Split.VALIDATION,
        curriculum_level=5,
        trace_offset=41,
    ),
    TaskSpec(
        registry_id="validation.hot_shard_skew",
        task_id="hot_shard_skew",
        split=Split.VALIDATION,
        curriculum_level=5,
        trace_offset=73,
    ),
)

# Hidden evaluation tasks: four on the default topology, four on variants.
_HIDDEN_EVAL_TASKS: tuple[TaskSpec, ...] = (
    TaskSpec(
        registry_id="hidden.retry_storm.default.011",
        task_id="retry_storm",
        split=Split.HIDDEN_EVAL,
        curriculum_level=5,
        topology_template="default",
        trace_offset=11,
        tags=("trace",),
    ),
    TaskSpec(
        registry_id="hidden.black_swan.default.029",
        task_id="black_swan_az_failure",
        split=Split.HIDDEN_EVAL,
        curriculum_level=5,
        topology_template="default",
        trace_offset=29,
        tags=("trace",),
    ),
    TaskSpec(
        registry_id="hidden.connection_pool.default.053",
        task_id="connection_pool_deadlock",
        split=Split.HIDDEN_EVAL,
        curriculum_level=5,
        topology_template="default",
        trace_offset=53,
        tags=("trace",),
    ),
    TaskSpec(
        registry_id="hidden.autoscaler.default.089",
        task_id="autoscaler_flapping_trap",
        split=Split.HIDDEN_EVAL,
        curriculum_level=5,
        topology_template="default",
        trace_offset=89,
        tags=("trace",),
    ),
    TaskSpec(
        registry_id="hidden.retry_storm.ring.137",
        task_id="retry_storm",
        split=Split.HIDDEN_EVAL,
        curriculum_level=5,
        topology_template="app_ring",
        trace_offset=137,
        tags=("topology_variant", "trace"),
    ),
    TaskSpec(
        registry_id="hidden.black_swan.dense.211",
        task_id="black_swan_az_failure",
        split=Split.HIDDEN_EVAL,
        curriculum_level=5,
        topology_template="dense_mesh",
        trace_offset=211,
        tags=("topology_variant", "trace"),
    ),
    TaskSpec(
        registry_id="hidden.connection_pool.ring.307",
        task_id="connection_pool_deadlock",
        split=Split.HIDDEN_EVAL,
        curriculum_level=5,
        topology_template="app_ring",
        trace_offset=307,
        tags=("topology_variant", "trace"),
    ),
    TaskSpec(
        registry_id="hidden.autoscaler.sampled.401",
        task_id="autoscaler_flapping_trap",
        split=Split.HIDDEN_EVAL,
        curriculum_level=5,
        topology_template="sampled_mesh",
        trace_offset=401,
        tags=("topology_variant", "trace"),
    ),
)
64
+
65
+
66
def get_training_task_ids() -> tuple[str, ...]:
    """Return the ``task_id`` of every entry in ``TRAIN_TASKS``, in order."""
    ids = [spec.task_id for spec in TRAIN_TASKS]
    return tuple(ids)
69
+
70
+
71
def get_public_task_specs(split: Split | str) -> tuple[TaskSpec, ...]:
    """Return the task specs of a public (train or validation) split.

    Raises:
        ValueError: ``split`` is not a valid ``Split`` value.
        PermissionError: the hidden evaluation split was requested.
    """
    requested = Split(split)
    public_splits = {
        Split.TRAIN: TRAIN_TASKS,
        Split.VALIDATION: VALIDATION_TASKS,
    }
    if requested not in public_splits:
        raise PermissionError("hidden_eval tasks require the official benchmark harness")
    return public_splits[requested]
79
+
80
+
81
def get_benchmark_task_specs(split: Split | str) -> tuple[TaskSpec, ...]:
    """Return the task specs of any split, including the hidden evaluation split.

    Public splits are delegated to ``get_public_task_specs``.
    """
    requested = Split(split)
    if requested is not Split.HIDDEN_EVAL:
        return get_public_task_specs(requested)
    return _HIDDEN_EVAL_TASKS
87
+
88
+
89
def task_registry_snapshot(include_hidden: bool = True) -> Mapping[str, tuple[str, ...]]:
    """Build a split-name -> registry-id-tuple mapping for benchmark configs.

    A new plain dict is returned on every call; the hidden evaluation split is
    included only when ``include_hidden`` is true.
    """
    groups = [
        (Split.TRAIN, TRAIN_TASKS),
        (Split.VALIDATION, VALIDATION_TASKS),
    ]
    if include_hidden:
        groups.append((Split.HIDDEN_EVAL, _HIDDEN_EVAL_TASKS))
    return {
        split.value: tuple(spec.registry_id for spec in specs)
        for split, specs in groups
    }
98
+
99
+
100
def iter_all_specs(include_hidden: bool = True) -> Iterable[TaskSpec]:
    """Yield train specs, then validation specs, then (optionally) hidden specs."""
    groups = [TRAIN_TASKS, VALIDATION_TASKS]
    if include_hidden:
        groups.append(_HIDDEN_EVAL_TASKS)
    for group in groups:
        yield from group
benchmark/deterministic.py ADDED
@@ -0,0 +1,173 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Deterministic replay controls and validation for DIME."""
2
+
3
+ from __future__ import annotations
4
+
5
+ import argparse
6
+ import random
7
+ from dataclasses import dataclass
8
+ from typing import Any
9
+
10
+ import numpy as np
11
+
12
+ from agents.base_agent import BaseAgent
13
+ from agents.heuristic_agent import HeuristicAgent
14
+ from benchmark.utils import action_to_dict, observation_to_dict
15
+ from server.environment import DistributedInfraEnvironment
16
+ from server.models import InfraAction
17
+
18
+
19
def set_global_seed(seed: int) -> None:
    """Seed Python's ``random``, NumPy, and (when installed) torch.

    When torch is importable this also seeds all CUDA devices if CUDA is
    available, requests deterministic algorithms, and sets the cuDNN
    ``deterministic``/``benchmark`` flags when ``torch.backends.cudnn`` exists.

    Args:
        seed: Seed applied to every generator.
    """
    random.seed(seed)
    np.random.seed(seed)

    # Only the import itself is guarded: an ImportError raised by a later torch
    # call must not be mistaken for "torch is not installed".
    try:
        import torch
    except ImportError:
        return

    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
    try:
        torch.use_deterministic_algorithms(True, warn_only=True)
    except TypeError:
        # The ``warn_only`` keyword was rejected; retry without it.
        torch.use_deterministic_algorithms(True)
    if hasattr(torch.backends, "cudnn"):
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
38
+
39
+
40
@dataclass(frozen=True)
class ReplayValidationResult:
    """Frozen summary of one deterministic-replay validation run."""

    # Whether the two replays matched.
    passed: bool
    # Inputs the replays were run with.
    task_id: str
    seed: int
    topology_template: str
    trace_offset: int
    # Number of recorded trajectory steps.
    steps: int
50
+
51
+
52
+ def _reset_agent(agent: Any, seed: int, task_id: str) -> None:
53
+ reset = getattr(agent, "reset", None)
54
+ if reset is None:
55
+ return
56
+ try:
57
+ reset(seed=seed, task_id=task_id)
58
+ except TypeError:
59
+ reset()
60
+
61
+
62
def _coerce_action(action: Any) -> InfraAction:
    """Convert an agent's return value into an ``InfraAction``.

    ``InfraAction`` instances pass through unchanged. Any mapping is validated
    with ``InfraAction.model_validate``. Anything else, or a mapping that
    fails validation, becomes a no-op action.

    Any ``Mapping`` is accepted (not only ``dict``) so that replay validation
    coerces actions the same way as the evaluation harness's ``_coerce_action``.
    """
    from collections.abc import Mapping  # local import: not imported at file level

    if isinstance(action, InfraAction):
        return action
    if isinstance(action, Mapping):
        try:
            return InfraAction.model_validate(dict(action))
        except Exception:
            # Deliberate best-effort: an invalid action degrades to a no-op.
            return InfraAction(action_type="no_op")
    return InfraAction(action_type="no_op")
71
+
72
+
73
def _run_replay(
    agent: BaseAgent,
    *,
    task_id: str,
    seed: int,
    topology_template: str,
    trace_offset: int,
) -> dict[str, Any]:
    """Run one seeded episode and return everything needed to compare replays.

    Global RNGs and the agent are reset first, then a fresh environment is
    stepped until the observation reports ``done`` or the simulator reaches
    its step limit. At least one step is always taken.

    Returns:
        A dict with ``rewards``, ``latency_history``, ``failure_history`` and
        the full per-step ``trajectory``.
    """
    set_global_seed(seed)
    _reset_agent(agent, seed, task_id)

    env = DistributedInfraEnvironment()
    obs = env.reset(
        seed=seed,
        task=task_id,
        topology_template=topology_template,
        trace_offset=trace_offset,
    )

    trajectory: list[dict[str, Any]] = []
    finished = False
    while not finished:
        action = _coerce_action(agent.act(obs))
        obs = env.step(action)
        snapshot = observation_to_dict(obs)
        reward = float(snapshot.get("reward", 0.0) or 0.0)
        trajectory.append(
            {
                "action": action_to_dict(action),
                "reward": reward,
                "latency_ms": snapshot.get("latency_ms"),
                "failed_nodes": snapshot.get("failed_nodes", []),
                "step": snapshot.get("step"),
            }
        )
        finished = bool(snapshot.get("done", False)) or env.sim.step_count >= env.sim.max_steps

    return {
        "rewards": [row["reward"] for row in trajectory],
        "latency_history": list(env.sim.latency_history),
        "failure_history": [row["failed_nodes"] for row in trajectory],
        "trajectory": trajectory,
    }
116
+
117
+
118
def validate_replay(
    agent: BaseAgent | None = None,
    task_id: str = "traffic_spike",
    seed: int = 42,
    topology_template: str = "default",
    trace_offset: int = 0,
) -> ReplayValidationResult:
    """Replay the same episode twice and require identical results.

    A ``HeuristicAgent`` is used when no agent is supplied.

    Raises:
        AssertionError: the two replays produced different results.
    """
    active_agent = agent or HeuristicAgent()
    replay_kwargs = {
        "task_id": task_id,
        "seed": seed,
        "topology_template": topology_template,
        "trace_offset": trace_offset,
    }
    first = _run_replay(active_agent, **replay_kwargs)
    second = _run_replay(active_agent, **replay_kwargs)

    if first != second:
        raise AssertionError(
            "Deterministic replay diverged for "
            f"seed={seed}, task={task_id}, topology={topology_template}, trace_offset={trace_offset}"
        )

    return ReplayValidationResult(
        passed=True,
        steps=len(first["trajectory"]),
        **replay_kwargs,
    )
154
+
155
+
156
def main() -> None:
    """Command-line entry point: validate one replay and print the result."""
    parser = argparse.ArgumentParser(description="Validate deterministic DIME replay.")
    parser.add_argument("--task", default="traffic_spike")
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--topology-template", default="default")
    parser.add_argument("--trace-offset", type=int, default=0)
    options = parser.parse_args()

    outcome = validate_replay(
        task_id=options.task,
        seed=options.seed,
        topology_template=options.topology_template,
        trace_offset=options.trace_offset,
    )
    print(outcome)
170
+
171
+
172
+ if __name__ == "__main__":
173
+ main()
benchmark/dime_index.py ADDED
@@ -0,0 +1,182 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Official DIME Index calculation."""
2
+
3
+ from __future__ import annotations
4
+
5
+ import math
6
+ from collections import defaultdict
7
+ from statistics import mean, variance
8
+ from typing import Any, Iterable, Mapping
9
+
10
+ from benchmark.benchmark_config import DIME_V1_CONFIG
11
+ from benchmark.utils import clamp
12
+
13
+
14
+ LATENCY_INVERSE_MINMAX = "inverse_minmax"
15
+ LATENCY_SMOOTH_EXPONENTIAL = "smooth_exponential"
16
+
17
+
18
def latency_inverse_minmax(
    latency_ms: float,
    *,
    target_latency: float = 50.0,
    max_latency: float = 500.0,
) -> float:
    """Method A: score latency linearly between the target and the maximum.

    The score is one minus the clamped fraction of the target-to-maximum span
    that ``latency_ms`` exceeds the target by. The span is floored at 1e-9 to
    avoid division by zero.
    """
    span = max(max_latency - target_latency, 1e-9)
    excess_fraction = (float(latency_ms) - target_latency) / span
    return 1.0 - clamp(excess_fraction)
27
+
28
+
29
def latency_smooth_exponential(latency_ms: float, *, latency_scale: float = 100.0) -> float:
    """Method B: score latency as a clamped ``exp(-latency / scale)``.

    The scale is floored at 1e-9 to avoid division by zero.
    """
    scale = max(latency_scale, 1e-9)
    decay = math.exp(-float(latency_ms) / scale)
    return clamp(decay)
32
+
33
+
34
def normalize_latency(
    latency_ms: float,
    method: str,
    config: Mapping[str, Any] | None = None,
) -> float:
    """Score ``latency_ms`` with the named normalization method.

    Args:
        latency_ms: Latency to score.
        method: ``LATENCY_INVERSE_MINMAX`` or ``LATENCY_SMOOTH_EXPONENTIAL``.
        config: Normalization settings; a missing or empty mapping falls back
            to ``DIME_V1_CONFIG.normalization_method``.

    Raises:
        ValueError: ``method`` is not one of the two known names.
    """
    settings = config or DIME_V1_CONFIG.normalization_method

    if method == LATENCY_SMOOTH_EXPONENTIAL:
        scale = float(settings.get("latency_scale_ms", 100.0))
        return latency_smooth_exponential(latency_ms, latency_scale=scale)

    if method == LATENCY_INVERSE_MINMAX:
        target = float(settings.get("target_latency_ms", 50.0))
        ceiling = float(settings.get("max_latency_ms", 500.0))
        return latency_inverse_minmax(
            latency_ms,
            target_latency=target,
            max_latency=ceiling,
        )

    raise ValueError(f"Unknown latency normalization method: {method}")
53
+
54
+
55
def normalize_metrics(
    metrics: Mapping[str, Any],
    *,
    latency_method: str,
    config: Mapping[str, Any] | None = None,
) -> dict[str, float]:
    """Map raw episode metrics onto the five normalized DIME components.

    Each metric is read under its primary key with one alternate key as a
    fallback; missing or falsy readings take the stated default.

    Returns:
        A dict with keys ``uptime``, ``latency_score``, ``throughput``,
        ``recovery_speed`` and ``cost_efficiency``, in that order.
    """
    settings = config or DIME_V1_CONFIG.normalization_method

    def _metric(primary: str, alternate: str, default: float) -> float:
        # Primary key first, then the alternate; falsy values become the default.
        return float(metrics.get(primary, metrics.get(alternate, default)) or default)

    mttr = _metric("mttr", "MTTR", 0.0)
    recovery_window = float(settings.get("max_allowed_recovery_time", 10.0))
    resource_cost = float(metrics.get("resource_cost", 0.0) or 0.0)
    max_budget = _metric("max_budget", "initial_cloud_budget", 1.0)

    uptime = clamp(_metric("uptime", "uptime_ratio", 0.0))
    latency_score = normalize_latency(
        _metric("p99_latency", "latency_ms", 0.0),
        latency_method,
        settings,
    )
    throughput = clamp(_metric("throughput", "throughput_ratio", 0.0))
    # Both ratios use a floored denominator to avoid division by zero.
    recovery_speed = 1.0 - clamp(mttr / max(recovery_window, 1e-9))
    cost_efficiency = 1.0 - clamp(resource_cost / max(max_budget, 1e-9))

    return {
        "uptime": uptime,
        "latency_score": latency_score,
        "throughput": throughput,
        "recovery_speed": recovery_speed,
        "cost_efficiency": cost_efficiency,
    }
79
+
80
+
81
def compute_dime_index(
    metrics: Mapping[str, Any],
    config_snapshot: Mapping[str, Any] | None = None,
    *,
    latency_method: str | None = None,
) -> dict[str, float]:
    """Compute the DIME Index together with its normalized components.

    The latency method comes from ``latency_method``, then the snapshot's
    ``selected_latency_method``, then ``LATENCY_SMOOTH_EXPONENTIAL``. Weights
    come from the snapshot's ``metric_weights`` when that key is present,
    otherwise from ``DIME_V1_CONFIG.metric_weights``.

    Returns:
        ``{"dime_index": ...}`` followed by each normalized component, all
        rounded to six decimals.
    """
    snapshot = config_snapshot or {}
    snapshot_is_mapping = isinstance(snapshot, Mapping)

    method = latency_method or str(
        snapshot.get("selected_latency_method") or LATENCY_SMOOTH_EXPONENTIAL
    )
    norm = snapshot.get("normalization_method") if snapshot_is_mapping else None
    normalized = normalize_metrics(metrics, latency_method=method, config=norm)

    if snapshot_is_mapping and "metric_weights" in snapshot:
        weights = snapshot.get("metric_weights")
    else:
        weights = DIME_V1_CONFIG.metric_weights

    score = sum(float(weights[name]) * value for name, value in normalized.items())

    result = {"dime_index": round(clamp(score), 6)}
    result.update({name: round(value, 6) for name, value in normalized.items()})
    return result
99
+
100
+
101
+ def _rank(values: list[float]) -> list[int]:
102
+ order = sorted(range(len(values)), key=lambda idx: values[idx])
103
+ ranks = [0] * len(values)
104
+ for rank, idx in enumerate(order):
105
+ ranks[idx] = rank
106
+ return ranks
107
+
108
+
109
+ def _pearson(a: list[float], b: list[float]) -> float:
110
+ if len(a) < 2 or len(b) < 2:
111
+ return 0.0
112
+ mean_a = mean(a)
113
+ mean_b = mean(b)
114
+ numerator = sum((x - mean_a) * (y - mean_b) for x, y in zip(a, b))
115
+ denom_a = math.sqrt(sum((x - mean_a) ** 2 for x in a))
116
+ denom_b = math.sqrt(sum((y - mean_b) ** 2 for y in b))
117
+ if denom_a == 0.0 or denom_b == 0.0:
118
+ return 0.0
119
+ return numerator / (denom_a * denom_b)
120
+
121
+
122
def _method_quality(records: list[Mapping[str, Any]], method: str) -> dict[str, float]:
    """Score one latency normalization method against a set of run records.

    Four sub-scores are computed from the normalized latency scores and
    averaged into ``aggregate``:

    * ``ranking_consistency``: absolute correlation between the ranks of the
      scores and the ranks of ``task_success + task_score``.
    * ``variance_stability``: ``1 / (1 + variance of the scores)``.
    * ``score_smoothness``: ``1 / (1 + largest jump)`` between consecutive
      scores once records are ordered by latency.
    * ``task_separability``: between-task variance of mean scores as a
      fraction of between-task plus mean within-task variance.

    All values are rounded to six decimals.
    """
    latencies = [
        float(record.get("p99_latency", record.get("latency_ms", 0.0)) or 0.0)
        for record in records
    ]
    scores = [normalize_latency(latency, method) for latency in latencies]
    outcomes = [
        float(record.get("task_success", 0.0) or 0.0) + float(record.get("task_score", 0.0) or 0.0)
        for record in records
    ]

    # Ranking consistency.
    score_ranks = [float(rank) for rank in _rank(scores)]
    outcome_ranks = [float(rank) for rank in _rank(outcomes)]
    rank_consistency = abs(_pearson(score_ranks, outcome_ranks))

    # Variance stability.
    score_variance = variance(scores) if len(scores) > 1 else 0.0
    variance_stability = 1.0 / (1.0 + score_variance)

    # Smoothness: stable ordering by latency, then the largest adjacent jump.
    by_latency = sorted(range(len(latencies)), key=latencies.__getitem__)
    ordered_scores = [scores[position] for position in by_latency]
    jumps = [abs(later - earlier) for earlier, later in zip(ordered_scores, ordered_scores[1:])]
    largest_jump = max(jumps) if jumps else 0.0
    smoothness = 1.0 / (1.0 + largest_jump)

    # Separability: group scores per task ("task_id", else "task", else "unknown").
    by_task: dict[str, list[float]] = {}
    for record, score in zip(records, scores):
        task_key = str(record.get("task_id", record.get("task", "unknown")))
        by_task.setdefault(task_key, []).append(score)
    task_means = [mean(group) for group in by_task.values() if group]
    between = variance(task_means) if len(task_means) > 1 else 0.0
    within_values = [variance(group) for group in by_task.values() if len(group) > 1]
    within = mean(within_values) if within_values else 0.0
    separability = clamp(between / (between + within + 1e-9))

    aggregate = mean([rank_consistency, variance_stability, smoothness, separability])
    return {
        "ranking_consistency": round(rank_consistency, 6),
        "variance_stability": round(variance_stability, 6),
        "score_smoothness": round(smoothness, 6),
        "task_separability": round(separability, 6),
        "aggregate": round(aggregate, 6),
    }
157
+
158
+
159
def select_latency_normalization(records: Iterable[Mapping[str, Any]]) -> dict[str, Any]:
    """Score both latency normalization methods and pick the better one.

    Methods are compared by ``aggregate`` and then by ``score_smoothness``; on
    a full tie the inverse min-max method wins because it is listed first.
    With no records the smooth exponential method is selected and both
    aggregates are reported as 0.0.

    Returns:
        A dict with ``selected_method`` and per-method ``method_scores``.
    """
    data = list(records)
    candidates = (LATENCY_INVERSE_MINMAX, LATENCY_SMOOTH_EXPONENTIAL)

    if not data:
        return {
            "selected_method": LATENCY_SMOOTH_EXPONENTIAL,
            "method_scores": {name: {"aggregate": 0.0} for name in candidates},
        }

    method_scores = {name: _method_quality(data, name) for name in candidates}

    def _preference(name: str) -> tuple[float, float]:
        quality = method_scores[name]
        return quality["aggregate"], quality.get("score_smoothness", 0.0)

    selected = max(method_scores, key=_preference)
    return {"selected_method": selected, "method_scores": method_scores}
benchmark/evaluation_harness.py ADDED
@@ -0,0 +1,399 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Official DIME-v1.0 benchmark evaluation harness."""
2
+
3
+ from __future__ import annotations
4
+
5
+ import argparse
6
+ import contextlib
7
+ import time
8
+ from pathlib import Path
9
+ from typing import Any, Callable, Iterable, Mapping
10
+
11
+ import requests
12
+
13
+ from agents.base_agent import BaseAgent
14
+ from agents.heuristic_agent import HeuristicAgent
15
+ from agents.random_agent import RandomAgent
16
+ from agents.threshold_agent import ThresholdAgent
17
+ from benchmark.benchmark_config import BenchmarkConfig, DIME_V1_CONFIG
18
+ from benchmark.benchmark_registry import Split, TaskSpec, get_benchmark_task_specs
19
+ from benchmark.deterministic import set_global_seed
20
+ from benchmark.dime_index import compute_dime_index, select_latency_normalization
21
+ from benchmark.statistical_report import build_statistical_report, persist_statistical_report
22
+ from benchmark.utils import (
23
+ BENCHMARK_RUNS_DIR,
24
+ SEED_LOGS_DIR,
25
+ STATISTICAL_REPORTS_DIR,
26
+ action_to_dict,
27
+ append_jsonl,
28
+ atomic_write_json,
29
+ ensure_result_dirs,
30
+ observation_to_dict,
31
+ to_plain_data,
32
+ utc_run_id,
33
+ write_csv,
34
+ )
35
+ from server.environment import DistributedInfraEnvironment
36
+ from server.models import InfraAction
37
+
38
+
39
class CallableAgent(BaseAgent):
    """Wrap a plain callable so it can be used wherever a ``BaseAgent`` is expected."""

    def __init__(self, fn: Callable[[Any], Any]) -> None:
        """Store the callable that will produce actions."""
        self._fn = fn

    def act(self, observation: Any) -> Any:
        """Forward ``observation`` to the wrapped callable and return its result."""
        policy = self._fn
        return policy(observation)
47
+
48
+
49
class APIAgent(BaseAgent):
    """Agent that obtains each action by POSTing the observation to an HTTP endpoint."""

    def __init__(self, endpoint: str, timeout_s: float = 30.0) -> None:
        """Record the endpoint URL and the per-request timeout in seconds."""
        self.endpoint = endpoint
        self.timeout_s = timeout_s

    def act(self, observation: Any) -> Any:
        """POST ``{"observation": ...}`` as JSON and return the decoded action.

        The response's ``"action"`` entry is returned when present; otherwise
        the whole decoded payload is returned. HTTP error statuses raise via
        ``raise_for_status``.
        """
        body = {"observation": observation_to_dict(observation)}
        response = requests.post(self.endpoint, json=body, timeout=self.timeout_s)
        response.raise_for_status()
        payload = response.json()
        return payload.get("action", payload)
65
+
66
+
67
class ReplayAgent(BaseAgent):
    """Agent that plays back a fixed, pre-recorded sequence of actions."""

    def __init__(self, actions: Iterable[Mapping[str, Any]]) -> None:
        # Copy every record into its own dict and keep a cursor into the list.
        self._actions = [dict(action) for action in actions]
        self._idx = 0

    def reset(self, seed: int | None = None, task_id: str | None = None) -> None:
        """Rewind the replay cursor; ``seed`` and ``task_id`` are ignored."""
        self._idx = 0

    def act(self, observation: Any) -> Any:
        """Return the next recorded action, or a ``no_op`` once exhausted."""
        position = self._idx
        if position < len(self._actions):
            self._idx = position + 1
            return self._actions[position]
        return InfraAction(action_type="no_op")
83
+
84
+
85
def _resolve_agent(agent: str | BaseAgent | Callable[[Any], Any]) -> BaseAgent:
    """Turn an agent specifier into a ``BaseAgent`` instance.

    Accepted specifiers, checked in this order: an existing ``BaseAgent``
    (returned as is), a non-string callable (wrapped in ``CallableAgent``), one
    of the built-in names ``"random"``, ``"heuristic"``, ``"threshold"``, or a
    string starting with ``"http"`` (wrapped in ``APIAgent``).

    Raises:
        ValueError: when the specifier matches none of the above.
    """
    if isinstance(agent, BaseAgent):
        return agent
    if callable(agent) and not isinstance(agent, str):
        return CallableAgent(agent)

    # Built-in baselines, compared in the same order as the names are listed.
    builtin_agents = (
        ("random", RandomAgent),
        ("heuristic", HeuristicAgent),
        ("threshold", ThresholdAgent),
    )
    for label, factory in builtin_agents:
        if agent == label:
            return factory()

    if isinstance(agent, str) and agent.startswith("http"):
        return APIAgent(agent)
    raise ValueError(f"Unknown agent specifier: {agent!r}")
99
+
100
+
101
def _coerce_action(action: Any) -> InfraAction:
    """Convert an agent's raw output into an ``InfraAction``.

    ``InfraAction`` instances pass through unchanged and mappings are
    validated with ``InfraAction.model_validate``. Anything else, or a mapping
    that fails validation, becomes a ``no_op`` action.
    """
    if isinstance(action, InfraAction):
        return action
    if not isinstance(action, Mapping):
        return InfraAction(action_type="no_op")
    try:
        coerced = InfraAction.model_validate(dict(action))
    except Exception:
        # Best effort by design: an invalid action degrades to doing nothing.
        coerced = InfraAction(action_type="no_op")
    return coerced
110
+
111
+
112
def _reset_agent(agent: BaseAgent, seed: int, task_id: str) -> None:
    """Reset ``agent`` for a new episode, tolerating legacy ``reset()`` signatures.

    ``reset(seed=..., task_id=...)`` is used when the agent's ``reset`` accepts
    those keywords; otherwise the argument-less ``reset()`` is called.

    The decision is made by inspecting the signature rather than by catching
    ``TypeError`` around the call, so a ``TypeError`` raised *inside* a
    compliant ``reset`` propagates as is and ``reset`` is not invoked twice.
    """
    import inspect

    try:
        signature = inspect.signature(agent.reset)
    except (TypeError, ValueError):
        # Some callables (e.g. certain builtins) expose no signature.
        signature = None

    if signature is None:
        # No introspection possible: fall back to try-then-retry.
        try:
            agent.reset(seed=seed, task_id=task_id)
        except TypeError:
            agent.reset()
        return

    try:
        # ``bind`` only checks that the call shape is valid; it does not call.
        signature.bind(seed=seed, task_id=task_id)
    except TypeError:
        agent.reset()
    else:
        agent.reset(seed=seed, task_id=task_id)
117
+
118
+
119
@contextlib.contextmanager
def _inference_only(agent: BaseAgent):
    """Block common online-learning mutation entrypoints during evaluation.

    While the context is active:

    * well-known training hooks on ``agent`` (``learn``, ``update``, ...) and
      mutators on its ``optimizer`` / ``optim`` / ``replay_buffer`` attributes
      are replaced by a stub that raises ``RuntimeError``;
    * ``agent.eval()`` and ``agent.train(False)`` are called when present
      (a ``TypeError`` from either is ignored);
    * the body runs under ``torch.inference_mode()`` when PyTorch is importable.

    Every patched attribute is put back on exit, including when patching, the
    eval/train calls, or the managed body raise.
    """
    patched: list[tuple[Any, str, Any]] = []

    def disabled(*args: Any, **kwargs: Any) -> None:
        raise RuntimeError("DIME benchmark evaluation is inference-only")

    agent_hooks = (
        "backward",
        "learn",
        "optimize",
        "optimizer_step",
        "policy_update",
        "rollout",
        "train_step",
        "update",
        "update_policy",
    )
    owner_hooks = ("step", "add", "append", "extend", "push", "update")

    # Resolve the optional torch guard BEFORE yielding. The ImportError handler
    # must not enclose the ``yield``: otherwise an ImportError raised by the
    # managed body would be caught here and a second ``yield`` would run, which
    # contextlib reports as "generator didn't stop after throw()".
    try:
        import torch
    except ImportError:
        guard = contextlib.nullcontext()
    else:
        guard = torch.inference_mode()

    try:
        for name in agent_hooks:
            if hasattr(agent, name):
                patched.append((agent, name, getattr(agent, name)))
                setattr(agent, name, disabled)

        for owner_name in ("optimizer", "optim", "replay_buffer"):
            owner = getattr(agent, owner_name, None)
            if owner is None:
                continue
            for name in owner_hooks:
                if hasattr(owner, name):
                    patched.append((owner, name, getattr(owner, name)))
                    setattr(owner, name, disabled)

        if hasattr(agent, "eval"):
            try:
                agent.eval()
            except TypeError:
                pass
        if hasattr(agent, "train"):
            try:
                agent.train(False)
            except TypeError:
                pass

        with guard:
            yield
    finally:
        # Undo in reverse order so each attribute gets back what it held.
        for owner, name, original in reversed(patched):
            setattr(owner, name, original)
174
+
175
+
176
def _percentile(values: list[float], pct: float) -> float:
    """Return the nearest-rank percentile ``pct`` (0-100) of ``values``.

    The rank is ``round(pct / 100 * (n - 1))`` clamped into the valid index
    range; an empty input yields ``0.0``.
    """
    if not values:
        return 0.0
    ranked = sorted(float(v) for v in values)
    last = len(ranked) - 1
    position = int(round((pct / 100.0) * last))
    # Out-of-range percentiles fall back to the first / last sample.
    position = max(0, position)
    position = min(last, position)
    return ranked[position]
182
+
183
+
184
def _mttr(uptime_history: list[float]) -> float:
    """Return the mean length, in steps, of consecutive degraded runs.

    A step counts as degraded when its uptime is below ``1.0``. A run still
    open at the end of the history is included. Returns ``0.0`` when there is
    no degraded step at all.
    """
    outages: list[int] = []
    run_length = 0
    for sample in uptime_history:
        if sample < 1.0:
            run_length += 1
            continue
        if run_length:
            # Healthy sample closes the current outage.
            outages.append(run_length)
            run_length = 0
    if run_length:
        outages.append(run_length)
    if not outages:
        return 0.0
    return sum(outages) / len(outages)
196
+
197
+
198
def _episode_metrics(
    env: DistributedInfraEnvironment,
    rewards: list[float],
    task_score: float,
    initial_cloud_budget: int,
) -> dict[str, float]:
    """Summarise a finished episode from the simulator state in ``env.sim``.

    Args:
        env: environment whose ``sim`` holds the episode's histories/counters.
        rewards: per-step rewards collected by the caller.
        task_score: final task score of the episode.
        initial_cloud_budget: ``sim.cloud_budget`` as captured right after reset.

    Returns:
        Flat metric mapping; ``task_success`` is ``1.0`` when ``task_score`` is
        at least ``0.8``, and ``throughput`` / ``throughput_ratio`` carry the
        same served/received ratio.
    """
    sim = env.sim

    history = sim.uptime_history
    mean_uptime = sum(history) / len(history) if history else 0.0

    # Guard both ratios against a zero denominator.
    served_ratio = sim.total_requests_served / max(1, sim.total_requests_received)
    alive_nodes = sum(1 for node in sim.nodes if not node.is_failed)
    node_count = max(1, len(sim.nodes))

    # Budget spent during the episode, never negative.
    spent_budget = max(0, initial_cloud_budget - sim.cloud_budget)
    succeeded = task_score >= 0.8

    metrics: dict[str, float] = {}
    metrics["uptime"] = mean_uptime
    metrics["p99_latency"] = _percentile(sim.latency_history, 99.0)
    metrics["throughput"] = served_ratio
    metrics["throughput_ratio"] = served_ratio
    metrics["mttr"] = _mttr(sim.uptime_history)
    metrics["resource_cost"] = float(spent_budget)
    metrics["max_budget"] = float(max(1, initial_cloud_budget))
    metrics["initial_cloud_budget"] = float(initial_cloud_budget)
    metrics["survival_rate"] = alive_nodes / node_count
    metrics["cumulative_reward"] = sum(rewards)
    metrics["task_success"] = 1.0 if succeeded else 0.0
    metrics["task_score"] = task_score
    return metrics
224
+
225
+
226
def _config_snapshot(config: BenchmarkConfig, selected_latency_method: str | None = None) -> dict[str, Any]:
    """Return ``config`` as plain data, optionally tagged with the latency method.

    When ``selected_latency_method`` is given it is stored under the
    ``"selected_latency_method"`` key of the snapshot.
    """
    snapshot = to_plain_data(config)
    if selected_latency_method is None:
        return snapshot
    snapshot["selected_latency_method"] = selected_latency_method
    return snapshot
231
+
232
+
233
def _run_episode(
    agent: BaseAgent,
    spec: TaskSpec,
    seed: int,
    *,
    run_dir: Path,
) -> dict[str, Any]:
    """Run one seeded episode of ``spec`` and persist its artifacts.

    The global RNG state and the agent are reset for ``seed``, a fresh
    environment is stepped until the observation reports ``done`` or the
    simulator reaches ``max_steps``, and two artifacts are written:

    * the full trajectory as JSONL under
      ``run_dir/trajectories/<registry_id>/seed_NNN.jsonl``;
    * the metrics row as JSON in ``SEED_LOGS_DIR``.

    Returns:
        The metrics row (task identity, step count, timing, episode metrics).
    """
    set_global_seed(seed)
    _reset_agent(agent, seed=seed, task_id=spec.task_id)
    env = DistributedInfraEnvironment()
    obs = env.reset(seed=seed, episode_id=f"{spec.registry_id}:{seed}", **spec.reset_kwargs)
    # Captured before any step so the spend can be derived at the end.
    initial_cloud_budget = env.sim.cloud_budget
    trajectory: list[dict[str, Any]] = [
        {
            "event": "reset",
            "seed": seed,
            "task_id": spec.task_id,
            "registry_id": spec.registry_id,
            "observation": observation_to_dict(obs),
        }
    ]
    rewards: list[float] = []
    task_score = float(getattr(obs, "task_score", 0.0) or 0.0)
    start = time.perf_counter()

    while True:
        action = _coerce_action(agent.act(obs))
        obs = env.step(action)
        obs_dict = observation_to_dict(obs)
        reward = float(obs_dict.get("reward", 0.0) or 0.0)
        rewards.append(reward)
        # Only a missing/None score keeps the previous value. A reported score
        # of exactly 0.0 is a real value and must not be replaced by a stale one.
        reported_score = obs_dict.get("task_score")
        if reported_score is not None:
            task_score = float(reported_score)
        done = bool(obs_dict.get("done", False))
        trajectory.append(
            {
                "event": "step",
                "step": obs_dict.get("step"),
                "action": action_to_dict(action),
                "reward": reward,
                "done": done,
                "task_score": task_score,
                "observation": obs_dict,
            }
        )
        if done or env.sim.step_count >= env.sim.max_steps:
            break

    elapsed_s = time.perf_counter() - start
    raw_path = run_dir / "trajectories" / spec.registry_id / f"seed_{seed:03d}.jsonl"
    append_jsonl(raw_path, trajectory)
    seed_log_path = SEED_LOGS_DIR / f"{run_dir.name}_{spec.registry_id}_seed_{seed:03d}.json"

    metrics = _episode_metrics(env, rewards, task_score, initial_cloud_budget)
    row = {
        "benchmark_version": DIME_V1_CONFIG.benchmark_version,
        "registry_id": spec.registry_id,
        "task_id": spec.task_id,
        "split": spec.split.value,
        "seed": seed,
        "topology_template": spec.topology_template,
        "trace_offset": spec.trace_offset,
        "steps": env.sim.step_count,
        "elapsed_s": round(elapsed_s, 6),
        "trajectory_path": str(raw_path),
        **metrics,
    }
    atomic_write_json(seed_log_path, row)
    return row
294
+
295
+
296
def run_benchmark(
    agent: str | BaseAgent | Callable[[Any], Any],
    benchmark_version: str = "DIME-v1.0",
    split: str = "hidden_eval",
) -> dict[str, Any]:
    """Run the official DIME benchmark and persist all artifacts.

    Every task spec of ``split`` is run once per configured seed with the
    agent in inference-only mode. Episodes are then scored with the DIME
    index, summarised statistically, and written below a fresh run directory
    (config snapshots, summary, per-episode JSON/CSV, statistical report).

    Returns:
        ``{"summary": ..., "report": ..., "run_dir": ...}``.

    Raises:
        ValueError: for an unsupported ``benchmark_version`` (or, from the
            helpers, an unknown agent/split).
        RuntimeError: when the configured seed count does not match
            ``episodes_per_task``.
    """
    if benchmark_version != DIME_V1_CONFIG.benchmark_version:
        raise ValueError(f"Unsupported benchmark version: {benchmark_version}")

    ensure_result_dirs()
    active_agent = _resolve_agent(agent)
    split_value = Split(split)
    specs = get_benchmark_task_specs(split_value)
    config = DIME_V1_CONFIG
    protocol = config.evaluation_protocol
    seeds = protocol.seeds
    if len(seeds) != protocol.episodes_per_task:
        raise RuntimeError("DIME-v1.0 requires exactly 100 seeds for 100 episodes per task")

    run_id = utc_run_id(f"{benchmark_version}_{split_value.value}")
    run_dir = BENCHMARK_RUNS_DIR / run_id
    run_dir.mkdir(parents=True, exist_ok=True)
    atomic_write_json(run_dir / "benchmark_config.initial.json", _config_snapshot(config))

    # Task-major, seed-minor order: all seeds of one task before the next task.
    with _inference_only(active_agent):
        episode_rows: list[dict[str, Any]] = [
            _run_episode(active_agent, spec, seed, run_dir=run_dir)
            for spec in specs
            for seed in seeds
        ]

    # The latency normalisation is chosen from the collected episodes and then
    # recorded in the final config snapshot used for scoring.
    latency_selection = select_latency_normalization(episode_rows)
    selected_method = latency_selection["selected_method"]
    final_config_snapshot = _config_snapshot(config, selected_method)
    final_config_snapshot["latency_method_selection"] = latency_selection

    scored_rows: list[dict[str, Any]] = [
        {**row, **compute_dime_index(row, final_config_snapshot)}
        for row in episode_rows
    ]

    report = build_statistical_report(scored_rows)
    summary = {
        "run_id": run_id,
        "benchmark_version": benchmark_version,
        "split": split_value.value,
        "episodes_per_task": protocol.episodes_per_task,
        "num_tasks": len(specs),
        "num_episodes": len(scored_rows),
        "selected_latency_method": selected_method,
        "latency_method_selection": latency_selection,
        "mean_dime_index": report["episodes"]["dime_index"]["mean"],
        "artifact_dir": str(run_dir),
    }

    csv_columns = [
        "benchmark_version",
        "registry_id",
        "task_id",
        "split",
        "seed",
        "topology_template",
        "trace_offset",
        "steps",
        "dime_index",
        "uptime",
        "latency_score",
        "throughput",
        "recovery_speed",
        "cost_efficiency",
        "p99_latency",
        "mttr",
        "resource_cost",
        "cumulative_reward",
        "task_success",
        "survival_rate",
        "task_score",
    ]

    atomic_write_json(run_dir / "benchmark_config.snapshot.json", final_config_snapshot)
    atomic_write_json(run_dir / "benchmark_summary.json", summary)
    atomic_write_json(run_dir / "episode_metrics.json", scored_rows)
    write_csv(run_dir / "episode_metrics.csv", scored_rows, csv_columns)
    persist_statistical_report(
        report,
        STATISTICAL_REPORTS_DIR / f"{run_id}.json",
        STATISTICAL_REPORTS_DIR / f"{run_id}.csv",
    )
    atomic_write_json(run_dir / "statistical_report.json", report)
    return {"summary": summary, "report": report, "run_dir": str(run_dir)}
386
+
387
+
388
def main() -> None:
    """Command-line entry point: parse options, run the benchmark, print the summary."""
    parser = argparse.ArgumentParser(description="Run the official DIME-v1.0 benchmark.")
    split_choices = [split.value for split in Split]
    parser.add_argument(
        "--agent",
        default="heuristic",
        help="random, heuristic, threshold, or an HTTP endpoint",
    )
    parser.add_argument("--split", default="hidden_eval", choices=split_choices)
    parser.add_argument("--benchmark-version", default="DIME-v1.0")
    options = parser.parse_args()

    outcome = run_benchmark(
        options.agent,
        benchmark_version=options.benchmark_version,
        split=options.split,
    )
    print(outcome["summary"])
396
+
397
+
398
+ if __name__ == "__main__":
399
+ main()
benchmark/statistical_report.py ADDED
@@ -0,0 +1,95 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Statistical reporting for DIME benchmark runs."""
2
+
3
+ from __future__ import annotations
4
+
5
+ import math
6
+ from collections import defaultdict
7
+ from pathlib import Path
8
+ from statistics import mean, variance
9
+ from typing import Any, Iterable, Mapping
10
+
11
+ from benchmark.utils import atomic_write_json, write_csv
12
+
13
+
14
+ DEFAULT_REPORT_METRICS = (
15
+ "dime_index",
16
+ "uptime",
17
+ "latency_score",
18
+ "throughput",
19
+ "recovery_speed",
20
+ "cost_efficiency",
21
+ "p99_latency",
22
+ "mttr",
23
+ "cumulative_reward",
24
+ "task_success",
25
+ "survival_rate",
26
+ )
27
+
28
+
29
def summarize_values(values: Iterable[float]) -> dict[str, float]:
    """Compute benchmark-grade summary statistics.

    Returns count, mean, sample standard deviation and variance, min, max and
    a normal-approximation 95% confidence interval (``mean +/- 1.96 * std /
    sqrt(n)``), each rounded to six decimals. With fewer than two samples the
    spread statistics are ``0.0``; an empty input yields all zeros.
    """
    samples = [float(v) for v in values]
    count = len(samples)
    if count == 0:
        return {"n": 0, "mean": 0.0, "std": 0.0, "variance": 0.0, "min": 0.0, "max": 0.0, "ci95_low": 0.0, "ci95_high": 0.0}

    center = mean(samples)
    if count > 1:
        spread_sq = variance(samples)
        spread = math.sqrt(spread_sq)
        margin = 1.96 * spread / math.sqrt(count)
    else:
        # A single sample has no measurable spread.
        spread_sq = 0.0
        spread = math.sqrt(spread_sq)
        margin = 0.0

    summary: dict[str, float] = {"n": count}
    summary["mean"] = round(center, 6)
    summary["std"] = round(spread, 6)
    summary["variance"] = round(spread_sq, 6)
    summary["min"] = round(min(samples), 6)
    summary["max"] = round(max(samples), 6)
    summary["ci95_low"] = round(center - margin, 6)
    summary["ci95_high"] = round(center + margin, 6)
    return summary
48
+
49
+
50
def _summarize_records(records: list[Mapping[str, Any]], metrics: tuple[str, ...]) -> dict[str, dict[str, float]]:
    """Summarise each metric over ``records``.

    Rows that lack a metric, or hold ``None`` for it, are left out of that
    metric's sample.
    """
    return {
        metric: summarize_values(
            [
                float(row[metric])
                for row in records
                if metric in row and row[metric] is not None
            ]
        )
        for metric in metrics
    }
56
+
57
+
58
def build_statistical_report(
    records: Iterable[Mapping[str, Any]],
    *,
    metrics: tuple[str, ...] = DEFAULT_REPORT_METRICS,
) -> dict[str, Any]:
    """Build summaries across episodes, tasks, and seeds.

    Rows are grouped by ``task_id`` (falling back to ``task``, then
    ``"unknown"``) and by ``seed``; group keys are stringified and emitted in
    sorted order. The ``"episodes"`` entry summarises all rows together.
    """
    rows = list(records)
    task_groups: dict[str, list[Mapping[str, Any]]] = defaultdict(list)
    seed_groups: dict[str, list[Mapping[str, Any]]] = defaultdict(list)

    for row in rows:
        task_key = str(row.get("task_id", row.get("task", "unknown")))
        task_groups[task_key].append(row)
        seed_key = str(row.get("seed", "unknown"))
        seed_groups[seed_key].append(row)

    per_task = {}
    for task_key in sorted(task_groups):
        per_task[task_key] = _summarize_records(task_groups[task_key], metrics)

    per_seed = {}
    for seed_key in sorted(seed_groups):
        per_seed[seed_key] = _summarize_records(seed_groups[seed_key], metrics)

    return {
        "episodes": _summarize_records(rows, metrics),
        "tasks": per_task,
        "seeds": per_seed,
    }
77
+
78
+
79
def persist_statistical_report(report: Mapping[str, Any], json_path: Path, csv_path: Path) -> None:
    """Persist statistical report as JSON and long-form CSV.

    The CSV holds one row per (group, key, metric). The ``"episodes"`` group
    has no sub-keys, so its rows use the key ``"all"``.
    """
    atomic_write_json(json_path, report)

    flat_rows: list[dict[str, Any]] = []
    for group, group_payload in report.items():
        # Give the un-keyed "episodes" table the same two-level shape as the
        # keyed groups so a single flattening loop serves both.
        keyed_payload = {"all": group_payload} if group == "episodes" else group_payload
        for key, metric_payload in keyed_payload.items():
            for metric, stats in metric_payload.items():
                flat_rows.append({"group": group, "key": key, "metric": metric, **stats})

    columns = ["group", "key", "metric", "n", "mean", "std", "variance", "min", "max", "ci95_low", "ci95_high"]
    write_csv(csv_path, flat_rows, columns)
benchmark/utils.py ADDED
@@ -0,0 +1,126 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Utility helpers for canonical DIME benchmark runs."""
2
+
3
+ from __future__ import annotations
4
+
5
+ import json
6
+ import os
7
+ import tempfile
8
+ from dataclasses import fields, is_dataclass
9
+ from datetime import datetime, timezone
10
+ from pathlib import Path
11
+ from typing import Any, Iterable, Mapping
12
+
13
+
14
+ PROJECT_ROOT = Path(__file__).resolve().parents[1]
15
+ RESULTS_ROOT = PROJECT_ROOT / "results"
16
+ BENCHMARK_RUNS_DIR = RESULTS_ROOT / "benchmark_runs"
17
+ SEED_LOGS_DIR = RESULTS_ROOT / "seed_logs"
18
+ STATISTICAL_REPORTS_DIR = RESULTS_ROOT / "statistical_reports"
19
+
20
+
21
def clamp(value: float, lower: float = 0.0, upper: float = 1.0) -> float:
    """Return ``value`` clipped to the closed interval [lower, upper].

    ``value`` is converted with ``float`` first, so numeric strings work too.
    """
    number = float(value)
    capped = min(upper, number)
    return max(lower, capped)
24
+
25
+
26
def ensure_result_dirs() -> None:
    """Create benchmark artifact directories if they are missing."""
    required = (BENCHMARK_RUNS_DIR, SEED_LOGS_DIR, STATISTICAL_REPORTS_DIR)
    for directory in required:
        # Missing parents are created too; an existing directory is fine.
        os.makedirs(directory, exist_ok=True)
30
+
31
+
32
def utc_run_id(prefix: str = "dime") -> str:
    """Return ``<prefix>_<UTC timestamp>`` with second-level precision.

    The timestamp is formatted as ``YYYYMMDDTHHMMSSZ``.
    """
    now_utc = datetime.now(timezone.utc)
    stamp = now_utc.strftime("%Y%m%dT%H%M%SZ")
    return "{}_{}".format(prefix, stamp)
36
+
37
+
38
def to_plain_data(value: Any) -> Any:
    """Convert dataclasses, Pydantic models, paths, and tuples to JSON data.

    Conversion is recursive:

    * dataclass *instances* become ``{field name: converted value}``;
    * objects exposing ``model_dump`` are dumped and converted;
    * mappings get string keys and converted values;
    * lists and tuples become lists;
    * ``Path`` objects become strings.

    Any other value, including a dataclass *class*, is returned unchanged.
    """
    # ``is_dataclass`` is also true for the class object itself. Reading field
    # values off a class fails for fields without defaults, so only instances
    # are expanded.
    if is_dataclass(value) and not isinstance(value, type):
        return {field.name: to_plain_data(getattr(value, field.name)) for field in fields(value)}
    if hasattr(value, "model_dump"):
        return to_plain_data(value.model_dump())
    if isinstance(value, Mapping):
        return {str(k): to_plain_data(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_plain_data(v) for v in value]
    if isinstance(value, Path):
        return str(value)
    return value
51
+
52
+
53
def atomic_write_json(path: Path, payload: Any) -> None:
    """Atomically write JSON so interrupted runs do not corrupt artifacts.

    The payload is converted with ``to_plain_data`` and serialised with sorted
    keys into a temporary file in ``path``'s directory. That file is then
    moved over ``path`` with ``os.replace``. The parent directory is created
    when missing.

    On any failure, including ``KeyboardInterrupt`` and ``SystemExit``, the
    temporary file is removed on a best-effort basis and the original error is
    re-raised.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as the target so the final replace stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(prefix=path.name, dir=str(path.parent))
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(to_plain_data(payload), fh, indent=2, sort_keys=True)
            fh.write("\n")
        os.replace(tmp_name, path)
    except BaseException:
        # BaseException so an interrupted run does not leave temp files behind.
        try:
            os.unlink(tmp_name)
        except OSError:
            # Cleanup is best effort; the original failure is what matters.
            pass
        raise
67
+
68
+
69
def append_jsonl(path: Path, records: Iterable[Mapping[str, Any]]) -> None:
    """Append JSONL records to ``path``.

    Each record is converted with ``to_plain_data`` and written as one
    key-sorted JSON line. The parent directory is created when missing.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as sink:
        sink.writelines(
            json.dumps(to_plain_data(item), sort_keys=True) + "\n"
            for item in records
        )
75
+
76
+
77
def write_csv(path: Path, rows: Iterable[Mapping[str, Any]], fieldnames: list[str]) -> None:
    """Write a small CSV without bringing in pandas as a runtime dependency.

    Only the columns named in ``fieldnames`` are written; other keys in a row
    are ignored. Values are converted with ``to_plain_data`` first.
    """
    import csv

    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8") as sink:
        table = csv.DictWriter(sink, fieldnames=fieldnames, extrasaction="ignore")
        table.writeheader()
        table.writerows(
            {column: to_plain_data(cell) for column, cell in row.items()}
            for row in rows
        )
87
+
88
+
89
def observation_to_dict(observation: Any) -> dict[str, Any]:
    """Normalize a DIME observation model or mapping to a plain dict.

    Mappings are shallow-copied and objects with ``model_dump`` are dumped.
    For any other object, the known observation attributes that are present
    are collected.
    """
    if isinstance(observation, Mapping):
        return dict(observation)
    if hasattr(observation, "model_dump"):
        return observation.model_dump()

    known_attributes = (
        "cpu_loads",
        "mem_utilizations",
        "queue_lengths",
        "failed_nodes",
        "latency_ms",
        "request_rate",
        "io_wait",
        "p99_latency",
        "error_budget",
        "step",
        "task_hint",
        "task_score",
        "done",
        "reward",
        "cloud_budget",
        "action_errors",
    )
    snapshot: dict[str, Any] = {}
    for attribute in known_attributes:
        if hasattr(observation, attribute):
            snapshot[attribute] = getattr(observation, attribute)
    return snapshot
114
+
115
+
116
def action_to_dict(action: Any) -> dict[str, Any]:
    """Normalize an InfraAction-like object or mapping to a plain dict.

    Mappings are shallow-copied and objects with ``model_dump`` are dumped
    with ``exclude_none=True``. For any other object, the known action
    attributes that are present and not ``None`` are collected.
    """
    if isinstance(action, Mapping):
        return dict(action)
    if hasattr(action, "model_dump"):
        return action.model_dump(exclude_none=True)

    known_attributes = ("action_type", "target", "from_node", "to_node", "rate", "raw_command")
    plain: dict[str, Any] = {}
    for attribute in known_attributes:
        if not hasattr(action, attribute):
            continue
        if getattr(action, attribute) is None:
            continue
        plain[attribute] = getattr(action, attribute)
    return plain
server/environment.py CHANGED
@@ -102,10 +102,13 @@ class SimulationState:
102
 
103
  # --- Trace replay ---
104
  trace_replay: Any = None # Optional[TraceReplay]
 
 
105
  last_trace_p99_latency: float = 0.0
106
  last_trace_node_0_io: float = 0.0
107
  scenario: str = "" # task-specific chaos scenario overlay
108
  _black_swan_applied: bool = False
 
109
 
110
  # --- Throughput tracking (anti-exploit) ---
111
  total_requests_received: int = 0
@@ -122,15 +125,20 @@ class SimulationState:
122
  # ---------------------------------------------------------------------------
123
 
124
 
125
- def _build_default_graph(n: int = 8) -> Tuple[List[Node], Dict[int, List[int]]]:
126
- """Create a default graph with node roles: node 0 = Database, rest = App Servers."""
 
 
 
 
 
127
  nodes = []
128
  for i in range(n):
129
  if i == 0:
130
  # Database node: higher capacity, single point of failure
131
  nodes.append(
132
  Node(
133
- cpu_util=0.20 + random.uniform(-0.03, 0.03),
134
  capacity=25,
135
  role="database",
136
  )
@@ -138,7 +146,7 @@ def _build_default_graph(n: int = 8) -> Tuple[List[Node], Dict[int, List[int]]]:
138
  else:
139
  nodes.append(
140
  Node(
141
- cpu_util=0.25 + random.uniform(-0.05, 0.05),
142
  capacity=15,
143
  role="app_server",
144
  )
@@ -150,17 +158,41 @@ def _build_default_graph(n: int = 8) -> Tuple[List[Node], Dict[int, List[int]]]:
150
  for i in range(1, n):
151
  adjacency[0].append(i)
152
  adjacency[i].append(0)
153
- # App servers: ring + skip connections among themselves
 
154
  for i in range(1, n):
155
  right = 1 + (i % (n - 1)) # wrap within app server range
156
  if right not in adjacency[i]:
157
  adjacency[i].append(right)
158
  adjacency[right].append(i)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159
  skip = 1 + ((i + 1) % (n - 1))
160
  if skip not in adjacency[i] and skip != right:
161
  adjacency[i].append(skip)
162
  adjacency[skip].append(i)
163
 
 
 
 
 
 
 
 
164
  return nodes, adjacency
165
 
166
 
@@ -218,6 +250,9 @@ class DistributedInfraEnvironment(Environment):
218
 
219
  task_id = kwargs.get("task", kwargs.get("task_id", "traffic_spike"))
220
  curriculum_level = int(kwargs.get("curriculum_level", 0))
 
 
 
221
 
222
  # Auto-detect curriculum level from task_id if not explicitly given
223
  if curriculum_level == 0:
@@ -233,7 +268,11 @@ class DistributedInfraEnvironment(Environment):
233
  }
234
  curriculum_level = _level_map.get(task_id, 2)
235
 
236
- nodes, adjacency = _build_default_graph(8)
 
 
 
 
237
  self._sim = SimulationState(
238
  nodes=nodes,
239
  adjacency=adjacency,
@@ -250,6 +289,9 @@ class DistributedInfraEnvironment(Environment):
250
  curriculum_level=curriculum_level,
251
  cloud_budget=max(5, 15 - curriculum_level * 2), # harder = tighter budget
252
  error_budget=100.0,
 
 
 
253
  )
254
 
255
  # Apply task-specific setup
@@ -488,7 +530,7 @@ class DistributedInfraEnvironment(Environment):
488
 
489
  # --- Trace replay: override request rate from real data ---
490
  if sim.trace_replay is not None:
491
- trace_step = sim.trace_replay.get_step(sim.step_count)
492
  sim.current_request_rate = trace_step.request_rate
493
  sim.last_trace_p99_latency = float(
494
  getattr(trace_step, "p99_latency", 0.0) or 0.0
 
102
 
103
  # --- Trace replay ---
104
  trace_replay: Any = None # Optional[TraceReplay]
105
+ trace_offset: int = 0
106
+ trace_offset_locked: bool = False
107
  last_trace_p99_latency: float = 0.0
108
  last_trace_node_0_io: float = 0.0
109
  scenario: str = "" # task-specific chaos scenario overlay
110
  _black_swan_applied: bool = False
111
+ topology_template: str = "default"
112
 
113
  # --- Throughput tracking (anti-exploit) ---
114
  total_requests_received: int = 0
 
125
  # ---------------------------------------------------------------------------
126
 
127
 
128
+ def _build_default_graph(
129
+ n: int = 8,
130
+ rng: Optional[random.Random] = None,
131
+ topology_template: str = "default",
132
+ ) -> Tuple[List[Node], Dict[int, List[int]]]:
133
+ """Create a constrained graph with node 0 = Database and rest = App Servers."""
134
+ rng = rng or random
135
  nodes = []
136
  for i in range(n):
137
  if i == 0:
138
  # Database node: higher capacity, single point of failure
139
  nodes.append(
140
  Node(
141
+ cpu_util=0.20 + rng.uniform(-0.03, 0.03),
142
  capacity=25,
143
  role="database",
144
  )
 
146
  else:
147
  nodes.append(
148
  Node(
149
+ cpu_util=0.25 + rng.uniform(-0.05, 0.05),
150
  capacity=15,
151
  role="app_server",
152
  )
 
158
  for i in range(1, n):
159
  adjacency[0].append(i)
160
  adjacency[i].append(0)
161
+ # App servers: deterministic constrained templates built from the same
162
+ # DB-star/ring physics. No arbitrary graph generation is introduced.
163
  for i in range(1, n):
164
  right = 1 + (i % (n - 1)) # wrap within app server range
165
  if right not in adjacency[i]:
166
  adjacency[i].append(right)
167
  adjacency[right].append(i)
168
+
169
+ if topology_template == "app_ring":
170
+ return nodes, adjacency
171
+
172
+ if topology_template == "sampled_mesh":
173
+ for i in range(1, n):
174
+ candidates = [j for j in range(1, n) if j != i and j not in adjacency[i]]
175
+ if candidates:
176
+ peer = rng.choice(candidates)
177
+ adjacency[i].append(peer)
178
+ adjacency[peer].append(i)
179
+ return nodes, adjacency
180
+
181
+ # Default and dense_mesh retain the historical skip-link shape.
182
+ for i in range(1, n):
183
+ right = 1 + (i % (n - 1))
184
  skip = 1 + ((i + 1) % (n - 1))
185
  if skip not in adjacency[i] and skip != right:
186
  adjacency[i].append(skip)
187
  adjacency[skip].append(i)
188
 
189
+ if topology_template == "dense_mesh":
190
+ for i in range(1, n):
191
+ extra = 1 + ((i + 2) % (n - 1))
192
+ if extra != i and extra not in adjacency[i]:
193
+ adjacency[i].append(extra)
194
+ adjacency[extra].append(i)
195
+
196
  return nodes, adjacency
197
 
198
 
 
250
 
251
  task_id = kwargs.get("task", kwargs.get("task_id", "traffic_spike"))
252
  curriculum_level = int(kwargs.get("curriculum_level", 0))
253
+ topology_template = str(kwargs.get("topology_template", "default"))
254
+ trace_offset_arg = kwargs.get("trace_offset", None)
255
+ trace_offset = int(trace_offset_arg) if trace_offset_arg is not None else 0
256
 
257
  # Auto-detect curriculum level from task_id if not explicitly given
258
  if curriculum_level == 0:
 
268
  }
269
  curriculum_level = _level_map.get(task_id, 2)
270
 
271
+ nodes, adjacency = _build_default_graph(
272
+ 8,
273
+ rng=self._rng,
274
+ topology_template=topology_template,
275
+ )
276
  self._sim = SimulationState(
277
  nodes=nodes,
278
  adjacency=adjacency,
 
289
  curriculum_level=curriculum_level,
290
  cloud_budget=max(5, 15 - curriculum_level * 2), # harder = tighter budget
291
  error_budget=100.0,
292
+ trace_offset=trace_offset,
293
+ trace_offset_locked=trace_offset_arg is not None,
294
+ topology_template=topology_template,
295
  )
296
 
297
  # Apply task-specific setup
 
530
 
531
  # --- Trace replay: override request rate from real data ---
532
  if sim.trace_replay is not None:
533
+ trace_step = sim.trace_replay.get_step(sim.step_count, offset=sim.trace_offset)
534
  sim.current_request_rate = trace_step.request_rate
535
  sim.last_trace_p99_latency = float(
536
  getattr(trace_step, "p99_latency", 0.0) or 0.0
server/tasks.py CHANGED
@@ -331,10 +331,11 @@ def _setup_alibaba_trace(env: "DistributedInfraEnvironment", rng: "random.Random
331
  trace = load_default_trace()
332
  if trace is not None:
333
  sim.trace_replay = trace
334
- # Start replay from a random offset to vary episodes
335
- offset = rng.randint(0, max(1, len(trace) - sim.max_steps))
336
- # We store offset in step_count adjustment — trace_loader wraps around
337
- sim.current_request_rate = trace.get_step(offset).request_rate
 
338
  else:
339
  # Fallback: synthetic 2x traffic if trace not generated
340
  sim.current_request_rate = sim.base_request_rate * 2.0
 
331
  trace = load_default_trace()
332
  if trace is not None:
333
  sim.trace_replay = trace
334
+ # Start replay from a deterministic benchmark offset when provided,
335
+ # otherwise preserve the existing stochastic task variation.
336
+ if not sim.trace_offset_locked:
337
+ sim.trace_offset = rng.randint(0, max(1, len(trace) - sim.max_steps))
338
+ sim.current_request_rate = trace.get_step(0, offset=sim.trace_offset).request_rate
339
  else:
340
  # Fallback: synthetic 2x traffic if trace not generated
341
  sim.current_request_rate = sim.base_request_rate * 2.0
server/trace_loader.py CHANGED
@@ -71,11 +71,11 @@ class TraceReplay:
71
  def __len__(self) -> int:
72
  return len(self._steps)
73
 
74
- def get_step(self, step: int) -> TraceStep:
75
- """Get trace data for a given step. Wraps around."""
76
  if not self._steps:
77
  return TraceStep()
78
- return self._steps[step % len(self._steps)]
79
 
80
 
81
  # ---------------------------------------------------------------------------
 
71
  def __len__(self) -> int:
72
  return len(self._steps)
73
 
74
+ def get_step(self, step: int, offset: int = 0) -> TraceStep:
75
+ """Get trace data for a given step plus deterministic offset. Wraps around."""
76
  if not self._steps:
77
  return TraceStep()
78
+ return self._steps[(int(step) + int(offset)) % len(self._steps)]
79
 
80
 
81
  # ---------------------------------------------------------------------------
train_grpo_unsloth.py CHANGED
@@ -101,19 +101,10 @@ MAX_COMPLETION_LENGTH = (
101
  SAVE_STEPS = 100
102
 
103
  ALL_TASKS = [
 
104
  "traffic_spike",
105
  "node_failure",
106
  "cascading_failure",
107
- "flash_crowd",
108
- "thundering_herd",
109
- "zombie_node",
110
- "hot_shard_skew",
111
- "memory_leak_slow_burn",
112
- "split_brain_io_bottleneck",
113
- "black_swan_az_failure",
114
- "retry_storm",
115
- "connection_pool_deadlock",
116
- "autoscaler_flapping_trap",
117
  ]
118
 
119
  # ---------------------------------------------------------------------------
 
101
  SAVE_STEPS = 100
102
 
103
  ALL_TASKS = [
104
+ "level_1_read_logs",
105
  "traffic_spike",
106
  "node_failure",
107
  "cascading_failure",
 
 
 
 
 
 
 
 
 
 
108
  ]
109
 
110
  # ---------------------------------------------------------------------------