Sandeep Suresh committed on
Commit
c4a75fc
·
1 Parent(s): 8e7d77e

feat: Add CI workflow for Pytest and implement unit tests for simulation and world functionalities

Browse files
.github/workflows/pytest.yml ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
name: Pytest CI

# Run the full test suite on every push and pull request.
on:
  push:
  pull_request:

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - name: Check out repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install pytest

      - name: Run tests
        run: pytest -q
tests/test_simulation_service.py ADDED
@@ -0,0 +1,109 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pytest
2
+
3
+ from server.simulation_service import (
4
+ CoenvEnvironment,
5
+ calculate_reward,
6
+ check_task_complete,
7
+ get_objective_for_task,
8
+ )
9
+ from models import CoenvAction
10
+
11
+
12
class StubPod:
    """Minimal stand-in for a pod record: just a deployment name and a status."""

    def __init__(self, deployment: str, status: str):
        self.status = status
        self.deployment = deployment
16
+
17
+
18
class StubWorld:
    """Fake world object exposing only the get_pods() accessor used by helpers."""

    def __init__(self, pods):
        self._pod_list = pods

    def get_pods(self):
        return self._pod_list
24
+
25
+
26
def test_get_objective_for_task_known_and_unknown():
    """Known tasks map to a specific objective; unknown tasks get the fallback."""
    assert "crash-looping" in get_objective_for_task("pod_recovery")
    assert get_objective_for_task("unknown-task") == "Maintain cluster health"
32
+
33
+
34
def test_calculate_reward_pod_recovery():
    """Reward for pod_recovery is the running fraction of the frontend pods."""
    statuses = ["Running", "Running", "CrashLoopBackOff"]
    world = StubWorld([StubPod("frontend", status) for status in statuses])

    assert calculate_reward(world, "pod_recovery") == pytest.approx(2 / 3)
45
+
46
+
47
def test_check_task_complete_incident_true_and_false():
    """The incident task is complete only when every pod reports Running."""
    deployments = ["auth-service", "api-gateway", "frontend"]
    all_running = StubWorld([StubPod(dep, "Running") for dep in deployments])
    one_crashing = StubWorld(
        [
            StubPod("auth-service", "Running"),
            StubPod("api-gateway", "CrashLoopBackOff"),
            StubPod("frontend", "Running"),
        ]
    )

    assert check_task_complete(all_running, "incident") is True
    assert check_task_complete(one_crashing, "incident") is False
65
+
66
+
67
def test_environment_reset_sets_task_and_returns_observation():
    """reset() stores the chosen task and returns a fresh zero-reward observation."""
    env = CoenvEnvironment()

    first_obs = env.reset(task="autoscaling")

    assert env.current_task == "autoscaling"
    assert first_obs.objective == env.current_objective
    assert first_obs.done is False
    assert first_obs.reward == 0.0
    assert "task" in first_obs.metadata
77
+
78
+
79
def test_environment_step_scale_and_describe_paths():
    """step() routes scale and describe actions and records each in metadata."""
    env = CoenvEnvironment()
    env.reset(task="pod_recovery")

    # Scale path: metadata notes the scaling and the step counter advances.
    after_scale = env.step(
        CoenvAction(action_type="scale", deployment="frontend", replicas=4)
    )
    assert "scaled" in after_scale.metadata
    assert after_scale.step >= 1

    # Describe path: metadata carries both the summary and the full detail.
    after_describe = env.step(
        CoenvAction(action_type="describe", resource_type="deployment", name="frontend")
    )
    assert "described" in after_describe.metadata
    assert "describe_detail" in after_describe.metadata
94
+
95
+
96
def test_environment_step_exception_is_captured_in_metadata(monkeypatch):
    """A failure inside the world surfaces as metadata['error'] rather than raising."""
    env = CoenvEnvironment()
    env.reset(task="pod_recovery")

    def _always_fail(*_args, **_kwargs):
        raise RuntimeError("forced failure")

    # Force the scale operation to blow up inside step().
    monkeypatch.setattr(env.world, "scale", _always_fail)

    obs = env.step(CoenvAction(action_type="scale", deployment="frontend", replicas=2))

    assert "error" in obs.metadata
    assert "forced failure" in obs.metadata["error"]
tests/test_world_advanced.py ADDED
@@ -0,0 +1,140 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pytest
2
+
3
+ from server.COEnv_environment import World
4
+ from server.models import ClusterEvent
5
+
6
+
7
@pytest.fixture
def world() -> World:
    """Provide a fresh three-node World for each test."""
    return World({"num_nodes": 3})
