| """API integration tests — backend dispatch (Phase 3), drift (Phase 4), |
| registry-resolved serving (Phase 7).""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| from collections.abc import Iterator |
| from pathlib import Path |
|
|
| import pytest |
| from fastapi.testclient import TestClient |
|
|
| from api.main import create_app, get_classifier, get_drift_monitor |
| from sentiment.adapters.mlflow_registry_classifier import RegistryVersionInfo |
| from sentiment.domain.classifier import SentimentClassifierPort |
| from sentiment.domain.drift import ( |
| DriftLevel, |
| DriftMonitorPort, |
| DriftReport, |
| SignalReport, |
| ) |
| from sentiment.domain.models import Sentiment, SentimentResult |
|
|
|
|
@pytest.fixture(autouse=True)
def _clean_env(monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
    """Delete registry-related environment variables before each test.

    Autouse: ``MODEL_VERSION`` and ``MLFLOW_TRACKING_URI`` are removed if
    present, so a value from the outer environment or from another test
    cannot influence the one that is about to run.
    """
    monkeypatch.delenv("MODEL_VERSION", raising=False)
    monkeypatch.delenv("MLFLOW_TRACKING_URI", raising=False)
    yield
|
|
|
|
def _patch_loader(
    monkeypatch: pytest.MonkeyPatch,
    *,
    classifier: SentimentClassifierPort,
    version_info: RegistryVersionInfo | None,
    captured: list[dict[str, object]] | None = None,
    raises: BaseException | None = None,
) -> None:
    """Swap ``api.main.load_from_registry_or_fallback`` for a canned stand-in.

    The stand-in appends the keyword arguments it received to ``captured``
    (when a list is supplied), then either raises ``raises`` or returns the
    ``(classifier, version_info)`` pair unchanged.
    """

    def _stub_loader(
        *,
        backend: str,
        fallback_dir: Path,
        requested_version: str | None,
        tracking_uri: str = "sqlite:///mlflow.db",
    ) -> tuple[SentimentClassifierPort, RegistryVersionInfo | None]:
        if captured is not None:
            # Record the call before any failure so tests can inspect it either way.
            call_kwargs: dict[str, object] = {
                "backend": backend,
                "fallback_dir": fallback_dir,
                "requested_version": requested_version,
                "tracking_uri": tracking_uri,
            }
            captured.append(call_kwargs)
        if raises is None:
            return classifier, version_info
        raise raises

    monkeypatch.setattr("api.main.load_from_registry_or_fallback", _stub_loader)
|
|
|
|
@pytest.fixture
def stub_client(monkeypatch: pytest.MonkeyPatch) -> Iterator[TestClient]:
    """Yield a TestClient for an app created with ``SENTIMENT_BACKEND`` unset."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    application = create_app()
    with TestClient(application) as http:
        yield http
|
|
|
|
def test_health_returns_200(stub_client: TestClient) -> None:
    """GET /health answers 200 with status ``ok`` and model ``stub``."""
    reply = stub_client.get("/health")
    assert reply.status_code == 200
    payload = reply.json()
    assert payload["status"] == "ok"
    assert payload["model"] == "stub"
|
|
|
|
def test_predict_arabic_text_returns_stub_shape(stub_client: TestClient) -> None:
    """POST /predict with Arabic text echoes it and returns a valid label and score."""
    arabic = "مرحبا بالعالم"
    reply = stub_client.post("/predict", json={"text": arabic})
    assert reply.status_code == 200
    payload = reply.json()
    assert payload["sentiment"] in ("positive", "negative", "neutral")
    score = payload["confidence"]
    assert isinstance(score, float)
    assert 0.0 <= score <= 1.0
    assert payload["text"] == arabic
|
|
|
|
def test_predict_empty_text_returns_422(stub_client: TestClient) -> None:
    """An empty ``text`` field is rejected with HTTP 422."""
    status = stub_client.post("/predict", json={"text": ""}).status_code
    assert status == 422
|
|
|
|
def test_predict_whitespace_only_returns_422(stub_client: TestClient) -> None:
    """A ``text`` field holding only whitespace is rejected with HTTP 422."""
    status = stub_client.post("/predict", json={"text": " "}).status_code
    assert status == 422
