""" Trace replay loader for DIME. Loads pre-processed Alibaba-style cluster trace CSV files and provides step-by-step replay of real-world traffic patterns, CPU baselines, and latency injections. """ from __future__ import annotations import csv import os from dataclasses import dataclass, field from typing import Dict, List, Optional @dataclass class TraceStep: """One step of trace data.""" request_rate: float = 100.0 latency_injection: float = 0.0 p99_latency: float = 0.0 node_0_io: float = 0.0 node_cpu: Dict[int, float] = field(default_factory=dict) node_mem: Dict[int, float] = field(default_factory=dict) class TraceReplay: """ Load and replay a trace CSV file step-by-step. The CSV must have columns: step, node_0_cpu, node_0_mem, ..., request_rate Optionally supports: - latency_injection - p99_latency - node_0_io Wraps around if the episode exceeds the trace length. """ def __init__(self, csv_path: str) -> None: self._steps: List[TraceStep] = [] self._load(csv_path) def _load(self, csv_path: str) -> None: if not os.path.exists(csv_path): raise FileNotFoundError(f"Trace file not found: {csv_path}") with open(csv_path, "r") as f: reader = csv.DictReader(f) for row in reader: ts = TraceStep( request_rate=float(row["request_rate"]), latency_injection=float(row.get("latency_injection", 0.0) or 0.0), p99_latency=float(row.get("p99_latency", 0.0) or 0.0), node_0_io=float(row.get("node_0_io", 0.0) or 0.0), ) # Parse per-node metrics for key, val in row.items(): if key.startswith("node_") and key.endswith("_cpu"): idx = int(key.split("_")[1]) ts.node_cpu[idx] = float(val) elif key.startswith("node_") and key.endswith("_mem"): idx = int(key.split("_")[1]) ts.node_mem[idx] = float(val) self._steps.append(ts) def __len__(self) -> int: return len(self._steps) def get_step(self, step: int) -> TraceStep: """Get trace data for a given step. Wraps around.""" if not self._steps: return TraceStep() return self._steps[step % len(self._steps)] # --------------------------------------------------------------------------- # Default trace path # --------------------------------------------------------------------------- _DEFAULT_TRACE = os.path.join( os.path.dirname(__file__), "traces", "alibaba_v2021_8node_500steps.csv" ) _REAL_TRACE = os.path.join( os.path.dirname(__file__), "traces", "real_production_trace.csv" ) def load_default_trace() -> Optional[TraceReplay]: """Load the best available trace, or None if missing.""" if os.path.exists(_REAL_TRACE): return TraceReplay(_REAL_TRACE) if os.path.exists(_DEFAULT_TRACE): return TraceReplay(_DEFAULT_TRACE) return None