import pytest from mcpscope.api.server import create_app from mcpscope.storage.store import Store from mcpscope.models.finding import Finding, Severity from mcpscope.models.scan import ScanRun @pytest.fixture def store(): import tempfile import os with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: db_path = f.name s = Store(db_path=db_path) s.save_scan( ScanRun(id="http-test", scanner="test", target="t"), [ Finding( scan_id="http-test", scanner="test", tool_name="tool_a", severity=Severity.CRITICAL, title="Critical finding", ), Finding( scan_id="http-test", scanner="test", tool_name="tool_b", severity=Severity.LOW, title="Low priority", ), ], ) yield s s._conn.close() os.unlink(db_path) @pytest.fixture def client(store): from httpx import AsyncClient, ASGITransport app = create_app(store) transport = ASGITransport(app=app) return AsyncClient(transport=transport, base_url="http://test") @pytest.mark.asyncio async def test_health(client): r = await client.get("/api/health") assert r.status_code == 200 assert r.json() == {"status": "ok"} @pytest.mark.asyncio async def test_list_scans(client): r = await client.get("/api/scans") assert r.status_code == 200 data = r.json() assert data["total"] >= 1 assert len(data["scans"]) >= 1 assert "page" in data assert "pages" in data @pytest.mark.asyncio async def test_list_scans_pagination(client): r = await client.get("/api/scans?page=1&page_size=1") assert r.status_code == 200 data = r.json() assert data["page"] == 1 assert data["page_size"] == 1 @pytest.mark.asyncio async def test_get_scan(client): r = await client.get("/api/scans/http-test") assert r.status_code == 200 data = r.json() assert data["scan"]["id"] == "http-test" assert len(data["findings"]) >= 1 @pytest.mark.asyncio async def test_get_scan_not_found(client): r = await client.get("/api/scans/nonexistent") assert r.status_code == 404 @pytest.mark.asyncio async def test_list_findings(client): r = await client.get("/api/findings") assert r.status_code == 200 data = r.json() assert data["total"] >= 2 assert len(data["findings"]) >= 2 @pytest.mark.asyncio async def test_findings_filter_severity(client): r = await client.get("/api/findings?severity=critical") assert r.status_code == 200 data = r.json() assert data["total"] >= 1 for f in data["findings"]: assert f["severity"] == "critical" @pytest.mark.asyncio async def test_findings_filter_search(client): r = await client.get("/api/findings?search=Critical") assert r.status_code == 200 data = r.json() assert data["total"] >= 1 @pytest.mark.asyncio async def test_findings_filter_scanner(client): r = await client.get("/api/findings?scanner=test") assert r.status_code == 200 data = r.json() assert data["total"] >= 2 @pytest.mark.asyncio async def test_get_finding(client): r = await client.get("/api/findings") fid = r.json()["findings"][0]["id"] r = await client.get(f"/api/findings/{fid}") assert r.status_code == 200 assert r.json()["finding"]["id"] == fid assert r.json()["scan"] is not None @pytest.mark.asyncio async def test_get_finding_not_found(client): r = await client.get("/api/findings/nonexistent") assert r.status_code == 404 @pytest.mark.asyncio async def test_stats_summary(client): r = await client.get("/api/stats/summary") assert r.status_code == 200 data = r.json() assert "total_scans" in data assert "critical" in data assert data["total_scans"] >= 1 @pytest.mark.asyncio async def test_stats_top_tools(client): r = await client.get("/api/stats/top-tools") assert r.status_code == 200 data = r.json() assert "tools" in data @pytest.mark.asyncio async def test_stats_scanners(client): r = await client.get("/api/stats/scanners") assert r.status_code == 200 data = r.json() assert "test" in data["scanners"] @pytest.mark.asyncio async def test_stats_duplicates(client): r = await client.get("/api/stats/duplicates") assert r.status_code == 200 assert "duplicates" in r.json() @pytest.mark.asyncio async def test_report_json(client): r = await client.get("/api/report/json") assert r.status_code == 200 data = r.json() assert "summary" in data assert "generated_at" in data @pytest.mark.asyncio async def test_report_csv(client): r = await client.get("/api/report/csv") assert r.status_code == 200 assert r.headers["content-type"] == "text/csv; charset=utf-8" body = r.text assert "severity" in body assert "critical" in body @pytest.mark.asyncio async def test_dashboard_html(client): r = await client.get("/") assert r.status_code == 200 assert "MCP-Scope" in r.text @pytest.mark.asyncio async def test_finding_detail_html(client): r = await client.get("/api/findings") fid = r.json()["findings"][0]["id"] r = await client.get(f"/findings/{fid}") assert r.status_code == 200 assert "Finding Detail" in r.text or "MCP-Scope" in r.text @pytest.mark.asyncio async def test_api_key_protection(client): from mcpscope.api.server import create_app from mcpscope.config import Settings import tempfile import os with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: db_path = f.name s = Store(db_path=db_path) cfg = Settings(api_key="test-key-123") app = create_app(s, settings=cfg) from httpx import AsyncClient, ASGITransport transport = ASGITransport(app=app) c = AsyncClient(transport=transport, base_url="http://test") r = await c.get("/api/scans") assert r.status_code == 401 r = await c.get("/api/scans", headers={"X-API-Key": "test-key-123"}) assert r.status_code == 200 r = await c.get("/") assert r.status_code == 200 await c.aclose() s._conn.close() os.unlink(db_path) @pytest.mark.asyncio async def test_diff_endpoint(client): r = await client.get("/api/scans") scans = r.json()["scans"] if len(scans) >= 2: a, b = scans[0]["id"], scans[1]["id"] r = await client.get(f"/api/scans/{a}/diff/{b}") assert r.status_code == 200 data = r.json() assert "new_count" in data assert "fixed_count" in data @pytest.mark.asyncio async def test_cors_headers(client): r = await client.options("/api/health") assert "access-control-allow-origin" in r.headers or r.status_code in ( 200, 204, 405, ) @pytest.mark.asyncio async def test_swagger_docs(client): r = await client.get("/docs") assert r.status_code in (200, 404) @pytest.mark.asyncio async def test_openapi_json(client): r = await client.get("/api/openapi.json") assert r.status_code in (200, 404) if r.status_code == 200: assert r.json()["info"]["title"] == "MCP-Scope API" @pytest.mark.asyncio async def test_post_event(client): r = await client.post( "/api/events", json={ "event_type": "prompt_injection", "severity": "high", "message": "Test injection blocked", "source": "mcpguard-test", "blocked": True, }, ) assert r.status_code == 200 data = r.json() assert data["status"] == "ok" assert data["id"] is not None @pytest.mark.asyncio async def test_list_events(client): await client.post( "/api/events", json={ "event_type": "jailbreak_pattern", "severity": "critical", "message": "GODMODE detected", "tool": "test_tool", }, ) r = await client.get("/api/events") assert r.status_code == 200 data = r.json() assert data["total"] >= 1 assert len(data["events"]) >= 1 assert data["events"][0]["event_type"] == "jailbreak_pattern" @pytest.mark.asyncio async def test_event_stats(client): r = await client.get("/api/events/stats") assert r.status_code == 200 data = r.json() assert "total" in data assert "blocked" in data @pytest.mark.asyncio async def test_clear_events(client): r = await client.delete("/api/events") assert r.status_code == 200 assert r.json()["status"] == "ok"