sofhiaazzhr commited on
Commit
ca42520
·
1 Parent(s): faf8a4f

TabularIntrospector + on_tabular_uploaded trigger + 31 tests

Browse files
PROGRESS.md CHANGED
@@ -2,8 +2,8 @@
2
 
3
  Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "Team — division of work". Update as PRs land. Future Claude Code sessions read this to know what's already done.
4
 
5
- **Last updated**: 2026-05-07 (PR3-DBSQL compiler + DB executor shipped)
6
- **Current open PR**: PR3-DB (DB owner — SqlCompiler + DbExecutor + golden IR→SQL tests)
7
 
8
  ---
9
 
@@ -21,7 +21,7 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
21
  | PR | Status | Owner(s) | Scope |
22
  |---|---|---|---|
23
  | PR1 | `[x]` merged | DB | Contract locks + catalog plumbing + DB introspector + IR validator + tests |
24
- | PR1-tab | `[ ]` | TAB | Tabular introspector + golden IR examples for tabular |
25
  | PR2a | `[x]` merged | DB | CatalogEnricher + StructuredPipeline + on_db_registered trigger + FK extension on Table |
26
  | PR2b | `[ ]` | B | IntentRouter + planner prompt (pair) + planner LLM service |
27
  | PR3-DB | `[~]` open | DB | SqlCompiler (Postgres) + DbExecutor (sqlglot guard, RO + statement_timeout, asyncio.to_thread) + 36 golden IR→SQL tests |
@@ -53,7 +53,7 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
53
  | # | Item | Owner | Status | Notes |
54
  |---|---|---|---|---|
55
  | 5 | DB introspector (`catalog/introspect/database.py`) | DB | `[x]` | PR1 — reuses Phase 1 `database_client_service`, `db_credential_encryption`, `db_pipeline_service.engine_scope`, `extractor.get_schema/profile_column/get_row_count`. PR2a wired FK extraction (was discarded before). |
56
- | 6 | Tabular introspector (`catalog/introspect/tabular.py`) | TAB | `[ ]` | XLSX sheet one Table; `Source.source_id = document_id` |
57
  | 7 | `BaseIntrospector` ABC (`catalog/introspect/base.py`) | B | `[x]` | Pre-existing; signature locked |
58
 
59
  ### Ingestion — shared catalog plumbing
@@ -71,7 +71,7 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
71
  | # | Item | Owner | Status | Notes |
72
  |---|---|---|---|---|
73
  | 13 | Structured pipeline (`pipeline/structured_pipeline.py`) | B | `[x]` | PR2a (DB owner) — `introspect → enrich → merge with existing → validate → upsert`. Source-type-agnostic: caller supplies the introspector. `default_structured_pipeline()` factory wires production deps lazily so tests can inject mocks without `Settings()` construction. |
74
- | 14 | Triggers (`pipeline/triggers.py`) | B | `[~]` | PR2a — `on_db_registered` implemented (DB owner). `on_tabular_uploaded`, `on_document_uploaded`, `on_catalog_rebuild_requested` still stubs. |
75
  | 15 | Ingestion orchestrator (`pipeline/orchestrator.py`) | B | `[ ]` | Likely redundant — StructuredPipeline already takes the introspector at run() time. Revisit if a higher-level routing layer is needed. |
76
  | 16 | Document pipeline (`pipeline/document_pipeline.py`) | TAB | `[ ]` | Tabular-adjacent (file uploads). Phase 1 implementation exists at `pipeline/document_pipeline/document_pipeline.py` — reuse or rewrite TBD |
77
 
@@ -132,7 +132,7 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
132
  | — | Catalog validator tests (`tests/catalog/test_validator.py`) | B | `[x]` | PR1 — 5 tests |
133
  | — | Catalog store integration test (`tests/catalog/test_store.py`) | DB | `[x]` | PR1 — module-level skip without `RUN_INTEGRATION_TESTS=1` |
134
  | — | DB introspector test | DB | `[ ]` | Deferred to PR2 — needs Postgres testcontainer or fixture infra |
135
- | — | Tabular introspector test | TAB | `[ ]` | TAB to add when introspector lands |
136
  | 41 | Planner eval (`tests/query/planner/`) | B | `[ ]` | PR6 — golden question → IR examples; each side contributes |
137
  | 42 | E2E smoke tests (`tests/e2e/`) | B | `[ ]` | PR4 — pair |
138
  | — | Golden IR fixtures (`tests/fixtures/golden_irs.json`) | B | `[~]` | PR1 seeded with 5 DB-targeting examples; TAB extends in PR1-tab |
