File size: 6,258 Bytes
3a08af1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 | """Tests for src/queue_eta β slot-simulation ETA calculation."""
import json
import math
import os
import tempfile
import pytest
from src.queue_eta import (
_LARGE_MODEL_HOURS,
_SIZE_THRESHOLD_B,
_SMALL_MODEL_HOURS,
compute_single_eta,
estimate_task_hours,
format_eta,
_get_params,
)
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Helpers
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _write(directory, owner, filename, data):
d = os.path.join(directory, owner)
os.makedirs(d, exist_ok=True)
fpath = os.path.join(d, filename)
with open(fpath, "w") as f:
json.dump(data, f)
return fpath
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# estimate_task_hours
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
class TestEstimateTaskHours:
def test_small_model(self):
assert estimate_task_hours(7.0) == _SMALL_MODEL_HOURS
def test_at_threshold(self):
assert estimate_task_hours(_SIZE_THRESHOLD_B) == _SMALL_MODEL_HOURS
def test_large_model(self):
assert estimate_task_hours(70.0) == _LARGE_MODEL_HOURS
def test_none_defaults_small(self):
assert estimate_task_hours(None) == _SMALL_MODEL_HOURS
def test_zero_defaults_small(self):
assert estimate_task_hours(0) == _SMALL_MODEL_HOURS
def test_negative_defaults_small(self):
assert estimate_task_hours(-5) == _SMALL_MODEL_HOURS
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# format_eta
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
class TestFormatEta:
def test_zero(self):
assert format_eta(0) == "< 1h"
def test_negative(self):
assert format_eta(-1) == "< 1h"
def test_fraction(self):
assert format_eta(0.5) == "~1h"
def test_exact_hours(self):
assert format_eta(6.0) == "~6h"
def test_fractional_hours(self):
assert format_eta(6.3) == "~7h"
def test_exact_day(self):
assert format_eta(24.0) == "~1d"
def test_day_and_hours(self):
assert format_eta(27.0) == "~1d 3h"
def test_multi_days(self):
assert format_eta(50.0) == "~2d 2h"
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# _get_params
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
class TestGetParams:
def test_model_params(self):
assert _get_params({"model_params": 7.0}) == 7.0
def test_params_fallback(self):
assert _get_params({"params": 14}) == 14.0
def test_model_params_preferred(self):
assert _get_params({"model_params": 7, "params": 14}) == 7.0
def test_none_when_missing(self):
assert _get_params({}) is None
def test_string_value(self):
assert _get_params({"model_params": "7.5"}) == 7.5
def test_bad_string(self):
assert _get_params({"model_params": "unknown"}) is None
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# compute_single_eta β used by submit.py
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
class TestComputeSingleEta:
def setup_method(self):
self._tmpdir = tempfile.mkdtemp(prefix="test_single_eta_")
def test_empty_queue(self):
"""No running, no pending β new model is position 1."""
eta = compute_single_eta(self._tmpdir, model_params=7.0, concurrency=2)
# running_remaining = 0, queue_pos = 1
# 0 + ceil(1/2)*3 = 3
assert eta == 3.0
def test_with_2r_1p(self):
"""2 running + 1 pending β new model at pos 2."""
_write(self._tmpdir, "a", "r1.json", {
"model": "a/r1", "status": "Running", "script": "auto_quant",
"model_params": 7.0,
})
_write(self._tmpdir, "b", "r2.json", {
"model": "b/r2", "status": "Running", "script": "auto_eval",
"model_params": 7.0,
})
_write(self._tmpdir, "c", "p1.json", {
"model": "c/p1", "status": "Pending", "script": "auto_quant",
"model_params": 7.0, "submitted_time": "2026-04-01T00:00:00Z",
})
eta = compute_single_eta(self._tmpdir, model_params=7.0, concurrency=2)
# running_remaining = avg(3,3) = 3, queue_pos = 2
# 3 + ceil(2/2)*3 = 3+3 = 6
assert eta == 6.0
def test_large_model(self):
eta = compute_single_eta(self._tmpdir, model_params=70.0, concurrency=2)
# running_remaining = 0, queue_pos = 1
# 0 + ceil(1/2)*5 = 5
assert eta == 5.0
def test_nonexistent_dir(self):
eta = compute_single_eta("/tmp/nonexistent_eta_dir", model_params=7.0, concurrency=2)
assert eta == 3.0
|