Spaces:
Sleeping
Sleeping
File size: 4,598 Bytes
05a686e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 | import pytest
from server.simulation_service import (
CoenvEnvironment,
calculate_reward,
check_task_complete,
get_objective_for_task,
)
from models import CoenvAction
class StubPod:
def __init__(self, deployment: str, status: str, restarts: int = 0):
self.deployment = deployment
self.status = status
self.restarts = restarts
class StubHPA:
def __init__(self, name: str, min_replicas: int, max_replicas: int, cpu_target_percent: int):
self.name = name
self.min_replicas = min_replicas
self.max_replicas = max_replicas
self.cpu_target_percent = cpu_target_percent
class StubWorld:
def __init__(self, pods, hpas=None):
self._pods = pods
self._hpas = hpas or []
def get_pods(self):
return self._pods
def get_hpas(self):
return self._hpas
def test_get_objective_for_task_known_and_unknown():
known = get_objective_for_task("pod_recovery")
unknown = get_objective_for_task("unknown-task")
assert "crash-looping" in known
assert unknown == "Maintain cluster health"
def test_calculate_reward_pod_recovery():
world = StubWorld(
[
StubPod("frontend", "Running"),
StubPod("frontend", "Running"),
StubPod("frontend", "CrashLoopBackOff"),
]
)
reward = calculate_reward(world, "pod_recovery")
assert reward == pytest.approx(2 / 3)
def test_calculate_reward_autoscaling_rewards_stability_and_hpa_policy():
healthy_world = StubWorld(
[
StubPod("backend", "Running", restarts=0),
StubPod("backend", "Running", restarts=0),
],
hpas=[StubHPA("backend-hpa", min_replicas=2, max_replicas=6, cpu_target_percent=70)],
)
unstable_world = StubWorld(
[
StubPod("backend", "Running", restarts=10),
StubPod("backend", "Running", restarts=9),
],
hpas=[StubHPA("backend-hpa", min_replicas=2, max_replicas=6, cpu_target_percent=70)],
)
no_hpa_world = StubWorld(
[
StubPod("backend", "Running", restarts=0),
StubPod("backend", "Running", restarts=0),
],
hpas=[],
)
healthy_reward = calculate_reward(healthy_world, "autoscaling")
unstable_reward = calculate_reward(unstable_world, "autoscaling")
no_hpa_reward = calculate_reward(no_hpa_world, "autoscaling")
assert healthy_reward == pytest.approx(1.0)
assert unstable_reward < healthy_reward
assert no_hpa_reward < healthy_reward
def test_check_task_complete_incident_true_and_false():
healthy_world = StubWorld(
[
StubPod("auth-service", "Running"),
StubPod("api-gateway", "Running"),
StubPod("frontend", "Running"),
]
)
unhealthy_world = StubWorld(
[
StubPod("auth-service", "Running"),
StubPod("api-gateway", "CrashLoopBackOff"),
StubPod("frontend", "Running"),
]
)
assert check_task_complete(healthy_world, "incident") is True
assert check_task_complete(unhealthy_world, "incident") is False
def test_environment_reset_sets_task_and_returns_observation():
env = CoenvEnvironment()
obs = env.reset(task="autoscaling")
assert env.current_task == "autoscaling"
assert obs.objective == env.current_objective
assert obs.done is False
assert obs.reward == 0.0
assert "task" in obs.metadata
def test_environment_step_scale_and_describe_paths():
env = CoenvEnvironment()
env.reset(task="pod_recovery")
scale_obs = env.step(
CoenvAction(action_type="scale", deployment="frontend", replicas=4)
)
assert "scaled" in scale_obs.metadata
assert scale_obs.step >= 1
describe_obs = env.step(
CoenvAction(action_type="describe", resource_type="deployment", name="frontend")
)
assert "described" in describe_obs.metadata
assert "describe_detail" in describe_obs.metadata
wait_obs = env.step(CoenvAction(action_type="wait"))
assert wait_obs.metadata.get("waited") is True
def test_environment_step_exception_is_captured_in_metadata(monkeypatch):
env = CoenvEnvironment()
env.reset(task="pod_recovery")
def _boom(*args, **kwargs):
raise RuntimeError("forced failure")
monkeypatch.setattr(env.world, "scale", _boom)
action = CoenvAction(action_type="scale", deployment="frontend", replicas=2)
obs = env.step(action)
assert "error" in obs.metadata
assert "forced failure" in obs.metadata["error"]
|