File size: 11,939 Bytes
7e9a520
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
"""Unit tests for AtlasOps tool wrappers."""

import json
from unittest.mock import MagicMock, patch

import pytest


# ── kubectl tools ──────────────────────────────────────────────────────────────

class TestKubectlGet:
    """Checks for ``kubectl_get`` with ``subprocess.run`` replaced by a mock."""

    @staticmethod
    def _completed(returncode=0, stdout='{"items":[]}', stderr=""):
        """Return a mock shaped like the result object of ``subprocess.run``."""
        return MagicMock(returncode=returncode, stdout=stdout, stderr=stderr)

    def test_returns_dict_on_success(self):
        """Exit code 0 with JSON on stdout yields success and a ``parsed`` key."""
        from agents.tools.kubectl import kubectl_get

        fake_proc = self._completed()
        with patch("subprocess.run", return_value=fake_proc):
            outcome = kubectl_get("pods")

        assert outcome["success"] is True
        assert "parsed" in outcome

    def test_returns_failure_on_nonzero(self):
        """A non-zero exit code is reported as ``success`` False."""
        from agents.tools.kubectl import kubectl_get

        fake_proc = self._completed(returncode=1, stdout="", stderr="connection refused")
        with patch("subprocess.run", return_value=fake_proc):
            outcome = kubectl_get("pods")

        assert outcome["success"] is False

    def test_namespace_flag_all(self):
        """Passing ``namespace="-A"`` puts ``-A`` into the command handed to subprocess."""
        from agents.tools.kubectl import kubectl_get

        fake_proc = self._completed()
        with patch("subprocess.run", return_value=fake_proc) as run_spy:
            kubectl_get("pods", namespace="-A")

        # call_args is an (args, kwargs) pair; the command is the first positional.
        positional, _ = run_spy.call_args
        argv = positional[0]
        assert "-A" in argv

    def test_namespace_flag_specific(self):
        """A named namespace shows up as ``-n`` plus the namespace in the command."""
        from agents.tools.kubectl import kubectl_get

        fake_proc = self._completed()
        with patch("subprocess.run", return_value=fake_proc) as run_spy:
            kubectl_get("pods", namespace="default")

        positional, _ = run_spy.call_args
        argv = positional[0]
        assert "-n" in argv
        assert "default" in argv


class TestKubectlScale:
    """Tests for ``kubectl_scale``: the success path and replica-count rejection.

    ``subprocess.run`` is patched in every test, including the rejection
    cases, so a regression in the validation logic can never result in a real
    ``kubectl scale`` being executed against whatever cluster is configured.
    """

    def test_valid_replicas(self):
        """A replica count of 3 succeeds when the mocked kubectl exits 0."""
        from agents.tools.kubectl import kubectl_scale
        mock_result = MagicMock(returncode=0, stdout="scaled", stderr="")
        with patch("subprocess.run", return_value=mock_result):
            result = kubectl_scale("frontend", 3)
        assert result["success"] is True

    def test_rejects_negative_replicas(self):
        """A negative replica count fails without invoking a subprocess."""
        from agents.tools.kubectl import kubectl_scale
        with patch("subprocess.run") as mock_run:
            result = kubectl_scale("frontend", -1)
        assert result["success"] is False
        # Rejection must come from validation, not from a failed command.
        mock_run.assert_not_called()

    def test_rejects_too_many_replicas(self):
        """A replica count of 99 fails without invoking a subprocess."""
        from agents.tools.kubectl import kubectl_scale
        with patch("subprocess.run") as mock_run:
            result = kubectl_scale("frontend", 99)
        assert result["success"] is False
        # Rejection must come from validation, not from a failed command.
        mock_run.assert_not_called()


class TestKubectlExec:
    """Tests for ``kubectl_exec`` command allowlisting.

    ``subprocess.run`` is patched in every test. The blocked cases pass
    destructive or empty commands, so they must never be able to reach a real
    ``kubectl exec`` if the allowlist check regresses.
    """

    def test_allowlisted_command_passes(self):
        """``ls /tmp`` is executed and reports success when kubectl exits 0."""
        from agents.tools.kubectl import kubectl_exec
        mock_result = MagicMock(returncode=0, stdout="output", stderr="")
        with patch("subprocess.run", return_value=mock_result):
            result = kubectl_exec("mypod", ["ls", "/tmp"])
        assert result["success"] is True

    def test_non_allowlisted_command_blocked(self):
        """``rm -rf /`` is refused with an error mentioning the allowlist."""
        from agents.tools.kubectl import kubectl_exec
        with patch("subprocess.run") as mock_run:
            result = kubectl_exec("mypod", ["rm", "-rf", "/"])
        assert result["success"] is False
        assert "allowlist" in result["error"]
        # The command must be refused before any subprocess is spawned.
        mock_run.assert_not_called()

    def test_empty_command_blocked(self):
        """An empty command list is refused without spawning a subprocess."""
        from agents.tools.kubectl import kubectl_exec
        with patch("subprocess.run") as mock_run:
            result = kubectl_exec("mypod", [])
        assert result["success"] is False
        mock_run.assert_not_called()


