File size: 10,298 Bytes
bad6218
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
"""HTTP client for the Cube.js API of Indicateurs Territoriaux."""

import os
from typing import Any

import httpx
from dotenv import load_dotenv

load_dotenv()


class CubeJsClientError(Exception):
    """Base exception for Cube.js client errors."""

    pass


class AuthenticationError(CubeJsClientError):
    """Raised when authentication fails (401)."""

    pass


class BadRequestError(CubeJsClientError):
    """Raised when the request is malformed (400)."""

    pass


class CubeJsClient:
    """HTTP client for the Cube.js REST API.
    
    This client handles authentication and provides methods to interact
    with the Indicateurs Territoriaux API endpoints.
    """

    def __init__(
        self,
        base_url: str | None = None,
        token: str | None = None,
        timeout: float = 30.0,
    ):
        """Initialize the Cube.js client.
        
        Args:
            base_url: Base URL of the API. Defaults to env var INDICATEURS_TE_BASE_URL.
            token: JWT authentication token. Defaults to env var INDICATEURS_TE_TOKEN.
            timeout: Request timeout in seconds.
        """
        self.base_url = (
            base_url
            or os.getenv("INDICATEURS_TE_BASE_URL")
            or "https://api.indicateurs.ecologie.gouv.fr"
        )
        self.token = token or os.getenv("INDICATEURS_TE_TOKEN")
        
        if not self.token:
            raise ValueError(
                "No API token provided. Set INDICATEURS_TE_TOKEN environment variable "
                "or pass token parameter."
            )
        
        self.timeout = timeout
        self._client: httpx.AsyncClient | None = None

    @property
    def headers(self) -> dict[str, str]:
        """HTTP headers for API requests."""
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
        }

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or create the async HTTP client."""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                base_url=self.base_url,
                headers=self.headers,
                timeout=self.timeout,
            )
        return self._client

    async def close(self) -> None:
        """Close the HTTP client."""
        if self._client is not None and not self._client.is_closed:
            await self._client.aclose()
            self._client = None

    async def _handle_response(self, response: httpx.Response) -> dict[str, Any]:
        """Handle API response and raise appropriate errors.
        
        Args:
            response: The HTTP response object.
            
        Returns:
            Parsed JSON response.
            
        Raises:
            AuthenticationError: If the token is invalid or expired (401).
            BadRequestError: If the request is malformed (400).
            CubeJsClientError: For other HTTP errors.
        """
        if response.status_code == 401:
            raise AuthenticationError(
                "Authentication failed. Your API token may be invalid or expired. "
                "Please check your INDICATEURS_TE_TOKEN environment variable."
            )
        
        if response.status_code == 400:
            try:
                error_detail = response.json()
            except Exception:
                error_detail = response.text
            raise BadRequestError(
                f"Bad request to API. Details: {error_detail}"
            )
        
        if response.status_code >= 400:
            raise CubeJsClientError(
                f"API request failed with status {response.status_code}: {response.text}"
            )
        
        return response.json()

    async def get_meta(self) -> dict[str, Any]:
        """Fetch the API schema metadata.
        
        Returns the complete schema including all cubes, their measures,
        dimensions, and available filters.
        
        Returns:
            Dict containing the API metadata with 'cubes' key.
            
        Raises:
            AuthenticationError: If authentication fails.
            CubeJsClientError: For other API errors.
        """
        client = await self._get_client()
        response = await client.get("/cubejs-api/v1/meta")
        return await self._handle_response(response)

    async def load(self, query: dict[str, Any]) -> dict[str, Any]:
        """Execute a data query against the Cube.js API.
        
        Args:
            query: The Cube.js query object containing measures, dimensions,
                   filters, and other query parameters.
                   
        Returns:
            Dict containing the query results with 'data' key.
            
        Raises:
            AuthenticationError: If authentication fails.
            BadRequestError: If the query is malformed.
            CubeJsClientError: For other API errors.
            
        Example:
            >>> query = {
            ...     "measures": ["indicateur_metadata.count"],
            ...     "dimensions": ["indicateur_metadata.id", "indicateur_metadata.libelle"],
            ...     "limit": 10
            ... }
            >>> result = await client.load(query)
        """
        client = await self._get_client()
        response = await client.post(
            "/cubejs-api/v1/load",
            json={"query": query},
        )
        return await self._handle_response(response)

    async def load_indicators_metadata(
        self,
        dimensions: list[str] | None = None,
        filters: list[dict[str, Any]] | None = None,
        limit: int = 500,
    ) -> list[dict[str, Any]]:
        """Load indicator metadata from the indicateur_metadata cube.
        
        Convenience method for querying the indicator metadata cube.
        
        Args:
            dimensions: List of dimensions to fetch. Defaults to basic info.
            filters: Optional list of filters to apply.
            limit: Maximum number of results.
            
        Returns:
            List of indicator metadata records.
        """
        if dimensions is None:
            dimensions = [
                "indicateur_metadata.id",
                "indicateur_metadata.libelle",
                "indicateur_metadata.unite",
                "indicateur_metadata.description",
                "indicateur_metadata.mailles_disponibles",
                "indicateur_metadata.thematique_fnv",
                "indicateur_metadata.annees_disponibles",
            ]
        
        query: dict[str, Any] = {
            "dimensions": dimensions,
            "limit": limit,
        }
        
        if filters:
            query["filters"] = filters
        
        result = await self.load(query)
        return result.get("data", [])

    async def load_sources_metadata(
        self,
        indicator_id: int | None = None,
        limit: int = 100,
    ) -> list[dict[str, Any]]:
        """Load source metadata from the indicateur_x_source_metadata cube.
        
        Args:
            indicator_id: Optional indicator ID to filter sources.
            limit: Maximum number of results.
            
        Returns:
            List of source metadata records.
        """
        dimensions = [
            "indicateur_x_source_metadata.id_indicateur",
            "indicateur_x_source_metadata.nom_source",
            "indicateur_x_source_metadata.libelle",
            "indicateur_x_source_metadata.description",
            "indicateur_x_source_metadata.producteur_source",
            "indicateur_x_source_metadata.distributeur_source",
            "indicateur_x_source_metadata.license_source",
            "indicateur_x_source_metadata.lien_page",
            "indicateur_x_source_metadata.date_derniere_extraction",
        ]
        
        query: dict[str, Any] = {
            "dimensions": dimensions,
            "limit": limit,
        }
        
        if indicator_id is not None:
            query["filters"] = [
                {
                    "member": "indicateur_x_source_metadata.id_indicateur",
                    "operator": "equals",
                    "values": [str(indicator_id)],
                }
            ]
        
        result = await self.load(query)
        return result.get("data", [])

    async def search_indicators_by_libelle(
        self,
        search_term: str,
        limit: int = 50,
    ) -> list[dict[str, Any]]:
        """Search indicators by keyword in libelle using contains filter.
        
        This uses Cube.js contains operator for server-side filtering.
        Note: Limited to single term, for multi-term use client-side filtering.
        
        Args:
            search_term: Term to search for in indicator libelle.
            limit: Maximum number of results.
            
        Returns:
            List of matching indicator metadata records.
        """
        query: dict[str, Any] = {
            "dimensions": [
                "indicateur_metadata.id",
                "indicateur_metadata.libelle",
                "indicateur_metadata.description",
                "indicateur_metadata.unite",
                "indicateur_metadata.mailles_disponibles",
                "indicateur_metadata.thematique_fnv",
            ],
            "filters": [
                {
                    "member": "indicateur_metadata.libelle",
                    "operator": "contains",
                    "values": [search_term],
                }
            ],
            "limit": limit,
        }
        
        result = await self.load(query)
        return result.get("data", [])


# Singleton instance for the application
_client_instance: CubeJsClient | None = None


def get_client() -> CubeJsClient:
    """Get or create the singleton CubeJsClient instance.
    
    Returns:
        The shared CubeJsClient instance.
    """
    global _client_instance
    if _client_instance is None:
        _client_instance = CubeJsClient()
    return _client_instance


async def close_client() -> None:
    """Close the singleton client instance."""
    global _client_instance
    if _client_instance is not None:
        await _client_instance.close()
        _client_instance = None