sofhiaazzhr commited on
Commit
0ae8717
·
2 Parent(s): df220ea77ea82d

Merge commit 'refs/pr/1' of https://huggingface.co/spaces/DataEyond/Agentic-Service-Data-Eyond-Catalog into pr/1

Browse files
PROGRESS.md ADDED
@@ -0,0 +1,205 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Progress — Phase 2 catalog-driven build
2
+
3
+ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "Team — division of work". Update as PRs land. Future Claude Code sessions read this to know what's already done.
4
+
5
+ **Last updated**: 2026-05-07 (PR2a — enricher + structured pipeline shipped)
6
+ **Current open PR**: PR2a (DB owner — CatalogEnricher + StructuredPipeline + on_db_registered trigger + FK extension)
7
+
8
+ ---
9
+
10
+ ## Legend
11
+
12
+ - `[x]` done and merged
13
+ - `[~]` in progress (open PR or active branch)
14
+ - `[ ]` not started
15
+ - **DB** / **TAB** / **B** — ownership (from REPO_CONTEXT.md)
16
+
17
+ ---
18
+
19
+ ## PR sequence
20
+
21
+ | PR | Status | Owner(s) | Scope |
22
+ |---|---|---|---|
23
+ | PR1 | `[x]` merged | DB | Contract locks + catalog plumbing + DB introspector + IR validator + tests |
24
+ | PR1-tab | `[ ]` | TAB | Tabular introspector + golden IR examples for tabular |
25
+ | PR2a | `[~]` open | DB | CatalogEnricher + StructuredPipeline + on_db_registered trigger + FK extension on Table |
26
+ | PR2b | `[ ]` | B | IntentRouter + planner prompt (pair) + planner LLM service |
27
+ | PR3 | `[ ]` | B (split) | SQL compiler + DB executor (DB), pandas compiler + tabular executor (TAB) |
28
+ | PR4 | `[ ]` | B (pair) | ExecutorDispatcher + QueryService + chat stream endpoint integration |
29
+ | PR5 | `[ ]` | B | Retry/self-correction loop on execution failure |
30
+ | PR6 | `[ ]` | B | Eval harness (golden question→IR→result examples) |
31
+ | PR7 | `[ ]` | B | Auto PII tagging review + ChatbotAgent rewrite + API rewiring |
32
+ | Cleanup | `[ ]` | B | Remove Phase 1 (rag/, query/executors/, database_client/, …) once Phase 2 has feature parity |
33
+
34
+ ---
35
+
36
+ ## All items
37
+
38
+ ### Contracts (B — shared)
39
+
40
+ | # | Item | Status | Notes |
41
+ |---|---|---|---|
42
+ | 1 | Catalog Pydantic models (`catalog/models.py`) | `[x]` | PR1 added `location_ref` URI-scheme docstring; PR2a added `ForeignKey` model + `Table.foreign_keys` field |
43
+ | 2 | IR Pydantic models (`query/ir/models.py`) | `[x]` | Pre-existing scaffold |
44
+ | 3 | IR operator whitelists (`query/ir/operators.py`) | `[x]` | PR1 filled `TYPE_COMPATIBILITY` matrix |
45
+ | 4 | PII patterns / regex (`security/pii_patterns.py`) | `[x]` | Pre-existing |
46
+ | — | `catalogs` Postgres jsonb table (`db/postgres/models.py`) | `[x]` | PR1 added `Catalog` SQLAlchemy class + `init_db.py` import |
47
+ | — | `QueryResult` shape (`query/executor/base.py`) | `[x]` | Pre-existing scaffold; revisit in PR3 if `column_types` needed |
48
+ | — | `Source.location_ref` URI scheme | `[x]` | PR1 documented in `catalog/models.py` docstring |
49
+
50
+ ### Ingestion — introspection
51
+
52
+ | # | Item | Owner | Status | Notes |
53
+ |---|---|---|---|---|
54
+ | 5 | DB introspector (`catalog/introspect/database.py`) | DB | `[x]` | PR1 — reuses Phase 1 `database_client_service`, `db_credential_encryption`, `db_pipeline_service.engine_scope`, `extractor.get_schema/profile_column/get_row_count`. PR2a wired FK extraction (was discarded before). |
55
+ | 6 | Tabular introspector (`catalog/introspect/tabular.py`) | TAB | `[ ]` | XLSX sheet → one Table; `Source.source_id = document_id` |
56
+ | 7 | `BaseIntrospector` ABC (`catalog/introspect/base.py`) | B | `[x]` | Pre-existing; signature locked |
57
+
58
+ ### Ingestion — shared catalog plumbing
59
+
60
+ | # | Item | Owner | Status | Notes |
61
+ |---|---|---|---|---|
62
+ | 8 | Catalog enricher + prompt (`catalog/enricher.py`, `config/prompts/catalog_enricher.md`) | B | `[x]` | PR2a (DB owner picked up) — Azure OpenAI GPT-4o, structured output (flat `EnrichmentResponse` keyed by stable IDs), source-type-agnostic prompt with PII suppression and FK rendering. LLM is constructor-injectable for tests. |
63
+ | 9 | Catalog validator (`catalog/validator.py`) | B | `[x]` | PR1 (DB owner picked up) — uniqueness invariants |
64
+ | 10 | Catalog store — Postgres jsonb (`catalog/store.py`) | B | `[x]` | PR1 (DB owner picked up) — `INSERT ... ON CONFLICT` |
65
+ | 11 | Catalog reader (`catalog/reader.py`) | B | `[x]` | PR1 (DB owner picked up) — filters by source_hint, empty on miss |
66
+ | 12 | PII detector (`catalog/pii_detector.py`) | B | `[x]` | PR1 (DB owner picked up) — name + value matching, bias toward over-flag |
67
+
68
+ ### Ingestion — pipelines
69
+
70
+ | # | Item | Owner | Status | Notes |
71
+ |---|---|---|---|---|
72
+ | 13 | Structured pipeline (`pipeline/structured_pipeline.py`) | B | `[x]` | PR2a (DB owner) — `introspect → enrich → merge with existing → validate → upsert`. Source-type-agnostic: caller supplies the introspector. `default_structured_pipeline()` factory wires production deps lazily so tests can inject mocks without `Settings()` construction. |
73
+ | 14 | Triggers (`pipeline/triggers.py`) | B | `[~]` | PR2a — `on_db_registered` implemented (DB owner). `on_tabular_uploaded`, `on_document_uploaded`, `on_catalog_rebuild_requested` still stubs. |
74
+ | 15 | Ingestion orchestrator (`pipeline/orchestrator.py`) | B | `[ ]` | Likely redundant — StructuredPipeline already takes the introspector at run() time. Revisit if a higher-level routing layer is needed. |
75
+ | 16 | Document pipeline (`pipeline/document_pipeline.py`) | TAB | `[ ]` | Tabular-adjacent (file uploads). Phase 1 implementation exists at `pipeline/document_pipeline/document_pipeline.py` — reuse or rewrite TBD |
76
+
77
+ ### Query — shared spine
78
+
79
+ | # | Item | Owner | Status | Notes |
80
+ |---|---|---|---|---|
81
+ | 17 | IR validator (`query/ir/validator.py`) | B | `[x]` | PR1 (DB owner) — full rule set; descriptive errors for planner retry |
82
+ | 18 | Planner LLM service (`query/planner/service.py`) | B | `[ ]` | PR2b |
83
+ | 19 | Planner prompt (`query/planner/prompt.py`, `config/prompts/query_planner.md`) | B | `[ ]` | PR2b — **pair-program**; must render DB and tabular sources uniformly. Can reuse `catalog.enricher.render_source` as a starting point. |
84
+ | 20 | Intent router (`agents/intent_router.py`, `config/prompts/intent_router.md`) | B | `[ ]` | PR2b |
85
+ | 21 | Executor base + `QueryResult` (`query/executor/base.py`) | B | `[x]` | Pre-existing scaffold |
86
+ | 22 | Executor dispatcher (`query/executor/dispatcher.py`) | B | `[ ]` | PR4 — `(Catalog, IR) → BaseExecutor` |
87
+ | 23 | Compiler base ABC (`query/compiler/base.py`) | B | `[x]` | Pre-existing scaffold |
88
+ | 24 | Top-level QueryService (`query/service.py`) | B | `[ ]` | PR4 — wires planner → validator → compiler → executor |
89
+
90
+ ### Query — DB path
91
+
92
+ | # | Item | Status | Notes |
93
+ |---|---|---|---|
94
+ | 25 | SQL compiler (`query/compiler/sql.py`) | `[ ]` | PR3 — IR → (sql, params); identifiers from catalog (quoted), values parameterized |
95
+ | 26 | DB executor (`query/executor/db.py`) | `[ ]` | PR3 — asyncpg/pymysql; sqlglot SELECT-only guard; RO txn; 30s timeout |
96
+ | 27 | Credential encryption (`security/credentials.py`) | `[ ]` | Stub exists; PR1 reused Phase 1 `utils/db_credential_encryption.py` instead. Move in cleanup PR |
97
+ | 28 | User-DB connection management | `[ ]` | Reused Phase 1 `db_pipeline_service.engine_scope` in PR1; new helper in PR3 if needed |
98
+
99
+ ### Query — Tabular path
100
+
101
+ | # | Item | Status | Notes |
102
+ |---|---|---|---|
103
+ | 29 | Pandas compiler (`query/compiler/pandas.py`) | `[ ]` | PR3 — IR → callable on DataFrame |
104
+ | 30 | Tabular executor (`query/executor/tabular.py`) | `[ ]` | PR3 — eager pandas first; pyarrow/polars later if file size demands |
105
+ | 31 | Parquet upload/download wrapper | `[ ]` | Phase 1 has `knowledge/parquet_service.py` — reuse or move to `storage/` in cleanup |
106
+
107
+ ### Agents + chat
108
+
109
+ | # | Item | Status | Notes |
110
+ |---|---|---|---|
111
+ | 32 | Chatbot agent + prompt (`agents/chatbot.py`, `config/prompts/chatbot_system.md`) | `[ ]` | PR7 — receives `QueryResult` or Cu chunks |
112
+ | 33 | Guardrails prompt (`config/prompts/guardrails.md`) | `[ ]` | PR7 |
113
+
114
+ ### API surface
115
+
116
+ | # | Item | Owner | Status | Notes |
117
+ |---|---|---|---|---|
118
+ | 34 | DB client endpoints (`api/v1/db_client.py`) | DB | `[ ]` | Phase 1 endpoint exists — rewire `/ingest` to call `pipeline.triggers.on_db_registered`. Trigger is ready as of PR2a; deferred to a later PR until both teammates ack. |
119
+ | 35 | Document/tabular upload endpoints (`api/v1/document.py`) | TAB | `[ ]` | Phase 1 endpoint exists — rewire after enricher |
120
+ | 36 | Chat stream endpoint (`api/v1/chat.py`) | B | `[ ]` | PR4 — pair on dispatch logic; SSE event sequence stays |
121
+ | 37 | Room / users endpoints (`api/v1/room.py`, `api/v1/users.py`) | B | `[ ]` | No catalog work; only touch if auth flow changes |
122
+
123
+ ### Tests + eval
124
+
125
+ | # | Item | Owner | Status | Notes |
126
+ |---|---|---|---|---|
127
+ | 38 | DB compiler golden tests (`tests/query/compiler/test_sql.py`) | DB | `[ ]` | PR3 — pure-Python, no LLM |
128
+ | 39 | Pandas compiler golden tests (`tests/query/compiler/test_pandas.py`) | TAB | `[ ]` | PR3 — pure-Python, no LLM |
129
+ | 40 | IR validator tests (`tests/query/ir/test_validator.py`) | B | `[x]` | PR1 — 19 tests, all rules covered |
130
+ | — | PII detector tests (`tests/catalog/test_pii_detector.py`) | B | `[x]` | PR1 — 26 tests (parametrized) |
131
+ | — | Catalog validator tests (`tests/catalog/test_validator.py`) | B | `[x]` | PR1 — 5 tests |
132
+ | — | Catalog store integration test (`tests/catalog/test_store.py`) | DB | `[x]` | PR1 — module-level skip without `RUN_INTEGRATION_TESTS=1` |
133
+ | — | DB introspector test | DB | `[ ]` | Deferred to PR2 — needs Postgres testcontainer or fixture infra |
134
+ | — | Tabular introspector test | TAB | `[ ]` | TAB to add when introspector lands |
135
+ | 41 | Planner eval (`tests/query/planner/`) | B | `[ ]` | PR6 — golden question → IR examples; each side contributes |
136
+ | 42 | E2E smoke tests (`tests/e2e/`) | B | `[ ]` | PR4 — pair |
137
+ | — | Golden IR fixtures (`tests/fixtures/golden_irs.json`) | B | `[~]` | PR1 seeded with 5 DB-targeting examples; TAB extends in PR1-tab |
138
+ | — | Shared `sample_catalog` fixture (`tests/conftest.py`) | B | `[x]` | PR1 — DB-shaped; TAB may add tabular sibling |
139
+
140
+ ---
141
+
142
+ ## What just shipped (PR2a — DB owner)
143
+
144
+ **Files implemented**:
145
+ - `src/catalog/enricher.py` — Azure OpenAI GPT-4o + structured output (`EnrichmentResponse`), `render_source` (reusable by planner prompt later), `apply_descriptions` merger, injectable `structured_chain` for tests
146
+ - `src/pipeline/structured_pipeline.py` — `StructuredPipeline` orchestrator + `default_structured_pipeline()` factory with lazy production-dep imports
147
+ - `src/pipeline/triggers.py` — `on_db_registered` wired; tabular/document/rebuild stubs preserved with implementation notes
148
+
149
+ **Files extended**:
150
+ - `src/catalog/models.py` — added `ForeignKey` model, `Table.foreign_keys: list[ForeignKey] = []`
151
+ - `src/catalog/introspect/database.py` — `_extract_foreign_keys` populates `Table.foreign_keys` from extractor data
152
+ - `src/config/prompts/catalog_enricher.md` — full system prompt with style rules and one few-shot example
153
+
154
+ **Tests added** (14 new, all passing — total now 64):
155
+ - `tests/catalog/test_enricher.py` — render / apply / end-to-end with fake chain (10 tests)
156
+ - `tests/pipeline/test_structured_pipeline.py` — orchestration with stub deps (4 tests)
157
+
158
+ **Lint**: `ruff check` clean on all Phase 2 paths. Phase 1 files (`pipeline/db_pipeline/`, `pipeline/document_pipeline/`) have pre-existing ruff issues — out of scope for this PR.
159
+
160
+ ---
161
+
162
+ ## What shipped previously (PR1 — DB owner's first chunk)
163
+
164
+ **Files implemented** (was `NotImplementedError`):
165
+ - `src/catalog/pii_detector.py`, `src/catalog/validator.py`, `src/catalog/store.py`, `src/catalog/reader.py`
166
+ - `src/catalog/introspect/database.py` (FK extraction added in PR2a)
167
+ - `src/query/ir/validator.py`
168
+
169
+ **Files extended**:
170
+ - `src/query/ir/operators.py` — `TYPE_COMPATIBILITY` matrix
171
+ - `src/catalog/models.py` — `location_ref` URI-scheme docstring
172
+ - `src/db/postgres/models.py` — `Catalog` SQLAlchemy table; `init_db.py` imports it
173
+
174
+ **Tests**: 50 unit tests + 1 integration (gated on `RUN_INTEGRATION_TESTS=1`).
175
+
176
+ **Reused Phase 1 utilities** (cleanup deferred):
177
+ - `src/database_client/database_client_service.py:get`
178
+ - `src/utils/db_credential_encryption.py:decrypt_credentials_dict`
179
+ - `src/pipeline/db_pipeline/db_pipeline_service.py:engine_scope`
180
+ - `src/pipeline/db_pipeline/extractor.py:get_schema/profile_column/get_row_count`
181
+
182
+ ---
183
+
184
+ ## Open contract items (not yet locked)
185
+
186
+ - **Joins in IR** — currently single-table only (ARCHITECTURE.md §7); DB owner accepted the constraint for v1, will revisit in PR3 if it's blocking real queries
187
+ - **`updated_at` on Source vs `generated_at` on Catalog** — Pydantic models have both; introspector sets per-Source; CatalogStore preserves both
188
+ - **Catalog refresh trigger** (open question §3) — default policy is rebuild-on-upload-or-connect; auto-refresh deferred
189
+ - **Unstructured catalog entries** (open question §2) — currently empty filter for `source_hint="unstructured"`; revisit when adding doc descriptions
190
+ - **PII handling for `sample_values`** (open question §5) — currently nulls them out (skip); mask/synthesize deferred
191
+ - **Dialect priority for SQL compiler** — PR3 will land Postgres first, MySQL second; BigQuery/Snowflake/SQL Server later
192
+
193
+ ---
194
+
195
+ ## How to update this file
196
+
197
+ When a PR lands:
198
+ 1. Flip status from `[ ]` or `[~]` to `[x]`
199
+ 2. Add a short note (file paths, scope cuts, surprises)
200
+ 3. Bump "Last updated" at the top
201
+ 4. If a new contract decision lands, move it from "Open contract items" to the relevant inline note
202
+
203
+ When opening a PR:
204
+ 1. Flip status to `[~]` and add yourself as the active owner in the PR row
205
+ 2. Don't promise items in the PR description that aren't in the table
REPO_CONTEXT.md CHANGED
@@ -227,16 +227,19 @@ Catalog
227
  └── sources[]