10
+
11
+
12
+ def test_world_seed_makes_initial_state_deterministic():
13
+ config = {"num_nodes": 3}
14
+ world_a = World(config, seed=12345)
15
+ world_b = World(config, seed=12345)
16
+
17
+ pod_names_a = [p["name"] for p in world_a.cluster_state["pods"]]
18
+ pod_names_b = [p["name"] for p in world_b.cluster_state["pods"]]
19
+
20
+ assert pod_names_a == pod_names_b
21
+
22
+
23
def test_get_pods_filters_by_namespace_and_selector(world: World):
    """get_pods honours both the namespace filter and the label selector."""
    world.cluster_state["pods"].append(
        {
            "name": "custom-frontend-pod",
            "namespace": "staging",
            "status": "Running",
            "node": "node-1",
            "restarts": 0,
            "cpu_request": 100,
            "mem_request": 128,
            "cpu_limit": 200,
            "mem_limit": 256,
            "deployment": "frontend",
            "labels": {"tier": "web", "app": "frontend"},
            "last_updated": "2026-04-06T00:00:00",
        }
    )

    # Matching namespace + app label finds exactly the injected pod.
    by_app = world.get_pods(namespace="staging", selector={"app": "frontend"})
    assert [pod.name for pod in by_app] == ["custom-frontend-pod"]

    # Custom labels participate in selection too.
    by_tier = world.get_pods(namespace="staging", selector={"tier": "web"})
    assert len(by_tier) == 1
    assert by_tier[0].deployment == "frontend"

    # A namespace containing no such pods yields an empty result.
    assert world.get_pods(namespace="prod", selector={"tier": "web"}) == []
51
+
52
+
53
def test_set_hpa_updates_existing_and_clamps_deployment_replicas(world: World):
    """set_hpa rewrites the existing HPA and clamps replicas into [min, max]."""
    # Push replicas well above the maximum we are about to configure.
    world.scale("backend", 12)

    assert world.set_hpa("backend", min_replicas=2, max_replicas=5, cpu_target_percent=65) is True

    deployment = next(d for d in world.cluster_state["deployments"] if d["name"] == "backend")
    hpa = next(h for h in world.cluster_state["hpas"] if h["name"] == "backend-hpa")

    # Desired replicas were clamped down to the new maximum.
    assert deployment["desired_replicas"] == 5
    assert hpa["current_replicas"] == 5
    assert hpa["min_replicas"] == 2
    assert hpa["max_replicas"] == 5
    assert hpa["cpu_target_percent"] == 65

    # The update is recorded as a cluster event against the deployment.
    update_events = [
        e
        for e in world.events
        if e.reason == "HorizontalPodAutoscalerUpdated" and e.involved_object == "backend"
    ]
    assert len(update_events) >= 1
70
+
71
+
72
def test_drain_node_evicts_and_reassigns_pods(world: World):
    """Draining a node disables scheduling on it and moves its pods off."""
    victim = "node-1"
    evicted = [p for p in world.cluster_state["pods"] if p.get("node") == victim]
    # The test is meaningless if the node started out empty.
    assert len(evicted) > 0

    assert world.drain_node(victim) is True

    drained_node = next(n for n in world.cluster_state["nodes"] if n["name"] == victim)
    assert drained_node["status"] == "SchedulingDisabled"

    evicted_names = {p["name"] for p in evicted}
    survivors = [p for p in world.cluster_state["pods"] if p["name"] in evicted_names]

    # Every evicted pod still exists, is Pending, and left the drained node.
    assert len(survivors) == len(evicted)
    for pod in survivors:
        assert pod["status"] == "Pending"
        assert pod.get("node") != victim
91
+
92
+
93
def test_drain_node_with_no_ready_targets_unassigns_pods(world: World):
    """With no Ready node left to receive pods, drained pods end up unscheduled."""
    # Mark every other node NotReady so nothing can accept the evictions.
    for other in world.cluster_state["nodes"]:
        if other["name"] != "node-1":
            other["status"] = "NotReady"

    before = [p for p in world.cluster_state["pods"] if p.get("node") == "node-1"]
    assert len(before) > 0

    assert world.drain_node("node-1") is True

    tracked_names = {p["name"] for p in before}
    after = [p for p in world.cluster_state["pods"] if p["name"] in tracked_names]
    for pod in after:
        assert pod.get("node") is None
        assert pod["status"] == "Pending"
108
+
109
+
110
def test_describe_deployment_includes_related_pods_and_recent_events(world: World):
    """describe('deployment', ...) reports its pods and at most 10 recent events."""
    # Inject more than ten matching events so the recency cap is exercised.
    for idx in range(12):
        world.events.append(
            ClusterEvent(
                event_id=f"event-frontend-{idx}",
                timestamp="2026-04-06T00:00:00",
                type="Normal",
                reason="TestEvent",
                message=f"frontend event {idx}",
                involved_object="frontend",
            )
        )

    detail = world.describe("deployment", "frontend")

    assert detail["found"] is True
    assert detail["name"] == "frontend"
    assert detail["type"] == "deployment"
    assert all(pod.get("deployment") == "frontend" for pod in detail["related_pods"])
    # Event history is capped at the ten most recent, all for this deployment.
    assert len(detail["recent_events"]) == 10
    assert all(evt["involved_object"] == "frontend" for evt in detail["recent_events"])
131
+
132
+
133
def test_describe_unsupported_or_missing_resource(world: World):
    """describe() fails cleanly for unknown resource kinds and missing names."""
    bad_kind = world.describe("secret", "top-secret")
    assert bad_kind["found"] is False
    assert "Unsupported resource_type" in bad_kind["error"]

    absent = world.describe("service", "does-not-exist")
    assert absent["found"] is False
    assert "not found" in absent["error"]
140
+ assert "not found" in missing["error"]