File size: 4,115 Bytes
16c19b8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""Pulso do datalake — snapshot compartilhado e headers em cada resposta HTTP."""
from __future__ import annotations

import time
from datetime import UTC, datetime
from typing import Any, Callable

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response

from api.lake_cache import get_lake_counts
from config import settings
from ingest.meta import collection_stats

# Request paths starting with any of these prefixes bypass the pulse middleware.
_SKIP_PREFIXES = ("/health/live", "/docs", "/redoc", "/openapi.json")
# Maximum age, in seconds, of a cached metadata value before it is recomputed.
_META_TTL_SECONDS = 15.0

# Module-level cache for metadata that is too costly to recompute on every
# request. "collections" and "latest_silver_at" hold the cached results
# (None means "nothing cached"); invalidate_pulse_meta_cache() resets the entries.
_meta_cache: dict[str, Any] = {"at": 0.0, "collections": None, "latest_silver_at": None}
# Most recent snapshot produced by build_pulse_snapshot(); None until the first build.
_last_snapshot: dict[str, Any] | None = None


def _cached_collections() -> dict:
    """Return collection statistics, recomputed at most once per TTL window.

    The result of ``collection_stats()`` is stored in ``_meta_cache`` and
    reused until it is older than ``_META_TTL_SECONDS`` or until the cached
    value has been reset to ``None`` (which is what cache invalidation does).

    Returns:
        The value returned by ``collection_stats()`` on the latest refresh.
    """
    now = time.monotonic()
    cached = _meta_cache["collections"]
    # Track freshness with a key owned by this entry alone, so that refreshing
    # any other cached value can never make stale stats look fresh.
    refreshed_at = float(_meta_cache.get("collections_at", 0.0))
    if cached is not None and now - refreshed_at < _META_TTL_SECONDS:
        return cached

    stats = collection_stats()
    _meta_cache["collections"] = stats
    _meta_cache["collections_at"] = now
    return stats


def _cached_latest_silver_at() -> str | None:
    """Return the ISO-8601 UTC mtime of the newest silver parquet file.

    The ``silver`` directory under ``settings.lake_root`` is scanned
    recursively for ``*.parquet`` files. A non-``None`` result is cached in
    ``_meta_cache`` and reused for ``_META_TTL_SECONDS``; a ``None`` result
    (no directory or no parquet files) is not reused, so the next call scans
    again.

    Returns:
        The newest modification time as an ISO-8601 string in UTC, or
        ``None`` when no parquet file was found.
    """
    now = time.monotonic()
    cached = _meta_cache.get("latest_silver_at")
    # Track freshness with a key owned by this entry alone, so that refreshing
    # any other cached value can never make a stale timestamp look fresh
    # (and this refresh never extends the lifetime of other entries).
    checked_at = float(_meta_cache.get("latest_silver_checked_at", 0.0))
    if cached is not None and now - checked_at < _META_TTL_SECONDS:
        return cached

    silver_root = settings.lake_root / "silver"
    newest_mtime: float | None = None
    if silver_root.is_dir():
        for parquet in silver_root.rglob("*.parquet"):
            try:
                # stat() each file exactly once.
                mtime = parquet.stat().st_mtime
            except FileNotFoundError:
                # The file was removed between listing and stat(); skip it
                # rather than failing the whole lookup.
                continue
            if newest_mtime is None or mtime > newest_mtime:
                newest_mtime = mtime

    latest_at: str | None = None
    if newest_mtime is not None:
        latest_at = datetime.fromtimestamp(newest_mtime, tz=UTC).isoformat()

    _meta_cache["latest_silver_at"] = latest_at
    _meta_cache["latest_silver_checked_at"] = now
    return latest_at


def invalidate_pulse_meta_cache() -> None:
    """Reset the metadata cache so the next lookups recompute their values."""
    _meta_cache.update(
        {
            "at": 0.0,
            "collections": None,
            "latest_silver_at": None,
        }
    )


def build_pulse_snapshot(
    *,
    wc_models_ready: bool,
    force_lake_counts: bool = False,
) -> dict[str, Any]:
    """Assemble the current pulse snapshot and remember it as the latest one.

    Args:
        wc_models_ready: Flag copied verbatim into the snapshot.
        force_lake_counts: Forwarded to ``get_lake_counts`` as ``force``.

    Returns:
        A dict with status, pulse timestamp, lake root, counts, the newest
        silver timestamp, collection stats, the readiness flag and the
        advertised actions. The same dict is stored in ``_last_snapshot``.
    """
    global _last_snapshot

    silver_count, fixture_count = get_lake_counts(force=force_lake_counts)
    collections = _cached_collections()
    # Gather the remaining values in the same order the fields appear below.
    pulse_at = datetime.now(UTC).isoformat()
    lake_root = str(settings.lake_root)
    latest_silver_at = _cached_latest_silver_at()

    advertised_actions = {
        "sync_news": {"method": "POST", "path": "/news/sync"},
        "feed": {"method": "GET", "path": "/news/all"},
    }
    result = {
        "status": "ok",
        "pulse_at": pulse_at,
        "lake_root": lake_root,
        "articles_silver": silver_count,
        "fixtures": fixture_count,
        "latest_silver_at": latest_silver_at,
        "collections": collections,
        "wc_models_ready": wc_models_ready,
        "actions": advertised_actions,
    }

    _last_snapshot = result
    return result


def get_last_pulse_snapshot() -> dict[str, Any] | None:
    """Return the snapshot stored by the latest build, or ``None`` if none exists."""
    latest = _last_snapshot
    return latest


def pulse_response_headers(snapshot: dict[str, Any]) -> dict[str, str]:
    """Translate a pulse snapshot into HTTP response header values.

    Args:
        snapshot: A pulse snapshot. ``pulse_at``, ``articles_silver`` and
            ``fixtures`` are required keys; ``collections``,
            ``wc_models_ready`` and ``latest_silver_at`` are optional.

    Returns:
        A mapping of header name to header value. Every value is a ``str``;
        missing or falsy optional values become the empty string.

    Raises:
        KeyError: If one of the required keys is absent from ``snapshot``.
    """
    collections = snapshot.get("collections") or {}
    last_run = collections.get("last_run")
    latest_silver_at = snapshot.get("latest_silver_at")
    return {
        "X-Data-Pulse-At": str(snapshot["pulse_at"]),
        "X-Articles-Silver": str(snapshot["articles_silver"]),
        "X-Fixtures": str(snapshot["fixtures"]),
        "X-WC-Models-Ready": "true" if snapshot.get("wc_models_ready") else "false",
        # Coerce to str like the other fields: header values must be strings,
        # and these two are not guaranteed to be strings by this function.
        "X-Collections-Last-Run": str(last_run) if last_run else "",
        "X-Latest-Silver-At": str(latest_silver_at) if latest_silver_at else "",
    }


class DataPulseMiddleware(BaseHTTPMiddleware):
    """Attach a fresh pulse snapshot and its headers to each HTTP exchange.

    Requests whose path starts with one of ``_SKIP_PREFIXES`` pass through
    untouched. For every other request the snapshot is built in a worker
    thread, stored on ``request.state.data_pulse``, and its header form is
    written onto the response.
    """

    def __init__(self, app, *, wc_models_ready: Callable[[], bool]) -> None:
        """Wrap ``app``; ``wc_models_ready`` is called once per handled request."""
        super().__init__(app)
        self._wc_models_ready = wc_models_ready

    async def dispatch(self, request: Request, call_next) -> Response:
        """Build the snapshot, run the downstream handler, then add the headers."""
        # str.startswith accepts a tuple, so one call covers every prefix.
        if request.url.path.startswith(_SKIP_PREFIXES):
            return await call_next(request)

        import asyncio

        # Read the readiness flag here, before handing off to the worker thread.
        models_ready = self._wc_models_ready()
        pulse = await asyncio.to_thread(
            build_pulse_snapshot,
            wc_models_ready=models_ready,
        )
        request.state.data_pulse = pulse

        response = await call_next(request)
        pulse_headers = pulse_response_headers(pulse)
        for header_name in pulse_headers:
            response.headers[header_name] = pulse_headers[header_name]
        return response