File size: 11,939 Bytes
"""Unit tests for AtlasOps tool wrappers."""
import json
from unittest.mock import MagicMock, patch
import pytest
# ── kubectl tools ──────────────────────────────────────────────────────────────
class TestKubectlGet:
    """Checks on ``kubectl_get`` with ``subprocess.run`` swapped for a mock."""

    @staticmethod
    def _fake_run_result(returncode, stdout, stderr):
        """Return a mock carrying the three attributes these tests set."""
        return MagicMock(returncode=returncode, stdout=stdout, stderr=stderr)

    def test_returns_dict_on_success(self):
        """Exit code 0 with JSON stdout gives ``success`` True and a ``parsed`` key."""
        from agents.tools.kubectl import kubectl_get

        completed = self._fake_run_result(0, '{"items":[]}', "")
        with patch("subprocess.run", return_value=completed):
            outcome = kubectl_get("pods")

        assert outcome["success"] is True
        assert "parsed" in outcome

    def test_returns_failure_on_nonzero(self):
        """A non-zero exit code is reported as ``success`` False."""
        from agents.tools.kubectl import kubectl_get

        completed = self._fake_run_result(1, "", "connection refused")
        with patch("subprocess.run", return_value=completed):
            outcome = kubectl_get("pods")

        assert outcome["success"] is False

    def test_namespace_flag_all(self):
        """``namespace="-A"`` shows up in the command passed to ``subprocess.run``."""
        from agents.tools.kubectl import kubectl_get

        completed = self._fake_run_result(0, '{"items":[]}', "")
        with patch("subprocess.run", return_value=completed) as run_spy:
            kubectl_get("pods", namespace="-A")

        # The command is the first positional argument of the recorded call.
        argv = run_spy.call_args[0][0]
        assert "-A" in argv

    def test_namespace_flag_specific(self):
        """A named namespace puts both ``-n`` and the name in the command."""
        from agents.tools.kubectl import kubectl_get

        completed = self._fake_run_result(0, '{"items":[]}', "")
        with patch("subprocess.run", return_value=completed) as run_spy:
            kubectl_get("pods", namespace="default")

        argv = run_spy.call_args[0][0]
        assert "-n" in argv
        assert "default" in argv
class TestKubectlScale:
    """Tests for ``kubectl_scale``.

    ``subprocess.run`` is patched in every test, including the rejection
    cases, so that a regression in replica validation can never shell out to
    a real ``kubectl`` from the test run.
    """

    def test_valid_replicas(self):
        """A replica count of 3 with a zero exit code reports success."""
        from agents.tools.kubectl import kubectl_scale

        mock_result = MagicMock(returncode=0, stdout="scaled", stderr="")
        with patch("subprocess.run", return_value=mock_result):
            result = kubectl_scale("frontend", 3)

        assert result["success"] is True

    def test_rejects_negative_replicas(self):
        """A negative replica count is refused without running a command."""
        from agents.tools.kubectl import kubectl_scale

        with patch("subprocess.run") as mock_run:
            result = kubectl_scale("frontend", -1)

        assert result["success"] is False
        # NOTE(review): assumes kubectl_scale validates its input before any
        # subprocess call -- confirm against the wrapper implementation.
        mock_run.assert_not_called()

    def test_rejects_too_many_replicas(self):
        """A replica count of 99 is refused without running a command."""
        from agents.tools.kubectl import kubectl_scale

        with patch("subprocess.run") as mock_run:
            result = kubectl_scale("frontend", 99)

        assert result["success"] is False
        # NOTE(review): same assumption as the negative-replica case.
        mock_run.assert_not_called()
class TestKubectlExec:
    """Tests for ``kubectl_exec`` and its command allowlist.

    ``subprocess.run`` is patched in every test. The blocked-command cases
    pass destructive input (``rm -rf /``), so they must never be able to
    reach a real ``kubectl exec`` if the allowlist check regresses.
    """

    def test_allowlisted_command_passes(self):
        """``ls /tmp`` is accepted and a zero exit code reports success."""
        from agents.tools.kubectl import kubectl_exec

        mock_result = MagicMock(returncode=0, stdout="output", stderr="")
        with patch("subprocess.run", return_value=mock_result):
            result = kubectl_exec("mypod", ["ls", "/tmp"])

        assert result["success"] is True

    def test_non_allowlisted_command_blocked(self):
        """``rm -rf /`` is refused with an error mentioning the allowlist."""
        from agents.tools.kubectl import kubectl_exec

        with patch("subprocess.run") as mock_run:
            result = kubectl_exec("mypod", ["rm", "-rf", "/"])

        assert result["success"] is False
        assert "allowlist" in result["error"]
        # NOTE(review): assumes the allowlist check happens before any
        # subprocess call -- confirm against the wrapper implementation.
        mock_run.assert_not_called()

    def test_empty_command_blocked(self):
        """An empty command list is refused without running anything."""
        from agents.tools.kubectl import kubectl_exec

        with patch("subprocess.run") as mock_run:
            result = kubectl_exec("mypod", [])

        assert result["success"] is False
        # NOTE(review): same assumption as the non-allowlisted case.
        mock_run.assert_not_called()
