| """Tests for OfflineDatacenter: step-by-step generation, batch changes.""" |
|
|
| from __future__ import annotations |
|
|
| from fractions import Fraction |
|
|
| import numpy as np |
| import pytest |
| from mlenergy_data.modeling import ITLMixtureModel |
|
|
| from openg2g.clock import SimulationClock |
| from openg2g.coordinator import SimulationLog |
| from openg2g.datacenter.command import DatacenterCommand, SetBatchSize |
| from openg2g.datacenter.config import DatacenterConfig, InferenceModelSpec |
| from openg2g.datacenter.offline import OfflineDatacenter, OfflineDatacenterState, OfflineWorkload |
| from openg2g.datacenter.workloads.inference import ( |
| InferenceData, |
| InferenceTemplateStore, |
| InferenceTrace, |
| InferenceTraceStore, |
| ITLFitStore, |
| _build_per_gpu_power_template, |
| ) |
| from openg2g.events import EventEmitter |
|
|
# Model spec shared by every test in this module: a single "TestModel"
# deployment whose initial batch size (128) the tests assert against.
MODEL = InferenceModelSpec(
    model_label="TestModel",
    num_replicas=10,
    gpus_per_replica=1,
    initial_batch_size=128,
    itl_deadline_s=0.1,
)

# Datacenter-level configuration handed to every OfflineDatacenter below.
DC_CFG = DatacenterConfig(gpus_per_server=8)

# Module-wide emitter passed to step()/apply_control(); it is built on its own
# clock (constructed with Fraction(1, 10)) and its own SimulationLog.
_EVENTS = EventEmitter(
    SimulationClock(Fraction(1, 10)),
    SimulationLog(),
    "custom",
)
|
|
|
|
def _make_simple_store(dt: float = 0.1, T: float = 100.0) -> InferenceTemplateStore:
    """Build an InferenceTemplateStore from two synthetic "TestModel" traces.

    Both traces share 100 evenly spaced timestamps on [0, 10]. The batch-128
    trace ramps linearly from 100 to 200; the batch-64 trace is the same ramp
    scaled by 64/128.

    Args:
        dt: Forwarded to ``build_templates`` as ``dt_s``.
        T: Forwarded to ``build_templates`` as ``duration_s``.

    Returns:
        The result of ``InferenceTraceStore.build_templates`` for the traces.
    """
    timestamps = np.linspace(0, 10, 100)
    ramp_power = np.linspace(100, 200, 100)
    scaled_power = ramp_power * (64 / 128.0)

    # Insertion order (64 first, then 128) is kept as in the original fixture.
    traces_by_batch = {
        64: InferenceTrace(t_s=timestamps, power_w=scaled_power, measured_gpus=1),
        128: InferenceTrace(t_s=timestamps, power_w=ramp_power, measured_gpus=1),
    }

    trace_store = InferenceTraceStore({"TestModel": traces_by_batch})
    return trace_store.build_templates(duration_s=T, dt_s=dt)
|
|
|
|
def _make_workload(templates: InferenceTemplateStore, itl_fits: ITLFitStore | None = None) -> OfflineWorkload:
    """Wrap power templates (and optional ITL fits) in an OfflineWorkload.

    Args:
        templates: Passed to ``InferenceData`` as ``power_templates``.
        itl_fits: Passed to ``InferenceData`` as ``itl_fits``; ``None`` by default.

    Returns:
        An ``OfflineWorkload`` whose inference data covers only ``MODEL``.
    """
    data = InferenceData((MODEL,), power_templates=templates, itl_fits=itl_fits)
    return OfflineWorkload(inference_data=data)
|
|
|
|
def test_step_returns_offline_state():
    """One step yields an OfflineDatacenterState with sane power and batch size."""
    tick = Fraction(1, 10)
    templates = _make_simple_store()
    datacenter = OfflineDatacenter(DC_CFG, _make_workload(templates), dt_s=tick)

    sim_clock = SimulationClock(tick_s=tick)
    result = datacenter.step(sim_clock, _EVENTS)

    assert isinstance(result, OfflineDatacenterState)

    # Every component of power_w (a, b, c) must be non-negative.
    for component in ("a", "b", "c"):
        assert getattr(result.power_w, component) >= 0

    batch_sizes = result.batch_size_by_model
    assert "TestModel" in batch_sizes
    assert batch_sizes["TestModel"] == 128
