Rifqi Hafizuddin
[KM-564] fix source, now shows name instead of id. added diff retrieval vs catalog
96598f8 | """TabularExecutor — runs compiled pandas/polars chain on a Parquet file. | |
| Picks engine by file size: | |
| ≤ 100 MB → eager pandas | |
| 100 MB-1 GB → pyarrow with predicate pushdown | |
| > 1 GB → polars lazy scan | |
| Initial scope ships eager pandas only; the others are added when a real | |
| file is too big. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import io | |
| import time | |
| from collections.abc import Callable, Coroutine | |
| from typing import Any | |
| import pandas as pd | |
| from ...catalog.models import Catalog, Source, Table | |
| from ...storage.parquet import parquet_blob_name | |
| from ...middlewares.logging import get_logger | |
| from ..compiler.pandas import CompiledPandas, PandasCompiler | |
| from ..ir.models import QueryIR | |
| from .base import BaseExecutor, QueryResult | |
# Module-wide structured logger; call sites pass structured keyword fields
# (query=..., source_id=..., elapsed_ms=...).
logger = get_logger("tabular_executor")

# Scheme prefix that every tabular Source.location_ref must start with.
_AZ_BLOB_PREFIX = "az_blob://"

# Maximum number of rows handed back to the caller; a larger result is cut
# down to this many rows and reported as truncated.
_ROW_HARD_CAP = 10000
class TabularExecutor(BaseExecutor):
    """Executes a compiled pandas chain on a Parquet blob.

    `fetch_blob` is injectable for tests — when omitted, blobs are downloaded
    through ``storage.az_blob.az_blob.blob_storage``.
    """

    def __init__(
        self,
        catalog: Catalog,
        fetch_blob: Callable[[str], Coroutine[Any, Any, bytes]] | None = None,
    ) -> None:
        """Bind the executor to ``catalog``.

        Args:
            catalog: Catalog used to look up the IR's source/table and to
                build the ``PandasCompiler``.
            fetch_blob: Async callable returning the raw bytes for a blob
                name. Defaults to ``_default_fetch_blob`` (Azure Blob Storage).
        """
        self._catalog = catalog
        self._compiler = PandasCompiler(catalog)
        self._fetch_blob = fetch_blob or self._default_fetch_blob

    @staticmethod
    async def _default_fetch_blob(blob_name: str) -> bytes:
        """Download ``blob_name`` through the shared Azure blob storage client.

        Must be a ``staticmethod``. ``__init__`` stores it as
        ``self._fetch_blob`` and ``run`` calls it with a single argument.
        As a plain function attribute it would be bound to the instance and
        receive ``(self, blob_name)``, so every non-injected fetch would
        raise ``TypeError``.
        """
        # Imported lazily — presumably so executors built with an injected
        # fetch_blob never need the Azure client to be importable.
        from ...storage.az_blob.az_blob import blob_storage

        return await blob_storage.download_file(blob_name)

    async def run(self, ir: QueryIR) -> QueryResult:
        """Execute ``ir`` against its tabular source and return the rows.

        Any ``Exception`` is logged and returned as a ``QueryResult`` with
        ``error`` set instead of being propagated. Examples include an
        unknown source/table, a non-tabular source, or a compile, download
        or pandas error.

        At most ``_ROW_HARD_CAP`` rows are returned. ``truncated`` is True
        when the full result had more rows than that.
        """
        started = time.perf_counter()
        # Filled in as soon as the lookup succeeds, so the error result can
        # still report which table/source was involved.
        table_name = ""
        source_name = ""
        try:
            source, table = self._lookup(ir)
            table_name = table.name
            source_name = source.name
            if source.source_type != "tabular":
                raise ValueError(
                    f"TabularExecutor cannot run on source_type={source.source_type!r}; "
                    "expected 'tabular'"
                )

            compiled = self._compiler.compile(ir)
            self._log_rendered_query(ir, table)

            blob_name = _resolve_blob_name(source, table)
            blob_bytes = await self._fetch_blob(blob_name)
            # Parquet decoding and the pandas chain are blocking work; run
            # them in a worker thread so the event loop is not stalled.
            result_df = await asyncio.to_thread(_load_and_apply, blob_bytes, compiled)

            truncated = len(result_df) > _ROW_HARD_CAP
            rows = result_df.head(_ROW_HARD_CAP).to_dict(orient="records")

            elapsed_ms = int((time.perf_counter() - started) * 1000)
            logger.info(
                "tabular query complete",
                source_id=ir.source_id,
                rows=len(rows),
                truncated=truncated,
                elapsed_ms=elapsed_ms,
            )
            return QueryResult(
                source_id=ir.source_id,
                backend="tabular",
                columns=compiled.output_columns,
                rows=rows,
                row_count=len(rows),
                truncated=truncated,
                elapsed_ms=elapsed_ms,
                table_id=ir.table_id,
                table_name=table_name,
                source_name=source_name,
            )
        except Exception as e:
            elapsed_ms = int((time.perf_counter() - started) * 1000)
            logger.error(
                "tabular executor failed",
                source_id=ir.source_id,
                error=str(e),
                elapsed_ms=elapsed_ms,
            )
            return QueryResult(
                source_id=ir.source_id,
                backend="tabular",
                elapsed_ms=elapsed_ms,
                error=str(e),
                table_id=ir.table_id,
                table_name=table_name,
                source_name=source_name,
            )

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _log_rendered_query(ir: QueryIR, table: Table) -> None:
        """Log a human-readable pandas rendering of ``ir`` (best effort).

        The rendering is purely diagnostic. A failure here (e.g. a column id
        missing from ``table.columns``) is logged as a warning, so it cannot
        fail an otherwise valid query.
        """
        try:
            rendered = _render_query(ir, {c.column_id: c for c in table.columns})
        except Exception as e:  # diagnostics must never break execution
            logger.warning("could not render pandas query", error=str(e))
            return
        logger.info("pandas query", query=rendered)

    def _lookup(self, ir: QueryIR) -> tuple[Source, Table]:
        """Return the catalog ``(Source, Table)`` pair addressed by ``ir``.

        Raises:
            ValueError: if ``ir.source_id`` is not in the catalog, or
                ``ir.table_id`` is not a table of that source.
        """
        source = next(
            (s for s in self._catalog.sources if s.source_id == ir.source_id), None
        )
        if source is None:
            raise ValueError(f"source_id {ir.source_id!r} not in catalog")
        table = next(
            (t for t in source.tables if t.table_id == ir.table_id), None
        )
        if table is None:
            raise ValueError(f"table_id {ir.table_id!r} not in source {ir.source_id!r}")
        return source, table
| # --------------------------------------------------------------------------- | |
| # Module-level helpers (pure functions — easier to test in isolation) | |
| # --------------------------------------------------------------------------- | |
def _resolve_blob_name(source: Source, table: Table) -> str:
    """Map ``source.location_ref`` + ``table`` to the Parquet blob name to download.

    Delegates to ``storage.parquet.parquet_blob_name``, so the same naming
    convention (and ``_safe_sheet_name`` sanitization) is used on both the
    write side (ingestion) and the read side (query execution).

      CSV / Parquet → ``{user_id}/{document_id}.parquet``
      XLSX          → ``{user_id}/{document_id}__{safe_sheet}.parquet``
                      (the writer always uploads XLSX with a sheet suffix,
                      regardless of sheet count — see processing_service
                      ``_build_excel_documents``)

    XLSX is detected via ``Source.name`` (the original filename). This relies
    on the upload pipeline preserving the file extension, which it does today
    because ``Document.filename`` is set once at upload and never renamed.

    Raises:
        ValueError: if ``location_ref`` does not start with ``az_blob://``,
            or is not of the form ``az_blob://{user_id}/{document_id}`` with
            both parts non-empty.
    """
    location_ref = source.location_ref
    if not location_ref.startswith(_AZ_BLOB_PREFIX):
        raise ValueError(
            f"TabularExecutor expects 'az_blob://...' location_ref, "
            f"got {location_ref!r}"
        )

    # Split on the first "/" only; everything after it is the document id.
    user_id, _, document_id = location_ref[len(_AZ_BLOB_PREFIX):].partition("/")
    if not user_id or not document_id:
        raise ValueError(f"Malformed az_blob location_ref: {location_ref!r}")

    # Only the ".xlsx" suffix selects the per-sheet blob name.
    # NOTE(review): confirm whether other Excel formats (.xls/.xlsm) are
    # ingested with sheet suffixes too.
    is_xlsx = source.name.lower().endswith(".xlsx")
    sheet_name = table.name if is_xlsx else None
    return parquet_blob_name(user_id, document_id, sheet_name)
def _render_query(ir: QueryIR, cols_by_id: dict) -> str:
    """Render ``ir`` as an approximate pandas expression for log output.

    The string is diagnostic only — it is never executed — and mirrors the IR
    rather than the exact op chain built by ``PandasCompiler``.

    Args:
        ir: Query to render.
        cols_by_id: Mapping of column_id → column object exposing ``.name``.
            Column ids missing from the mapping are rendered verbatim
            instead of raising ``KeyError``, so a logging helper cannot abort
            a query.

    Returns:
        A one-line string such as ``df[(df["x"] > 1)].groupby(['g'])["y"].sum()``.
    """
    from ..ir.models import AggSelect, ColumnSelect

    def name_of(column_id):
        # Fall back to the raw id when the column is unknown.
        column = cols_by_id.get(column_id)
        return column.name if column is not None else column_id

    # Built once per call instead of once per aggregate.
    fn_map = {
        "count": "count()",
        "count_distinct": "nunique()",
        "sum": "sum()",
        "avg": "mean()",
        "min": "min()",
        "max": "max()",
    }

    parts = ["df"]
    if ir.filters:
        conds = " & ".join(
            f'(df["{name_of(f.column_id)}"] {f.op} {f.value!r})'
            for f in ir.filters
        )
        parts.append(f"[{conds}]")

    aggs = [s for s in ir.select if isinstance(s, AggSelect)]
    cols = [s for s in ir.select if isinstance(s, ColumnSelect)]
    if aggs:
        if ir.group_by:
            group_names = [name_of(g) for g in ir.group_by]
            parts.append(f'.groupby({group_names})')
        for agg in aggs:
            col = f'["{name_of(agg.column_id)}"]' if agg.column_id else ""
            parts.append(f'{col}.{fn_map.get(agg.fn, agg.fn + "()")}')
    elif cols:
        col_names = [name_of(s.column_id) for s in cols]
        parts.append(f'[{col_names}]')

    if ir.limit:
        parts.append(f'.head({ir.limit})')
    return "".join(parts)
def _load_and_apply(blob_bytes: bytes, compiled: CompiledPandas) -> pd.DataFrame:
    """Decode a Parquet payload and run the compiled pandas chain over it.

    This is blocking work; ``TabularExecutor.run`` calls it via
    ``asyncio.to_thread``.

    Args:
        blob_bytes: Raw Parquet file contents as downloaded from storage.
        compiled: Compiled op chain; its ``apply`` receives the full frame.

    Returns:
        The DataFrame produced by ``compiled.apply``.
    """
    buffer = io.BytesIO(blob_bytes)
    frame = pd.read_parquet(buffer)
    return compiled.apply(frame)