# ── Prometheus tools ───────────────────────────────────────────────────────────
class TestPromQL:
    """Checks on ``promql_query`` with ``requests.get`` replaced by a mock."""

    @staticmethod
    def _http_reply(payload):
        """Return a mock response whose ``.json()`` yields ``payload``."""
        reply = MagicMock()
        reply.raise_for_status = MagicMock()
        reply.json.return_value = payload
        return reply

    def test_successful_query(self):
        """A ``status: success`` body with one sample yields one result."""
        from agents.tools.prometheus import promql_query

        sample = {"metric": {}, "value": [1, "0.5"]}
        body = {
            "status": "success",
            "data": {"resultType": "vector", "result": [sample]},
        }
        with patch("requests.get", return_value=self._http_reply(body)):
            outcome = promql_query("up")

        assert outcome["success"] is True
        assert len(outcome["result"]) == 1

    def test_prometheus_error_response(self):
        """A ``status: error`` body is reported as ``success`` False."""
        from agents.tools.prometheus import promql_query

        body = {"status": "error", "error": "bad_data"}
        with patch("requests.get", return_value=self._http_reply(body)):
            outcome = promql_query("invalid{}")

        assert outcome["success"] is False

    def test_connection_error(self):
        """A ``ConnectionError`` from ``requests.get`` becomes an error result."""
        from agents.tools.prometheus import promql_query
        import requests

        refused = requests.exceptions.ConnectionError("refused")
        with patch("requests.get", side_effect=refused):
            outcome = promql_query("up")

        assert outcome["success"] is False
        assert "error" in outcome
# ── Alertmanager tools ─────────────────────────────────────────────────────────
class TestAlertmanager:
    """Checks on the Alertmanager wrappers with ``requests`` calls mocked."""

    @staticmethod
    def _http_reply(payload):
        """Return a mock response whose ``.json()`` yields ``payload``."""
        reply = MagicMock()
        reply.raise_for_status = MagicMock()
        reply.json.return_value = payload
        return reply

    def test_silence_valid_matchers(self):
        """The ``silenceID`` in the reply is surfaced as ``silence_id``."""
        from agents.tools.alertmanager import alertmanager_silence

        reply = self._http_reply({"silenceID": "abc-123"})
        matcher = {"name": "alertname", "value": "HighCPU", "isRegex": False}
        with patch("requests.post", return_value=reply):
            outcome = alertmanager_silence(matchers=[matcher], duration_minutes=30)

        assert outcome["success"] is True
        assert outcome["silence_id"] == "abc-123"

    def test_silence_returns_ends_at(self):
        """The result carries an ``ends_at`` key when no duration is passed."""
        from agents.tools.alertmanager import alertmanager_silence

        reply = self._http_reply({"silenceID": "xyz"})
        matcher = {"name": "alertname", "value": "Test", "isRegex": False}
        with patch("requests.post", return_value=reply):
            outcome = alertmanager_silence(matchers=[matcher])

        assert "ends_at" in outcome

    def test_list_alerts_success(self):
        """One alert in the reply gives ``count`` 1 and its ``alertname``."""
        from agents.tools.alertmanager import alertmanager_list_alerts

        firing_alert = {
            "labels": {
                "alertname": "HighCPU",
                "severity": "critical",
                "namespace": "default",
            },
            "status": {"state": "firing"},
            "startsAt": "2026-05-07T00:00:00Z",
        }
        reply = self._http_reply([firing_alert])
        with patch("requests.get", return_value=reply):
            outcome = alertmanager_list_alerts()

        assert outcome["success"] is True
        assert outcome["count"] == 1
        assert outcome["alerts"][0]["alertname"] == "HighCPU"
