LIBRE / tests /integration /test_api.py
RyZ
deploy: fix hf deploy
d852d80
Raw
History Blame Contribute Delete
6.04 kB
"""
tests/integration/test_api.py
───────────────────────────────
Integration tests for FastAPI endpoints.
Uses:
β€’ httpx.AsyncClient β€” async HTTP client for FastAPI TestClient
β€’ SQLite in-memory database (aiosqlite) β€” no PostgreSQL needed
β€’ MockModelService + InMemoryBroker β€” no RabbitMQ needed
These tests exercise the full request/response cycle through all layers
(Interface β†’ Application β†’ Domain β†’ Infrastructure) using real SQLite.
"""
from __future__ import annotations
import os
import pytest
import pytest_asyncio
# ── Override settings BEFORE importing anything from src ─────────────────────
os.environ["DATABASE_URL"] = "sqlite+aiosqlite:///./test_bp_monitoring_clean.db"
os.environ["RABBITMQ_URL"] = "amqp://guest:guest@localhost:5672/"
os.environ["USE_MOCK_MODEL"] = "true"
os.environ["DEBUG"] = "true"
# SQLite compatibility compilers are now globally registered in connection.py
from httpx import ASGITransport, AsyncClient
from src.infrastructure.database.connection import create_all_tables, dispose_engine
from src.interface.api.app import create_app
from src.shared.config import get_settings
from src.domain.interfaces.services.message_broker import MessageBroker
from src.interface.api.dependencies import get_broker
class MockBroker(MessageBroker):
async def connect(self) -> None: pass
async def disconnect(self) -> None: pass
async def is_connected(self) -> bool: return True
async def publish(self, queue_name: str, message: dict, **kwargs) -> None: pass
async def consume(self, queue_name: str, handler, **kwargs) -> None: pass
@pytest_asyncio.fixture(scope="module")
async def app():
"""Create a test FastAPI app with SQLite DB."""
# Clear cached settings so env overrides take effect
get_settings.cache_clear()
test_app = create_app()
test_app.dependency_overrides[get_broker] = lambda: MockBroker()
await create_all_tables()
yield test_app
await dispose_engine()
# Clean up test DB file
if os.path.exists("./test_bp_monitoring_clean.db"):
os.remove("./test_bp_monitoring_clean.db")
@pytest_asyncio.fixture(scope="module")
async def client(app):
"""Async HTTP client for the test app."""
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as c:
yield c
# ── Health Check ──────────────────────────────────────────────────────────────
class TestHealthEndpoint:
async def test_health_returns_ok(self, client: AsyncClient) -> None:
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
async def test_root_endpoint(self, client: AsyncClient) -> None:
response = await client.get("/")
assert response.status_code == 200
assert "BP Monitoring Pipeline" in response.json()["service"]
# ── PPG Ingestion ─────────────────────────────────────────────────────────────
VALID_PPG_PAYLOAD = {
"device_id": "sensor-001",
"user_id": "test-user-integration",
"sampling_rate": 125.0,
"ppg_values": [0.1, 0.2, 0.5, 0.9, 0.8, 0.4, 0.15] * 180, # ~10s at 125 Hz
"duration_seconds": 10.0,
}
class TestPPGIngestEndpoint:
async def test_ingest_valid_payload_returns_201(self, client: AsyncClient) -> None:
response = await client.post("/api/v1/ppg/ingest", json=VALID_PPG_PAYLOAD)
assert response.status_code == 201
data = response.json()
assert "signal_id" in data
assert data["user_id"] == "test-user-integration"
assert data["device_id"] == "sensor-001"
assert data["num_samples"] > 0
async def test_ingest_empty_ppg_values_returns_422(self, client: AsyncClient) -> None:
payload = {**VALID_PPG_PAYLOAD, "ppg_values": []}
response = await client.post("/api/v1/ppg/ingest", json=payload)
assert response.status_code == 422 # Pydantic validation error
async def test_ingest_invalid_sampling_rate_returns_422(self, client: AsyncClient) -> None:
payload = {**VALID_PPG_PAYLOAD, "sampling_rate": -10.0}
response = await client.post("/api/v1/ppg/ingest", json=payload)
assert response.status_code == 422
async def test_ingest_missing_required_field_returns_422(self, client: AsyncClient) -> None:
payload = {k: v for k, v in VALID_PPG_PAYLOAD.items() if k != "device_id"}
response = await client.post("/api/v1/ppg/ingest", json=payload)
assert response.status_code == 422
# ── Predictions ───────────────────────────────────────────────────────────────
class TestPredictionEndpoints:
async def test_get_predictions_empty_user_returns_200(self, client: AsyncClient) -> None:
response = await client.get("/api/v1/predictions/nonexistent-user")
assert response.status_code == 200
data = response.json()
assert data["user_id"] == "nonexistent-user"
assert data["total"] == 0
assert data["predictions"] == []
async def test_get_predictions_with_limit(self, client: AsyncClient) -> None:
response = await client.get("/api/v1/predictions/test-user-integration?limit=5")
assert response.status_code == 200
async def test_get_prediction_by_signal_not_found_returns_404(
self, client: AsyncClient
) -> None:
response = await client.get(
"/api/v1/predictions/test-user/signal/nonexistent-signal-id"
)
assert response.status_code == 404