228
  └── Source { source_id, source_type, name, description, location_ref, updated_at }
229
  └── tables[]
230
- └── Table { table_id, name, description, row_count }
231
- ── columns[]
232
- └── Column { column_id, name, data_type, description,
233
- nullable, pii_flag, sample_values[]|null, stats|null }
 
 
234
  ```
235
 
236
  `source_type ∈ {schema, tabular, unstructured}`.
237
  `data_type ∈ {int, decimal, string, datetime, date, bool, json}`.
 
238
 
239
- Deferred Column fields (add when justified): `description_human`, `synonyms[]`, `tags[]`, `primary_key`, `foreign_keys`, `unit`, `semantic_type`, `example_questions[]`, `schema_hash`, `enrichment_status`.
240
 
241
  ---
242
 
 
227
  └── sources[]
228
  └── Source { source_id, source_type, name, description, location_ref, updated_at }
229
  └── tables[]
230
+ └── Table { table_id, name, description, row_count, foreign_keys[] }
231
+ ── columns[]
232
+ └── Column { column_id, name, data_type, description,
233
+ nullable, pii_flag, sample_values[]|null, stats|null }
234
+ └── foreign_keys[]
235
+ └── ForeignKey { column_id, target_table_id, target_column_id }
236
  ```
237
 
