# live / tests/test_online_dc.py
# github-actions[bot]
# deploy: sync from GitHub 2026-04-18T00:48:45Z
# 96bb363
# Raw
# History Blame Contribute Delete
# 18.7 kB
"""Tests for OnlineDatacenter pure-logic components.
Tests _RollingPowerBuffer, shared InferencePowerAugmenter integration,
_LoadGenerator.get_observed_itl, _parse_prometheus_text, and health
check functions without requiring real vLLM servers or zeusd instances.
"""
from __future__ import annotations
import collections
import contextlib
import threading
import time
from fractions import Fraction
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
from zeus.monitor.power_streaming import PowerReadings, PowerStreamingClient
from openg2g.clock import SimulationClock
from openg2g.coordinator import SimulationLog
from openg2g.datacenter.config import DatacenterConfig, InferenceModelSpec, PowerAugmentationConfig
from openg2g.datacenter.layout import ServerLayout
from openg2g.datacenter.online import (
GPUEndpointMapping,
LiveServerConfig,
OnlineDatacenter,
VLLMDeployment,
_parse_prometheus_text,
_RollingPowerBuffer,
)
from openg2g.datacenter.workloads.inference import InferencePowerAugmenter
from openg2g.events import EventEmitter
# Event emitter handed to ``OnlineDatacenter.step`` by the step tests. Its
# clock ticks every 1/10 s (the same ``dt_s`` that ``_online_dc`` configures)
# and it records into a fresh SimulationLog.
_EVENTS = EventEmitter(
    SimulationClock(Fraction(1, 10)),
    SimulationLog(),
    "custom",
)
def _make_deployment(
    label: str = "test-model",
    num_replicas: int = 100,
    gpus_per_replica: int = 1,
    host: str = "node1",
    port: int = 4938,
    gpu_indices: tuple[int, ...] = (0,),
) -> VLLMDeployment:
    """Build a single-endpoint VLLMDeployment for tests.

    The model spec uses an initial batch size of 128 and a 0.1 s ITL
    deadline. vLLM is addressed at ``http://{host}:8000``. Power is read
    from exactly one endpoint at ``host:port`` that covers ``gpu_indices``.
    """
    model_spec = InferenceModelSpec(
        model_label=label,
        num_replicas=num_replicas,
        gpus_per_replica=gpus_per_replica,
        initial_batch_size=128,
        itl_deadline_s=0.1,
    )
    base_url = f"http://{host}:8000"
    endpoint = GPUEndpointMapping(host=host, port=port, gpu_indices=gpu_indices)
    return VLLMDeployment(
        spec=model_spec,
        vllm_base_url=base_url,
        gpu_endpoints=(endpoint,),
    )
def _fake_power_readings(
    deployments: list[VLLMDeployment],
    per_gpu_w: float = 300.0,
) -> dict[str, PowerReadings]:
    """Mimic what ``PowerStreamingClient`` returns for ``deployments``.

    Every GPU index of every endpoint reports ``per_gpu_w`` watts, stamped
    at 0.0 s. Results are keyed by ``endpoint_key``. If two endpoints share
    a key, the one visited last wins.
    """
    return {
        endpoint.endpoint_key: PowerReadings(
            timestamp_s=0.0,
            gpu_power_w=dict.fromkeys(endpoint.gpu_indices, per_gpu_w),
        )
        for deployment in deployments
        for endpoint in deployment.gpu_endpoints
    }
def _make_itl_stub(model_label: str, itl_window_s: float = 1.0):
    """Return a bare _LoadGenerator holding only what get_observed_itl needs.

    ``__init__`` is skipped via ``__new__``. The stub gets the ITL window,
    a lock, and an empty sample deque (capped at 100_000 entries) for
    ``model_label``.
    """
    from openg2g.datacenter.online import _LoadGenerator

    stub = _LoadGenerator.__new__(_LoadGenerator)
    stub._itl_window_s = itl_window_s
    stub._lock = threading.Lock()
    empty_samples = collections.deque(maxlen=100_000)
    stub._itl_samples = {model_label: empty_samples}
    return stub
