live / tests /test_offline_dc.py
github-actions[bot]
deploy: sync from GitHub 2026-04-18T00:48:45Z
96bb363
Raw
History Blame Contribute Delete
5.17 kB
"""Tests for OfflineDatacenter: step-by-step generation, batch changes."""
from __future__ import annotations
from fractions import Fraction
import numpy as np
import pytest
from mlenergy_data.modeling import ITLMixtureModel
from openg2g.clock import SimulationClock
from openg2g.coordinator import SimulationLog
from openg2g.datacenter.command import DatacenterCommand, SetBatchSize
from openg2g.datacenter.config import DatacenterConfig, InferenceModelSpec
from openg2g.datacenter.offline import OfflineDatacenter, OfflineDatacenterState, OfflineWorkload
from openg2g.datacenter.workloads.inference import (
InferenceData,
InferenceTemplateStore,
InferenceTrace,
InferenceTraceStore,
ITLFitStore,
_build_per_gpu_power_template,
)
from openg2g.events import EventEmitter
# The single synthetic model every test in this module runs against:
# ten one-GPU replicas that start out at batch size 128.
MODEL = InferenceModelSpec(
    model_label="TestModel",
    num_replicas=10,
    gpus_per_replica=1,
    initial_batch_size=128,
    itl_deadline_s=0.1,
)

# Datacenter layout shared by all tests.
DC_CFG = DatacenterConfig(gpus_per_server=8)

# Shared emitter passed to every step()/apply_control() call. It is built on
# its own clock and log, separate from the clocks the individual tests create.
# NOTE(review): the meaning of the "custom" argument is not visible here —
# confirm against EventEmitter's signature.
_EVENTS = EventEmitter(
    SimulationClock(Fraction(1, 10)),
    SimulationLog(),
    "custom",
)
def _make_simple_store(dt: float = 0.1, T: float = 100.0) -> InferenceTemplateStore:
    """Build a small InferenceTemplateStore from two synthetic power traces.

    Both traces belong to "TestModel" and share a 100-sample time axis running
    from 0 to 10. The batch-size-128 trace ramps linearly from 100 to 200; the
    batch-size-64 trace is the same ramp scaled by 64/128.

    Args:
        dt: Forwarded to ``build_templates`` as ``dt_s``.
        T: Forwarded to ``build_templates`` as ``duration_s``.

    Returns:
        Whatever ``InferenceTraceStore.build_templates`` produces for the
        synthetic traces.
    """
    time_axis = np.linspace(0, 10, 100)
    full_power = np.linspace(100, 200, 100)
    half_power = full_power * (64 / 128.0)

    traces_by_batch = {
        64: InferenceTrace(t_s=time_axis, power_w=half_power, measured_gpus=1),
        128: InferenceTrace(t_s=time_axis, power_w=full_power, measured_gpus=1),
    }
    trace_store = InferenceTraceStore({"TestModel": traces_by_batch})
    return trace_store.build_templates(duration_s=T, dt_s=dt)
def _make_workload(templates: InferenceTemplateStore, itl_fits: ITLFitStore | None = None) -> OfflineWorkload:
    """Wrap power templates (and optional ITL fits) in an OfflineWorkload.

    The workload always describes the single module-level ``MODEL``.

    Args:
        templates: Power templates handed to ``InferenceData`` as ``power_templates``.
        itl_fits: Optional ITL fit store; passed through unchanged (``None`` by default).

    Returns:
        An ``OfflineWorkload`` whose ``inference_data`` is built from the arguments.
    """
    data = InferenceData(
        (MODEL,),
        power_templates=templates,
        itl_fits=itl_fits,
    )
    return OfflineWorkload(inference_data=data)
