sofhiaazzhr commited on
Commit
faf8a4f
Β·
2 Parent(s): 0ae8717212dad3

Merge commit 'refs/pr/1' of https://huggingface.co/spaces/DataEyond/Agentic-Service-Data-Eyond-Catalog into pr/1

Browse files
PROGRESS.md CHANGED
@@ -2,8 +2,8 @@
2
 
3
  Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "Team β€” division of work". Update as PRs land. Future Claude Code sessions read this to know what's already done.
4
 
5
- **Last updated**: 2026-05-07 (PR2a β€” enricher + structured pipeline shipped)
6
- **Current open PR**: PR2a (DB owner β€” CatalogEnricher + StructuredPipeline + on_db_registered trigger + FK extension)
7
 
8
  ---
9
 
@@ -22,9 +22,10 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
22
  |---|---|---|---|
23
  | PR1 | `[x]` merged | DB | Contract locks + catalog plumbing + DB introspector + IR validator + tests |
24
  | PR1-tab | `[ ]` | TAB | Tabular introspector + golden IR examples for tabular |
25
- | PR2a | `[~]` open | DB | CatalogEnricher + StructuredPipeline + on_db_registered trigger + FK extension on Table |
26
  | PR2b | `[ ]` | B | IntentRouter + planner prompt (pair) + planner LLM service |
27
- | PR3 | `[ ]` | B (split) | SQL compiler + DB executor (DB), pandas compiler + tabular executor (TAB) |
 
28
  | PR4 | `[ ]` | B (pair) | ExecutorDispatcher + QueryService + chat stream endpoint integration |
29
  | PR5 | `[ ]` | B | Retry/self-correction loop on execution failure |
30
  | PR6 | `[ ]` | B | Eval harness (golden questionβ†’IRβ†’result examples) |
@@ -91,10 +92,10 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
91
 
92
  | # | Item | Status | Notes |
93
  |---|---|---|---|
94
- | 25 | SQL compiler (`query/compiler/sql.py`) | `[ ]` | PR3 β€” IR β†’ (sql, params); identifiers from catalog (quoted), values parameterized |
95
- | 26 | DB executor (`query/executor/db.py`) | `[ ]` | PR3 β€” asyncpg/pymysql; sqlglot SELECT-only guard; RO txn; 30s timeout |
96
  | 27 | Credential encryption (`security/credentials.py`) | `[ ]` | Stub exists; PR1 reused Phase 1 `utils/db_credential_encryption.py` instead. Move in cleanup PR |
97
- | 28 | User-DB connection management | `[ ]` | Reused Phase 1 `db_pipeline_service.engine_scope` in PR1; new helper in PR3 if needed |
98
 
99
  ### Query β€” Tabular path
100
 
@@ -124,7 +125,7 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
124
 
125
  | # | Item | Owner | Status | Notes |
126
  |---|---|---|---|---|
127
- | 38 | DB compiler golden tests (`tests/query/compiler/test_sql.py`) | DB | `[ ]` | PR3 β€” pure-Python, no LLM |
128
  | 39 | Pandas compiler golden tests (`tests/query/compiler/test_pandas.py`) | TAB | `[ ]` | PR3 β€” pure-Python, no LLM |
129
  | 40 | IR validator tests (`tests/query/ir/test_validator.py`) | B | `[x]` | PR1 β€” 19 tests, all rules covered |
130
  | β€” | PII detector tests (`tests/catalog/test_pii_detector.py`) | B | `[x]` | PR1 β€” 26 tests (parametrized) |
@@ -139,7 +140,26 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
139
 
140
  ---
141
 
142
- ## What just shipped (PR2a β€” DB owner)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
143
 
144
  **Files implemented**:
145
  - `src/catalog/enricher.py` β€” Azure OpenAI GPT-4o + structured output (`EnrichmentResponse`), `render_source` (reusable by planner prompt later), `apply_descriptions` merger, injectable `structured_chain` for tests
 
2
 
3
  Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "Team β€” division of work". Update as PRs land. Future Claude Code sessions read this to know what's already done.
4
 
5
+ **Last updated**: 2026-05-07 (PR3-DB β€” SQL compiler + DB executor shipped)
6
+ **Current open PR**: PR3-DB (DB owner — SqlCompiler + DbExecutor + golden IR→SQL tests)
7
 
