Spaces:
Running
Running
| """tests/test_api.py β FastAPI endpoint integration tests.""" | |
| from __future__ import annotations | |
| import io | |
| import pytest | |
| import numpy as np | |
| from PIL import Image | |
| from fastapi.testclient import TestClient | |
| def client(): | |
| from src.api.main import app | |
| return TestClient(app) | |
| def jpeg_bytes(): | |
| arr = (np.random.rand(224, 224, 3) * 255).astype(np.uint8) | |
| buf = io.BytesIO() | |
| Image.fromarray(arr).save(buf, format="JPEG") | |
| return buf.getvalue() | |
| # ββ GET /health βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def test_health_returns_200(client): | |
| r = client.get("/health") | |
| assert r.status_code == 200 | |
| def test_health_has_required_fields(client): | |
| data = client.get("/health").json() | |
| assert data["status"] == "ok" | |
| assert "version" in data | |
| assert "engines" in data | |
| assert "inference_backend" in data | |
| assert "runpod_configured" in data | |
| assert set(data["engines"]) == {"fingerprint", "coherence", "sstgnn"} | |
| def test_health_models_returns_inventory(client): | |
| data = client.get("/health/models").json() | |
| assert "fingerprint" in data | |
| assert "coherence" in data | |
| assert "sstgnn" in data | |
| assert "generator_labels" in data | |
| assert "stable_diffusion" in data["generator_labels"] | |
| # ββ GET / βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def test_root_returns_html(client): | |
| r = client.get("/") | |
| assert r.status_code == 200 | |
| assert "text/html" in r.headers["content-type"] | |
| # ββ POST /detect/image ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def test_detect_image_returns_200(client, jpeg_bytes): | |
| r = client.post( | |
| "/detect/image", | |
| files={"file": ("test.jpg", jpeg_bytes, "image/jpeg")}, | |
| ) | |
| assert r.status_code == 200 | |
| def test_detect_image_response_schema(client, jpeg_bytes): | |
| data = client.post( | |
| "/detect/image", | |
| files={"file": ("test.jpg", jpeg_bytes, "image/jpeg")}, | |
| ).json() | |
| assert data["verdict"] in ("FAKE", "REAL") | |
| assert 0.0 <= data["confidence"] <= 1.0 | |
| assert "attributed_generator" in data | |
| assert "explanation" in data | |
| assert "engine_breakdown" in data | |
| assert len(data["engine_breakdown"]) == 3 | |
| def test_detect_image_engine_names(client, jpeg_bytes): | |
| data = client.post( | |
| "/detect/image", | |
| files={"file": ("test.jpg", jpeg_bytes, "image/jpeg")}, | |
| ).json() | |
| engine_names = {e["engine"] for e in data["engine_breakdown"]} | |
| assert engine_names == {"fingerprint", "coherence", "sstgnn"} | |
| def test_detect_image_engine_confidence_range(client, jpeg_bytes): | |
| data = client.post( | |
| "/detect/image", | |
| files={"file": ("test.jpg", jpeg_bytes, "image/jpeg")}, | |
| ).json() | |
| for engine in data["engine_breakdown"]: | |
| assert 0.0 <= engine["confidence"] <= 1.0 | |
| assert engine["verdict"] in ("FAKE", "REAL") | |
| def test_detect_image_too_large_returns_413(client): | |
| big = b"x" * (21 * 1024 * 1024) # 21MB > 20MB limit | |
| r = client.post( | |
| "/detect/image", | |
| files={"file": ("big.jpg", big, "image/jpeg")}, | |
| ) | |
| assert r.status_code == 413 | |
| def test_detect_image_wrong_type_returns_415(client, jpeg_bytes): | |
| r = client.post( | |
| "/detect/image", | |
| files={"file": ("test.mp4", jpeg_bytes, "video/mp4")}, | |
| ) | |
| assert r.status_code == 415 | |
| def test_detect_image_processing_time_positive(client, jpeg_bytes): | |
| data = client.post( | |
| "/detect/image", | |
| files={"file": ("test.jpg", jpeg_bytes, "image/jpeg")}, | |
| ).json() | |
| assert data["processing_time_ms"] >= 0 | |
| # ββ POST /detect/video ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def test_detect_video_wrong_type_returns_415(client, jpeg_bytes): | |
| r = client.post( | |
| "/detect/video", | |
| files={"file": ("test.jpg", jpeg_bytes, "image/jpeg")}, | |
| ) | |
| assert r.status_code == 415 | |