# ── Prometheus tools ───────────────────────────────────────────────────────────

class TestPromQL:
    """Checks for ``promql_query`` with ``requests.get`` replaced by a mock."""

    @staticmethod
    def _http_reply(payload):
        """Return a mock HTTP response whose ``json()`` yields *payload*."""
        reply = MagicMock()
        reply.raise_for_status = MagicMock()
        reply.json.return_value = payload
        return reply

    def test_successful_query(self):
        """A ``status: success`` body with one sample yields one result entry."""
        from agents.tools.prometheus import promql_query

        sample = {"metric": {}, "value": [1, "0.5"]}
        reply = self._http_reply({
            "status": "success",
            "data": {"resultType": "vector", "result": [sample]},
        })
        with patch("requests.get", return_value=reply):
            outcome = promql_query("up")

        assert outcome["success"] is True
        assert len(outcome["result"]) == 1

    def test_prometheus_error_response(self):
        """A ``status: error`` body is reported as ``success`` False."""
        from agents.tools.prometheus import promql_query

        reply = self._http_reply({"status": "error", "error": "bad_data"})
        with patch("requests.get", return_value=reply):
            outcome = promql_query("invalid{}")

        assert outcome["success"] is False

    def test_connection_error(self):
        """A ``ConnectionError`` from requests becomes a failure dict with ``error``."""
        from agents.tools.prometheus import promql_query
        import requests

        refused = requests.exceptions.ConnectionError("refused")
        with patch("requests.get", side_effect=refused):
            outcome = promql_query("up")

        assert outcome["success"] is False
        assert "error" in outcome


# ── Alertmanager tools ─────────────────────────────────────────────────────────

class TestAlertmanager:
    """Checks for the Alertmanager wrappers with the ``requests`` calls mocked."""

    @staticmethod
    def _http_reply(payload):
        """Return a mock HTTP response whose ``json()`` yields *payload*."""
        reply = MagicMock()
        reply.raise_for_status = MagicMock()
        reply.json.return_value = payload
        return reply

    def test_silence_valid_matchers(self):
        """The ``silenceID`` from the response is surfaced as ``silence_id``."""
        from agents.tools.alertmanager import alertmanager_silence

        reply = self._http_reply({"silenceID": "abc-123"})
        matcher = {"name": "alertname", "value": "HighCPU", "isRegex": False}
        with patch("requests.post", return_value=reply):
            outcome = alertmanager_silence(matchers=[matcher], duration_minutes=30)

        assert outcome["success"] is True
        assert outcome["silence_id"] == "abc-123"

    def test_silence_returns_ends_at(self):
        """With no explicit duration the result still carries ``ends_at``."""
        from agents.tools.alertmanager import alertmanager_silence

        reply = self._http_reply({"silenceID": "xyz"})
        matcher = {"name": "alertname", "value": "Test", "isRegex": False}
        with patch("requests.post", return_value=reply):
            outcome = alertmanager_silence(matchers=[matcher])

        assert "ends_at" in outcome

    def test_list_alerts_success(self):
        """One firing alert in the response yields a count of 1 and its alertname."""
        from agents.tools.alertmanager import alertmanager_list_alerts

        firing_alert = {
            "labels": {"alertname": "HighCPU", "severity": "critical", "namespace": "default"},
            "status": {"state": "firing"},
            "startsAt": "2026-05-07T00:00:00Z",
        }
        reply = self._http_reply([firing_alert])
        with patch("requests.get", return_value=reply):
            outcome = alertmanager_list_alerts()

        assert outcome["success"] is True
        assert outcome["count"] == 1
        first_alert = outcome["alerts"][0]
        assert first_alert["alertname"] == "HighCPU"


# ── Comms tools ────────────────────────────────────────────────────────────────

