File size: 3,224 Bytes
2ba6413
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
facabc7
 
2ba6413
 
 
 
 
 
 
 
 
facabc7
 
 
 
 
 
2ba6413
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
facabc7
 
 
2ba6413
 
 
 
 
 
 
 
 
 
 
 
 
 
54da37b
 
2ba6413
 
54da37b
2ba6413
 
 
 
 
 
 
 
 
facabc7
 
 
2ba6413
 
 
facabc7
 
 
2ba6413
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""
Trace replay loader for DIME.

Loads pre-processed Alibaba-style cluster trace CSV files and provides
step-by-step replay of real-world traffic patterns, CPU baselines,
and latency injections.
"""

from __future__ import annotations

import csv
import os
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class TraceStep:
    """Metrics recorded for a single step of a replayed trace.

    Every field has a default, so ``TraceStep()`` yields a neutral
    placeholder step. The two per-node mappings are created fresh for each
    instance (``default_factory``), so steps never share a dict.
    """

    # Scalar, cluster-wide values for this step.
    request_rate: float = 100.0
    latency_injection: float = 0.0
    p99_latency: float = 0.0

    # I/O value for node 0 only; no other node has an I/O field.
    node_0_io: float = 0.0

    # Per-node values keyed by integer node index.
    node_cpu: Dict[int, float] = field(default_factory=dict)
    node_mem: Dict[int, float] = field(default_factory=dict)


class TraceReplay:
    """
    Load and replay a trace CSV file step-by-step.

    The CSV is expected to have a header row with columns:
        step, node_0_cpu, node_0_mem, ..., request_rate

    Only ``request_rate`` is strictly required by the loader. The ``step``
    column is not read: rows are replayed in file order. Per-node metrics are
    taken from every column named ``node_<N>_cpu`` / ``node_<N>_mem`` where
    ``<N>`` is an integer node index; other ``node_*`` columns are ignored.

    Optionally supports:
        - latency_injection
        - p99_latency
        - node_0_io

    Blank optional or per-node cells are read as 0.0.

    Wraps around if the episode exceeds the trace length.
    """

    def __init__(self, csv_path: str) -> None:
        """Load every row of ``csv_path`` into memory.

        Raises:
            FileNotFoundError: if ``csv_path`` does not exist.
            KeyError: if the CSV has no ``request_rate`` column.
            ValueError: if a cell that is read cannot be parsed as a float.
        """
        self._steps: List[TraceStep] = []
        self._load(csv_path)

    @staticmethod
    def _cell_to_float(raw: Optional[str], default: float = 0.0) -> float:
        """Convert one CSV cell to float; a missing or blank cell gives ``default``."""
        return float(raw or default)

    @classmethod
    def _parse_row(cls, row: Dict[str, str]) -> TraceStep:
        """Build a ``TraceStep`` from one ``csv.DictReader`` row."""
        ts = TraceStep(
            # request_rate is the one mandatory column: parse it strictly.
            request_rate=float(row["request_rate"]),
            latency_injection=cls._cell_to_float(row.get("latency_injection")),
            p99_latency=cls._cell_to_float(row.get("p99_latency")),
            node_0_io=cls._cell_to_float(row.get("node_0_io")),
        )
        # Parse per-node metrics
        for key, val in row.items():
            # DictReader stores surplus cells (row longer than the header)
            # under a None key; there is no column name to inspect.
            if not isinstance(key, str):
                continue
            parts = key.split("_")
            if len(parts) < 3 or parts[0] != "node":
                continue
            if parts[-1] == "cpu":
                target = ts.node_cpu
            elif parts[-1] == "mem":
                target = ts.node_mem
            else:
                continue
            try:
                idx = int(parts[1])
            except ValueError:
                # e.g. "node_total_cpu": not a per-node column, skip it.
                continue
            target[idx] = cls._cell_to_float(val)
        return ts

    def _load(self, csv_path: str) -> None:
        """Read ``csv_path`` and append one ``TraceStep`` per data row.

        Steps are only added once the whole file has parsed, so a malformed
        row does not leave a partially loaded trace behind.
        """
        if not os.path.exists(csv_path):
            raise FileNotFoundError(f"Trace file not found: {csv_path}")

        # newline="" lets the csv module handle line endings itself (as its
        # documentation requires); utf-8-sig reads plain UTF-8 and also drops
        # a leading BOM that would otherwise corrupt the first column name.
        with open(csv_path, "r", newline="", encoding="utf-8-sig") as f:
            loaded = [self._parse_row(row) for row in csv.DictReader(f)]
        self._steps.extend(loaded)

    def __len__(self) -> int:
        """Return the number of steps loaded from the trace."""
        return len(self._steps)

    def get_step(self, step: int, offset: int = 0) -> TraceStep:
        """Get trace data for a given step plus deterministic offset. Wraps around.

        The index is ``(step + offset)`` modulo the trace length, so values
        past the end (or negative values) wrap instead of raising. An empty
        trace yields a default-constructed ``TraceStep``.
        """
        if not self._steps:
            return TraceStep()
        return self._steps[(int(step) + int(offset)) % len(self._steps)]


# ---------------------------------------------------------------------------
# Default trace path
# ---------------------------------------------------------------------------

# Bundled traces live in a "traces" directory next to this module.
_TRACES_DIR = os.path.join(os.path.dirname(__file__), "traces")

# Trace shipped as the fallback default.
_DEFAULT_TRACE = os.path.join(_TRACES_DIR, "alibaba_v2021_8node_500steps.csv")
# Preferred trace when present (see load_default_trace).
_REAL_TRACE = os.path.join(_TRACES_DIR, "real_production_trace.csv")


def load_default_trace() -> Optional[TraceReplay]:
    """Return a replay of the first bundled trace that exists on disk.

    The real production trace is preferred over the default one.
    Returns None when neither file is present.
    """
    # Ordered by preference; the first existing file wins.
    for candidate in (_REAL_TRACE, _DEFAULT_TRACE):
        if os.path.exists(candidate):
            return TraceReplay(candidate)
    return None