sofhiaazzhr commited on
Commit
c749544
·
1 Parent(s): 04e5c48

eat(query): PandasCompiler + TabularExecutor (PR3-TAB)

Browse files
PROGRESS.md CHANGED
@@ -2,8 +2,8 @@
2
 
3
  Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "Team — division of work". Update as PRs land. Future Claude Code sessions read this to know what's already done.
4
 
5
- **Last updated**: 2026-05-07 (PR1-tabtabular introspector + on_tabular_uploaded trigger + 31 tests)
6
- **Current open PRs**: PR3-DB (DB owner — SqlCompiler + DbExecutor + golden IR→SQL tests) · PR1-tab (TAB owner — tabular introspector)
7
 
8
  ---
9
 
@@ -25,7 +25,7 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
25
  | PR2a | `[x]` merged | DB | CatalogEnricher + StructuredPipeline + on_db_registered trigger + FK extension on Table |
26
  | PR2b | `[ ]` | B | IntentRouter + planner prompt (pair) + planner LLM service |
27
  | PR3-DB | `[~]` open | DB | SqlCompiler (Postgres) + DbExecutor (sqlglot guard, RO + statement_timeout, asyncio.to_thread) + 36 golden IR→SQL tests |
28
- | PR3-TAB | `[ ]` | TAB | Pandas compiler + tabular executor + golden IR→DataFrame tests |
29
  | PR4 | `[ ]` | B (pair) | ExecutorDispatcher + QueryService + chat stream endpoint integration |
30
  | PR5 | `[ ]` | B | Retry/self-correction loop on execution failure |
31
  | PR6 | `[ ]` | B | Eval harness (golden question→IR→result examples) |
@@ -45,7 +45,7 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
45
  | 3 | IR operator whitelists (`query/ir/operators.py`) | `[x]` | PR1 filled `TYPE_COMPATIBILITY` matrix |
46
  | 4 | PII patterns / regex (`security/pii_patterns.py`) | `[x]` | Pre-existing |
47
  | — | `catalogs` Postgres jsonb table (`db/postgres/models.py`) | `[x]` | PR1 added `Catalog` SQLAlchemy class + `init_db.py` import |
48
- | — | `QueryResult` shape (`query/executor/base.py`) | `[x]` | Pre-existing scaffold; revisit in PR3 if `column_types` needed |
49
  | — | `Source.location_ref` URI scheme | `[x]` | PR1 documented in `catalog/models.py` docstring |
50
 
51
  ### Ingestion — introspection
@@ -101,8 +101,8 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
101
 
102
  | # | Item | Status | Notes |
103
  |---|---|---|---|
104
- | 29 | Pandas compiler (`query/compiler/pandas.py`) | `[ ]` | PR3 — IRcallable on DataFrame |
105
- | 30 | Tabular executor (`query/executor/tabular.py`) | `[ ]` | PR3 — eager pandas first; pyarrow/polars later if file size demands |
106
  | 31 | Parquet upload/download wrapper | `[ ]` | Phase 1 has `knowledge/parquet_service.py` — reuse or move to `storage/` in cleanup |
107
 
108
  ### Agents + chat
@@ -126,7 +126,7 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
126
  | # | Item | Owner | Status | Notes |
127
  |---|---|---|---|---|
128
  | 38 | DB compiler golden tests (`tests/query/compiler/test_sql.py`) | DB | `[x]` | PR3-DB — 36 tests across all whitelisted ops, identifier quoting, agg / count_distinct / count(*), order_by alias resolution, parameter sequencing, error paths. Pure-Python, no LLM, no DB. |
129
- | 39 | Pandas compiler golden tests (`tests/query/compiler/test_pandas.py`) | TAB | `[ ]` | PR3 — pure-Python, no LLM |
130
  | 40 | IR validator tests (`tests/query/ir/test_validator.py`) | B | `[x]` | PR1 — 19 tests, all rules covered |
131
  | — | PII detector tests (`tests/catalog/test_pii_detector.py`) | B | `[x]` | PR1 — 26 tests (parametrized) |
132
  | — | Catalog validator tests (`tests/catalog/test_validator.py`) | B | `[x]` | PR1 — 5 tests |
@@ -153,6 +153,20 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
153
 
154
  ---
155
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
156
  ## What just shipped (PR3-DB — DB owner)
157
 
158
  **Files implemented**:
 
2
 