8
  ---
9
 
 
22
  |---|---|---|---|
23
  | PR1 | `[x]` merged | DB | Contract locks + catalog plumbing + DB introspector + IR validator + tests |
24
  | PR1-tab | `[ ]` | TAB | Tabular introspector + golden IR examples for tabular |
25
+ | PR2a | `[x]` merged | DB | CatalogEnricher + StructuredPipeline + on_db_registered trigger + FK extension on Table |
26
  | PR2b | `[ ]` | B | IntentRouter + planner prompt (pair) + planner LLM service |
27
+ | PR3-DB | `[~]` open | DB | SqlCompiler (Postgres) + DbExecutor (sqlglot guard, RO + statement_timeout, asyncio.to_thread) + 36 golden IR→SQL tests |
28
+ | PR3-TAB | `[ ]` | TAB | Pandas compiler + tabular executor + golden IR→DataFrame tests |
29
  | PR4 | `[ ]` | B (pair) | ExecutorDispatcher + QueryService + chat stream endpoint integration |
30
  | PR5 | `[ ]` | B | Retry/self-correction loop on execution failure |
31
  | PR6 | `[ ]` | B | Eval harness (golden questionβ†’IRβ†’result examples) |
 
92
 
93
  | # | Item | Status | Notes |
94
  |---|---|---|---|
95
+ | 25 | SQL compiler (`query/compiler/sql.py`) | `[x]` | PR3-DB β€” Postgres dialect (Supabase reuses); deterministic IR β†’ (sql, named-params dict); double-quoted identifiers from catalog; all whitelisted ops (=, !=, <, <=, >, >=, in, not_in, is_null, is_not_null, like, between); alias-aware order_by; `CompiledSql.params: dict[str, Any]` (changed from `list`). MySQL/BigQuery/Snowflake compilers later. |
96
+ | 26 | DB executor (`query/executor/db.py`) | `[x]` | PR3-DB β€” sync engine via `db_pipeline_service.engine_scope` inside `asyncio.to_thread`. sqlglot SELECT-only / no-DML guard. Postgres-only session settings: `default_transaction_read_only=on` + `statement_timeout=30000`. asyncio.wait_for backstop. Never raises β€” populates `QueryResult.error`. 10k row hard cap. |
97
  | 27 | Credential encryption (`security/credentials.py`) | `[ ]` | Stub exists; PR1 reused Phase 1 `utils/db_credential_encryption.py` instead. Move in cleanup PR |
98
+ | 28 | User-DB connection management | `[x]` | PR3-DB reused Phase 1 `db_pipeline_service.engine_scope` (same as PR1 introspector); no new helper needed |
99
 
100
  ### Query β€” Tabular path
101
 
 
125
 
126
  | # | Item | Owner | Status | Notes |
127
  |---|---|---|---|---|
128
+ | 38 | DB compiler golden tests (`tests/query/compiler/test_sql.py`) | DB | `[x]` | PR3-DB β€” 36 tests across all whitelisted ops, identifier quoting, agg / count_distinct / count(*), order_by alias resolution, parameter sequencing, error paths. Pure-Python, no LLM, no DB. |
129
  | 39 | Pandas compiler golden tests (`tests/query/compiler/test_pandas.py`) | TAB | `[ ]` | PR3 β€” pure-Python, no LLM |
130
  | 40 | IR validator tests (`tests/query/ir/test_validator.py`) | B | `[x]` | PR1 β€” 19 tests, all rules covered |
131
  | β€” | PII detector tests (`tests/catalog/test_pii_detector.py`) | B | `[x]` | PR1 β€” 26 tests (parametrized) |
 
140
 
141
  ---
142
 
