Spaces:
Sleeping
Sleeping
| """ | |
| Trace replay loader for DIME. | |
| Loads pre-processed Alibaba-style cluster trace CSV files and provides | |
| step-by-step replay of real-world traffic patterns, CPU baselines, | |
| and latency injections. | |
| """ | |
| from __future__ import annotations | |
| import csv | |
| import os | |
| from dataclasses import dataclass, field | |
| from typing import Dict, List, Optional | |
@dataclass
class TraceStep:
    """One step of trace data.

    Every field has a default, so ``TraceStep()`` is a valid placeholder
    step. The class must be a dataclass: the loader builds instances with
    keyword arguments, and ``field(default_factory=dict)`` only yields a
    fresh dict per instance when the dataclass machinery processes it.
    """

    # Value of the "request_rate" column for this step.
    request_rate: float = 100.0
    # Value of the optional "latency_injection" column (0.0 when absent).
    latency_injection: float = 0.0
    # Value of the optional "p99_latency" column (0.0 when absent).
    p99_latency: float = 0.0
    # Value of the optional "node_0_io" column (0.0 when absent).
    node_0_io: float = 0.0
    # Node index -> value of that node's "node_<idx>_cpu" column.
    node_cpu: Dict[int, float] = field(default_factory=dict)
    # Node index -> value of that node's "node_<idx>_mem" column.
    node_mem: Dict[int, float] = field(default_factory=dict)
class TraceReplay:
    """
    Load and replay a trace CSV file step-by-step.

    The CSV must have a header row. Columns read by the loader:

    - ``request_rate`` (required)
    - ``node_<i>_cpu`` and ``node_<i>_mem`` for any integer node index ``i``
    - ``latency_injection``, ``p99_latency``, ``node_0_io`` (optional; a
      missing or blank cell is read as 0.0)

    Any other column (for example ``step``) is ignored.
    Wraps around if the episode exceeds the trace length.
    """

    def __init__(self, csv_path: str) -> None:
        """Read every row of ``csv_path`` into memory.

        Raises:
            FileNotFoundError: if ``csv_path`` does not exist.
            KeyError: if the header has no ``request_rate`` column.
            ValueError: if a populated cell cannot be parsed as a float.
        """
        self._steps: List[TraceStep] = []
        self._load(csv_path)

    def _load(self, csv_path: str) -> None:
        """Parse the CSV at ``csv_path`` and append one TraceStep per row."""
        if not os.path.exists(csv_path):
            raise FileNotFoundError(f"Trace file not found: {csv_path}")
        # newline="" is what the csv module asks for when it is handed a file
        # object; utf-8-sig drops a leading BOM so the first header name is
        # not corrupted by spreadsheet exports.
        with open(csv_path, "r", newline="", encoding="utf-8-sig") as f:
            for row in csv.DictReader(f):
                ts = TraceStep(
                    request_rate=float(row["request_rate"]),
                    latency_injection=float(row.get("latency_injection", 0.0) or 0.0),
                    p99_latency=float(row.get("p99_latency", 0.0) or 0.0),
                    node_0_io=float(row.get("node_0_io", 0.0) or 0.0),
                )
                # Parse per-node metrics
                for key, val in row.items():
                    # DictReader files surplus cells under the key None and
                    # pads short rows with None values; neither is a metric.
                    if key is None or val is None:
                        continue
                    if not key.startswith("node_"):
                        continue
                    if key.endswith("_cpu"):
                        target = ts.node_cpu
                    elif key.endswith("_mem"):
                        target = ts.node_mem
                    else:
                        continue
                    try:
                        idx = int(key.split("_")[1])
                    except ValueError:
                        # e.g. "node_total_cpu": not a per-node column.
                        continue
                    # A blank cell means "no sample"; leave the node absent
                    # instead of failing on float("").
                    if not val.strip():
                        continue
                    target[idx] = float(val)
                self._steps.append(ts)

    def __len__(self) -> int:
        """Return the number of steps loaded from the trace."""
        return len(self._steps)

    def get_step(self, step: int) -> TraceStep:
        """Get trace data for a given step. Wraps around.

        Returns a default-constructed TraceStep when the trace has no rows.
        """
        if not self._steps:
            return TraceStep()
        return self._steps[step % len(self._steps)]
# ---------------------------------------------------------------------------
# Bundled trace locations
# ---------------------------------------------------------------------------
# Both files are looked up in the "traces" directory next to this module.
_REAL_TRACE = os.path.join(
    os.path.dirname(__file__),
    "traces",
    "real_production_trace.csv",
)
_DEFAULT_TRACE = os.path.join(
    os.path.dirname(__file__),
    "traces",
    "alibaba_v2021_8node_500steps.csv",
)
def load_default_trace() -> Optional[TraceReplay]:
    """Return a replay of the preferred bundled trace, or None.

    The real production trace is tried first, then the default sample
    trace. None is returned when neither file exists on disk.
    """
    # Tuple order is the priority order: first existing file wins.
    for candidate in (_REAL_TRACE, _DEFAULT_TRACE):
        if os.path.exists(candidate):
            return TraceReplay(candidate)
    return None