3
  Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "Team — division of work". Update as PRs land. Future Claude Code sessions read this to know what's already done.
4
 
5
+ **Last updated**: 2026-05-07 (PR3-TABPandasCompiler + TabularExecutor + 43+12 tests)
6
+ **Current open PRs**: PR3-DB (DB owner — SqlCompiler + DbExecutor + golden IR→SQL tests) · PR1-tab (TAB owner — tabular introspector) · PR3-TAB (TAB owner — pandas compiler + tabular executor)
7
 
8
  ---
9
 
 
25
  | PR2a | `[x]` merged | DB | CatalogEnricher + StructuredPipeline + on_db_registered trigger + FK extension on Table |
26
  | PR2b | `[ ]` | B | IntentRouter + planner prompt (pair) + planner LLM service |
27
  | PR3-DB | `[~]` open | DB | SqlCompiler (Postgres) + DbExecutor (sqlglot guard, RO + statement_timeout, asyncio.to_thread) + 36 golden IR→SQL tests |
28
+ | PR3-TAB | `[~]` | TAB | Pandas compiler + tabular executor + golden IR→DataFrame tests |
29
  | PR4 | `[ ]` | B (pair) | ExecutorDispatcher + QueryService + chat stream endpoint integration |
30
  | PR5 | `[ ]` | B | Retry/self-correction loop on execution failure |
31
  | PR6 | `[ ]` | B | Eval harness (golden question→IR→result examples) |
 
45
  | 3 | IR operator whitelists (`query/ir/operators.py`) | `[x]` | PR1 filled `TYPE_COMPATIBILITY` matrix |
46
  | 4 | PII patterns / regex (`security/pii_patterns.py`) | `[x]` | Pre-existing |
47
  | — | `catalogs` Postgres jsonb table (`db/postgres/models.py`) | `[x]` | PR1 added `Catalog` SQLAlchemy class + `init_db.py` import |
48
+ | — | `QueryResult` shape (`query/executor/base.py`) | `[x]` | Pre-existing scaffold; `columns: list[str]` added (TAB owner, PR1-tab) — DbExecutor updated to populate it. |
49
  | — | `Source.location_ref` URI scheme | `[x]` | PR1 documented in `catalog/models.py` docstring |
50
 
51
  ### Ingestion — introspection
 
101
 
102
  | # | Item | Status | Notes |
103
  |---|---|---|---|
104
+ | 29 | Pandas compiler (`query/compiler/pandas.py`) | `[~]` | PR3-TAB`CompiledPandas` dataclass; all 12 filter ops; all 6 aggs; group_by via `pd.concat` of Series; alias-aware order_by; `_like_to_regex` (`%``.*`, `_`→`.`); pure module-level helpers |
105
+ | 30 | Tabular executor (`query/executor/tabular.py`) | `[~]` | PR3-TAB`fetch_blob` injectable for tests; blob path: single-table → `{uid}/{did}.parquet`, multi-table `{uid}/{did}__{table.name}.parquet`; `asyncio.to_thread`; 10k row hard cap; errors → `QueryResult.error` |
106
  | 31 | Parquet upload/download wrapper | `[ ]` | Phase 1 has `knowledge/parquet_service.py` — reuse or move to `storage/` in cleanup |
107
 
108
  ### Agents + chat
 
126
  | # | Item | Owner | Status | Notes |
127
  |---|---|---|---|---|
128
  | 38 | DB compiler golden tests (`tests/query/compiler/test_sql.py`) | DB | `[x]` | PR3-DB — 36 tests across all whitelisted ops, identifier quoting, agg / count_distinct / count(*), order_by alias resolution, parameter sequencing, error paths. Pure-Python, no LLM, no DB. |
129
+ | 39 | Pandas compiler golden tests (`tests/unit/query/compiler/test_pandas_compiler.py`) | TAB | `[~]` | PR3-TAB43 tests: all 12 filter ops, all 6 aggs, group_by, order_by, limit, aliases, empty DataFrame, error paths. `test_tabular_executor.py` adds 12 more (blob name resolution + happy path + error paths). |
130
  | 40 | IR validator tests (`tests/query/ir/test_validator.py`) | B | `[x]` | PR1 — 19 tests, all rules covered |
131
  | — | PII detector tests (`tests/catalog/test_pii_detector.py`) | B | `[x]` | PR1 — 26 tests (parametrized) |