143
+ ## What just shipped (PR3-DB β€” DB owner)
144
+
145
+ **Files implemented**:
146
+ - `src/query/compiler/sql.py` β€” `SqlCompiler` for Postgres dialect; `CompiledSql(sql, params)` dataclass with `params: dict[str, Any]` (changed from `list`); supports all 12 whitelisted filter ops, all 6 aggs, alias-aware order_by; `_qident` escapes embedded double-quotes
147
+ - `src/query/executor/db.py` β€” `DbExecutor` with sqlglot SELECT-only guard, Postgres session-level read-only + 30s `statement_timeout`, `asyncio.wait_for` backstop, 10k row hard cap; rejects non-`schema` source_type and `dbclient://` URI mismatch; never raises (populates `QueryResult.error`)
148
+
149
+ **Files extended**:
150
+ - `src/query/compiler/pandas.py` β€” fixed pre-existing UP035 (Callable import)
151
+ - `pyproject.toml` β€” added `S608` to `tests/**` ruff ignore (false positive: tests assert literal SQL strings)
152
+
153
+ **Tests added** (36 new, all passing β€” total now 100):
154
+ - `tests/query/compiler/test_sql.py` β€” every filter op, every agg, count(*), count_distinct, order_by alias vs column, multi-filter AND, identifier quoting escape, error paths
155
+
156
+ **Lint**: `ruff check` clean on Phase 2 paths.
157
+
158
+ **Hand-off note for teammate**: `CompiledSql.params` is now `dict[str, Any]` not `list`. The pandas compiler will follow the same convention (or document its own) β€” coordinate when PR3-TAB lands.
159
+
160
+ ---
161
+
162
+ ## What shipped previously (PR2a β€” DB owner)
163
 
164
  **Files implemented**:
165
  - `src/catalog/enricher.py` β€” Azure OpenAI GPT-4o + structured output (`EnrichmentResponse`), `render_source` (reusable by planner prompt later), `apply_descriptions` merger, injectable `structured_chain` for tests
pyproject.toml CHANGED
@@ -121,7 +121,8 @@ ignore = [
121
  ]
122
 
123
  [tool.ruff.lint.per-file-ignores]
124
- "tests/**" = ["S101", "S105", "S106"]
 
125
 
126
  [tool.mypy]
127
  python_version = "3.12"
 
121
  ]
122
 
123
  [tool.ruff.lint.per-file-ignores]
124
+ # S608 in tests is a false positive β€” tests assert literal SQL strings as fixtures.
125
+ "tests/**" = ["S101", "S105", "S106", "S608"]
126
 
127
  [tool.mypy]
128
  python_version = "3.12"
src/query/compiler/pandas.py CHANGED
@@ -5,7 +5,7 @@ For tabular sources. The callable encapsulates the chain of operations
5
  to a DataFrame loaded eagerly or via predicate pushdown / polars lazy scan.
6
  """
7
 
8
- from typing import Callable
9
 
10
  from ...catalog.models import Catalog
11
  from ..ir.models import QueryIR
 
5
  to a DataFrame loaded eagerly or via predicate pushdown / polars lazy scan.
6
  """
7
 
8
+ from collections.abc import Callable
9
 
10
  from ...catalog.models import Catalog
11
  from ..ir.models import QueryIR
src/query/compiler/sql.py CHANGED
@@ -1,28 +1,305 @@
1
- """SqlCompiler β€” IR β†’ (SQL string, parameters list).
2
 
3
- Identifiers (table, column names) come from the catalog (trusted).
4
- Values come from IR.filters and are ALWAYS parameterized β€” never inlined.
5
- Output is validated by sqlglot before reaching the executor.
 
 
 
 
 
 
 
 
 
6
  """
7
 
8
- from dataclasses import dataclass
 
 
 
9
 
10
- from ...catalog.models import Catalog
11
- from ..ir.models import QueryIR
 
 
 
 
 
 
 
12
  from .base import BaseCompiler
13
 
14
 
15
  @dataclass
16
  class CompiledSql:
17
  sql: str
18
- params: list[object]
 
 
 
 
 
 
 
 
 
19
 
20
 
21
  class SqlCompiler(BaseCompiler):
22
- """Deterministic IR β†’ SQL. No LLM."""
23
 
24
- def __init__(self, catalog: Catalog) -> None:
 
 
 
 
25
  self._catalog = catalog
 
26
 
27
  def compile(self, ir: QueryIR) -> CompiledSql:
28
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """SqlCompiler β€” IR β†’ (SQL string, named-params dict).
2
 
3
+ Identifiers (table / column names) come from the catalog and are quoted
4
+ verbatim β€” they were verified by the IR validator against the catalog,
5
+ so injection through identifiers is not possible at this layer.
6
+ Values from filter clauses are ALWAYS parameterized.
7
+
8
+ The output `CompiledSql.sql` uses SQLAlchemy-style named placeholders
9
+ (`:p_0, :p_1, ...`) so it can be executed via `text(sql)` with a params
10
+ dict on a sync SQLAlchemy engine.
11
+
12
+ v1 supports the Postgres dialect only. Supabase reuses the same compiler
13
+ output (Supabase = Postgres). MySQL / BigQuery / Snowflake compilers will
14
+ be separate classes that implement `BaseCompiler`.
15
  """