238
  `source_type ∈ {schema, tabular, unstructured}`.
239
  `data_type ∈ {int, decimal, string, datetime, date, bool, json}`.
240
+ `ForeignKey` references are within the SAME `Source` only; cross-source FKs are not modeled.
241
 
242
+ Deferred Column fields (add when justified): `description_human`, `synonyms[]`, `tags[]`, `primary_key`, `unit`, `semantic_type`, `example_questions[]`, `schema_hash`, `enrichment_status`.
243
 
244
  ---
245
 
src/catalog/enricher.py CHANGED
@@ -1,16 +1,196 @@
1
  """CatalogEnricher — runs 1 LLM call per source to generate AI descriptions.
2
 
3
  Input: a draft Source produced by an introspector (raw schema, no descriptions).
4
- Output: the same Source enriched with description fields at source/table/column level.
 
 
 
5
 
6
- Prompt: src/config/prompts/catalog_enricher.md
 
 
7
  """
8
 
 
 
 
 
 
 
 
 
 
 
 
9
  from .models import Source
10
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
 
12
  class CatalogEnricher:
13
- """Adds AI-generated descriptions to a freshly introspected source."""
 
 
 
 
 
 
 
 
 
 
 
 
14
 
15
  async def enrich(self, source: Source) -> Source:
16
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
1
  """CatalogEnricher — runs 1 LLM call per source to generate AI descriptions.
2
 
3
  Input: a draft Source produced by an introspector (raw schema, no descriptions).
4
+ Output: the same Source enriched with description fields at source / table /
5
+ column level. Other fields (sample_values, stats, foreign_keys, ids, etc.) are
6
+ preserved verbatim — the LLM only emits descriptions keyed by stable IDs and
7
+ they are merged back in.
8
 
9
+ Prompt: `src/config/prompts/catalog_enricher.md`.
10
+ LLM: Azure OpenAI GPT-4o by default. The structured-output runnable is
11
+ injectable via the constructor for testability.
12
  """
13
 
14
+ from __future__ import annotations
15
+
16
+ from pathlib import Path
17
+
18
+ from langchain_core.prompts import ChatPromptTemplate
19
+ from langchain_core.runnables import Runnable
20
+ from langchain_openai import AzureChatOpenAI
21
+ from pydantic import BaseModel, Field
22
+
23
+ from src.middlewares.logging import get_logger
24
+
25
  from .models import Source
26
 
27
+ logger = get_logger("catalog_enricher")
28
+
29
+ _PROMPT_PATH = (
30
+ Path(__file__).resolve().parent.parent / "config" / "prompts" / "catalog_enricher.md"
31
+ )
32
+
33
+
34
+ class _IdDescription(BaseModel):
35
+ """One description keyed by a stable identifier from the input Source.
36
+
37
+ `target_id` MUST match a `source_id`, `table_id`, or `column_id` exactly
38
+ as it appeared in the rendered input — do not regenerate or transform.
39
+ """
40
+
41
+ target_id: str = Field(
42
+ ...,
43
+ description="source_id, table_id, or column_id — copied verbatim from the input.",
44
+ )
45
+ description: str = Field(
46
+ ...,
47
+ description="One-line factual description grounded in the input.",
48
+ )
49
+
50
+
51
+ class EnrichmentResponse(BaseModel):
52
+ """Structured output: a flat list of (id, description) pairs."""
53
+
54
+ descriptions: list[_IdDescription] = Field(default_factory=list)
55
+
56
+
57
+ def _load_prompt_text() -> str:
58
+ return _PROMPT_PATH.read_text(encoding="utf-8")
59
+
60
+
61
+ def _build_default_chain() -> Runnable:
62
+ """Construct the production LangChain runnable.
63
+
64
+ `settings` is imported lazily so importing this module is side-effect-free
65
+ for tests that inject a mock `structured_chain` (otherwise
66
+ `Settings()` would fail without a populated `.env`).
67
+ """
68
+ from src.config.settings import settings
69
+
70
+ llm = AzureChatOpenAI(
71
+ azure_deployment=settings.azureai_deployment_name_4o,
72
+ openai_api_version=settings.azureai_api_version_4o,
73
+ azure_endpoint=settings.azureai_endpoint_url_4o,
74
+ api_key=settings.azureai_api_key_4o,
75
+ temperature=0.2,
76
+ )
77
+ prompt = ChatPromptTemplate.from_messages(
78
+ [
79
+ ("system", _load_prompt_text()),
80
+ ("human", "{source_text}"),
81
+ ]
82
+ )
83
+ return prompt | llm.with_structured_output(EnrichmentResponse)
84
+
85
+
86
+ def render_source(source: Source) -> str:
87
+ """Render a Source as the text that the enricher prompt expects.
88
+
89
+ Public so tests and the planner-prompt builder can reuse the same format.
90
+ """
91
+ lines: list[str] = [
92
+ f"Source: {source.name} ({source.source_type})",
93
+ f"Source ID: {source.source_id}",
94
+ "",
95
+ "Tables:",
96
+ ]
97
+
98
+ tables_by_id = {t.table_id: t for t in source.tables}
99
+ col_names_by_id = {
100
+ t.table_id: {c.column_id: c.name for c in t.columns} for t in source.tables
101
+ }
102
+
103
+ for table in source.tables:
104
+ rc = table.row_count
105
+ rc_str = f"({rc:,} rows) " if rc is not None else ""
106
+ lines.append("")
107
+ lines.append(f" Table: {table.name} {rc_str}— id={table.table_id}")
108
+ lines.append(" Columns:")
109
+ for col in table.columns:
110
+ samples = "PII (suppressed)" if col.pii_flag else (col.sample_values or [])
111
+ stats_parts: list[str] = []
112
+ if col.stats:
113
+ if col.stats.min is not None:
114
+ stats_parts.append(f"min={col.stats.min}")
115
+ if col.stats.max is not None:
116
+ stats_parts.append(f"max={col.stats.max}")
117
+ if col.stats.distinct_count is not None:
118
+ stats_parts.append(f"distinct={col.stats.distinct_count}")
119
+ stats_str = (", " + ", ".join(stats_parts)) if stats_parts else ""
120
+ lines.append(
121
+ f" - {col.name} [{col.data_type}]: samples={samples}{stats_str} "
122
+ f"— id={col.column_id}"
123
+ )
124
+ if table.foreign_keys:
125
+ lines.append(" Foreign keys:")
126
+ cols_in_this_table = {c.column_id: c.name for c in table.columns}
127
+ for fk in table.foreign_keys:
128
+ src_col_name = cols_in_this_table.get(fk.column_id, fk.column_id)
129
+ tgt_table = tables_by_id.get(fk.target_table_id)
130
+ tgt_table_name = tgt_table.name if tgt_table else fk.target_table_id
131
+ tgt_col_name = col_names_by_id.get(fk.target_table_id, {}).get(
132
+ fk.target_column_id, fk.target_column_id
133
+ )
134
+ lines.append(f" - {src_col_name} -> {tgt_table_name}.{tgt_col_name}")
135
+ return "\n".join(lines)
136
+
137
+
138
+ def apply_descriptions(source: Source, response: EnrichmentResponse) -> Source:
139
+ """Merge LLM-emitted descriptions back into the Source by stable ID.
140
+
141
+ Items the LLM omitted retain their existing description (usually ""
142
+ from the introspector). All other fields are preserved verbatim.
143
+ """
144
+ by_id = {d.target_id: d.description for d in response.descriptions}
145
+
146
+ new_tables = []
147
+ for table in source.tables:
148
+ new_columns = [
149
+ col.model_copy(
150
+ update={"description": by_id.get(col.column_id, col.description)}
151
+ )
152
+ for col in table.columns
153
+ ]
154
+ new_tables.append(
155
+ table.model_copy(
156
+ update={
157
+ "description": by_id.get(table.table_id, table.description),
158
+ "columns": new_columns,
159
+ }
160
+ )
161
+ )
162
+
163
+ return source.model_copy(
164
+ update={
165
+ "description": by_id.get(source.source_id, source.description),
166
+ "tables": new_tables,
167
+ }
168
+ )
169
+
170
 