@contextlib.contextmanager
def _online_dc(
    deployments: list[VLLMDeployment],
    per_gpu_w: float = 300.0,
    stagger_buffer_s: float = 10.0,
    prometheus_poll_interval_s: float = 0,
):
    """Build an OnlineDatacenter with a mocked power client and load generator.

    While the context is open, three things are patched in
    ``openg2g.datacenter.online``:

    * ``_LoadGenerator`` is replaced by a mock.
    * ``PowerStreamingClient`` is replaced by a ``MagicMock`` whose
      ``get_power`` always returns ``per_gpu_w`` watts for every GPU of
      every endpoint in ``deployments``.
    * ``STAGGER_BUFFER_S`` is set to ``stagger_buffer_s``.

    The datacenter is configured with a 1/10 s timestep, seed 42, and
    power augmentation with no noise and a fixed amplitude scale of 1.0.

    Args:
        deployments: Deployments the datacenter serves.
        per_gpu_w: Power reported for each GPU by the fake client.
        stagger_buffer_s: Value patched into ``STAGGER_BUFFER_S``.
        prometheus_poll_interval_s: Forwarded to ``LiveServerConfig``. A
            value of 0 leaves ``dc._prometheus`` as ``None``, as the
            no-Prometheus warmup test expects.

    Yields:
        A ``(dc, fake_client)`` tuple. It is yielded while the
        ``STAGGER_BUFFER_S`` patch is still active, so ``__init__`` and any
        later ``_warmup`` call inside the ``with`` block see the same value.
    """
    fake_client = MagicMock(spec=PowerStreamingClient)
    fake_client.get_power.return_value = _fake_power_readings(deployments, per_gpu_w)
    with (
        patch("openg2g.datacenter.online._LoadGenerator"),
        patch("openg2g.datacenter.online.PowerStreamingClient", return_value=fake_client),
        patch("openg2g.datacenter.online.STAGGER_BUFFER_S", stagger_buffer_s),
    ):
        dc = OnlineDatacenter(
            DatacenterConfig(gpus_per_server=8, base_kw_per_phase=0.0),
            deployments,
            dt_s=Fraction(1, 10),
            seed=42,
            power_augmentation=PowerAugmentationConfig(
                noise_fraction=0.0,
                amplitude_scale_range=(1.0, 1.0),
            ),
            live_server=LiveServerConfig(
                prometheus_poll_interval_s=prometheus_poll_interval_s,
            ),
        )
        yield dc, fake_client
class TestRollingPowerBuffer:
    """Unit tests for _RollingPowerBuffer's per-server staggered lookup.

    Each server reads the buffer at ``now - stagger_offset``. Per the
    assertions below, it gets the latest sample at or before that time,
    clamped to the first sample when the time precedes all samples, and
    0.0 when the model has no samples.
    """

    def test_empty_buffer_returns_zeros(self) -> None:
        buf = _RollingPowerBuffer(["m1"])
        offsets = np.array([0.0, 1.0, 2.0])
        result = buf.sample_servers("m1", now=10.0, stagger_offsets=offsets)
        assert result.shape == (3,)
        np.testing.assert_array_equal(result, 0.0)

    def test_single_sample(self) -> None:
        # With only one sample, every lookup time resolves to it.
        buf = _RollingPowerBuffer(["m1"])
        buf.append("m1", 5.0, 300.0)
        offsets = np.array([0.0, 1.0, 10.0])
        result = buf.sample_servers("m1", now=5.0, stagger_offsets=offsets)
        np.testing.assert_array_equal(result, 300.0)

    def test_stagger_lookup(self) -> None:
        buf = _RollingPowerBuffer(["m1"])
        buf.append("m1", 1.0, 100.0)
        buf.append("m1", 3.0, 300.0)
        buf.append("m1", 5.0, 500.0)
        # Lookup times are now - offset = 5.0, 2.5 and 0.5.
        offsets = np.array([0.0, 2.5, 4.5])
        result = buf.sample_servers("m1", now=5.0, stagger_offsets=offsets)
        assert result[0] == 500.0  # exact hit on the t=5.0 sample
        assert result[1] == 100.0  # t=2.5 -> previous sample (t=1.0), not nearest (t=3.0)
        assert result[2] == 100.0  # t=0.5 precedes every sample -> clamped to first

    def test_before_first_returns_first(self) -> None:
        buf = _RollingPowerBuffer(["m1"])
        buf.append("m1", 5.0, 42.0)
        offsets = np.array([10.0])
        result = buf.sample_servers("m1", now=5.0, stagger_offsets=offsets)
        assert result[0] == 42.0

    def test_after_last_returns_last(self) -> None:
        buf = _RollingPowerBuffer(["m1"])
        buf.append("m1", 1.0, 100.0)
        buf.append("m1", 2.0, 200.0)
        offsets = np.array([0.0])
        result = buf.sample_servers("m1", now=10.0, stagger_offsets=offsets)
        assert result[0] == 200.0

    def test_clear(self) -> None:
        buf = _RollingPowerBuffer(["m1", "m2"])
        buf.append("m1", 1.0, 100.0)
        buf.append("m2", 1.0, 200.0)
        buf.clear()
        # clear() takes no model argument, so every model's history must be gone.
        for model in ("m1", "m2"):
            result = buf.sample_servers(model, now=1.0, stagger_offsets=np.array([0.0]))
            assert result[0] == 0.0

    def test_shape_matches_offsets(self) -> None:
        buf = _RollingPowerBuffer(["m1"])
        buf.append("m1", 1.0, 100.0)
        for n in [1, 5, 20]:
            offsets = np.zeros(n)
            result = buf.sample_servers("m1", now=1.0, stagger_offsets=offsets)
            assert result.shape == (n,)