def test_step_returns_offline_state():
    """A single step yields an OfflineDatacenterState with non-negative power and the initial batch size."""
    tick = Fraction(1, 10)
    dc = OfflineDatacenter(DC_CFG, _make_workload(_make_simple_store()), dt_s=tick)

    state = dc.step(SimulationClock(tick_s=tick), _EVENTS)

    assert isinstance(state, OfflineDatacenterState)
    # Each of the a/b/c power components must be non-negative.
    for component in ("a", "b", "c"):
        assert getattr(state.power_w, component) >= 0
    assert "TestModel" in state.batch_size_by_model
    assert state.batch_size_by_model["TestModel"] == 128
def test_step_produces_correct_number_of_states():
    """Ten step calls return ten states whose timestamps strictly increase."""
    tick = Fraction(1, 10)
    dc = OfflineDatacenter(DC_CFG, _make_workload(_make_simple_store()), dt_s=tick)
    clock = SimulationClock(tick_s=tick)

    collected = []
    for _ in range(10):
        # Step first, then advance, so each state is taken at a distinct clock time.
        collected.append(dc.step(clock, _EVENTS))
        clock.advance()

    assert len(collected) == 10

    timestamps = [snapshot.time_s for snapshot in collected]
    for earlier, later in zip(timestamps, timestamps[1:]):
        assert later > earlier
def test_batch_change_takes_effect_immediately():
    """A SetBatchSize command applied via apply_control is reflected in the very next step."""
    tick = Fraction(1, 10)
    dc = OfflineDatacenter(DC_CFG, _make_workload(_make_simple_store()), dt_s=tick)
    clock = SimulationClock(tick_s=tick)

    # Run five steps at the initial batch size before issuing the command.
    for _ in range(5):
        before = dc.step(clock, _EVENTS)
        assert before.batch_size_by_model["TestModel"] == 128
        clock.advance()

    shrink_batch = SetBatchSize(batch_size_by_model={"TestModel": 64})
    dc.apply_control(shrink_batch, _EVENTS)

    after = dc.step(clock, _EVENTS)
    assert after.batch_size_by_model["TestModel"] == 64
def test_build_periodic_template_shape():
    """The built template has ceil(duration / dt) + 1 entries, all non-negative."""
    dt_s = 0.1
    duration_s = 50.0

    sample_times = np.linspace(0, 10, 200)
    sample_power = np.sin(sample_times) * 100 + 200
    trace = InferenceTrace(t_s=sample_times, power_w=sample_power, measured_gpus=2)

    template = _build_per_gpu_power_template(trace, dt_s=dt_s, duration_s=duration_s)

    assert template.shape[0] == int(np.ceil(duration_s / dt_s)) + 1
    assert np.all(template >= 0)
def test_offline_datacenter_emits_observed_itl_when_latency_fits_is_set():
    """With ITL fits supplied, a step reports a finite observed ITL for the model."""
    tick = Fraction(1, 10)
    templates = _make_simple_store()

    # Arbitrary mixture parameters; the test only checks that a finite value comes out.
    mixture_kwargs = {
        "loc": 0.01,
        "pi_steady": 0.8,
        "sigma_steady": 0.1,
        "scale_steady": 0.05,
        "pi_stall": 0.2,
        "sigma_stall": 0.2,
        "scale_stall": 0.1,
    }
    fits = ITLFitStore({"TestModel": {128: ITLMixtureModel(**mixture_kwargs)}})

    workload = _make_workload(templates, itl_fits=fits)
    dc = OfflineDatacenter(DC_CFG, workload, dt_s=tick)
    state = dc.step(SimulationClock(tick_s=tick), _EVENTS)

    observed = state.observed_itl_s_by_model
    assert "TestModel" in observed
    assert np.isfinite(observed["TestModel"])
def test_apply_control_rejects_unknown_command():
    """apply_control raises TypeError when given a command type it does not handle."""

    class _CustomCommand(DatacenterCommand):
        """Bare DatacenterCommand subclass defined only for this test."""

    workload = _make_workload(_make_simple_store())
    dc = OfflineDatacenter(DC_CFG, workload, dt_s=Fraction(1, 10))

    expected_message = "OfflineDatacenter does not support"
    with pytest.raises(TypeError, match=expected_message):
        dc.apply_control(_CustomCommand(), _EVENTS)