Spaces:
Sleeping
Sleeping
File size: 4,093 Bytes
6242ddb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 | """Tests for API endpoints."""
from __future__ import annotations
import io
import json
from unittest.mock import AsyncMock, patch
import pytest
class TestHealthEndpoints:
def test_health(self, client, api_headers):
resp = client.get("/health")
assert resp.status_code == 200
data = resp.json()
assert data["status"] in ("healthy", "degraded")
assert "version" in data
assert "uptime_seconds" in data
def test_liveness(self, client):
resp = client.get("/health/live")
assert resp.status_code == 200
assert resp.json()["status"] == "alive"
class TestUploadEndpoints:
def test_upload_csv(self, client, api_headers, sample_csv_content):
with patch("app.api.analysis.run_analysis", new_callable=AsyncMock) as mock_run:
mock_run.return_value = None
resp = client.post(
"/api/v1/upload",
files={"file": ("test.csv", io.BytesIO(sample_csv_content), "text/csv")},
headers=api_headers,
)
assert resp.status_code == 200
data = resp.json()
assert "job_id" in data
assert data["status"] == "pending"
def test_upload_json(self, client, api_headers, sample_json_content):
with patch("app.api.analysis.run_analysis", new_callable=AsyncMock):
resp = client.post(
"/api/v1/upload",
files={"file": ("test.json", io.BytesIO(sample_json_content), "application/json")},
headers=api_headers,
)
assert resp.status_code == 200
def test_upload_unsupported_format(self, client, api_headers):
resp = client.post(
"/api/v1/upload",
files={"file": ("test.txt", io.BytesIO(b"hello"), "text/plain")},
headers=api_headers,
)
assert resp.status_code == 400
assert "Unsupported" in resp.json()["detail"]
def test_upload_no_api_key(self, client, sample_csv_content):
resp = client.post(
"/api/v1/upload",
files={"file": ("test.csv", io.BytesIO(sample_csv_content), "text/csv")},
)
assert resp.status_code == 403
def test_upload_invalid_api_key(self, client, sample_csv_content):
resp = client.post(
"/api/v1/upload",
files={"file": ("test.csv", io.BytesIO(sample_csv_content), "text/csv")},
headers={"X-API-Key": "wrong-key"},
)
assert resp.status_code == 403
def test_upload_empty_file(self, client, api_headers):
resp = client.post(
"/api/v1/upload",
files={"file": ("test.csv", io.BytesIO(b"text\n"), "text/csv")},
headers=api_headers,
)
assert resp.status_code == 400
class TestJobEndpoints:
def test_list_jobs(self, client, api_headers):
resp = client.get("/api/v1/jobs", headers=api_headers)
assert resp.status_code == 200
assert isinstance(resp.json(), list)
def test_get_nonexistent_job(self, client, api_headers):
resp = client.get("/api/v1/jobs/nonexistent", headers=api_headers)
assert resp.status_code == 404
class TestWebhookEndpoints:
def test_webhook_invalid_signature(self, client):
payload = json.dumps({
"event_type": "feedback",
"data": [{"text": "test feedback"}],
})
resp = client.post(
"/api/v1/webhooks/ingest",
content=payload,
headers={
"Content-Type": "application/json",
"X-Signature": "v1=invalid",
"X-Timestamp": "0",
},
)
assert resp.status_code == 401
def test_webhook_missing_signature(self, client):
payload = json.dumps({"event_type": "feedback", "data": [{"text": "test"}]})
resp = client.post(
"/api/v1/webhooks/ingest",
content=payload,
headers={"Content-Type": "application/json"},
)
assert resp.status_code == 401
|