16
 
17
+ from __future__ import annotations
18
+
19
+ from dataclasses import dataclass, field
20
+ from typing import Any
21
 
22
+ from ...catalog.models import Catalog, Column, Source, Table
23
+ from ..ir.models import (
24
+ AggSelect,
25
+ ColumnSelect,
26
+ FilterClause,
27
+ OrderByClause,
28
+ QueryIR,
29
+ SelectItem,
30
+ )
31
  from .base import BaseCompiler
32
 
33
 
34
  @dataclass
35
  class CompiledSql:
36
  sql: str
37
+ params: dict[str, Any] = field(default_factory=dict)
38
+
39
+
40
+ class SqlCompilerError(Exception):
41
+ pass
42
+
43
+
44
+ _NULLARY_OPS = frozenset({"is_null", "is_not_null"})
45
+ _LIST_OPS = frozenset({"in", "not_in"})
46
+ _COMPARISON_OPS = frozenset({"=", "!=", "<", "<=", ">", ">="})
47
 
48
 
49
  class SqlCompiler(BaseCompiler):
50
+ """Deterministic IR β†’ Postgres SQL. No LLM."""
51
 
52
+ def __init__(self, catalog: Catalog, dialect: str = "postgres") -> None:
53
+ if dialect not in {"postgres", "supabase"}:
54
+ raise SqlCompilerError(
55
+ f"only 'postgres' / 'supabase' supported in v1, got {dialect!r}"
56
+ )
57
  self._catalog = catalog
58
+ self._dialect = dialect
59
 
60
  def compile(self, ir: QueryIR) -> CompiledSql:
61
+ _, table, cols_by_id = self._lookup(ir)
62
+ params: dict[str, Any] = {}
63
+ param_seq = [0]
64
+
65
+ select_clause, select_aliases = self._build_select(ir.select, table, cols_by_id)
66
+ from_clause = self._build_from(table)
67
+ where_clause = self._build_where(ir.filters, table, cols_by_id, params, param_seq)
68
+ groupby_clause = self._build_groupby(ir.group_by, table, cols_by_id)
69
+ orderby_clause = self._build_orderby(
70
+ ir.order_by, table, cols_by_id, select_aliases
71
+ )
72
+ limit_clause = self._build_limit(ir.limit)
73
+
74
+ parts: list[str] = [select_clause, from_clause]
75
+ for clause in (where_clause, groupby_clause, orderby_clause, limit_clause):
76
+ if clause:
77
+ parts.append(clause)
78
+
79
+ return CompiledSql(sql=" ".join(parts), params=params)
80
+
81
+ # ------------------------------------------------------------------
82
+ # Catalog lookup
83
+ # ------------------------------------------------------------------
84
+
85
+ def _lookup(self, ir: QueryIR) -> tuple[Source, Table, dict[str, Column]]:
86
+ source = next(
87
+ (s for s in self._catalog.sources if s.source_id == ir.source_id), None
88
+ )
89
+ if source is None:
90
+ raise SqlCompilerError(f"source_id {ir.source_id!r} not in catalog")
91
+ table = next(
92
+ (t for t in source.tables if t.table_id == ir.table_id), None
93
+ )
94
+ if table is None:
95
+ raise SqlCompilerError(
96
+ f"table_id {ir.table_id!r} not in source {ir.source_id!r}"
97
+ )
98
+ return source, table, {c.column_id: c for c in table.columns}
99
+
100
+ # ------------------------------------------------------------------
101
+ # Identifier quoting
102
+ # ------------------------------------------------------------------
103
+
104
+ @staticmethod
105
+ def _qident(name: str) -> str:
106
+ """Postgres-style double-quoted identifier with embedded-quote escape."""
107
+ return '"' + name.replace('"', '""') + '"'
108
+
109
+ def _qcol(self, table: Table, col: Column) -> str:
110
+ return f"{self._qident(table.name)}.{self._qident(col.name)}"
111
+
112
+ # ------------------------------------------------------------------
113
+ # Clauses
114
+ # ------------------------------------------------------------------
115
+
116
+ def _build_select(
117
+ self,
118
+ items: list[SelectItem],
119
+ table: Table,
120
+ cols_by_id: dict[str, Column],
121
+ ) -> tuple[str, set[str]]:
122
+ if not items:
123
+ raise SqlCompilerError("select clause cannot be empty")
124
+ parts: list[str] = []
125
+ aliases: set[str] = set()
126
+ for i, item in enumerate(items):
127
+ expr, alias = self._select_item(item, table, cols_by_id, i)
128
+ if alias:
129
+ parts.append(f"{expr} AS {self._qident(alias)}")
130
+ aliases.add(alias)
131
+ else:
132
+ parts.append(expr)
133
+ return "SELECT " + ", ".join(parts), aliases
134
+
135
+ def _select_item(
136
+ self,
137
+ item: SelectItem,
138
+ table: Table,
139
+ cols_by_id: dict[str, Column],
140
+ index: int,
141
+ ) -> tuple[str, str | None]:
142
+ if isinstance(item, ColumnSelect):
143
+ col = self._require_col(cols_by_id, item.column_id, f"select[{index}]")
144
+ return self._qcol(table, col), item.alias
145
+ if not isinstance(item, AggSelect):
146
+ raise SqlCompilerError(
147
+ f"select[{index}]: unknown SelectItem kind {type(item).__name__}"
148
+ )
149
+ return self._compile_agg(item, table, cols_by_id, index), item.alias
150
+
151
+ def _compile_agg(
152
+ self,
153
+ item: AggSelect,
154
+ table: Table,
155
+ cols_by_id: dict[str, Column],
156
+ index: int,
157
+ ) -> str:
158
+ if item.fn == "count_distinct":
159
+ if item.column_id is None:
160
+ raise SqlCompilerError(
161
+ f"select[{index}].fn=count_distinct requires column_id"
162
+ )
163
+ col = self._require_col(cols_by_id, item.column_id, f"select[{index}]")
164
+ return f"COUNT(DISTINCT {self._qcol(table, col)})"
165
+ if item.column_id is None:
166
+ if item.fn != "count":
167
+ raise SqlCompilerError(
168
+ f"select[{index}].fn={item.fn!r} requires column_id "
169
+ "(only 'count' may omit it for COUNT(*))"
170
+ )
171
+ return "COUNT(*)"
172
+ col = self._require_col(cols_by_id, item.column_id, f"select[{index}]")
173
+ return f"{item.fn.upper()}({self._qcol(table, col)})"
174
+
175
+ def _build_from(self, table: Table) -> str:
176
+ return f"FROM {self._qident(table.name)}"
177
+
178
+ def _build_where(
179
+ self,
180
+ filters: list[FilterClause],
181
+ table: Table,
182
+ cols_by_id: dict[str, Column],
183
+ params: dict[str, Any],
184
+ param_seq: list[int],
185
+ ) -> str:
186
+ if not filters:
187
+ return ""
188
+ parts = [
189
+ self._compile_filter(f, table, cols_by_id, params, param_seq, index=i)
190
+ for i, f in enumerate(filters)
191
+ ]
192
+ return "WHERE " + " AND ".join(parts)
193
+
194
+ def _compile_filter(
195
+ self,
196
+ f: FilterClause,
197
+ table: Table,
198
+ cols_by_id: dict[str, Column],
199
+ params: dict[str, Any],
200
+ param_seq: list[int],
201
+ index: int,
202
+ ) -> str:
203
+ col = self._require_col(cols_by_id, f.column_id, f"filters[{index}]")
204
+ col_ref = self._qcol(table, col)
205
+ op = f.op
206
+
207
+ if op == "is_null":
208
+ return f"{col_ref} IS NULL"
209
+ if op == "is_not_null":
210
+ return f"{col_ref} IS NOT NULL"
211
+
212
+ if op in _LIST_OPS:
213
+ if not isinstance(f.value, list) or not f.value:
214
+ raise SqlCompilerError(
215
+ f"filters[{index}]: op {op!r} requires a non-empty list value"
216
+ )
217
+ placeholders = [
218
+ ":" + self._next_param(params, param_seq, v) for v in f.value
219
+ ]
220
+ sql_op = "IN" if op == "in" else "NOT IN"
221
+ return f"{col_ref} {sql_op} ({', '.join(placeholders)})"
222
+
223
+ if op == "between":
224
+ if not isinstance(f.value, list) or len(f.value) != 2:
225
+ raise SqlCompilerError(
226
+ f"filters[{index}]: op 'between' requires a list of two values"
227
+ )
228
+ lo = self._next_param(params, param_seq, f.value[0])
229
+ hi = self._next_param(params, param_seq, f.value[1])
230
+ return f"{col_ref} BETWEEN :{lo} AND :{hi}"
231
+
232
+ if op == "like":
233
+ p = self._next_param(params, param_seq, f.value)
234
+ return f"{col_ref} LIKE :{p}"
235
+
236
+ if op in _COMPARISON_OPS:
237
+ p = self._next_param(params, param_seq, f.value)
238
+ return f"{col_ref} {op} :{p}"
239
+
240
+ # Should not reach here β€” IRValidator already filters disallowed ops
241
+ raise SqlCompilerError(f"filters[{index}]: unhandled op {op!r}")
242
+
243
+ def _build_groupby(
244
+ self,
245
+ group_by: list[str],
246
+ table: Table,
247
+ cols_by_id: dict[str, Column],
248
+ ) -> str:
249
+ if not group_by:
250
+ return ""
251
+ parts = [
252
+ self._qcol(table, self._require_col(cols_by_id, col_id, f"group_by[{i}]"))
253
+ for i, col_id in enumerate(group_by)
254
+ ]
255
+ return "GROUP BY " + ", ".join(parts)
256
+
257
+ def _build_orderby(
258
+ self,
259
+ order_by: list[OrderByClause],
260
+ table: Table,
261
+ cols_by_id: dict[str, Column],
262
+ select_aliases: set[str],
263
+ ) -> str:
264
+ if not order_by:
265
+ return ""
266
+ parts: list[str] = []
267
+ for i, ob in enumerate(order_by):
268
+ if ob.column_id in cols_by_id:
269
+ ref = self._qcol(table, cols_by_id[ob.column_id])
270
+ elif ob.column_id in select_aliases:
271
+ ref = self._qident(ob.column_id)
272
+ else:
273
+ raise SqlCompilerError(
274
+ f"order_by[{i}].column_id: {ob.column_id!r} not in table "
275
+ "columns or select aliases"
276
+ )
277
+ parts.append(f"{ref} {ob.dir.upper()}")
278
+ return "ORDER BY " + ", ".join(parts)
279
+
280
+ def _build_limit(self, limit: int | None) -> str:
281
+ if limit is None:
282
+ return ""
283
+ return f"LIMIT {int(limit)}"
284
+
285
+ # ------------------------------------------------------------------
286
+ # Helpers
287
+ # ------------------------------------------------------------------
288
+
289
+ @staticmethod
290
+ def _next_param(
291
+ params: dict[str, Any], param_seq: list[int], value: Any
292
+ ) -> str:
293
+ name = f"p_{param_seq[0]}"
294
+ param_seq[0] += 1
295
+ params[name] = value
296
+ return name
297
+
298
+ @staticmethod
299
+ def _require_col(
300
+ cols_by_id: dict[str, Column], col_id: str, where: str
301
+ ) -> Column:
302
+ col = cols_by_id.get(col_id)
303
+ if col is None:
304
+ raise SqlCompilerError(f"{where}.column_id: {col_id!r} not in table")
305
+ return col
src/query/executor/db.py CHANGED
@@ -1,25 +1,188 @@
1
- """DbExecutor β€” runs compiled SQL on a user's external DB.
2
 
3
  Pipeline:
4
- IR β†’ SqlCompiler β†’ SQL string + params
5
  ↓
6
- sqlglot validation (SELECT-only, whitelist tables/columns, LIMIT enforced)
7
  ↓
8
- asyncpg / pymysql in read-only transaction with timeout (30s)
 
9
  ↓
10
- QueryResult
 
 
 
 
 
 
11
  """