|
|
|
|
def test_step_produces_correct_number_of_states():
    """Ten step/advance cycles give ten states whose time_s strictly increases."""
    tick = Fraction(1, 10)
    templates = _make_simple_store()
    datacenter = OfflineDatacenter(DC_CFG, _make_workload(templates), dt_s=tick)
    sim_clock = SimulationClock(tick_s=tick)

    def step_then_advance():
        # Step first, then move the clock forward, exactly once per cycle.
        snapshot = datacenter.step(sim_clock, _EVENTS)
        sim_clock.advance()
        return snapshot

    snapshots = [step_then_advance() for _ in range(10)]

    assert len(snapshots) == 10

    timestamps = [snapshot.time_s for snapshot in snapshots]
    # Compare each timestamp with its successor.
    for earlier, later in zip(timestamps, timestamps[1:]):
        assert later > earlier
|
|
|
|
def test_batch_change_takes_effect_immediately():
    """A SetBatchSize command is reflected by the first step after apply_control."""
    tick = Fraction(1, 10)
    templates = _make_simple_store()
    datacenter = OfflineDatacenter(DC_CFG, _make_workload(templates), dt_s=tick)
    sim_clock = SimulationClock(tick_s=tick)

    # Five steps at the initial batch size before any command is issued.
    for _ in range(5):
        before = datacenter.step(sim_clock, _EVENTS)
        assert before.batch_size_by_model["TestModel"] == 128
        sim_clock.advance()

    resize = SetBatchSize(batch_size_by_model={"TestModel": 64})
    datacenter.apply_control(resize, _EVENTS)

    # The very next step must already report the new batch size.
    after = datacenter.step(sim_clock, _EVENTS)
    assert after.batch_size_by_model["TestModel"] == 64
|
|
|
|
def test_build_periodic_template_shape():
    """The built template has ceil(duration / dt) + 1 entries, all non-negative."""
    dt_s = 0.1
    duration_s = 50.0

    sample_times = np.linspace(0, 10, 200)
    # Sinusoid offset to 200 with amplitude 100, so every sample is positive.
    sample_power = np.sin(sample_times) * 100 + 200
    measured = InferenceTrace(t_s=sample_times, power_w=sample_power, measured_gpus=2)

    template = _build_per_gpu_power_template(measured, dt_s=dt_s, duration_s=duration_s)

    assert template.shape[0] == int(np.ceil(duration_s / dt_s)) + 1
    assert np.all(template >= 0)
|
|
|
|
def test_offline_datacenter_emits_observed_itl_when_latency_fits_is_set():
    """With ITL fits supplied, a step reports a finite observed ITL for TestModel."""
    tick = Fraction(1, 10)
    templates = _make_simple_store()

    # Arbitrary mixture parameters; the test only checks that a finite value comes out.
    mixture = ITLMixtureModel(
        loc=0.01,
        pi_steady=0.8,
        sigma_steady=0.1,
        scale_steady=0.05,
        pi_stall=0.2,
        sigma_stall=0.2,
        scale_stall=0.1,
    )
    fits = ITLFitStore({"TestModel": {128: mixture}})
    workload = _make_workload(templates, itl_fits=fits)
    datacenter = OfflineDatacenter(DC_CFG, workload, dt_s=tick)

    result = datacenter.step(SimulationClock(tick_s=tick), _EVENTS)

    observed = result.observed_itl_s_by_model
    assert "TestModel" in observed
    assert np.isfinite(observed["TestModel"])
|
|
|
|
def test_apply_control_rejects_unknown_command():
    """apply_control raises TypeError when given a command type it does not handle."""

    class _CustomCommand(DatacenterCommand):
        """Bare DatacenterCommand subclass used only by this test."""

    templates = _make_simple_store()
    datacenter = OfflineDatacenter(DC_CFG, _make_workload(templates), dt_s=Fraction(1, 10))

    expected_message = "OfflineDatacenter does not support"
    with pytest.raises(TypeError, match=expected_message):
        datacenter.apply_control(_CustomCommand(), _EVENTS)
|
|