132
  | — | Catalog validator tests (`tests/catalog/test_validator.py`) | B | `[x]` | PR1 — 5 tests |
 
153
 
154
  ---
155
 
156
+ ## What just shipped (PR3-TAB — TAB owner)
157
+
158
+ **Files implemented**:
159
+ - `src/query/compiler/pandas.py` — `PandasCompiler` + `CompiledPandas(apply, output_columns)` dataclass. Pure helper functions (easier to test in isolation): `_apply_filters` (all 12 ops, `_like_to_regex` for LIKE), `_apply_select` (column pick + rename), `_apply_agg` (scalar + group_by via `pd.concat` of Series → `reset_index`), `_apply_orderby` (alias-aware via `_resolve_order_col`). Closure captures all IR fields explicitly so `apply(df)` is self-contained.
160
+ - `src/query/executor/tabular.py` — `TabularExecutor` with injectable `fetch_blob` (same testability pattern as `TabularIntrospector`). Resolves Parquet blob path from `az_blob://{uid}/{did}` + table: single-table → `{uid}/{did}.parquet`, multi-table → `{uid}/{did}__{table.name}.parquet`. Runs compile → download → `asyncio.to_thread(_load_and_apply)` → 10k hard cap. Never raises; errors populate `QueryResult.error`. Uses `compiled.output_columns` for column labels (safe on empty DataFrame).
161
+
162
+ **Tests added** (55 new — total suite now 86 all passing):
163
+ - `tests/unit/query/compiler/test_pandas_compiler.py` — 43 tests across all 12 filter ops (including `is_null`, `not_in`, `like`, `between`), all 6 agg fns, group_by, order_by asc/desc, limit-after-order, alias round-trip, empty DataFrame, error paths.
164
+ - `tests/unit/query/executor/test_tabular_executor.py` — 12 tests: `_resolve_blob_name` (single/multi-table, bad prefix), happy-path `QueryResult` shape (columns, rows, backend, truncated, source_id), wrong source_type → error, blob fetch failure → error, unknown source → error.
165
+
166
+ **Lint**: `ruff check` clean on both files.
167
+
168
+ ---
169
+
170
  ## What just shipped (PR3-DB — DB owner)
171
 
172
  **Files implemented**:
src/query/compiler/pandas.py CHANGED
@@ -1,22 +1,296 @@
1
  """PandasCompiler — IR → callable that runs against a DataFrame.
2
 
3
  For tabular sources. The callable encapsulates the chain of operations
4
- (filter → groupby → agg → sort → limit) so the executor can apply them
5
- to a DataFrame loaded eagerly or via predicate pushdown / polars lazy scan.
 
 
 
6
  """
7
 
 
 
 
8
  from collections.abc import Callable
 
 
 
 
9
 
10
- from ...catalog.models import Catalog
11
- from ..ir.models import QueryIR
12
  from .base import BaseCompiler
13
 
14
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
  class PandasCompiler(BaseCompiler):
16
- """Deterministic IR → pandas/polars op chain. No LLM."""
17
 
18
  def __init__(self, catalog: Catalog) -> None:
19
  self._catalog = catalog
20
 
21
- def compile(self, ir: QueryIR) -> Callable:
22
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """PandasCompiler — IR → callable that runs against a DataFrame.
2
 
3
  For tabular sources. The callable encapsulates the chain of operations
4
+ (filter → select/agg → sort → limit) so the executor can apply them
5
+ to a DataFrame loaded from a Parquet blob.
6
+
7
+ Returns a `CompiledPandas` dataclass (mirrors `CompiledSql`) whose `.apply`
8
+ is a pure function `(pd.DataFrame) -> pd.DataFrame`. No LLM, no I/O.
9
  """
10
 
11
+ from __future__ import annotations
12
+
13
+ import re
14
  from collections.abc import Callable
15
+ from dataclasses import dataclass
16
+ from typing import Any
17
+
18
+ import pandas as pd
19
 
20
+ from ...catalog.models import Catalog, Column, Source, Table
21
+ from ..ir.models import AggSelect, ColumnSelect, FilterClause, OrderByClause, QueryIR, SelectItem
22
  from .base import BaseCompiler
23
 
24
 