12
 
13
- from ...catalog.models import Catalog
14
- from ..compiler.sql import SqlCompiler
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
  from ..ir.models import QueryIR
16
  from .base import BaseExecutor, QueryResult
17
 
 
 
 
 
 
 
 
18
 
19
  class DbExecutor(BaseExecutor):
 
 
 
 
 
 
 
20
  def __init__(self, catalog: Catalog) -> None:
21
  self._catalog = catalog
22
  self._compiler = SqlCompiler(catalog)
23
 
24
  async def run(self, ir: QueryIR) -> QueryResult:
25
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """DbExecutor β€” runs a compiled IR against a user's external SQL database.
2
 
3
  Pipeline:
4
+ IR β†’ SqlCompiler.compile() β†’ CompiledSql(sql, params)
5
  ↓
6
+ sqlglot guard (defense-in-depth: SELECT-only, no DML / DDL)
7
  ↓
8
+ resolve creds (catalog.location_ref β†’ dbclient://{client_id} β†’ DatabaseClient
9
+ row β†’ Fernet decrypt)
10
  ↓
11
+ asyncio.to_thread(_run_sync)
12
+ β”” db_pipeline_service.engine_scope(db_type, creds)
13
+ β”” session-level: default_transaction_read_only + statement_timeout=30s
14
+ (postgres / supabase only)
15
+ β”” engine.execute(text(sql), params)
16
+ ↓
17
+ QueryResult (always returned β€” errors populate `.error`, never raised)
18
  """