class TestOnlineAugmentationPipeline:
    """Exercise the shared InferencePowerAugmenter on a layout built as online mode does."""

    def _build_layout_and_augmenter(
        self,
        num_replicas: int = 100,
        gpus_per_replica: int = 1,
        gpus_per_server: int = 8,
        noise_fraction: float = 0.0,
        amplitude_scale_range: tuple[float, float] = (1.0, 1.0),
        seed: int = 42,
    ) -> tuple[ServerLayout, InferencePowerAugmenter]:
        """Build a single-model ServerLayout plus an augmenter over it.

        Servers are divided among the three phases with split_integer_evenly,
        then shuffled. The last server carries whatever GPUs remain after
        filling the others. Random draws come from one seeded generator in a
        fixed order: phase shuffle, ramp policy construction (which is given
        the same generator), stagger offsets in [0, 10), and amplitude
        scales. This keeps every layout reproducible per ``seed``.
        """
        import math

        from openg2g.datacenter.config import InferenceRampSchedule
        from openg2g.datacenter.layout import RampActivationPolicy
        from openg2g.utils import split_integer_evenly

        gpu_count = num_replicas * gpus_per_replica
        server_count = math.ceil(gpu_count / gpus_per_server)
        generator = np.random.default_rng(seed)

        on_a, on_b, on_c = split_integer_evenly(server_count, 3)
        phases = np.repeat(np.array([0, 1, 2], dtype=int), [on_a, on_b, on_c])
        generator.shuffle(phases)

        ramp_policy = RampActivationPolicy(InferenceRampSchedule(), server_count, generator)
        offsets = generator.uniform(0.0, 10.0, size=server_count)
        scale_low = amplitude_scale_range[0]
        scale_high = amplitude_scale_range[1]
        scales = generator.uniform(scale_low, scale_high, size=server_count)

        gpu_slots = np.full(server_count, gpus_per_server, dtype=int)
        leftover = gpu_count - (server_count - 1) * gpus_per_server
        if leftover > 0:
            gpu_slots[-1] = int(leftover)
        else:
            gpu_slots[-1] = gpus_per_server

        layout = ServerLayout(
            num_servers=server_count,
            total_gpus=gpu_count,
            gpus_per_replica=gpus_per_replica,
            gpus_per_server_list=gpu_slots,
            phase_list=phases,
            stagger_offsets=offsets,
            amplitude_scales=scales,
            noise_fraction=noise_fraction,
        )
        augmenter = InferencePowerAugmenter(
            layouts={"test-model": layout},
            policies={"test-model": ramp_policy},
            seed=seed + 12345,
        )
        return layout, augmenter

    @staticmethod
    def _phase_total(result) -> float:
        """Sum the three phase powers of an augmentation result."""
        watts = result.power_w
        return watts.a + watts.b + watts.c

    def test_stagger_offsets_are_float(self) -> None:
        built_layout = self._build_layout_and_augmenter()[0]
        assert built_layout.stagger_offsets.dtype == np.float64

    def test_uniform_power_scaling(self) -> None:
        layout, augmenter = self._build_layout_and_augmenter(
            num_replicas=100,
            gpus_per_replica=1,
            gpus_per_server=8,
            noise_fraction=0.0,
            amplitude_scale_range=(1.0, 1.0),
        )
        readings = {"test-model": np.full(layout.num_servers, 300.0)}
        total = self._phase_total(augmenter.augment(readings, t=0.0))
        # No noise and unit amplitude: 100 GPUs x 300 W.
        assert total == pytest.approx(300.0 * 100, rel=1e-3)

    def test_noise_adds_variance(self) -> None:
        layout, augmenter = self._build_layout_and_augmenter(noise_fraction=0.1)
        per_gpu = np.full(layout.num_servers, 300.0)
        totals = [
            self._phase_total(augmenter.augment({"test-model": per_gpu}, t=float(step)))
            for step in range(50)
        ]
        assert np.std(totals) > 0

    def test_phase_shares_from_layout(self) -> None:
        built_layout = self._build_layout_and_augmenter()[0]
        per_phase = np.bincount(built_layout.phase_list, minlength=3)
        assert per_phase.sum() == built_layout.num_servers
        # Every phase must host at least one server.
        assert (per_phase > 0).all()

    def test_active_replicas_reported(self) -> None:
        layout, augmenter = self._build_layout_and_augmenter(
            num_replicas=100, gpus_per_replica=1, gpus_per_server=8
        )
        readings = {"test-model": np.full(layout.num_servers, 100.0)}
        result = augmenter.augment(readings, t=0.0)
        assert result.active_replicas_by_model["test-model"] == 100

    def test_power_nonnegative(self) -> None:
        layout, augmenter = self._build_layout_and_augmenter(noise_fraction=0.5)
        per_gpu = np.full(layout.num_servers, 1.0)
        for step in range(100):
            watts = augmenter.augment({"test-model": per_gpu}, t=float(step)).power_w
            assert watts.a >= 0.0
            assert watts.b >= 0.0
            assert watts.c >= 0.0