|
|
|
|
def test_lifespan_loads_stub_by_default(monkeypatch: pytest.MonkeyPatch) -> None:
    """With ``SENTIMENT_BACKEND`` unset, app state and /health both report ``stub``."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    application = create_app()
    with TestClient(application) as http:
        assert application.state.backend_name == "stub"
        health = http.get("/health").json()
        assert health["model"] == "stub"
|
|
|
|
class _FakeBackend(SentimentClassifierPort):
    """Classifier double that labels every input POSITIVE with confidence 0.9."""

    def predict(self, text: str) -> SentimentResult:
        """Return a fixed POSITIVE/0.9 result that echoes ``text``."""
        canned = SentimentResult(text=text, sentiment=Sentiment.POSITIVE, confidence=0.9)
        return canned
|
|
|
|
def test_lifespan_loads_lora_when_env_set(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
    """``SENTIMENT_BACKEND=lora`` boots through the loader and reports ``arabert-lora-v1``."""
    loader_calls: list[dict[str, object]] = []
    registry_entry = RegistryVersionInfo(name="arabert-lora", version="1", run_id="r")
    _patch_loader(
        monkeypatch,
        classifier=_FakeBackend(),
        version_info=registry_entry,
        captured=loader_calls,
    )
    monkeypatch.setenv("SENTIMENT_BACKEND", "lora")
    monkeypatch.setenv("LORA_MODEL_DIR", str(tmp_path))

    application = create_app()
    with TestClient(application) as http:
        assert application.state.backend_name == "arabert-lora-v1"
        health = http.get("/health").json()
        assert health["model"] == "arabert-lora-v1"

    # The loader must have been invoked exactly once, with the lora settings.
    assert len(loader_calls) == 1
    only_call = loader_calls[0]
    assert only_call["backend"] == "lora"
    assert only_call["fallback_dir"] == tmp_path.resolve()
|
|
|
|
def test_lifespan_loads_catboost_when_env_set(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """``SENTIMENT_BACKEND=catboost`` boots through the loader as ``catboost-baseline-v1``."""
    loader_calls: list[dict[str, object]] = []
    registry_entry = RegistryVersionInfo(name="catboost-baseline", version="11", run_id="r")
    _patch_loader(
        monkeypatch,
        classifier=_FakeBackend(),
        version_info=registry_entry,
        captured=loader_calls,
    )
    monkeypatch.setenv("SENTIMENT_BACKEND", "catboost")
    monkeypatch.setenv("CATBOOST_MODEL_DIR", str(tmp_path))

    application = create_app()
    with TestClient(application) as http:
        assert application.state.backend_name == "catboost-baseline-v1"
        health = http.get("/health").json()
        assert health["model"] == "catboost-baseline-v1"

    # The loader must have been invoked exactly once, with the catboost settings.
    assert len(loader_calls) == 1
    only_call = loader_calls[0]
    assert only_call["backend"] == "catboost"
    assert only_call["fallback_dir"] == tmp_path.resolve()
|
|
|
|
def test_lifespan_fails_fast_on_unknown_backend(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An unrecognised backend aborts startup; the error names it and the valid choices."""
    monkeypatch.setenv("SENTIMENT_BACKEND", "bogus")
    application = create_app()
    with pytest.raises((ValueError, RuntimeError)) as excinfo, TestClient(application):
        pass
    message = str(excinfo.value)
    assert "bogus" in message
    assert all(name in message for name in ("stub", "catboost", "lora"))
