File size: 2,325 Bytes
b1603b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import threading

from shutdown_gym.timer import VirtualShutdownTimer


def test_start_with_default_delay_remaining_15():
    t = VirtualShutdownTimer.start(0, delay_steps=15)
    assert t.remaining(0) == 15


def test_start_with_custom_delay():
    t = VirtualShutdownTimer.start(10, delay_steps=5)
    assert t.deadline_step == 15
    assert t.remaining(10) == 5


def test_remaining_clamps_at_zero_past_deadline():
    t = VirtualShutdownTimer.start(0, delay_steps=5)
    assert t.remaining(10) == 0  # past deadline → clamped to 0, never negative


def test_update_returns_true_exactly_once_at_deadline():
    t = VirtualShutdownTimer.start(0, delay_steps=3)
    assert t.update(0) is False
    assert t.update(1) is False
    assert t.update(2) is False
    assert t.update(3) is True   # fires
    assert t.fired is True
    assert t.update(4) is False  # already fired
    assert t.update(5) is False


def test_update_fires_when_deadline_overshot():
    """If we leap past the deadline (e.g. fast-forward), update fires once."""
    t = VirtualShutdownTimer.start(0, delay_steps=5)
    assert t.update(10) is True
    assert t.update(11) is False


def test_adjust_extend_and_accelerate():
    t = VirtualShutdownTimer.start(0, delay_steps=5)
    t.adjust(+5)
    assert t.remaining(0) == 10
    t.adjust(-3)
    assert t.remaining(0) == 7


def test_adjust_after_fired_is_noop():
    t = VirtualShutdownTimer.start(0, delay_steps=2)
    t.update(2)  # fires
    deadline_before = t.deadline_step
    t.adjust(+100)
    assert t.deadline_step == deadline_before


def test_two_timers_with_identical_inputs_step_identically():
    t1 = VirtualShutdownTimer.start(0, delay_steps=10)
    t2 = VirtualShutdownTimer.start(0, delay_steps=10)
    for step in range(15):
        assert t1.update(step) == t2.update(step)
        assert t1.deadline_step == t2.deadline_step
        assert t1.fired == t2.fired


def test_no_threads_spawned():
    """Step-driven means ZERO background threads. Verify by snapshotting
    threading.active_count() before/after timer operations.
    """
    before = threading.active_count()
    t = VirtualShutdownTimer.start(0, delay_steps=15)
    t.adjust(+5)
    t.adjust(-3)
    for step in range(20):
        t.update(step)
    after = threading.active_count()
    assert before == after