class TestLoadGeneratorITL:
    """Windowed mean of per-token inter-token-latency samples."""

    MODEL = "test-model"

    def test_get_observed_itl_empty(self) -> None:
        stub = _make_itl_stub(self.MODEL)
        observed = stub.get_observed_itl(self.MODEL)
        # No samples at all -> NaN rather than a fabricated value.
        assert np.isnan(observed)

    def test_get_observed_itl_windowed(self) -> None:
        stub = _make_itl_stub(self.MODEL)
        samples = stub._itl_samples[self.MODEL]
        t0 = time.monotonic()
        # (age in seconds, ITL in seconds); the 5 s old sample falls outside the window.
        for age_s, itl_s in ((5.0, 0.100), (0.5, 0.050), (0.1, 0.030)):
            samples.append((t0 - age_s, itl_s))
        observed = stub.get_observed_itl(self.MODEL, window_s=1.0)
        # Mean of the two in-window samples: (0.050 + 0.030) / 2.
        assert observed == pytest.approx(0.040, rel=1e-6)

    def test_get_observed_itl_all_expired(self) -> None:
        stub = _make_itl_stub(self.MODEL, itl_window_s=0.01)
        stale_ts = time.monotonic() - 10.0
        stub._itl_samples[self.MODEL].append((stale_ts, 0.050))
        assert np.isnan(stub.get_observed_itl(self.MODEL, window_s=0.01))

    def test_per_token_itl_samples_stored(self) -> None:
        """Verify per-token ITL samples are individual entries, not averages."""
        stub = _make_itl_stub(self.MODEL, itl_window_s=10.0)
        t0 = time.monotonic()
        itls = [0.010 + 0.002 * i for i in range(5)]
        for i, itl in enumerate(itls):
            stub._itl_samples[self.MODEL].append((t0 - 0.1 * i, itl))
        observed = stub.get_observed_itl(self.MODEL, window_s=10.0)
        assert observed == pytest.approx(sum(itls) / 5, rel=1e-6)