25
+ @dataclass
26
+ class CompiledPandas:
27
+ """Compiled IR as a pandas operation chain.
28
+
29
+ `apply(df)` executes the full filter → select/agg → sort → limit
30
+ pipeline and returns the result as a new DataFrame.
31
+
32
+ `output_columns` lists the expected column names so callers can label
33
+ an empty result without inspecting rows.
34
+ """
35
+
36
+ apply: Callable[[pd.DataFrame], pd.DataFrame]
37
+ output_columns: list[str]
38
+
39
+
40
+ class PandasCompilerError(Exception):
41
+ pass
42
+
43
+
44
  class PandasCompiler(BaseCompiler):
45
+ """Deterministic IR → pandas op chain. No LLM."""
46
 
47
  def __init__(self, catalog: Catalog) -> None:
48
  self._catalog = catalog
49
 
50
+ def compile(self, ir: QueryIR) -> CompiledPandas:
51
+ _, table, cols_by_id = self._lookup(ir)
52
+ output_columns = _output_column_names(ir.select, cols_by_id)
53
+
54
+ # Capture IR fields explicitly so the closure is self-contained
55
+ _filters = ir.filters
56
+ _select = ir.select
57
+ _group_by = ir.group_by
58
+ _order_by = ir.order_by
59
+ _limit = ir.limit
60
+ _cols = cols_by_id
61
+
62
+ def apply(df: pd.DataFrame) -> pd.DataFrame:
63
+ df = _apply_filters(df, _filters, _cols)
64
+
65
+ has_agg = any(isinstance(s, AggSelect) for s in _select)
66
+ if has_agg:
67
+ df = _apply_agg(df, _select, _group_by, _cols)
68
+ else:
69
+ df = _apply_select(df, _select, _cols)
70
+
71
+ if _order_by:
72
+ df = _apply_orderby(df, _order_by, _select, _cols)
73
+
74
+ if _limit is not None:
75
+ df = df.head(_limit)
76
+
77
+ return df.reset_index(drop=True)
78
+
79
+ return CompiledPandas(apply=apply, output_columns=output_columns)
80
+
81
+ # ------------------------------------------------------------------
82
+ # Catalog lookup (mirrors SqlCompiler._lookup)
83
+ # ------------------------------------------------------------------
84
+
85
+ def _lookup(self, ir: QueryIR) -> tuple[Source, Table, dict[str, Column]]:
86
+ source = next((s for s in self._catalog.sources if s.source_id == ir.source_id), None)
87
+ if source is None:
88
+ raise PandasCompilerError(f"source_id {ir.source_id!r} not in catalog")
89
+ table = next((t for t in source.tables if t.table_id == ir.table_id), None)
90
+ if table is None:
91
+ raise PandasCompilerError(
92
+ f"table_id {ir.table_id!r} not in source {ir.source_id!r}"
93
+ )
94
+ return source, table, {c.column_id: c for c in table.columns}
95
+
96
+
97
+ # ---------------------------------------------------------------------------
98
+ # Module-level helpers (pure functions — easier to test in isolation)
99
+ # ---------------------------------------------------------------------------
100
+
101
+ def _output_column_names(select: list[SelectItem], cols_by_id: dict[str, Column]) -> list[str]:
102
+ names = []
103
+ for s in select:
104
+ if isinstance(s, ColumnSelect):
105
+ names.append(s.alias or cols_by_id[s.column_id].name)
106
+ else:
107
+ names.append(_agg_output_name(s, cols_by_id))
108
+ return names
109
+
110
+
111
+ def _agg_output_name(s: AggSelect, cols_by_id: dict[str, Column]) -> str:
112
+ if s.alias:
113
+ return s.alias
114
+ if s.fn == "count" and s.column_id is None:
115
+ return "count"
116
+ return f"{s.fn}_{cols_by_id[s.column_id].name}"
117
+
118
+
119
+ def _like_to_regex(pattern: str) -> str:
120
+ """Convert SQL LIKE pattern to Python regex string (no anchors — use fullmatch)."""
121
+ parts: list[str] = []
122
+ for ch in pattern:
123
+ if ch == "%":
124
+ parts.append(".*")
125
+ elif ch == "_":
126
+ parts.append(".")
127
+ else:
128
+ parts.append(re.escape(ch))
129
+ return "".join(parts)
130
+
131
+
132
+ def _apply_filters(
133
+ df: pd.DataFrame,
134
+ filters: list[FilterClause],
135
+ cols_by_id: dict[str, Column],
136
+ ) -> pd.DataFrame:
137
+ if not filters:
138
+ return df
139
+ mask = pd.Series(True, index=df.index)
140
+ for f in filters:
141
+ col_name = cols_by_id[f.column_id].name
142
+ series = df[col_name]
143
+ op, val = f.op, f.value
144
+ if op == "=":
145
+ mask &= series == val
146
+ elif op == "!=":
147
+ mask &= series != val
148
+ elif op == "<":
149
+ mask &= series < val
150
+ elif op == "<=":
151
+ mask &= series <= val
152
+ elif op == ">":
153
+ mask &= series > val
154
+ elif op == ">=":
155
+ mask &= series >= val
156
+ elif op == "in":
157
+ mask &= series.isin(val)
158
+ elif op == "not_in":
159
+ mask &= ~series.isin(val)
160
+ elif op == "is_null":
161
+ mask &= series.isna()
162
+ elif op == "is_not_null":
163
+ mask &= series.notna()
164
+ elif op == "like":
165
+ mask &= series.astype(str).str.fullmatch(_like_to_regex(val), case=True, na=False)
166
+ elif op == "between":
167
+ mask &= (series >= val[0]) & (series <= val[1])
168
+ return df[mask].copy()
169
+
170
+
171
+ def _apply_select(
172
+ df: pd.DataFrame,
173
+ select: list[SelectItem],
174
+ cols_by_id: dict[str, Column],
175
+ ) -> pd.DataFrame:
176
+ col_names = [cols_by_id[s.column_id].name for s in select if isinstance(s, ColumnSelect)]
177
+ result = df[col_names].copy()
178
+ rename_map = {
179
+ cols_by_id[s.column_id].name: s.alias
180
+ for s in select
181
+ if isinstance(s, ColumnSelect) and s.alias
182
+ }
183
+ if rename_map:
184
+ result = result.rename(columns=rename_map)
185
+ return result
186
+
187
+
188
+ def _scalar_agg(df: pd.DataFrame, s: AggSelect, cols_by_id: dict[str, Column]) -> Any:
189
+ if s.fn == "count" and s.column_id is None:
190
+ return int(len(df))
191
+ col_name = cols_by_id[s.column_id].name
192
+ series = df[col_name]
193
+ match s.fn:
194
+ case "count":
195
+ return int(series.count())
196
+ case "count_distinct":
197
+ return int(series.nunique())
198
+ case "sum":
199
+ return series.sum()
200
+ case "avg":
201
+ return series.mean()
202
+ case "min":
203
+ return series.min()
204
+ case "max":
205
+ return series.max()
206
+ raise PandasCompilerError(f"unhandled agg fn {s.fn!r}")
207
+
208
+
209
+ def _group_agg_series(
210
+ grouped: Any,
211
+ s: AggSelect,
212
+ cols_by_id: dict[str, Column],
213
+ ) -> pd.Series:
214
+ if s.fn == "count" and s.column_id is None:
215
+ return grouped.size()
216
+ col_name = cols_by_id[s.column_id].name
217
+ match s.fn:
218
+ case "count":
219
+ return grouped[col_name].count()
220
+ case "count_distinct":
221
+ return grouped[col_name].nunique()
222
+ case "sum":
223
+ return grouped[col_name].sum()
224
+ case "avg":
225
+ return grouped[col_name].mean()
226
+ case "min":
227
+ return grouped[col_name].min()
228
+ case "max":
229
+ return grouped[col_name].max()
230
+ raise PandasCompilerError(f"unhandled agg fn {s.fn!r}")
231
+
232
+
233
+ def _apply_agg(
234
+ df: pd.DataFrame,
235
+ select: list[SelectItem],
236
+ group_by: list[str],
237
+ cols_by_id: dict[str, Column],
238
+ ) -> pd.DataFrame:
239
+ agg_items = [s for s in select if isinstance(s, AggSelect)]
240
+ col_items = [s for s in select if isinstance(s, ColumnSelect)]
241
+ group_col_names = [cols_by_id[col_id].name for col_id in group_by]
242
+
243
+ if group_col_names:
244
+ grouped = df.groupby(group_col_names, sort=False)
245
+ series_list = [
246
+ _group_agg_series(grouped, s, cols_by_id).rename(_agg_output_name(s, cols_by_id))
247
+ for s in agg_items
248
+ ]
249
+ result = pd.concat(series_list, axis=1).reset_index()
250
+ rename_map = {
251
+ cols_by_id[s.column_id].name: s.alias
252
+ for s in col_items
253
+ if s.alias
254
+ }
255
+ if rename_map:
256
+ result = result.rename(columns=rename_map)
257
+ else:
258
+ row = {
259
+ _agg_output_name(s, cols_by_id): _scalar_agg(df, s, cols_by_id)
260
+ for s in agg_items
261
+ }
262
+ result = pd.DataFrame([row])
263
+
264
+ return result
265
+
266
+
267
+ def _resolve_order_col(
268
+ col_id_or_alias: str,
269
+ select: list[SelectItem],
270
+ cols_by_id: dict[str, Column],
271
+ ) -> str:
272
+ """Map an order_by column_id (or alias) to the actual output column name."""
273
+ for s in select:
274
+ if isinstance(s, ColumnSelect) and s.column_id == col_id_or_alias:
275
+ return s.alias or cols_by_id[s.column_id].name
276
+ if isinstance(s, AggSelect) and s.column_id == col_id_or_alias:
277
+ return _agg_output_name(s, cols_by_id)
278
+ return col_id_or_alias # treat as alias / output name directly
279
+
280
+
281
+ def _apply_orderby(
282
+ df: pd.DataFrame,
283
+ order_by: list[OrderByClause],
284
+ select: list[SelectItem],
285
+ cols_by_id: dict[str, Column],
286
+ ) -> pd.DataFrame:
287
+ sort_cols: list[str] = []
288
+ ascending: list[bool] = []
289
+ for ob in order_by:
290
+ out_name = _resolve_order_col(ob.column_id, select, cols_by_id)
291
+ if out_name in df.columns:
292
+ sort_cols.append(out_name)
293
+ ascending.append(ob.dir == "asc")
294
+ if sort_cols:
295
+ return df.sort_values(by=sort_cols, ascending=ascending)
296
+ return df
src/query/executor/tabular.py CHANGED
@@ -9,16 +9,148 @@ Initial scope ships eager pandas only; the others are added when a real
9
  file is too big.