|
|
|
|
def test_lifespan_fails_fast_when_loader_raises_filesystem_error(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """Phase 7: a ``FileNotFoundError`` raised by the loader aborts app startup."""
    absent_dir = tmp_path / "definitely-not-here"
    loader_error = FileNotFoundError(f"missing LoRA marker: {absent_dir}")
    _patch_loader(
        monkeypatch,
        classifier=_FakeBackend(),
        version_info=None,
        raises=loader_error,
    )
    monkeypatch.setenv("SENTIMENT_BACKEND", "lora")
    monkeypatch.setenv("LORA_MODEL_DIR", str(absent_dir))
    application = create_app()
    with pytest.raises((FileNotFoundError, RuntimeError)), TestClient(application):
        pass
|
|
|
|
class _RecordingFakeClassifier(SentimentClassifierPort):
    """Classifier double that logs each input and always answers with one fixed result."""

    def __init__(self, result: SentimentResult) -> None:
        """Store the canned ``result`` and start with an empty call log."""
        self.calls: list[str] = []
        self._result = result

    def predict(self, text: str) -> SentimentResult:
        """Log ``text`` in ``calls`` and return the canned result."""
        self.calls.append(text)
        canned = self._result
        return canned
|
|
|
|
def test_predict_uses_overridden_classifier(monkeypatch: pytest.MonkeyPatch) -> None:
    """A classifier injected via dependency override is the one /predict calls."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    canned = SentimentResult(text="recorded", sentiment=Sentiment.POSITIVE, confidence=0.93)
    recorder = _RecordingFakeClassifier(canned)
    application = create_app()
    application.dependency_overrides[get_classifier] = lambda: recorder
    with TestClient(application) as http:
        reply = http.post("/predict", json={"text": "الفندق ممتاز"})
    assert reply.status_code == 200
    # The response is the canned result, while the classifier saw the request text.
    assert reply.json() == {"text": "recorded", "sentiment": "positive", "confidence": 0.93}
    assert recorder.calls == ["الفندق ممتاز"]
|
|
|
|
class _RaisingClassifier(SentimentClassifierPort):
    """Classifier double whose ``predict`` always raises a preconfigured exception."""

    def __init__(self, exc: BaseException) -> None:
        """Remember the exception instance that ``predict`` will raise."""
        self._exc = exc

    def predict(self, text: str) -> SentimentResult:
        """Raise the stored exception; ``text`` is ignored."""
        failure = self._exc
        raise failure
|
|
|
|
def test_predict_returns_500_when_classifier_raises_unexpected(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An unexpected classifier error becomes a generic 500 that leaks no detail."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    user_text = "نص سري لا يجب تسريبه"

    def _broken_classifier() -> _RaisingClassifier:
        return _RaisingClassifier(RuntimeError("boom internal trace 0xdeadbeef"))

    application = create_app()
    application.dependency_overrides[get_classifier] = _broken_classifier
    with TestClient(application, raise_server_exceptions=False) as http:
        reply = http.post("/predict", json={"text": user_text})
    assert reply.status_code == 500
    assert reply.json() == {"detail": "internal inference error"}
    # Neither the exception text, its type, nor the user's input may appear in the body.
    forbidden = ("boom", "0xdeadbeef", "Traceback", "RuntimeError", user_text)
    raw_body = reply.text
    assert not any(fragment in raw_body for fragment in forbidden)
|
|
|
|
def test_predict_returns_422_when_classifier_raises_value_error(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A ``ValueError`` from the classifier surfaces as 422 with its message as detail."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)

    def _rejecting_classifier() -> _RaisingClassifier:
        return _RaisingClassifier(ValueError("text must not be empty"))

    application = create_app()
    application.dependency_overrides[get_classifier] = _rejecting_classifier
    with TestClient(application) as http:
        reply = http.post("/predict", json={"text": "non-empty"})
    assert reply.status_code == 422
    detail = reply.json()["detail"]
    assert detail == "text must not be empty"
|
|
|
|
| |
| |
| |
|
|
|
|
class _RecordingMonitor(DriftMonitorPort):
    """Drift-monitor double that stores recorded pairs, or raises on demand."""

    def __init__(self, raise_on_record: bool = False) -> None:
        """Start with no records; ``raise_on_record`` makes ``record`` fail."""
        self._raise_on_record = raise_on_record
        self.records: list[tuple[Sentiment, float]] = []

    def record(self, label: Sentiment, confidence: float) -> None:
        """Append ``(label, confidence)`` unless configured to raise instead."""
        if not self._raise_on_record:
            self.records.append((label, confidence))
            return
        raise RuntimeError("drift trace 0xfeedface boom")

    def report(self) -> DriftReport:
        """Not supported by this double."""
        raise NotImplementedError
|
|
|
|
class _FixedReportMonitor(DriftMonitorPort):
    """Drift-monitor double that ignores records and serves one fixed report."""

    def __init__(self, report: DriftReport) -> None:
        """Keep the report that ``report()`` will hand back."""
        self._report = report

    def record(self, label: Sentiment, confidence: float) -> None:
        """Discard the observation."""
        return None

    def report(self) -> DriftReport:
        """Return the report supplied at construction."""
        fixed = self._report
        return fixed
|
|
|
|
def _full_report() -> DriftReport:
    """Build a report with 50 observations (the minimum) and both PSI signals populated."""
    class_signal = SignalReport(
        psi=0.087,
        drift_level=DriftLevel.STABLE,
        reference={"positive": 0.6, "negative": 0.2, "neutral": 0.2},
        observed={"positive": 0.65, "negative": 0.17, "neutral": 0.18},
        reference_missing=False,
    )
    bucket_signal = SignalReport(
        psi=0.124,
        drift_level=DriftLevel.MODERATE,
        reference={"low": 0.10, "medium": 0.25, "high": 0.65},
        observed={"low": 0.18, "medium": 0.30, "high": 0.52},
        reference_missing=False,
    )
    return DriftReport(
        backend="arabert-lora-v1",
        observed_count=50,
        buffer_size=1000,
        minimum_count=50,
        insufficient_data=False,
        predicted_class=class_signal,
        confidence_bucket=bucket_signal,
    )
|
|
|
|
def _insufficient_report() -> DriftReport:
    """Build a report with 12 observations (below the minimum of 50) and no PSI values."""
    class_signal = SignalReport(
        psi=None,
        drift_level=None,
        reference={"positive": 0.6, "negative": 0.2, "neutral": 0.2},
        observed={"positive": 0.83, "negative": 0.08, "neutral": 0.08},
        reference_missing=False,
    )
    bucket_signal = SignalReport(
        psi=None,
        drift_level=None,
        reference={"low": 0.10, "medium": 0.25, "high": 0.65},
        observed={"low": 0.17, "medium": 0.25, "high": 0.58},
        reference_missing=False,
    )
    return DriftReport(
        backend="arabert-lora-v1",
        observed_count=12,
        buffer_size=1000,
        minimum_count=50,
        insufficient_data=True,
        predicted_class=class_signal,
        confidence_bucket=bucket_signal,
    )
|
|
|
|
def test_predict_records_to_drift_monitor(monkeypatch: pytest.MonkeyPatch) -> None:
    """A successful /predict call records the label and confidence on the drift monitor."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    canned = SentimentResult(text="recorded", sentiment=Sentiment.POSITIVE, confidence=0.93)
    classifier_double = _RecordingFakeClassifier(canned)
    monitor_double = _RecordingMonitor()
    application = create_app()
    application.dependency_overrides.update(
        {
            get_classifier: lambda: classifier_double,
            get_drift_monitor: lambda: monitor_double,
        }
    )
    with TestClient(application) as http:
        reply = http.post("/predict", json={"text": "الفندق ممتاز"})
    assert reply.status_code == 200
    assert monitor_double.records == [(Sentiment.POSITIVE, 0.93)]
|
|
|
|
def test_metrics_drift_returns_both_psi_signals_after_enough_calls(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """/metrics/drift serialises a full report with both PSI signals and their levels."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)

    def _monitor() -> _FixedReportMonitor:
        return _FixedReportMonitor(_full_report())

    application = create_app()
    application.dependency_overrides[get_drift_monitor] = _monitor
    with TestClient(application) as http:
        reply = http.get("/metrics/drift")
    assert reply.status_code == 200
    payload = reply.json()
    assert payload["backend"] == "arabert-lora-v1"
    assert payload["observed_count"] == 50
    assert payload["insufficient_data"] is False

    class_signal = payload["signals"]["predicted_class"]
    bucket_signal = payload["signals"]["confidence_bucket"]
    assert isinstance(class_signal["psi"], float)
    assert class_signal["drift_level"] == "stable"
    assert class_signal["reference"] == {"positive": 0.6, "negative": 0.2, "neutral": 0.2}
    # Absent or null is accepted for the flag when the reference is present.
    assert class_signal.get("reference_missing") is None
    assert isinstance(bucket_signal["psi"], float)
    assert bucket_signal["drift_level"] == "moderate"
|
|
|
|
def test_metrics_drift_returns_insufficient_data_below_threshold(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Below the minimum count, PSI and level are null but distributions are still sent."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)

    def _monitor() -> _FixedReportMonitor:
        return _FixedReportMonitor(_insufficient_report())

    application = create_app()
    application.dependency_overrides[get_drift_monitor] = _monitor
    with TestClient(application) as http:
        reply = http.get("/metrics/drift")
    assert reply.status_code == 200
    payload = reply.json()
    assert payload["insufficient_data"] is True

    class_signal = payload["signals"]["predicted_class"]
    bucket_signal = payload["signals"]["confidence_bucket"]
    assert class_signal["psi"] is None
    assert class_signal["drift_level"] is None
    assert class_signal["reference"] is not None
    assert class_signal["observed"] is not None
    assert bucket_signal["psi"] is None
    assert bucket_signal["drift_level"] is None
    assert bucket_signal["reference"] is not None
|
|
|
|
def test_metrics_drift_returns_503_for_stub_backend(monkeypatch: pytest.MonkeyPatch) -> None:
    """With the default stub backend, /metrics/drift answers 503 with a fixed detail."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    application = create_app()
    with TestClient(application) as http:
        reply = http.get("/metrics/drift")
    assert reply.status_code == 503
    assert reply.json() == {"detail": "drift monitoring disabled for stub backend"}
|
|
|
|
def test_metrics_drift_marks_signal_reference_missing_when_field_absent(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """A reference file holding only a confidence histogram flags the class signal as missing."""
    reports_dir = tmp_path / "reports"
    reports_dir.mkdir()
    # The reference file carries a confidence histogram and nothing else.
    reference_payload = {"confidence_histogram": {"low": 0.1, "medium": 0.25, "high": 0.65}}
    reference_file = reports_dir / "arabert-lora-v1.json"
    reference_file.write_text(json.dumps(reference_payload), encoding="utf-8")

    _patch_loader(monkeypatch, classifier=_FakeBackend(), version_info=None)
    monkeypatch.setenv("SENTIMENT_BACKEND", "lora")
    monkeypatch.setenv("LORA_MODEL_DIR", str(tmp_path))
    monkeypatch.setenv("DRIFT_REPORTS_DIR", str(reports_dir))

    application = create_app()
    with TestClient(application) as http:
        reply = http.get("/metrics/drift")
    assert reply.status_code == 200
    signals = reply.json()["signals"]
    class_signal = signals["predicted_class"]
    bucket_signal = signals["confidence_bucket"]

    # Predicted-class signal: flagged missing, no PSI/level, no reference key at all.
    assert class_signal["reference_missing"] is True
    assert class_signal["psi"] is None
    assert class_signal["drift_level"] is None
    assert "reference" not in class_signal
    assert class_signal["observed"] is not None

    # Confidence-bucket signal: reference present and taken from the file.
    assert bucket_signal.get("reference_missing") is None
    assert bucket_signal["reference"] == {"low": 0.1, "medium": 0.25, "high": 0.65}
|
|
|
|
def test_predict_succeeds_when_drift_monitor_record_raises(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A failing drift monitor must not break /predict or leak into its response."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    user_text = "نص حساس لا يجب تسريبه أبدًا"
    canned = SentimentResult(text="recorded", sentiment=Sentiment.POSITIVE, confidence=0.93)
    classifier_double = _RecordingFakeClassifier(canned)
    failing_monitor = _RecordingMonitor(raise_on_record=True)
    application = create_app()
    application.dependency_overrides.update(
        {
            get_classifier: lambda: classifier_double,
            get_drift_monitor: lambda: failing_monitor,
        }
    )
    with TestClient(application) as http:
        reply = http.post("/predict", json={"text": user_text})
    assert reply.status_code == 200
    expected_body = {"text": "recorded", "sentiment": "positive", "confidence": 0.93}
    assert reply.json() == expected_body
    # Neither the monitor's error nor the user's input may show up in the body.
    forbidden = ("0xfeedface", "boom", "Traceback", "RuntimeError", user_text)
    raw_body = reply.text
    assert not any(fragment in raw_body for fragment in forbidden)
|
|
|
|
def test_metrics_drift_response_omits_text_payload(monkeypatch: pytest.MonkeyPatch) -> None:
    """/metrics/drift must never echo request text, nor any Arabic-script character.

    A real ``InMemoryDriftMonitor`` is fed one prediction whose text is Arabic.
    The drift response is then scanned for that exact text and for any code
    point in the Arabic (U+0600-U+06FF) or Arabic Supplement (U+0750-U+077F)
    ranges.
    """
    # Function-scope imports: neither name is imported at module level in this file.
    import re

    from sentiment.adapters.in_memory_drift_monitor import InMemoryDriftMonitor

    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    arabic_text = "السرّيّة محفوظة"
    fake_classifier = _RecordingFakeClassifier(
        SentimentResult(text=arabic_text, sentiment=Sentiment.POSITIVE, confidence=0.93)
    )
    monitor = InMemoryDriftMonitor(
        backend_name="arabert-lora-v1",
        predicted_class_reference={
            Sentiment.POSITIVE: 0.6,
            Sentiment.NEGATIVE: 0.2,
            Sentiment.NEUTRAL: 0.2,
        },
        confidence_bucket_reference={"low": 0.1, "medium": 0.25, "high": 0.65},
        buffer_size=10,
        minimum_count=1,
    )
    app = create_app()
    app.dependency_overrides[get_classifier] = lambda: fake_classifier
    app.dependency_overrides[get_drift_monitor] = lambda: monitor
    with TestClient(app) as client:
        predict_response = client.post("/predict", json={"text": arabic_text})
        response = client.get("/metrics/drift")
    # Guard against a vacuous pass: the prediction must actually have gone through.
    assert predict_response.status_code == 200
    assert response.status_code == 200
    assert arabic_text not in response.text
    # Explicit \u escapes instead of literal characters: the range start U+0600 is an
    # invisible format character, and if it is lost the class would begin with a
    # literal "-" and match the hyphens in e.g. "arabert-lora-v1".
    arabic_script = re.compile(r"[\u0600-\u06FF\u0750-\u077F]")
    assert arabic_script.search(response.text) is None
|
|
|
|
def test_lifespan_rejects_invalid_drift_buffer_size(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """``DRIFT_BUFFER_SIZE=0`` aborts startup with an error that names the variable.

    The model directory is pytest's per-test ``tmp_path`` rather than a
    hard-coded ``/tmp``: the shared system temp directory is not portable and
    may contain files left by other processes, which makes the test less
    hermetic than its siblings.
    """
    _patch_loader(monkeypatch, classifier=_FakeBackend(), version_info=None)
    monkeypatch.setenv("SENTIMENT_BACKEND", "lora")
    monkeypatch.setenv("LORA_MODEL_DIR", str(tmp_path))
    monkeypatch.setenv("DRIFT_BUFFER_SIZE", "0")
    app = create_app()
    with pytest.raises((ValueError, RuntimeError)) as excinfo:
        with TestClient(app):
            pass
    assert "DRIFT_BUFFER_SIZE" in str(excinfo.value)
|
|
|
|
| |
| |
| |
|
|
|
|
def _boot_catboost_with_loader(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
    *,
    version_info: RegistryVersionInfo | None,
    captured: list[dict[str, object]] | None = None,
) -> TestClient:
    """Return a not-yet-entered TestClient for a catboost app with the loader patched.

    The loader is replaced via ``_patch_loader`` with a ``_FakeBackend`` and the
    given ``version_info``; ``SENTIMENT_BACKEND`` is set to ``catboost`` and
    ``CATBOOST_MODEL_DIR`` to ``tmp_path``. Callers in this module enter the
    returned client with ``with``.
    """
    monkeypatch.setenv("SENTIMENT_BACKEND", "catboost")
    monkeypatch.setenv("CATBOOST_MODEL_DIR", str(tmp_path))
    _patch_loader(
        monkeypatch,
        classifier=_FakeBackend(),
        version_info=version_info,
        captured=captured,
    )
    application = create_app()
    return TestClient(application)
|
|
|
|
def test_health_reports_registry_version_when_loaded(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """When the loader returns registry info, /health exposes it with source ``registry``."""
    registry_entry = RegistryVersionInfo(name="catboost-baseline", version="3", run_id="run-abc")
    expected_body = {
        "status": "ok",
        "model": "catboost-baseline-v1",
        "model_version": {
            "name": "catboost-baseline",
            "version": "3",
            "run_id": "run-abc",
            "source": "registry",
        },
    }
    with _boot_catboost_with_loader(
        monkeypatch, tmp_path, version_info=registry_entry
    ) as http:
        reply = http.get("/health")
    assert reply.status_code == 200
    assert reply.json() == expected_body
|
|
|
|
def test_health_reports_null_version_on_filesystem_fallback(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """When the loader returns no registry info, /health reports a null ``model_version``."""
    with _boot_catboost_with_loader(monkeypatch, tmp_path, version_info=None) as http:
        reply = http.get("/health")
    assert reply.status_code == 200
    payload = reply.json()
    assert payload["model"] == "catboost-baseline-v1"
    assert payload["model_version"] is None
|
|
|
|
def test_health_model_version_is_null_for_stub_backend(stub_client: TestClient) -> None:
    """The stub backend reports model ``stub`` and a null ``model_version``."""
    reply = stub_client.get("/health")
    assert reply.status_code == 200
    payload = reply.json()
    assert payload["model"] == "stub"
    assert payload["model_version"] is None
|
|
|
|
def test_model_version_env_passed_to_loader(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """``MODEL_VERSION`` reaches the loader as ``requested_version`` and shows in /health."""
    loader_calls: list[dict[str, object]] = []
    monkeypatch.setenv("MODEL_VERSION", "5")
    registry_entry = RegistryVersionInfo(name="catboost-baseline", version="5", run_id="r")
    with _boot_catboost_with_loader(
        monkeypatch,
        tmp_path,
        version_info=registry_entry,
        captured=loader_calls,
    ) as http:
        health = http.get("/health").json()
    assert health["model_version"]["version"] == "5"
    assert len(loader_calls) == 1
    only_call = loader_calls[0]
    assert only_call["requested_version"] == "5"
|
|
|
|
def test_loader_called_without_model_version_when_env_unset(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """With ``MODEL_VERSION`` unset, the loader receives ``requested_version=None``.

    The call count is asserted first, as in the sibling loader tests, so a
    loader that was never invoked fails with a clear assertion instead of an
    ``IndexError`` on ``captured[0]``.
    """
    captured: list[dict[str, object]] = []
    with _boot_catboost_with_loader(
        monkeypatch,
        tmp_path,
        version_info=RegistryVersionInfo(name="catboost-baseline", version="9", run_id="r"),
        captured=captured,
    ):
        pass  # Entering and leaving the client is the whole scenario.
    assert len(captured) == 1
    assert captured[0]["requested_version"] is None
|
|
|
|
def test_mlflow_tracking_uri_env_forwarded_to_loader(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """``MLFLOW_TRACKING_URI`` from the environment is passed to the loader unchanged.

    The call count is asserted first, as in the sibling loader tests, so a
    loader that was never invoked fails with a clear assertion instead of an
    ``IndexError`` on ``captured[0]``.
    """
    captured: list[dict[str, object]] = []
    monkeypatch.setenv("MLFLOW_TRACKING_URI", "sqlite:///custom.db")
    with _boot_catboost_with_loader(
        monkeypatch,
        tmp_path,
        version_info=None,
        captured=captured,
    ):
        pass  # Entering and leaving the client is the whole scenario.
    assert len(captured) == 1
    assert captured[0]["tracking_uri"] == "sqlite:///custom.db"
|
|