# ── Comms tools ────────────────────────────────────────────────────────────────
class TestSlackPost:
    """Tests for the comms helpers ``slack_post_update`` and ``postmortem_draft``.

    Both tests redirect output into pytest's ``tmp_path`` and rely on
    ``monkeypatch`` to undo environment and module-attribute changes.
    """

    def test_logs_locally_when_no_webhook(self, tmp_path, monkeypatch):
        """With an empty webhook the update is reported as ``logged_locally``."""
        # Environment is set before the module import below; presumably the
        # module reads these at import time -- the setattr covers the case
        # where it was already imported by an earlier test.
        monkeypatch.setenv("SLACK_WEBHOOK_URL", "")
        monkeypatch.setenv("POSTMORTEM_DIR", str(tmp_path))
        import agents.tools.comms as comms_mod
        monkeypatch.setattr(comms_mod, "SLACK_WEBHOOK", "")
        from agents.tools.comms import slack_post_update

        result = slack_post_update(
            channel="#incidents", severity="P1",
            title="Test incident", summary="Something broke"
        )

        assert result["success"] is True
        assert result["mode"] == "logged_locally"

    def test_postmortem_draft_creates_file(self, tmp_path, monkeypatch):
        """``postmortem_draft`` writes a file at ``output_path`` containing the title."""
        monkeypatch.setenv("POSTMORTEM_DIR", str(tmp_path))
        import agents.tools.comms as comms_mod
        monkeypatch.setattr(comms_mod, "POSTMORTEM_DIR", tmp_path)
        from agents.tools.comms import postmortem_draft

        incident = {
            "title": "Test Incident",
            "severity": "P1",
            "duration": "5m",
            "authors": "AtlasOps",
            "summary": "Test summary",
            "impact": "Minor",
            "timeline": [{"time": "00:01", "event": "alert fired"}],
            "root_cause": "CPU saturation",
            "detection": "Prometheus alert",
            "resolution": "Scaled up",
            "went_well": ["Fast detection"],
            "went_wrong": ["Slow response"],
            "action_items": [{"action": "Add alert", "owner": "@team", "priority": "P2", "due": "2026-06-01"}],
        }

        result = postmortem_draft(incident, output_path=str(tmp_path / "test.md"))

        assert result["success"] is True
        assert (tmp_path / "test.md").exists()
        content = (tmp_path / "test.md").read_text()
        assert "Test Incident" in content
# ── Jaeger tools ───────────────────────────────────────────────────────────────
class TestJaeger:
    """Checks on the Jaeger wrappers with ``requests.get`` replaced by a mock."""

    @staticmethod
    def _http_reply(payload):
        """Return a mock response whose ``.json()`` yields ``payload``."""
        reply = MagicMock()
        reply.raise_for_status = MagicMock()
        reply.json.return_value = payload
        return reply

    def test_search_returns_traces(self):
        """One trace in the reply gives ``count`` 1 and keeps its ``traceID``."""
        from agents.tools.jaeger import jaeger_search

        span = {"duration": 1500000, "operationName": "/cart"}
        body = {"data": [{"traceID": "abc123", "spans": [span]}]}
        with patch("requests.get", return_value=self._http_reply(body)):
            outcome = jaeger_search("cartservice")

        assert outcome["success"] is True
        assert outcome["count"] == 1
        assert outcome["traces"][0]["traceID"] == "abc123"

    def test_get_trace_by_id(self):
        """Fetching a trace by id reports success for a one-trace reply."""
        from agents.tools.jaeger import jaeger_get_trace

        body = {"data": [{"traceID": "abc123", "spans": []}]}
        with patch("requests.get", return_value=self._http_reply(body)):
            outcome = jaeger_get_trace("abc123")

        assert outcome["success"] is True

    def test_lookback_parser(self):
        """Minute, hour and second lookbacks convert to the expected integers."""
        from agents.tools.jaeger import _parse_lookback_to_us

        expectations = (
            ("15m", 15 * 60 * 1_000_000),
            ("1h", 3_600_000_000),
            ("30s", 30_000_000),
        )
        for lookback, expected in expectations:
            assert _parse_lookback_to_us(lookback) == expected
# ── Stream (thought emission) ──────────────────────────────────────────────────
class TestThoughtStream:
    """Checks on ``emit`` / ``get_history``; each test empties the buffer first."""

    def test_emit_stores_in_buffer(self):
        """A single emit is returned by ``get_history`` with its role and thought."""
        from agents.stream import emit, get_history, _thought_buffer

        _thought_buffer.clear()
        emit("triage", "tool_call", "Checking pods...", tool="kubectl_get")

        recorded = get_history()
        assert len(recorded) == 1
        only_entry = recorded[0]
        assert only_entry["role"] == "triage"
        assert only_entry["thought"] == "Checking pods..."

    def test_emit_multiple_roles(self):
        """Three emits from different roles come back in emission order."""
        from agents.stream import emit, get_history, _thought_buffer

        _thought_buffer.clear()
        events = (
            ("triage", "conclusion", "P1 assigned"),
            ("diagnosis", "tool_call", "Querying Prometheus"),
            ("remediation", "tool_result", "Rollback executed"),
        )
        for role, kind, text in events:
            emit(role, kind, text)

        recorded = get_history()
        assert len(recorded) == 3
        assert [entry["role"] for entry in recorded] == [
            "triage",
            "diagnosis",
            "remediation",
        ]

    def test_buffer_max_size(self):
        """After 250 emits the history holds 200 entries."""
        from agents.stream import emit, get_history, _thought_buffer

        _thought_buffer.clear()
        for index in range(250):
            emit("triage", "thinking", f"thought {index}")

        recorded = get_history()
        assert len(recorded) == 200  # maxlen=200
|