openwind-mcp-dev / tests /test_api_passage_cache.py
openwind-ci
sync: github 571d770
0c81874
Raw
History Blame Contribute Delete
6.11 kB
"""Endpoint tests for the client-supplied forecast_cache wiring in app.py.

hf-space has no pyproject (it ships via Docker), so we load app.py by path and
drive the handlers with a minimal fake Request. Run from the mcp-core worktree
venv, which resolves the worktree's editable openwind_data (with cache_backed):

    cd packages/mcp-core && uv run --no-active python -m pytest \
        ../hf-space/tests/test_api_passage_cache.py
"""
from __future__ import annotations
import importlib.util
import json
import pathlib
from datetime import UTC, datetime, timedelta
import pytest
# hf-space has no pyproject, so app.py is imported straight from its file path
# (one directory above this tests/ folder) under the module name "hf_app".
_APP_PATH = (pathlib.Path(__file__).parents[1] / "app.py").resolve()
_spec = importlib.util.spec_from_file_location("hf_app", _APP_PATH)
app = importlib.util.module_from_spec(_spec)
# Run app.py's top-level code so its handlers become attributes of `app`.
_spec.loader.exec_module(app)
from openwind_data.adapters.cache_backed import CacheBackedAdapter # noqa: E402
# Reference departure instant used by the request bodies: 2026-05-01 06:00 UTC.
DEPARTURE = datetime(2026, 5, 1, 6, 0, tzinfo=UTC)
# Route endpoints as (lat, lon) pairs.
MARSEILLE = (43.30, 5.35)
PORQUEROLLES = (43.00, 6.20)
class _FakeRequest:
    """Minimal stand-in for the request object handed to the handlers.

    It exposes only an awaitable ``json()`` accessor, which hands back the
    exact dict supplied at construction time.
    """

    def __init__(self, body: dict) -> None:
        # Keep a reference to the caller's dict; no copy is made.
        self._payload = body

    async def json(self) -> dict:
        """Return the body dict given to the constructor."""
        return self._payload
def _resp_json(resp) -> dict:
    """Decode the JSON document carried in ``resp.body``.

    The body is coerced through ``bytes(...)`` first, so any bytes-like
    value that ``bytes`` accepts can be parsed.
    """
    raw = bytes(resp.body)
    return json.loads(raw)
def _corridor_cache(*, with_arome: bool = True) -> dict:
    """Build a forecast_cache payload for the Marseille->Porquerolles corridor.

    All series are constant: 10 kn wind from 0 deg with ``None`` gusts, on an
    hourly time axis of 15 steps starting 2026-05-01 04:00 UTC (04:00..18:00).
    One point entry is produced for each of MARSEILLE and PORQUEROLLES.

    Args:
        with_arome: When true, each point's ``wind_by_model`` also carries a
            ``meteofrance_arome_france`` block next to ``icon_eu``. The
            top-level ``models`` list names both models either way.

    Returns:
        A dict with the keys ``version``, ``models``, ``times_ms`` and
        ``points``.
    """
    axis_start = datetime(2026, 5, 1, 4, 0, tzinfo=UTC)
    times_ms = []
    for hour in range(15):
        stamp = axis_start + timedelta(hours=hour)
        times_ms.append(int(stamp.timestamp() * 1000))
    steps = len(times_ms)

    def _flat(value) -> list:
        # A fresh list on every call, so no two series share one object.
        return [value] * steps

    def _wind_series() -> dict:
        return {
            "speed_kn": _flat(10.0),
            "direction_deg": _flat(0.0),
            "gust_kn": _flat(None),
        }

    # icon_eu is always present; AROME is appended after it when requested.
    wind_models = ["icon_eu"]
    if with_arome:
        wind_models.append("meteofrance_arome_france")

    points = []
    for lat, lon in (MARSEILLE, PORQUEROLLES):
        wind_by_model = {model: _wind_series() for model in wind_models}
        sea_state = {
            "wave_height_m": _flat(0.4),
            "wave_period_s": _flat(4.0),
            "wave_direction_deg": _flat(0.0),
            "current_speed_kn": _flat(0.1),
            "current_direction_to_deg": _flat(90.0),
            "tide_height_m": _flat(0.0),
            "current_source": "openmeteo_smoc",
        }
        points.append(
            {"lat": lat, "lon": lon, "wind_by_model": wind_by_model, "sea": sea_state}
        )

    return {
        "version": 1,
        "models": ["meteofrance_arome_france", "icon_eu"],
        "times_ms": times_ms,
        "points": points,
    }
