| """Unit tests for AtlasOps tool wrappers.""" |
|
|
| import json |
| from unittest.mock import MagicMock, patch |
|
|
| import pytest |
|
|
|
|
| |
|
|
| class TestKubectlGet: |
| def test_returns_dict_on_success(self): |
| from agents.tools.kubectl import kubectl_get |
| mock_result = MagicMock(returncode=0, stdout='{"items":[]}', stderr="") |
| with patch("subprocess.run", return_value=mock_result): |
| result = kubectl_get("pods") |
| assert result["success"] is True |
| assert "parsed" in result |
|
|
| def test_returns_failure_on_nonzero(self): |
| from agents.tools.kubectl import kubectl_get |
| mock_result = MagicMock(returncode=1, stdout="", stderr="connection refused") |
| with patch("subprocess.run", return_value=mock_result): |
| result = kubectl_get("pods") |
| assert result["success"] is False |
|
|
| def test_namespace_flag_all(self): |
| from agents.tools.kubectl import kubectl_get |
| mock_result = MagicMock(returncode=0, stdout='{"items":[]}', stderr="") |
| with patch("subprocess.run", return_value=mock_result) as mock_run: |
| kubectl_get("pods", namespace="-A") |
| cmd = mock_run.call_args[0][0] |
| assert "-A" in cmd |
|
|
| def test_namespace_flag_specific(self): |
| from agents.tools.kubectl import kubectl_get |
| mock_result = MagicMock(returncode=0, stdout='{"items":[]}', stderr="") |
| with patch("subprocess.run", return_value=mock_result) as mock_run: |
| kubectl_get("pods", namespace="default") |
| cmd = mock_run.call_args[0][0] |
| assert "-n" in cmd and "default" in cmd |
|
|
|
|
| class TestKubectlScale: |
| def test_valid_replicas(self): |
| from agents.tools.kubectl import kubectl_scale |
| mock_result = MagicMock(returncode=0, stdout="scaled", stderr="") |
| with patch("subprocess.run", return_value=mock_result): |
| result = kubectl_scale("frontend", 3) |
| assert result["success"] is True |
|
|
| def test_rejects_negative_replicas(self): |
| from agents.tools.kubectl import kubectl_scale |
| result = kubectl_scale("frontend", -1) |
| assert result["success"] is False |
|
|
| def test_rejects_too_many_replicas(self): |
| from agents.tools.kubectl import kubectl_scale |
| result = kubectl_scale("frontend", 99) |
| assert result["success"] is False |
|
|
|
|
| class TestKubectlExec: |
| def test_allowlisted_command_passes(self): |
| from agents.tools.kubectl import kubectl_exec |
| mock_result = MagicMock(returncode=0, stdout="output", stderr="") |
| with patch("subprocess.run", return_value=mock_result): |
| result = kubectl_exec("mypod", ["ls", "/tmp"]) |
| assert result["success"] is True |
|
|
| def test_non_allowlisted_command_blocked(self): |
| from agents.tools.kubectl import kubectl_exec |
| result = kubectl_exec("mypod", ["rm", "-rf", "/"]) |
| assert result["success"] is False |
| assert "allowlist" in result["error"] |
|
|
| def test_empty_command_blocked(self): |
| from agents.tools.kubectl import kubectl_exec |
| result = kubectl_exec("mypod", []) |
| assert result["success"] is False |
|
|
|
|
| |
|
|
| class TestPromQL: |
| def test_successful_query(self): |
| from agents.tools.prometheus import promql_query |
| mock_response = MagicMock() |
| mock_response.raise_for_status = MagicMock() |
| mock_response.json.return_value = { |
| "status": "success", |
| "data": {"resultType": "vector", "result": [{"metric": {}, "value": [1, "0.5"]}]}, |
| } |
| with patch("requests.get", return_value=mock_response): |
| result = promql_query("up") |
| assert result["success"] is True |
| assert len(result["result"]) == 1 |
|
|
| def test_prometheus_error_response(self): |
| from agents.tools.prometheus import promql_query |
| mock_response = MagicMock() |
| mock_response.raise_for_status = MagicMock() |
| mock_response.json.return_value = {"status": "error", "error": "bad_data"} |
| with patch("requests.get", return_value=mock_response): |
| result = promql_query("invalid{}") |
| assert result["success"] is False |
|
|
| def test_connection_error(self): |
| from agents.tools.prometheus import promql_query |
| import requests as req |
| with patch("requests.get", side_effect=req.exceptions.ConnectionError("refused")): |
| result = promql_query("up") |
| assert result["success"] is False |
| assert "error" in result |
|
|
|
|
| |
|
|
| class TestAlertmanager: |
| def test_silence_valid_matchers(self): |
| from agents.tools.alertmanager import alertmanager_silence |
| mock_response = MagicMock() |
| mock_response.raise_for_status = MagicMock() |
| mock_response.json.return_value = {"silenceID": "abc-123"} |
| with patch("requests.post", return_value=mock_response): |
| result = alertmanager_silence( |
| matchers=[{"name": "alertname", "value": "HighCPU", "isRegex": False}], |
| duration_minutes=30, |
| ) |
| assert result["success"] is True |
| assert result["silence_id"] == "abc-123" |
|
|
| def test_silence_returns_ends_at(self): |
| from agents.tools.alertmanager import alertmanager_silence |
| mock_response = MagicMock() |
| mock_response.raise_for_status = MagicMock() |
| mock_response.json.return_value = {"silenceID": "xyz"} |
| with patch("requests.post", return_value=mock_response): |
| result = alertmanager_silence( |
| matchers=[{"name": "alertname", "value": "Test", "isRegex": False}] |
| ) |
| assert "ends_at" in result |
|
|
| def test_list_alerts_success(self): |
| from agents.tools.alertmanager import alertmanager_list_alerts |
| mock_response = MagicMock() |
| mock_response.raise_for_status = MagicMock() |
| mock_response.json.return_value = [ |
| {"labels": {"alertname": "HighCPU", "severity": "critical", "namespace": "default"}, |
| "status": {"state": "firing"}, "startsAt": "2026-05-07T00:00:00Z"} |
| ] |
| with patch("requests.get", return_value=mock_response): |
| result = alertmanager_list_alerts() |
| assert result["success"] is True |
| assert result["count"] == 1 |
| assert result["alerts"][0]["alertname"] == "HighCPU" |
|
|
|
|
| |
|
|
| class TestSlackPost: |
| def test_logs_locally_when_no_webhook(self, tmp_path, monkeypatch): |
| monkeypatch.setenv("SLACK_WEBHOOK_URL", "") |
| monkeypatch.setenv("POSTMORTEM_DIR", str(tmp_path)) |
| import agents.tools.comms as comms_mod |
| monkeypatch.setattr(comms_mod, "SLACK_WEBHOOK", "") |
| from agents.tools.comms import slack_post_update |
| import importlib |
| result = slack_post_update( |
| channel="#incidents", severity="P1", |
| title="Test incident", summary="Something broke" |
| ) |
| assert result["success"] is True |
| assert result["mode"] == "logged_locally" |
|
|
| def test_postmortem_draft_creates_file(self, tmp_path, monkeypatch): |
| monkeypatch.setenv("POSTMORTEM_DIR", str(tmp_path)) |
| import agents.tools.comms as comms_mod |
| from pathlib import Path |
| monkeypatch.setattr(comms_mod, "POSTMORTEM_DIR", tmp_path) |
| from agents.tools.comms import postmortem_draft |
| incident = { |
| "title": "Test Incident", |
| "severity": "P1", |
| "duration": "5m", |
| "authors": "AtlasOps", |
| "summary": "Test summary", |
| "impact": "Minor", |
| "timeline": [{"time": "00:01", "event": "alert fired"}], |
| "root_cause": "CPU saturation", |
| "detection": "Prometheus alert", |
| "resolution": "Scaled up", |
| "went_well": ["Fast detection"], |
| "went_wrong": ["Slow response"], |
| "action_items": [{"action": "Add alert", "owner": "@team", "priority": "P2", "due": "2026-06-01"}], |
| } |
| result = postmortem_draft(incident, output_path=str(tmp_path / "test.md")) |
| assert result["success"] is True |
| assert (tmp_path / "test.md").exists() |
| content = (tmp_path / "test.md").read_text() |
| assert "Test Incident" in content |
|
|
|
|
| |
|
|
| class TestJaeger: |
| def test_search_returns_traces(self): |
| from agents.tools.jaeger import jaeger_search |
| mock_response = MagicMock() |
| mock_response.raise_for_status = MagicMock() |
| mock_response.json.return_value = { |
| "data": [ |
| {"traceID": "abc123", "spans": [{"duration": 1500000, "operationName": "/cart"}]}, |
| ] |
| } |
| with patch("requests.get", return_value=mock_response): |
| result = jaeger_search("cartservice") |
| assert result["success"] is True |
| assert result["count"] == 1 |
| assert result["traces"][0]["traceID"] == "abc123" |
|
|
| def test_get_trace_by_id(self): |
| from agents.tools.jaeger import jaeger_get_trace |
| mock_response = MagicMock() |
| mock_response.raise_for_status = MagicMock() |
| mock_response.json.return_value = {"data": [{"traceID": "abc123", "spans": []}]} |
| with patch("requests.get", return_value=mock_response): |
| result = jaeger_get_trace("abc123") |
| assert result["success"] is True |
|
|
| def test_lookback_parser(self): |
| from agents.tools.jaeger import _parse_lookback_to_us |
| assert _parse_lookback_to_us("15m") == 15 * 60 * 1_000_000 |
| assert _parse_lookback_to_us("1h") == 3_600_000_000 |
| assert _parse_lookback_to_us("30s") == 30_000_000 |
|
|
|
|
| |
|
|
| class TestThoughtStream: |
| def test_emit_stores_in_buffer(self): |
| from agents.stream import emit, get_history, _thought_buffer |
| _thought_buffer.clear() |
| emit("triage", "tool_call", "Checking pods...", tool="kubectl_get") |
| history = get_history() |
| assert len(history) == 1 |
| assert history[0]["role"] == "triage" |
| assert history[0]["thought"] == "Checking pods..." |
|
|
| def test_emit_multiple_roles(self): |
| from agents.stream import emit, get_history, _thought_buffer |
| _thought_buffer.clear() |
| emit("triage", "conclusion", "P1 assigned") |
| emit("diagnosis", "tool_call", "Querying Prometheus") |
| emit("remediation", "tool_result", "Rollback executed") |
| history = get_history() |
| assert len(history) == 3 |
| roles = [h["role"] for h in history] |
| assert roles == ["triage", "diagnosis", "remediation"] |
|
|
| def test_buffer_max_size(self): |
| from agents.stream import emit, get_history, _thought_buffer |
| _thought_buffer.clear() |
| for i in range(250): |
| emit("triage", "thinking", f"thought {i}") |
| history = get_history() |
| assert len(history) == 200 |
|
|