171
  class CatalogEnricher:
172
+ """Adds AI-generated descriptions to a freshly introspected source.
173
+
174
+ Inject `structured_chain` for tests; default builds an Azure OpenAI
175
+ GPT-4o chain wired to `with_structured_output(EnrichmentResponse)`.
176
+ """
177
+
178
+ def __init__(self, structured_chain: Runnable | None = None) -> None:
179
+ self._chain = structured_chain
180
+
181
+ def _ensure_chain(self) -> Runnable:
182
+ if self._chain is None:
183
+ self._chain = _build_default_chain()
184
+ return self._chain
185
 
186
  async def enrich(self, source: Source) -> Source:
187
+ rendered = render_source(source)
188
+ chain = self._ensure_chain()
189
+ response: EnrichmentResponse = await chain.ainvoke({"source_text": rendered})
190
+ logger.info(
191
+ "catalog source enriched",
192
+ source_id=source.source_id,
193
+ descriptions=len(response.descriptions),
194
+ tables=len(source.tables),
195
+ )
196
+ return apply_descriptions(source, response)
src/catalog/introspect/database.py CHANGED
@@ -27,7 +27,7 @@ from src.pipeline.db_pipeline.extractor import (
27
  )
28
  from src.utils.db_credential_encryption import decrypt_credentials_dict
29
 
30
- from ..models import Column, ColumnStats, DataType, Source, Table
31
  from ..pii_detector import PIIDetector
