mcpscope / tests /test_http.py
Syntho's picture
Upload folder using huggingface_hub
5aa5d28 verified
Raw
History Blame Contribute Delete
8.63 kB
import pytest
from mcpscope.api.server import create_app
from mcpscope.storage.store import Store
from mcpscope.models.finding import Finding, Severity
from mcpscope.models.scan import ScanRun
@pytest.fixture
def store():
import tempfile
import os
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
db_path = f.name
s = Store(db_path=db_path)
s.save_scan(
ScanRun(id="http-test", scanner="test", target="t"),
[
Finding(
scan_id="http-test",
scanner="test",
tool_name="tool_a",
severity=Severity.CRITICAL,
title="Critical finding",
),
Finding(
scan_id="http-test",
scanner="test",
tool_name="tool_b",
severity=Severity.LOW,
title="Low priority",
),
],
)
yield s
s._conn.close()
os.unlink(db_path)
@pytest.fixture
def client(store):
from httpx import AsyncClient, ASGITransport
app = create_app(store)
transport = ASGITransport(app=app)
return AsyncClient(transport=transport, base_url="http://test")
@pytest.mark.asyncio
async def test_health(client):
r = await client.get("/api/health")
assert r.status_code == 200
assert r.json() == {"status": "ok"}
@pytest.mark.asyncio
async def test_list_scans(client):
r = await client.get("/api/scans")
assert r.status_code == 200
data = r.json()
assert data["total"] >= 1
assert len(data["scans"]) >= 1
assert "page" in data
assert "pages" in data
@pytest.mark.asyncio
async def test_list_scans_pagination(client):
r = await client.get("/api/scans?page=1&page_size=1")
assert r.status_code == 200
data = r.json()
assert data["page"] == 1
assert data["page_size"] == 1
@pytest.mark.asyncio
async def test_get_scan(client):
r = await client.get("/api/scans/http-test")
assert r.status_code == 200
data = r.json()
assert data["scan"]["id"] == "http-test"
assert len(data["findings"]) >= 1
@pytest.mark.asyncio
async def test_get_scan_not_found(client):
r = await client.get("/api/scans/nonexistent")
assert r.status_code == 404
@pytest.mark.asyncio
async def test_list_findings(client):
r = await client.get("/api/findings")
assert r.status_code == 200
data = r.json()
assert data["total"] >= 2
assert len(data["findings"]) >= 2
@pytest.mark.asyncio
async def test_findings_filter_severity(client):
r = await client.get("/api/findings?severity=critical")
assert r.status_code == 200
data = r.json()
assert data["total"] >= 1
for f in data["findings"]:
assert f["severity"] == "critical"
@pytest.mark.asyncio
async def test_findings_filter_search(client):
r = await client.get("/api/findings?search=Critical")
assert r.status_code == 200
data = r.json()
assert data["total"] >= 1
@pytest.mark.asyncio
async def test_findings_filter_scanner(client):
r = await client.get("/api/findings?scanner=test")
assert r.status_code == 200
data = r.json()
assert data["total"] >= 2
@pytest.mark.asyncio
async def test_get_finding(client):
r = await client.get("/api/findings")
fid = r.json()["findings"][0]["id"]
r = await client.get(f"/api/findings/{fid}")
assert r.status_code == 200
assert r.json()["finding"]["id"] == fid
assert r.json()["scan"] is not None
@pytest.mark.asyncio
async def test_get_finding_not_found(client):
r = await client.get("/api/findings/nonexistent")
assert r.status_code == 404
@pytest.mark.asyncio
async def test_stats_summary(client):
r = await client.get("/api/stats/summary")
assert r.status_code == 200
data = r.json()
assert "total_scans" in data
assert "critical" in data
assert data["total_scans"] >= 1
@pytest.mark.asyncio
async def test_stats_top_tools(client):
r = await client.get("/api/stats/top-tools")
assert r.status_code == 200
data = r.json()
assert "tools" in data
@pytest.mark.asyncio
async def test_stats_scanners(client):
r = await client.get("/api/stats/scanners")
assert r.status_code == 200
data = r.json()
assert "test" in data["scanners"]
@pytest.mark.asyncio
async def test_stats_duplicates(client):
r = await client.get("/api/stats/duplicates")
assert r.status_code == 200
assert "duplicates" in r.json()
@pytest.mark.asyncio
async def test_report_json(client):
r = await client.get("/api/report/json")
assert r.status_code == 200
data = r.json()
assert "summary" in data
assert "generated_at" in data
@pytest.mark.asyncio
async def test_report_csv(client):
r = await client.get("/api/report/csv")
assert r.status_code == 200
assert r.headers["content-type"] == "text/csv; charset=utf-8"
body = r.text
assert "severity" in body
assert "critical" in body
@pytest.mark.asyncio
async def test_dashboard_html(client):
r = await client.get("/")
assert r.status_code == 200
assert "MCP-Scope" in r.text
@pytest.mark.asyncio
async def test_finding_detail_html(client):
r = await client.get("/api/findings")
fid = r.json()["findings"][0]["id"]
r = await client.get(f"/findings/{fid}")
assert r.status_code == 200
assert "Finding Detail" in r.text or "MCP-Scope" in r.text
@pytest.mark.asyncio
async def test_api_key_protection(client):
from mcpscope.api.server import create_app
from mcpscope.config import Settings
import tempfile
import os
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
db_path = f.name
s = Store(db_path=db_path)
cfg = Settings(api_key="test-key-123")
app = create_app(s, settings=cfg)
from httpx import AsyncClient, ASGITransport
transport = ASGITransport(app=app)
c = AsyncClient(transport=transport, base_url="http://test")
r = await c.get("/api/scans")
assert r.status_code == 401
r = await c.get("/api/scans", headers={"X-API-Key": "test-key-123"})
assert r.status_code == 200
r = await c.get("/")
assert r.status_code == 200
await c.aclose()
s._conn.close()
os.unlink(db_path)
@pytest.mark.asyncio
async def test_diff_endpoint(client):
r = await client.get("/api/scans")
scans = r.json()["scans"]
if len(scans) >= 2:
a, b = scans[0]["id"], scans[1]["id"]
r = await client.get(f"/api/scans/{a}/diff/{b}")
assert r.status_code == 200
data = r.json()
assert "new_count" in data
assert "fixed_count" in data
@pytest.mark.asyncio
async def test_cors_headers(client):
r = await client.options("/api/health")
assert "access-control-allow-origin" in r.headers or r.status_code in (
200,
204,
405,
)
@pytest.mark.asyncio
async def test_swagger_docs(client):
r = await client.get("/docs")
assert r.status_code in (200, 404)
@pytest.mark.asyncio
async def test_openapi_json(client):
r = await client.get("/api/openapi.json")
assert r.status_code in (200, 404)
if r.status_code == 200:
assert r.json()["info"]["title"] == "MCP-Scope API"
@pytest.mark.asyncio
async def test_post_event(client):
r = await client.post(
"/api/events",
json={
"event_type": "prompt_injection",
"severity": "high",
"message": "Test injection blocked",
"source": "mcpguard-test",
"blocked": True,
},
)
assert r.status_code == 200
data = r.json()
assert data["status"] == "ok"
assert data["id"] is not None
@pytest.mark.asyncio
async def test_list_events(client):
await client.post(
"/api/events",
json={
"event_type": "jailbreak_pattern",
"severity": "critical",
"message": "GODMODE detected",
"tool": "test_tool",
},
)
r = await client.get("/api/events")
assert r.status_code == 200
data = r.json()
assert data["total"] >= 1
assert len(data["events"]) >= 1
assert data["events"][0]["event_type"] == "jailbreak_pattern"
@pytest.mark.asyncio
async def test_event_stats(client):
r = await client.get("/api/events/stats")
assert r.status_code == 200
data = r.json()
assert "total" in data
assert "blocked" in data
@pytest.mark.asyncio
async def test_clear_events(client):
r = await client.delete("/api/events")
assert r.status_code == 200
assert r.json()["status"] == "ok"