10
  """
11
 
12
- from ...catalog.models import Catalog
13
- from ..compiler.pandas import PandasCompiler
 
 
 
 
 
 
 
 
 
 
 
14
  from ..ir.models import QueryIR
15
  from .base import BaseExecutor, QueryResult
16
 
 
 
 
 
 
17
 
18
  class TabularExecutor(BaseExecutor):
19
- def __init__(self, catalog: Catalog) -> None:
 
 
 
 
 
 
 
 
 
20
  self._catalog = catalog
21
  self._compiler = PandasCompiler(catalog)
 
 
 
 
 
 
 
22
 
23
  async def run(self, ir: QueryIR) -> QueryResult:
24
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
  file is too big.
10
  """
11
 
12
+ from __future__ import annotations
13
+
14
+ import asyncio
15
+ import io
16
+ import time
17
+ from collections.abc import Callable, Coroutine
18
+ from typing import Any
19
+
20
+ import pandas as pd
21
+
22
+ from ...catalog.models import Catalog, Source, Table
23
+ from ...middlewares.logging import get_logger
24
+ from ..compiler.pandas import CompiledPandas, PandasCompiler
25
  from ..ir.models import QueryIR
26
  from .base import BaseExecutor, QueryResult
27
 
28
+ logger = get_logger("tabular_executor")
29
+
30
+ _AZ_BLOB_PREFIX = "az_blob://"
31
+ _ROW_HARD_CAP = 10_000
32
+
33
 