32
  from .base import BaseIntrospector
33
 
@@ -171,6 +171,8 @@ class DatabaseIntrospector(BaseIntrospector):
171
  continue
172
  columns.append(self._to_column(table_name, col, profile))
173
 
 
 
174
  tables.append(
175
  Table(
176
  table_id=_stable_id("t_", table_name),
@@ -178,10 +180,36 @@ class DatabaseIntrospector(BaseIntrospector):
178
  description="",
179
  row_count=row_count,
180
  columns=columns,
 
181
  )
182
  )
183
  return tables
184
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
  def _to_column(
186
  self, table_name: str, col: dict[str, Any], profile: dict[str, Any]
187
  ) -> Column:
 
27
  )
28
  from src.utils.db_credential_encryption import decrypt_credentials_dict
29
 
30
+ from ..models import Column, ColumnStats, DataType, ForeignKey, Source, Table
31
  from ..pii_detector import PIIDetector
32
  from .base import BaseIntrospector
33
 
 
171
  continue
172
  columns.append(self._to_column(table_name, col, profile))
173
 
174
+ foreign_keys = self._extract_foreign_keys(table_name, cols)
175
+
176
  tables.append(
177
  Table(
178
  table_id=_stable_id("t_", table_name),
 
180
  description="",
181
  row_count=row_count,
182
  columns=columns,
183
+ foreign_keys=foreign_keys,
184
  )
185
  )
186
  return tables
187
 
188
+ @staticmethod
189
+ def _extract_foreign_keys(
190
+ table_name: str, cols: list[dict[str, Any]]
191
+ ) -> list[ForeignKey]:
192
+ """Convert extractor's `foreign_key: 'target_table.target_col'` strings
193
+ into ForeignKey objects with stable IDs (derived deterministically from
194
+ names — same scheme used to generate table_id / column_id elsewhere).
195
+ """
196
+ fks: list[ForeignKey] = []
197
+ for col in cols:
198
+ fk_str = col.get("foreign_key")
199
+ if not fk_str:
200
+ continue
201
+ target_table, _, target_col = fk_str.partition(".")
202
+ if not target_table or not target_col:
203
+ continue
204
+ fks.append(
205
+ ForeignKey(
206
+ column_id=_stable_id("c_", table_name, col["name"]),
207
+ target_table_id=_stable_id("t_", target_table),
208
+ target_column_id=_stable_id("c_", target_table, target_col),
209
+ )
210
+ )
211
+ return fks
212
+
213
  def _to_column(
214
  self, table_name: str, col: dict[str, Any], profile: dict[str, Any]
215
  ) -> Column:
src/catalog/models.py CHANGED
@@ -48,12 +48,25 @@ class Column(BaseModel):
48
  stats: ColumnStats | None = None
49
 
50
 
 
 
 
 
 
 
 
 
 
 
 
 
51
  class Table(BaseModel):
52
  table_id: str
53
  name: str
54
  description: str
55
  row_count: int | None = None
56
  columns: list[Column]
 
57
 
58
 
59
  class Source(BaseModel):
 
48
  stats: ColumnStats | None = None
49
 
50
 
51
+ class ForeignKey(BaseModel):
52
+ """A FK edge from one column in this table to a column in another table.
53
+
54
+ All references use stable IDs derived from source/table/column names so
55
+ edges survive renames at the `name` level. The target table must belong
56
+ to the SAME `Source` — cross-source FKs are not modeled in v1.
57
+ """
58
+ column_id: str # the column in this table that holds the FK
59
+ target_table_id: str # referenced table_id, within the same Source
60
+ target_column_id: str # referenced column_id
61
+
62
+
63
  class Table(BaseModel):
64
  table_id: str
65
  name: str
66
  description: str
67
  row_count: int | None = None
68
  columns: list[Column]
69
+ foreign_keys: list[ForeignKey] = Field(default_factory=list)
70
 
71
 
72
  class Source(BaseModel):
src/config/prompts/catalog_enricher.md CHANGED
@@ -1,21 +1,62 @@
1
- # Catalog Enricher Prompt
2
 
3
- Takes raw introspected schema (table names, column names, types, sample values)
4
- and produces AI-generated descriptions for the source, each table, and each column.
5
 
6
- One LLM call per source at ingestion time. Used by `src/catalog/enricher.py`.
7
 
8
- ## System prompt
9
 
10
- (to be written)
 
 
 
 
 
 
 
11
 
12
- ## Output schema
13
 
14
- Strict JSON matching the catalog Pydantic models in `src/catalog/models.py`.
15
- Validated by `CatalogValidator` before being persisted.
16
 
17
- ## Notes
18
 
19
- - Descriptions should be one-line and factual.
20
- - Do NOT invent column meanings beyond what the sample values support.
21
- - For `pii_flag` columns, do NOT include sample values in the description.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ You are a senior data analyst writing concise, factual descriptions for a user's data catalog. The catalog is consumed by an AI agent that helps the same user query their data, so descriptions must accurately convey what each source / table / column **is**, not what an analyst might guess it could be.
2
 
3
+ ## Your task
 
4
 
5
+ Given a single data source rendered below, produce a description for the source itself, each table inside it, and each column inside each table. Return your output as structured JSON matching the requested schema; identifiers (`source_id`, `table_id`, `column_id`) must be copied **verbatim** from the input.
6
 
7
+ ## Style rules
8
 
