atlasops / tests /test_tools.py
Harikishanth R
fix: skip-kubectl + scroll + health β€” HF Space ready
7e9a520
"""Unit tests for AtlasOps tool wrappers."""
import json
from unittest.mock import MagicMock, patch
import pytest
# ── kubectl tools ──────────────────────────────────────────────────────────────
class TestKubectlGet:
def test_returns_dict_on_success(self):
from agents.tools.kubectl import kubectl_get
mock_result = MagicMock(returncode=0, stdout='{"items":[]}', stderr="")
with patch("subprocess.run", return_value=mock_result):
result = kubectl_get("pods")
assert result["success"] is True
assert "parsed" in result
def test_returns_failure_on_nonzero(self):
from agents.tools.kubectl import kubectl_get
mock_result = MagicMock(returncode=1, stdout="", stderr="connection refused")
with patch("subprocess.run", return_value=mock_result):
result = kubectl_get("pods")
assert result["success"] is False
def test_namespace_flag_all(self):
from agents.tools.kubectl import kubectl_get
mock_result = MagicMock(returncode=0, stdout='{"items":[]}', stderr="")
with patch("subprocess.run", return_value=mock_result) as mock_run:
kubectl_get("pods", namespace="-A")
cmd = mock_run.call_args[0][0]
assert "-A" in cmd
def test_namespace_flag_specific(self):
from agents.tools.kubectl import kubectl_get
mock_result = MagicMock(returncode=0, stdout='{"items":[]}', stderr="")
with patch("subprocess.run", return_value=mock_result) as mock_run:
kubectl_get("pods", namespace="default")
cmd = mock_run.call_args[0][0]
assert "-n" in cmd and "default" in cmd
class TestKubectlScale:
def test_valid_replicas(self):
from agents.tools.kubectl import kubectl_scale
mock_result = MagicMock(returncode=0, stdout="scaled", stderr="")
with patch("subprocess.run", return_value=mock_result):
result = kubectl_scale("frontend", 3)
assert result["success"] is True
def test_rejects_negative_replicas(self):
from agents.tools.kubectl import kubectl_scale
result = kubectl_scale("frontend", -1)
assert result["success"] is False
def test_rejects_too_many_replicas(self):
from agents.tools.kubectl import kubectl_scale
result = kubectl_scale("frontend", 99)
assert result["success"] is False
class TestKubectlExec:
def test_allowlisted_command_passes(self):
from agents.tools.kubectl import kubectl_exec
mock_result = MagicMock(returncode=0, stdout="output", stderr="")
with patch("subprocess.run", return_value=mock_result):
result = kubectl_exec("mypod", ["ls", "/tmp"])
assert result["success"] is True
def test_non_allowlisted_command_blocked(self):
from agents.tools.kubectl import kubectl_exec
result = kubectl_exec("mypod", ["rm", "-rf", "/"])
assert result["success"] is False
assert "allowlist" in result["error"]
def test_empty_command_blocked(self):
from agents.tools.kubectl import kubectl_exec
result = kubectl_exec("mypod", [])
assert result["success"] is False
# ── Prometheus tools ───────────────────────────────────────────────────────────
class TestPromQL:
def test_successful_query(self):
from agents.tools.prometheus import promql_query
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {
"status": "success",
"data": {"resultType": "vector", "result": [{"metric": {}, "value": [1, "0.5"]}]},
}
with patch("requests.get", return_value=mock_response):
result = promql_query("up")
assert result["success"] is True
assert len(result["result"]) == 1
def test_prometheus_error_response(self):
from agents.tools.prometheus import promql_query
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"status": "error", "error": "bad_data"}
with patch("requests.get", return_value=mock_response):
result = promql_query("invalid{}")
assert result["success"] is False
def test_connection_error(self):
from agents.tools.prometheus import promql_query
import requests as req
with patch("requests.get", side_effect=req.exceptions.ConnectionError("refused")):
result = promql_query("up")
assert result["success"] is False
assert "error" in result
# ── Alertmanager tools ─────────────────────────────────────────────────────────
class TestAlertmanager:
def test_silence_valid_matchers(self):
from agents.tools.alertmanager import alertmanager_silence
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"silenceID": "abc-123"}
with patch("requests.post", return_value=mock_response):
result = alertmanager_silence(
matchers=[{"name": "alertname", "value": "HighCPU", "isRegex": False}],
duration_minutes=30,
)
assert result["success"] is True
assert result["silence_id"] == "abc-123"
def test_silence_returns_ends_at(self):
from agents.tools.alertmanager import alertmanager_silence
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"silenceID": "xyz"}
with patch("requests.post", return_value=mock_response):
result = alertmanager_silence(
matchers=[{"name": "alertname", "value": "Test", "isRegex": False}]
)
assert "ends_at" in result
def test_list_alerts_success(self):
from agents.tools.alertmanager import alertmanager_list_alerts
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = [
{"labels": {"alertname": "HighCPU", "severity": "critical", "namespace": "default"},
"status": {"state": "firing"}, "startsAt": "2026-05-07T00:00:00Z"}
]
with patch("requests.get", return_value=mock_response):
result = alertmanager_list_alerts()
assert result["success"] is True
assert result["count"] == 1
assert result["alerts"][0]["alertname"] == "HighCPU"
# ── Comms tools ────────────────────────────────────────────────────────────────
class TestSlackPost:
def test_logs_locally_when_no_webhook(self, tmp_path, monkeypatch):
monkeypatch.setenv("SLACK_WEBHOOK_URL", "")
monkeypatch.setenv("POSTMORTEM_DIR", str(tmp_path))
import agents.tools.comms as comms_mod
monkeypatch.setattr(comms_mod, "SLACK_WEBHOOK", "")
from agents.tools.comms import slack_post_update
import importlib
result = slack_post_update(
channel="#incidents", severity="P1",
title="Test incident", summary="Something broke"
)
assert result["success"] is True
assert result["mode"] == "logged_locally"
def test_postmortem_draft_creates_file(self, tmp_path, monkeypatch):
monkeypatch.setenv("POSTMORTEM_DIR", str(tmp_path))
import agents.tools.comms as comms_mod
from pathlib import Path
monkeypatch.setattr(comms_mod, "POSTMORTEM_DIR", tmp_path)
from agents.tools.comms import postmortem_draft
incident = {
"title": "Test Incident",
"severity": "P1",
"duration": "5m",
"authors": "AtlasOps",
"summary": "Test summary",
"impact": "Minor",
"timeline": [{"time": "00:01", "event": "alert fired"}],
"root_cause": "CPU saturation",
"detection": "Prometheus alert",
"resolution": "Scaled up",
"went_well": ["Fast detection"],
"went_wrong": ["Slow response"],
"action_items": [{"action": "Add alert", "owner": "@team", "priority": "P2", "due": "2026-06-01"}],
}
result = postmortem_draft(incident, output_path=str(tmp_path / "test.md"))
assert result["success"] is True
assert (tmp_path / "test.md").exists()
content = (tmp_path / "test.md").read_text()
assert "Test Incident" in content
# ── Jaeger tools ───────────────────────────────────────────────────────────────
class TestJaeger:
def test_search_returns_traces(self):
from agents.tools.jaeger import jaeger_search
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {
"data": [
{"traceID": "abc123", "spans": [{"duration": 1500000, "operationName": "/cart"}]},
]
}
with patch("requests.get", return_value=mock_response):
result = jaeger_search("cartservice")
assert result["success"] is True
assert result["count"] == 1
assert result["traces"][0]["traceID"] == "abc123"
def test_get_trace_by_id(self):
from agents.tools.jaeger import jaeger_get_trace
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"data": [{"traceID": "abc123", "spans": []}]}
with patch("requests.get", return_value=mock_response):
result = jaeger_get_trace("abc123")
assert result["success"] is True
def test_lookback_parser(self):
from agents.tools.jaeger import _parse_lookback_to_us
assert _parse_lookback_to_us("15m") == 15 * 60 * 1_000_000
assert _parse_lookback_to_us("1h") == 3_600_000_000
assert _parse_lookback_to_us("30s") == 30_000_000
# ── Stream (thought emission) ──────────────────────────────────────────────────
class TestThoughtStream:
def test_emit_stores_in_buffer(self):
from agents.stream import emit, get_history, _thought_buffer
_thought_buffer.clear()
emit("triage", "tool_call", "Checking pods...", tool="kubectl_get")
history = get_history()
assert len(history) == 1
assert history[0]["role"] == "triage"
assert history[0]["thought"] == "Checking pods..."
def test_emit_multiple_roles(self):
from agents.stream import emit, get_history, _thought_buffer
_thought_buffer.clear()
emit("triage", "conclusion", "P1 assigned")
emit("diagnosis", "tool_call", "Querying Prometheus")
emit("remediation", "tool_result", "Rollback executed")
history = get_history()
assert len(history) == 3
roles = [h["role"] for h in history]
assert roles == ["triage", "diagnosis", "remediation"]
def test_buffer_max_size(self):
from agents.stream import emit, get_history, _thought_buffer
_thought_buffer.clear()
for i in range(250):
emit("triage", "thinking", f"thought {i}")
history = get_history()
assert len(history) == 200 # maxlen=200