Spaces:
Runtime error
Runtime error
| """Pulso do datalake — snapshot compartilhado e headers em cada resposta HTTP.""" | |
| from __future__ import annotations | |
| import time | |
| from datetime import UTC, datetime | |
| from typing import Any, Callable | |
| from starlette.middleware.base import BaseHTTPMiddleware | |
| from starlette.requests import Request | |
| from starlette.responses import Response | |
| from api.lake_cache import get_lake_counts | |
| from config import settings | |
| from ingest.meta import collection_stats | |
# Request paths starting with any of these prefixes bypass the pulse middleware.
_SKIP_PREFIXES = (
    "/health/live",
    "/docs",
    "/redoc",
    "/openapi.json",
)

# Maximum age, in seconds, of cached metadata before it is recomputed.
_META_TTL_SECONDS = 15.0

# Shared metadata cache. "at" holds a time.monotonic() reading of the last
# refresh; the other entries hold the cached values (None means "not cached").
_meta_cache: dict[str, Any] = {
    "at": 0.0,
    "collections": None,
    "latest_silver_at": None,
}

# Most recent snapshot built by build_pulse_snapshot(); None until the first build.
_last_snapshot: dict[str, Any] | None = None
def _cached_collections() -> dict:
    """Return collection statistics, reusing the cached copy while it is fresh.

    The cached value is served when it is not None and the shared "at"
    timestamp is younger than ``_META_TTL_SECONDS``; otherwise
    ``collection_stats()`` is called and its result is stored.
    """
    started = time.monotonic()
    cached = _meta_cache["collections"]
    age = started - float(_meta_cache["at"])
    if cached is not None and age < _META_TTL_SECONDS:
        return cached

    fresh = collection_stats()
    _meta_cache["collections"] = fresh
    _meta_cache["at"] = started
    return fresh
def _cached_latest_silver_at() -> str | None:
    """Return the newest silver parquet mtime as an ISO-8601 UTC string.

    Scans ``settings.lake_root / "silver"`` recursively for ``*.parquet``
    files and returns the most recent modification time, or None when the
    directory is missing or contains no readable parquet file.

    A non-None result is cached for ``_META_TTL_SECONDS``. Freshness is
    tracked under a dedicated ``"latest_silver_checked_at"`` key because the
    shared ``"at"`` entry is also written by ``_cached_collections``; sharing
    it let a collections refresh keep a stale silver timestamp alive
    indefinitely. A None result is not cached, so clearing
    ``_meta_cache["latest_silver_at"]`` still forces a rescan.
    """
    now = time.monotonic()
    cached = _meta_cache.get("latest_silver_at")
    checked_at = _meta_cache.get("latest_silver_checked_at")
    if (
        cached is not None
        and checked_at is not None
        and now - float(checked_at) < _META_TTL_SECONDS
    ):
        return cached

    silver_root = settings.lake_root / "silver"
    newest_mtime: float | None = None
    if silver_root.is_dir():
        for parquet in silver_root.rglob("*.parquet"):
            try:
                # Single stat per file; the original stat'ed the winner twice.
                mtime = parquet.stat().st_mtime
            except OSError:
                # The file vanished (or became unreadable) between listing and
                # stat; ignore it rather than failing the whole request.
                continue
            if newest_mtime is None or mtime > newest_mtime:
                newest_mtime = mtime

    latest_at: str | None = None
    if newest_mtime is not None:
        latest_at = datetime.fromtimestamp(newest_mtime, tz=UTC).isoformat()

    _meta_cache["latest_silver_at"] = latest_at
    # Deliberately not touching the shared "at" entry: doing so would extend
    # the lifetime of the cached collection statistics.
    _meta_cache["latest_silver_checked_at"] = now
    return latest_at
def invalidate_pulse_meta_cache() -> None:
    """Clear the shared metadata cache so the next lookups recompute their values."""
    _meta_cache.update(at=0.0, collections=None, latest_silver_at=None)