9
+ - **One factual sentence per item.** Two only if a second is genuinely necessary (e.g., to call out a unit or important caveat).
10
+ - **Ground every claim in the evidence shown** — column names, sample values, stats, foreign keys, table names. Do not invent semantics that the inputs do not support.
11
+ - **Do not restate the data type.** ("`amount` (decimal)" is redundant; say what `amount` represents.)
12
+ - **Mention obvious units / scales** when sample values make them clear (e.g., "in cents", "in milliseconds", "0–100 score").
13
+ - **PII columns have `samples=PII (suppressed)`.** Describe their role from the column name only — do not speculate about content. Example: "Customer's email address." (correct), not "Email address, e.g. alice@example.com" (wrong, invented).
14
+ - **Foreign keys**: when present, mention the relationship at the table level (e.g., "Each row links to a customer via `customer_id`"). Don't repeat the FK in the column description unless it's the primary point.
15
+ - **Do not include sample values verbatim in any description.** Use them as evidence to infer meaning, not as content to quote.
16
+ - **No markdown formatting in descriptions** (no bold, lists, code fences). Plain text only.
17
 
18
+ ## Source-level description
19
 
20
+ One or two sentences describing what kind of data this source holds and what the user might use it for. If the source has only one table, the source and table descriptions can overlap but should not be identical word-for-word.
 
21
 
22
+ ## Table-level description
23
 
24
+ What real-world entity or event each row represents. Mention the grain (one row per …) if non-obvious.
25
+
26
+ ## Column-level description
27
+
28
+ What the column **means**, not what it stores. Read sample values, ranges, and the column name together to triangulate. If genuinely ambiguous, write a description that captures the ambiguity rather than picking one interpretation arbitrarily.
29
+
30
+ ## Few-shot example
31
+
32
+ Input (rendered source):
33
+
34
+ ```
35
+ Source: prod_db (schema)
36
+
37
+ Tables:
38
+
39
+ Table: orders (12,453 rows)
40
+ Columns:
41
+ - id [int]: samples=[1, 2, 3], min=1, max=12453, distinct=12453
42
+ - customer_id [int]: samples=[42, 17, 99], min=1, max=8200, distinct=8200
43
+ - total_cents [int]: samples=[2499, 4999, 1999], min=99, max=999900, distinct=4321
44
+ - status [string]: samples=[completed, pending, refunded], distinct=4
45
+ - created_at [datetime]: samples=[2026-04-01T08:12:00Z, 2026-04-01T08:14:00Z], distinct=12440
46
+ Foreign keys:
47
+ - customer_id -> customers.id
48
+ ```
49
+
50
+ Expected descriptions:
51
+
52
+ - Source: "Production order ledger for the storefront — one row per checkout, used for reporting on revenue, fulfillment, and customer purchase history."
53
+ - Table `orders`: "One row per completed or attempted checkout; links each order to the customer who placed it."
54
+ - Column `id`: "Unique order identifier."
55
+ - Column `customer_id`: "Identifier of the customer who placed the order; references customers.id."
56
+ - Column `total_cents`: "Order total in cents (USD), inclusive of taxes and discounts."
57
+ - Column `status`: "Lifecycle state of the order — values include completed, pending, and refunded."
58
+ - Column `created_at`: "Timestamp when the order was placed (UTC)."
59
+
60
+ ## Output
61
+
62
+ Return strict JSON matching the schema requested by the structured-output binding. The shape mirrors the input source: a flat map of identifiers to descriptions is acceptable but the full nested form is preferred. Identifiers must match the input exactly — do not regenerate them.
src/pipeline/structured_pipeline.py CHANGED
@@ -1,13 +1,98 @@
1
  """StructuredPipeline — runs catalog enrichment for DB / tabular sources.
2
 
3
- Steps:
4
- 1. introspect (catalog/introspect/database.py or catalog/introspect/tabular.py)
5
- 2. enrich (catalog/enricher.py — 1 LLM call per source)
6
- 3. validate (catalog/validator.py)
7
- 4. write (catalog/store.py)
 
 
 
 
 
 
 
 
8
  """
9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
10
 
11
  class StructuredPipeline:
12
- async def run(self, source_ref: str, source_type: str, user_id: str) -> None:
13
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """StructuredPipeline — runs catalog enrichment for DB / tabular sources.
2
 
3
+ Steps (per source, end-to-end):
4
+ 1. introspect (caller-supplied — DatabaseIntrospector or TabularIntrospector)
5
+ 2. enrich (catalog/enricher.py — 1 LLM call per source)
6
+ 3. merge (replace any existing source with the same source_id)
7
+ 4. validate (catalog/validator.py)
8
+ 5. upsert (catalog/store.py)
9
+
10
+ Source-type-agnostic: the caller picks the introspector. Triggers in
11
+ `pipeline/triggers.py` know which one to use based on the upload event.
12
+
13
+ Heavy production deps (`CatalogEnricher`, `CatalogStore`) are imported lazily
14
+ inside `default_structured_pipeline()` so unit tests that inject mocks never
15
+ trigger `Settings()` construction at module import.
16
  """
17
 
18
+ from __future__ import annotations
19
+
20
+ from datetime import UTC, datetime
21
+ from typing import TYPE_CHECKING
22
+
23
+ from src.catalog.introspect.base import BaseIntrospector
24
+ from src.catalog.models import Catalog, Source
25
+ from src.middlewares.logging import get_logger
26
+
27
+ if TYPE_CHECKING:
28
+ from src.catalog.enricher import CatalogEnricher
29
+ from src.catalog.store import CatalogStore
30
+ from src.catalog.validator import CatalogValidator
31
+
32
+ logger = get_logger("structured_pipeline")
33
+
34
 
35
  class StructuredPipeline:
36
+ """Orchestrates introspect enrich merge validate store.
37
+
38
+ Dependencies are injected (no concrete imports at class-definition time)
39
+ so tests can pass mocks without constructing Settings or opening DB
40
+ connections.
41
+ """
42
+
43
+ def __init__(
44
+ self,
45
+ enricher: CatalogEnricher,
46
+ validator: CatalogValidator,
47
+ store: CatalogStore,
48
+ ) -> None:
49
+ self._enricher = enricher
50
+ self._validator = validator
51
+ self._store = store
52
+
53
+ async def run(
54
+ self,
55
+ introspector: BaseIntrospector,
56
+ location_ref: str,
57
+ user_id: str,
58
+ ) -> Source:
59
+ source = await introspector.introspect(location_ref)
60
+ enriched = await self._enricher.enrich(source)
61
+ merged = await self._merge_with_existing(user_id, enriched)
62
+ self._validator.validate(merged)
63
+ await self._store.upsert(merged)
64
+ logger.info(
65
+ "structured pipeline complete",
66
+ user_id=user_id,
67
+ source_id=enriched.source_id,
68
+ source_type=enriched.source_type,
69
+ tables=len(enriched.tables),
70
+ )
71
+ return enriched
72
+
73
+ async def _merge_with_existing(self, user_id: str, new_source: Source) -> Catalog:
74
+ existing = await self._store.get(user_id)
75
+ now = datetime.now(UTC)
76
+ if existing is None:
77
+ return Catalog(user_id=user_id, generated_at=now, sources=[new_source])
78
+ kept = [s for s in existing.sources if s.source_id != new_source.source_id]
79
+ return existing.model_copy(
80
+ update={"sources": [*kept, new_source], "generated_at": now}
81
+ )
82
+
83
+
84
+ def default_structured_pipeline() -> StructuredPipeline:
85
+ """Build the production pipeline with default deps.
86
+
87
+ Lazy imports keep `from src.pipeline.structured_pipeline import …` cheap
88
+ and side-effect-free for tests.
89
+ """
90
+ from src.catalog.enricher import CatalogEnricher
91
+ from src.catalog.store import CatalogStore
92
+ from src.catalog.validator import CatalogValidator
93
+
94
+ return StructuredPipeline(
95
+ enricher=CatalogEnricher(),
96
+ validator=CatalogValidator(),
97
+ store=CatalogStore(),
98
+ )
src/pipeline/triggers.py CHANGED
@@ -1,21 +1,63 @@
1
  """Pipeline trigger entry points called from API routes / event handlers.
2
 
3
- These thin functions are what the FastAPI routes invoke; they delegate
4
- to IngestionOrchestrator.
 
 
 
 
5
  """
6
 
 
7
 
8
- async def on_document_uploaded(document_id: str, user_id: str) -> None:
9
- raise NotImplementedError
10
 
11
 
12
  async def on_db_registered(database_client_id: str, user_id: str) -> None:
13
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
 
15
 
16
  async def on_tabular_uploaded(document_id: str, user_id: str) -> None:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
17
  raise NotImplementedError
18
 
19
 
20
  async def on_catalog_rebuild_requested(user_id: str) -> None:
 
 
21
  raise NotImplementedError
 
1
  """Pipeline trigger entry points called from API routes / event handlers.
2
 
3
+ These thin functions are what the FastAPI routes invoke; they delegate to the
4
+ appropriate pipeline (StructuredPipeline for DB/tabular, DocumentPipeline for
5
+ unstructured).
6
+
7
+ Errors propagate from the pipelines — the caller decides whether to surface
8
+ them as HTTP 4xx/5xx or quietly fail. The trigger itself does not catch.
9
  """
10
 
11
+ from src.middlewares.logging import get_logger
12
 
13
+ logger = get_logger("pipeline_triggers")
 
14
 
15
 
16
  async def on_db_registered(database_client_id: str, user_id: str) -> None:
17
+ """Build a dbclient:// location_ref and run the structured pipeline.
18
+
19
+ Called by `/api/v1/database-clients/{id}/ingest` (after rewiring in a
20
+ later PR). The DatabaseIntrospector resolves the client_id to a
21
+ DatabaseClient row, decrypts credentials, connects, and produces a Source.
22
+ The CatalogEnricher then fills in descriptions, the catalog is validated
23
+ and upserted.
24
+ """
25
+ from src.catalog.introspect.database import database_introspector
26
+ from src.pipeline.structured_pipeline import default_structured_pipeline
27
+
28
+ location_ref = f"dbclient://{database_client_id}"
29
+ logger.info(
30
+ "on_db_registered triggered",
31
+ user_id=user_id,
32
+ database_client_id=database_client_id,
33
+ )
34
+ pipeline = default_structured_pipeline()
35
+ await pipeline.run(database_introspector, location_ref, user_id)
36
 
37
 
38
  async def on_tabular_uploaded(document_id: str, user_id: str) -> None:
39
+ """Stub — implemented by TAB owner once `catalog/introspect/tabular.py` lands.
40
+
41
+ Expected pattern (mirrors `on_db_registered`):
42
+
43
+ from src.catalog.introspect.tabular import tabular_introspector
44
+ from src.pipeline.structured_pipeline import default_structured_pipeline
45
+ location_ref = f"az_blob://{user_id}/{document_id}"
46
+ pipeline = default_structured_pipeline()
47
+ await pipeline.run(tabular_introspector, location_ref, user_id)
48
+ """
49
+ raise NotImplementedError
50
+
51
+
52
+ async def on_document_uploaded(document_id: str, user_id: str) -> None:
53
+ """Stub — for unstructured (PDF/DOCX/TXT). Owned by TAB / document pipeline.
54
+
55
+ Expected to call DocumentPipeline (extract → chunk → embed → PGVector).
56
+ """
57
  raise NotImplementedError
58
 
59
 
60
  async def on_catalog_rebuild_requested(user_id: str) -> None:
61
+ """Stub — re-runs every source for a user, useful after enricher prompt
62
+ changes. Implemented when the bulk re-enrichment script lands."""
63
  raise NotImplementedError