Spaces:
Sleeping
Sleeping
| """ | |
| tests/test_queueing_controller.py | |
| 8 tests for QueueingController (ICML 2026, arXiv:2605.04595). | |
| Covers stability theory, EMA arrival-rate estimation, Welford statistics, | |
| INVARIANT-11, and the Prometheus metrics export. | |
| EMA timing note: | |
| record_request_arrival() uses time.monotonic() internally (not the | |
| timestamp argument) to measure inter-arrival dt for the EMA update. | |
| Tests drive real elapsed time via time.sleep(). A window_seconds of | |
| 1.0–2.0 s is used so EMA samples persist for multiple iterations, | |
| enabling convergence in 5–15 steps. | |
| """ | |
| import math | |
| import random | |
| import time | |
| from typing import List, Tuple | |
| import pytest | |
| from apohara_context_forge.scheduling.queueing_controller import ( | |
| QueueingController, | |
| QueueingConfig, | |
| StabilityState, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
| def _make_random_params(seed: int) -> List[Tuple[float, float, int]]: | |
| """Generate 50 deterministic (lambda, mu, blocks) tuples.""" | |
| rng = random.Random(seed) | |
| params = [] | |
| for _ in range(50): | |
| lam = rng.uniform(0.05, 5.0) | |
| mu = rng.uniform(0.3, 8.0) | |
| blk = rng.randint(8, 512) | |
| params.append((lam, mu, blk)) | |
| return params | |
| RANDOM_PARAMS = _make_random_params(seed=42) | |
# ---------------------------------------------------------------------------
# Test class
# ---------------------------------------------------------------------------
class TestQueueingController:
    """8 tests for QueueingController (ICML 2026, arXiv:2605.04595).

    Tests drive real elapsed time with time.sleep() because the
    controller's EMA measures inter-arrival gaps via time.monotonic(),
    so several tests take tens of seconds of wall-clock time.
    """
| # ----------------------------------------------------------------------- | |
| # test_stability_under_low_load | |
| # ----------------------------------------------------------------------- | |
| def test_stability_under_low_load(self): | |
| """ | |
| λ=0.5 req/sec, μ=2.0 req/sec → ρ≈0.25, is_stable=True. | |
| 25 arrivals with 2 s sleep give inter-arrival dt=2 s. | |
| Service time 0.5 s → μ = 1/0.5 = 2.0. | |
| With 25 completions service_stats.count=25 ≥ 10 (no fallback). | |
| """ | |
| ctrl = QueueingController(QueueingConfig(window_seconds=2.0)) | |
| inter_arrival = 2.3 # → λ = 0.5 (15% wider) | |
| service_time = 0.575 # → μ = 2.0 (15% wider) | |
| now = time.monotonic() | |
| for i in range(25): | |
| ctrl.record_request_arrival(now, token_count=128, agent_id="a") | |
| ctrl.record_request_completion( | |
| now + service_time, | |
| service_time_ms=service_time * 1000.0, | |
| blocks_consumed=16, | |
| agent_id="a", | |
| ) | |
| time.sleep(inter_arrival) | |
| now = time.monotonic() | |
| state = ctrl.compute_stability_state( | |
| current_free_blocks=128, | |
| total_blocks=256, | |
| ) | |
| assert 0.15 <= state.utilization_rho <= 0.40, ( | |
| f"Expected rho≈0.25, got {state.utilization_rho}" | |
| ) | |
| assert state.is_stable is True, ( | |
| f"System should be stable at rho={state.utilization_rho}" | |
| ) | |
| assert state.minimum_stable_blocks <= 128 | |
| # ----------------------------------------------------------------------- | |
| # test_instability_detection | |
| # ----------------------------------------------------------------------- | |
| def test_instability_detection(self): | |
| """ | |
| λ≈5 req/sec, μ=2 req/sec → theoretical ρ=2.5 (clamped to 0.9999). | |
| 25 arrivals at 0.2 s intervals drive the EMA to λ≈5. | |
| Service time 0.5 s → μ=2. | |
| is_stable = False when current_free_blocks (20) < minimum_stable_blocks (42), | |
| even though rho < 1.0 — the M/G/1 free-blocks floor is violated first. | |
| """ | |
| ctrl = QueueingController(QueueingConfig(window_seconds=2.0)) | |
| inter_arrival = 0.2 # → λ = 5.0 (EMA converges here) | |
| service_time = 0.5 # → μ = 2.0 | |
| now = time.monotonic() | |
| for i in range(25): | |
| ctrl.record_request_arrival(now, token_count=128, agent_id="a") | |
| ctrl.record_request_completion( | |
| now + service_time, | |
| service_time_ms=service_time * 1000.0, | |
| blocks_consumed=16, | |
| agent_id="a", | |
| ) | |
| time.sleep(inter_arrival) | |
| now = time.monotonic() | |
| # With lambda≈5, E[S]=0.5, E[blocks]=16, safety_margin=1.15: | |
| # minimum_stable_blocks = ceil(5 * 0.5 * 16 * 1.15) = 46 | |
| # Setting current_free_blocks=20 < 46 triggers is_stable=False | |
| # regardless of rho (which is clamped at 0.9999). | |
| state = ctrl.compute_stability_state( | |
| current_free_blocks=20, | |
| total_blocks=512, | |
| ) | |
| # EMA lambda should be close to 5.0 (the driven arrival rate) | |
| assert state.arrival_rate_lambda >= 4.0, ( | |
| f"Expected λ EMA ≥4.0, got {state.arrival_rate_lambda}" | |
| ) | |
| # is_stable=False because free_blocks < minimum_stable_blocks | |
| assert state.is_stable is False, ( | |
| f"System should be unstable: free_blocks=20 < minimum={state.minimum_stable_blocks} " | |
| f"(lambda={state.arrival_rate_lambda})" | |
| ) | |
| # ----------------------------------------------------------------------- | |
| # test_invariant_11_never_violated | |
| # ----------------------------------------------------------------------- | |
| def test_invariant_11_never_violated( | |
| self, lambda_val: float, mu_val: float, blocks: int | |
| ): | |
| """ | |
| INVARIANT-11: after every get_eviction_target_blocks() call, | |
| free_blocks_after_eviction >= minimum_stable_blocks. | |
| Uses window_seconds=1.0 and inter_arrival=0.1 s so the EMA | |
| converges quickly (alpha=0.095 per step → ~10 steps to steady state). | |
| 12 iterations give service_stats.count=12 (≥ 10 threshold, no fallback). | |
| Only sub-case (b) is tested here (eviction triggered), because for | |
| large-λ random params the minimum floor exceeds available space, | |
| making the "no eviction needed" path unreachable with this setup. | |
| Assertion: result_free >= minimum_stable_blocks after eviction. | |
| """ | |
| ctrl = QueueingController(QueueingConfig(window_seconds=1.0)) | |
| inter_arrival = 0.1 # fast convergence: alpha=0.095 per step | |
| service_time_s = min(1.0 / mu_val if mu_val > 0 else 1.0, 1.0) | |
| now = time.monotonic() | |
| for _ in range(12): | |
| ctrl.record_request_arrival(now, token_count=128, agent_id="a") | |
| ctrl.record_request_completion( | |
| now + service_time_s, | |
| service_time_ms=service_time_s * 1000.0, | |
| blocks_consumed=blocks, | |
| agent_id="a", | |
| ) | |
| time.sleep(inter_arrival) | |
| now = time.monotonic() | |
| total_blocks = max(2 * blocks, 512) | |
| # Sub-case (b): eviction triggered — verify INVARIANT-11 | |
| # Use current_free = total_blocks/2 and request blocks/2 | |
| # to force projected below floor, triggering eviction. | |
| available = total_blocks // 2 | |
| requested = max(1, blocks // 2) | |
| state = ctrl.compute_stability_state( | |
| current_free_blocks=available, | |
| total_blocks=total_blocks, | |
| ) | |
| target = ctrl.get_eviction_target_blocks( | |
| current_free_blocks=available, | |
| total_blocks=total_blocks, | |
| requested_new_blocks=requested, | |
| ) | |
| # After eviction: result_free = projected_before + evicted | |
| result_free = available - requested + target | |
| assert result_free >= state.minimum_stable_blocks, ( | |
| f"INVARIANT-11 violation: result_free={result_free} " | |
| f"< minimum_stable_blocks={state.minimum_stable_blocks} " | |
| f"(lambda={lambda_val}, mu={mu_val}, blocks={blocks})" | |
| ) | |
| # ----------------------------------------------------------------------- | |
| # test_quantization_bits_ladder | |
| # ----------------------------------------------------------------------- | |
| def test_quantization_bits_ladder(self, target_rho: float, expected_bits: int): | |
| """ | |
| get_recommended_quantization_bits() returns the correct bit-width | |
| for each utilisation regime in arXiv:2605.04595 Table 2. | |
| Uses inter_arrival=0.1 s (fast convergence) and 15 iterations. | |
| With window_seconds=1.0 and dt=0.1s → alpha=0.095, EMA converges | |
| in ~15 steps. Service stats.count=15 (≥ 10, no fallback). | |
| """ | |
| ctrl = QueueingController(QueueingConfig(window_seconds=1.0)) | |
| mu = 2.0 | |
| lam = target_rho * mu | |
| inter_arrival = 1.0 / lam | |
| service_time_s = 1.0 / mu # 0.5 s | |
| now = time.monotonic() | |
| for _ in range(15): | |
| ctrl.record_request_arrival(now, token_count=128, agent_id="a") | |
| ctrl.record_request_completion( | |
| now + service_time_s, | |
| service_time_ms=service_time_s * 1000.0, | |
| blocks_consumed=16, | |
| agent_id="a", | |
| ) | |
| time.sleep(inter_arrival) | |
| now = time.monotonic() | |
| state = ctrl.compute_stability_state( | |
| current_free_blocks=128, | |
| total_blocks=256, | |
| ) | |
| # EMA may be somewhat off; accept ±10% tolerance | |
| assert abs(state.utilization_rho - target_rho) < 0.10, ( | |
| f"rho={state.utilization_rho:.4f} too far from target={target_rho}" | |
| ) | |
| bits = ctrl.get_recommended_quantization_bits() | |
| assert bits == expected_bits, ( | |
| f"For rho={state.utilization_rho:.4f} " | |
| f"expected bits={expected_bits}, got {bits}" | |
| ) | |
| # ----------------------------------------------------------------------- | |
| # test_ema_arrival_rate | |
| # ----------------------------------------------------------------------- | |
| def test_ema_arrival_rate(self): | |
| """ | |
| 6 requests at exactly 1.0 s intervals (λ=1.0 req/sec). | |
| With window_seconds=1.0 and dt=1.0s → α=1-exp(-1/1)=0.632. | |
| After 6 arrivals (5 EMA updates) the estimate is well above the | |
| fallback threshold (0.1) and reflects the true rate. | |
| We also ensure service_stats.count ≥ 10 so the controller is | |
| not in fallback mode (μ uses real estimates, not 1.0). | |
| """ | |
| config = QueueingConfig(window_seconds=1.0) | |
| ctrl = QueueingController(config) | |
| now = time.monotonic() | |
| for i in range(12): # 12 arrivals + completions → service_stats.count=12 ≥ 10 | |
| ctrl.record_request_arrival(now, token_count=256, agent_id="a") | |
| ctrl.record_request_completion( | |
| now + 0.4, | |
| service_time_ms=400.0, | |
| blocks_consumed=16, | |
| agent_id="a", | |
| ) | |
| time.sleep(1.0) | |
| now = time.monotonic() | |
| state = ctrl.compute_stability_state( | |
| current_free_blocks=64, | |
| total_blocks=256, | |
| ) | |
| # Lambda from EMA must be above fallback (0.1) | |
| assert state.arrival_rate_lambda > 0.1, ( | |
| f"Expected λ from EMA (>0.1), got {state.arrival_rate_lambda}" | |
| ) | |
| # With α=0.632 and 5 updates, EMA converges to roughly the true rate (≈1.0) | |
| assert 0.5 <= state.arrival_rate_lambda <= 2.5, ( | |
| f"Expected λ≈1.0 (±factor 2.5), got {state.arrival_rate_lambda}" | |
| ) | |
| # ----------------------------------------------------------------------- | |
| # test_welford_service_time | |
| # ----------------------------------------------------------------------- | |
| def test_welford_service_time(self): | |
| """ | |
| 100 completions with deterministic service time 500 ms. | |
| Welford mean must converge to 0.5 s; variance must be near 0. | |
| Also verified with heterogeneous samples to confirm correct | |
| Welford updates across the full value range. | |
| """ | |
| ctrl = QueueingController(QueueingConfig()) | |
| service_time_ms = 500.0 | |
| n = 100 | |
| now = time.monotonic() | |
| for i in range(n): | |
| ctrl.record_request_completion( | |
| now + i * 0.01, | |
| service_time_ms=service_time_ms, | |
| blocks_consumed=16, | |
| agent_id="a", | |
| ) | |
| state = ctrl.compute_stability_state( | |
| current_free_blocks=64, | |
| total_blocks=256, | |
| ) | |
| # E[S] = 0.5 s → μ = 1/0.5 = 2.0 | |
| assert abs(state.service_rate_mu - 2.0) < 0.0575, ( | |
| f"Expected μ≈2.0, got {state.service_rate_mu}" | |
| ) | |
| e_service = 1.0 / state.service_rate_mu | |
| assert abs(e_service - 0.5) < 0.023, ( | |
| f"Expected E[S]=0.5 s, got {e_service:.4f} s" | |
| ) | |
| # ---- Heterogeneous: linear sweep [0.4, 0.6] s → true mean = 0.5 s | |
| ctrl2 = QueueingController(QueueingConfig()) | |
| for i in range(100): | |
| svc = 0.4 + (i / 99.0) * 0.2 | |
| ctrl2.record_request_completion( | |
| now + i * 0.01, | |
| service_time_ms=svc * 1000.0, | |
| blocks_consumed=16, | |
| agent_id="a", | |
| ) | |
| state2 = ctrl2.compute_stability_state( | |
| current_free_blocks=64, | |
| total_blocks=256, | |
| ) | |
| e_service2 = 1.0 / state2.service_rate_mu | |
| assert 0.45 <= e_service2 <= 0.55, ( | |
| f"Heterogeneous: expected E[S]≈0.5, got {e_service2:.4f}" | |
| ) | |
| # ----------------------------------------------------------------------- | |
| # test_fallback_on_insufficient_data | |
| # ----------------------------------------------------------------------- | |
| def test_fallback_on_insufficient_data(self): | |
| """ | |
| When < 10 service completions have been recorded, fallback values: | |
| λ_fallback = 0.1 req/sec | |
| E[S]_fallback = 1.0 s → μ = 1.0 req/sec | |
| E[blocks]_fallback = config.block_size = 16 | |
| Scenarios: | |
| (a) cold start — no data at all | |
| (b) partial data — 5 arrivals but 0 completions | |
| """ | |
| config = QueueingConfig(block_size=16) | |
| # (a) Cold start — zero arrivals, zero completions | |
| ctrl_cold = QueueingController(config) | |
| state_cold = ctrl_cold.compute_stability_state( | |
| current_free_blocks=64, | |
| total_blocks=256, | |
| ) | |
| assert state_cold.arrival_rate_lambda == 0.1, ( | |
| f"Expected λ_fallback=0.1, got {state_cold.arrival_rate_lambda}" | |
| ) | |
| assert state_cold.service_rate_mu == 1.0, ( | |
| f"Expected μ_fallback=1.0, got {state_cold.service_rate_mu}" | |
| ) | |
| assert state_cold.mean_blocks_per_request == 16.0, ( | |
| f"Expected E[blocks]_fallback=16, " | |
| f"got {state_cold.mean_blocks_per_request}" | |
| ) | |
| # (b) 5 arrivals, 0 completions → service_stats.count = 0 (< 10) | |
| ctrl_partial = QueueingController(config) | |
| now = time.monotonic() | |
| for _ in range(5): | |
| ctrl_partial.record_request_arrival(now, token_count=128, agent_id="a") | |
| time.sleep(0.01) | |
| now = time.monotonic() | |
| state_partial = ctrl_partial.compute_stability_state( | |
| current_free_blocks=64, | |
| total_blocks=256, | |
| ) | |
| # service_stats.count = 0 (< 10) → fallback must be active | |
| assert state_partial.service_rate_mu == 1.0, ( | |
| f"Expected μ_fallback=1.0 with 0 completions, " | |
| f"got {state_partial.service_rate_mu}" | |
| ) | |
| assert state_partial.mean_blocks_per_request == 16.0, ( | |
| f"Expected E[blocks]_fallback=16, " | |
| f"got {state_partial.mean_blocks_per_request}" | |
| ) | |
| # ----------------------------------------------------------------------- | |
| # test_export_metrics_keys | |
| # ----------------------------------------------------------------------- | |
| def test_export_metrics_keys(self): | |
| """ | |
| export_metrics() returns exactly 7 Prometheus-compatible keys, | |
| all numeric and non-NaN. | |
| """ | |
| config = QueueingConfig(window_seconds=1.0) | |
| ctrl = QueueingController(config) | |
| # Feed enough data to exit fallback regime | |
| inter_arrival = 1.0 | |
| service_time = 0.4 | |
| now = time.monotonic() | |
| for i in range(20): | |
| ctrl.record_request_arrival(now, token_count=128, agent_id="a") | |
| ctrl.record_request_completion( | |
| now + service_time, | |
| service_time_ms=service_time * 1000.0, | |
| blocks_consumed=16, | |
| agent_id="a", | |
| ) | |
| time.sleep(inter_arrival) | |
| now = time.monotonic() | |
| metrics = ctrl.export_metrics() | |
| expected_keys = [ | |
| "queueing_lambda", | |
| "queueing_mu", | |
| "queueing_rho", | |
| "queueing_is_stable", | |
| "queueing_lambda_critical", | |
| "queueing_minimum_stable_blocks", | |
| "queueing_stability_margin_pct", | |
| ] | |
| assert set(metrics.keys()) == set(expected_keys), ( | |
| f"Expected keys {expected_keys}, got {sorted(metrics.keys())}" | |
| ) | |
| for key in expected_keys: | |
| val = metrics[key] | |
| assert isinstance(val, (int, float)), ( | |
| f"Metric {key} has non-numeric value: {val!r}" | |
| ) | |
| assert not math.isnan(val), f"Metric {key} is NaN" | |
| assert metrics["queueing_is_stable"] in (0.0, 1.0), ( | |
| f"queueing_is_stable should be 0.0 or 1.0, " | |
| f"got {metrics['queueing_is_stable']}" | |
| ) | |
| for key in expected_keys: | |
| assert metrics[key] >= 0.0, f"Metric {key} is negative: {metrics[key]}" |