@@ -140,6 +140,19 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
140
 
141
  ---
142
 
 
 
 
 
 
 
 
 
 
 
 
 
 
143
  ## What just shipped (PR3-DB — DB owner)
144
 
145
  **Files implemented**:
 
2
 
3
  Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "Team — division of work". Update as PRs land. Future Claude Code sessions read this to know what's already done.
4
 
5
+ **Last updated**: 2026-05-07 (PR1-tabtabular introspector + on_tabular_uploaded trigger + 31 tests)
6
+ **Current open PRs**: PR3-DB (DB owner — SqlCompiler + DbExecutor + golden IR→SQL tests) · PR1-tab (TAB owner — tabular introspector)
7
 
8
  ---
9
 
 
21
  | PR | Status | Owner(s) | Scope |
22
  |---|---|---|---|
23
  | PR1 | `[x]` merged | DB | Contract locks + catalog plumbing + DB introspector + IR validator + tests |
24
+ | PR1-tab | `[~]` open | TAB | Tabular introspector + on_tabular_uploaded trigger + 31 unit tests |
25
  | PR2a | `[x]` merged | DB | CatalogEnricher + StructuredPipeline + on_db_registered trigger + FK extension on Table |
26
  | PR2b | `[ ]` | B | IntentRouter + planner prompt (pair) + planner LLM service |
27
  | PR3-DB | `[~]` open | DB | SqlCompiler (Postgres) + DbExecutor (sqlglot guard, RO + statement_timeout, asyncio.to_thread) + 36 golden IR→SQL tests |
 
53
  | # | Item | Owner | Status | Notes |
54
  |---|---|---|---|---|
55
  | 5 | DB introspector (`catalog/introspect/database.py`) | DB | `[x]` | PR1 — reuses Phase 1 `database_client_service`, `db_credential_encryption`, `db_pipeline_service.engine_scope`, `extractor.get_schema/profile_column/get_row_count`. PR2a wired FK extraction (was discarded before). |
56
+ | 6 | Tabular introspector (`catalog/introspect/tabular.py`) | TAB | `[~]` | PR1-tab — downloads original blob (CSV/XLSX/Parquet), one Table per sheet (XLSX) or one Table (CSV/Parquet). `source_id = document_id`. `fetch_doc`/`fetch_blob` injectable for unit tests (no Settings). |
57
  | 7 | `BaseIntrospector` ABC (`catalog/introspect/base.py`) | B | `[x]` | Pre-existing; signature locked |
58
 
59
  ### Ingestion — shared catalog plumbing
 
71
  | # | Item | Owner | Status | Notes |
72
  |---|---|---|---|---|
73
  | 13 | Structured pipeline (`pipeline/structured_pipeline.py`) | B | `[x]` | PR2a (DB owner) — `introspect → enrich → merge with existing → validate → upsert`. Source-type-agnostic: caller supplies the introspector. `default_structured_pipeline()` factory wires production deps lazily so tests can inject mocks without `Settings()` construction. |
74
+ | 14 | Triggers (`pipeline/triggers.py`) | B | `[~]` | PR2a — `on_db_registered` implemented (DB owner). PR1-tab — `on_tabular_uploaded` implemented (TAB owner). `on_document_uploaded`, `on_catalog_rebuild_requested` still stubs. |
75
  | 15 | Ingestion orchestrator (`pipeline/orchestrator.py`) | B | `[ ]` | Likely redundant — StructuredPipeline already takes the introspector at run() time. Revisit if a higher-level routing layer is needed. |
76
  | 16 | Document pipeline (`pipeline/document_pipeline.py`) | TAB | `[ ]` | Tabular-adjacent (file uploads). Phase 1 implementation exists at `pipeline/document_pipeline/document_pipeline.py` — reuse or rewrite TBD |
77
 
 
132
  | — | Catalog validator tests (`tests/catalog/test_validator.py`) | B | `[x]` | PR1 — 5 tests |
133
  | — | Catalog store integration test (`tests/catalog/test_store.py`) | DB | `[x]` | PR1 — module-level skip without `RUN_INTEGRATION_TESTS=1` |
134
  | — | DB introspector test | DB | `[ ]` | Deferred to PR2 — needs Postgres testcontainer or fixture infra |
135
+ | — | Tabular introspector test | TAB | `[~]` | PR1-tab 31 unit tests (CSV/XLSX/Parquet, stats, PII, error paths). No DB/blob I/O — mocks injected via constructor. |
136
  | 41 | Planner eval (`tests/query/planner/`) | B | `[ ]` | PR6 — golden question → IR examples; each side contributes |