19
 
20
+ from __future__ import annotations
21
+
22
+ import asyncio
23
+ import time
24
+ from typing import Any
25
+
26
+ import sqlglot
27
+ import sqlglot.expressions as exp
28
+ from sqlalchemy import text
29
+
30
+ from ...catalog.models import Catalog, Source
31
+ from ...database_client.database_client_service import database_client_service
32
+ from ...db.postgres.connection import AsyncSessionLocal
33
+ from ...middlewares.logging import get_logger
34
+ from ...pipeline.db_pipeline import db_pipeline_service
35
+ from ...utils.db_credential_encryption import decrypt_credentials_dict
36
+ from ..compiler.sql import CompiledSql, SqlCompiler
37
  from ..ir.models import QueryIR
38
  from .base import BaseExecutor, QueryResult
39
 
40
+ logger = get_logger("db_executor")
41
+
42
+ _QUERY_TIMEOUT_SECONDS = 30
43
+ _ROW_HARD_CAP = 10_000 # belt-and-suspenders cap regardless of LIMIT
44
+ _DBCLIENT_PREFIX = "dbclient://"
45
+ _POSTGRES_LIKE = frozenset({"postgres", "supabase"})
46
+
47
 
48
  class DbExecutor(BaseExecutor):
49
+ """Executes compiled SQL on the user's registered DB.
50
+
51
+ Constructed once per query with the user's catalog. The catalog is the
52
+ source of truth for identifiers; the executor never touches the user's
53
+ DB metadata at execution time.
54
+ """
55
+
56
  def __init__(self, catalog: Catalog) -> None:
57
  self._catalog = catalog
58
  self._compiler = SqlCompiler(catalog)
59
 
60
  async def run(self, ir: QueryIR) -> QueryResult:
61
+ started = time.perf_counter()
62
+ try:
63
+ source = self._find_source(ir.source_id)
64
+ if source.source_type != "schema":
65
+ raise ValueError(
66
+ f"DbExecutor cannot run on source_type={source.source_type!r}; "
67
+ "expected 'schema'"
68
+ )
69
+
70
+ compiled = self._compiler.compile(ir)
71
+ self._sqlglot_guard(compiled.sql)
72
+
73
+ client_id = self._parse_client_id(source.location_ref)
74
+ client = await self._fetch_client(client_id)
75
+ if client.user_id != self._catalog.user_id:
76
+ raise PermissionError(
77
+ f"DatabaseClient {client_id!r} owner mismatch "
78
+ f"(client.user_id != catalog.user_id)"
79
+ )
80
+ creds = decrypt_credentials_dict(client.credentials)
81
+
82
+ rows = await asyncio.wait_for(
83
+ asyncio.to_thread(self._run_sync, client.db_type, creds, compiled),
84
+ timeout=_QUERY_TIMEOUT_SECONDS,
85
+ )
86
+
87
+ truncated = len(rows) > _ROW_HARD_CAP
88
+ capped = rows[:_ROW_HARD_CAP]
89
+ elapsed_ms = int((time.perf_counter() - started) * 1000)
90
+ logger.info(
91
+ "db query complete",
92
+ source_id=ir.source_id,
93
+ rows=len(capped),
94
+ truncated=truncated,
95
+ elapsed_ms=elapsed_ms,
96
+ )
97
+ return QueryResult(
98
+ source_id=ir.source_id,
99
+ backend="sql",
100
+ rows=capped,
101
+ row_count=len(capped),
102
+ truncated=truncated,
103
+ elapsed_ms=elapsed_ms,
104
+ )
105
+
106
+ except Exception as e:
107
+ elapsed_ms = int((time.perf_counter() - started) * 1000)
108
+ logger.error(
109
+ "db executor failed",
110
+ source_id=ir.source_id,
111
+ error=str(e),
112
+ elapsed_ms=elapsed_ms,
113
+ )
114
+ return QueryResult(
115
+ source_id=ir.source_id,
116
+ backend="sql",
117
+ elapsed_ms=elapsed_ms,
118
+ error=str(e),
119
+ )
120
+
121
+ # ------------------------------------------------------------------
122
+ # Helpers
123
+ # ------------------------------------------------------------------
124
+
125
+ def _find_source(self, source_id: str) -> Source:
126
+ for s in self._catalog.sources:
127
+ if s.source_id == source_id:
128
+ return s
129
+ raise ValueError(f"source_id {source_id!r} not in catalog")
130
+
131
+ @staticmethod
132
+ def _parse_client_id(location_ref: str) -> str:
133
+ if not location_ref.startswith(_DBCLIENT_PREFIX):
134
+ raise ValueError(
135
+ f"DbExecutor expects 'dbclient://...' location_ref, got {location_ref!r}"
136
+ )
137
+ client_id = location_ref[len(_DBCLIENT_PREFIX):]
138
+ if not client_id:
139
+ raise ValueError("location_ref is missing client_id after 'dbclient://'")
140
+ return client_id
141
+
142
+ @staticmethod
143
+ async def _fetch_client(client_id: str) -> Any:
144
+ async with AsyncSessionLocal() as session:
145
+ client = await database_client_service.get(session, client_id)
146
+ if client is None:
147
+ raise ValueError(f"DatabaseClient {client_id!r} not found")
148
+ if client.status != "active":
149
+ raise ValueError(
150
+ f"DatabaseClient {client_id!r} is not active "
151
+ f"(status={client.status!r})"
152
+ )
153
+ return client
154
+
155
+ @staticmethod
156
+ def _sqlglot_guard(sql: str) -> None:
157
+ """Defense-in-depth: ensure the compiled SQL is a SELECT statement.
158
+
159
+ The compiler is already deterministic and only constructs SELECTs from
160
+ validated IR, but this guard catches any future bug that could leak
161
+ DML/DDL through.
162
+ """
163
+ try:
164
+ parsed = sqlglot.parse_one(sql, read="postgres")
165
+ except sqlglot.errors.ParseError as e:
166
+ raise ValueError(f"compiled SQL failed to parse: {e}") from e
167
+ if not isinstance(parsed, exp.Select):
168
+ raise ValueError(
169
+ f"compiled SQL is not a SELECT (got {type(parsed).__name__})"
170
+ )
171
+ forbidden = (exp.Insert, exp.Update, exp.Delete, exp.Drop, exp.AlterTable)
172
+ for node in parsed.find_all(forbidden):
173
+ raise ValueError(
174
+ f"compiled SQL contains forbidden DML/DDL: {type(node).__name__}"
175
+ )
176
+
177
+ @staticmethod
178
+ def _run_sync(db_type: str, creds: dict, compiled: CompiledSql) -> list[dict]:
179
+ with db_pipeline_service.engine_scope(db_type, creds) as engine:
180
+ with engine.connect() as conn:
181
+ if db_type in _POSTGRES_LIKE:
182
+ # session-level read-only + per-statement timeout (ms)
183
+ conn.execute(text("SET default_transaction_read_only = on"))
184
+ conn.execute(
185
+ text(f"SET statement_timeout = {_QUERY_TIMEOUT_SECONDS * 1000}")
186
+ )
187
+ result = conn.execute(text(compiled.sql), compiled.params)
188
+ return [dict(row) for row in result.mappings()]