def build_pulse_snapshot(
    *,
    wc_models_ready: bool,
    force_lake_counts: bool = False,
) -> dict[str, Any]:
    """Build the current data-pulse snapshot and remember it as the latest one.

    Args:
        wc_models_ready: Flag copied verbatim into the snapshot.
        force_lake_counts: Forwarded to ``get_lake_counts`` as ``force``.

    Returns:
        The snapshot dictionary; the same object is also stored in the
        module-level ``_last_snapshot``.
    """
    global _last_snapshot

    lake_counts = get_lake_counts(force=force_lake_counts)
    articles_silver, fixtures = lake_counts
    collections = _cached_collections()

    # Entries are added in the same order the original literal listed them,
    # so both evaluation order and key order are unchanged.
    pulse: dict[str, Any] = {"status": "ok"}
    pulse["pulse_at"] = datetime.now(UTC).isoformat()
    pulse["lake_root"] = str(settings.lake_root)
    pulse["articles_silver"] = articles_silver
    pulse["fixtures"] = fixtures
    pulse["latest_silver_at"] = _cached_latest_silver_at()
    pulse["collections"] = collections
    pulse["wc_models_ready"] = wc_models_ready
    pulse["actions"] = {
        "sync_news": {"method": "POST", "path": "/news/sync"},
        "feed": {"method": "GET", "path": "/news/all"},
    }

    _last_snapshot = pulse
    return pulse
def get_last_pulse_snapshot() -> dict[str, Any] | None:
    """Return the snapshot most recently stored by ``build_pulse_snapshot``.

    Returns None when no snapshot has been built yet.
    """
    latest = _last_snapshot
    return latest
def pulse_response_headers(snapshot: dict[str, Any]) -> dict[str, str]:
    """Translate a pulse snapshot into HTTP response headers.

    Args:
        snapshot: Snapshot dictionary. ``pulse_at``, ``articles_silver`` and
            ``fixtures`` are required; ``wc_models_ready``, ``collections``
            and ``latest_silver_at`` are optional.

    Returns:
        Header name to header value. Every value is a ``str``; a missing or
        falsy ``last_run`` / ``latest_silver_at`` becomes an empty string.

    Raises:
        KeyError: If one of the required snapshot keys is absent.
    """
    collections = snapshot.get("collections") or {}
    last_run = collections.get("last_run")
    latest_silver_at = snapshot.get("latest_silver_at")
    return {
        "X-Data-Pulse-At": str(snapshot["pulse_at"]),
        "X-Articles-Silver": str(snapshot["articles_silver"]),
        "X-Fixtures": str(snapshot["fixtures"]),
        "X-WC-Models-Ready": "true" if snapshot.get("wc_models_ready") else "false",
        # str() keeps the declared dict[str, str] contract even when the
        # stored value is not already a string (e.g. an epoch number).
        "X-Collections-Last-Run": str(last_run) if last_run else "",
        "X-Latest-Silver-At": str(latest_silver_at) if latest_silver_at else "",
    }
class DataPulseMiddleware(BaseHTTPMiddleware):
    """Attach a data-pulse snapshot to each request and its headers to each response.

    Requests whose path starts with one of ``_SKIP_PREFIXES`` are passed
    through untouched.
    """

    def __init__(self, app, *, wc_models_ready: Callable[[], bool]) -> None:
        """Store the zero-argument callable that reports the ``wc_models_ready`` flag."""
        super().__init__(app)
        self._wc_models_ready = wc_models_ready

    async def dispatch(self, request: Request, call_next) -> Response:
        """Build the snapshot in a worker thread, then decorate the response."""
        # str.startswith accepts a tuple of prefixes, so no explicit loop is needed.
        if request.url.path.startswith(_SKIP_PREFIXES):
            return await call_next(request)

        import asyncio

        # The readiness callable is evaluated here, before the thread hop.
        models_ready = self._wc_models_ready()
        pulse = await asyncio.to_thread(
            build_pulse_snapshot,
            wc_models_ready=models_ready,
        )
        request.state.data_pulse = pulse

        response = await call_next(request)
        for header_name, header_value in pulse_response_headers(pulse).items():
            response.headers[header_name] = header_value
        return response