PrajnaShetty's picture
feat(mlflow): swap default tracking URI to sqlite:///mlflow.db
5f2258e
"""API integration tests — backend dispatch (Phase 3), drift (Phase 4),
registry-resolved serving (Phase 7)."""
from __future__ import annotations
import json
from collections.abc import Iterator
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
from api.main import create_app, get_classifier, get_drift_monitor
from sentiment.adapters.mlflow_registry_classifier import RegistryVersionInfo
from sentiment.domain.classifier import SentimentClassifierPort
from sentiment.domain.drift import (
DriftLevel,
DriftMonitorPort,
DriftReport,
SignalReport,
)
from sentiment.domain.models import Sentiment, SentimentResult
@pytest.fixture(autouse=True)
def _clean_env(monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
    """Drop registry-related env vars before each test runs.

    ``MODEL_VERSION`` and ``MLFLOW_TRACKING_URI`` are removed (when present)
    so values already in the process environment cannot leak into a test.
    """
    leaky_vars = ("MODEL_VERSION", "MLFLOW_TRACKING_URI")
    for name in leaky_vars:
        # raising=False: an unset variable is already the state we want.
        monkeypatch.delenv(name, raising=False)
    yield
def _patch_loader(
    monkeypatch: pytest.MonkeyPatch,
    *,
    classifier: SentimentClassifierPort,
    version_info: RegistryVersionInfo | None,
    captured: list[dict[str, object]] | None = None,
    raises: BaseException | None = None,
) -> None:
    """Swap ``api.main.load_from_registry_or_fallback`` for a canned stand-in.

    Args:
        monkeypatch: pytest fixture used to install the replacement.
        classifier: Classifier the stand-in returns.
        version_info: Registry metadata the stand-in returns alongside the
            classifier (``None`` is passed through unchanged).
        captured: Optional list; when given, the keyword arguments of every
            stand-in call are appended to it as a dict.
        raises: Optional exception; when given, the stand-in raises it
            (after recording the call) instead of returning.
    """

    def _fake_loader(
        *,
        backend: str,
        fallback_dir: Path,
        requested_version: str | None,
        tracking_uri: str = "sqlite:///mlflow.db",
    ) -> tuple[SentimentClassifierPort, RegistryVersionInfo | None]:
        call_kwargs: dict[str, object] = {
            "backend": backend,
            "fallback_dir": fallback_dir,
            "requested_version": requested_version,
            "tracking_uri": tracking_uri,
        }
        if captured is not None:
            captured.append(call_kwargs)
        # Record first, then fail: callers can still inspect a failing call.
        if raises is not None:
            raise raises
        return classifier, version_info

    monkeypatch.setattr("api.main.load_from_registry_or_fallback", _fake_loader)
@pytest.fixture
def stub_client(monkeypatch: pytest.MonkeyPatch) -> Iterator[TestClient]:
    """Yield a ``TestClient`` for an app created with ``SENTIMENT_BACKEND`` unset."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    app = create_app()
    with TestClient(app) as stub:
        yield stub
def test_health_returns_200(stub_client: TestClient) -> None:
    """``GET /health`` on the stub app answers 200 with status ``ok`` and model ``stub``."""
    health = stub_client.get("/health")
    assert health.status_code == 200

    payload = health.json()
    assert payload["status"] == "ok"
    assert payload["model"] == "stub"
def test_predict_arabic_text_returns_stub_shape(stub_client: TestClient) -> None:
    """``POST /predict`` with Arabic text returns a well-formed prediction body."""
    arabic = "مرحبا بالعالم"
    reply = stub_client.post("/predict", json={"text": arabic})
    assert reply.status_code == 200

    payload = reply.json()
    assert payload["sentiment"] in ("positive", "negative", "neutral")
    confidence = payload["confidence"]
    assert isinstance(confidence, float)
    assert 0.0 <= confidence <= 1.0
    # The submitted text is echoed back unchanged.
    assert payload["text"] == arabic
def test_predict_empty_text_returns_422(stub_client: TestClient) -> None:
    """An empty ``text`` field is rejected with HTTP 422."""
    empty_payload = {"text": ""}
    status = stub_client.post("/predict", json=empty_payload).status_code
    assert status == 422
def test_predict_whitespace_only_returns_422(stub_client: TestClient) -> None:
    """A ``text`` field containing only whitespace is rejected with HTTP 422."""
    blank_payload = {"text": " "}
    status = stub_client.post("/predict", json=blank_payload).status_code
    assert status == 422
def test_lifespan_loads_stub_by_default(monkeypatch: pytest.MonkeyPatch) -> None:
    """With ``SENTIMENT_BACKEND`` unset, app state and ``/health`` both report ``stub``."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    application = create_app()
    with TestClient(application) as http:
        assert application.state.backend_name == "stub"
        health_body = http.get("/health").json()
        assert health_body["model"] == "stub"
class _FakeBackend(SentimentClassifierPort):
    """Classifier double that labels every input POSITIVE with confidence 0.9."""

    def predict(self, text: str) -> SentimentResult:
        """Return a fixed positive result that echoes ``text``."""
        return SentimentResult(
            text=text,
            sentiment=Sentiment.POSITIVE,
            confidence=0.9,
        )
def test_lifespan_loads_lora_when_env_set(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
    """``SENTIMENT_BACKEND=lora`` boots the LoRA backend through the (patched) loader."""
    loader_calls: list[dict[str, object]] = []
    registry_hit = RegistryVersionInfo(name="arabert-lora", version="1", run_id="r")
    _patch_loader(
        monkeypatch,
        classifier=_FakeBackend(),
        version_info=registry_hit,
        captured=loader_calls,
    )
    monkeypatch.setenv("SENTIMENT_BACKEND", "lora")
    monkeypatch.setenv("LORA_MODEL_DIR", str(tmp_path))

    application = create_app()
    with TestClient(application) as http:
        assert application.state.backend_name == "arabert-lora-v1"
        assert http.get("/health").json()["model"] == "arabert-lora-v1"

    # The loader ran exactly once, with the backend name and resolved model dir.
    assert len(loader_calls) == 1
    only_call = loader_calls[0]
    assert only_call["backend"] == "lora"
    assert only_call["fallback_dir"] == tmp_path.resolve()
def test_lifespan_loads_catboost_when_env_set(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """``SENTIMENT_BACKEND=catboost`` boots the CatBoost backend through the (patched) loader."""
    loader_calls: list[dict[str, object]] = []
    registry_hit = RegistryVersionInfo(name="catboost-baseline", version="11", run_id="r")
    _patch_loader(
        monkeypatch,
        classifier=_FakeBackend(),
        version_info=registry_hit,
        captured=loader_calls,
    )
    monkeypatch.setenv("SENTIMENT_BACKEND", "catboost")
    monkeypatch.setenv("CATBOOST_MODEL_DIR", str(tmp_path))

    application = create_app()
    with TestClient(application) as http:
        assert application.state.backend_name == "catboost-baseline-v1"
        assert http.get("/health").json()["model"] == "catboost-baseline-v1"

    # The loader ran exactly once, with the backend name and resolved model dir.
    assert len(loader_calls) == 1
    only_call = loader_calls[0]
    assert only_call["backend"] == "catboost"
    assert only_call["fallback_dir"] == tmp_path.resolve()
def test_lifespan_fails_fast_on_unknown_backend(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An unrecognised ``SENTIMENT_BACKEND`` aborts startup with a message listing the valid names."""
    monkeypatch.setenv("SENTIMENT_BACKEND", "bogus")
    application = create_app()

    # Entering the client runs the lifespan, which is where the failure surfaces.
    with pytest.raises((ValueError, RuntimeError)) as excinfo, TestClient(application):
        pass

    message = str(excinfo.value)
    assert "bogus" in message
    for valid_name in ("stub", "catboost", "lora"):
        assert valid_name in message
def test_lifespan_fails_fast_when_loader_raises_filesystem_error(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """Phase 7: a ``FileNotFoundError`` raised by the loader aborts app startup."""
    absent_dir = tmp_path / "definitely-not-here"
    loader_failure = FileNotFoundError(f"missing LoRA marker: {absent_dir}")
    _patch_loader(
        monkeypatch,
        classifier=_FakeBackend(),
        version_info=None,
        raises=loader_failure,
    )
    monkeypatch.setenv("SENTIMENT_BACKEND", "lora")
    monkeypatch.setenv("LORA_MODEL_DIR", str(absent_dir))

    application = create_app()
    # Entering the client runs the lifespan, which calls the failing loader.
    with pytest.raises((FileNotFoundError, RuntimeError)), TestClient(application):
        pass
class _RecordingFakeClassifier(SentimentClassifierPort):
    """Classifier double that logs each input and always returns one preset result."""

    def __init__(self, result: SentimentResult) -> None:
        """Store the preset ``result`` and start with an empty call log."""
        self.calls: list[str] = []
        self._canned_result = result

    def predict(self, text: str) -> SentimentResult:
        """Append ``text`` to ``calls`` and hand back the preset result."""
        self.calls.append(text)
        return self._canned_result
def test_predict_uses_overridden_classifier(monkeypatch: pytest.MonkeyPatch) -> None:
    """``/predict`` serves whatever the ``get_classifier`` dependency override returns."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    canned = SentimentResult(text="recorded", sentiment=Sentiment.POSITIVE, confidence=0.93)
    recorder = _RecordingFakeClassifier(canned)

    application = create_app()
    application.dependency_overrides[get_classifier] = lambda: recorder

    submitted = "الفندق ممتاز"
    with TestClient(application) as http:
        reply = http.post("/predict", json={"text": submitted})

    assert reply.status_code == 200
    expected_body = {"text": "recorded", "sentiment": "positive", "confidence": 0.93}
    assert reply.json() == expected_body
    # The classifier saw exactly the submitted text, once.
    assert recorder.calls == [submitted]
class _RaisingClassifier(SentimentClassifierPort):
    """Classifier double whose ``predict`` always raises a preset exception."""

    def __init__(self, exc: BaseException) -> None:
        """Remember the exception to raise on every prediction."""
        self._failure = exc

    def predict(self, text: str) -> SentimentResult:
        """Raise the stored exception; ``text`` is ignored."""
        raise self._failure
def test_predict_returns_500_when_classifier_raises_unexpected(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An unexpected classifier error yields a generic 500 with no internals or user text."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    user_text = "نص سري لا يجب تسريبه"

    application = create_app()
    application.dependency_overrides[get_classifier] = lambda: _RaisingClassifier(
        RuntimeError("boom internal trace 0xdeadbeef")
    )

    # raise_server_exceptions=False: inspect the HTTP response instead of
    # having the client re-raise the server-side error.
    with TestClient(application, raise_server_exceptions=False) as http:
        reply = http.post("/predict", json={"text": user_text})

    assert reply.status_code == 500
    assert reply.json() == {"detail": "internal inference error"}

    raw_body = reply.text
    forbidden_fragments = ("boom", "0xdeadbeef", "Traceback", "RuntimeError", user_text)
    for fragment in forbidden_fragments:
        assert fragment not in raw_body
def test_predict_returns_422_when_classifier_raises_value_error(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A ``ValueError`` from the classifier becomes a 422 whose detail is the error message."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    rejection_message = "text must not be empty"

    application = create_app()
    application.dependency_overrides[get_classifier] = lambda: _RaisingClassifier(
        ValueError(rejection_message)
    )

    with TestClient(application) as http:
        reply = http.post("/predict", json={"text": "non-empty"})

    assert reply.status_code == 422
    assert reply.json()["detail"] == rejection_message
# ---------------------------------------------------------------------------
# Phase 4 — /metrics/drift + recording instrumentation
# ---------------------------------------------------------------------------
class _RecordingMonitor(DriftMonitorPort):
    """Drift-monitor double that stores recorded pairs, or raises on demand."""

    def __init__(self, raise_on_record: bool = False) -> None:
        """Start with no records; ``raise_on_record`` makes every ``record`` call fail."""
        self._fail_on_record = raise_on_record
        self.records: list[tuple[Sentiment, float]] = []

    def record(self, label: Sentiment, confidence: float) -> None:
        """Append ``(label, confidence)`` unless configured to raise ``RuntimeError``."""
        if not self._fail_on_record:
            self.records.append((label, confidence))
            return
        raise RuntimeError("drift trace 0xfeedface boom")

    def report(self) -> DriftReport:
        """Not supported by this double."""
        raise NotImplementedError
class _FixedReportMonitor(DriftMonitorPort):
    """Drift-monitor double that ignores recordings and returns one preset report."""

    def __init__(self, report: DriftReport) -> None:
        """Keep the ``report`` to hand back from :meth:`report`."""
        self._preset_report = report

    def record(self, label: Sentiment, confidence: float) -> None:
        """Discard the observation."""
        return None

    def report(self) -> DriftReport:
        """Return the preset report."""
        return self._preset_report
def _full_report() -> DriftReport:
    """Build a report with enough data: both signals carry a PSI value and a drift level."""
    class_signal = SignalReport(
        psi=0.087,
        drift_level=DriftLevel.STABLE,
        reference={"positive": 0.6, "negative": 0.2, "neutral": 0.2},
        observed={"positive": 0.65, "negative": 0.17, "neutral": 0.18},
        reference_missing=False,
    )
    bucket_signal = SignalReport(
        psi=0.124,
        drift_level=DriftLevel.MODERATE,
        reference={"low": 0.10, "medium": 0.25, "high": 0.65},
        observed={"low": 0.18, "medium": 0.30, "high": 0.52},
        reference_missing=False,
    )
    return DriftReport(
        backend="arabert-lora-v1",
        observed_count=50,
        buffer_size=1000,
        minimum_count=50,
        insufficient_data=False,
        predicted_class=class_signal,
        confidence_bucket=bucket_signal,
    )
def _insufficient_report() -> DriftReport:
    """Build a report flagged ``insufficient_data``: PSI and drift level are ``None`` on both signals."""
    class_signal = SignalReport(
        psi=None,
        drift_level=None,
        reference={"positive": 0.6, "negative": 0.2, "neutral": 0.2},
        observed={"positive": 0.83, "negative": 0.08, "neutral": 0.08},
        reference_missing=False,
    )
    bucket_signal = SignalReport(
        psi=None,
        drift_level=None,
        reference={"low": 0.10, "medium": 0.25, "high": 0.65},
        observed={"low": 0.17, "medium": 0.25, "high": 0.58},
        reference_missing=False,
    )
    return DriftReport(
        backend="arabert-lora-v1",
        observed_count=12,
        buffer_size=1000,
        minimum_count=50,
        insufficient_data=True,
        predicted_class=class_signal,
        confidence_bucket=bucket_signal,
    )
def test_predict_records_to_drift_monitor(monkeypatch: pytest.MonkeyPatch) -> None:
    """A successful ``/predict`` call records (label, confidence) on the drift monitor."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    canned = SentimentResult(text="recorded", sentiment=Sentiment.POSITIVE, confidence=0.93)
    classifier_double = _RecordingFakeClassifier(canned)
    monitor_double = _RecordingMonitor()

    application = create_app()
    application.dependency_overrides.update(
        {
            get_classifier: lambda: classifier_double,
            get_drift_monitor: lambda: monitor_double,
        }
    )

    with TestClient(application) as http:
        reply = http.post("/predict", json={"text": "الفندق ممتاز"})

    assert reply.status_code == 200
    assert monitor_double.records == [(Sentiment.POSITIVE, 0.93)]
def test_metrics_drift_returns_both_psi_signals_after_enough_calls(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``/metrics/drift`` serialises a full report with PSI and drift level on both signals."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    application = create_app()
    application.dependency_overrides[get_drift_monitor] = lambda: _FixedReportMonitor(
        _full_report()
    )

    with TestClient(application) as http:
        reply = http.get("/metrics/drift")

    assert reply.status_code == 200
    payload = reply.json()
    assert payload["backend"] == "arabert-lora-v1"
    assert payload["observed_count"] == 50
    assert payload["insufficient_data"] is False

    signals = payload["signals"]
    class_signal = signals["predicted_class"]
    bucket_signal = signals["confidence_bucket"]

    assert isinstance(class_signal["psi"], float)
    assert class_signal["drift_level"] == "stable"
    assert class_signal["reference"] == {"positive": 0.6, "negative": 0.2, "neutral": 0.2}
    # Field is omitted when a reference is present.
    assert class_signal.get("reference_missing") is None

    assert isinstance(bucket_signal["psi"], float)
    assert bucket_signal["drift_level"] == "moderate"
def test_metrics_drift_returns_insufficient_data_below_threshold(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An insufficient-data report serialises null PSI/drift level but keeps the distributions."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    application = create_app()
    application.dependency_overrides[get_drift_monitor] = lambda: _FixedReportMonitor(
        _insufficient_report()
    )

    with TestClient(application) as http:
        reply = http.get("/metrics/drift")

    assert reply.status_code == 200
    payload = reply.json()
    assert payload["insufficient_data"] is True

    signals = payload["signals"]
    class_signal = signals["predicted_class"]
    bucket_signal = signals["confidence_bucket"]

    assert class_signal["psi"] is None
    assert class_signal["drift_level"] is None
    # Baseline is still rendered for the dashboard.
    assert class_signal["reference"] is not None
    assert class_signal["observed"] is not None

    assert bucket_signal["psi"] is None
    assert bucket_signal["drift_level"] is None
    assert bucket_signal["reference"] is not None
def test_metrics_drift_returns_503_for_stub_backend(monkeypatch: pytest.MonkeyPatch) -> None:
    """With the default stub backend, ``/metrics/drift`` answers 503 with a fixed detail message."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    application = create_app()
    with TestClient(application) as http:
        reply = http.get("/metrics/drift")

    assert reply.status_code == 503
    expected_body = {"detail": "drift monitoring disabled for stub backend"}
    assert reply.json() == expected_body
def test_metrics_drift_marks_signal_reference_missing_when_field_absent(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """A reference report lacking ``confusion_matrix`` flags only the predicted-class signal."""
    reports_dir = tmp_path / "reports"
    reports_dir.mkdir()

    # Report has confidence_histogram but NO confusion_matrix.
    partial_report = {"confidence_histogram": {"low": 0.1, "medium": 0.25, "high": 0.65}}
    report_path = reports_dir / "arabert-lora-v1.json"
    report_path.write_text(json.dumps(partial_report), encoding="utf-8")

    _patch_loader(monkeypatch, classifier=_FakeBackend(), version_info=None)
    monkeypatch.setenv("SENTIMENT_BACKEND", "lora")
    monkeypatch.setenv("LORA_MODEL_DIR", str(tmp_path))
    monkeypatch.setenv("DRIFT_REPORTS_DIR", str(reports_dir))

    application = create_app()
    with TestClient(application) as http:
        reply = http.get("/metrics/drift")

    assert reply.status_code == 200
    signals = reply.json()["signals"]
    class_signal = signals["predicted_class"]
    bucket_signal = signals["confidence_bucket"]

    assert class_signal["reference_missing"] is True
    assert class_signal["psi"] is None
    assert class_signal["drift_level"] is None
    # "reference" is omitted when reference_missing is set.
    assert "reference" not in class_signal
    assert class_signal["observed"] is not None

    # Confidence-bucket reference IS loaded; the only reason psi is null here
    # is insufficient_data (no predictions served yet) — exercised numerically
    # in step-5 smoke.
    assert bucket_signal.get("reference_missing") is None
    assert bucket_signal["reference"] == {"low": 0.1, "medium": 0.25, "high": 0.65}
def test_predict_succeeds_when_drift_monitor_record_raises(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A failing drift monitor does not break ``/predict`` and leaks nothing into the response."""
    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    user_text = "نص حساس لا يجب تسريبه أبدًا"
    canned = SentimentResult(text="recorded", sentiment=Sentiment.POSITIVE, confidence=0.93)
    classifier_double = _RecordingFakeClassifier(canned)
    failing_monitor = _RecordingMonitor(raise_on_record=True)

    application = create_app()
    application.dependency_overrides.update(
        {
            get_classifier: lambda: classifier_double,
            get_drift_monitor: lambda: failing_monitor,
        }
    )

    with TestClient(application) as http:
        reply = http.post("/predict", json={"text": user_text})

    assert reply.status_code == 200
    expected_body = {
        "text": "recorded",
        "sentiment": "positive",
        "confidence": 0.93,
    }
    assert reply.json() == expected_body

    raw_body = reply.text
    forbidden_fragments = ("0xfeedface", "boom", "Traceback", "RuntimeError", user_text)
    for fragment in forbidden_fragments:
        assert fragment not in raw_body
def test_metrics_drift_response_omits_text_payload(monkeypatch: pytest.MonkeyPatch) -> None:
    """``/metrics/drift`` never echoes prediction text back to the caller.

    One Arabic prediction is served through a real ``InMemoryDriftMonitor``,
    then the drift endpoint's raw body is checked for both the exact text and
    any character in the Arabic Unicode ranges matched by the regex below.
    """
    # Function-scope imports, kept local to this test.
    import re

    from sentiment.adapters.in_memory_drift_monitor import InMemoryDriftMonitor

    monkeypatch.delenv("SENTIMENT_BACKEND", raising=False)
    arabic_text = "السرّيّة محفوظة"
    fake_classifier = _RecordingFakeClassifier(
        SentimentResult(text=arabic_text, sentiment=Sentiment.POSITIVE, confidence=0.93)
    )
    # Real InMemoryDriftMonitor via override — proves the buffer's record()
    # cannot accept text, and the serializer never reaches text.
    monitor = InMemoryDriftMonitor(
        backend_name="arabert-lora-v1",
        predicted_class_reference={
            Sentiment.POSITIVE: 0.6,
            Sentiment.NEGATIVE: 0.2,
            Sentiment.NEUTRAL: 0.2,
        },
        confidence_bucket_reference={"low": 0.1, "medium": 0.25, "high": 0.65},
        buffer_size=10,
        minimum_count=1,
    )
    app = create_app()
    app.dependency_overrides[get_classifier] = lambda: fake_classifier
    app.dependency_overrides[get_drift_monitor] = lambda: monitor
    with TestClient(app) as client:
        predict_response = client.post("/predict", json={"text": arabic_text})
        # Guard the precondition: if the prediction failed, nothing would be
        # recorded and the leak checks below would pass vacuously.
        assert predict_response.status_code == 200
        response = client.get("/metrics/drift")
    assert response.status_code == 200
    assert arabic_text not in response.text
    # No Arabic chars at all in the body.
    assert re.search(r"[؀-ۿݐ-ݿ]", response.text) is None
def test_lifespan_rejects_invalid_drift_buffer_size(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """``DRIFT_BUFFER_SIZE=0`` aborts startup with an error naming the variable.

    The model directory points at pytest's per-test ``tmp_path`` rather than a
    hard-coded ``/tmp``, so the test does not depend on a platform-specific,
    shared location (matching the other lifespan tests in this module).
    """
    _patch_loader(monkeypatch, classifier=_FakeBackend(), version_info=None)
    monkeypatch.setenv("SENTIMENT_BACKEND", "lora")
    monkeypatch.setenv("LORA_MODEL_DIR", str(tmp_path))
    monkeypatch.setenv("DRIFT_BUFFER_SIZE", "0")
    app = create_app()
    # Entering the client runs the lifespan, which is where validation happens.
    with pytest.raises((ValueError, RuntimeError)) as excinfo:
        with TestClient(app):
            pass
    assert "DRIFT_BUFFER_SIZE" in str(excinfo.value)
# ---------------------------------------------------------------------------
# Phase 7 (ADR-0004) — /health reflects what was actually loaded
# ---------------------------------------------------------------------------
def _boot_catboost_with_loader(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
    *,
    version_info: RegistryVersionInfo | None,
    captured: list[dict[str, object]] | None = None,
) -> TestClient:
    """Return an un-entered ``TestClient`` for a CatBoost-configured app with a patched loader.

    Args:
        monkeypatch: pytest fixture used for the loader patch and env vars.
        tmp_path: Directory exported as ``CATBOOST_MODEL_DIR``.
        version_info: Registry metadata the patched loader returns.
        captured: Optional list receiving the patched loader's call kwargs.

    The caller is expected to enter the returned client as a context manager.
    """
    _patch_loader(
        monkeypatch,
        classifier=_FakeBackend(),
        version_info=version_info,
        captured=captured,
    )
    environment = {
        "SENTIMENT_BACKEND": "catboost",
        "CATBOOST_MODEL_DIR": str(tmp_path),
    }
    for key, value in environment.items():
        monkeypatch.setenv(key, value)
    application = create_app()
    return TestClient(application)
def test_health_reports_registry_version_when_loaded(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """``/health`` exposes the registry name, version and run id returned by the loader."""
    registry_hit = RegistryVersionInfo(name="catboost-baseline", version="3", run_id="run-abc")
    expected_body = {
        "status": "ok",
        "model": "catboost-baseline-v1",
        "model_version": {
            "name": "catboost-baseline",
            "version": "3",
            "run_id": "run-abc",
            "source": "registry",
        },
    }

    booted = _boot_catboost_with_loader(monkeypatch, tmp_path, version_info=registry_hit)
    with booted as http:
        reply = http.get("/health")

    assert reply.status_code == 200
    assert reply.json() == expected_body
def test_health_reports_null_version_on_filesystem_fallback(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """When the loader returns no registry info, ``/health`` reports ``model_version`` as null."""
    booted = _boot_catboost_with_loader(monkeypatch, tmp_path, version_info=None)
    with booted as http:
        reply = http.get("/health")

    assert reply.status_code == 200
    payload = reply.json()
    assert payload["model"] == "catboost-baseline-v1"
    assert payload["model_version"] is None
def test_health_model_version_is_null_for_stub_backend(stub_client: TestClient) -> None:
    """The stub backend reports model ``stub`` and a null ``model_version`` on ``/health``."""
    reply = stub_client.get("/health")
    assert reply.status_code == 200

    payload = reply.json()
    assert payload["model"] == "stub"
    assert payload["model_version"] is None
def test_model_version_env_passed_to_loader(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """``MODEL_VERSION`` is handed to the loader as ``requested_version`` and shown on ``/health``."""
    loader_calls: list[dict[str, object]] = []
    monkeypatch.setenv("MODEL_VERSION", "5")
    registry_hit = RegistryVersionInfo(name="catboost-baseline", version="5", run_id="r")

    booted = _boot_catboost_with_loader(
        monkeypatch,
        tmp_path,
        version_info=registry_hit,
        captured=loader_calls,
    )
    with booted as http:
        payload = http.get("/health").json()

    assert payload["model_version"]["version"] == "5"
    assert len(loader_calls) == 1
    assert loader_calls[0]["requested_version"] == "5"
def test_loader_called_without_model_version_when_env_unset(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """With ``MODEL_VERSION`` unset, the loader receives ``requested_version=None``."""
    captured: list[dict[str, object]] = []
    with _boot_catboost_with_loader(
        monkeypatch,
        tmp_path,
        version_info=RegistryVersionInfo(name="catboost-baseline", version="9", run_id="r"),
        captured=captured,
    ):
        # Entering the client is enough: startup is what invokes the loader.
        pass
    # Check the call count first so a loader that was never invoked fails as
    # a clear assertion rather than an IndexError on captured[0].
    assert len(captured) == 1
    assert captured[0]["requested_version"] is None
def test_mlflow_tracking_uri_env_forwarded_to_loader(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """``MLFLOW_TRACKING_URI`` from the environment reaches the loader as ``tracking_uri``."""
    captured: list[dict[str, object]] = []
    monkeypatch.setenv("MLFLOW_TRACKING_URI", "sqlite:///custom.db")
    with _boot_catboost_with_loader(
        monkeypatch,
        tmp_path,
        version_info=None,
        captured=captured,
    ):
        # Entering the client is enough: startup is what invokes the loader.
        pass
    # Check the call count first so a loader that was never invoked fails as
    # a clear assertion rather than an IndexError on captured[0].
    assert len(captured) == 1
    assert captured[0]["tracking_uri"] == "sqlite:///custom.db"