Spaces:
Configuration error
Configuration error
File size: 3,224 Bytes
2ba6413 facabc7 2ba6413 facabc7 2ba6413 facabc7 2ba6413 54da37b 2ba6413 54da37b 2ba6413 facabc7 2ba6413 facabc7 2ba6413 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 | """
Trace replay loader for DIME.
Loads pre-processed Alibaba-style cluster trace CSV files and provides
step-by-step replay of real-world traffic patterns, CPU baselines,
and latency injections.
"""
from __future__ import annotations
import csv
import os
from dataclasses import dataclass, field
from typing import Dict, List, Optional
@dataclass
class TraceStep:
    """Values recorded for a single step of a replayed trace.

    Every field has a default, so ``TraceStep()`` is a valid placeholder
    step. Units are whatever the trace file uses; this class only stores
    the numbers.
    """

    # Scalar, trace-wide values for this step.
    request_rate: float = 100.0
    latency_injection: float = 0.0
    p99_latency: float = 0.0
    node_0_io: float = 0.0

    # Per-node values keyed by integer node index. A factory is used so
    # that each instance owns its own dictionaries.
    node_cpu: Dict[int, float] = field(default_factory=dict)
    node_mem: Dict[int, float] = field(default_factory=dict)
class TraceReplay:
    """
    Load a trace CSV file and replay it step-by-step.

    The CSV must have a header row with the columns:
        step, node_0_cpu, node_0_mem, ..., request_rate
    Optionally supports:
        - latency_injection
        - p99_latency
        - node_0_io
    Optional columns that are absent or blank are read as 0.0.

    Every column named ``node_<idx>_cpu`` or ``node_<idx>_mem`` with an
    integer ``<idx>`` is collected into the step's per-node dictionaries.
    Columns that do not match that pattern, and per-node cells that are
    blank or missing in a given row, are skipped.

    Wraps around if the episode exceeds the trace length.
    """

    def __init__(self, csv_path: str) -> None:
        """Read every row of ``csv_path`` into memory.

        Raises:
            FileNotFoundError: if ``csv_path`` does not exist.
        """
        self._steps: List[TraceStep] = []
        self._load(csv_path)

    @staticmethod
    def _node_index(key: Optional[str], suffix: str) -> Optional[int]:
        """Return the node index encoded in a ``node_<idx><suffix>`` column name.

        Returns None when ``key`` is not a string (``csv.DictReader`` uses
        the key ``None`` for surplus cells of an over-long row), does not
        have the ``node_`` prefix and the given suffix, or has a
        non-integer index part (e.g. ``node_avg_cpu``).
        """
        if not isinstance(key, str):
            return None
        if not (key.startswith("node_") and key.endswith(suffix)):
            return None
        try:
            # The "node_" prefix guarantees at least two "_"-separated parts.
            return int(key.split("_")[1])
        except ValueError:
            return None

    def _load(self, csv_path: str) -> None:
        """Parse ``csv_path`` and append one ``TraceStep`` per data row.

        Raises:
            FileNotFoundError: if ``csv_path`` does not exist.
            KeyError: if the ``request_rate`` column is absent.
            ValueError: if a non-blank numeric cell cannot be parsed.
        """
        if not os.path.exists(csv_path):
            raise FileNotFoundError(f"Trace file not found: {csv_path}")

        # newline="" lets the csv module do its own newline handling, as its
        # documentation requires for files passed to a reader.
        with open(csv_path, "r", newline="", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                ts = TraceStep(
                    request_rate=float(row["request_rate"]),
                    # "or 0.0" maps both a missing column (None) and a blank
                    # cell ("") to the default.
                    latency_injection=float(row.get("latency_injection", 0.0) or 0.0),
                    p99_latency=float(row.get("p99_latency", 0.0) or 0.0),
                    node_0_io=float(row.get("node_0_io", 0.0) or 0.0),
                )

                # Parse per-node metrics.
                for key, raw in row.items():
                    for suffix, bucket in (("_cpu", ts.node_cpu), ("_mem", ts.node_mem)):
                        idx = self._node_index(key, suffix)
                        if idx is None:
                            continue
                        # Short rows yield None and empty cells yield "";
                        # neither is a sample, so leave the node unset.
                        if raw is not None and raw.strip():
                            bucket[idx] = float(raw)
                        break

                self._steps.append(ts)

    def __len__(self) -> int:
        """Return the number of steps loaded from the trace."""
        return len(self._steps)

    def get_step(self, step: int, offset: int = 0) -> TraceStep:
        """Get trace data for a given step plus deterministic offset. Wraps around.

        Returns a default ``TraceStep()`` when no steps were loaded.
        """
        if not self._steps:
            return TraceStep()
        return self._steps[(int(step) + int(offset)) % len(self._steps)]
# ---------------------------------------------------------------------------
# Bundled trace paths
# ---------------------------------------------------------------------------
# Both trace files are looked up in a "traces" directory next to this module.
_TRACE_DIR = os.path.join(os.path.dirname(__file__), "traces")
_DEFAULT_TRACE = os.path.join(_TRACE_DIR, "alibaba_v2021_8node_500steps.csv")
_REAL_TRACE = os.path.join(_TRACE_DIR, "real_production_trace.csv")
def load_default_trace() -> Optional[TraceReplay]:
    """Return a replay of the first bundled trace found on disk.

    The real production trace is tried first, then the default trace.
    Returns None when neither file exists.
    """
    # Candidates in priority order; the first existing file wins.
    for candidate in (_REAL_TRACE, _DEFAULT_TRACE):
        if os.path.exists(candidate):
            return TraceReplay(candidate)
    return None
|