class TestSlackPost:
    """Tests for the comms helpers ``slack_post_update`` and ``postmortem_draft``.

    Module-level settings on ``agents.tools.comms`` are overridden through
    ``monkeypatch`` so that each test is isolated and everything is restored
    afterwards.
    """

    def test_logs_locally_when_no_webhook(self, tmp_path, monkeypatch):
        """With an empty webhook the update is reported as ``logged_locally``."""
        monkeypatch.setenv("SLACK_WEBHOOK_URL", "")
        monkeypatch.setenv("POSTMORTEM_DIR", str(tmp_path))
        import agents.tools.comms as comms_mod
        # The env vars above only matter if the module has not been imported
        # yet, so the module attributes are overridden as well.
        monkeypatch.setattr(comms_mod, "SLACK_WEBHOOK", "")
        # NOTE(review): mirrors the sibling test so any local output lands in
        # tmp_path — confirm slack_post_update reads comms_mod.POSTMORTEM_DIR.
        monkeypatch.setattr(comms_mod, "POSTMORTEM_DIR", tmp_path)
        from agents.tools.comms import slack_post_update
        result = slack_post_update(
            channel="#incidents", severity="P1",
            title="Test incident", summary="Something broke"
        )
        assert result["success"] is True
        assert result["mode"] == "logged_locally"

    def test_postmortem_draft_creates_file(self, tmp_path, monkeypatch):
        """``postmortem_draft`` writes a file at ``output_path`` containing the title."""
        monkeypatch.setenv("POSTMORTEM_DIR", str(tmp_path))
        import agents.tools.comms as comms_mod
        monkeypatch.setattr(comms_mod, "POSTMORTEM_DIR", tmp_path)
        from agents.tools.comms import postmortem_draft
        incident = {
            "title": "Test Incident",
            "severity": "P1",
            "duration": "5m",
            "authors": "AtlasOps",
            "summary": "Test summary",
            "impact": "Minor",
            "timeline": [{"time": "00:01", "event": "alert fired"}],
            "root_cause": "CPU saturation",
            "detection": "Prometheus alert",
            "resolution": "Scaled up",
            "went_well": ["Fast detection"],
            "went_wrong": ["Slow response"],
            "action_items": [{"action": "Add alert", "owner": "@team", "priority": "P2", "due": "2026-06-01"}],
        }
        draft_path = tmp_path / "test.md"
        result = postmortem_draft(incident, output_path=str(draft_path))
        assert result["success"] is True
        assert draft_path.exists()
        content = draft_path.read_text()
        assert "Test Incident" in content


# ── Jaeger tools ───────────────────────────────────────────────────────────────

class TestJaeger:
    """Checks for the Jaeger wrappers with ``requests.get`` replaced by a mock."""

    @staticmethod
    def _http_reply(payload):
        """Return a mock HTTP response whose ``json()`` yields *payload*."""
        reply = MagicMock()
        reply.raise_for_status = MagicMock()
        reply.json.return_value = payload
        return reply

    def test_search_returns_traces(self):
        """One trace in the response yields a count of 1 and keeps its traceID."""
        from agents.tools.jaeger import jaeger_search

        span = {"duration": 1500000, "operationName": "/cart"}
        reply = self._http_reply({
            "data": [
                {"traceID": "abc123", "spans": [span]},
            ]
        })
        with patch("requests.get", return_value=reply):
            outcome = jaeger_search("cartservice")

        assert outcome["success"] is True
        assert outcome["count"] == 1
        first_trace = outcome["traces"][0]
        assert first_trace["traceID"] == "abc123"

    def test_get_trace_by_id(self):
        """Fetching a trace by id reports success for a well-formed response."""
        from agents.tools.jaeger import jaeger_get_trace

        reply = self._http_reply({"data": [{"traceID": "abc123", "spans": []}]})
        with patch("requests.get", return_value=reply):
            outcome = jaeger_get_trace("abc123")

        assert outcome["success"] is True

    def test_lookback_parser(self):
        """Minute, hour and second lookbacks convert to the expected microseconds."""
        from agents.tools.jaeger import _parse_lookback_to_us

        expectations = (
            ("15m", 15 * 60 * 1_000_000),
            ("1h", 3_600_000_000),
            ("30s", 30_000_000),
        )
        for lookback, microseconds in expectations:
            assert _parse_lookback_to_us(lookback) == microseconds


# ── Stream (thought emission) ──────────────────────────────────────────────────

class TestThoughtStream:
    """Checks for ``agents.stream``: emitted thoughts are readable via ``get_history``.

    Each test empties the shared ``_thought_buffer`` first so it starts from a
    known state.
    """

    def test_emit_stores_in_buffer(self):
        """A single emit produces one history entry carrying its role and thought."""
        from agents.stream import emit, get_history, _thought_buffer

        _thought_buffer.clear()
        emit("triage", "tool_call", "Checking pods...", tool="kubectl_get")

        recorded = get_history()
        assert len(recorded) == 1
        only_entry = recorded[0]
        assert only_entry["role"] == "triage"
        assert only_entry["thought"] == "Checking pods..."

    def test_emit_multiple_roles(self):
        """Entries from different roles are kept in the order they were emitted."""
        from agents.stream import emit, get_history, _thought_buffer

        _thought_buffer.clear()
        emit("triage", "conclusion", "P1 assigned")
        emit("diagnosis", "tool_call", "Querying Prometheus")
        emit("remediation", "tool_result", "Rollback executed")

        recorded = get_history()
        assert len(recorded) == 3
        seen_roles = [entry["role"] for entry in recorded]
        assert seen_roles == ["triage", "diagnosis", "remediation"]

    def test_buffer_max_size(self):
        """Emitting 250 thoughts leaves only 200 in the history."""
        from agents.stream import emit, get_history, _thought_buffer

        _thought_buffer.clear()
        for seq in range(250):
            emit("triage", "thinking", f"thought {seq}")

        recorded = get_history()
        assert len(recorded) == 200  # maxlen=200