Legal-i committed on
Commit
8f366e8
·
verified ·
1 Parent(s): 741565b

Stage 163: BigQuery REST query connector

Browse files
infra/ingestion/bigquery_connector.py ADDED
@@ -0,0 +1,388 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ infra.ingestion.bigquery_connector — pull observations from a BigQuery
3
+ dataset via the REST query API + service-account JWT auth.
4
+
5
+ Customer flow:
6
+ 1. In GCP → IAM & Admin → Service Accounts, create a service account
7
+ with role ``roles/bigquery.dataViewer`` (read-only) on the
8
+ dataset(s) you want OrgState to observe, plus
9
+ ``roles/bigquery.jobUser`` on the project (needed to start
10
+ queries).
11
+ 2. Generate a JSON key for the service account; the file contains
12
+ ``client_email`` + ``private_key`` + ``token_uri``.
13
+ 3. Store the entire JSON as one env var on this deployment
14
+ (recommended) OR paste it inline into connector_config. The
15
+ inline path is OK for a demo but lands the private key in the
16
+ schedule DB.
17
+ 4. Configure the SQL query + field mapping (see below).
18
+
19
+ Configuration:
20
+
21
+ project_id GCP project the queries run in (billed there).
22
+ sql Standard SQL with {entity_type} substituted
23
+ at fetch time, e.g.
24
+ "SELECT entity_id, day, mrr_cents FROM
25
+ \\`acme.bi.{entity_type}_daily\\`
26
+ WHERE day >= DATE_SUB(CURRENT_DATE(),
27
+ INTERVAL 35 DAY)"
28
+ mapping {"entity_id": "entity_id",
29
+ "day": "day",
30
+ "values": {"mrr": "mrr_cents",
31
+ "users": "active_users"}}
32
+ service_account_json (optional, inline) the raw JSON contents.
33
+ service_account_env (optional, recommended) env-var name holding
34
+ the JSON. ONE of *_json / *_env required.
35
+ api_timeout HTTP timeout in seconds (default 30).
36
+ query_timeout_ms BigQuery server-side timeout (default 60_000,
37
+ max 600_000). Set higher if the customer's
38
+ query touches a large partitioned table.
39
+ use_legacy_sql false (default) — Standard SQL. Set true only
40
+ for legacy datasets.
41
+ location optional BigQuery location ("US" / "EU" /
42
+ "asia-northeast1"). Required for some
43
+ datasets — Google's error message points at
44
+ it clearly when missing.
45
+
46
+ JWT signing uses PyJWT (already a transitive dep of authlib) with
47
+ the service account's RSA private key; token exchange hits
48
+ oauth2.googleapis.com only when no unexpired token is cached, and caches in-memory for the
49
+ token's lifetime (default 1h). The cache is per-connector-instance
50
+ — a long-running scheduler tick gets one token reuse, a
51
+ once-per-day cron always fresh-mints.
52
+ """
53
+ from __future__ import annotations
54
+
55
+ import json
56
+ import os
57
+ import time
58
+ from typing import Any, List, Mapping, Optional
59
+ from urllib.error import HTTPError, URLError
60
+ from urllib.parse import urlencode
61
+ from urllib.request import Request, urlopen
62
+
63
+ from core.pipeline import Observation
64
+
65
+ from .connectors import Connector
66
+ from .http_connector import ConnectorFetchError
67
+
68
+
69
+ _OAUTH_TOKEN_URL = "https://oauth2.googleapis.com/token"
70
+ _OAUTH_SCOPE = "https://www.googleapis.com/auth/bigquery.readonly"
71
+ _BQ_QUERY_BASE = "https://bigquery.googleapis.com/bigquery/v2/projects"
72
+
73
+
74
class BigQueryConnector(Connector):
    """Fetches observations from a BigQuery dataset via the REST API.

    A ``fetch`` call POSTs the configured SQL to the synchronous
    ``projects/{project_id}/queries`` endpoint, authenticating with an
    OAuth access token obtained through the service-account JWT-bearer
    flow, and turns the first page of result rows into ``Observation``
    objects according to ``mapping``.

    Every failure on the fetch path is reported as a
    ``ConnectorFetchError`` carrying a stable code (``bad_payload``,
    ``network``, ``invalid_credentials``, ``missing_secret``,
    ``missing_dep``, ``upstream`` or whatever ``_classify_http_error``
    returns). Constructor validation raises ``ValueError``.
    """

    type_name = "bigquery"

    def __init__(
        self, *,
        project_id: str,
        sql: str,
        mapping: Mapping[str, Any],
        service_account_json: Optional[str] = None,
        service_account_env: Optional[str] = None,
        api_timeout: float = 30.0,
        query_timeout_ms: int = 60_000,
        use_legacy_sql: bool = False,
        location: Optional[str] = None,
    ):
        """Validate and store the connector configuration.

        Args:
            project_id: GCP project the query job runs in.
            sql: Query template; ``{entity_type}`` is substituted with
                ``str.format`` at fetch time.
            mapping: ``{"entity_id": col, "day": col, "values":
                {metric: col}}``. A shallow copy is stored.
            service_account_json: Inline service-account JSON contents.
            service_account_env: Name of the env var holding the JSON.
                At least one of the two credential options is required;
                the inline value wins when both are set.
            api_timeout: HTTP timeout in seconds, used for both the
                token exchange and the query request.
            query_timeout_ms: Sent to BigQuery as ``timeoutMs``; must be
                within ``[1, 600000]``.
            use_legacy_sql: Sent to BigQuery as ``useLegacySql``.
            location: Optional BigQuery location, sent only when set.

        Raises:
            ValueError: On a missing project, SQL or credential source,
                or an out-of-range ``query_timeout_ms``.
        """
        if not project_id:
            raise ValueError("bigquery connector needs project_id")
        if not sql:
            raise ValueError("bigquery connector needs a sql query")
        if not (service_account_json or service_account_env):
            raise ValueError(
                "bigquery connector needs service_account_json OR "
                "service_account_env (recommended). Set one of them "
                "in connector_config."
            )
        if query_timeout_ms < 1 or query_timeout_ms > 600_000:
            raise ValueError("query_timeout_ms must be in [1, 600000]")
        self.project_id = project_id
        self.sql = sql
        self.mapping = dict(mapping)
        self.service_account_json = service_account_json
        self.service_account_env = service_account_env
        self.api_timeout = api_timeout
        self.query_timeout_ms = query_timeout_ms
        self.use_legacy_sql = use_legacy_sql
        self.location = location
        # In-memory token cache — reused until the token is within 60s
        # of expiry. Populated by _resolve_token().
        self._cached_token: Optional[str] = None
        self._cached_until: float = 0.0

    # ---- contract methods -----------------------------------------

    def fetch(self, entity_type: str) -> List[Observation]:
        """Run the configured query for ``entity_type``.

        Args:
            entity_type: Value substituted for ``{entity_type}`` in the
                SQL template.

        Returns:
            Observations built from the first page of query results.

        Raises:
            ConnectorFetchError: On credential, template, network, HTTP
                or payload problems.
        """
        sa = self._load_service_account()
        token = self._resolve_token(sa)
        try:
            sql_resolved = self.sql.format(entity_type=entity_type)
        except KeyError as e:
            raise ConnectorFetchError(
                "bad_payload",
                f"sql placeholder {{{e.args[0]}}} is not supported "
                "(only {entity_type} is)",
                cause=e,
            ) from e
        except (IndexError, ValueError, AttributeError) as e:
            # str.format raises IndexError for positional fields
            # ("{}" / "{0}"), ValueError for unbalanced braces and
            # AttributeError for "{entity_type.x}" lookups; none of
            # these is a KeyError, so they used to escape unwrapped.
            raise ConnectorFetchError(
                "bad_payload",
                f"sql template could not be formatted ({e}); only the "
                "named {entity_type} placeholder is supported and "
                "literal braces must be doubled",
                cause=e,
            ) from e
        body: dict = {
            "query": sql_resolved,
            "useLegacySql": self.use_legacy_sql,
            "timeoutMs": self.query_timeout_ms,
        }
        if self.location:
            body["location"] = self.location
        url = f"{_BQ_QUERY_BASE}/{self.project_id}/queries"
        req = Request(
            url, data=json.dumps(body).encode(),
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
                "Accept": "application/json",
            }, method="POST",
        )
        try:
            with urlopen(req, timeout=self.api_timeout) as resp:
                payload_bytes = resp.read()
        except HTTPError as e:
            code, msg = _classify_http_error(e)
            raise ConnectorFetchError(
                code,
                f"BigQuery {e.code} {e.reason}: {msg}",
                cause=e,
            ) from e
        except URLError as e:
            raise ConnectorFetchError(
                "network",
                f"could not reach BigQuery: {e.reason}",
                cause=e,
            ) from e
        except OSError as e:
            # A timeout while waiting for / reading the response is
            # raised as a plain TimeoutError (an OSError), not a
            # URLError. With the defaults (30s HTTP timeout vs. 60s
            # server-side timeoutMs) a slow query lands here.
            raise ConnectorFetchError(
                "network",
                f"BigQuery request failed or timed out after "
                f"{self.api_timeout}s: {e}",
                cause=e,
            ) from e
        try:
            payload = json.loads(payload_bytes)
        except ValueError as e:
            # ValueError covers JSONDecodeError and the
            # UnicodeDecodeError json.loads raises on undecodable bytes.
            raise ConnectorFetchError(
                "bad_payload", "BigQuery returned non-JSON body", cause=e,
            ) from e
        return self._rows_to_observations(payload)

    def entity_types(self) -> List[str]:
        """Return no entity types; the caller picks one at schedule time."""
        return []

    # ---- service account + token --------------------------------

    def _load_service_account(self) -> dict:
        """Load and validate the service-account JSON.

        The inline ``service_account_json`` is preferred; otherwise the
        env var named by ``service_account_env`` is read.

        Returns:
            The parsed JSON object; ``client_email``, ``private_key``
            and ``token_uri`` are guaranteed to be present and truthy.

        Raises:
            ConnectorFetchError: ``missing_secret`` when no JSON text is
                available, ``bad_payload`` when it does not parse, is
                not a JSON object, or lacks a required field.
        """
        raw = self.service_account_json
        if not raw and self.service_account_env:
            raw = os.environ.get(self.service_account_env, "")
        if not raw:
            raise ConnectorFetchError(
                "missing_secret",
                f"env var {self.service_account_env!r} "
                "(service_account_env) is unset or empty — paste the "
                "service-account JSON contents (NOT a path) as the "
                "env value",
            )
        try:
            sa = json.loads(raw)
        except json.JSONDecodeError as e:
            raise ConnectorFetchError(
                "bad_payload",
                "service account JSON failed to parse — make sure the "
                "env var carries the full JSON contents, not a path",
                cause=e,
            ) from e
        if not isinstance(sa, dict):
            # Valid JSON that is not an object (a quoted path, a list,
            # a number) would otherwise crash on .get() below.
            raise ConnectorFetchError(
                "bad_payload",
                f"service account JSON must be an object, got "
                f"{type(sa).__name__}",
            )
        for field in ("client_email", "private_key", "token_uri"):
            if not sa.get(field):
                raise ConnectorFetchError(
                    "bad_payload",
                    f"service account JSON missing required field "
                    f"{field!r}",
                )
        return sa

    def _resolve_token(self, sa: dict) -> str:
        """Return an OAuth access token, minting one when needed.

        A cached token is reused while it has more than 60 seconds of
        lifetime left. Otherwise an RS256-signed JWT assertion is
        exchanged at the service account's ``token_uri``.

        Args:
            sa: Parsed service-account JSON.

        Returns:
            The bearer access token.

        Raises:
            ConnectorFetchError: ``missing_dep`` when PyJWT is absent,
                ``bad_payload`` when signing fails or the token endpoint
                returns a non-JSON body, ``invalid_credentials`` on an
                HTTP error or a response without ``access_token``,
                ``network`` on transport failures.
        """
        now = time.time()
        if self._cached_token and self._cached_until > now + 60:
            return self._cached_token
        # Sign a JWT bearer assertion + exchange for an access token.
        # Standard Google OAuth 2.0 service-account flow.
        try:
            import jwt as _jwt
        except ImportError as e:  # pragma: no cover
            raise ConnectorFetchError(
                "missing_dep",
                "BigQuery connector needs PyJWT (transitive via "
                "authlib) — `pip install pyjwt`.",
                cause=e,
            ) from e
        issued = int(now)
        expires = issued + 3600
        token_uri = sa.get("token_uri", _OAUTH_TOKEN_URL)
        claims = {
            "iss": sa["client_email"],
            "scope": _OAUTH_SCOPE,
            "aud": token_uri,
            "iat": issued,
            "exp": expires,
        }
        try:
            assertion = _jwt.encode(claims, sa["private_key"],
                                    algorithm="RS256")
        except Exception as e:
            raise ConnectorFetchError(
                "bad_payload",
                "JWT signing failed — service account private_key is "
                "probably malformed (must be a PEM-encoded RSA key)",
                cause=e,
            ) from e
        body = urlencode({
            "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
            "assertion": assertion,
        }).encode()
        req = Request(
            token_uri, data=body,
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": "application/json",
            }, method="POST",
        )
        try:
            with urlopen(req, timeout=self.api_timeout) as resp:
                raw_body = resp.read()
        except HTTPError as e:
            try:
                err_body = e.read().decode("utf-8", errors="replace")
            except Exception:
                err_body = ""
            raise ConnectorFetchError(
                "invalid_credentials",
                f"OAuth token exchange failed (HTTP {e.code}): "
                f"{err_body[:300]}",
                cause=e,
            ) from e
        except URLError as e:  # pragma: no cover
            raise ConnectorFetchError(
                "network",
                f"could not reach Google OAuth: {e.reason}",
                cause=e,
            ) from e
        except OSError as e:
            # Read timeouts are raised as bare TimeoutError / OSError,
            # not URLError.
            raise ConnectorFetchError(
                "network",
                f"OAuth token request failed or timed out after "
                f"{self.api_timeout}s: {e}",
                cause=e,
            ) from e
        try:
            tok_payload = json.loads(raw_body)
        except ValueError as e:
            # Previously an unparseable body escaped as a raw
            # JSONDecodeError instead of a ConnectorFetchError.
            raise ConnectorFetchError(
                "bad_payload",
                "OAuth token endpoint returned non-JSON body",
                cause=e,
            ) from e
        access_token = (
            tok_payload.get("access_token")
            if isinstance(tok_payload, dict) else None
        )
        if not access_token:
            raise ConnectorFetchError(
                "invalid_credentials",
                "OAuth response had no access_token",
            )
        try:
            lifetime = int(tok_payload.get("expires_in", 3600))
        except (TypeError, ValueError):
            # Malformed expires_in: fall back to the lifetime requested
            # in the assertion rather than failing a usable token.
            lifetime = 3600
        self._cached_token = access_token
        self._cached_until = float(issued + lifetime)
        return access_token

    # ---- response parsing ---------------------------------------

    def _rows_to_observations(self, payload: dict) -> List[Observation]:
        """Convert a ``jobs.query`` response into observations.

        Rows lacking an entity id or day, and rows where no mapped
        metric parses as a float, are skipped. Only the rows present in
        ``payload`` are read; when a ``pageToken`` is present a warning
        is written to stderr.

        NOTE(review): ``day`` is truncated to its first 10 characters,
        which yields ``YYYY-MM-DD`` for ISO-style date strings.
        Presumably TIMESTAMP columns should be cast to DATE in the SQL —
        verify against real responses.

        Args:
            payload: Decoded JSON body of the query response.

        Returns:
            One ``Observation`` per usable row.

        Raises:
            ConnectorFetchError: ``bad_payload`` for a non-dict payload,
                an incomplete mapping, or rows whose schema lacks the
                entity-id / day columns; ``upstream`` when the job did
                not complete within ``timeoutMs``.
        """
        if not isinstance(payload, dict):
            raise ConnectorFetchError(
                "bad_payload",
                f"BigQuery response is a {type(payload).__name__}, expected dict",
            )
        if not payload.get("jobComplete", True):
            raise ConnectorFetchError(
                "upstream",
                "BigQuery query did not finish within timeoutMs — "
                "raise query_timeout_ms or partition the query",
            )
        schema = (payload.get("schema") or {}).get("fields") or []
        col_names = [f.get("name") for f in schema]
        rows = payload.get("rows") or []
        eid_field = self.mapping.get("entity_id")
        day_field = self.mapping.get("day")
        values_map = self.mapping.get("values") or {}
        if not eid_field or not day_field or not isinstance(values_map, dict):
            raise ConnectorFetchError(
                "bad_payload",
                "mapping must define entity_id, day, and values (dict). "
                "Got: " + json.dumps(self.mapping, default=str),
            )
        # Build a {field_name: row_index} for fast lookup.
        idx = {name: i for i, name in enumerate(col_names)}
        # Empty result set is fine — the query genuinely returned
        # nothing, no point yelling about a missing column. Only
        # raise the column-missing error when rows exist but the
        # mapping can't be applied.
        if rows and (eid_field not in idx or day_field not in idx):
            raise ConnectorFetchError(
                "bad_payload",
                f"query result is missing columns: required "
                f"{eid_field!r} + {day_field!r}; got {col_names}",
            )

        def cell_value(cells: list, field: str) -> Any:
            # Defined once instead of once per row; returns None for an
            # unknown column or a row shorter than the schema.
            i = idx.get(field)
            if i is None or i >= len(cells):
                return None
            return (cells[i] or {}).get("v")

        out: List[Observation] = []
        for row in rows:
            if not isinstance(row, dict):
                # A malformed row is skipped like any other unusable
                # row instead of crashing the whole fetch.
                continue
            cells = row.get("f") or []
            eid = cell_value(cells, eid_field)
            day = cell_value(cells, day_field)
            if not eid or not day:
                continue
            obs_values = {}
            for metric, bq_field in values_map.items():
                raw = cell_value(cells, bq_field)
                if raw is None:
                    continue
                try:
                    obs_values[metric] = float(raw)
                except (TypeError, ValueError):
                    continue
            if not obs_values:
                continue
            out.append(Observation(
                entity_id=str(eid),
                day=str(day)[:10],
                values=obs_values,
            ))
        if payload.get("pageToken"):
            import sys as _sys
            print(
                f"warning: BigQuery query returned a pageToken; "
                f"v1 reads only the first page ({len(rows)} rows). "
                "Tighten the WHERE clause or wait for v2 pagination.",
                file=_sys.stderr,
            )
        return out
359
+
360
+
361
def _classify_http_error(e: HTTPError) -> tuple[str, str]:
    """Map a BigQuery HTTP error to a stable (code, hint) tuple.

    The response body is read from ``e`` (best effort) and at most its
    first 300 characters are embedded in the hint.

    Args:
        e: The ``HTTPError`` raised by ``urlopen``.

    Returns:
        ``(code, hint)`` where ``code`` is one of
        ``invalid_credentials`` (401), ``bad_query`` (400),
        ``forbidden`` (403), ``not_found`` (404), ``upstream`` (5xx) or
        ``http_error`` (anything else), and ``hint`` is a
        human-readable explanation.
    """
    try:
        body = e.read().decode("utf-8", errors="replace")
    except Exception:
        # The body is diagnostic only; an unreadable body must not mask
        # the HTTP status being classified.
        body = ""
    snippet = body[:300] if body else ""
    status = e.code
    if status == 401:
        return (
            "invalid_credentials",
            "BigQuery rejected the access token — verify the "
            "service account is enabled and has bigquery.jobUser "
            f"+ bigquery.dataViewer roles. Body: {snippet}",
        )
    if status == 400:
        return (
            "bad_query",
            f"BigQuery rejected the SQL or job config: {snippet}",
        )
    if status == 403:
        return (
            "forbidden",
            "Service account is missing IAM scope (need "
            "bigquery.jobs.create + bigquery.tables.getData on "
            f"the dataset): {snippet}",
        )
    if status == 404:
        return (
            "not_found",
            "BigQuery resource not found — wrong project_id or "
            f"dataset? Body: {snippet}",
        )
    if 500 <= status < 600:
        return ("upstream", f"BigQuery returned 5xx ({snippet})")
    return ("http_error", snippet)
infra/ingestion/connectors.py CHANGED
@@ -188,11 +188,33 @@ def build_connector(connector_type: str, config: dict) -> Connector:
188
  timeout=float(config.get("timeout", 30.0)),
189
  base_url=config.get("base_url", "https://api.stripe.com"),
190
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
191
  raise ValueError(
192
  f"unknown / non-schedulable connector type {connector_type!r} "
193
  f"(schedulable: ['{CSVFolderConnector.type_name}', "
194
  f"'{HttpJsonConnector.type_name}', '{SFTPConnector.type_name}', "
195
  f"'{SQLConnector.type_name}', "
196
  f"'{SalesforceConnector.type_name}', "
197
- f"'{StripeConnector.type_name}'])"
 
198
  )
 
188
  timeout=float(config.get("timeout", 30.0)),
189
  base_url=config.get("base_url", "https://api.stripe.com"),
190
  )
191
+ # Stage 163 — BigQuery REST query connector.
192
+ from .bigquery_connector import BigQueryConnector
193
+ if connector_type == BigQueryConnector.type_name:
194
+ required = ("project_id", "sql", "mapping")
195
+ missing = [f for f in required if not config.get(f)]
196
+ if missing:
197
+ raise ValueError(
198
+ "bigquery connector config missing required field(s): "
199
+ f"{', '.join(missing)}"
200
+ )
201
+ return BigQueryConnector(
202
+ project_id=config["project_id"],
203
+ sql=config["sql"],
204
+ mapping=config["mapping"],
205
+ service_account_json=config.get("service_account_json"),
206
+ service_account_env=config.get("service_account_env"),
207
+ api_timeout=float(config.get("api_timeout", 30.0)),
208
+ query_timeout_ms=int(config.get("query_timeout_ms", 60_000)),
209
+ use_legacy_sql=bool(config.get("use_legacy_sql", False)),
210
+ location=config.get("location"),
211
+ )
212
  raise ValueError(
213
  f"unknown / non-schedulable connector type {connector_type!r} "
214
  f"(schedulable: ['{CSVFolderConnector.type_name}', "
215
  f"'{HttpJsonConnector.type_name}', '{SFTPConnector.type_name}', "
216
  f"'{SQLConnector.type_name}', "
217
  f"'{SalesforceConnector.type_name}', "
218
+ f"'{StripeConnector.type_name}', "
219
+ f"'{BigQueryConnector.type_name}'])"
220
  )
requirements-runtime.txt CHANGED
@@ -38,6 +38,11 @@ authlib>=1.3,<2.0
38
  # pdf_unavailable. ~3MB wheel, no native deps, embeds fonts so
39
  # Hebrew/Latin mixed reports render correctly.
40
  fpdf2>=2.7
 
 
 
 
 
41
  # Stage 156 β€” python-bidi runs the Unicode BiDi algorithm so we can
42
  # render Hebrew RTL text through fpdf2 (which is LTR-only). Tiny pure-
43
  # Python lib (~30KB); lazy-imported inside delivery/reports/pdf.py so
 
38
  # pdf_unavailable. ~3MB wheel, no native deps, embeds fonts so
39
  # Hebrew/Latin mixed reports render correctly.
40
  fpdf2>=2.7
41
+ # Stage 163 — PyJWT is a transitive dep of authlib but we now use it
42
+ # directly to sign service-account JWTs for the BigQuery connector
43
+ # (Google's OAuth 2.0 jwt-bearer flow). Tiny pure-Python lib —
44
+ # safe to make explicit so a future authlib bump can't drop it.
45
+ PyJWT>=2.10
46
  # Stage 156 β€” python-bidi runs the Unicode BiDi algorithm so we can
47
  # render Hebrew RTL text through fpdf2 (which is LTR-only). Tiny pure-
48
  # Python lib (~30KB); lazy-imported inside delivery/reports/pdf.py so