34
  class TabularExecutor(BaseExecutor):
35
+ """Executes compiled pandas chain on a Parquet blob.
36
+
37
+ `fetch_blob` is injectable for tests — defaults to AzureBlobStorage.
38
+ """
39
+
40
+ def __init__(
41
+ self,
42
+ catalog: Catalog,
43
+ fetch_blob: Callable[[str], Coroutine[Any, Any, bytes]] | None = None,
44
+ ) -> None:
45
  self._catalog = catalog
46
  self._compiler = PandasCompiler(catalog)
47
+ self._fetch_blob = fetch_blob or self._default_fetch_blob
48
+
49
+ @staticmethod
50
+ async def _default_fetch_blob(blob_name: str) -> bytes:
51
+ from ...storage.az_blob.az_blob import blob_storage
52
+
53
+ return await blob_storage.download_file(blob_name)
54
 
55
  async def run(self, ir: QueryIR) -> QueryResult:
56
+ started = time.perf_counter()
57
+ try:
58
+ source, table = self._lookup(ir)
59
+ if source.source_type != "tabular":
60
+ raise ValueError(
61
+ f"TabularExecutor cannot run on source_type={source.source_type!r}; "
62
+ "expected 'tabular'"
63
+ )
64
+
65
+ compiled = self._compiler.compile(ir)
66
+ blob_name = _resolve_blob_name(source, table)
67
+ blob_bytes = await self._fetch_blob(blob_name)
68
+
69
+ result_df = await asyncio.to_thread(_load_and_apply, blob_bytes, compiled)
70
+
71
+ truncated = len(result_df) > _ROW_HARD_CAP
72
+ capped = result_df.head(_ROW_HARD_CAP)
73
+
74
+ columns = compiled.output_columns
75
+ rows = capped.to_dict(orient="records")
76
+ elapsed_ms = int((time.perf_counter() - started) * 1000)
77
+ logger.info(
78
+ "tabular query complete",
79
+ source_id=ir.source_id,
80
+ rows=len(rows),
81
+ truncated=truncated,
82
+ elapsed_ms=elapsed_ms,
83
+ )
84
+ return QueryResult(
85
+ source_id=ir.source_id,
86
+ backend="tabular",
87
+ columns=columns,
88
+ rows=rows,
89
+ row_count=len(rows),
90
+ truncated=truncated,
91
+ elapsed_ms=elapsed_ms,
92
+ )
93
+
94
+ except Exception as e:
95
+ elapsed_ms = int((time.perf_counter() - started) * 1000)
96
+ logger.error(
97
+ "tabular executor failed",
98
+ source_id=ir.source_id,
99
+ error=str(e),
100
+ elapsed_ms=elapsed_ms,
101
+ )
102
+ return QueryResult(
103
+ source_id=ir.source_id,
104
+ backend="tabular",
105
+ elapsed_ms=elapsed_ms,
106
+ error=str(e),
107
+ )
108
+
109
+ # ------------------------------------------------------------------
110
+ # Helpers
111
+ # ------------------------------------------------------------------
112
+
113
+ def _lookup(self, ir: QueryIR) -> tuple[Source, Table]:
114
+ source = next(
115
+ (s for s in self._catalog.sources if s.source_id == ir.source_id), None
116
+ )
117
+ if source is None:
118
+ raise ValueError(f"source_id {ir.source_id!r} not in catalog")
119
+ table = next(
120
+ (t for t in source.tables if t.table_id == ir.table_id), None
121
+ )
122
+ if table is None:
123
+ raise ValueError(f"table_id {ir.table_id!r} not in source {ir.source_id!r}")
124
+ return source, table
125
+
126
+
127
+ # ---------------------------------------------------------------------------
128
+ # Module-level helpers (pure functions — easier to test in isolation)
129
+ # ---------------------------------------------------------------------------
130
+
131
+ def _resolve_blob_name(source: Source, table: Table) -> str:
132
+ """Map source.location_ref + table → the Parquet blob name to download.
133
+
134
+ Convention:
135
+ CSV / single-sheet Parquet → ``{user_id}/{document_id}.parquet``
136
+ XLSX (multi-sheet) → ``{user_id}/{document_id}__{table.name}.parquet``
137
+ """
138
+ if not source.location_ref.startswith(_AZ_BLOB_PREFIX):
139
+ raise ValueError(
140
+ f"TabularExecutor expects 'az_blob://...' location_ref, "
141
+ f"got {source.location_ref!r}"
142
+ )
143
+ path = source.location_ref[len(_AZ_BLOB_PREFIX):]
144
+ parts = path.split("/", 1)
145
+ if len(parts) != 2 or not parts[0] or not parts[1]:
146
+ raise ValueError(f"Malformed az_blob location_ref: {source.location_ref!r}")
147
+ user_id, document_id = parts
148
+ if len(source.tables) == 1:
149
+ return f"{user_id}/{document_id}.parquet"
150
+ return f"{user_id}/{document_id}__{table.name}.parquet"
151
+
152
+
153
+ def _load_and_apply(blob_bytes: bytes, compiled: CompiledPandas) -> pd.DataFrame:
154
+ """Load Parquet bytes into a DataFrame and apply the compiled op chain."""
155
+ df = pd.read_parquet(io.BytesIO(blob_bytes))
156
+ return compiled.apply(df)