class TestParsePrometheusText:
    """Parsing of vLLM Prometheus text exposition into a flat metric dict."""

    def test_basic_gauges(self) -> None:
        payload = """# HELP vllm:num_requests_running Number of requests running
# TYPE vllm:num_requests_running gauge
vllm:num_requests_running{model_name="m"} 32
# HELP vllm:kv_cache_usage_perc KV cache usage
# TYPE vllm:kv_cache_usage_perc gauge
vllm:kv_cache_usage_perc{model_name="m"} 0.45
"""
        parsed = _parse_prometheus_text(payload)
        # Keys drop the "vllm:" prefix and the label set.
        assert parsed["num_requests_running"] == 32.0
        assert parsed["kv_cache_usage_perc"] == pytest.approx(0.45)

    def test_empty_text(self) -> None:
        parsed = _parse_prometheus_text("")
        assert parsed == {}

    def test_comments_only(self) -> None:
        # HELP/TYPE lines alone carry no samples.
        payload = "# HELP something\n# TYPE something gauge\n"
        parsed = _parse_prometheus_text(payload)
        assert parsed == {}

    def test_summing_multiple_labels(self) -> None:
        payload = """vllm:num_requests_running{model_name="a"} 10
vllm:num_requests_running{model_name="b"} 20
"""
        parsed = _parse_prometheus_text(payload)
        # The same metric under different labels is summed: 10 + 20.
        assert parsed["num_requests_running"] == 30.0

    def test_all_metrics(self) -> None:
        payload = """vllm:num_requests_running{} 5
vllm:num_requests_waiting{} 2
vllm:num_preemptions_total{} 1
vllm:kv_cache_usage_perc{} 0.78
"""
        parsed = _parse_prometheus_text(payload)
        exact_counts = {
            "num_requests_running": 5.0,
            "num_requests_waiting": 2.0,
            "num_preemptions_total": 1.0,
        }
        for metric, expected in exact_counts.items():
            assert parsed[metric] == expected
        assert parsed["kv_cache_usage_perc"] == pytest.approx(0.78)
class TestOnlineDatacenterStep:
    """End-to-end check of OnlineDatacenter.step() against a fake power client.

    Covers power reading -> rolling buffer -> shared InferencePowerAugmenter
    -> three-phase power output, with no real vLLM servers or zeusd.
    """

    @staticmethod
    def _step_once(dc):
        """Mark ``dc`` as started and advance it by one 1/10 s tick."""
        dc._started = True
        tick_clock = SimulationClock(tick_s=Fraction(1, 10))
        return dc.step(tick_clock, _EVENTS)

    @staticmethod
    def _phase_sum(state) -> float:
        """Total power across phases a, b and c."""
        watts = state.power_w
        return watts.a + watts.b + watts.c

    def _total_power(self, num_replicas: int) -> float:
        """Three-phase total after one step for a single-GPU-endpoint deployment."""
        deployment = _make_deployment(num_replicas=num_replicas, gpu_indices=(0,))
        with _online_dc([deployment], per_gpu_w=300.0) as (dc, _client):
            return self._phase_sum(self._step_once(dc))

    def test_step_produces_nonzero_power(self) -> None:
        deployment = _make_deployment(num_replicas=100, gpu_indices=(0, 1, 2, 3))
        with _online_dc([deployment], per_gpu_w=300.0) as (dc, _client):
            state = self._step_once(dc)
            label = deployment.model_label
            assert self._phase_sum(state) > 0
            assert label in state.augmented_power_w_by_model
            assert state.augmented_power_w_by_model[label] > 0
            assert label in state.measured_power_w_by_model
            # Four GPUs on the single endpoint, each reporting 300 W.
            assert state.measured_power_w_by_model[label] == pytest.approx(300.0 * 4)

    def test_step_power_scales_with_replicas(self) -> None:
        low = self._total_power(50)
        high = self._total_power(200)
        assert high > low
        # Four times the replicas should give roughly four times the power.
        assert high / low == pytest.approx(200.0 / 50.0, rel=0.1)

    def test_phase_shares_from_layout(self) -> None:
        deployment = _make_deployment(num_replicas=100, gpu_indices=(0,))
        with _online_dc([deployment]) as (dc, _client):
            shares = dc.phase_share_by_model
            label = deployment.model_label
            assert label in shares
            assert shares[label].shape == (3,)
            assert shares[label].sum() == pytest.approx(1.0)