137
  | 42 | E2E smoke tests (`tests/e2e/`) | B | `[ ]` | PR4 — pair |
138
  | — | Golden IR fixtures (`tests/fixtures/golden_irs.json`) | B | `[~]` | PR1 seeded with 5 DB-targeting examples; TAB extends in PR1-tab |
 
140
 
141
  ---
142
 
143
+ ## What just shipped (PR1-tab — TAB owner)
144
+
145
+ **Files implemented**:
146
+ - `src/catalog/introspect/tabular.py` — `TabularIntrospector` reads original blob (CSV/XLSX/Parquet), profiles each column (dtype, stats, sample values), runs PIIDetector. For XLSX: one `Table` per sheet (`Table.name = sheet_name`); for CSV/Parquet: one `Table` (`Table.name = filename stem`). `fetch_doc`/`fetch_blob` are constructor-injectable for unit tests — no `Settings` or DB required at import time.
147
+ - `src/pipeline/triggers.py` — `on_tabular_uploaded` wired (mirrors `on_db_registered` pattern).
148
+
149
+ **Tests added** (31 new):
150
+ - `tests/unit/catalog/test_introspect_tabular.py` — CSV / XLSX / Parquet shapes, per-column stats, nullable detection, PII name + value matching, sample capping, all error paths. Pure Python, no network I/O.
151
+
152
+ **Executor contract note**: introspector downloads the *original* blob for schema reading. The tabular executor (PR3-TAB) downloads *Parquet* blobs for query execution. For CSV/Parquet sources (single table), the executor must call `parquet_blob_name(uid, did, sheet_name=None)`; for XLSX (multi-table), `parquet_blob_name(uid, did, table.name)`.
153
+
154
+ ---
155
+
156
  ## What just shipped (PR3-DB — DB owner)
157
 
158
  **Files implemented**:
src/catalog/introspect/tabular.py CHANGED
@@ -1,15 +1,231 @@
1
  """Tabular file schema introspection (Parquet / CSV / XLSX).
2
 
3
  Reads file headers + samples ~100 rows. For XLSX, each sheet becomes a Table.
4
- Files are expected to live in Azure Blob (location_ref like az_blob://...).
 
 
 
 
 
 
 
 
5
  """
6
 
7
- from ..models import Source
 
 
 
 
 
 
 
 
 
 
 
 
 
8
  from .base import BaseIntrospector
9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
10
 
11
  class TabularIntrospector(BaseIntrospector):
12
- """Read column names, dtypes, and sample values from Parquet/CSV/XLSX."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
 
14
  async def introspect(self, location_ref: str) -> Source:
15
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """Tabular file schema introspection (Parquet / CSV / XLSX).
2
 
3
  Reads file headers + samples ~100 rows. For XLSX, each sheet becomes a Table.