def _single_body(**extra) -> dict:
    """Return a single-departure passage request body.

    The base body carries the Marseille->Porquerolles waypoints, the
    ISO-formatted DEPARTURE and the ``cruiser_40ft`` archetype. Keyword
    arguments are merged on top and override base keys of the same name.
    """
    return {
        "waypoints": [list(MARSEILLE), list(PORQUEROLLES)],
        "departure": DEPARTURE.isoformat(),
        "archetype": "cruiser_40ft",
        **extra,
    }
# --------------------------------------------------------------- full path
@pytest.mark.asyncio
async def test_single_with_cache_returns_200_from_cache(monkeypatch) -> None:
    """A request carrying a forecast_cache gets a 200 with 10 kn on every segment."""
    import httpx

    def _fail_on_network(*args, **kwargs):  # noqa: ANN002, ANN003
        raise AssertionError("forecast_cache path must not hit Open-Meteo")

    # Tripwire: any httpx.AsyncClient.get call made during this test raises.
    monkeypatch.setattr(httpx.AsyncClient, "get", _fail_on_network)

    request = _FakeRequest(_single_body(forecast_cache=_corridor_cache()))
    response = await app._api_passage(request)

    assert response.status_code == 200
    segments = _resp_json(response)["passage"]["segments"]
    # There must be at least one segment, and each reports the cached wind speed.
    assert segments
    assert all(segment["tws_kn"] == 10.0 for segment in segments)
@pytest.mark.asyncio
async def test_malformed_cache_returns_422(monkeypatch) -> None:
    """A forecast_cache with version 999 and empty series is answered with 422."""
    bad_cache = {"version": 999, "models": [], "times_ms": [], "points": []}
    request = _FakeRequest(_single_body(forecast_cache=bad_cache))

    response = await app._api_passage(request)

    assert response.status_code == 422
    # The error text must point the caller at the offending field.
    error_text = _resp_json(response)["error"]
    assert "forecast_cache" in error_text
# --------------------------------------------------------------- wiring capture
@pytest.mark.asyncio
async def test_single_passes_cache_adapter(monkeypatch) -> None:
    """The single-departure handler passes a cache adapter only when a cache is sent."""
    seen: dict = {}

    async def _record_then_abort(*args, **kwargs):  # noqa: ANN002, ANN003
        for key in ("adapter", "model_chain"):
            seen[key] = kwargs.get(key)
        # Stop the handler right after the keyword arguments are recorded.
        raise app.NoModelCoveredError("stop here")

    monkeypatch.setattr(app, "estimate_passage", _record_then_abort)

    # With a cache: a CacheBackedAdapter plus the chain named by the cache.
    cached_request = _FakeRequest(_single_body(forecast_cache=_corridor_cache()))
    await app._api_passage(cached_request)
    assert isinstance(seen["adapter"], CacheBackedAdapter)
    assert seen["model_chain"] == ("meteofrance_arome_france", "icon_eu")

    # Without a cache: the adapter keyword is None (or not passed at all).
    seen.clear()
    plain_request = _FakeRequest(_single_body())
    await app._api_passage(plain_request)
    assert seen["adapter"] is None
@pytest.mark.asyncio
async def test_sweep_passes_cache_adapter(monkeypatch) -> None:
    """A sweep request with a forecast_cache passes a CacheBackedAdapter along."""
    seen: dict = {}

    async def _record_adapter(*args, **kwargs):  # noqa: ANN002, ANN003
        seen["adapter"] = kwargs.get("adapter")
        return []

    monkeypatch.setattr(app, "estimate_passage_windows", _record_adapter)

    # Setting latest_departure and sweep_interval_hours makes this a sweep request.
    latest = DEPARTURE + timedelta(hours=6)
    sweep_body = _single_body(
        forecast_cache=_corridor_cache(),
        latest_departure=latest.isoformat(),
        sweep_interval_hours=3,
    )
    await app._api_passage(_FakeRequest(sweep_body))

    assert isinstance(seen["adapter"], CacheBackedAdapter)
@pytest.mark.asyncio
async def test_by_eta_passes_cache_adapter(monkeypatch) -> None:
    """The by-ETA handler passes a CacheBackedAdapter when a forecast_cache is sent."""
    seen: dict = {}

    async def _record_then_abort(*args, **kwargs):  # noqa: ANN002, ANN003
        seen["adapter"] = kwargs.get("adapter")
        # Stop the handler right after the adapter is recorded.
        raise app.NoModelCoveredError("stop here")

    monkeypatch.setattr(app, "estimate_passage_for_arrival", _record_then_abort)

    arrival = DEPARTURE + timedelta(hours=8)
    eta_body = {
        "waypoints": [list(MARSEILLE), list(PORQUEROLLES)],
        "target_arrival": arrival.isoformat(),
        "archetype": "cruiser_40ft",
        "forecast_cache": _corridor_cache(),
    }
    await app._api_passage_by_eta(_FakeRequest(eta_body))

    assert isinstance(seen["adapter"], CacheBackedAdapter)