class TestWarmup:
    """Warmup phase of OnlineDatacenter.start(), driven through ``_warmup``."""

    @staticmethod
    def _deployment() -> VLLMDeployment:
        return _make_deployment(num_replicas=100, gpu_indices=(0, 1))

    @staticmethod
    def _install_fake_prometheus(dc, label: str, running: float) -> None:
        """Replace ``dc._prometheus`` with a mock reporting ``running`` requests for ``label``."""
        poller = MagicMock()
        poller.get_latest.return_value = {label: {"num_requests_running": running}}
        dc._prometheus = poller

    def test_warmup_completes_when_saturated(self) -> None:
        deployment = self._deployment()
        with _online_dc([deployment], stagger_buffer_s=0.3) as (dc, _client):
            self._install_fake_prometheus(dc, deployment.model_label, 128.0)
            # Passing means _warmup returns without raising a timeout.
            dc._warmup(timeout_s=5.0, poll_interval_s=0.05)

    def test_warmup_waits_for_buffer_after_saturation(self) -> None:
        deployment = self._deployment()
        with _online_dc([deployment], stagger_buffer_s=0.4) as (dc, _client):
            self._install_fake_prometheus(dc, deployment.model_label, 128.0)
            started = time.monotonic()
            dc._warmup(timeout_s=5.0, poll_interval_s=0.05)
            waited = time.monotonic() - started
            # Even when saturated immediately, the stagger buffer must still elapse.
            assert waited >= 0.4

    def test_warmup_timeout_raises_with_trajectory(self) -> None:
        deployment = self._deployment()
        with _online_dc([deployment], stagger_buffer_s=0.1) as (dc, _client):
            # 10 running requests never reaches the reported target of 128.
            self._install_fake_prometheus(dc, deployment.model_label, 10.0)
            with pytest.raises(RuntimeError, match="Warmup timed out") as excinfo:
                dc._warmup(timeout_s=0.5, poll_interval_s=0.05)
            message = str(excinfo.value)
            for fragment in (deployment.model_label, "target: 128", "reached: 10", "t=0s:"):
                assert fragment in message

    def test_warmup_no_prometheus_waits_for_buffer_only(self) -> None:
        deployment = self._deployment()
        with _online_dc(
            [deployment], stagger_buffer_s=0.3, prometheus_poll_interval_s=0
        ) as (dc, _client):
            assert dc._prometheus is None
            started = time.monotonic()
            dc._warmup(timeout_s=5.0, poll_interval_s=0.05)
            assert time.monotonic() - started >= 0.3
class TestHealthChecks:
    """Health-check helpers must raise RuntimeError when their target is unreachable.

    Each test points at a loopback TCP port that was just released by the
    OS, so the connection is refused immediately. This avoids depending on
    DNS. A made-up hostname can take longer than ``timeout_s`` to fail,
    because name resolution is generally not covered by socket timeouts.
    It can also be "resolved" by resolvers that rewrite NXDOMAIN answers,
    making the tests slow or flaky.
    """

    @staticmethod
    def _closed_local_port() -> int:
        """Return a loopback TCP port with no listener (best effort).

        The OS assigns a free port, which is released again when the socket
        closes. Another process could grab it in between, but that is very
        unlikely during a test run.
        """
        import socket

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.bind(("127.0.0.1", 0))
            return sock.getsockname()[1]

    def test_check_vllm_health_failure(self) -> None:
        from openg2g.datacenter.online import _check_vllm_health

        port = self._closed_local_port()
        with pytest.raises(RuntimeError, match="vLLM health check failed"):
            _check_vllm_health(f"http://127.0.0.1:{port}", timeout_s=0.5)

    def test_check_vllm_model_failure(self) -> None:
        from openg2g.datacenter.online import _check_vllm_model

        port = self._closed_local_port()
        with pytest.raises(RuntimeError, match="vLLM model check failed"):
            _check_vllm_model(f"http://127.0.0.1:{port}", "some-model", timeout_s=0.5)

    def test_check_zeusd_health_failure(self) -> None:
        from openg2g.datacenter.online import _check_zeusd_health

        port = self._closed_local_port()
        with pytest.raises(RuntimeError, match="zeusd health check failed"):
            _check_zeusd_health("127.0.0.1", port=port, timeout_s=0.5)