4
+ Files are expected to live in Azure Blob (location_ref like az_blob://{user_id}/{document_id}).
5
+
6
+ Table.name convention (executor contract)
7
+ -----------------------------------------
8
+ CSV / Parquet → Table.name = filename stem (e.g. "sales_data").
9
+ Parquet blob was uploaded without a sheet suffix, so the
10
+ executor must call parquet_blob_name(uid, did, sheet_name=None).
11
+ XLSX → Table.name = sheet_name (e.g. "Sheet1").
12
+ Executor calls parquet_blob_name(uid, did, table.name).
13
  """
14
 
15
+ import asyncio
16
+ import hashlib
17
+ from collections.abc import Callable, Coroutine
18
+ from datetime import UTC, datetime
19
+ from io import BytesIO
20
+ from pathlib import Path
21
+ from typing import Any
22
+
23
+ import pandas as pd
24
+
25
+ from src.middlewares.logging import get_logger
26
+
27
+ from ..models import Column, ColumnStats, DataType, Source, Table
28
+ from ..pii_detector import PIIDetector
29
  from .base import BaseIntrospector
30
 
31
+ logger = get_logger("tabular_introspector")
32
+
33
+ _AZ_BLOB_PREFIX = "az_blob://"
34
+
35
+
36
+ def _stable_id(prefix: str, *parts: str) -> str:
37
+ h = hashlib.sha1(
38
+ "/".join(parts).encode("utf-8"), usedforsecurity=False
39
+ ).hexdigest()[:12]
40
+ return f"{prefix}{h}"
41
+
42
+
43
+ def _map_pandas_type(dtype: Any) -> DataType:
44
+ s = str(dtype).lower()
45
+ if "int" in s:
46
+ return "int"
47
+ if "float" in s or "decimal" in s:
48
+ return "decimal"
49
+ if "bool" in s:
50
+ return "bool"
51
+ if "datetime" in s:
52
+ return "datetime"
53
+ if "date" in s:
54
+ return "date"
55
+ return "string"
56
+
57
+
58
+ def _normalize(v: Any) -> Any:
59
+ """Coerce non-JSON-native scalars to types that survive the jsonb round-trip."""
60
+ if v is None:
61
+ return None
62
+ try:
63
+ import numpy as np
64
+
65
+ if isinstance(v, np.generic):
66
+ return v.item()
67
+ except ImportError:
68
+ pass
69
+ if isinstance(v, datetime):
70
+ return v.isoformat()
71
+ return v
72
+
73
 
74
  class TabularIntrospector(BaseIntrospector):
75
+ """Read column names, dtypes, and sample values from Parquet/CSV/XLSX.
76
+
77
+ Heavy I/O dependencies (`fetch_doc`, `fetch_blob`) are injectable so unit
78
+ tests can pass mocks without triggering Settings or DB construction — same
79
+ pattern as CatalogEnricher's `structured_chain` parameter.
80
+ """
81
+
82
+ def __init__(
83
+ self,
84
+ fetch_doc: Callable[[str], Coroutine[Any, Any, Any]] | None = None,
85
+ fetch_blob: Callable[[str], Coroutine[Any, Any, bytes]] | None = None,
86
+ ) -> None:
87
+ self._pii = PIIDetector()
88
+ self._fetch_doc = fetch_doc or self._default_fetch_doc
89
+ self._fetch_blob = fetch_blob or self._default_fetch_blob
90
+
91
+ @staticmethod
92
+ async def _default_fetch_doc(document_id: str) -> Any:
93
+ from sqlalchemy import select
94
+
95
+ from src.db.postgres.connection import AsyncSessionLocal
96
+ from src.db.postgres.models import Document as DBDocument
97
+
98
+ async with AsyncSessionLocal() as session:
99
+ result = await session.execute(
100
+ select(DBDocument).where(DBDocument.id == document_id)
101
+ )
102
+ return result.scalar_one_or_none()
103
+
104
+ @staticmethod
105
+ async def _default_fetch_blob(blob_name: str) -> bytes:
106
+ from src.storage.az_blob.az_blob import blob_storage
107
+
108
+ return await blob_storage.download_file(blob_name)
109
 
110
  async def introspect(self, location_ref: str) -> Source:
111
+ if not location_ref.startswith(_AZ_BLOB_PREFIX):
112
+ raise ValueError(
113
+ f"TabularIntrospector expects 'az_blob://...' location_ref, "
114
+ f"got {location_ref!r}"
115
+ )
116
+ rest = location_ref[len(_AZ_BLOB_PREFIX):]
117
+ user_id, _, document_id = rest.partition("/")
118
+ if not user_id or not document_id:
119
+ raise ValueError(
120
+ f"location_ref must be 'az_blob://{{user_id}}/{{document_id}}', "
121
+ f"got {location_ref!r}"
122
+ )
123
+
124
+ doc = await self._fetch_doc(document_id)
125
+ if doc is None:
126
+ raise ValueError(f"Document {document_id!r} not found")
127
+
128
+ logger.info(
129
+ "introspecting tabular source",
130
+ document_id=document_id,
131
+ file_type=doc.file_type,
132
+ filename=doc.filename,
133
+ )
134
+
135
+ content = await self._fetch_blob(doc.blob_name)
136
+
137
+ tables: list[Table] = await asyncio.to_thread(
138
+ self._introspect_sync, content, doc.file_type, doc.filename, document_id
139
+ )
140
+
141
+ return Source(
142
+ source_id=document_id,
143
+ source_type="tabular",
144
+ name=doc.filename,
145
+ description="",
146
+ location_ref=location_ref,
147
+ updated_at=datetime.now(UTC),
148
+ tables=tables,
149
+ )
150
+
151
+ def _introspect_sync(
152
+ self,
153
+ content: bytes,
154
+ file_type: str,
155
+ filename: str,
156
+ document_id: str,
157
+ ) -> list[Table]:
158
+ if file_type == "csv":
159
+ df = pd.read_csv(BytesIO(content))
160
+ return [self._build_table(df, document_id, Path(filename).stem, sheet_name=None)]
161
+ if file_type == "xlsx":
162
+ sheets: dict[str, pd.DataFrame] = pd.read_excel(BytesIO(content), sheet_name=None)
163
+ return [
164
+ self._build_table(df, document_id, sheet_name, sheet_name=sheet_name)
165
+ for sheet_name, df in sheets.items()
166
+ ]
167
+ if file_type == "parquet":
168
+ df = pd.read_parquet(BytesIO(content))
169
+ return [self._build_table(df, document_id, Path(filename).stem, sheet_name=None)]
170
+ raise ValueError(f"Unsupported file_type {file_type!r} for tabular introspection")
171
+
172
+ def _build_table(
173
+ self,
174
+ df: pd.DataFrame,
175
+ document_id: str,
176
+ table_name: str,
177
+ sheet_name: str | None,
178
+ ) -> Table:
179
+ id_parts = (document_id, sheet_name) if sheet_name else (document_id,)
180
+ columns = [
181
+ self._to_column(df[col], document_id, sheet_name, col)
182
+ for col in df.columns
183
+ ]
184
+ return Table(
185
+ table_id=_stable_id("t_", *id_parts),
186
+ name=table_name,
187
+ description="",
188
+ row_count=len(df),
189
+ columns=columns,
190
+ foreign_keys=[],
191
+ )
192
+
193
+ def _to_column(
194
+ self,
195
+ series: pd.Series,
196
+ document_id: str,
197
+ sheet_name: str | None,
198
+ col_name: str,
199
+ ) -> Column:
200
+ id_parts = (
201
+ (document_id, sheet_name, col_name) if sheet_name else (document_id, col_name)
202
+ )
203
+
204
+ sample_raw = series.dropna().head(5).tolist()
205
+ sample_values: list[Any] | None = [_normalize(v) for v in sample_raw] or None
206
+
207
+ is_numeric = pd.api.types.is_numeric_dtype(series)
208
+ is_dt = pd.api.types.is_datetime64_any_dtype(series)
209
+ non_null = series.dropna()
210
+ stats = ColumnStats(
211
+ min=_normalize(non_null.min()) if (is_numeric or is_dt) and len(non_null) > 0 else None,
212
+ max=_normalize(non_null.max()) if (is_numeric or is_dt) and len(non_null) > 0 else None,
213
+ distinct_count=int(series.nunique()),
214
+ )
215
+
216
+ column = Column(
217
+ column_id=_stable_id("c_", *id_parts),
218
+ name=col_name,
219
+ data_type=_map_pandas_type(series.dtype),
220
+ description="",
221
+ nullable=bool(series.isnull().any()),
222
+ pii_flag=False,
223
+ sample_values=sample_values,
224
+ stats=stats,
225
+ )
226
+ if self._pii.detect(column):
227
+ return column.model_copy(update={"pii_flag": True, "sample_values": None})
228
+ return column
229
+
230
+
231
+ tabular_introspector = TabularIntrospector()
src/pipeline/triggers.py CHANGED
@@ -36,17 +36,24 @@ async def on_db_registered(database_client_id: str, user_id: str) -> None:
36
 
37
 
38
  async def on_tabular_uploaded(document_id: str, user_id: str) -> None:
39
- """Stub implemented by TAB owner once `catalog/introspect/tabular.py` lands.
40
 
41
- Expected pattern (mirrors `on_db_registered`):
42
-
43
- from src.catalog.introspect.tabular import tabular_introspector
44
- from src.pipeline.structured_pipeline import default_structured_pipeline
45
- location_ref = f"az_blob://{user_id}/{document_id}"
46
- pipeline = default_structured_pipeline()
47
- await pipeline.run(tabular_introspector, location_ref, user_id)
48
  """
49
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
50
 
51
 
52
  async def on_document_uploaded(document_id: str, user_id: str) -> None:
 
36
 
37
 
38
  async def on_tabular_uploaded(document_id: str, user_id: str) -> None:
39
+ """Build an az_blob:// location_ref and run the structured pipeline.
40
 
41
+ Called after a CSV/XLSX/Parquet file has been processed and its Parquet
42
+ blob(s) uploaded. The TabularIntrospector downloads the original blob,
43
+ profiles each column, and produces a Source. The CatalogEnricher then fills
44
+ in descriptions, the catalog is validated and upserted.
 
 
 
45
  """
46
+ from src.catalog.introspect.tabular import tabular_introspector
47
+ from src.pipeline.structured_pipeline import default_structured_pipeline
48
+
49
+ location_ref = f"az_blob://{user_id}/{document_id}"
50
+ logger.info(
51
+ "on_tabular_uploaded triggered",
52
+ user_id=user_id,
53
+ document_id=document_id,
54
+ )
55
+ pipeline = default_structured_pipeline()
56
+ await pipeline.run(tabular_introspector, location_ref, user_id)
57
 
58
 
59
  async def on_document_uploaded(document_id: str, user_id: str) -> None: