feat/Catalog Retrieval System

#1
by rhbt6767 - opened
Files changed (50)
  1. ARCHITECTURE.md +340 -0
  2. PHASE1_TO_PHASE2_REPORT.md +260 -0
  3. PROGRESS.md +381 -0
  4. REPO_CONTEXT.md +474 -0
  5. main.py +15 -11
  6. pyproject.toml +2 -1
  7. scripts/build_initial_catalogs.py +73 -0
  8. scripts/enrich_all_sources.py +16 -0
  9. src/agents/chat_handler.py +274 -0
  10. src/agents/chatbot.py +153 -69
  11. src/agents/orchestration.py +100 -70
  12. src/api/v1/chat.py +50 -167
  13. src/api/v1/data_catalog.py +100 -0
  14. src/api/v1/db_client.py +15 -25
  15. src/api/v1/document.py +12 -2
  16. src/api/v1/knowledge.py +0 -25
  17. src/catalog/README.md +6 -0
  18. src/catalog/__init__.py +1 -0
  19. src/catalog/introspect/__init__.py +1 -0
  20. src/catalog/introspect/base.py +18 -0
  21. src/catalog/introspect/database.py +246 -0
  22. src/catalog/introspect/tabular.py +239 -0
  23. src/catalog/models.py +86 -0
  24. src/catalog/pii_detector.py +39 -0
  25. src/catalog/reader.py +40 -0
  26. src/catalog/render.py +69 -0
  27. src/catalog/store.py +82 -0
  28. src/catalog/validator.py +49 -0
  29. src/config/agents/guardrails_prompt.md +0 -7
  30. src/config/agents/system_prompt.md +0 -26
  31. src/{pipeline/document_pipeline → config/prompts}/__init__.py +0 -0
  32. src/config/prompts/chatbot_system.md +31 -0
  33. src/config/prompts/guardrails.md +11 -0
  34. src/config/prompts/intent_router.md +66 -0
  35. src/config/prompts/query_planner.md +168 -0
  36. src/db/postgres/init_db.py +2 -1
  37. src/db/postgres/models.py +19 -0
  38. src/knowledge/processing_service.py +0 -106
  39. src/middlewares/logging.py +3 -0
  40. src/models/api/__init__.py +1 -0
  41. src/models/api/catalog.py +27 -0
  42. src/models/api/chat.py +17 -0
  43. src/models/api/document.py +9 -0
  44. src/models/user_info.py +0 -15
  45. src/pipeline/db_pipeline/extractor.py +76 -35
  46. src/pipeline/{document_pipeline/document_pipeline.py → document_pipeline.py} +32 -3
  47. src/pipeline/structured_pipeline.py +91 -0
  48. src/pipeline/triggers.py +115 -0
  49. src/query/README.md +11 -0
  50. src/query/base.py +0 -32
ARCHITECTURE.md ADDED
@@ -0,0 +1,340 @@
1
+ # Architecture — Data Eyond Agentic Service
2
+
3
+ **Last updated**: 2026-05-07
4
+ **Status**: Design phase — folder skeleton in place, implementation in progress
5
+
6
+ ---
7
+
8
+ ## TL;DR
9
+
10
+ A catalog-driven AI service for data analysis. Users upload documents and register databases or tabular files; they ask natural-language questions and get answers grounded in their data.
11
+
12
+ The architecture has two paths:
13
+
14
+ - **Unstructured** (PDF, DOCX, TXT) — dense similarity over prose chunks (the right primitive for free-form text).
15
+ - **Structured** (databases, XLSX, CSV, Parquet) — a per-user **data catalog** describes what tables/columns exist; an LLM produces a structured **JSON intermediate representation (IR)** of the user's intent; a deterministic compiler turns the IR into SQL or pandas operations.
16
+
17
+ The LLM produces *intent*, not query syntax. Deterministic code does the rest.
18
+
19
+ ---
20
+
21
+ ## 1. Why catalog-driven design
22
+
23
+ For a database or spreadsheet, a user's question maps to *known tables and columns*, not to *similar text fragments*. Treating structured data with the same retrieval primitive as prose (chunk + embed + rank top-K) forces the right column to survive a probabilistic ranking lottery. Catalog-based **lookup** is the right primitive instead.
24
+
25
+ A central per-user catalog also means:
26
+
27
+ - One place to keep table/column descriptions (AI-generated, refreshed when the source changes).
28
+ - The query planner sees the user's full data landscape in a single prompt.
29
+ - Schema stays stable across user sessions without hitting the source DB on every query.
30
+ - New sources auto-update the catalog without re-embedding chunks.
31
+
32
+ ---
33
+
34
+ ## 2. Source taxonomy
35
+
36
+ ```
37
+ Sources
38
+ ├── Unstructured (pdf, docx, txt) → Cu (prose chunks via DocumentRetriever)
39
+ └── Structured
40
+ ├── Schema (DB) → Cs (DB tables + columns)
41
+ └── Tabular (xlsx, csv, parquet) → Ct (sheets + columns)
42
+ Cs ∪ Ct = Data Catalog Context
43
+ ```
44
+
45
+ - **Cu** = unstructured prose context. Retrieval primitive: dense similarity over chunks.
46
+ - **Cs** = DB schema context (tables, columns, descriptions, sample values).
47
+ - **Ct** = tabular file context (sheets, columns, descriptions, sample values).
48
+ - **Data Catalog Context** = `Cs ∪ Ct`. Passed to the query planner as a single unified view.
49
+
50
+ DB vs tabular is **not** a routing concern — it's a per-source attribute (`source_type`) on each catalog entry. The split only matters at execution time (SQL vs pandas).
51
+
52
+ ---
53
+
54
+ ## 3. Routing model
55
+
56
+ ```
57
+ source_hint ∈ { chat, unstructured, structured }
58
+ ```
59
+
60
+ - `chat` — no search, conversational reply only
61
+ - `unstructured` — DocumentRetriever path (Cu)
62
+ - `structured` — catalog-driven path (Cs ∪ Ct → planner → compiler → executor)
63
+
64
+ The router commits to one path. Cross-source questions ("compare DB sales vs uploaded customer file") are handled inside the structured path because the planner sees both Cs and Ct in one prompt.
65
+
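The one-path commitment described above can be sketched as a small dispatch table. This is only an illustration: the three `source_hint` values come from the text, while `route` and the three handler functions are hypothetical stand-ins for the real stages.

```python
from typing import Callable, Literal, get_args

# The three allowed routing values, as listed in the routing model.
SourceHint = Literal["chat", "unstructured", "structured"]


def handle_chat(message: str) -> str:
    # Hypothetical stand-in: conversational reply, no search.
    return f"chat:{message}"


def handle_unstructured(message: str) -> str:
    # Hypothetical stand-in for the DocumentRetriever path (Cu).
    return f"unstructured:{message}"


def handle_structured(message: str) -> str:
    # Hypothetical stand-in for the catalog-driven path (Cs and Ct together).
    return f"structured:{message}"


HANDLERS: dict[str, Callable[[str], str]] = {
    "chat": handle_chat,
    "unstructured": handle_unstructured,
    "structured": handle_structured,
}


def route(source_hint: str, message: str) -> str:
    """Commit to exactly one path; reject hints outside the allowed set."""
    if source_hint not in get_args(SourceHint):
        raise ValueError(f"unknown source_hint: {source_hint!r}")
    return HANDLERS[source_hint](message)
```

Because DB versus tabular is a per-source attribute rather than a routing value, the table has three entries and no fourth branch.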
66
+ ---
67
+
68
+ ## 4. Core architectural decisions
69
+
70
+ ### 4.1 Catalog as primary context, not retrieval
71
+
72
+ For most users (≤50 tables), the entire catalog fits in ~3-5k tokens and is passed verbatim to the planner. No vector search, no BM25, no chunk retrieval. The LLM reads the whole catalog and picks the right table.
73
+
74
+ When a user has hundreds of tables, **catalog-level retrieval** (BM25 + table-level vectors with RRF) can be added as a slicer between `CatalogReader` and `Planner`. Deferred until measurably needed.
75
+
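As a rough illustration of "passed verbatim to the planner", the sketch below flattens a whole catalog into prompt text. The dict layout, the `render_catalog` name, and the four-characters-per-token estimate are assumptions made for this example; the actual renderer in `src/catalog/render.py` is not shown in this view.

```python
def render_catalog(sources: list[dict]) -> str:
    """Render every source, table, and column as plain text for the planner prompt."""
    lines: list[str] = []
    for source in sources:
        lines.append(
            f"source {source['source_id']} ({source['source_type']}): {source['name']}"
        )
        for table in source["tables"]:
            lines.append(
                f"  table {table['table_id']}: {table['name']} rows={table['row_count']}"
            )
            for col in table["columns"]:
                lines.append(
                    f"    column {col['column_id']}: {col['name']} {col['data_type']}"
                )
    return "\n".join(lines)


def rough_token_count(text: str) -> int:
    # Assumption: about 4 characters per token, a crude rule of thumb only.
    return len(text) // 4


# Hypothetical one-table catalog used purely for illustration.
EXAMPLE_SOURCES = [
    {
        "source_id": "src_1",
        "source_type": "schema",
        "name": "sales_db",
        "tables": [
            {
                "table_id": "tbl_orders",
                "name": "orders",
                "row_count": 1200,
                "columns": [
                    {"column_id": "col_amount", "name": "amount", "data_type": "numeric"},
                ],
            }
        ],
    }
]

prompt_text = render_catalog(EXAMPLE_SOURCES)
```

A size check such as `rough_token_count(prompt_text)` is one possible signal for deciding when the deferred catalog-level retrieval becomes worth adding.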
76
+ ### 4.2 JSON IR over raw SQL
77
+
78
+ The planner LLM emits a structured JSON IR describing query intent — not a SQL string. A deterministic compiler turns the IR into SQL (per dialect) or pandas/polars operations.
79
+
80
+ Benefits:
81
+
82
+ - Validatable with Pydantic before execution
83
+ - Compiler whitelists allowed operations (no DROP, DELETE, etc.)
84
+ - Portable: same IR → SQL (any dialect) / pandas / polars
85
+ - Cheaper tokens, easier to debug, trivially testable without an LLM
86
+ - The LLM never writes SQL, so it cannot emit SQL that parses but is wrong for the target dialect
87
+
88
+ ### 4.3 Deterministic compiler, not LLM SQL writer
89
+
90
+ The LLM produces *intent* (the IR). All actual query construction is deterministic Python. Compiler bugs are reproducible and fixable. Same IR always produces the same query.
91
+
92
+ ### 4.4 Pipeline stage isolation
93
+
94
+ Each stage is its own module with typed input and typed output. No god classes. Stages: `IntentRouter`, `CatalogReader`, `QueryPlanner`, `IRValidator`, `QueryCompiler`, `QueryExecutor`, `ChatbotAgent`. Each is testable in isolation.
95
+
96
+ ### 4.5 Minimal LLM surface
97
+
98
+ LLM calls happen in exactly three places (KM-557 removed `CatalogEnricher`; ingestion is now LLM-free — the planner reads column names, stats, and sample rows directly):
99
+
100
+ 1. **`IntentRouter`** — once per user message
101
+ 2. **`QueryPlanner`** — once per structured query (produces the IR)
102
+ 3. **`ChatbotAgent`** — once per answer (formats the response)
103
+
104
+ Compiler and executors are pure code. No LLM in the hot path of query construction.
105
+
106
+ ---
107
+
108
+ ## 5. End-to-end flow
109
+
110
+ ### Ingestion (when user uploads a file or connects a DB)
111
+
112
+ ```
113
+ source upload / DB connect
114
+
115
+ introspect schema (DB: information_schema; tabular: file headers + sample rows)
116
+
117
+ validate (Pydantic)
118
+
119
+ write to catalog store (Postgres jsonb in `data_catalog`, keyed by user_id)
120
+ ```
121
+
122
+ For unstructured files: chunk + embed → PGVector.
123
+
124
+ ### Query (per user message)
125
+
126
+ ```
127
+ User message
128
+
129
+ Chat cache check (Redis, 24h TTL)
130
+ ↓ miss
131
+ Load chat history
132
+
133
+ IntentRouter LLM → needs_search? source_hint?
134
+
135
+ ├── chat → ChatbotAgent → SSE stream
136
+ ├── unstructured → DocumentRetriever → answerer
137
+ └── structured →
138
+ CatalogReader (load full Cs ∪ Ct for user)
139
+
140
+ QueryPlanner LLM → JSON IR
141
+
142
+ IRValidator (Pydantic + columns-exist + ops whitelist)
143
+
144
+ QueryCompiler → SQL (schema source) or pandas (tabular source)
145
+
146
+ QueryExecutor (DbExecutor or TabularExecutor)
147
+
148
+ QueryResult
149
+
150
+ ChatbotAgent → SSE stream
151
+ ```
152
+
153
+ ---
154
+
155
+ ## 6. Data catalog
156
+
157
+ ### Storage
158
+
159
+ Per-user JSON document, stored as a `jsonb` row in Postgres keyed by `user_id`.
160
+
161
+ ### Schema (initial scope)
162
+
163
+ ```
164
+ Catalog
165
+ ├── user_id, schema_version, generated_at
166
+ └── sources[]
167
+ └── Source
168
+ ├── source_id, source_type, name, description, location_ref, updated_at
169
+ └── tables[]
170
+ └── Table
171
+ ├── table_id, name, description, row_count
172
+ └── columns[]
173
+ └── Column
174
+ ├── column_id, name, data_type, description
175
+ ├── nullable
176
+ ├── pii_flag
177
+ ├── sample_values[]
178
+ └── stats: { min, max, distinct_count } | null
179
+ ```
180
+
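The tree above maps naturally onto nested models. The document uses Pydantic for these; the sketch below uses standard-library dataclasses only so that it runs without dependencies. Field names follow the tree, while the default values, the `source_type` strings, and the `column_ids` helper are assumptions.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ColumnStats:
    min: Any = None
    max: Any = None
    distinct_count: Optional[int] = None


@dataclass
class Column:
    column_id: str
    name: str
    data_type: str
    description: str = ""
    nullable: bool = True
    pii_flag: bool = False
    sample_values: Optional[list[Any]] = None  # None when pii_flag is true
    stats: Optional[ColumnStats] = None


@dataclass
class Table:
    table_id: str
    name: str
    description: str = ""
    row_count: Optional[int] = None
    columns: list[Column] = field(default_factory=list)


@dataclass
class Source:
    source_id: str
    source_type: str  # assumed values: "schema" or "tabular"
    name: str
    description: str = ""
    location_ref: str = ""
    updated_at: str = ""
    tables: list[Table] = field(default_factory=list)


@dataclass
class Catalog:
    user_id: str
    schema_version: str
    generated_at: str
    sources: list[Source] = field(default_factory=list)


def column_ids(catalog: Catalog) -> list[str]:
    """Flatten the tree into a list of column IDs, in declaration order."""
    return [
        column.column_id
        for source in catalog.sources
        for table in source.tables
        for column in table.columns
    ]
```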
181
+ ### Best-practice fields deferred
182
+
183
+ `description_human`, `synonyms[]`, `tags[]`, `primary_key`, `foreign_keys`, `unit`, `semantic_type`, `example_questions[]`, `schema_hash`, `enrichment_status`. Add when justified by user need.
184
+
185
+ ### Stable IDs
186
+
187
+ `source_id`, `table_id`, `column_id` are stable internal references. `name` fields can change (e.g. column rename in source DB) without invalidating cached IRs.
188
+
189
+ ### PII handling
190
+
191
+ Columns with `pii_flag: true` have `sample_values: null` — real values never enter LLM prompts. Auto-detected at ingestion via name patterns + value regex.
192
+
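A minimal sketch of the two detection signals mentioned above (column-name patterns plus a value regex) might look like the following. The specific patterns and the `is_pii` / `apply_pii_policy` names are hypothetical; the real pattern lists are not visible in this view.

```python
import re

# Hypothetical patterns for illustration; the project's actual lists are not shown.
PII_NAME_PATTERN = re.compile(r"(email|phone|ssn|passport|birth)", re.IGNORECASE)
EMAIL_VALUE_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def is_pii(column_name: str, sample_values: list) -> bool:
    """Flag a column if its name looks sensitive or any sample looks like an email."""
    if PII_NAME_PATTERN.search(column_name):
        return True
    return any(
        isinstance(value, str) and EMAIL_VALUE_PATTERN.match(value)
        for value in sample_values
    )


def apply_pii_policy(column: dict) -> dict:
    """Return a copy of the column with pii_flag set and samples nulled if flagged."""
    samples = column.get("sample_values")
    flagged = is_pii(column["name"], samples or [])
    return {
        **column,
        "pii_flag": flagged,
        "sample_values": None if flagged else samples,
    }
```

Nulling the samples at ingestion, rather than at prompt-build time, means a flagged column's real values are never stored in the catalog in the first place.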
193
+ ---
194
+
195
+ ## 7. JSON IR
196
+
197
+ ### Schema (initial scope)
198
+
199
+ ```
200
+ QueryIR
201
+ ├── ir_version : "1.0"
202
+ ├── source_id : str (references catalog)
203
+ ├── table_id : str (references catalog)
204
+ ├── select[] : SelectItem
205
+ │ ├── { kind: "column", column_id, alias? }
206
+ │ └── { kind: "agg", fn, column_id?, alias? }
207
+ ├── filters[] : { column_id, op, value, value_type }
208
+ ├── group_by[] : column_id
209
+ ├── order_by[] : { column_id | alias, dir }
210
+ └── limit : int | null
211
+ ```
212
+
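To make the shape concrete, here is what an IR for a question like "top 5 regions by total order amount in 2024" could look like, written as a plain Python dict. All IDs, the `value_type` string, and the `referenced_column_ids` helper are invented for illustration.

```python
import json

# Hypothetical IR instance; every ID below is made up for this example.
example_ir = {
    "ir_version": "1.0",
    "source_id": "src_1",
    "table_id": "tbl_orders",
    "select": [
        {"kind": "column", "column_id": "col_region"},
        {"kind": "agg", "fn": "sum", "column_id": "col_amount", "alias": "total"},
    ],
    "filters": [
        {"column_id": "col_year", "op": "=", "value": 2024, "value_type": "int"},
    ],
    "group_by": ["col_region"],
    "order_by": [{"alias": "total", "dir": "desc"}],
    "limit": 5,
}


def referenced_column_ids(ir: dict) -> set[str]:
    """Collect every column_id the IR mentions (aliases are not column IDs)."""
    ids = {item["column_id"] for item in ir["select"] if item.get("column_id")}
    ids |= {flt["column_id"] for flt in ir["filters"]}
    ids |= set(ir["group_by"])
    ids |= {o["column_id"] for o in ir["order_by"] if "column_id" in o}
    return ids


# The IR is plain JSON, so it serializes without custom encoders.
serialized = json.dumps(example_ir)
```

The IR carries only catalog IDs and literal values. It contains no SQL fragments, which is what lets a validator check it field by field.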
213
+ ### Whitelisted operators
214
+
215
+ ```
216
+ Filter ops: = != < <= > >= in not_in is_null is_not_null like between
217
+ Agg fns: count count_distinct sum avg min max
218
+ ```
219
+
220
+ ### Validation rules (enforced before execution)
221
+
222
+ - `source_id` exists in catalog for this user
223
+ - `table_id` belongs to that source
224
+ - Every `column_id` exists in that table
225
+ - Every `agg.fn` and `filter.op` is whitelisted
226
+ - `value_type` consistent with column's `data_type`
227
+ - `limit` positive int, ≤ hard cap (e.g. 10000)
228
+
229
+ If any rule fails → reject IR → re-prompt planner with error context (max 3 retries).
230
+
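The rules above can be sketched as a function that returns a list of error strings, which is a convenient shape for feeding back into a planner re-prompt. The constant names mirror those listed for `src/query/ir/operators.py` elsewhere in this PR, but the simplified catalog layout (`{source_id: {table_id: {column_id: data_type}}}`) and the function body are assumptions. The `value_type` compatibility check is omitted for brevity.

```python
ALLOWED_FILTER_OPS = {
    "=", "!=", "<", "<=", ">", ">=",
    "in", "not_in", "is_null", "is_not_null", "like", "between",
}
ALLOWED_AGG_FNS = {"count", "count_distinct", "sum", "avg", "min", "max"}
LIMIT_HARD_CAP = 10_000  # example cap taken from the validation rules


def validate_ir(ir: dict, catalog: dict) -> list[str]:
    """Return human-readable errors; an empty list means the IR passed."""
    source = catalog.get(ir["source_id"])
    if source is None:
        return [f"unknown source_id {ir['source_id']!r}"]
    columns = source.get(ir["table_id"])
    if columns is None:
        return [f"table_id {ir['table_id']!r} not in source {ir['source_id']!r}"]

    errors: list[str] = []
    for item in ir.get("select", []):
        if item["kind"] == "agg" and item["fn"] not in ALLOWED_AGG_FNS:
            errors.append(f"agg fn {item['fn']!r} is not whitelisted")
        column_id = item.get("column_id")
        if column_id is not None and column_id not in columns:
            errors.append(f"unknown column_id {column_id!r} in select")
    for flt in ir.get("filters", []):
        if flt["op"] not in ALLOWED_FILTER_OPS:
            errors.append(f"filter op {flt['op']!r} is not whitelisted")
        if flt["column_id"] not in columns:
            errors.append(f"unknown column_id {flt['column_id']!r} in filters")
    for column_id in ir.get("group_by", []):
        if column_id not in columns:
            errors.append(f"unknown column_id {column_id!r} in group_by")

    limit = ir.get("limit")
    if limit is not None:
        is_int = isinstance(limit, int) and not isinstance(limit, bool)
        if not is_int or not (0 < limit <= LIMIT_HARD_CAP):
            errors.append(f"limit must be an int in 1..{LIMIT_HARD_CAP}")
    return errors
```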
231
+ ### Deferred features
232
+
233
+ `having`, `offset`, boolean tree filters (OR/NOT), `distinct`, joins, window functions. Add as user demand proves the limitation.
234
+
235
+ ---
236
+
237
+ ## 8. Executors
238
+
239
+ Same input (validated IR), same output (`QueryResult`), different backends.
240
+
241
+ ### DbExecutor (schema sources)
242
+
243
+ ```
244
+ IR → SqlCompiler → SQL string + params
245
+
246
+ sqlglot validation (SELECT-only, whitelist tables/columns, LIMIT enforced)
247
+
248
+ asyncpg / pymysql in read-only transaction with timeout (30s)
249
+
250
+ QueryResult
251
+ ```
252
+
253
+ Identifiers come from catalog (verified at validation time, safe to inline as quoted identifiers). Values are always parameterized — never inlined as strings.
254
+
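The identifier and value handling described above can be illustrated with a deliberately small compiler. It is a sketch under several assumptions: it supports only the six comparison operators, it uses `:name` placeholders (the style accepted by SQLAlchemy's `text()`), and the `compile_select` signature, which takes a table name and a `column_id → name` mapping, is invented here.

```python
COMPARISON_OPS = {"=", "!=", "<", "<=", ">", ">="}


def quote_ident(name: str) -> str:
    # Double any embedded quote so the identifier cannot break out of its quoting.
    return '"' + name.replace('"', '""') + '"'


def compile_select(ir: dict, table_name: str, column_names: dict) -> tuple:
    """Compile a validated IR into (sql, params). Values never enter the SQL text."""

    def col(column_id: str) -> str:
        return quote_ident(column_names[column_id])

    select_parts = []
    for item in ir["select"]:
        if item["kind"] == "column":
            expr = col(item["column_id"])
        elif item["fn"] == "count_distinct":
            expr = f"COUNT(DISTINCT {col(item['column_id'])})"
        else:
            arg = col(item["column_id"]) if item.get("column_id") else "*"
            expr = f"{item['fn'].upper()}({arg})"
        if item.get("alias"):
            expr += f" AS {quote_ident(item['alias'])}"
        select_parts.append(expr)

    params = {}
    where_parts = []
    for i, flt in enumerate(ir.get("filters", [])):
        if flt["op"] not in COMPARISON_OPS:
            raise NotImplementedError(f"op {flt['op']!r} is outside this sketch")
        params[f"p{i}"] = flt["value"]  # bound parameter, not string interpolation
        where_parts.append(f"{col(flt['column_id'])} {flt['op']} :p{i}")

    sql = f"SELECT {', '.join(select_parts)} FROM {quote_ident(table_name)}"
    if where_parts:
        sql += " WHERE " + " AND ".join(where_parts)
    if ir.get("group_by"):
        sql += " GROUP BY " + ", ".join(col(c) for c in ir["group_by"])
    if ir.get("order_by"):
        terms = []
        for order in ir["order_by"]:
            target = quote_ident(order["alias"]) if "alias" in order else col(order["column_id"])
            terms.append(f"{target} {'DESC' if order['dir'] == 'desc' else 'ASC'}")
        sql += " ORDER BY " + ", ".join(terms)
    if ir.get("limit") is not None:
        sql += f" LIMIT {int(ir['limit'])}"
    return sql, params
```

Since the function has no randomness and no external state, the same IR always yields the same `(sql, params)` pair, which is the determinism property claimed in section 4.3.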
255
+ ### TabularExecutor (tabular sources)
256
+
257
+ ```
258
+ IR → PandasCompiler → operation chain
259
+
260
+ choose strategy by file size:
261
+ ≤ 100 MB → eager pandas
262
+ 100 MB-1 GB → pyarrow with predicate pushdown
263
+ > 1 GB → polars lazy scan
264
+
265
+ execute in asyncio.to_thread (CPU work off the event loop)
266
+
267
+ QueryResult
268
+ ```
269
+
270
+ Initially eager pandas is sufficient. Add the other strategies once a real file proves too large for it.
271
+
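The size-based strategy choice and the off-loop execution can be sketched with the standard library alone. The thresholds come from the diagram above; treating MB and GB as binary units, the strategy labels, and the `run_filter` stand-in (a list comprehension in place of real pandas work) are assumptions.

```python
import asyncio

MB = 1024 * 1024  # assumption: binary units; the text does not say which
GB = 1024 * MB


def choose_strategy(file_size_bytes: int) -> str:
    """Pick an execution strategy from the file size thresholds."""
    if file_size_bytes <= 100 * MB:
        return "eager_pandas"
    if file_size_bytes <= GB:
        return "pyarrow_pushdown"
    return "polars_lazy"


def run_filter(rows: list, column: str, minimum: int) -> list:
    # Stand-in for CPU-bound dataframe work; real code would call pandas here.
    return [row for row in rows if row[column] >= minimum]


async def execute(rows: list, column: str, minimum: int) -> list:
    # asyncio.to_thread runs the blocking call in a worker thread,
    # so the event loop stays free to serve other requests.
    return await asyncio.to_thread(run_filter, rows, column, minimum)
```

For example, `asyncio.run(execute(rows, "x", 3))` would run the filter in a worker thread and return the matching rows.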
272
+ ### Shared safety guarantees
273
+
274
+ 1. IR validated before reaching compiler
275
+ 2. Compiler is deterministic (no LLM)
276
+ 3. Identifiers from catalog (trusted)
277
+ 4. Values parameterized
278
+ 5. sqlglot second-line defence for SQL
279
+ 6. Read-only at every layer
280
+ 7. Timeouts and row caps
281
+
282
+ ---
283
+
284
+ ## 9. Implementation scope
285
+
286
+ ### Initial PR — what ships first
287
+
288
+ | Item | Folder |
289
+ |---|---|
290
+ | Data catalog Pydantic models | `src/catalog/models.py` |
291
+ | Catalog ingestion (introspect → validate → store) | `src/catalog/`, `src/pipeline/` |
292
+ | `IntentRouter` with 3-way source_hint | `src/agents/` |
293
+ | `CatalogReader` (loads full catalog) | `src/catalog/reader.py` |
294
+ | `QueryPlanner` LLM call | `src/query/planner/` |
295
+ | JSON IR Pydantic models | `src/query/ir/models.py` |
296
+ | IR validator | `src/query/ir/validator.py` |
297
+
298
+ **Output**: a validated JSON IR object. Execution lands in a follow-up PR.
299
+
300
+ ### Follow-up PRs
301
+
302
+ | PR | Scope |
303
+ |---|---|
304
+ | 2 | `QueryCompiler` (IR → SQL / pandas) |
305
+ | 3 | `QueryExecutor` split: `DbExecutor` + `TabularExecutor` |
306
+ | 4 | Retry / self-correction loop on execution failure |
307
+ | 5 | Eval harness (golden question→IR→result examples) |
308
+ | 6 | Auto PII tagging in catalog |
309
+ | Later | Joins in IR, schema drift detection, hybrid catalog search |
310
+
311
+ ---
312
+
313
+ ## 10. Open questions
314
+
315
+ | # | Question | Why it matters |
316
+ |---|---|---|
317
+ | 1 | Catalog storage: JSON file per user vs Postgres `jsonb` row? | Affects ingestion + read performance |
318
+ | 2 | Should the catalog also list unstructured files (with descriptions only)? | Gives router unified view of all user sources |
319
+ | 3 | Catalog refresh trigger: explicit "rebuild" button, on every upload, or background TTL? | Staleness vs latency tradeoff |
320
+ | 4 | Confirm joins are out of initial IR scope? | Limits what user questions can be answered |
321
+ | 5 | PII handling for sample_values: mask, synthesize, or skip? | Affects what gets sent to LLM prompts |
322
+
323
+ ---
324
+
325
+ ## 11. References
326
+
327
+ - `docs/flowchart.html` — interactive end-to-end diagram (open in browser)
328
+ - `docs/flowchart.mmd` — mermaid source for the diagram
329
+
330
+ ---
331
+
332
+ ## Glossary
333
+
334
+ - **Cu** — unstructured context (prose chunks)
335
+ - **Cs** — schema context (DB tables/columns from catalog)
336
+ - **Ct** — tabular context (file sheets/columns from catalog)
337
+ - **IR** — intermediate representation (the JSON query shape)
338
+ - **PR** — pull request (a unit of code change)
339
+ - **PII** — personally identifiable information (names, emails, etc.)
340
+ - **ABC** — abstract base class (Python contract for subclasses)
PHASE1_TO_PHASE2_REPORT.md ADDED
@@ -0,0 +1,260 @@
1
+ # Phase 1 → Phase 2 Migration Report
2
+
3
+ A walkthrough of what changed between the original retrieval-style backend (Phase 1) and the current catalog-driven backend (Phase 2). Intended as a hand-off for the lead.
4
+
5
+ ---
6
+
7
+ ## 1. The conceptual change
8
+
9
+ **Phase 1** was a single retrieval-style RAG pipeline. Every question — whether it pointed at a database, a spreadsheet, or a PDF — went through the same primitive: **chunk + embed + top-K** over PGVector. Schema and tabular columns were embedded as chunks and ranked alongside prose. When the question needed SQL, the LLM **wrote the SQL string directly** (via `query_executor`).
10
+
11
+ **Phase 2** splits the system into two paths governed by an LLM router:
12
+
13
+ | Path | Primitive | Why |
14
+ |---|---|---|
15
+ | Unstructured (PDF / DOCX / TXT) | Dense similarity over prose chunks (PGVector) | Right primitive for free text |
16
+ | Structured (DB / CSV / XLSX / Parquet) | **Per-user data catalog** → LLM emits a **JSON IR** of intent → deterministic **compiler** → **executor** (SQL or pandas) | A column lookup shouldn't go through a similarity ranking lottery; the LLM emits intent, never SQL syntax |
17
+
18
+ Three explicit LLM call sites only:
19
+
20
+ 1. **Intent router** (classifies the user message into `chat` / `unstructured` / `structured`)
21
+ 2. **Query planner** (turns the question + catalog into a Pydantic-validated `QueryIR`)
22
+ 3. **Chatbot agent** (formats the final answer, streamed over SSE)
23
+
24
+ Everything else — IR validation, SQL/pandas compilation, execution — is deterministic Python.
25
+
26
+ ---
27
+
28
+ ## 2. File-by-file changes
29
+
30
+ ### 2.1 Deleted (Phase 1 only)
31
+
32
+ | Phase 1 path | Reason it was removed |
33
+ |---|---|
34
+ | `src/rag/base.py`, `src/rag/retriever.py`, `src/rag/router.py` | Replaced by `src/retrieval/` |
35
+ | `src/rag/retrievers/baseline.py`, `schema.py`, `document.py` | Schema retrieval gone (catalog replaces it); document retriever rewritten in `src/retrieval/document.py` |
36
+ | `src/tools/search.py` (whole `tools/` folder) | Only consumer was `rag/router.py` |
37
+ | `src/query/base.py` | Duplicate of `query/executor/base.py` |
38
+ | `src/query/query_executor.py` | Replaced by `src/query/service.py` |
39
+ | `src/query/executors/db_executor.py` | Replaced by `src/query/executor/db.py` |
40
+ | `src/query/executors/tabular.py` | Replaced by `src/query/executor/tabular.py` |
41
+ | `src/agents/chatbot.py` (Phase 1 LangChain chatbot) | Phase 2 `ChatbotAgent` lives at the same path now — see §2.2 |
42
+ | `src/api/v1/knowledge.py` | Fake `/knowledge/rebuild` endpoint, never wired |
43
+ | `src/config/agents/system_prompt.md`, `guardrails_prompt.md` | Replaced by `src/config/prompts/{chatbot_system,guardrails}.md` |
44
+ | `src/models/structured_output.py` (`IntentClassification`) | Replaced by `IntentRouterDecision` Pydantic model inside `agents/orchestration.py` |
45
+ | `src/models/sql_query.py` | LLM no longer emits SQL; IR replaces it |
46
+ | `src/pipeline/orchestrator.py` (empty stub) | Redundant — `StructuredPipeline` takes the introspector at `run()` time |
47
+
48
+ ### 2.2 Renamed / moved (same role, new home)
49
+
50
+ | Phase 1 location | Phase 2 location | Notes |
51
+ |---|---|---|
52
+ | `src/agents/answer_agent.py::AnswerAgent` (added mid-cycle after the Phase 1 `src/agents/chatbot.py` was deleted) | `src/agents/chatbot.py::ChatbotAgent` | Final answer formation; streams via `astream` |
53
+ | `src/knowledge/parquet_service.py` | `src/storage/parquet.py` | Parquet upload/download helper |
54
+ | `src/pipeline/document_pipeline/document_pipeline.py` (folder) | `src/pipeline/document_pipeline.py` (flat) | Single module |
55
+ | `src/rag/retrievers/document.py` | `src/retrieval/document.py` | `DocumentRetriever` migrated; tabular file types filtered out of results |
56
+ | `src/rag/router.py` | `src/retrieval/router.py` | `RetrievalRouter`, Redis-cached, unstructured-only; dead `db: AsyncSession` + `source_hint` params removed |
57
+ | `src/rag/base.py` (`RetrievalResult`, `BaseRetriever`) | `src/retrieval/base.py` | Same dataclass + ABC |
58
+
59
+ > **Heads-up on the intent router**: the Phase 1 file `src/agents/orchestration.py` and its class `OrchestratorAgent` were **kept in place** for Phase 2 — but the body was fully rewritten. The class now emits `IntentRouterDecision(needs_search, source_hint ∈ {chat, unstructured, structured}, rewritten_query)`. The prompt file and test file use the `intent_router` name (`config/prompts/intent_router.md`, `tests/agents/test_intent_router.py`), but **the source module is still `orchestration.py` and the class is still `OrchestratorAgent`**. Existing imports continue to work; only the behavior changed.
60
+
61
+ ### 2.3 Added (Phase 2 new)
62
+
63
+ **Catalog subsystem (whole new concept)**
64
+
65
+ | Path | Role |
66
+ |---|---|
67
+ | `src/catalog/models.py` | Pydantic: `Catalog → Source[] → Table[] → Column[]`, `ForeignKey`, `ColumnStats.top_values` |
68
+ | `src/catalog/introspect/base.py` | `BaseIntrospector` ABC |
69
+ | `src/catalog/introspect/database.py` | DB introspector — wraps Phase 1 `db_pipeline/extractor.py` (`get_schema`, `profile_column`, `get_row_count`) |
70
+ | `src/catalog/introspect/tabular.py` | CSV / XLSX / Parquet introspector — one `Table` per XLSX sheet |
71
+ | `src/catalog/render.py` | Renders a `Source` for the planner prompt |
72
+ | `src/catalog/validator.py` | Unique-ID + foreign-key-ref invariants |
73
+ | `src/catalog/store.py` | Postgres `jsonb` upsert keyed by `user_id` (table `data_catalog`) |
74
+ | `src/catalog/reader.py` | Loads + filters catalog by `source_hint` |
75
+ | `src/catalog/pii_detector.py` | Flags PII columns at ingestion → suppresses `sample_values` |
76
+ | `src/security/pii_patterns.py` | Name patterns + value regex used by the detector |
77
+
78
+ **JSON IR + query subsystem**
79
+
80
+ | Path | Role |
81
+ |---|---|
82
+ | `src/query/ir/models.py` | `QueryIR` Pydantic schema |
83
+ | `src/query/ir/operators.py` | `ALLOWED_FILTER_OPS`, `ALLOWED_AGG_FNS`, `LIMIT_HARD_CAP`, `TYPE_COMPATIBILITY` |
84
+ | `src/query/ir/validator.py` | Catalog-aware IR validation (rejects unknown column ids, bad ops, type mismatches, oversize limits) |
85
+ | `src/query/planner/service.py` | `QueryPlannerService.plan(question, catalog, previous_error)` — Azure OpenAI structured output → `QueryIR` |
86
+ | `src/query/planner/prompt.py` | Builds the planner prompt from catalog text |
87
+ | `src/query/compiler/base.py` | Compiler ABC |
88
+ | `src/query/compiler/sql.py` | `SqlCompiler` (Postgres) — all 12 filter ops, params as a dict |
89
+ | `src/query/compiler/pandas.py` | `PandasCompiler` — returns `CompiledPandas(apply, output_columns)` |
90
+ | `src/query/executor/base.py` | `BaseExecutor` + `QueryResult` |
91
+ | `src/query/executor/db.py` | `DbExecutor` — sqlglot SELECT-only guard, RO txn, 30 s `statement_timeout`, 10 k row cap |
92
+ | `src/query/executor/tabular.py` | `TabularExecutor` — Parquet via blob, `asyncio.to_thread`, 10 k cap |
93
+ | `src/query/executor/dispatcher.py` | `ExecutorDispatcher.pick(ir)` — picks by `source.source_type` |
94
+ | `src/query/service.py` | `QueryService.run(user_id, question, catalog)` — plan → validate → retry (max 3) → dispatch → execute |
95
+
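The plan → validate → retry portion of `QueryService.run` could be sketched as below. The planner and validator are injected as plain callables so the loop can be exercised without an LLM. Reading "max 3" as three attempts in total is an assumption, as are the `plan_with_retry` and `PlanningFailed` names.

```python
from typing import Callable, Optional

MAX_ATTEMPTS = 3  # assumption: "max 3" read as three attempts in total


class PlanningFailed(Exception):
    """Raised when no valid IR is produced within the attempt budget."""


def plan_with_retry(
    question: str,
    plan: Callable[[str, Optional[str]], dict],
    validate: Callable[[dict], list],
) -> dict:
    """Call the planner, validate its IR, and re-prompt with the error on failure."""
    previous_error: Optional[str] = None
    for _ in range(MAX_ATTEMPTS):
        ir = plan(question, previous_error)
        errors = validate(ir)
        if not errors:
            return ir
        # Feed the validation errors back so the next attempt can correct them.
        previous_error = "; ".join(errors)
    raise PlanningFailed(previous_error)
```

Passing the previous error text back into the planner mirrors the `previous_error` argument listed for `QueryPlannerService.plan` in the table above.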
96
+ **Agents**
97
+
98
+ | Path | Role |
99
+ |---|---|
100
+ | `src/agents/orchestration.py` | `OrchestratorAgent` — Phase 1 file/class name preserved; Phase 2 body. Emits `IntentRouterDecision` |
101
+ | `src/agents/chatbot.py` | `ChatbotAgent` — formerly `AnswerAgent` in `agents/answer_agent.py`; renamed in Cleanup PR |
102
+ | `src/agents/chat_handler.py` | `ChatHandler.handle(...)` — top-level orchestrator; yields `intent` / `chunk` / `done` / `error` SSE events |
103
+
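The four event types that `ChatHandler.handle(...)` yields can be illustrated with an async generator. Only the event names (`intent`, `chunk`, `done`, `error`) come from the table above; the payload shapes, the word-by-word "streaming", and the `to_sse` helper are assumptions. The `event:` / `data:` framing follows the standard Server-Sent Events wire format.

```python
import asyncio
import json
from typing import AsyncIterator


async def handle(message: str) -> AsyncIterator[dict]:
    """Yield intent, then chunk events, then done; yield error if anything fails."""
    try:
        yield {"event": "intent", "data": {"source_hint": "chat"}}
        for token in message.split():
            # Stand-in for tokens streamed from the answering agent.
            yield {"event": "chunk", "data": token}
        yield {"event": "done", "data": None}
    except Exception as exc:
        yield {"event": "error", "data": str(exc)}


def to_sse(event: dict) -> str:
    """Serialize one event in Server-Sent Events framing."""
    return f"event: {event['event']}\ndata: {json.dumps(event['data'])}\n\n"


async def collect(message: str) -> list:
    return [event async for event in handle(message)]
```

Keeping the handler as a generator of plain dicts leaves the HTTP layer responsible only for framing, which matches the slimmed-down endpoint described in section 2.4.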
104
+ **Pipelines & API**
105
+
106
+ | Path | Role |
107
+ |---|---|
108
+ | `src/pipeline/structured_pipeline.py` | DB / tabular ingestion: introspect → merge → validate → upsert |
109
+ | `src/pipeline/triggers.py` | `on_db_registered`, `on_tabular_uploaded`, `on_document_uploaded`, `on_catalog_rebuild_requested` |
110
+ | `src/api/v1/data_catalog.py` | `GET /api/v1/data-catalog/{user_id}` + `POST /api/v1/data-catalog/rebuild` |
111
+ | `src/models/api/catalog.py` | Catalog request/response models |
112
+ | `src/config/prompts/intent_router.md`, `query_planner.md`, `chatbot_system.md`, `guardrails.md` | New prompts. `guardrails.md` is appended to `chatbot_system.md` at load time |
113
+ | `src/db/postgres/models.py` (added `Catalog` SQLAlchemy class) | Stores the per-user jsonb document in `data_catalog` |
114
+
115
+ ### 2.4 Rewired API endpoints
116
+
117
+ | Endpoint | Phase 1 wiring | Phase 2 wiring |
118
+ |---|---|---|
119
+ | `POST /api/v1/chat/stream` | Inline in `chat.py`: `OrchestratorAgent` → `retriever` → `query_executor` → `chatbot` | Delegates to `ChatHandler.handle()`. Redis cache, fast intent, history load, and message persistence stay in the endpoint |
120
+ | `POST /api/v1/database-clients/{id}/ingest` | Called `db_pipeline_service.run()` and dual-wrote vectors | Calls **only** `on_db_registered` (catalog build). Failure → HTTP 500 |
121
+ | `POST /api/v1/document/process` | Always pushed to vector store | PDF/DOCX/TXT → `knowledge_processor` (vectors); CSV/XLSX → `on_tabular_uploaded` (catalog only, **no vector embedding**) |
122
+ | `POST /api/v1/document/upload` | Storage + DB row | Same, plus `on_document_uploaded` trigger |
123
+ | `POST /api/v1/data-catalog/rebuild` | — | New: iterates all sources, re-runs per-source trigger |
124
+ | `GET /api/v1/data-catalog/{user_id}` | — | New: returns `list[CatalogIndexEntry]` |
125
+
126
+ ### 2.5 Phase 1 files still in production use
127
+
128
+ These were **not rewritten** — Phase 2 imports them directly:
129
+
130
+ - `src/database_client/database_client_service.py`
131
+ - `src/utils/db_credential_encryption.py` (`decrypt_credentials_dict`) — `src/security/credentials.py` is still a stub
132
+ - `src/pipeline/db_pipeline/db_pipeline_service.py` (`engine_scope` context manager — used by both the introspector and `DbExecutor`)
133
+ - `src/pipeline/db_pipeline/extractor.py` (`get_schema`, `profile_column`, `get_row_count`)
134
+ - `src/knowledge/processing_service.py` (PDF / DOCX / TXT extraction + embedding)
135
+ - `src/db/postgres/{connection,init_db,vector_store}.py`, `src/storage/az_blob/`, `src/middlewares/`, `src/security/auth.py`
136
+
137
+ ---
138
+
139
+ ## 3. End-to-end flow (current state)
140
+
141
+ ### 3.1 Ingestion
142
+
143
+ ```
144
+ User action Pipeline Storage
145
+ ────────────── ──────────────────────────── ─────────────────
146
+ upload PDF/DOCX/TXT → DocumentPipeline → Azure Blob + PGVector
147
+ (extract → chunk → embed) (table: langchain_pg_embedding)
148
+ + on_document_uploaded + retrieval cache invalidate
149
+
150
+ upload CSV/XLSX → TabularIntrospector → Azure Blob (Parquet)
151
+ (sheets / columns + sample + stats) + data_catalog jsonb row
152
+ → CatalogValidator → CatalogStore (NO vector store — catalog only)
153
+ via on_tabular_uploaded
154
+
155
+ register DB → DatabaseIntrospector → data_catalog jsonb row
156
+ (information_schema + sample + FKs)
157
+ → validate → store
158
+ via on_db_registered
159
+ ```
160
+
161
+ ### 3.2 Query (per user message → SSE stream)
162
+
163
+ ```
164
+ POST /api/v1/chat/stream
165
+
166
+ ├── Redis cache check (24h TTL) — hit returns cached stream
167
+ ├── _fast_intent (greetings / goodbyes) — bypass LLM
168
+ ├── load history from chat_messages
169
+
170
+ └── ChatHandler.handle(message, user_id, history) [src/agents/chat_handler.py]
171
+
172
+ ├─ OrchestratorAgent.classify() [agents/orchestration.py]
173
+ │ → needs_search, source_hint, rewritten_query
174
+
175
+ ├── source_hint == "chat"
176
+ │ → ChatbotAgent.astream() → yield chunk events
177
+
178
+ ├── source_hint == "unstructured"
179
+ │ → RetrievalRouter.retrieve() [retrieval/router.py, Redis-cached]
180
+ │ → DocumentRetriever (PGVector MMR/cosine/etc.)
181
+ │ → ChatbotAgent.astream(chunks=...)
182
+
183
+ └── source_hint == "structured"
184
+ → CatalogReader.read(user_id, "structured") [catalog/reader.py]
185
+ → QueryService.run(user_id, question, catalog) [query/service.py]
186
+
187
+ ├─ QueryPlannerService.plan(...) [query/planner/service.py]
188
+ │ LLM(catalog, question, prev_error?) → QueryIR
189
+
190
+ ├─ IRValidator.validate(ir, catalog) [query/ir/validator.py]
191
+ │ fail → loop back to planner with error context (max 3)
192
+
193
+ ├─ ExecutorDispatcher.pick(ir) [query/executor/dispatcher.py]
194
+ │ schema source → DbExecutor
195
+ │ tabular source → TabularExecutor
196
+
197
+ ├─ DbExecutor.run(ir): [query/executor/db.py]
198
+ │ SqlCompiler → (sql, params)
199
+ │ → sqlglot SELECT-only guard
200
+ │ → engine_scope (Phase 1 utility) in asyncio.to_thread
201
+ │ → RO txn + statement_timeout=30s + 10k cap
202
+
203
+ ├─ TabularExecutor.run(ir): [query/executor/tabular.py]
204
+ │ resolve Parquet blob path
205
+ │ → download → PandasCompiler.apply(df)
206
+ │ → asyncio.to_thread → 10k cap
207
+
208
+ └─ QueryResult { rows, columns, row_count,
209
+ truncated, source_id, error?, elapsed_ms }
210
+
211
+ ChatbotAgent.astream(query_result=...)
212
+ → yield chunk events
213
+
214
+ └── final events: done / error
215
+
216
+ └── persist user + assistant messages to chat_messages
217
+ └── populate Redis cache
218
+ ```
219
+
220
+ **Safety invariants for the structured path** (read-only at every layer):
221
+
222
+ 1. IR validated against the catalog before reaching the compiler
223
+ 2. Identifiers come from the catalog (trusted; inlined as quoted identifiers)
224
+ 3. Values from `IR.filters` are always parameterized
225
+ 4. Compiler is deterministic — no LLM in the hot path
226
+ 5. sqlglot rejects anything that isn't a pure SELECT
227
+ 6. DB connection is read-only with a 30 s `statement_timeout`
228
+ 7. Hard 10 000 row cap on both executors; neither executor raises, and failures are reported in `QueryResult.error`
229
+
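Invariant 7 (a hard row cap, with errors returned rather than raised) can be sketched as a wrapper around any fetch callable. The `QueryResult` field names follow the flow diagram above; the `run_safely` helper is hypothetical. Materializing every row before truncating is a simplification that a real executor would likely avoid.

```python
import time
from dataclasses import dataclass
from typing import Callable, Iterable, Optional

ROW_CAP = 10_000


@dataclass
class QueryResult:
    rows: list
    columns: list
    row_count: int
    truncated: bool
    source_id: str
    error: Optional[str] = None
    elapsed_ms: float = 0.0


def run_safely(source_id: str, columns: list, fetch: Callable[[], Iterable]) -> QueryResult:
    """Run fetch(), cap the rows, and convert any exception into QueryResult.error."""
    started = time.perf_counter()
    error: Optional[str] = None
    truncated = False
    try:
        rows = list(fetch())  # simplification: a real executor would stop early
        truncated = len(rows) > ROW_CAP
        rows = rows[:ROW_CAP]
    except Exception as exc:
        rows = []
        error = str(exc)
    elapsed_ms = (time.perf_counter() - started) * 1000
    return QueryResult(
        rows=rows,
        columns=columns,
        row_count=len(rows),
        truncated=truncated,
        source_id=source_id,
        error=error,
        elapsed_ms=elapsed_ms,
    )
```

Returning the error in the result object lets the answering agent explain a failed query to the user instead of the stream terminating abruptly.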
230
+ ---
231
+
232
+ ## 4. Summary table for review
233
+
234
+ | Concern | Phase 1 — where it lived | Phase 2 — where it lives | Change type |
235
+ |---|---|---|---|
236
+ | Intent classification | `agents/orchestration.py::OrchestratorAgent` (free-text intent) | **Same path + same class name** — body rewritten to emit `IntentRouterDecision` | Body rewrite only |
237
+ | Top-level chat orchestration | Inline in `api/v1/chat.py` | `agents/chat_handler.py::ChatHandler` | Extracted to a reusable module |
238
+ | Final answer formation | `agents/chatbot.py` (Phase 1 LangChain) | `agents/chatbot.py::ChatbotAgent` (was `AnswerAgent` in `answer_agent.py` mid-cycle) | Rewritten + renamed |
239
+ | Schema retrieval (DB / tabular) | `rag/retrievers/schema.py` + PGVector chunks | **Removed**. Replaced by catalog (`catalog/store.py` jsonb) loaded verbatim into planner prompt | Whole concept replaced |
240
+ | Doc retrieval (PDF / DOCX / TXT) | `rag/retrievers/document.py`, `rag/router.py` | `retrieval/document.py`, `retrieval/router.py` | Moved; Redis cache restored; tabular files filtered |
241
+ | Query writing | `query/query_executor.py` + `models/sql_query.py` (LLM writes SQL) | `query/planner/service.py` (LLM writes IR) + `query/compiler/sql.py` (deterministic) | LLM emits intent, not SQL |
242
+ | DB execution | `query/executors/db_executor.py` | `query/executor/db.py::DbExecutor` | Folder renamed (`executors` → `executor`); sqlglot guard + RO txn + 30 s timeout kept |
243
+ | Tabular execution | `query/executors/tabular.py` | `query/executor/tabular.py::TabularExecutor` | Parquet-only; pandas compiler split out |
244
+ | Executor selection | Hard-coded in `query_executor.py` | `query/executor/dispatcher.py::ExecutorDispatcher` | New; routes by `source.source_type` |
245
+ | Catalog (NEW) | — | `catalog/` (models, introspect/, validator, store, reader, pii_detector, render) | New subsystem |
246
+ | Catalog persistence | (data was embedded in PGVector) | Postgres jsonb table `data_catalog`, keyed by `user_id` | New table |
247
+ | Ingestion triggers | Inline in API endpoints | `pipeline/triggers.py` (`on_db_registered`, `on_tabular_uploaded`, `on_document_uploaded`, `on_catalog_rebuild_requested`) | Centralized event entry points |
248
+ | Structured pipeline | `pipeline/db_pipeline/db_pipeline_service.py` (still present for `engine_scope` + extractor reuse) | `pipeline/structured_pipeline.py` (orchestrator) — reuses Phase 1 extractor | New orchestrator wraps Phase 1 introspection helpers |
249
+ | Document pipeline | `pipeline/document_pipeline/document_pipeline.py` (folder) | `pipeline/document_pipeline.py` (file) | Flattened; CSV / XLSX now skip the vector store |
250
+ | Parquet helper | `knowledge/parquet_service.py` | `storage/parquet.py` | Moved into `storage/` |
251
+ | Prompts | `config/agents/system_prompt.md`, `guardrails_prompt.md` | `config/prompts/{intent_router,query_planner,chatbot_system,guardrails}.md` | Folder renamed; split into four files; guardrails appended to `chatbot_system` at load |
252
+ | PII detection | — | `catalog/pii_detector.py` + `security/pii_patterns.py` | New. Columns flagged `pii_flag=true` get `sample_values: null` so PII never enters prompts |
253
+ | Chat endpoint | `api/v1/chat.py` (does everything inline) | `api/v1/chat.py` (cache + history + persistence) → delegates to `ChatHandler` | Slimmed; SSE event shape is `intent` / `chunk` / `done` / `error` |
254
+ | DB ingest endpoint | `api/v1/db_client.py::ingest` (Phase 1 `db_pipeline_service.run()`) | `api/v1/db_client.py::ingest` (calls `on_db_registered` only) | Phase 1 dual-write removed |
255
+ | Document process endpoint | `api/v1/document.py::process` (always vectorize) | `api/v1/document.py::process` (PDF/DOCX/TXT → vectors; CSV/XLSX → catalog via `on_tabular_uploaded`) | Routing by file type |
256
+ | Catalog management API | — | `api/v1/data_catalog.py` (GET index + POST rebuild) | New |
257
+
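The PII row above says flagged columns lose their sample values before anything reaches a prompt. A minimal sketch of that rule, assuming a hypothetical `Column` dataclass and two illustrative patterns (the real patterns live in `security/pii_patterns.py` and are not reproduced here):

```python
import re
from dataclasses import dataclass
from typing import Optional

# Illustrative patterns only; assumptions, not the project's regexes.
NAME_HINTS = re.compile(r"(email|phone|ssn|passport)", re.IGNORECASE)
EMAIL_VALUE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

@dataclass
class Column:
    name: str
    sample_values: Optional[list]
    pii_flag: bool = False

def flag_pii(column: Column) -> Column:
    """Flag on a name match OR a value match, then drop the samples."""
    by_name = bool(NAME_HINTS.search(column.name))
    by_value = any(
        isinstance(v, str) and EMAIL_VALUE.match(v)
        for v in (column.sample_values or [])
    )
    if by_name or by_value:
        # Over-flagging is preferred: a false positive only costs samples.
        return Column(column.name, None, True)
    return column

flagged = flag_pii(Column("contact", ["a@b.com"]))
```

Matching on either signal is what "bias toward over-flag" amounts to in this sketch: a column named `contact` is still caught because one of its values looks like an email address.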
258
+ **Bottom line.** The Phase 1 directories `src/rag/`, `src/tools/`, `src/query/executors/`, and `src/config/agents/` are gone, as are the files `src/query/query_executor.py`, `src/query/base.py`, and `src/api/v1/knowledge.py`. Phase 1 introspection helpers under `src/pipeline/db_pipeline/` and `src/database_client/` are still imported by Phase 2; they were wrapped, not rewritten. The three LLM call sites (intent router, query planner, chatbot) are now explicit, and the SQL-writing call no longer exists; the planner emits a Pydantic-validated `QueryIR` instead.
259
+
260
+ The one filename gotcha to remember: the **intent router** still lives at `src/agents/orchestration.py` as class `OrchestratorAgent` (Phase 1 name kept for import-site compatibility, Phase 2 body). The matching prompt and tests use the `intent_router` name, but the source module does not.
PROGRESS.md ADDED
@@ -0,0 +1,381 @@
1
+ # Progress — Phase 2 catalog-driven build
2
+
3
+ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "Team — division of work". Update as PRs land. Future Claude Code sessions read this to know what's already done.
4
+
5
+ **Last updated**: 2026-05-12 ([NOTICKET] Cleanup PR landed: ChatHandler wired to chat.py, Phase 1 dual-write dropped from /ingest, on_catalog_rebuild_requested implemented, dead modules deleted, answer_agent→chatbot renamed, retrieval cache restored via RetrievalRouter, top_values added to ColumnStats, lifespan migration, knowledge_router removed)
6
+ **Current open PR**: `pr/1` — active. Cleanup PR committed and pushed.
7
+
8
+ ---
9
+
10
+ ## Legend
11
+
12
+ - `[x]` done and merged
13
+ - `[~]` in progress (open PR or active branch)
14
+ - `[ ]` not started
15
+ - **DB** / **TAB** / **B** — ownership (from REPO_CONTEXT.md)
16
+
17
+ ---
18
+
19
+ ## PR sequence
20
+
21
+ | PR | Status | Owner(s) | Scope |
22
+ |---|---|---|---|
23
+ | PR1 | `[x]` merged | DB | Contract locks + catalog plumbing + DB introspector + IR validator + tests |
24
+ | PR1-tab | `[x]` shipped | TAB | Tabular introspector + on_tabular_uploaded trigger + 31 unit tests |
25
+ | PR2a | `[x]` merged | DB | CatalogEnricher + StructuredPipeline + on_db_registered trigger + FK extension on Table (enricher later removed in KM-557) |
26
+ | KM-557 | `[x]` shipped | DB | Drop CatalogEnricher entirely (cost cut — planner uses stats + sample rows directly); rename jsonb table `catalogs` → `data_catalog`; add `GET /api/v1/data-catalog/{user_id}` index endpoint for catalog refresher |
27
+ | PR2b | `[x]` shipped | DB-solo (B-review) | IntentRouter + planner prompt + planner LLM service |
28
+ | PR3-DB | `[x]` shipped | DB | SqlCompiler (Postgres) + DbExecutor (sqlglot guard, RO + statement_timeout, asyncio.to_thread) + 36 golden IR→SQL tests |
29
+ | PR3-TAB | `[x]` shipped | TAB | PandasCompiler + TabularExecutor + 43+12 golden IR→DataFrame tests |
30
+ | PR4 | `[x]` | DB-solo (B-review) | ExecutorDispatcher + QueryService + ChatHandler module. **API rewired in Cleanup PR.** |
31
+ | PR5 | `[x]` shipped | DB-solo (B-review) | Retry/self-correction loop on validation failure (lives in QueryService, max 3 attempts, planner re-prompted with prior error) |
32
+ | PR6 | `[~]` scaffold | DB-solo (B-review) | Eval harness scaffold + 3 DB-targeting golden cases. Skipped without `RUN_PLANNER_EVAL=1` env. TAB extends with tabular cases. |
33
+ | PR7 | `[x]` | DB-solo (B-review) | `ChatbotAgent` (renamed from `AnswerAgent`) + chatbot_system + guardrails prompts. `answer_agent.py` → `chatbot.py`, `AnswerAgent` → `ChatbotAgent`. API rewired in Cleanup PR. |
34
+ | Cleanup | `[x]` | B | ChatHandler wired to chat.py; Phase 1 dual-write dropped from /ingest; on_catalog_rebuild_requested + POST /data-catalog/rebuild; dead modules deleted (chatbot Phase 1, orchestrator, query/base, knowledge.py, config/agents/); retrieval cache restored via RetrievalRouter; top_values added to ColumnStats; lifespan migration; knowledge_router removed. |
35
+
36
+ ---
37
+
38
+ ## All items
39
+
40
+ ### Contracts (B — shared)
41
+
42
+ | # | Item | Status | Notes |
43
+ |---|---|---|---|
44
+ | 1 | Catalog Pydantic models (`catalog/models.py`) | `[x]` | PR1 added `location_ref` URI-scheme docstring; PR2a added `ForeignKey` model + `Table.foreign_keys` field |
45
+ | 2 | IR Pydantic models (`query/ir/models.py`) | `[x]` | Pre-existing scaffold |
46
+ | 3 | IR operator whitelists (`query/ir/operators.py`) | `[x]` | PR1 filled `TYPE_COMPATIBILITY` matrix |
47
+ | 4 | PII patterns / regex (`security/pii_patterns.py`) | `[x]` | Pre-existing |
48
+ | — | `data_catalog` Postgres jsonb table (`db/postgres/models.py`) | `[x]` | PR1 added `Catalog` SQLAlchemy class + `init_db.py` import. KM-557 renamed `__tablename__` from `catalogs` → `data_catalog`; created fresh (no migration) |
49
+ | — | `QueryResult` shape (`query/executor/base.py`) | `[x]` | Pre-existing scaffold; `columns: list[str]` added (TAB owner, PR1-tab) — DbExecutor updated to populate it. |
50
+ | — | `Source.location_ref` URI scheme | `[x]` | PR1 documented in `catalog/models.py` docstring |
51
+
52
+ ### Ingestion — introspection
53
+
54
+ | # | Item | Owner | Status | Notes |
55
+ |---|---|---|---|---|
56
+ | 5 | DB introspector (`catalog/introspect/database.py`) | DB | `[x]` | PR1 — reuses Phase 1 `database_client_service`, `db_credential_encryption`, `db_pipeline_service.engine_scope`, `extractor.get_schema/profile_column/get_row_count`. PR2a wired FK extraction (was discarded before). |
57
+ | 6 | Tabular introspector (`catalog/introspect/tabular.py`) | TAB | `[~]` | PR1-tab — downloads original blob (CSV/XLSX/Parquet), one Table per sheet (XLSX) or one Table (CSV/Parquet). `source_id = document_id`. `fetch_doc`/`fetch_blob` injectable for unit tests (no Settings). |
58
+ | 7 | `BaseIntrospector` ABC (`catalog/introspect/base.py`) | B | `[x]` | Pre-existing; signature locked |
59
+
60
+ ### Ingestion — shared catalog plumbing
61
+
62
+ | # | Item | Owner | Status | Notes |
63
+ |---|---|---|---|---|
64
+ | 8 | ~~Catalog enricher + prompt~~ | B | **REMOVED in KM-557** | Cost optimization — planner reads stats + sample rows + column names directly. `catalog/enricher.py` + `config/prompts/catalog_enricher.md` deleted. `render_source` (the only piece still needed) moved to `src/catalog/render.py`. Tests moved to `tests/catalog/test_render.py`. |
65
+ | 9 | Catalog validator (`catalog/validator.py`) | B | `[x]` | PR1 (DB owner picked up) — uniqueness invariants |
66
+ | 10 | Catalog store — Postgres jsonb (`catalog/store.py`) | B | `[x]` | PR1 (DB owner picked up) — `INSERT ... ON CONFLICT` |
67
+ | 11 | Catalog reader (`catalog/reader.py`) | B | `[x]` | PR1 (DB owner picked up) — filters by source_hint, empty on miss |
68
+ | 12 | PII detector (`catalog/pii_detector.py`) | B | `[x]` | PR1 (DB owner picked up) — name + value matching, bias toward over-flag |
69
+
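The `INSERT ... ON CONFLICT` upsert in item 10 can be illustrated with the standard library. This sketch uses SQLite as a stand-in for the Postgres jsonb table, and the `payload` column name is an assumption; only the table name and the `user_id` key come from the notes above.

```python
import json
import sqlite3

def upsert_catalog(conn, user_id, catalog):
    """Insert the catalog, or overwrite the existing row for this user."""
    conn.execute(
        """
        INSERT INTO data_catalog (user_id, payload)
        VALUES (:user_id, :payload)
        ON CONFLICT (user_id) DO UPDATE SET payload = excluded.payload
        """,
        {"user_id": user_id, "payload": json.dumps(catalog)},
    )

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE data_catalog (user_id TEXT PRIMARY KEY, payload TEXT NOT NULL)"
)
upsert_catalog(conn, "u1", {"sources": []})
upsert_catalog(conn, "u1", {"sources": [{"source_id": "db1"}]})
rows = conn.execute("SELECT user_id, payload FROM data_catalog").fetchall()
```

The second call replaces the first row rather than adding another, so a re-ingest is idempotent per user.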
70
+ ### Ingestion — pipelines
71
+
72
+ | # | Item | Owner | Status | Notes |
73
+ |---|---|---|---|---|
74
+ | 13 | Structured pipeline (`pipeline/structured_pipeline.py`) | B | `[x]` | PR2a (DB owner) — Source-type-agnostic: caller supplies the introspector. `default_structured_pipeline()` factory wires production deps lazily so tests can inject mocks without `Settings()` construction. **KM-557**: enrich step removed; pipeline is now `introspect → merge with existing → validate → upsert`. Constructor no longer takes `enricher`. |
75
+ | 14 | Triggers (`pipeline/triggers.py`) | B | `[x]` | PR2a — `on_db_registered` implemented (DB owner). PR1-tab — `on_tabular_uploaded` implemented (TAB owner). **2026-05-11** — `on_document_uploaded` implemented. **2026-05-12** — `on_catalog_rebuild_requested` implemented: iterates all Sources in current catalog, re-runs `on_db_registered` (schema) or `on_tabular_uploaded` (tabular) per source; per-source errors logged but don't abort. |
76
+ | 15 | Ingestion orchestrator (`pipeline/orchestrator.py`) | B | **DELETED** | Redundant stub — `StructuredPipeline` already takes introspector at run() time. Deleted in Cleanup PR. |
77
+ | 16 | Document pipeline (`pipeline/document_pipeline.py`) | TAB | `[x]` | Flattened `pipeline/document_pipeline/document_pipeline.py` (folder) → `pipeline/document_pipeline.py` (file). Updated import in `api/v1/document.py`. |
78
+
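The rebuild behaviour in item 14 (re-run the matching trigger per source, log failures, keep going) might look roughly like this. `rebuild_catalog`, the dict-shaped sources, and the handler map are hypothetical simplifications of `on_catalog_rebuild_requested`.

```python
import asyncio
import logging

logger = logging.getLogger("catalog.rebuild")

async def rebuild_catalog(sources, handlers):
    """Re-run one handler per source; a failure never aborts the loop."""
    rebuilt, failed = [], []
    for source in sources:
        try:
            handler = handlers.get(source["source_type"])
            if handler is None:
                raise ValueError(f"no handler for {source['source_type']}")
            await handler(source["source_id"])
            rebuilt.append(source["source_id"])
        except Exception:
            # Logged with traceback, then the next source is processed.
            logger.exception("rebuild failed for %s", source["source_id"])
            failed.append(source["source_id"])
    return rebuilt, failed

async def _ok(source_id):
    return None

async def _boom(source_id):
    raise RuntimeError("connection refused")

result = asyncio.run(
    rebuild_catalog(
        [
            {"source_id": "db1", "source_type": "schema"},
            {"source_id": "doc9", "source_type": "tabular"},
            {"source_id": "db2", "source_type": "schema"},
        ],
        {"schema": _ok, "tabular": _boom},
    )
)
```

Returning both lists lets a caller report how many sources were rebuilt without treating a partial failure as a request failure.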
79
+ ### Query — shared spine
80
+
81
+ | # | Item | Owner | Status | Notes |
82
+ |---|---|---|---|---|
83
+ | 17 | IR validator (`query/ir/validator.py`) | B | `[x]` | PR1 (DB owner) — full rule set; descriptive errors for planner retry |
84
+ | 18 | Planner LLM service (`query/planner/service.py`) | B | `[x]` | PR2b — Azure OpenAI structured output → `QueryIR`. Injectable chain. Supports retry via `previous_error` argument. |
85
+ | 19 | Planner prompt (`query/planner/prompt.py`, `config/prompts/query_planner.md`) | B | `[x]` | PR2b — system prompt with hard constraints + few-shot for DB and tabular sources. `build_planner_prompt(question, catalog, previous_error)` calls `catalog.render.render_source` (renamed from `catalog.enricher.render_source` in KM-557). |
86
+ | 20 | Intent router (`agents/orchestration.py` — class `OrchestratorAgent`; `config/prompts/intent_router.md`) | B | `[x]` | PR2b — single LLM call → `IntentRouterDecision(needs_search, source_hint, rewritten_query)`. Supports conversation history. **NOTE**: source filename + class name were kept from Phase 1 for import-site compatibility; only the body is Phase 2. Prompt file and test file use the `intent_router` name. |
87
+ | 21 | Executor base + `QueryResult` (`query/executor/base.py`) | B | `[x]` | Pre-existing scaffold |
88
+ | 22 | Executor dispatcher (`query/executor/dispatcher.py`) | B | `[x]` | PR4 — picks DbExecutor / TabularExecutor by `source.source_type`. Lazy imports of production executors keep import side-effect-free for tests. Caches per source_type. |
89
+ | 23 | Compiler base ABC (`query/compiler/base.py`) | B | `[x]` | Pre-existing scaffold |
90
+ | 24 | Top-level QueryService (`query/service.py`) | B | `[x]` | PR4+5: `plan → validate → dispatch → execute → QueryResult`. Retry loop on validation failure (max 3 attempts, planner re-prompted with the prior error). Still catches `NotImplementedError` as a safety net; the catch was added for the `TabularExecutor` placeholder, which PR3-TAB has since replaced. Never raises. |
91
+
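The retry loop in item 24 can be sketched as below. It is synchronous for brevity (the real service is presumably async), and `run_query`, the callable parameters, and this trimmed `QueryResult` are illustrative assumptions rather than the actual `QueryService` API.

```python
from dataclasses import dataclass, field
from typing import Optional

MAX_ATTEMPTS = 3

@dataclass
class QueryResult:
    rows: list = field(default_factory=list)
    error: Optional[str] = None

def run_query(question, plan, validate, execute) -> QueryResult:
    """Plan, validate, retry with the prior error, then execute."""
    previous_error = None
    try:
        for _ in range(MAX_ATTEMPTS):
            ir = plan(question, previous_error)
            problem = validate(ir)  # None when the IR is valid
            if problem is None:
                return execute(ir)
            previous_error = problem  # fed back into the next plan call
    except Exception as exc:
        # Never raise: surface the failure on the result instead.
        return QueryResult(error=f"{type(exc).__name__}: {exc}")
    return QueryResult(
        error=f"validation failed after {MAX_ATTEMPTS} attempts: {previous_error}"
    )

attempts = []

def fake_plan(question, previous_error):
    attempts.append(previous_error)
    # The fake planner only gets the table right once it sees an error.
    return {"table": "orders" if previous_error else "ordrs"}

def fake_validate(ir):
    return None if ir["table"] == "orders" else f"unknown table: {ir['table']}"

def fake_execute(ir):
    return QueryResult(rows=[{"count": 3}])

result = run_query("how many orders?", fake_plan, fake_validate, fake_execute)
```

Passing the validator's message back as `previous_error` is why the validator's errors need to be descriptive: they are the only feedback the planner gets on a retry.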
92
+ ### Query — DB path
93
+
94
+ | # | Item | Status | Notes |
95
+ |---|---|---|---|
96
+ | 25 | SQL compiler (`query/compiler/sql.py`) | `[x]` | PR3-DB — Postgres dialect (Supabase reuses); deterministic IR → (sql, named-params dict); double-quoted identifiers from catalog; all whitelisted ops (=, !=, <, <=, >, >=, in, not_in, is_null, is_not_null, like, between); alias-aware order_by; `CompiledSql.params: dict[str, Any]` (changed from `list`). MySQL/BigQuery/Snowflake compilers later. |
97
+ | 26 | DB executor (`query/executor/db.py`) | `[x]` | PR3-DB — sync engine via `db_pipeline_service.engine_scope` inside `asyncio.to_thread`. sqlglot SELECT-only / no-DML guard. Postgres-only session settings: `default_transaction_read_only=on` + `statement_timeout=30000`. asyncio.wait_for backstop. Never raises — populates `QueryResult.error`. 10k row hard cap. |
98
+ | 27 | Credential encryption (`security/credentials.py`) | `[ ]` | Stub exists; PR1 reused Phase 1 `utils/db_credential_encryption.py` instead. Move in cleanup PR |
99
+ | 28 | User-DB connection management | `[x]` | PR3-DB reused Phase 1 `db_pipeline_service.engine_scope` (same as PR1 introspector); no new helper needed |
100
+
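The executor pattern in item 26 (blocking work in `asyncio.to_thread`, an `asyncio.wait_for` backstop, a row cap, and errors returned rather than raised) can be sketched with the standard library alone. `execute_guarded` and this trimmed `QueryResult` are hypothetical; the real executor also applies the sqlglot guard and the read-only session settings, which are omitted here.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Optional

ROW_CAP = 10_000

@dataclass
class QueryResult:
    rows: list = field(default_factory=list)
    truncated: bool = False
    error: Optional[str] = None

async def execute_guarded(run_sync, timeout_s: float = 30.0) -> QueryResult:
    """Run a blocking callable off the event loop; never raise."""
    try:
        rows = await asyncio.wait_for(
            asyncio.to_thread(run_sync), timeout=timeout_s
        )
    except asyncio.TimeoutError:
        return QueryResult(error=f"query exceeded {timeout_s}s")
    except Exception as exc:
        return QueryResult(error=f"{type(exc).__name__}: {exc}")
    return QueryResult(rows=rows[:ROW_CAP], truncated=len(rows) > ROW_CAP)

ok = asyncio.run(execute_guarded(lambda: list(range(10_005))))
slow = asyncio.run(
    execute_guarded(lambda: time.sleep(0.2) or [], timeout_s=0.05)
)
bad = asyncio.run(execute_guarded(lambda: 1 / 0))
```

Note that `wait_for` only stops waiting; it cannot stop the worker thread. In this sketch that is why the database-side `statement_timeout` would remain the primary limit and the asyncio timeout only a backstop.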
101
+ ### Query — Tabular path
102
+
103
+ | # | Item | Status | Notes |
104
+ |---|---|---|---|
105
+ | 29 | Pandas compiler (`query/compiler/pandas.py`) | `[~]` | PR3-TAB — `CompiledPandas` dataclass; all 12 filter ops; all 6 aggs; group_by via `pd.concat` of Series; alias-aware order_by; `_like_to_regex` (`%`→`.*`, `_`→`.`); pure module-level helpers |
106
+ | 30 | Tabular executor (`query/executor/tabular.py`) | `[~]` | PR3-TAB — `fetch_blob` injectable for tests; blob path: single-table → `{uid}/{did}.parquet`, multi-table → `{uid}/{did}__{table.name}.parquet`; `asyncio.to_thread`; 10k row hard cap; errors → `QueryResult.error` |
107
+ | 31 | Parquet upload/download wrapper | `[x]` | Moved `knowledge/parquet_service.py` → `storage/parquet.py`. Updated 4 import sites: `pipeline/document_pipeline.py`, `knowledge/processing_service.py`, `query/executor/tabular.py`, `query/executors/tabular.py`. |
108
+
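The `_like_to_regex` mapping in item 29 (`%` → `.*`, `_` → `.`) can be sketched as below. Escaping the remaining characters and matching the whole string are assumptions added here to keep the example correct; the notes only state the two substitutions.

```python
import re

def like_to_regex(pattern: str) -> str:
    """Translate a SQL LIKE pattern into a regular expression."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")  # any run of characters
        elif ch == "_":
            parts.append(".")   # exactly one character
        else:
            parts.append(re.escape(ch))  # literal, e.g. "." stays a dot
    return "".join(parts)

def like_match(value: str, pattern: str) -> bool:
    """True when the whole value matches the LIKE pattern."""
    regex = like_to_regex(pattern)
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None

csv_hit = like_match("report.csv", "%.csv")
csv_miss = like_match("reportxcsv", "%.csv")
```

Without the escaping step, the `.` in `%.csv` would match any character and `reportxcsv` would wrongly pass the filter.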
109
+ ### Agents + chat
110
+
111
+ | # | Item | Status | Notes |
112
+ |---|---|---|---|
113
+ | 32 | Chatbot agent + prompt (`agents/chatbot.py`, `config/prompts/chatbot_system.md`) | `[x]` | PR7-bundle — `ChatbotAgent` (was `AnswerAgent`) streams tokens, accepts `QueryResult` or list[`DocumentChunk`] or neither. **Cleanup PR**: renamed `answer_agent.py` → `chatbot.py`, `AnswerAgent` → `ChatbotAgent`; Phase 1 `agents/chatbot.py` deleted. |
114
+ | 33 | Guardrails prompt (`config/prompts/guardrails.md`) | `[x]` | PR7-bundle — appended to `chatbot_system.md` so guardrails take precedence in conflict. |
115
+ | — | Chat handler / orchestrator (`agents/chat_handler.py`) | `[x]` | PR4-bundle: top-level Phase 2 orchestrator. Routes by `source_hint`: chat → `ChatbotAgent` direct; structured → CatalogReader + QueryService; unstructured → `RetrievalRouter` (Redis-cached) + `ChatbotAgent`. Yields `intent` / `chunk` / `done` / `error` SSE-style events. **Cleanup PR**: `api/v1/chat.py` rewired to call `ChatHandler.handle()`. |
116
+
117
+ ### API surface
118
+
119
+ | # | Item | Owner | Status | Notes |
120
+ |---|---|---|---|---|
121
+ | 34 | DB client endpoints (`api/v1/db_client.py`) | DB | `[x]` | **Cleanup PR** — `/ingest` now calls only `on_db_registered`. Phase 1 `db_pipeline_service.run()` + `decrypt_credentials_dict` removed. Error from catalog build now raises HTTP 500 (was silent log). Response simplified to `{"status": "success", "client_id": ...}`. |
122
+ | 35 | Document/tabular upload endpoints (`api/v1/document.py`) | TAB | `[x]` | Rewired `/document/process` — after processing CSV/XLSX, calls `on_tabular_uploaded(document_id, user_id)`. Catalog ingestion failure is logged but does not fail the request. **2026-05-11** — CSV/XLSX no longer ingested to vector store (`knowledge_processor` skipped for tabular types in `document_pipeline.py`); they go to catalog only. |
123
+ | 36 | Chat stream endpoint (`api/v1/chat.py`) | B | `[x]` | Rewired `/chat/stream` — replaced `query_executor.execute()` (Phase 1) with `CatalogReader + QueryService` (Phase 2). **Cleanup PR**: fully rewired to `ChatHandler.handle()`. Inline intent routing, retrieval, and answer generation removed. Redis cache, fast intent, history loading, and message persistence remain in chat.py. Sources event emits `[]` (retrieval not yet exposed by ChatHandler). |
124
+ | 37 | Room / users endpoints (`api/v1/room.py`, `api/v1/users.py`) | B | `[ ]` | No catalog work; only touch if auth flow changes |
125
+ | — | Data catalog index endpoint (`api/v1/data_catalog.py`) | DB | `[x]` | **KM-557** — `GET /api/v1/data-catalog/{user_id}` → `list[CatalogIndexEntry]`. **Cleanup PR** — added `POST /api/v1/data-catalog/rebuild?user_id=` → calls `on_catalog_rebuild_requested`; per-source errors logged but don't fail the request. |
126
+
127
+ ### Tests + eval
128
+
129
+ | # | Item | Owner | Status | Notes |
130
+ |---|---|---|---|---|
131
+ | 38 | DB compiler golden tests (`tests/query/compiler/test_sql.py`) | DB | `[x]` | PR3-DB — 36 tests across all whitelisted ops, identifier quoting, agg / count_distinct / count(*), order_by alias resolution, parameter sequencing, error paths. Pure-Python, no LLM, no DB. |
132
+ | 39 | Pandas compiler golden tests (`tests/unit/query/compiler/test_pandas_compiler.py`) | TAB | `[~]` | PR3-TAB — 43 tests: all 12 filter ops, all 6 aggs, group_by, order_by, limit, aliases, empty DataFrame, error paths. `test_tabular_executor.py` adds 12 more (blob name resolution + happy path + error paths). |
133
+ | 40 | IR validator tests (`tests/query/ir/test_validator.py`) | B | `[x]` | PR1 — 19 tests, all rules covered |
134
+ | — | PII detector tests (`tests/catalog/test_pii_detector.py`) | B | `[x]` | PR1 — 26 tests (parametrized) |
135
+ | — | Catalog validator tests (`tests/catalog/test_validator.py`) | B | `[x]` | PR1 — 5 tests |
136
+ | — | Catalog render tests (`tests/catalog/test_render.py`) | B | `[x]` | **KM-557** — 5 tests (renamed from `test_enricher.py`; LLM enrichment tests dropped, render-only tests kept). |
137
+ | — | Catalog store integration test (`tests/catalog/test_store.py`) | DB | `[x]` | PR1 — module-level skip without `RUN_INTEGRATION_TESTS=1` |
138
+ | — | DB introspector test | DB | `[ ]` | Deferred to PR2 — needs Postgres testcontainer or fixture infra |
139
+ | — | Tabular introspector test | TAB | `[x]` | PR1-tab — 31 unit tests (CSV/XLSX/Parquet, stats, PII, error paths). No DB/blob I/O — mocks injected via constructor. |
140
+ | 41 | Planner eval (`tests/query/planner/`) | B | `[x]` | PR6-scaffold — `test_golden_questions.py` with 3 DB-targeting cases. TAB added `test_golden_tabular.py` with 4 tabular cases (group_by+sum, top-N+limit, date range filter, XLSX sheet selection). All 4 passed against real Azure OpenAI. Fix shipped alongside: `query/planner/service.py` replaced `("system", text)` tuple with `SystemMessage` — without this, `{...}` in `query_planner.md` was parsed as f-string variables and crashed on every real invocation. |
141
+ | 42 | E2E smoke tests (`tests/e2e/`) | B | `[ ]` | Deferred until the Phase 2 endpoints were wired; the Cleanup PR has since done that, so this is no longer blocked. Component-level orchestration is already covered by `test_chat_handler.py` + `test_service.py`. |
142
+ | — | Golden IR fixtures (`tests/fixtures/golden_irs.json`) | B | `[~]` | PR1 seeded with 5 DB-targeting examples; TAB extends in PR1-tab |
143
+ | — | Shared `sample_catalog` fixture (`tests/conftest.py`) | B | `[x]` | PR1 — DB-shaped; TAB may add tabular sibling |
144
+
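The brace problem noted in item 41 is easy to reproduce without any LLM library. The sketch below uses plain `str.format` as a stand-in for template-style prompt rendering; it is an analogy for the failure mode, not the LangChain code path itself, and `render_as_template` / `render_literal` are hypothetical helpers.

```python
# A system prompt that contains literal JSON, as a planner prompt might.
PROMPT = 'Return JSON shaped like {"table": "orders", "limit": 10}.'

def render_as_template(text: str, **variables) -> str:
    """Treat the prompt as a format template (the failing approach)."""
    return text.format(**variables)

def render_literal(system_text: str, question: str):
    """Pass the prompt through untouched (the safe approach)."""
    return [("system", system_text), ("user", question)]

try:
    render_as_template(PROMPT)
    template_error = None
except (KeyError, ValueError, IndexError) as exc:
    # The JSON braces are parsed as a replacement field and lookup fails.
    template_error = type(exc).__name__

messages = render_literal(PROMPT, "top customers by revenue")
```

Keeping the system prompt as a literal message avoids having to double every brace in the prompt file.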
145
+ ---
146
+
147
+ ## What just shipped (2026-05-12 — Cleanup PR)
148
+
149
+ **Phase 1 removal + Phase 2 API rewiring:**
150
+ - `src/api/v1/chat.py` — fully rewired to `ChatHandler.handle()`. Removed inline IntentRouter, retrieval, and ChatbotAgent calls. Redis cache, fast intent, load_history, save_messages stay in chat.py.
151
+ - `src/api/v1/db_client.py` — `/ingest` now calls only `on_db_registered`. Phase 1 `db_pipeline_service.run()` block removed. Catalog build failure now raises HTTP 500.
152
+ - `src/api/v1/data_catalog.py` — added `POST /api/v1/data-catalog/rebuild` endpoint.
153
+ - `src/pipeline/triggers.py` — `on_catalog_rebuild_requested` implemented: iterates catalog sources, re-runs the appropriate trigger per source type, per-source errors logged.
154
+
155
+ **Dead modules deleted:**
156
+ - `src/agents/chatbot.py` (Phase 1 LangChain chatbot)
157
+ - `src/pipeline/orchestrator.py` (empty stub)
158
+ - `src/query/base.py` (old duplicate of `executor/base.py`)
159
+ - `src/api/v1/knowledge.py` (fake `/knowledge/rebuild` endpoint)
160
+ - `src/config/agents/` (folder — prompts only used by deleted Phase 1 chatbot)
161
+
162
+ **Renames:**
163
+ - `src/agents/answer_agent.py` → `src/agents/chatbot.py`; `AnswerAgent` → `ChatbotAgent`; updated all import sites (`chat_handler.py`, `chat.py`)
164
+
165
+ **Fixes + improvements:**
166
+ - `src/agents/chat_handler.py` — `_get_document_retriever()` now returns `RetrievalRouter` (Redis-cached) instead of `DocumentRetriever` directly; retrieval-level cache restored.
167
+ - `src/retrieval/router.py` — removed the dead `db: AsyncSession` and `source_hint` parameters from `retrieve()`, plus the unused `_UNSTRUCTURED_HINTS` constant. Cache key simplified.
168
+ - `src/knowledge/processing_service.py` — removed dead `_build_csv_documents`, `_build_excel_documents`, `_profile_dataframe`, `_to_sheet_document` methods + `pandas` and `upload_parquet` imports.
169
+ - `src/catalog/models.py` — added `top_values: list[Any] | None` to `ColumnStats`.
170
+ - `src/catalog/introspect/tabular.py` — `_to_column` now populates `top_values` for columns with ≤10 distinct values; useful for query planner WHERE clause generation.
171
+ - `main.py` — replaced deprecated `@app.on_event("startup")` with `lifespan` context manager; removed `knowledge_router`.
172
+
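The `top_values` rule (populate only for columns with at most 10 distinct values) can be sketched as follows. Skipping nulls and ordering by frequency are assumptions made for this example; `top_values` here is a free function, not the project's `_to_column`.

```python
from collections import Counter
from typing import Any, Optional

MAX_DISTINCT = 10

def top_values(values: list) -> Optional[list]:
    """Return distinct values for low-cardinality columns, else None."""
    counts = Counter(v for v in values if v is not None)
    if not counts or len(counts) > MAX_DISTINCT:
        return None  # empty or high-cardinality: nothing useful to list
    return [value for value, _ in counts.most_common()]

status_values = top_values(["paid", "open", "paid", None])
id_values = top_values(list(range(11)))
```

Listing the exact values of a low-cardinality column gives the planner the literal strings to use in a filter such as `status = 'paid'` instead of guessing the spelling.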
173
+ ---
174
+
175
+ ## What just shipped (KM-557 — DB owner)
176
+
177
+ After lead review of the catalog ingestion cost: dropped LLM enrichment,
178
+ renamed the storage table, and exposed a lightweight index endpoint for
179
+ the upcoming catalog refresher.
180
+
181
+ **Files deleted**:
182
+ - `src/catalog/enricher.py` — entire CatalogEnricher + EnrichmentResponse + apply_descriptions removed
183
+ - `src/config/prompts/catalog_enricher.md` — dead prompt
184
+ - `tests/catalog/test_enricher.py` — replaced by `test_render.py`
185
+
186
+ **Files added**:
187
+ - `src/catalog/render.py` — new home for `render_source` (the only piece of the old enricher still needed; consumed by `query/planner/prompt.py`)
188
+ - `src/api/v1/data_catalog.py` — `GET /api/v1/data-catalog/{user_id}` returns `list[CatalogIndexEntry]`
189
+ - `tests/catalog/test_render.py` — 5 tests (same coverage as the old render block)
190
+
191
+ **Files modified**:
192
+ - `src/db/postgres/models.py` — `__tablename__ = "data_catalog"` (was `"catalogs"`). Class name unchanged
193
+ - `src/pipeline/structured_pipeline.py` — `StructuredPipeline(validator, store)` (was `(enricher, validator, store)`); pipeline is now `introspect → merge → validate → upsert`; `default_structured_pipeline()` no longer constructs an enricher
194
+ - `src/pipeline/triggers.py` — docstrings updated; `on_catalog_rebuild_requested` docstring rewritten for the refresher use case
195
+ - `src/query/planner/prompt.py` — import now `from ...catalog.render import render_source`
196
+ - `src/catalog/introspect/{base,database,tabular}.py` — docstring scrubs (no behavior changes)
197
+ - `src/models/api/catalog.py` — added `CatalogIndexEntry`; simplified `CatalogRebuildResponse` to `sources_rebuilt`
198
+ - `main.py` — registered `data_catalog_router`
199
+ - `src/security/README.md` — one stale wording fix
200
+
201
+ **No migration**: the `data_catalog` table is created from scratch on first `init_db()`. The old `catalogs` table was never deployed against production data, so no rename SQL is needed.
202
+
203
+ **Tests**: all 4 `test_structured_pipeline.py` tests reworked to construct `StructuredPipeline(validator=, store=)` without `enricher`. 5 `test_render.py` tests cover render_source standalone.
204
+
205
+ **Lint**: `ruff check` clean on modified Phase 2 paths.
206
+
207
+ **Open follow-ups left for the lead**:
208
+ - `on_catalog_rebuild_requested` body — the refresher will iterate the index endpoint and call this trigger per source
209
+ - `api/v1/db_client.py` `/ingest` still doesn't call `on_db_registered` — same blocker as before, untouched by KM-557
210
+
211
+ ---
212
+
213
+ ## What just shipped (2026-05-11 — retrieval migration + bug fixes)
214
+
215
+ **Files implemented / migrated**:
216
+ - `src/retrieval/base.py` — `RetrievalResult` dataclass + `BaseRetriever` ABC (was in `src/rag/base.py`)
217
+ - `src/retrieval/document.py` — full `DocumentRetriever` migrated from `src/rag/retrievers/document.py`; all retrieval methods (MMR/cosine/euclidean/inner_product/manhattan). Tabular file types filtered out from results.
218
+ - `src/retrieval/router.py` — `RetrievalRouter` (Redis-cached, unstructured-only). `invalidate_cache(user_id)` clears all `retrieval:{user_id}:*` keys.
219
+
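Per-user invalidation over the `retrieval:{user_id}:*` key pattern might look like the sketch below. `InMemoryCache` is a tiny stand-in for a Redis client, and the hashed key suffix in `cache_key` is an assumption; only the key prefix pattern comes from the notes above.

```python
import hashlib
from fnmatch import fnmatchcase

class InMemoryCache:
    """Stand-in for a Redis client; implements only what is used below."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

    def scan_iter(self, match):
        return [k for k in self._data if fnmatchcase(k, match)]

    def delete(self, *keys):
        for key in keys:
            self._data.pop(key, None)

def cache_key(user_id: str, query: str) -> str:
    digest = hashlib.sha256(query.encode("utf-8")).hexdigest()[:16]
    return f"retrieval:{user_id}:{digest}"

def invalidate_cache(cache, user_id: str) -> int:
    """Delete every cached retrieval for one user; return the count."""
    keys = list(cache.scan_iter(match=f"retrieval:{user_id}:*"))
    if keys:
        cache.delete(*keys)
    return len(keys)

cache = InMemoryCache()
cache.set(cache_key("u1", "refund policy"), ["chunk-a"])
cache.set(cache_key("u1", "shipping times"), ["chunk-b"])
cache.set(cache_key("u10", "refund policy"), ["chunk-c"])
removed = invalidate_cache(cache, "u1")
```

The trailing colon in the pattern matters: it keeps user `u1` from also clearing entries that belong to `u10`.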
220
+ **Deleted** (no longer used):
221
+ - `src/rag/` — entire folder (base.py, retriever.py, router.py, retrievers/)
222
+ - `src/tools/` — entire folder (search.py was the only real file; only called by deleted rag/ router)
223
+
224
+ **Bug fixes**:
225
+ - `src/pipeline/document_pipeline.py` — `retrieval_router.invalidate_cache(user_id)` called after `process()` and `delete()`. Redis failure is caught and logged (does not fail the document op).
226
+ - `src/pipeline/document_pipeline.py` — CSV/XLSX now skips `knowledge_processor` (vector store). Tabular files go to catalog only; no duplicate embeddings.
227
+ - `src/pipeline/triggers.py` — `on_document_uploaded` implemented (was `raise NotImplementedError`).
228
+ - `src/agents/chat_handler.py` — `_normalize_chunks` now handles `RetrievalResult` objects. Previously they were silently dropped, causing empty context for unstructured queries through ChatHandler.
229
+
230
+ **Import updates** (all changed from `src.rag.*` → `src.retrieval.*`):
231
+ - `src/api/v1/chat.py`, `src/query/base.py`, `src/query/query_executor.py`, `src/query/executors/db_executor.py`, `src/query/executors/tabular.py`
232
+
233
+ ---
234
+
235
+ ## What shipped previously (PR2b/4/5/6/7-bundle — DB owner solo, teammate reviews)
236
+
237
+ **Files implemented**:
238
+ - `src/agents/orchestration.py` — `OrchestratorAgent.classify(message, history) → IntentRouterDecision`. Pydantic model for structured output. History-aware query rewriting. Phase 1 filename + class name preserved; body fully rewritten for Phase 2.
239
+ - `src/agents/answer_agent.py` — `AnswerAgent.astream(...)` streams answer tokens; accepts `QueryResult` and/or `list[DocumentChunk]`. Renames to `chatbot.py` in cleanup PR.
240
+ - `src/agents/chat_handler.py` — `ChatHandler.handle(message, user_id, history)` returns `AsyncIterator[dict]` of `intent` / `chunk` / `done` / `error` SSE events. All deps injectable; lazy default builders.
241
+ - `src/query/planner/prompt.py` — `render_catalog(catalog)` + `build_planner_prompt(question, catalog, previous_error)`. Reuses `catalog.enricher.render_source` for consistency across LLM call sites.
242
+ - `src/query/planner/service.py` — `QueryPlannerService.plan(question, catalog, previous_error)` Azure OpenAI structured output → `QueryIR`.
243
+ - `src/query/executor/dispatcher.py` — `ExecutorDispatcher.pick(ir) → BaseExecutor` by `source.source_type`. Lazy executor imports + per-source-type cache.
244
+ - `src/query/service.py` — `QueryService.run(user_id, question, catalog) → QueryResult`. Plan→validate→retry-on-failure (max 3)→dispatch→execute. Catches NotImplementedError from TabularExecutor placeholder gracefully.
245
+
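The dispatcher's lazy construction and per-source-type cache can be sketched as below. This simplified `ExecutorDispatcher` takes the source type directly and receives factories through its constructor, both of which are assumptions; the real `pick(ir)` resolves the type from the catalog source.

```python
built = []  # records which executors were actually constructed

def make_db_executor():
    built.append("schema")
    return object()  # placeholder for a real DbExecutor

def make_tabular_executor():
    built.append("tabular")
    return object()  # placeholder for a real TabularExecutor

class ExecutorDispatcher:
    """Build each executor on first use, then reuse it."""

    def __init__(self, factories):
        self._factories = dict(factories)
        self._cache = {}

    def pick(self, source_type: str):
        if source_type not in self._cache:
            factory = self._factories.get(source_type)
            if factory is None:
                raise ValueError(f"no executor for source_type={source_type!r}")
            self._cache[source_type] = factory()
        return self._cache[source_type]

dispatcher = ExecutorDispatcher(
    {"schema": make_db_executor, "tabular": make_tabular_executor}
)
first = dispatcher.pick("schema")
second = dispatcher.pick("schema")
```

Deferring construction to the first `pick` call is what keeps importing the module free of side effects in tests: nothing heavy is built until a query actually needs it.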
246
+ **Prompts written** (filled in placeholders):
247
+ - `src/config/prompts/intent_router.md`
248
+ - `src/config/prompts/query_planner.md`
249
+ - `src/config/prompts/chatbot_system.md`
250
+ - `src/config/prompts/guardrails.md`
251
+
252
+ **Tests added** (46 new — total now 146 + 2 skipped):
253
+ - `tests/agents/test_intent_router.py` (4)
254
+ - `tests/agents/test_answer_agent.py` (12)
255
+ - `tests/agents/test_chat_handler.py` (6)
256
+ - `tests/query/planner/test_prompt.py` (7)
257
+ - `tests/query/planner/test_service.py` (3)
258
+ - `tests/query/executor/test_dispatcher.py` (5)
259
+ - `tests/query/test_service.py` (8)
260
+ - `tests/query/planner/test_golden_questions.py` (3 — skipped by default; eval harness scaffold)
261
+
262
+ **Lint**: `ruff check` clean on all Phase 2 paths. Phase 1 files have pre-existing E501/S608 issues — out of scope for this PR.
263
+
264
+ **Placeholders / blockers for teammate** (status as of DB owner's commit, before merge):
265
+ - `src/query/executor/tabular.py` (TAB) — DB owner's note: "still raises NotImplementedError". **Post-merge**: TAB shipped this in PR3-TAB; dispatcher now routes to the real `TabularExecutor`. The `NotImplementedError` catch in `QueryService` stays as a safety net.
266
+ - `src/retrieval/document.py` — **implemented** (2026-05-11). Full `DocumentRetriever` migrated from `src/rag/retrievers/document.py`; supports MMR/cosine/euclidean/manhattan/inner_product. `_normalize_chunks` in `chat_handler.py` now handles `RetrievalResult` → `DocumentChunk` conversion correctly.
267
+ - `src/api/v1/chat.py` (Phase 1): NOT touched at the time. **Post-merge**: the Cleanup PR rewired the SSE endpoint to call `ChatHandler.handle(...)`.
268
+ - `src/api/v1/db_client.py` (Phase 1): NOT touched at the time. **Post-merge**: the Cleanup PR rewired `/database-clients/{id}/ingest` to call `pipeline.triggers.on_db_registered`.
269
+
270
+ ---
271
+
272
+ ## What shipped previously (PR3-TAB — TAB owner)
273
+
274
+ **Files implemented**:
275
+ - `src/query/compiler/pandas.py` — `PandasCompiler` + `CompiledPandas(apply, output_columns)` dataclass. Pure helper functions (easier to test in isolation): `_apply_filters` (all 12 ops, `_like_to_regex` for LIKE), `_apply_select` (column pick + rename), `_apply_agg` (scalar + group_by via `pd.concat` of Series → `reset_index`), `_apply_orderby` (alias-aware via `_resolve_order_col`). Closure captures all IR fields explicitly so `apply(df)` is self-contained.
276
+ - `src/query/executor/tabular.py` — `TabularExecutor` with injectable `fetch_blob` (same testability pattern as `TabularIntrospector`). Resolves Parquet blob path from `az_blob://{uid}/{did}` + table: single-table → `{uid}/{did}.parquet`, multi-table → `{uid}/{did}__{table.name}.parquet`. Runs compile → download → `asyncio.to_thread(_load_and_apply)` → 10k hard cap. Never raises; errors populate `QueryResult.error`. Uses `compiled.output_columns` for column labels (safe on empty DataFrame).
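The LIKE handling mentioned for `_apply_filters` can be illustrated with a small stand-alone translation. This is a minimal sketch, not the project's `_like_to_regex`: the function name `like_to_regex`, the lack of an escape character, and case-sensitive matching are all assumptions made for the example.

```python
import re


def like_to_regex(pattern: str) -> "re.Pattern[str]":
    """Translate a SQL LIKE pattern into a compiled regex (hypothetical helper).

    `%` matches any run of characters (including none), `_` matches exactly
    one character, and every other character is matched literally.
    """
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            # re.escape keeps regex metacharacters such as "." literal.
            parts.append(re.escape(ch))
    # DOTALL lets "%" span newlines that may appear inside a cell value.
    return re.compile("".join(parts), re.DOTALL)


def like_match(value: str, pattern: str) -> bool:
    """Return True when the whole value matches the LIKE pattern."""
    return like_to_regex(pattern).fullmatch(value) is not None
```

Using `fullmatch` rather than `search` mirrors LIKE semantics, where the pattern must cover the entire value unless `%` is present.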
277
+
278
+ **Tests added** (55 new — total suite was 86 all passing at PR3-TAB time):
279
+ - `tests/unit/query/compiler/test_pandas_compiler.py` — 43 tests across all 12 filter ops (including `is_null`, `not_in`, `like`, `between`), all 6 agg fns, group_by, order_by asc/desc, limit-after-order, alias round-trip, empty DataFrame, error paths.
280
+ - `tests/unit/query/executor/test_tabular_executor.py` — 12 tests: `_resolve_blob_name` (single/multi-table, bad prefix), happy-path `QueryResult` shape (columns, rows, backend, truncated, source_id), wrong source_type → error, blob fetch failure → error, unknown source → error.
281
+
282
+ **Lint**: `ruff check` clean on both files.
283
+
284
+ ---
285
+
286
+ ## What shipped previously (PR1-tab — TAB owner)
287
+
288
+ **Files implemented**:
289
+ - `src/catalog/introspect/tabular.py` — `TabularIntrospector` reads original blob (CSV/XLSX/Parquet), profiles each column (dtype, stats, sample values), runs PIIDetector. For XLSX: one `Table` per sheet (`Table.name = sheet_name`); for CSV/Parquet: one `Table` (`Table.name = filename stem`). `fetch_doc`/`fetch_blob` are constructor-injectable for unit tests — no `Settings` or DB required at import time.
290
+ - `src/pipeline/triggers.py` — `on_tabular_uploaded` wired (mirrors `on_db_registered` pattern).
291
+
292
+ **Tests added** (31 new):
293
+ - `tests/unit/catalog/test_introspect_tabular.py` — CSV / XLSX / Parquet shapes, per-column stats, nullable detection, PII name + value matching, sample capping, all error paths. Pure Python, no network I/O.
294
+
295
+ **Executor contract note**: introspector downloads the *original* blob for schema reading. The tabular executor (PR3-TAB) downloads *Parquet* blobs for query execution. For CSV/Parquet sources (single table), the executor must call `parquet_blob_name(uid, did, sheet_name=None)`; for XLSX (multi-table), `parquet_blob_name(uid, did, table.name)`.
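The naming contract above can be sketched as follows. The two output shapes (`{uid}/{did}.parquet` and `{uid}/{did}__{table.name}.parquet`) and the `az_blob://{uid}/{did}` prefix come from the notes in this file; the function bodies, the `multi_table` flag, and the error messages are assumptions for illustration and may not match the real helpers.

```python
from typing import Optional

AZ_BLOB_PREFIX = "az_blob://"


def parquet_blob_name(uid: str, did: str, sheet_name: Optional[str] = None) -> str:
    """Deterministic Parquet blob name; one blob per sheet for multi-table sources."""
    if sheet_name is None:
        return f"{uid}/{did}.parquet"
    return f"{uid}/{did}__{sheet_name}.parquet"


def resolve_blob_name(location_ref: str, table_name: str, multi_table: bool) -> str:
    """Map a catalog `location_ref` plus table name to a Parquet blob name.

    `multi_table` is a hypothetical flag: how the executor decides between the
    single-table and multi-table form is not specified here.
    """
    if not location_ref.startswith(AZ_BLOB_PREFIX):
        raise ValueError(f"unsupported location_ref: {location_ref!r}")
    uid, sep, did = location_ref[len(AZ_BLOB_PREFIX):].partition("/")
    if not sep or not uid or not did:
        raise ValueError(f"malformed location_ref: {location_ref!r}")
    return parquet_blob_name(uid, did, table_name if multi_table else None)
```

In the executor itself a bad prefix would presumably end up in `QueryResult.error` rather than propagate, since the executor never raises.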
296
+
297
+ ---
298
+
299
+ ## What shipped previously (PR3-DB — DB owner)
300
+
301
+ **Files implemented**:
302
+ - `src/query/compiler/sql.py` — `SqlCompiler` for Postgres dialect; `CompiledSql(sql, params)` dataclass with `params: dict[str, Any]` (changed from `list`); supports all 12 whitelisted filter ops, all 6 aggs, alias-aware order_by; `_qident` escapes embedded double-quotes
303
+ - `src/query/executor/db.py` — `DbExecutor` with sqlglot SELECT-only guard, Postgres session-level read-only + 30s `statement_timeout`, `asyncio.wait_for` backstop, 10k row hard cap; rejects non-`schema` source_type and `dbclient://` URI mismatch; never raises (populates `QueryResult.error`)
304
+
305
+ **Files extended**:
306
+ - `src/query/compiler/pandas.py` — fixed pre-existing UP035 (Callable import)
307
+ - `pyproject.toml` — added `S608` to `tests/**` ruff ignore (false positive: tests assert literal SQL strings)
308
+
309
+ **Tests added** (36 new, all passing — total now 100):
310
+ - `tests/query/compiler/test_sql.py` — every filter op, every agg, count(*), count_distinct, order_by alias vs column, multi-filter AND, identifier quoting escape, error paths
311
+
312
+ **Lint**: `ruff check` clean on Phase 2 paths.
313
+
314
+ **Hand-off note for teammate**: `CompiledSql.params` is now `dict[str, Any]` not `list`. The pandas compiler will follow the same convention (or document its own) — coordinate when PR3-TAB lands.
315
+
316
+ ---
317
+
318
+ ## What shipped previously (PR2a — DB owner)
319
+
320
+ **Files implemented**:
321
+ - `src/catalog/enricher.py` — Azure OpenAI GPT-4o + structured output (`EnrichmentResponse`), `render_source` (reusable by planner prompt later), `apply_descriptions` merger, injectable `structured_chain` for tests
322
+ - `src/pipeline/structured_pipeline.py` — `StructuredPipeline` orchestrator + `default_structured_pipeline()` factory with lazy production-dep imports
323
+ - `src/pipeline/triggers.py` — `on_db_registered` wired; tabular/document/rebuild stubs preserved with implementation notes
324
+
325
+ **Files extended**:
326
+ - `src/catalog/models.py` — added `ForeignKey` model, `Table.foreign_keys: list[ForeignKey] = []`
327
+ - `src/catalog/introspect/database.py` — `_extract_foreign_keys` populates `Table.foreign_keys` from extractor data
328
+ - `src/config/prompts/catalog_enricher.md` — full system prompt with style rules and one few-shot example
329
+
330
+ **Tests added** (14 new, all passing — total now 64):
331
+ - `tests/catalog/test_enricher.py` — render / apply / end-to-end with fake chain (10 tests)
332
+ - `tests/pipeline/test_structured_pipeline.py` — orchestration with stub deps (4 tests)
333
+
334
+ **Lint**: `ruff check` clean on all Phase 2 paths. Phase 1 files (`pipeline/db_pipeline/`, `pipeline/document_pipeline/`) have pre-existing ruff issues — out of scope for this PR.
335
+
336
+ ---
337
+
338
+ ## What shipped previously (PR1 — DB owner's first chunk)
339
+
340
+ **Files implemented** (was `NotImplementedError`):
341
+ - `src/catalog/pii_detector.py`, `src/catalog/validator.py`, `src/catalog/store.py`, `src/catalog/reader.py`
342
+ - `src/catalog/introspect/database.py` (FK extraction added in PR2a)
343
+ - `src/query/ir/validator.py`
344
+
345
+ **Files extended**:
346
+ - `src/query/ir/operators.py` — `TYPE_COMPATIBILITY` matrix
347
+ - `src/catalog/models.py` — `location_ref` URI-scheme docstring
348
+ - `src/db/postgres/models.py` — `Catalog` SQLAlchemy table; `init_db.py` imports it
349
+
350
+ **Tests**: 50 unit tests + 1 integration (gated on `RUN_INTEGRATION_TESTS=1`).
351
+
352
+ **Reused Phase 1 utilities** (cleanup deferred):
353
+ - `src/database_client/database_client_service.py:get`
354
+ - `src/utils/db_credential_encryption.py:decrypt_credentials_dict`
355
+ - `src/pipeline/db_pipeline/db_pipeline_service.py:engine_scope`
356
+ - `src/pipeline/db_pipeline/extractor.py:get_schema/profile_column/get_row_count`
357
+
358
+ ---
359
+
360
+ ## Open contract items (not yet locked)
361
+
362
+ - **Joins in IR** — currently single-table only (ARCHITECTURE.md §7); DB owner accepted the constraint for v1, will revisit in PR3 if it's blocking real queries
363
+ - **`updated_at` on Source vs `generated_at` on Catalog** — Pydantic models have both; introspector sets per-Source; CatalogStore preserves both
364
+ - **Catalog refresh trigger** (open question §3) — default policy is rebuild-on-upload-or-connect; auto-refresh deferred
365
+ - **Unstructured catalog entries** (open question §2) — currently empty filter for `source_hint="unstructured"`; revisit when adding doc descriptions
366
+ - **PII handling for `sample_values`** (open question §5) — currently nulls them out (skip); mask/synthesize deferred
367
+ - **Dialect priority for SQL compiler** — PR3 will land Postgres first, MySQL second; BigQuery/Snowflake/SQL Server later
368
+
369
+ ---
370
+
371
+ ## How to update this file
372
+
373
+ When a PR lands:
374
+ 1. Flip status from `[ ]` or `[~]` to `[x]`
375
+ 2. Add a short note (file paths, scope cuts, surprises)
376
+ 3. Bump "Last updated" at the top
377
+ 4. If a new contract decision lands, move it from "Open contract items" to the relevant inline note
378
+
379
+ When opening a PR:
380
+ 1. Flip status to `[~]` and add yourself as the active owner in the PR row
381
+ 2. Don't promise items in the PR description that aren't in the table
REPO_CONTEXT.md ADDED
@@ -0,0 +1,474 @@
1
+ # Repo Context — Agentic Service Data Eyond Catalog
2
+
3
+ Orientation file for future Claude Code sessions. Cross-reference `ARCHITECTURE.md` for the full design rationale and decision log.
4
+
5
+ ---
6
+
7
+ ## TL;DR
8
+
9
+ FastAPI multi-agent backend for data analysis. Users upload documents and register databases / tabular files; they ask natural-language questions and get answers grounded in their data, streamed via SSE.
10
+
11
+ The architecture has two paths:
12
+
13
+ - **Unstructured** (PDF, DOCX, TXT) — dense similarity over prose chunks (PGVector).
14
+ - **Structured** (databases, XLSX, CSV, Parquet) — a per-user **data catalog** describes what tables/columns exist; an LLM produces a **JSON IR** of intent; a deterministic Python compiler turns the IR into SQL or pandas; the executor runs it.
15
+
16
+ The LLM produces *intent*, not query syntax. Deterministic code does the rest.
17
+
18
+ The Phase 2 end-to-end flow is **wired and runnable** as of 2026-05-12. See *Implementation status* below for the per-file matrix. `PROGRESS.md` is the authoritative line-by-line tracker; this file is the orientation.
19
+
20
+ ---
21
+
22
+ ## Stack
23
+
24
+ - Python 3.12, FastAPI 0.115, uvicorn, sse-starlette
25
+ - Async SQLAlchemy 2.0 + asyncpg (Postgres), psycopg3 (PGVector multi-statement workaround)
26
+ - LangChain 0.3 + langchain-postgres (PGVector) + langchain-openai (Azure OpenAI GPT-4o + embeddings)
27
+ - LangGraph 0.2 + langgraph-checkpoint-postgres
28
+ - Redis 5 (response + retrieval cache)
29
+ - Azure Blob Storage (uploads + Parquet)
30
+ - pandas, pyarrow, polars-ready (deferred), sqlglot, pydantic v2, structlog, slowapi, langfuse
31
+ - presidio-analyzer + spaCy `en_core_web_lg` (PII), pytesseract + pdf2image (PDF OCR)
32
+ - DB connectors: psycopg2, pymysql, pymssql, sqlalchemy-bigquery, snowflake-sqlalchemy
33
+
34
+ Run: `uv run --no-sync uvicorn main:app --host 0.0.0.0 --port 7860`. On Windows use `uv run --no-sync python run.py` (sets `WindowsSelectorEventLoopPolicy` for psycopg3 async).
35
+
36
+ ---
37
+
38
+ ## Top-level layout
39
+
40
+ ```
41
+ main.py — FastAPI app + middleware + router wiring + init_db() on startup
42
+ run.py — Windows-safe local entry point
43
+ ARCHITECTURE.md — design intent (source of truth for shape + invariants)
44
+ README.md
45
+ Dockerfile — python:3.12-slim, installs spaCy en_core_web_lg, tesseract, poppler
46
+ pyproject.toml / uv.lock
47
+ scripts/ — backfill scripts (build_initial_catalogs, enrich_all_sources)
48
+ src/ — all application code
49
+ ```
50
+
51
+ ---
52
+
53
+ ## src/ map
54
+
55
+ ### Core data shapes (only files with real content)
56
+
57
+ | Path | Role |
58
+ |---|---|
59
+ | `catalog/models.py` | Pydantic: `Catalog → Source[] → Table[] → Column[]` |
60
+ | `query/ir/models.py` | `QueryIR` (select / filters / group_by / order_by / limit) |
61
+ | `query/ir/operators.py` | `ALLOWED_FILTER_OPS`, `ALLOWED_AGG_FNS`, `LIMIT_HARD_CAP=10000` |
62
+ | `security/pii_patterns.py` | name patterns + email/phone regex for PII detection |
63
+
64
+ ### Catalog — identity layer for structured sources (Cs ∪ Ct)
65
+
66
+ | Path | Role |
67
+ |---|---|
68
+ | `catalog/introspect/base.py` | `BaseIntrospector.introspect(location_ref) -> Source` |
69
+ | `catalog/introspect/database.py` | `information_schema` + ~100 row sample → draft Source |
70
+ | `catalog/introspect/tabular.py` | Parquet/CSV/XLSX header reader + sample (one Table per sheet for XLSX) |
71
+ | `catalog/render.py` | renders a `Source` as the canonical text block consumed by the planner (KM-557; LLM enrichment removed — planner reads stats + samples directly) |
72
+ | `catalog/validator.py` | invariants beyond Pydantic shape (unique IDs, FK refs) |
73
+ | `catalog/store.py` | persist as Postgres `jsonb` row keyed by user_id (`get/upsert/delete`) — table `data_catalog` |
74
+ | `catalog/reader.py` | load + filter catalog by source_hint (returns full catalog for ≤50 tables) |
75
+ | `catalog/pii_detector.py` | flag PII columns at ingestion → suppresses `sample_values` |
76
+
77
+ ### Query — catalog-driven structured path
78
+
79
+ | Path | Role |
80
+ |---|---|
81
+ | `query/service.py` | `QueryService.run(user_id, question, catalog) -> QueryResult` (top-level) |
82
+ | `query/planner/service.py` | LLM call: question + catalog → QueryIR (structured output) |
83
+ | `query/planner/prompt.py` | renders catalog into the planner prompt |
84
+ | `query/ir/validator.py` | catalog-aware IR validation: column_ids exist, ops whitelisted, value_type matches data_type, limit ≤ cap |
85
+ | `query/compiler/base.py` | `BaseCompiler.compile(ir) -> object` |
86
+ | `query/compiler/sql.py` | IR → `(sql, params)`; identifiers from catalog, values parameterized |
87
+ | `query/compiler/pandas.py` | IR → callable that runs against a DataFrame |
88
+ | `query/executor/base.py` | `BaseExecutor.run(ir) -> QueryResult` (uniform across backends) |
89
+ | `query/executor/db.py` | runs compiled SQL via asyncpg/pymysql in read-only txn (sqlglot second-line defence) |
90
+ | `query/executor/tabular.py` | runs the compiled pandas callable on a Parquet file (eager pandas today; pyarrow pushdown and polars lazy scan by file size are deferred) |
91
+ | `query/executor/dispatcher.py` | picks DB vs Tabular executor based on `source.source_type` of the IR's source |
92
+
93
+ ### Retrieval — unstructured path (Cu)
94
+
95
+ | Path | Role |
96
+ |---|---|
97
+ | `retrieval/document.py` | `DocumentRetriever` over PGVector chunks |
98
+ | `retrieval/router.py` | dispatches the `unstructured` route (the `chat` and `structured` routes do not pass through here) |
99
+
100
+ ### Agents — the three LLM call sites
101
+
102
+ | Path | Role |
103
+ |---|---|
104
+ | `agents/orchestration.py` | `OrchestratorAgent` — classifies message → `needs_search`, `source_hint ∈ {chat, unstructured, structured}`, `rewritten_query`. Filename + class name kept from Phase 1; body replaced with Phase 2 logic. Output model is `IntentRouterDecision` |
105
+ | `agents/chatbot.py` | `ChatbotAgent` — final answer formation (receives Cu chunks or QueryResult); SSE-streamed via `astream` |
106
+ | `agents/chat_handler.py` | `ChatHandler` — top-level orchestrator; routes to chat / unstructured / structured and yields SSE-style `intent`/`chunk`/`done`/`error` events |
107
+
108
+ (`QueryPlanner` is the third LLM call site, under `query/planner/`. The
109
+ fourth — `CatalogEnricher` — was removed in KM-557; ingestion no longer
110
+ makes any LLM calls.)
111
+
112
+ ### Pipelines — ingestion coordinators
113
+
114
+ | Path | Role |
115
+ |---|---|
116
+ | `pipeline/structured_pipeline.py` | DB / tabular: introspect → merge → validate → store (no enrich step since KM-557) |
117
+ | `pipeline/document_pipeline.py` | unstructured: extract → chunk → embed → PGVector. CSV/XLSX skip vector store (catalog only). Invalidates retrieval cache on process/delete. |
118
+ | `pipeline/triggers.py` | event entry points called by API routes: `on_db_registered`, `on_tabular_uploaded`, `on_document_uploaded`, `on_catalog_rebuild_requested` |
119
+
120
+ (`pipeline/orchestrator.py` was deleted in the Cleanup PR — it was a redundant stub; `StructuredPipeline` already takes the introspector at `run()` time.)
121
+
122
+ ### Security — cross-cutting
123
+
124
+ | Path | Role |
125
+ |---|---|
126
+ | `security/auth.py` | bcrypt password hash/verify, JWT encode/decode, get_user |
127
+ | `security/credentials.py` | Fernet encrypt/decrypt for stored DB credentials (still a stub; runtime reuses Phase 1 `utils/db_credential_encryption.py`) |
128
+ | `security/pii_patterns.py` | (already listed) |
129
+
130
+ ### API + infra + config
131
+
132
+ | Path | Role |
133
+ |---|---|
134
+ | `api/v1/*.py` | FastAPI routers — thin endpoints delegating to `pipeline/triggers` and `query/service` |
135
+ | `models/api/{catalog,chat,document}.py` | request/response Pydantic models |
136
+ | `db/postgres/connection.py` | two async engines: `engine` (app) and `_pgvector_engine` (PGVector) |
137
+ | `db/postgres/init_db.py` | startup: creates `vector` extension, all tables, HNSW + GIN indexes |
138
+ | `db/postgres/models.py` | SQLAlchemy app tables (users, rooms, chat messages, …) |
139
+ | `db/postgres/vector_store.py` | shared PGVector instance (collection `document_embeddings`) |
140
+ | `db/redis/connection.py` | async Redis client |
141
+ | `storage/az_blob/az_blob.py` | Azure Blob async wrapper (uploads + Parquet) |
142
+ | `middlewares/{cors,logging,rate_limit}.py` | CORS allow-all (POC), structlog JSON, slowapi |
143
+ | `observability/langfuse/langfuse.py` | trace helper |
144
+ | `config/settings.py` | pydantic-settings; `.env` uses double-underscore aliases |
145
+ | `config/env_constant.py` | env file path constant |
146
+ | `config/prompts/*.md` | prompt templates: `intent_router`, `query_planner`, `chatbot_system`, `guardrails` (KM-557 removed `catalog_enricher`) |
147
+
148
+ ---
149
+
150
+ ## Core architectural decisions
151
+
152
+ 1. **Catalog as primary context, not retrieval.** For ≤50 tables (typical), the entire catalog is rendered into the planner prompt verbatim (~3–5k tokens). No vector search, no BM25, no top-k for structured data. Catalog-level retrieval (BM25 + table-level vectors with RRF) is the *deferred* upgrade for users with hundreds of tables.
153
+
154
+ 2. **JSON IR over raw SQL.** The planner LLM emits a Pydantic-validated intent, never a SQL string. The compiler is deterministic Python. Benefits: validatable before execution, dialect-portable (one IR → SQL of any dialect / pandas / polars), cheaper tokens, trivially testable without an LLM, and the LLM literally cannot emit invalid SQL syntax.
155
+
156
+ 3. **Deterministic compiler, not LLM SQL writer.** All actual query construction happens in pure code. Compiler bugs are reproducible and fixable. Same IR → same query.
157
+
158
+ 4. **Pipeline stage isolation.** Each stage (`IntentRouter`, `CatalogReader`, `QueryPlanner`, `IRValidator`, `QueryCompiler`, `QueryExecutor`, `ChatbotAgent`) is its own module with typed input and typed output. No god classes.
159
+
160
+ 5. **Minimal LLM surface.** Only three LLM call sites in the system (KM-557 dropped `CatalogEnricher` — ingestion is now LLM-free; the planner reads stats + sample rows + column names directly):
161
+ - `IntentRouter` — once per user message
162
+ - `QueryPlanner` — once per structured query
163
+ - `ChatbotAgent` — once per answer (formatting)
164
+
165
+ 6. **Three-way routing**: `chat` / `unstructured` / `structured`. The router commits to one path. Cross-source questions ("compare DB sales vs uploaded customer file") are handled inside the structured path because the planner sees Cs ∪ Ct in one prompt. **DB vs tabular is not a routing concern** — it's a per-source attribute (`source_type`) that only matters at execution time.
166
+
167
+ 7. **Stable IDs.** `source_id`, `table_id`, `column_id` are stable internal references. Renaming a column in the source DB does not invalidate cached IRs.
168
+
169
+ 8. **PII suppression at the boundary.** Columns flagged with `pii_flag=true` have `sample_values: null` — real PII never enters LLM prompts. Auto-detected at ingestion via name patterns + value regex (`security/pii_patterns.py`). When in doubt, flag — false positives cost nothing; false negatives leak data.
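Decision 8 can be sketched as a two-signal check followed by sample suppression. The regexes and the `Column` shape below are illustrative assumptions; the real patterns live in `security/pii_patterns.py` and the real model in `catalog/models.py`.

```python
import re
from dataclasses import dataclass, replace
from typing import Optional

# Hypothetical patterns, deliberately broad: a false positive only hides samples.
PII_NAME_RE = re.compile(r"(email|phone|ssn|passport|address|birth)", re.IGNORECASE)
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
PHONE_RE = re.compile(r"^\+?[\d\s().-]{7,20}$")


@dataclass(frozen=True)
class Column:
    name: str
    sample_values: Optional[list]
    pii_flag: bool = False


def looks_like_pii(column: Column) -> bool:
    """Flag on a suspicious column name or on any sample value that looks like PII."""
    if PII_NAME_RE.search(column.name):
        return True
    for value in column.sample_values or []:
        text = str(value)
        if EMAIL_RE.match(text) or PHONE_RE.match(text):
            return True
    return False


def suppress_pii(column: Column) -> Column:
    """Return a copy with pii_flag set and sample_values nulled when PII is suspected."""
    if looks_like_pii(column):
        return replace(column, pii_flag=True, sample_values=None)
    return column
```

Suppression happens once, at ingestion, so every later prompt render can read the stored catalog without re-checking.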
170
+
171
+ ---
172
+
173
+ ## End-to-end flows
174
+
175
+ ### Ingestion (when user uploads a file or connects a DB)
176
+
177
+ ```
178
+ source upload / DB connect
179
+
180
+ ├── unstructured (pdf/docx/txt)
181
+ │ → DocumentPipeline: extract → chunk → embed → PGVector
182
+
183
+ └── structured (DB schema or tabular file)
184
+ → introspect (information_schema or file headers + sample rows)
185
+ → CatalogValidator (Pydantic + unique-IDs + FK refs)
186
+ → CatalogStore.upsert (one jsonb row per user_id in `data_catalog`)
187
+ ```
188
+
189
+ ### Query (per user message)
190
+
191
+ ```
192
+ user message
193
+
194
+ → Redis cache check (24h TTL) ── miss ─→ continue
195
+
196
+ → IntentRouter LLM → needs_search? source_hint?
197
+
198
+ ├── chat → ChatbotAgent → SSE stream
199
+ ├── unstructured → DocumentRetriever (Cu) → ChatbotAgent → SSE stream
200
+ └── structured →
201
+ CatalogReader.read(user_id, "structured") # full Cs ∪ Ct
202
+
203
+ QueryPlanner LLM(question, catalog) → QueryIR
204
+
205
+ IRValidator.validate(ir, catalog)
206
+ (source_id ∈ catalog, table_id ∈ source, column_ids ∈ table,
207
+ ops/aggs whitelisted, value_type matches data_type, limit ≤ 10000)
208
+ fail → re-prompt planner with error context (max 3 retries)
209
+
210
+ ExecutorDispatcher.pick(ir) # by source.source_type
211
+ ├─ DbExecutor → SqlCompiler → sqlglot guard → asyncpg/pymysql
212
+ │ (read-only txn, 30s timeout)
213
+ └─ TabularExecutor → PandasCompiler → eager pandas (≤100 MB)
214
+ or pyarrow pushdown (100 MB–1 GB)
215
+ or polars lazy scan (>1 GB)
216
+
217
+ QueryResult
218
+
219
+ ChatbotAgent → SSE stream
220
+ ```
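The plan, validate, retry step in the structured branch can be sketched as a small loop. This is a synchronous toy under stated assumptions: the dict-shaped IR and catalog, the names `plan_with_retry`, `validate_ir` and `IRValidationError`, and counting "max 3 retries" as one initial attempt plus three retries are all hypothetical; the real `QueryService` is likely async and uses the Pydantic models.

```python
from typing import Callable, Optional

MAX_RETRIES = 3
LIMIT_HARD_CAP = 10_000


class IRValidationError(Exception):
    """Raised when an IR does not fit the catalog (hypothetical name)."""


def validate_ir(ir: dict, catalog: dict) -> None:
    """Toy validator: every column_id must exist and the limit must respect the cap."""
    known = set(catalog["columns"])
    for column_id in ir["columns"]:
        if column_id not in known:
            raise IRValidationError(f"unknown column_id: {column_id}")
    if ir["limit"] > LIMIT_HARD_CAP:
        raise IRValidationError(f"limit {ir['limit']} exceeds {LIMIT_HARD_CAP}")


def plan_with_retry(
    question: str,
    catalog: dict,
    plan: Callable[[str, dict, Optional[str]], dict],
    validate: Callable[[dict, dict], None] = validate_ir,
) -> dict:
    """Call the planner, feeding the last validation error back on each retry."""
    previous_error: Optional[str] = None
    for _attempt in range(1 + MAX_RETRIES):
        ir = plan(question, catalog, previous_error)
        try:
            validate(ir, catalog)
        except IRValidationError as exc:
            previous_error = str(exc)
            continue
        return ir
    raise IRValidationError(f"planner failed after {MAX_RETRIES} retries: {previous_error}")
```

Passing the planner in as a callable keeps the loop testable without an LLM: a fake planner that misspells a column on the first call and corrects it once `previous_error` is set exercises the whole path.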
221
+
222
+ ---
223
+
224
+ ## Catalog schema (per-user `jsonb` row)
225
+
226
+ ```
227
+ Catalog
228
+ ├── user_id, schema_version, generated_at
229
+ └── sources[]
230
+ └── Source { source_id, source_type, name, description, location_ref, updated_at }
231
+ └── tables[]
232
+ └── Table { table_id, name, description, row_count, foreign_keys[] }
233
+ ├── columns[]
234
+ │ └── Column { column_id, name, data_type, description,
235
+ │ nullable, pii_flag, sample_values[]|null, stats|null }
236
+ └── foreign_keys[]
237
+ └── ForeignKey { column_id, target_table_id, target_column_id }
238
+ ```
239
+
240
+ `source_type ∈ {schema, tabular, unstructured}`.
241
+ `data_type ∈ {int, decimal, string, datetime, date, bool, json}`.
242
+ `ForeignKey` references are within the SAME `Source` only; cross-source FKs are not modeled.
243
+
244
+ Deferred Column fields (add when justified): `description_human`, `synonyms[]`, `tags[]`, `primary_key`, `unit`, `semantic_type`, `example_questions[]`, `schema_hash`, `enrichment_status`.
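The invariants that go beyond shape (unique IDs, same-source FK references) can be sketched against a trimmed-down version of the schema. The dataclasses below stand in for the Pydantic models and carry only the fields the checks need; `check_source` and its error strings are hypothetical, not the real `CatalogValidator` API.

```python
from dataclasses import dataclass, field


@dataclass
class ForeignKey:
    column_id: str
    target_table_id: str
    target_column_id: str


@dataclass
class Column:
    column_id: str
    name: str
    data_type: str


@dataclass
class Table:
    table_id: str
    name: str
    columns: list
    foreign_keys: list = field(default_factory=list)


@dataclass
class Source:
    source_id: str
    source_type: str
    tables: list


def check_source(source: Source) -> list:
    """Return human-readable invariant violations; an empty list means valid."""
    errors = []
    columns_by_table = {}
    for table in source.tables:
        if table.table_id in columns_by_table:
            errors.append(f"duplicate table_id: {table.table_id}")
        ids = [column.column_id for column in table.columns]
        if len(ids) != len(set(ids)):
            errors.append(f"duplicate column_id in table {table.table_id}")
        columns_by_table[table.table_id] = set(ids)
    # FK targets are looked up only inside this Source; cross-source FKs are not modeled.
    for table in source.tables:
        for fk in table.foreign_keys:
            if fk.column_id not in columns_by_table[table.table_id]:
                errors.append(f"FK column {fk.column_id} not in table {table.table_id}")
            target = columns_by_table.get(fk.target_table_id)
            if target is None or fk.target_column_id not in target:
                errors.append(
                    f"FK target {fk.target_table_id}.{fk.target_column_id} not found"
                )
    return errors
```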
245
+
246
+ ---
247
+
248
+ ## JSON IR schema
249
+
250
+ ```jsonc
251
+ {
252
+ "ir_version": "1.0",
253
+ "source_id": "...",
254
+ "table_id": "...",
255
+ "select": [
256
+ {"kind": "column", "column_id": "...", "alias": "..."},
257
+ {"kind": "agg", "fn": "count|count_distinct|sum|avg|min|max",
258
+ "column_id": "...?", "alias": "..."}
259
+ ],
260
+ "filters": [
261
+ {"column_id": "...",
262
+ "op": "= | != | < | <= | > | >= | in | not_in | is_null | is_not_null | like | between",
263
+ "value": ...,
264
+ "value_type": "int|decimal|string|datetime|date|bool"}
265
+ ],
266
+ "group_by": ["column_id", ...],
267
+ "order_by": [{"column_id": "...", "dir": "asc|desc"}],
268
+ "limit": 100
269
+ }
270
+ ```
271
+
272
+ Single-table only in v1. `having`, `offset`, boolean filter trees, `distinct`, joins, window functions are deferred until user demand proves the limitation.
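To make the "deterministic compiler" idea concrete, here is a minimal sketch that turns a subset of the IR above into a parameterized Postgres-style statement. It is not the project's `SqlCompiler`: the function names, the `:p0` placeholder style, the dict-shaped IR, and the alias lookup in `order_by` are assumptions, and only a few filter ops are covered.

```python
from typing import Any

LIMIT_HARD_CAP = 10_000
BINARY_OPS = {"=", "!=", "<", "<=", ">", ">="}
AGG_SQL = {"count": "COUNT", "sum": "SUM", "avg": "AVG", "min": "MIN", "max": "MAX"}


def qident(name: str) -> str:
    """Quote an identifier, doubling any embedded double-quote."""
    return '"' + name.replace('"', '""') + '"'


def compile_ir(ir: dict, table_name: str, column_names: dict) -> tuple:
    """Return (sql, params). `column_names` maps column_id to the physical name."""
    params: dict = {}

    def col(column_id: str) -> str:
        return qident(column_names[column_id])  # KeyError means unknown column_id

    def bind(value: Any) -> str:
        key = f"p{len(params)}"
        params[key] = value  # values never get spliced into the SQL text
        return f":{key}"

    select_parts, aliases = [], set()
    for item in ir["select"]:
        if item["kind"] == "column":
            expr = col(item["column_id"])
        elif item["fn"] == "count_distinct":
            expr = f"COUNT(DISTINCT {col(item['column_id'])})"
        elif item["fn"] == "count" and not item.get("column_id"):
            expr = "COUNT(*)"
        else:
            expr = f"{AGG_SQL[item['fn']]}({col(item['column_id'])})"
        alias = item.get("alias")
        if alias:
            aliases.add(alias)
            expr = f"{expr} AS {qident(alias)}"
        select_parts.append(expr)

    where_parts = []
    for flt in ir.get("filters", []):
        lhs, op = col(flt["column_id"]), flt["op"]
        if op in BINARY_OPS:
            where_parts.append(f"{lhs} {op} {bind(flt['value'])}")
        elif op == "is_null":
            where_parts.append(f"{lhs} IS NULL")
        elif op == "between":
            low, high = flt["value"]
            where_parts.append(f"{lhs} BETWEEN {bind(low)} AND {bind(high)}")
        else:
            raise ValueError(f"op not covered by this sketch: {op}")

    sql = f"SELECT {', '.join(select_parts)} FROM {qident(table_name)}"
    if where_parts:
        sql += " WHERE " + " AND ".join(where_parts)
    if ir.get("group_by"):
        sql += " GROUP BY " + ", ".join(col(c) for c in ir["group_by"])
    order_parts = []
    for ob in ir.get("order_by", []):
        # Assumed alias-aware lookup: a select alias wins over a catalog column_id.
        ref = ob["column_id"]
        target = qident(ref) if ref in aliases else col(ref)
        order_parts.append(f"{target} {'DESC' if ob['dir'] == 'desc' else 'ASC'}")
    if order_parts:
        sql += " ORDER BY " + ", ".join(order_parts)
    limit = min(int(ir.get("limit", LIMIT_HARD_CAP)), LIMIT_HARD_CAP)
    return sql + f" LIMIT {limit}", params
```

Because identifiers come only from the catalog mapping and values only travel through `params`, the same IR always yields the same statement, which is what makes golden-file tests without an LLM possible.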
273
+
274
+ ---
275
+
276
+ ## Implementation status
277
+
278
+ **As of 2026-05-12 — Phase 2 end-to-end flow is wired.** `PROGRESS.md` has the per-PR line-item table; this section is the high-level snapshot. Stub files (`raise NotImplementedError`) are now the exception, not the rule.
279
+
280
+ | Area | Status | Notes |
281
+ |---|---|---|
282
+ | Catalog Pydantic models | ✅ | `catalog/models.py` — incl. `ForeignKey`, `ColumnStats.top_values` |
283
+ | JSON IR Pydantic models | ✅ | `query/ir/models.py` + `operators.py` (TYPE_COMPATIBILITY filled) |
284
+ | Catalog ingestion — DB | ✅ | introspect → validate → upsert. `on_db_registered` wired; `/api/v1/db-clients/{id}/ingest` calls it |
285
+ | Catalog ingestion — tabular | ✅ | CSV/XLSX/Parquet; `on_tabular_uploaded` wired into `/api/v1/document/process`. XLSX → one Table per sheet. CSV/XLSX skip vector store |
286
+ | Catalog ingestion — unstructured | ✅ | `on_document_uploaded` implemented; full DocumentPipeline (extract → chunk → embed → PGVector) |
287
+ | Catalog store / reader / validator / PII detector | ✅ | `data_catalog` jsonb table (renamed from `catalogs` in KM-557) |
288
+ | LLM enrichment | ❌ removed (KM-557) | Cost cut — planner reads `column.stats` + `sample_values` + `top_values` + `column.name` directly. `catalog/render.py` keeps the source-rendering helper |
289
+ | `IntentRouter` (lives as `OrchestratorAgent` in `agents/orchestration.py`) | ✅ | 3-way `source_hint`, history-aware query rewriting. Filename + class name kept from Phase 1; Phase 2 body |
290
+ | `CatalogReader` | ✅ | Loads full catalog; filters by `source_hint` |
291
+ | `QueryPlanner` LLM call | ✅ | Azure OpenAI structured output → `QueryIR`; supports retry with `previous_error` |
292
+ | IR validator | ✅ | Catalog-aware; full rule set; descriptive errors |
293
+ | SQL compiler (Postgres) | ✅ | All 12 filter ops, all 6 aggs, alias-aware order_by, parameterized values, quoted identifiers |
294
+ | DbExecutor | ✅ | sqlglot SELECT-only guard, RO txn, `statement_timeout=30000`, 10k row cap, never raises |
295
+ | Pandas compiler | ✅ | Same op coverage as SQL; pure module-level helpers |
296
+ | TabularExecutor | ✅ | Parquet blob path resolution, `asyncio.to_thread`, 10k cap, never raises |
297
+ | ExecutorDispatcher | ✅ | Routes by `source.source_type`; lazy imports + cache |
298
+ | QueryService | ✅ | plan → validate → retry-on-fail (max 3) → dispatch → execute → `QueryResult` |
299
+ | `ChatbotAgent` + prompt + guardrails | ✅ | Renamed from `AnswerAgent` in Cleanup PR. Guardrails appended to `chatbot_system.md` |
300
+ | `ChatHandler` (top-level chat orchestrator) | ✅ | SSE events: `intent` / `chunk` / `done` / `error` |
301
+ | `DocumentRetriever` + `RetrievalRouter` (Redis-cached) | ✅ | Migrated from `src/rag/` (now deleted); MMR/cosine/euclidean/manhattan/inner_product |
302
+ | `/api/v1/chat/stream` | ✅ | Rewired to `ChatHandler`; Redis cache + fast intent + history + message persistence remain in chat.py |
303
+ | `/api/v1/db-clients/{id}/ingest` | ✅ | Calls only `on_db_registered`; Phase 1 dual-write removed |
304
+ | `/api/v1/document/{upload,process,delete}` | ✅ | `/process` triggers `on_tabular_uploaded` for CSV/XLSX |
305
+ | `GET /api/v1/data-catalog/{user_id}` | ✅ | Index endpoint (KM-557) |
306
+ | `POST /api/v1/data-catalog/rebuild` | ✅ | Iterates sources, re-runs per-source trigger |
307
+ | Credential encryption | ⚠️ stub | `security/credentials.py` not migrated; runtime reuses Phase 1 `utils/db_credential_encryption.py` |
308
+ | Tests | ✅ 146+ unit | Compilers (DB 36, Pandas 43), validators, introspectors, agents, chat handler, dispatcher, planner |
309
+ | Planner eval harness | 🟡 scaffold | 3 DB + 4 tabular golden cases. Gated on `RUN_PLANNER_EVAL=1`. Real Azure OpenAI passing |
310
+ | E2E smoke tests | ❌ not started | Component-level orchestration is covered |
311
+ | DB introspector unit test | ❌ deferred | Needs Postgres testcontainer |
312
+ | Sources event in `/chat/stream` | ⚠️ emits `[]` | `ChatHandler` doesn't surface retrieval sources yet; same gap reflected in `save_messages` |
313
+
314
+ **Deferred to later phases**: joins in IR, schema drift detection, hybrid catalog search (BM25 + RRF for 100+ table users), polars lazy scan for >1GB tabular files, MySQL/BigQuery/Snowflake SQL dialects, mask/synthesize PII strategies.
315
+
316
+ ---
317
+
318
+ ## Team — division of work
319
+
320
+ The service is built by two engineers; many modules are source-type-agnostic and shared.
321
+
322
+ - **DB** owns SQL paths: introspection, SQL compiler, DB executor, credential storage.
323
+ - **TAB** owns tabular paths: CSV/XLSX/Parquet introspection, pandas compiler, tabular executor, blob/Parquet plumbing.
324
+ - **B** = both — shared contracts and source-type-agnostic plumbing. Pair-program or split with explicit hand-off.
325
+
326
+ ### Step-by-step ownership
327
+
328
+ | # | Step | File / area | Owner | Notes |
329
+ |---|---|---|---|---|
330
+ | 0 | **Lock contracts before coding** | — | B | See "Decisions to lock" below; block until aligned |
331
+ | 1 | Catalog Pydantic models | `catalog/models.py` | B | Already done; only touch if both agree |
332
+ | 2 | IR Pydantic models | `query/ir/models.py` | B | Already done; joins/window fns require joint sign-off |
333
+ | 3 | IR operator whitelists | `query/ir/operators.py` | B | Already done; both compilers rely on these |
334
+ | 4 | PII patterns / regex | `security/pii_patterns.py` | B | Already done; extend together as gaps appear |
335
+ | **Ingestion — introspection** | | | | |
336
+ | 5 | DB introspector (information_schema, sample, FKs) | `catalog/introspect/database.py` | DB | Use SQLAlchemy `inspect()`; dialect-aware quoting |
337
+ | 6 | Tabular introspector (CSV/XLSX/Parquet headers + sample) | `catalog/introspect/tabular.py` | TAB | Each XLSX sheet → one Table |
338
+ | 7 | `BaseIntrospector` ABC | `catalog/introspect/base.py` | B | Confirm signature returns the same `Source` shape |
339
+ | **Ingestion — shared catalog plumbing** | | | | |
340
+ | 8 | ~~Catalog enricher + prompt~~ | — | n/a | **REMOVED in KM-557.** Cost optimization: the planner reads stats + sample rows directly. `catalog/render.py` keeps the source-rendering helper. |
341
+ | 9 | Catalog validator | `catalog/validator.py` | B | Type-agnostic |
342
+ | 10 | Catalog store (Postgres jsonb) | `catalog/store.py` | B | Recommend DB (Postgres expertise) |
343
+ | 11 | Catalog reader | `catalog/reader.py` | B | Type-agnostic |
344
+ | 12 | PII detector | `catalog/pii_detector.py` | B | Either; uses `pii_patterns.py` |
345
+ | **Ingestion — pipelines** | | | | |
346
+ | 13 | Structured pipeline (introspect → merge → validate → store; no enrich step since KM-557) | `pipeline/structured_pipeline.py` | B | Pair on this; it takes the introspector (DB or tabular) at `run()` time |
347
+ | 14 | Triggers (`on_db_registered`, `on_tabular_uploaded`) | `pipeline/triggers.py` | B | Each owns their trigger function |
348
+ | 15 | ~~Ingestion orchestrator~~ | ~~`pipeline/orchestrator.py`~~ | B | **Deleted in the Cleanup PR** as a redundant stub; `StructuredPipeline` already takes the introspector at `run()` time |
349
+ | 16 | Document pipeline (PDF/DOCX/TXT) | `pipeline/document_pipeline.py` | TAB | Tabular-adjacent (file uploads) |
350
+ | **Query — shared spine** | | | | |
351
+ | 17 | IR validator (catalog-aware) | `query/ir/validator.py` | B | Recommend DB; both must agree on exact error messages so retry-prompt is consistent |
352
+ | 18 | Planner LLM service | `query/planner/service.py` | B | Type-agnostic |
353
+ | 19 | Planner prompt (catalog → text) | `query/planner/prompt.py`, `config/prompts/query_planner.md` | B | **Pair-program**. Must describe DB tables and tabular files in one consistent format |
354
+ | 20 | Intent router (chat/unstructured/structured) | `agents/orchestration.py` (class `OrchestratorAgent` — Phase 1 filename + class name preserved; Phase 2 body), `config/prompts/intent_router.md` | B | Type-agnostic. The prompt file uses `intent_router.md`, but the source module is still `orchestration.py` |
355
+ | 21 | Executor base + `QueryResult` | `query/executor/base.py` | B | Lock the shape before either implements an executor |
356
+ | 22 | Executor dispatcher | `query/executor/dispatcher.py` | B | Reads `source.source_type` from catalog; pair |
357
+ | 23 | Compiler base ABC | `query/compiler/base.py` | B | Already done |
358
+ | 24 | Top-level QueryService | `query/service.py` | B | Wires planner → validator → compiler → executor; pair |
359
+ | **Query — DB path** | | | | |
360
+ | 25 | SQL compiler (IR → SQL + params, per dialect) | `query/compiler/sql.py` | DB | Identifiers from catalog (quoted), values parameterized |
361
+ | 26 | DB executor (asyncpg/pymysql, sqlglot guard, RO txn, 30s timeout) | `query/executor/db.py` | DB | |
362
+ | 27 | Credential encryption (Fernet) | `security/credentials.py` | DB | Needed for stored user DB creds |
363
+ | 28 | User-DB connection management | helper in pipelines | DB | engine_scope context manager pattern |
364
+ | **Query — Tabular path** | | | | |
365
+ | 29 | Pandas compiler (IR → callable on DataFrame) | `query/compiler/pandas.py` | TAB | Same IR, different backend |
366
+ | 30 | Tabular executor (eager pandas first; pyarrow / polars later) | `query/executor/tabular.py` | TAB | Initial scope: eager pandas only |
367
+ | 31 | Parquet upload/download + Azure Blob wrapper | `storage/az_blob/az_blob.py` (+ helper) | TAB | XLSX sheet → one Parquet per sheet (deterministic blob name) |
368
+ | **Agents + chat** | | | | |
369
+ | 32 | Chatbot agent + prompt | `agents/chatbot.py`, `config/prompts/chatbot_system.md` | B | Receives QueryResult or Cu chunks |
370
+ | 33 | Guardrails prompt | `config/prompts/guardrails.md` | B | |
371
+ | **API surface** | | | | |
372
+ | 34 | DB client endpoints (register/ingest/list/delete) | `api/v1/db_client.py` | DB | |
373
+ | 35 | Document/tabular upload endpoints | `api/v1/document.py` | TAB | |
374
+ | 36 | Chat stream endpoint (SSE) | `api/v1/chat.py` | B | Dispatches both paths; pair |
375
+ | 37 | Room / users endpoints | `api/v1/room.py`, `api/v1/users.py` | B | Whoever has bandwidth |
376
+ | **Tests + eval** | | | | |
377
+ | 38 | DB compiler golden tests (IR → SQL fixtures) | `tests/query/compiler/test_sql.py` | DB | Pure-Python, no LLM |
378
+ | 39 | Pandas compiler golden tests (IR → expected DataFrame) | `tests/query/compiler/test_pandas.py` | TAB | Pure-Python, no LLM |
379
+ | 40 | IR validator tests (catalog × IR error matrix) | `tests/query/ir/test_validator.py` | B | Each contributes test cases for their source type |
380
+ | 41 | Planner eval (golden question → IR examples) | `tests/query/planner/` | B | Each contributes ~10 question→IR examples |
381
+ | 42 | E2E smoke tests | `tests/e2e/` | B | Pair |
382
+
383
+ ### Decisions to lock before coding
384
+
385
+ If either owner makes these decisions unilaterally, they create silent contract drift. Lock them in a 30-minute sync first.
386
+
387
+ | Decision | Why it matters | Recommended call |
388
+ |---|---|---|
389
+ | `QueryResult` shape (current scaffold: `source_id, backend, rows, row_count, truncated, elapsed_ms, error`) | Both executors return this; chatbot consumes it | Lock as-is unless either side needs more (e.g. `column_types` for formatting) |
390
+ | `Source.location_ref` format (`az_blob://...` vs `dbclient://{id}` etc.) | Dispatcher and executors both parse this | Pick a convention now; document in `catalog/models.py` docstring |
391
+ | Where do user DB credentials live? | DB executor needs creds to run queries; Source has `location_ref` but creds are encrypted separately | Recommend: `location_ref="dbclient://{client_id}"`; executor looks up creds by ID |
392
+ | How does dispatcher pick the executor? | Routes by `source.source_type` — but where does dispatcher get it (catalog reload, or IR carries it)? | Recommend: dispatcher takes `(Catalog, IR)`, looks up source by `IR.source_id` |
393
+ | Joins in v1 IR? | Excluded per ARCHITECTURE.md §7. DB path is most affected — real DB use often needs joins. | Recommend: ship single-table; revisit in PR 2. **DB owner must accept the constraint or push back early** |
394
+ | Planner prompt — render tabular vs DB sources uniformly | If described differently, planner gets confused | Pair-program. Render both as `Table: name (n rows) — Columns: ...` regardless of source_type |
395
+ | Error contract — raise or return `QueryResult.error`? | Both executors must behave the same so chatbot branches consistently | Recommend: never raise from `executor.run()`; populate `QueryResult.error` |
396
+ | PII handling for tabular `sample_values` | DB samples come from `information_schema`; tabular from file reads. Same `pii_flag` rule must apply both sides | Confirm tabular introspector calls `pii_detector` |
397
+ | Catalog refresh trigger (open question §3) | Affects both pipelines symmetrically | Default: rebuild on every upload/connect; defer auto-refresh |
398
+ | `updated_at` semantics — per-Source vs per-Catalog | Affects how each pipeline writes | Recommend: per-Source `updated_at` + Catalog-level `generated_at` |
399
+ | Dialect support scope for v1 | DB compiler must implement at least one dialect well | Recommend: Postgres first (matches app DB); MySQL second |
400
+ | Test-fixture format for golden IRs | Both compilers test against golden IR → expected output | Recommend: shared `tests/fixtures/golden_irs.json`; each side adds expected SQL or DataFrame |
401
+ | Logging conventions | structlog is already in place; both should log the same fields | Quick agreement: log `source_id`, `table_id`, `ir_version`, `elapsed_ms` |
402
+
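The error-contract row above ("never raise from `executor.run()`; populate `QueryResult.error`") can be sketched as follows. This is a minimal illustration, not the repository's code: the field names follow the scaffold listed in the table, while the field types, the `run_safely` helper, the `fetch_rows` callable, and the `limit` default are assumptions made for the example.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class QueryResult:
    """Field names mirror the scaffold in the table; the types are assumed."""

    source_id: str
    backend: str
    rows: list[dict[str, Any]] = field(default_factory=list)
    row_count: int = 0
    truncated: bool = False
    elapsed_ms: int = 0
    error: str | None = None


def run_safely(
    source_id: str,
    backend: str,
    fetch_rows: Callable[[], list[dict[str, Any]]],  # hypothetical backend call
    limit: int = 1000,  # hypothetical row cap
) -> QueryResult:
    """Run a backend call and never raise: failures land in QueryResult.error."""
    started = time.perf_counter()
    try:
        rows = fetch_rows()
    except Exception as exc:  # deliberate catch-all: the contract forbids raising
        elapsed = int((time.perf_counter() - started) * 1000)
        return QueryResult(
            source_id=source_id,
            backend=backend,
            elapsed_ms=elapsed,
            error=f"{type(exc).__name__}: {exc}",
        )
    truncated = len(rows) > limit
    rows = rows[:limit]
    elapsed = int((time.perf_counter() - started) * 1000)
    return QueryResult(
        source_id=source_id,
        backend=backend,
        rows=rows,
        row_count=len(rows),
        truncated=truncated,
        elapsed_ms=elapsed,
    )
```

With this shape, a caller such as the chatbot only needs to branch on `result.error` and never has to wrap executor calls in its own `try`/`except`.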
403
+ ### Working rhythm (suggested)
404
+
405
+ 1. **Day 1** — 30-min sync to lock the decisions table. PR any contract/docstring changes that fall out.
406
+ 2. **Week 1** — both build introspectors + agree on the planner prompt format. PR in parallel; review each other's.
407
+ 3. **Week 2** — DB builds SQL compiler + DB executor; TAB builds pandas compiler + tabular executor. Both write golden tests against shared IR fixtures.
408
+ 4. **Week 3** — pair on dispatcher, QueryService, and chat endpoint integration. End-to-end smoke test.
409
+ 5. **Ongoing** — short daily standup, mostly to flag IR-shape questions and catalog-field additions *before* either side implements against an unconfirmed contract.
410
+
411
+ Biggest risk: **silent contract drift** — one side adds a `QueryResult` field or assumes a new IR op exists, the other ships without it, and integration breaks at the dispatcher. The §0 lock + shared golden-IR fixtures are what prevent that.
412
+
413
+ ### Onboarding to Claude Code
414
+
415
+ If you're new to Claude Code, before you start:
416
+
417
+ 1. Read `ARCHITECTURE.md` end-to-end (~10 min) — this is the source of truth.
418
+ 2. Skim this file (`REPO_CONTEXT.md`) — find your section in the ownership table.
419
+ 3. Read your owned files' docstrings — every stub explains its contract.
420
+ 4. Open Claude Code in this repo. When you ask Claude to implement a stub:
421
+ - Reference the file path + the contract it should follow
422
+ - Point it at `ARCHITECTURE.md` section if relevant (e.g. §7 for IR validation)
423
+ - Ask it to write the test first (golden IR fixtures), then the implementation
424
+ - Always review the diff — don't auto-accept
425
+
426
+ Useful slash commands while working: `/review` (PR review), `/security-review` (audit pending changes).
427
+
428
+ ---
429
+
430
+ ## Conventions & gotchas
431
+
432
+ - **Async event loop on Windows**: `run.py` sets `WindowsSelectorEventLoopPolicy` because psycopg3 async needs it. Don't call `uvicorn` directly on Windows.
433
+ - **Two Postgres engines**: `engine` (app tables) and `_pgvector_engine` (asyncpg with `prepared_statement_cache_size=0`) — the latter is required because PGVector emits `advisory_lock + CREATE EXTENSION` as a multi-statement string and asyncpg rejects multi-statement prepared queries. `init_db.py` creates the extension explicitly so `PGVector(create_extension=False)` skips that path.
434
+ - **Read-only at every layer for user DBs**: IR validation + compiler whitelists + sqlglot SELECT-only check + read-only DB credentials + LIMIT enforcement + 30s timeout. Six layers; no single point of failure.
435
+ - **Identifiers vs values**: identifiers (table/column names) come from the catalog and are inlined as quoted identifiers — they were verified at validation time so this is safe. Values from `IR.filters` are *always* parameterized, never inlined as strings.
436
+ - **Credential encryption**: Fernet via `dataeyond__db__credential__key` env var; lives in `security/credentials.py`. Sensitive fields = `{"password", "service_account_json"}`.
437
+ - **Settings env-var aliases**: `.env` uses double-underscore names (`azureai__api_key__4o`); `Settings` exposes them as `azureai_api_key_4o` via `Field(alias=...)`. Mind both forms when adding settings.
438
+ - **Prompts**: `src/config/prompts/*.md` — `intent_router`, `query_planner`, `chatbot_system`, `guardrails` are all written. `chatbot_system` has `guardrails` appended so guardrails take precedence in conflict. `catalog_enricher.md` was deleted in KM-557. `config/agents/` folder deleted in Cleanup PR.
439
+ - **Planner prompt parsing gotcha**: `query/planner/service.py` uses `SystemMessage(content=...)` not `("system", text)`. The tuple form causes LangChain to interpret `{...}` in `query_planner.md` as f-string variables and crash on every real invocation. Don't refactor back to tuples.
440
+ - **Tests**: 146+ unit tests in place. Run with `uv run pytest`. Planner eval gated on `RUN_PLANNER_EVAL=1`; catalog store integration test gated on `RUN_INTEGRATION_TESTS=1`.
441
+
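The "identifiers vs values" convention above can be illustrated with a small sketch. It is not the code in `query/compiler/sql.py`: the `compile_select` function, its equality-only filters, the `known_columns` set standing in for the catalog, and the asyncpg-style `$1` placeholders are all assumptions chosen to keep the example short.

```python
from __future__ import annotations

from typing import Any


def quote_ident(name: str) -> str:
    """Quote an identifier Postgres-style, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def compile_select(
    table: str,
    columns: list[str],
    filters: list[tuple[str, Any]],  # (column, value) equality filters only
    known_columns: set[str],         # stand-in for the catalog lookup
    limit: int = 100,
) -> tuple[str, list[Any]]:
    """Return (sql, params): identifiers are inlined, values are parameterized."""
    # Identifiers must already exist in the catalog before they are inlined.
    for col in [*columns, *(c for c, _ in filters)]:
        if col not in known_columns:
            raise ValueError(f"unknown column: {col}")

    params: list[Any] = []
    clauses: list[str] = []
    for col, value in filters:
        params.append(value)  # the value never touches the SQL text
        clauses.append(f"{quote_ident(col)} = ${len(params)}")

    select_list = ", ".join(quote_ident(c) for c in columns)
    sql = f"SELECT {select_list} FROM {quote_ident(table)}"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    sql += f" LIMIT {int(limit)}"
    return sql, params
```

A hostile filter value such as `"paid'; DROP TABLE x;--"` ends up in `params` unchanged, while the SQL text only ever contains catalog-verified, quoted names.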
442
+ ---
443
+
444
+ ## Recommended reading order
445
+
446
+ 1. `ARCHITECTURE.md` — design intent (the source of truth)
447
+ 2. `src/catalog/models.py` + `src/query/ir/models.py` — the two data shapes everything else moves between
448
+ 3. `src/query/ir/operators.py` + `src/security/pii_patterns.py` — the explicit whitelists / patterns
449
+ 4. Skim every `__init__.py`-level docstring under `src/catalog/`, `src/query/`, `src/agents/`, `src/pipeline/` — each describes the contract its module enforces
450
+ 5. `main.py` + `src/db/postgres/{connection,init_db}.py` — runtime bootstrap
451
+ 6. `ARCHITECTURE.md §10` — five open questions that haven't been decided yet
452
+
453
+ ---
454
+
455
+ ## Open questions
456
+
457
+ Status as Phase 2 landed (✅ resolved, 🟡 partially resolved, ❌ still open):
458
+
459
+ 1. ✅ Catalog storage shape — Postgres `jsonb` row in `data_catalog` table, keyed by `user_id`.
460
+ 2. ❌ Unstructured files in catalog — still not modeled; router uses `source_hint` from the LLM instead.
461
+ 3. 🟡 Catalog refresh trigger — rebuild-on-upload-or-connect is the default. Explicit endpoint `POST /api/v1/data-catalog/rebuild` exists. Background TTL deferred.
462
+ 4. ✅ Joins out of v1 IR — confirmed; single-table only. Revisit when real queries need it.
463
+ 5. 🟡 PII `sample_values` — currently nulled out (skip). Mask/synthesize deferred.
464
+
465
+ ---
466
+
467
+ ## Glossary
468
+
469
+ - **Cu** — unstructured context (prose chunks)
470
+ - **Cs** — schema context (DB tables/columns from catalog)
471
+ - **Ct** — tabular context (file sheets/columns from catalog)
472
+ - **IR** — intermediate representation (the JSON query shape)
473
+ - **PII** — personally identifiable information
474
+ - **ABC** — abstract base class
main.py CHANGED
@@ -1,5 +1,7 @@
1
  """Main application entry point."""
2
 
 
 
3
  from fastapi import FastAPI
4
  from src.middlewares.logging import configure_logging, get_logger
5
  from src.middlewares.cors import add_cors_middleware
@@ -9,8 +11,8 @@ from src.api.v1.document import router as document_router
9
  from src.api.v1.chat import router as chat_router
10
  from src.api.v1.room import router as room_router
11
  from src.api.v1.users import router as users_router
12
- from src.api.v1.knowledge import router as knowledge_router
13
  from src.api.v1.db_client import router as db_client_router
 
14
  from src.db.postgres.init_db import init_db
15
  import uvicorn
16
 
@@ -18,11 +20,21 @@ import uvicorn
18
  configure_logging()
19
  logger = get_logger("main")
20
 
 
 
 
 
 
 
 
 
 
21
  # Create FastAPI app
22
  app = FastAPI(
23
  title="DataEyond Agentic Service",
24
  description="Multi-agent AI backend with RAG capabilities",
25
- version="0.1.0"
 
26
  )
27
 
28
  # Add middleware
@@ -33,18 +45,10 @@ app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
33
  # Include routers
34
  app.include_router(users_router)
35
  app.include_router(document_router)
36
- app.include_router(knowledge_router)
37
  app.include_router(room_router)
38
  app.include_router(chat_router)
39
  app.include_router(db_client_router)
40
-
41
-
42
- @app.on_event("startup")
43
- async def startup_event():
44
- """Initialize database on startup."""
45
- logger.info("Starting application...")
46
- await init_db()
47
- logger.info("Database initialized")
48
 
49
 
50
  @app.get("/")
 
1
  """Main application entry point."""
2
 
3
+ from contextlib import asynccontextmanager
4
+
5
  from fastapi import FastAPI
6
  from src.middlewares.logging import configure_logging, get_logger
7
  from src.middlewares.cors import add_cors_middleware
 
11
  from src.api.v1.chat import router as chat_router
12
  from src.api.v1.room import router as room_router
13
  from src.api.v1.users import router as users_router
 
14
  from src.api.v1.db_client import router as db_client_router
15
+ from src.api.v1.data_catalog import router as data_catalog_router
16
  from src.db.postgres.init_db import init_db
17
  import uvicorn
18
 
 
20
  configure_logging()
21
  logger = get_logger("main")
22
 
23
+
24
+ @asynccontextmanager
25
+ async def lifespan(app: FastAPI):
26
+ logger.info("Starting application...")
27
+ await init_db()
28
+ logger.info("Database initialized")
29
+ yield
30
+
31
+
32
  # Create FastAPI app
33
  app = FastAPI(
34
  title="DataEyond Agentic Service",
35
  description="Multi-agent AI backend with RAG capabilities",
36
+ version="0.1.0",
37
+ lifespan=lifespan,
38
  )
39
 
40
  # Add middleware
 
45
  # Include routers
46
  app.include_router(users_router)
47
  app.include_router(document_router)
 
48
  app.include_router(room_router)
49
  app.include_router(chat_router)
50
  app.include_router(db_client_router)
51
+ app.include_router(data_catalog_router)
 
 
 
 
 
 
 
52
 
53
 
54
  @app.get("/")
pyproject.toml CHANGED
@@ -121,7 +121,8 @@ ignore = [
121
  ]
122
 
123
  [tool.ruff.lint.per-file-ignores]
124
- "tests/**" = ["S101", "S105", "S106"]
 
125
 
126
  [tool.mypy]
127
  python_version = "3.12"
 
121
  ]
122
 
123
  [tool.ruff.lint.per-file-ignores]
124
+ # S608 in tests is a false positive — tests assert literal SQL strings as fixtures.
125
+ "tests/**" = ["S101", "S105", "S106", "S608"]
126
 
127
  [tool.mypy]
128
  python_version = "3.12"
scripts/build_initial_catalogs.py ADDED
@@ -0,0 +1,73 @@
1
+ """Backfill catalogs for existing users.
2
+
3
+ One-off script. For each user that already has registered DB connections or
4
+ uploaded tabular files, run the structured pipeline to build their catalog.
5
+
6
+ Run once against the live DB after deploying this branch to populate catalog
7
+ rows for data registered before the catalog pipeline landed.
8
+
9
+ Note: enrich_all_sources.py is not needed — LLM enrichment was removed in
10
+ KM-557. The pipeline is now introspect → merge → validate → upsert.
11
+
12
+ Usage:
13
+ uv run python scripts/build_initial_catalogs.py [--user-id USER_ID]
14
+ """
15
+
16
+ import asyncio
17
+ import sys
18
+ import os
19
+
20
+ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
21
+
22
+ from sqlalchemy import select
23
+ from src.db.postgres.connection import AsyncSessionLocal
24
+ from src.db.postgres.models import DatabaseClient, Document
25
+ from src.pipeline.triggers import on_db_registered, on_tabular_uploaded
26
+
27
+
28
+ async def main() -> None:
29
+ user_id_filter = None
30
+ if "--user-id" in sys.argv:
31
+ idx = sys.argv.index("--user-id")
32
+ user_id_filter = sys.argv[idx + 1]
33
+ print(f"Filtering to user_id: {user_id_filter}")
34
+
35
+ async with AsyncSessionLocal() as db:
36
+ # ── 1. DB clients ──────────────────────────────────────────────
37
+ query = select(DatabaseClient).where(DatabaseClient.status == "active")
38
+ if user_id_filter:
39
+ query = query.where(DatabaseClient.user_id == user_id_filter)
40
+ result = await db.execute(query)
41
+ db_clients = result.scalars().all()
42
+ print(f"\nFound {len(db_clients)} active DB client(s)")
43
+
44
+ for client in db_clients:
45
+ try:
46
+ await on_db_registered(client.id, client.user_id)
47
+ print(f" ✓ db_client {client.id} ({client.name})")
48
+ except Exception as e:
49
+ print(f" ✗ db_client {client.id} ({client.name}): {e}")
50
+
51
+ # ── 2. Tabular files ───────────────────────────────────────────
52
+ query = select(Document).where(
53
+ Document.file_type.in_(["csv", "xlsx"]),
54
+ Document.status == "completed",
55
+ )
56
+ if user_id_filter:
57
+ query = query.where(Document.user_id == user_id_filter)
58
+ result = await db.execute(query)
59
+ docs = result.scalars().all()
60
+ print(f"\nFound {len(docs)} completed tabular file(s)")
61
+
62
+ for doc in docs:
63
+ try:
64
+ await on_tabular_uploaded(doc.id, doc.user_id)
65
+ print(f" ✓ {doc.file_type} {doc.id} ({doc.filename})")
66
+ except Exception as e:
67
+ print(f" ✗ {doc.file_type} {doc.id} ({doc.filename}): {e}")
68
+
69
+ print("\nDone.")
70
+
71
+
72
+ if __name__ == "__main__":
73
+ asyncio.run(main())
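The script above reads `--user-id` by indexing into `sys.argv`, which raises `IndexError` when the flag is passed without a value. A possible alternative, sketched here under the assumption that the flag keeps its current name and meaning, is to let `argparse` handle it. The `parse_args` helper is hypothetical and not part of the PR.

```python
from __future__ import annotations

import argparse


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the optional --user-id filter; argv=None falls back to sys.argv[1:]."""
    parser = argparse.ArgumentParser(
        description="Backfill catalogs for existing users."
    )
    parser.add_argument(
        "--user-id",
        default=None,
        help="Only rebuild the catalog for this user (default: all users).",
    )
    return parser.parse_args(argv)
```

`main()` could then use `parse_args().user_id` as `user_id_filter`. A missing value produces a usage message and exit code 2 instead of a traceback.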
scripts/enrich_all_sources.py ADDED
@@ -0,0 +1,16 @@
1
+ """Bulk re-run CatalogEnricher with the current prompt.
2
+
3
+ For when src/config/prompts/catalog_enricher.md changes and existing
4
+ catalog descriptions need to be regenerated across all users.
5
+
6
+ Usage:
7
+ uv run python scripts/enrich_all_sources.py [--user-id USER_ID]
8
+ """
9
+
10
+
11
+ def main() -> None:
12
+ raise NotImplementedError
13
+
14
+
15
+ if __name__ == "__main__":
16
+ main()
src/agents/chat_handler.py ADDED
@@ -0,0 +1,274 @@
1
+ """ChatHandler — top-level Phase 2 chat orchestrator.
2
+
3
+ End-to-end flow per user message:
4
+
5
+ 1. `OrchestratorAgent.classify` (the intent router) → `chat` / `unstructured` / `structured`.
6
+ 2. Route:
7
+ - `chat` → no context. Pass straight to ChatbotAgent.
8
+ - `structured` → CatalogReader → QueryService → QueryResult.
9
+ - `unstructured` → DocumentRetriever (placeholder, raises until TAB
10
+ ships) → list[DocumentChunk].
11
+ 3. `ChatbotAgent.astream` → yield text tokens.
12
+ 4. Wrap each step into an SSE-style event dict so the API endpoint can
13
+ stream them as Server-Sent Events.
14
+
15
+ Phase 1's chat endpoint (`src/api/v1/chat.py`) is intentionally NOT touched
16
+ in this PR. PR7 cleanup will rewire it to call `ChatHandler.handle(...)`.
17
+
18
+ All dependencies are injectable for tests. Default constructors lazy-build
19
+ production deps (no `Settings()` triggered at import time as long as you
20
+ inject mocks).
21
+ """
22
+
23
+ from __future__ import annotations
24
+
25
+ import json
26
+ from collections.abc import AsyncIterator
27
+ from typing import TYPE_CHECKING, Any
28
+
29
+ from langchain_core.messages import BaseMessage
30
+
31
+ from src.middlewares.logging import get_logger
32
+ from src.retrieval.base import RetrievalResult
33
+
34
+ from .chatbot import ChatbotAgent, DocumentChunk
35
+ from .orchestration import OrchestratorAgent
36
+
37
+ if TYPE_CHECKING:
38
+ from ..catalog.reader import CatalogReader
39
+ from ..query.service import QueryService
40
+ from ..retrieval.router import RetrievalRouter
41
+
42
+ logger = get_logger("chat_handler")
43
+
44
+
45
+ class ChatHandler:
46
+ """Top-level chat orchestrator.
47
+
48
+ Returns an `AsyncIterator[dict]` of SSE-style events with shape
49
+ `{"event": <name>, "data": <str>}`. Event types:
50
+ - `intent` — emitted once after classification (JSON-encoded decision)
51
+ - `sources` — JSON array of source refs (one per structured table, or
52
+ per (document_id, page_label) for unstructured)
53
+ - `chunk` — text fragment of the streaming answer (one per token)
54
+ - `done` — end of stream (data is empty string)
55
+ - `error` — failure; data is a user-facing message
56
+ """
57
+
58
+ def __init__(
59
+ self,
60
+ intent_router: OrchestratorAgent | None = None,
61
+ answer_agent: ChatbotAgent | None = None,
62
+ catalog_reader: CatalogReader | None = None,
63
+ query_service: QueryService | None = None,
64
+ document_retriever: RetrievalRouter | None = None,
65
+ ) -> None:
66
+ self._intent_router = intent_router
67
+ self._answer_agent = answer_agent
68
+ self._catalog_reader = catalog_reader
69
+ self._query_service = query_service
70
+ self._document_retriever = document_retriever
71
+
72
+ # ------------------------------------------------------------------
73
+ # Lazy default-dep builders
74
+ # ------------------------------------------------------------------
75
+
76
+ def _get_intent_router(self) -> OrchestratorAgent:
77
+ if self._intent_router is None:
78
+ self._intent_router = OrchestratorAgent()
79
+ return self._intent_router
80
+
81
+ def _get_answer_agent(self) -> ChatbotAgent:
82
+ if self._answer_agent is None:
83
+ self._answer_agent = ChatbotAgent()
84
+ return self._answer_agent
85
+
86
+ def _get_catalog_reader(self) -> CatalogReader:
87
+ if self._catalog_reader is None:
88
+ from ..catalog.reader import CatalogReader
89
+ from ..catalog.store import CatalogStore
90
+
91
+ self._catalog_reader = CatalogReader(CatalogStore())
92
+ return self._catalog_reader
93
+
94
+ def _get_query_service(self) -> QueryService:
95
+ if self._query_service is None:
96
+ from ..query.service import QueryService
97
+
98
+ self._query_service = QueryService()
99
+ return self._query_service
100
+
101
+ def _get_document_retriever(self) -> RetrievalRouter:
102
+ if self._document_retriever is None:
103
+ from ..retrieval.router import RetrievalRouter
104
+
105
+ self._document_retriever = RetrievalRouter()
106
+ return self._document_retriever
107
+
108
+ # ------------------------------------------------------------------
109
+ # Public entry
110
+ # ------------------------------------------------------------------
111
+
112
+ async def handle(
113
+ self,
114
+ message: str,
115
+ user_id: str,
116
+ history: list[BaseMessage] | None = None,
117
+ ) -> AsyncIterator[dict[str, Any]]:
118
+ # ---- 1. Classify intent --------------------------------------
119
+ try:
120
+ decision = await self._get_intent_router().classify(message, history)
121
+ except Exception as e:
122
+ logger.error("intent classification failed", error=str(e))
123
+ yield {"event": "error", "data": f"Could not classify message: {e}"}
124
+ return
125
+
126
+ yield {"event": "intent", "data": decision.model_dump_json()}
127
+
128
+ rewritten = decision.rewritten_query or message
129
+ query_result = None
130
+ chunks: list[DocumentChunk] | None = None
131
+ raw_chunks: Any = None
132
+
133
+ # ---- 2. Route ------------------------------------------------
134
+ if decision.source_hint == "structured":
135
+ try:
136
+ catalog = await self._get_catalog_reader().read(user_id, "structured")
137
+ query_result = await self._get_query_service().run(
138
+ user_id, rewritten, catalog
139
+ )
140
+ except Exception as e:
141
+ logger.error(
142
+ "structured route failed",
143
+ user_id=user_id,
144
+ error=str(e),
145
+ )
146
+ yield {"event": "error", "data": f"Structured query failed: {e}"}
147
+ return
148
+ elif decision.source_hint == "unstructured":
149
+ try:
150
+ raw_chunks = await self._get_document_retriever().retrieve(
151
+ rewritten, user_id
152
+ )
153
+ chunks = _normalize_chunks(raw_chunks)
154
+ except NotImplementedError:
155
+ logger.warning("DocumentRetriever placeholder hit", user_id=user_id)
156
+ yield {
157
+ "event": "error",
158
+ "data": "Document retrieval is not yet available — pending implementation.",
159
+ }
160
+ return
161
+ except Exception as e:
162
+ logger.error(
163
+ "unstructured route failed", user_id=user_id, error=str(e)
164
+ )
165
+ yield {"event": "error", "data": f"Document retrieval failed: {e}"}
166
+ return
167
+ # else: chat path — no context
168
+
169
+ # ---- 2b. Emit sources ---------------------------------------
170
+ sources = _build_sources(
171
+ decision.source_hint, user_id, query_result, raw_chunks
172
+ )
173
+ yield {"event": "sources", "data": json.dumps(sources)}
174
+
175
+ # ---- 3. Stream answer ----------------------------------------
176
+ try:
177
+ async for token in self._get_answer_agent().astream(
178
+ message,
179
+ history=history,
180
+ query_result=query_result,
181
+ chunks=chunks,
182
+ ):
183
+ yield {"event": "chunk", "data": token}
184
+ except Exception as e:
185
+ logger.error("answer streaming failed", user_id=user_id, error=str(e))
186
+ yield {"event": "error", "data": f"Answer generation failed: {e}"}
187
+ return
188
+
189
+ yield {"event": "done", "data": ""}
190
+
191
+
192
+ def _build_sources(
193
+ source_hint: str,
194
+ user_id: str,
195
+ query_result: Any,
196
+ raw_chunks: Any,
197
+ ) -> list[dict[str, Any]]:
198
+ """Build the sources payload for the SSE `sources` event.
199
+
200
+ - structured: one entry per executed table (table_name only).
201
+ - unstructured: deduped by (document_id, page_label), Phase 1 shape.
202
+ - chat or error: empty list.
203
+ """
204
+ if source_hint == "structured":
205
+ if query_result is None or getattr(query_result, "error", None):
206
+ return []
207
+ table_name = getattr(query_result, "table_name", "") or ""
208
+ if not table_name:
209
+ return []
210
+ return [{
211
+ "document_id": f"{user_id}_{table_name}",
212
+ "filename": table_name,
213
+ "page_label": None,
214
+ }]
215
+
216
+ if source_hint == "unstructured" and raw_chunks:
217
+ seen: set[tuple[Any, Any]] = set()
218
+ sources: list[dict[str, Any]] = []
219
+ for item in raw_chunks:
220
+ if isinstance(item, RetrievalResult):
221
+ data = item.metadata.get("data", {})
222
+ elif isinstance(item, dict):
223
+ data = item
224
+ else:
225
+ continue
226
+ key = (data.get("document_id"), data.get("page_label"))
227
+ if key in seen or key == (None, None):
228
+ continue
229
+ seen.add(key)
230
+ sources.append({
231
+ "document_id": data.get("document_id"),
232
+ "filename": data.get("filename", "Unknown"),
233
+ "page_label": data.get("page_label", "Unknown"),
234
+ })
235
+ return sources
236
+
237
+ return []
238
+
239
+
240
+ def _normalize_chunks(raw: Any) -> list[DocumentChunk]:
241
+ """Convert whatever the retriever returns into list[DocumentChunk].
242
+
243
+ The Phase 2 `DocumentRetriever.retrieve` interface is a stub today;
244
+ when TAB owner ships it, it should return `list[DocumentChunk]`
245
+ directly so this normalizer becomes a no-op. Until then we coerce
246
+ common shapes (dict-with-content, plain string) defensively.
247
+ """
248
+ if not raw:
249
+ return []
250
+ if isinstance(raw, list) and all(isinstance(c, DocumentChunk) for c in raw):
251
+ return raw
252
+ chunks: list[DocumentChunk] = []
253
+ for item in raw:
254
+ if isinstance(item, DocumentChunk):
255
+ chunks.append(item)
256
+ elif isinstance(item, dict):
257
+ chunks.append(
258
+ DocumentChunk(
259
+ content=str(item.get("content", "")),
260
+ filename=item.get("filename"),
261
+ page_label=item.get("page_label"),
262
+ )
263
+ )
264
+ elif isinstance(item, RetrievalResult):
265
+ data = item.metadata.get("data", {})
266
+ page = data.get("page_label")
267
+ chunks.append(DocumentChunk(
268
+ content=item.content,
269
+ filename=data.get("filename"),
270
+ page_label=str(page) if page is not None else None,
271
+ ))
272
+ elif isinstance(item, str):
273
+ chunks.append(DocumentChunk(content=item))
274
+ return chunks
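`ChatHandler.handle` yields dicts of the form `{"event": <name>, "data": <str>}`. How the API endpoint serializes them is not shown in this part of the diff. As a rough sketch of the Server-Sent Events wire format, assuming the endpoint writes frames by hand rather than through a helper library, a hypothetical `format_sse` function could look like this:

```python
from __future__ import annotations


def format_sse(event: dict[str, str]) -> str:
    """Serialize one {"event", "data"} dict into a single SSE frame.

    Each line of a multi-line payload gets its own "data:" field, and a
    blank line terminates the frame, as the SSE format requires.
    """
    lines = [f"event: {event['event']}"]
    data = event.get("data", "")
    # split("\n") keeps one empty segment for empty data, so "done" still
    # carries a "data:" field and is dispatched by the client.
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    return "\n".join(lines) + "\n\n"
```

A client reassembles multi-line `data:` fields with newlines, so a `chunk` token containing a line break survives the round trip.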
src/agents/chatbot.py CHANGED
@@ -1,85 +1,169 @@
1
- """Chatbot agent with RAG capabilities."""
2
 
3
- import tiktoken
4
- from langchain_openai import AzureChatOpenAI
5
- from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6
  from langchain_core.output_parsers import StrOutputParser
7
- from src.config.settings import settings
 
 
 
8
  from src.middlewares.logging import get_logger
9
- from langchain_core.messages import HumanMessage, AIMessage
 
10
 
11
  logger = get_logger("chatbot")
12
 
13
- _enc = tiktoken.get_encoding("cl100k_base")
14
 
 
 
 
15
 
16
- def _count_tokens(messages: list, context: str) -> dict:
17
- msg_tokens = sum(len(_enc.encode(m.content)) for m in messages)
18
- ctx_tokens = len(_enc.encode(context))
19
- return {"messages_tokens": msg_tokens, "context_tokens": ctx_tokens, "total": msg_tokens + ctx_tokens}
20
 
 
 
 
21
 
22
- class ChatbotAgent:
23
- """Chatbot agent with RAG capabilities."""
24
-
25
- def __init__(self):
26
- self.llm = AzureChatOpenAI(
27
- azure_deployment=settings.azureai_deployment_name_4o,
28
- openai_api_version=settings.azureai_api_version_4o,
29
- azure_endpoint=settings.azureai_endpoint_url_4o,
30
- api_key=settings.azureai_api_key_4o,
31
- temperature=0.7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
 
34
- # Read system prompt
35
- try:
36
- with open("src/config/agents/system_prompt.md", "r") as f:
37
- system_prompt = f.read()
38
- except FileNotFoundError:
39
- system_prompt = "You are a helpful AI assistant with access to user's uploaded documents."
40
 
41
- # Create prompt template
42
- self.prompt = ChatPromptTemplate.from_messages([
43
- ("system", system_prompt),
44
- MessagesPlaceholder(variable_name="messages"),
45
- ("system", "Relevant documents:\n{context}")
46
- ])
47
 
48
- # Create chain
49
- self.chain = self.prompt | self.llm | StrOutputParser()
 
 
50
 
51
- async def generate_response(
52
  self,
53
- messages: list,
54
- context: str = ""
55
- ) -> str:
56
- """Generate response with optional RAG context."""
57
- try:
58
- logger.info("Generating chatbot response")
59
-
60
- # Generate response
61
- response = await self.chain.ainvoke({
62
- "messages": messages,
63
- "context": context
64
- })
65
-
66
- logger.info(f"Generated response: {response[:100]}...")
67
- return response
68
-
69
- except Exception as e:
70
- logger.error("Response generation failed", error=str(e))
71
- raise
72
-
73
- async def astream_response(self, messages: list, context: str = ""):
74
- """Stream response tokens as they are generated."""
75
- try:
76
- token_counts = _count_tokens(messages, context)
77
- logger.info("LLM input tokens", **token_counts)
78
- async for token in self.chain.astream({"messages": messages, "context": context}):
79
- yield token
80
- except Exception as e:
81
- logger.error("Response streaming failed", error=str(e))
82
- raise
83
-
84
-
85
- chatbot = ChatbotAgent()
 
1
+ """ChatbotAgent final answer formation. Phase 2 chatbot.
2
 
3
+ Receives one of:
4
+ - a `QueryResult` (structured query path),
5
+ - a list of document chunks (unstructured path), or
6
+ - nothing (chat-only path: greeting, farewell, meta question).
7
+
8
+ Streams the answer token-by-token so the chat handler can wrap each token
9
+ into an SSE event. Conversation history is supported.
10
+ """
11
+
12
+ from __future__ import annotations
13
+
14
+ from collections.abc import AsyncIterator
15
+ from dataclasses import dataclass
16
+ from pathlib import Path
17
+ from typing import Any
18
+
19
+ from langchain_core.messages import BaseMessage
20
  from langchain_core.output_parsers import StrOutputParser
21
+ from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
22
+ from langchain_core.runnables import Runnable
23
+ from langchain_openai import AzureChatOpenAI
24
+
25
  from src.middlewares.logging import get_logger
26
+
27
+ from ..query.executor.base import QueryResult
28
 
29
  logger = get_logger("chatbot")
30
 
 
31
 
32
+ _PROMPT_DIR = Path(__file__).resolve().parent.parent / "config" / "prompts"
33
+ _SYSTEM_PROMPT_PATH = _PROMPT_DIR / "chatbot_system.md"
34
+ _GUARDRAILS_PATH = _PROMPT_DIR / "guardrails.md"
35
 
 
 
 
 
36
 
37
+ @dataclass
38
+ class DocumentChunk:
39
+ """One retrieved document chunk for the unstructured path."""
40
 
41
+ content: str
42
+ filename: str | None = None
43
+ page_label: str | None = None
44
+
45
+
46
+ def _load_system_prompt() -> str:
47
+ """Compose system prompt = chatbot_system.md + guardrails.md.
48
+
49
+ Guardrails appended last so they take precedence in conflict (matches
50
+ the docstring at the top of guardrails.md).
51
+ """
52
+ chatbot = _SYSTEM_PROMPT_PATH.read_text(encoding="utf-8")
53
+ guardrails = _GUARDRAILS_PATH.read_text(encoding="utf-8")
54
+ return f"{chatbot}\n\n{guardrails}"
55
+
56
+
57
+ def _format_query_result(qr: QueryResult) -> str:
58
+ """Render a QueryResult as a compact context block for the LLM."""
59
+ source_label = qr.source_name or "(unknown source)"
60
+ table_label = qr.table_name or "(unknown table)"
61
+ if qr.error:
62
+ return (
63
+ f"[Query result — FAILED]\n"
64
+ f"source: {source_label}\n"
65
+ f"table: {table_label}\n"
66
+ f"error: {qr.error}"
67
  )
68
+ lines: list[str] = [
69
+ "[Query result]",
70
+ f"source: {source_label}",
71
+ f"table: {table_label}",
72
+ f"backend: {qr.backend}",
73
+ f"row_count: {qr.row_count}"
74
+ + (" (truncated)" if qr.truncated else ""),
75
+ f"elapsed_ms: {qr.elapsed_ms}",
76
+ ]
77
+ if qr.rows:
78
+ # Cap rendering at 25 rows; the LLM doesn't need the full set
79
+ cap = min(len(qr.rows), 25)
80
+ columns = list(qr.rows[0].keys())
81
+ lines.append("columns: " + ", ".join(columns))
82
+ lines.append("rows:")
83
+ for row in qr.rows[:cap]:
84
+ lines.append(" " + ", ".join(f"{k}={row[k]!r}" for k in columns))
85
+ if cap < len(qr.rows):
86
+ lines.append(f" ... (+{len(qr.rows) - cap} more rows omitted from prompt)")
87
+ return "\n".join(lines)
88
+
89
+
90
+ def _format_document_chunks(chunks: list[DocumentChunk]) -> str:
91
+ if not chunks:
92
+ return ""
93
+ blocks: list[str] = []
94
+ for c in chunks:
95
+ label_parts = [p for p in (c.filename, c.page_label) if p]
96
+ label = ", ".join(label_parts) if label_parts else "Unknown source"
97
+ blocks.append(f"[Source: {label}]\n{c.content}")
98
+ return "\n\n".join(blocks)
99
+
100
+
101
+ def _build_context_block(
102
+ query_result: QueryResult | None,
103
+ chunks: list[DocumentChunk] | None,
104
+ ) -> str:
105
+ parts: list[str] = []
106
+ if query_result is not None:
107
+ parts.append(_format_query_result(query_result))
108
+ if chunks:
109
+ parts.append(_format_document_chunks(chunks))
110
+ return "\n\n".join(parts) if parts else "(no data context — answer conversationally)"
111
+
112
+
113
+ def _build_default_chain() -> Runnable:
114
+ from src.config.settings import settings
115
+
116
+ llm = AzureChatOpenAI(
117
+ azure_deployment=settings.azureai_deployment_name_4o,
118
+ openai_api_version=settings.azureai_api_version_4o,
119
+ azure_endpoint=settings.azureai_endpoint_url_4o,
120
+ api_key=settings.azureai_api_key_4o,
121
+ temperature=0.3,
122
+ )
123
+ prompt = ChatPromptTemplate.from_messages(
124
+ [
125
+ ("system", _load_system_prompt()),
126
+ MessagesPlaceholder(variable_name="history", optional=True),
127
+ ("human", "{message}"),
128
+ ("system", "Data context for this turn:\n\n{context}"),
129
+ ]
130
+ )
131
+ return prompt | llm | StrOutputParser()
132
+
133
+
134
+ class ChatbotAgent:
135
+ """Formats and streams the final user-facing answer.
136
 
137
+ `chain` is injectable: tests pass a fake that yields canned tokens.
138
+ By default, the production Azure OpenAI streaming chain is built on
139
+ first use.
140
+ """
 
 
141
 
142
+ def __init__(self, chain: Runnable | None = None) -> None:
143
+ self._chain = chain
 
 
 
 
144
 
145
+ def _ensure_chain(self) -> Runnable:
146
+ if self._chain is None:
147
+ self._chain = _build_default_chain()
148
+ return self._chain
149
 
150
+ async def astream(
151
  self,
152
+ message: str,
153
+ history: list[BaseMessage] | None = None,
154
+ query_result: QueryResult | None = None,
155
+ chunks: list[DocumentChunk] | None = None,
156
+ ) -> AsyncIterator[str]:
157
+ """Stream tokens of the final answer.
158
+
159
+ The caller wraps each token in an SSE event. With empty `history`
160
+ and no context, the reply is pure chat.
161
+ """
162
+ chain = self._ensure_chain()
163
+ payload: dict[str, Any] = {
164
+ "message": message,
165
+ "history": history or [],
166
+ "context": _build_context_block(query_result, chunks),
167
+ }
168
+ async for token in chain.astream(payload):
169
+ yield token
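The `ChatbotAgent` docstring says the chain is injectable so that tests can pass a fake that yields canned tokens. A minimal sketch of that pattern follows, using only the standard library. `FakeChain` and `MiniAgent` are hypothetical stand-ins, not classes from this PR; only the `astream(payload)` call shape and the payload keys (`message`, `history`, `context`) are taken from the diff.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from typing import Any


class FakeChain:
    """Hypothetical test double: mimics only the `astream` method the agent calls."""

    def __init__(self, tokens: list[str]) -> None:
        self.tokens = tokens
        self.seen_payload: dict[str, Any] | None = None

    async def astream(self, payload: dict[str, Any]) -> AsyncIterator[str]:
        self.seen_payload = payload  # record what the agent sent
        for token in self.tokens:
            yield token


class MiniAgent:
    """Stand-in for ChatbotAgent: same injection and lazy-lookup shape, no LLM."""

    def __init__(self, chain: Any | None = None) -> None:
        self._chain = chain

    def _ensure_chain(self) -> Any:
        if self._chain is None:
            # The real agent would build the Azure OpenAI chain here.
            raise RuntimeError("no default chain in this sketch")
        return self._chain

    async def astream(self, message: str, history: list | None = None) -> AsyncIterator[str]:
        payload = {
            "message": message,
            "history": history or [],
            "context": "(no data context)",  # placeholder for the context block
        }
        async for token in self._ensure_chain().astream(payload):
            yield token


async def collect(agent: MiniAgent, message: str) -> str:
    """Join every streamed token into the full answer."""
    return "".join([token async for token in agent.astream(message)])


fake = FakeChain(["Hel", "lo", "!"])
answer = asyncio.run(collect(MiniAgent(chain=fake), "hi"))
print(answer)
```

Because the fake records the payload it receives, a test can assert on both the streamed output and the inputs the agent assembled, without any Azure credentials being present.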
src/agents/orchestration.py CHANGED
@@ -1,79 +1,109 @@
1
- """Orchestrator agent for intent recognition and planning."""
2
 
3
- from langchain_openai import AzureChatOpenAI
4
  from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
5
- from src.config.settings import settings
 
 
 
6
  from src.middlewares.logging import get_logger
7
- from src.models.structured_output import IntentClassification
8
 
9
  logger = get_logger("orchestrator")
10
 
11
 
12
  class OrchestratorAgent:
13
- """Orchestrator agent for intent recognition and planning."""
14
-
15
- def __init__(self):
16
- self.llm = AzureChatOpenAI(
17
- azure_deployment=settings.azureai_deployment_name_4o,
18
- openai_api_version=settings.azureai_api_version_4o,
19
- azure_endpoint=settings.azureai_endpoint_url_4o,
20
- api_key=settings.azureai_api_key_4o,
21
- temperature=0
22
  )
23
-
24
- self.prompt = ChatPromptTemplate.from_messages([
25
- ("system", """You are an orchestrator agent. You receive recent conversation history and the user's latest message.
26
-
27
- Your task:
28
- 1. Determine intent: question, greeting, goodbye, or other
29
- 2. Decide whether to search the user's documents (needs_search)
30
- 3. If search is needed, rewrite the user's message into a STANDALONE search query that incorporates necessary context from conversation history. If the user says "tell me more" or "how many papers?", the search_query must spell out the full topic explicitly from history.
31
- 4. If no search needed, provide a short direct_response (plain text only, no markdown formatting).
32
-
33
- Intent Routing:
34
- - question -> needs_search=True, search_query=<standalone rewritten query>
35
- - greeting -> needs_search=False, direct_response="Hello! How can I assist you today?"
36
- - goodbye -> needs_search=False, direct_response="Goodbye! Have a great day!"
37
- - other -> needs_search=True, search_query=<standalone rewritten query>
38
-
39
- Source Routing (set source_hint):
40
- - Columns, tables, sheets, data types, schema, row counts, statistics -> source_hint=schema
41
- - Document content, paragraphs, reports, articles, text -> source_hint=document
42
- - Unclear or spans both -> source_hint=both
43
- """),
44
- MessagesPlaceholder(variable_name="history"),
45
- ("user", "{message}")
46
- ])
47
-
48
- # with_structured_output uses function calling — guarantees valid schema regardless of LLM response style
49
- self.chain = self.prompt | self.llm.with_structured_output(IntentClassification)
50
-
51
- async def analyze_message(self, message: str, history: list = None) -> dict:
52
- """Analyze user message and determine next actions.
53
-
54
- Args:
55
- message: The current user message.
56
- history: Recent conversation as LangChain BaseMessage objects (oldest-first).
57
- Used to rewrite ambiguous follow-ups into standalone search queries.
58
- """
59
- try:
60
- logger.info(f"Analyzing message: {message[:50]}...")
61
-
62
- history_messages = history or []
63
- result: IntentClassification = await self.chain.ainvoke({"message": message, "history": history_messages})
64
-
65
- logger.info(f"Intent: {result.intent}, Needs search: {result.needs_search}, Search query: {result.search_query[:50] if result.search_query else ''}")
66
- return result.model_dump()
67
-
68
- except Exception as e:
69
- logger.error("Message analysis failed", error=str(e))
70
- # Fallback to treating everything as a question
71
- return {
72
- "intent": "question",
73
- "needs_search": True,
74
- "search_query": message,
75
- "direct_response": None
76
- }
77
-
78
-
79
- orchestrator = OrchestratorAgent()
 
1
+ """OrchestratorAgent classifies a user message and emits source_hint.
2
 
3
+ Output: needs_search (bool) + source_hint ∈ { chat, unstructured, structured }
4
+ + rewritten_query (standalone form of the user's question, history-resolved).
5
+
6
+ Phase 2 replaces the previous intent-classification body. The class name
7
+ is preserved so existing import sites (`from src.agents.orchestration
8
+ import OrchestratorAgent`) keep working. The default LLM chain is
9
+ constructed lazily so the module is import-safe even without `.env`
10
+ populated.
11
+ """
12
+
13
+ from __future__ import annotations
14
+
15
+ from pathlib import Path
16
+ from typing import Literal
17
+
18
+ from langchain_core.messages import BaseMessage
19
  from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
20
+ from langchain_core.runnables import Runnable
21
+ from langchain_openai import AzureChatOpenAI
22
+ from pydantic import BaseModel, Field
23
+
24
  from src.middlewares.logging import get_logger
 
25
 
26
  logger = get_logger("orchestrator")
27
 
28
+ SourceHint = Literal["chat", "unstructured", "structured"]
29
+
30
+ _PROMPT_PATH = (
31
+ Path(__file__).resolve().parent.parent
32
+ / "config"
33
+ / "prompts"
34
+ / "intent_router.md"
35
+ )
36
+
37
+
38
+ class IntentRouterDecision(BaseModel):
39
+ """LLM output. Pydantic so it can be used with `with_structured_output`."""
40
+
41
+ needs_search: bool = Field(
42
+ ..., description="True if we must look at the user's data to answer."
43
+ )
44
+ source_hint: SourceHint = Field(
45
+ ...,
46
+ description="Which downstream path: 'chat' (no lookup), "
47
+ "'unstructured' (PDF/DOCX/TXT prose), 'structured' (DB / tabular file).",
48
+ )
49
+ rewritten_query: str | None = Field(
50
+ None,
51
+ description="Standalone version of the question, history-resolved. "
52
+ "Null when needs_search=false.",
53
+ )
54
+
55
+
56
+ def _load_prompt_text() -> str:
57
+ return _PROMPT_PATH.read_text(encoding="utf-8")
58
+
59
+
60
+ def _build_default_chain() -> Runnable:
61
+ from src.config.settings import settings
62
+
63
+ llm = AzureChatOpenAI(
64
+ azure_deployment=settings.azureai_deployment_name_4o,
65
+ openai_api_version=settings.azureai_api_version_4o,
66
+ azure_endpoint=settings.azureai_endpoint_url_4o,
67
+ api_key=settings.azureai_api_key_4o,
68
+ temperature=0,
69
+ )
70
+ prompt = ChatPromptTemplate.from_messages(
71
+ [
72
+ ("system", _load_prompt_text()),
73
+ MessagesPlaceholder(variable_name="history", optional=True),
74
+ ("human", "{message}"),
75
+ ]
76
+ )
77
+ return prompt | llm.with_structured_output(IntentRouterDecision)
78
+
79
 
80
  class OrchestratorAgent:
81
+ """Classifies a user message into chat / unstructured / structured.
82
+
83
+ Inject `structured_chain` for tests; default builds the production
84
+ Azure OpenAI chain on first use.
85
+ """
86
+
87
+ def __init__(self, structured_chain: Runnable | None = None) -> None:
88
+ self._chain = structured_chain
89
+
90
+ def _ensure_chain(self) -> Runnable:
91
+ if self._chain is None:
92
+ self._chain = _build_default_chain()
93
+ return self._chain
94
+
95
+ async def classify(
96
+ self,
97
+ message: str,
98
+ history: list[BaseMessage] | None = None,
99
+ ) -> IntentRouterDecision:
100
+ chain = self._ensure_chain()
101
+ decision: IntentRouterDecision = await chain.ainvoke(
102
+ {"message": message, "history": history or []}
103
  )
104
+ logger.info(
105
+ "intent classified",
106
+ source_hint=decision.source_hint,
107
+ needs_search=decision.needs_search,
108
+ )
109
+ return decision
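The removed `analyze_message` caught exceptions and fell back to treating the message as a question, while the new `classify` lets chain errors propagate to the caller. If that fallback is still wanted, one possible caller-side sketch is shown below. Everything here is hypothetical: `Decision` is a plain dataclass stand-in for `IntentRouterDecision`, the two chain classes are test doubles, and the choice of `"unstructured"` as the fallback path is an assumption that the PR does not state.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class Decision:
    """Stand-in for IntentRouterDecision (same three fields, no pydantic)."""

    needs_search: bool
    source_hint: str  # "chat" | "unstructured" | "structured"
    rewritten_query: str | None = None


class CannedChain:
    """Hypothetical chain that returns a fixed decision."""

    async def ainvoke(self, payload: dict[str, Any]) -> Decision:
        return Decision(needs_search=False, source_hint="chat")


class FailingChain:
    """Hypothetical chain whose `ainvoke` always raises, simulating an LLM outage."""

    async def ainvoke(self, payload: dict[str, Any]) -> Decision:
        raise RuntimeError("simulated LLM failure")


async def classify_with_fallback(
    chain: Any, message: str, history: list | None = None
) -> Decision:
    """Call the chain; on any error, treat the message as a searchable question."""
    try:
        return await chain.ainvoke({"message": message, "history": history or []})
    except Exception:
        # Assumption: "unstructured" is used as the default path on failure.
        return Decision(
            needs_search=True, source_hint="unstructured", rewritten_query=message
        )


ok = asyncio.run(classify_with_fallback(CannedChain(), "hello"))
fallback = asyncio.run(classify_with_fallback(FailingChain(), "total sales last year?"))
print(ok.source_hint, fallback.source_hint, fallback.rewritten_query)
```

Whether a silent fallback is preferable to surfacing the error as an SSE `error` event is a design choice for the chat handler; the sketch only shows that the injectable chain makes either behavior easy to test.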
src/api/v1/chat.py CHANGED
@@ -1,46 +1,40 @@
1
  """Chat endpoint with streaming support."""
2
 
3
- import asyncio
4
  import uuid
 
 
 
5
  from fastapi import APIRouter, Depends, HTTPException
 
 
 
6
  from sqlalchemy.ext.asyncio import AsyncSession
 
 
 
 
7
  from src.db.postgres.connection import get_db
8
  from src.db.postgres.models import ChatMessage, MessageSource
9
- from src.agents.orchestration import orchestrator
10
- from src.agents.chatbot import chatbot
11
- from src.rag.retriever import retriever
12
- from src.rag.base import RetrievalResult
13
- from src.query.query_executor import query_executor
14
- from src.query.base import QueryResult
15
  from src.db.redis.connection import get_redis
16
- from src.config.settings import settings
17
  from src.middlewares.logging import get_logger, log_execution
18
- from sse_starlette.sse import EventSourceResponse
19
- from langchain_core.messages import HumanMessage, AIMessage
20
- from sqlalchemy import select
21
- from pydantic import BaseModel
22
- from typing import List, Dict, Any, Optional
23
- import json
24
 
25
  _GREETINGS = frozenset(["hi", "hello", "hey", "halo", "hai", "hei"])
26
  _GOODBYES = frozenset(["bye", "goodbye", "thanks", "thank you", "terima kasih", "sampai jumpa"])
27
 
28
 
29
- def _fast_intent(message: str) -> Optional[dict]:
30
- """Bypass LLM orchestrator for obvious greetings and farewells."""
31
  lower = message.lower().strip().rstrip("!.,?")
32
  if lower in _GREETINGS:
33
- return {"intent": "greeting", "needs_search": False,
34
- "direct_response": "Hello! How can I assist you today?", "search_query": ""}
35
  if lower in _GOODBYES:
36
- return {"intent": "goodbye", "needs_search": False,
37
- "direct_response": "Goodbye! Have a great day!", "search_query": ""}
38
  return None
39
 
40
- logger = get_logger("chat_api")
41
-
42
- router = APIRouter(prefix="/api/v1", tags=["Chat"])
43
-
44
 
45
  class ChatRequest(BaseModel):
46
  user_id: str
@@ -48,66 +42,6 @@ class ChatRequest(BaseModel):
48
  message: str
49
 
50
 
51
- def _format_context(results: List[RetrievalResult]) -> str:
52
- """Format retrieval results as context string for the LLM."""
53
- lines = []
54
- for result in results:
55
- data = result.metadata.get("data", {})
56
- filename = data.get("filename", "Unknown")
57
- page = data.get("page_label")
58
- source_label = f"{filename}, p.{page}" if page else filename
59
- lines.append(f"[Source: {source_label}]\n{result.content}\n")
60
- return "\n".join(lines)
61
-
62
-
63
- def _extract_sources(results: List[RetrievalResult]) -> List[Dict[str, Any]]:
64
- """Extract deduplicated source references from retrieval results."""
65
- seen = set()
66
- sources = []
67
- for result in results:
68
- meta = result.metadata
69
- data = meta.get("data", {})
70
- if "document_id" in data:
71
- key = (data.get("document_id"), data.get("page_label"))
72
- if key not in seen:
73
- seen.add(key)
74
- sources.append({
75
- "document_id": data.get("document_id"),
76
- "filename": data.get("filename", "Unknown"),
77
- "page_label": data.get("page_label", "Unknown"),
78
- })
79
- else:
80
- key = (data.get("table_name"), data.get("column_name"))
81
- if key not in seen:
82
- seen.add(key)
83
- table_name = data.get("table_name")
84
- user_id = meta.get("user_id")
85
- sources.append({
86
- "document_id": f"{user_id}_{table_name}",
87
- "filename": data.get("table_name", "Unknown"),
88
- "page_label": data.get("column_name", "Unknown"),
89
- })
90
-
91
- logger.debug(f"Extracted sources: {sources}")
92
- return sources
93
-
94
-
95
- def _format_query_results(results: list[QueryResult]) -> str:
96
- if not results:
97
- return ""
98
- lines = []
99
- for r in results:
100
- name = r.metadata.get("client_name", r.source_id)
101
- lines.append(f"[Query result — {name}, tables: {r.table_or_file}]")
102
- lines.append(f"SQL: {r.metadata.get('sql', '')}")
103
- if r.columns and r.rows:
104
- lines.append(" | ".join(r.columns))
105
- for row in r.rows[:20]:
106
- lines.append(" | ".join(str(row.get(c, "")) for c in r.columns))
107
- lines.append(f"({r.row_count} rows total)\n")
108
- return "\n".join(lines)
109
-
110
-
111
  async def get_cached_response(redis, cache_key: str) -> Optional[str]:
112
  cached = await redis.get(cache_key)
113
  if cached:
@@ -163,13 +97,15 @@ async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
163
  """Chat endpoint with streaming response.
164
 
165
  SSE event sequence:
166
- 1. sources — JSON array of {document_id, filename, page_label}
 
167
  2. chunk — text fragments of the answer
168
  3. done — signals end of stream
169
  """
170
  redis = await get_redis()
171
-
172
  cache_key = f"{settings.redis_prefix}chat:{request.room_id}:{request.message}"
 
 
173
  cached = await get_cached_response(redis, cache_key)
174
  if cached:
175
  logger.info("Returning cached response")
@@ -183,96 +119,43 @@ async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
183
  return EventSourceResponse(stream_cached())
184
 
185
  try:
186
- # Step 1: Fast local intent check (skips LLM for greetings/farewells)
187
- intent_result = _fast_intent(request.message)
188
-
189
- context = ""
190
- sources: List[Dict[str, Any]] = []
191
-
192
- if intent_result is None:
193
- # Step 2: Launch retrieval and history loading in parallel, then run orchestrator.
194
- # k=5
195
- # tables — db_executor's FK expansion is one-hop and cannot bridge
196
- # 2-hop gaps (e.g. customers -> order_items -> products) on its own.
197
- retrieval_task = asyncio.create_task(
198
- retriever.retrieve(request.message, request.user_id, db, k=5)
199
- )
200
- history_task = asyncio.create_task(
201
- load_history(db, request.room_id, limit=6) # 6 msgs (3 pairs) for orchestrator
202
- )
203
- history = await history_task # fast DB query (<100ms), done before orchestrator finishes
204
- intent_result = await orchestrator.analyze_message(request.message, history)
205
-
206
- search_query = intent_result.get("search_query", request.message) or request.message
207
- if not intent_result.get("needs_search"):
208
- retrieval_task.cancel()
209
- try:
210
- await retrieval_task
211
- except asyncio.CancelledError:
212
- pass
213
- raw_results = []
214
- else:
215
- logger.info(f"Searching for: {search_query}")
216
- if search_query != request.message:
217
- retrieval_task.cancel()
218
- try:
219
- await retrieval_task
220
- except asyncio.CancelledError:
221
- pass
222
- raw_results = await retriever.retrieve(
223
- query=search_query,
224
- user_id=request.user_id,
225
- db=db,
226
- k=5,
227
- source_hint=intent_result.get("source_hint", "both"),
228
- )
229
- else:
230
- raw_results = await retrieval_task
231
-
232
- context = _format_context(raw_results)
233
- sources = _extract_sources(raw_results)
234
-
235
- source_hint = intent_result.get("source_hint", "both")
236
- if source_hint in ("schema", "both"):
237
- # Use search_query (orchestrator's standalone rewrite) so follow-up
238
- # messages like "dive deeper" or "show me last year" resolve correctly.
239
- # For first-turn questions search_query == request.message, so no change.
240
- query_results = await query_executor.execute(
241
- results=raw_results,
242
- user_id=request.user_id,
243
- db=db,
244
- question=search_query,
245
- )
246
- query_context = _format_query_results(query_results)
247
- if query_context:
248
- context = query_context + "\n\n" + context
249
-
250
- # Step 3: Direct response for greetings / non-document intents
251
- if intent_result.get("direct_response"):
252
- response = intent_result["direct_response"]
253
- await cache_response(redis, cache_key, response)
254
- await save_messages(db, request.room_id, request.message, response, sources=[])
255
 
256
  async def stream_direct():
257
  yield {"event": "sources", "data": json.dumps([])}
258
- yield {"event": "message", "data": response}
 
259
 
260
  return EventSourceResponse(stream_direct())
261
 
262
- # Step 4: Stream answer token-by-token as LLM generates it
263
- # Load full history (10 msgs) for chatbot — richer context than the 6 used by orchestrator
264
- full_history = await load_history(db, request.room_id, limit=10)
265
- messages = full_history + [HumanMessage(content=request.message)]
266
 
267
  async def stream_response():
268
  full_response = ""
269
- yield {"event": "sources", "data": json.dumps(sources)}
270
- async for token in chatbot.astream_response(messages, context):
271
- full_response += token
272
- yield {"event": "chunk", "data": token}
273
- yield {"event": "done", "data": ""}
274
- await cache_response(redis, cache_key, full_response)
275
- await save_messages(db, request.room_id, request.message, full_response, sources=sources)
276
 
277
  return EventSourceResponse(stream_response())
278
 
 
1
  """Chat endpoint with streaming support."""
2
 
 
3
  import uuid
4
+ import json
5
+ from typing import List, Dict, Any, Optional
6
+
7
  from fastapi import APIRouter, Depends, HTTPException
8
+ from langchain_core.messages import HumanMessage, AIMessage
9
+ from pydantic import BaseModel
10
+ from sqlalchemy import select
11
  from sqlalchemy.ext.asyncio import AsyncSession
12
+ from sse_starlette.sse import EventSourceResponse
13
+
14
+ from src.agents.chat_handler import ChatHandler
15
+ from src.config.settings import settings
16
  from src.db.postgres.connection import get_db
17
  from src.db.postgres.models import ChatMessage, MessageSource
 
 
 
 
 
 
18
  from src.db.redis.connection import get_redis
 
19
  from src.middlewares.logging import get_logger, log_execution
20
+
21
+ logger = get_logger("chat_api")
22
+
23
+ router = APIRouter(prefix="/api/v1", tags=["Chat"])
 
 
24
 
25
  _GREETINGS = frozenset(["hi", "hello", "hey", "halo", "hai", "hei"])
26
  _GOODBYES = frozenset(["bye", "goodbye", "thanks", "thank you", "terima kasih", "sampai jumpa"])
27
 
28
 
29
+ def _fast_intent(message: str) -> Optional[str]:
30
+ """Return a direct response for obvious greetings/farewells, else None."""
31
  lower = message.lower().strip().rstrip("!.,?")
32
  if lower in _GREETINGS:
33
+ return "Hello! How can I assist you today?"
 
34
  if lower in _GOODBYES:
35
+ return "Goodbye! Have a great day!"
 
36
  return None
37
 
 
 
 
 
38
 
39
  class ChatRequest(BaseModel):
40
  user_id: str
 
42
  message: str
43
 
44
 
45
  async def get_cached_response(redis, cache_key: str) -> Optional[str]:
46
  cached = await redis.get(cache_key)
47
  if cached:
 
97
  """Chat endpoint with streaming response.
98
 
99
  SSE event sequence:
100
+ 1. sources — JSON array of source refs from ChatHandler (table for
101
+ structured; deduped document_id/page_label for unstructured)
102
  2. chunk — text fragments of the answer
103
  3. done — signals end of stream
104
  """
105
  redis = await get_redis()
 
106
  cache_key = f"{settings.redis_prefix}chat:{request.room_id}:{request.message}"
107
+
108
+ # Redis cache hit
109
  cached = await get_cached_response(redis, cache_key)
110
  if cached:
111
  logger.info("Returning cached response")
 
119
  return EventSourceResponse(stream_cached())
120
 
121
  try:
122
+ # Fast intent: greetings/farewells bypass LLM entirely
123
+ direct = _fast_intent(request.message)
124
+ if direct:
125
+ await cache_response(redis, cache_key, direct)
126
+ await save_messages(db, request.room_id, request.message, direct, sources=[])
127
 
128
  async def stream_direct():
129
  yield {"event": "sources", "data": json.dumps([])}
130
+ yield {"event": "chunk", "data": direct}
131
+ yield {"event": "done", "data": ""}
132
 
133
  return EventSourceResponse(stream_direct())
134
 
135
+ history = await load_history(db, request.room_id, limit=10)
136
+ handler = ChatHandler()
 
 
137
 
138
  async def stream_response():
139
  full_response = ""
140
+ sources: List[Dict[str, Any]] = []
141
+ async for event in handler.handle(request.message, request.user_id, history):
142
+ if event["event"] == "sources":
143
+ try:
144
+ sources = json.loads(event["data"]) or []
145
+ except (TypeError, ValueError):
146
+ sources = []
147
+ yield event
148
+ elif event["event"] == "chunk":
149
+ full_response += event["data"]
150
+ yield event
151
+ elif event["event"] == "done":
152
+ await cache_response(redis, cache_key, full_response)
153
+ await save_messages(db, request.room_id, request.message, full_response, sources=sources)
154
+ yield event
155
+ elif event["event"] == "error":
156
+ yield event
157
+ return
158
+ # Any other event (e.g. "intent") is dropped here and not forwarded to the frontend
159
 
160
  return EventSourceResponse(stream_response())
161
 
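The new `stream_response` relays handler events while accumulating the answer, persists on `done`, stops on `error`, and drops everything else. The loop can be exercised in isolation with a fake event source, as in the sketch below. `fake_handler_events` is a hypothetical stand-in for `ChatHandler.handle(...)`, the `saved` dict replaces the Redis and database writes, and the payloads of the `intent` and `sources` events are assumptions made for illustration.

```python
from __future__ import annotations

import asyncio
import json
from collections.abc import AsyncIterator
from typing import Any


async def fake_handler_events() -> AsyncIterator[dict[str, str]]:
    """Hypothetical stand-in for ChatHandler.handle(...)."""
    yield {"event": "intent", "data": "structured"}
    yield {"event": "sources", "data": json.dumps([{"table": "orders"}])}
    yield {"event": "chunk", "data": "Total: "}
    yield {"event": "chunk", "data": "42"}
    yield {"event": "done", "data": ""}


async def relay(
    events: AsyncIterator[dict[str, str]], saved: dict[str, Any]
) -> AsyncIterator[dict[str, str]]:
    """Forward sources/chunk/done/error events; persist the answer on done."""
    full_response = ""
    sources: list[Any] = []
    async for event in events:
        kind = event["event"]
        if kind == "sources":
            try:
                sources = json.loads(event["data"]) or []
            except (TypeError, ValueError):
                sources = []
            yield event
        elif kind == "chunk":
            full_response += event["data"]
            yield event
        elif kind == "done":
            # Stand-in for cache_response(...) and save_messages(...).
            saved["response"] = full_response
            saved["sources"] = sources
            yield event
        elif kind == "error":
            yield event
            return
        # Any other kind (e.g. "intent") is dropped.


async def main() -> tuple[list[str], dict[str, Any]]:
    saved: dict[str, Any] = {}
    forwarded = [e["event"] async for e in relay(fake_handler_events(), saved)]
    return forwarded, saved


forwarded, saved = asyncio.run(main())
print(forwarded)
print(saved)
```

Note that nothing is cached or saved when the stream ends with `error`, which matches the early `return` in the diff.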
src/api/v1/data_catalog.py ADDED
@@ -0,0 +1,100 @@
1
+ """API endpoints for the per-user data catalog index.
2
+
3
+ The index is a lightweight summary of every structured source registered
4
+ by a user (DB connections and tabular files). It is intended to be
5
+ consumed by the catalog refresher and by frontend listings — full
6
+ catalog payloads (tables + columns + samples + stats) are not exposed
7
+ here on purpose.
8
+ """
9
+
10
+ from typing import List
11
+
12
+ from fastapi import APIRouter, HTTPException, Query, status
13
+
14
+ from src.catalog.store import CatalogStore
15
+ from src.middlewares.logging import get_logger, log_execution
16
+ from src.models.api.catalog import CatalogIndexEntry
17
+ from src.pipeline.triggers import on_catalog_rebuild_requested
18
+
19
+ logger = get_logger("data_catalog_api")
20
+
21
+ router = APIRouter(prefix="/api/v1", tags=["Data Catalog"])
22
+
23
+
24
+ @router.get(
25
+ "/data-catalog/{user_id}",
26
+ response_model=List[CatalogIndexEntry],
27
+ summary="List the user's data catalog index",
28
+ response_description="One entry per registered structured source.",
29
+ responses={
30
+ 200: {"description": "Returns an empty list if the user has no registered sources."},
31
+ 500: {"description": "Internal server error while reading the catalog."},
32
+ },
33
+ )
34
+ @log_execution(logger)
35
+ async def list_data_catalog_index(user_id: str):
36
+ """
37
+ Return a lightweight index of every structured source registered by the user.
38
+
39
+ One entry per source (DB connection or tabular file), including the
40
+ `source_id`, `source_type`, display `name`, `location_ref`, current
41
+ `table_count`, and `updated_at` timestamp.
42
+
43
+ Used by the catalog refresher to decide which sources need to be
44
+ rebuilt. Returns an empty list if the user has no catalog yet.
45
+ """
46
+ try:
47
+ catalog = await CatalogStore().get(user_id)
48
+ except Exception as e:
49
+ logger.error("Failed to read catalog index", user_id=user_id, error=str(e))
50
+ raise HTTPException(
51
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
52
+ detail=f"Failed to read catalog index: {e}",
53
+ )
54
+
55
+ if catalog is None:
56
+ return []
57
+
58
+ return [
59
+ CatalogIndexEntry(
60
+ source_id=s.source_id,
61
+ source_type=s.source_type,
62
+ name=s.name,
63
+ location_ref=s.location_ref,
64
+ table_count=len(s.tables),
65
+ updated_at=s.updated_at,
66
+ )
67
+ for s in catalog.sources
68
+ ]
69
+
70
+
71
+ @router.post(
72
+ "/data-catalog/rebuild",
73
+ status_code=status.HTTP_200_OK,
74
+ summary="Rebuild the catalog for a user",
75
+ response_description="Confirmation that the rebuild completed.",
76
+ responses={
77
+ 200: {"description": "Rebuild completed. Per-source errors are logged but do not fail this request."},
78
+ 500: {"description": "Unexpected error before the rebuild loop started."},
79
+ },
80
+ )
81
+ @log_execution(logger)
82
+ async def rebuild_data_catalog(
83
+ user_id: str = Query(..., description="ID of the user whose catalog should be rebuilt."),
84
+ ):
85
+ """
86
+ Re-introspect every source in the user's catalog and upsert the results.
87
+
88
+ Each source (DB connection or tabular file) is processed independently.
89
+ A failure on one source is logged but does not abort the remaining sources.
90
+ If the user has no catalog yet, the call is a no-op and still returns success.
91
+ """
92
+ try:
93
+ await on_catalog_rebuild_requested(user_id)
94
+ except Exception as e:
95
+ logger.error("catalog rebuild failed", user_id=user_id, error=str(e))
96
+ raise HTTPException(
97
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
98
+ detail=f"Catalog rebuild failed: {e}",
99
+ )
100
+ return {"status": "success", "user_id": user_id}
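The two catalog routes take `user_id` differently: the index route reads it from the path, while the rebuild route reads it from the query string. A small sketch of building both requests with the standard library is shown below. `BASE_URL` and the helper names are assumptions made for illustration, and the requests are only constructed, not sent.

```python
from urllib.parse import quote, urlencode
from urllib.request import Request

BASE_URL = "http://localhost:8000"  # assumption: local development server


def index_request(user_id: str) -> Request:
    """GET /api/v1/data-catalog/{user_id}: user_id is a path parameter."""
    path = f"/api/v1/data-catalog/{quote(user_id, safe='')}"
    return Request(BASE_URL + path, method="GET")


def rebuild_request(user_id: str) -> Request:
    """POST /api/v1/data-catalog/rebuild: user_id is a query parameter."""
    query = urlencode({"user_id": user_id})
    return Request(f"{BASE_URL}/api/v1/data-catalog/rebuild?{query}", method="POST")


idx = index_request("user 1")
reb = rebuild_request("user 1")
print(idx.get_method(), idx.full_url)
print(reb.get_method(), reb.full_url)
```

Since the static `/data-catalog/rebuild` route is a POST and the parameterized `/data-catalog/{user_id}` route is a GET, the two do not collide even though `rebuild` would otherwise match the `{user_id}` pattern.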
src/api/v1/db_client.py CHANGED
@@ -27,8 +27,7 @@ from src.models.credentials import ( # noqa: F401 — re-exported for Swagger s
27
  SqlServerCredentials,
28
  SupabaseCredentials,
29
  )
30
- from src.pipeline.db_pipeline import db_pipeline_service
31
- from src.utils.db_credential_encryption import decrypt_credentials_dict
32
 
33
  logger = get_logger("database_client_api")
34
 
@@ -407,20 +406,22 @@ async def delete_database_client(
407
  raise HTTPException(status_code=403, detail="Access denied")
408
 
409
  await database_client_service.delete(db, client_id)
 
 
410
  return {"status": "success", "message": "Database client deleted successfully"}
411
 
412
 
413
  @router.post(
414
  "/database-clients/{client_id}/ingest",
415
  status_code=status.HTTP_200_OK,
416
- summary="Ingest schema from a registered database into the vector store",
417
- response_description="Count of chunks ingested.",
418
  responses={
419
- 200: {"description": "Ingestion completed successfully."},
420
  403: {"description": "Access denied — user_id does not own this connection."},
421
  404: {"description": "Connection not found."},
422
- 501: {"description": "The connection's db_type is not yet supported by the pipeline."},
423
- 500: {"description": "Ingestion failed (connection error, profiling error, etc.)."},
424
  },
425
  )
426
  @limiter.limit("5/minute")
@@ -432,11 +433,9 @@ async def ingest_database_client(
432
  db: AsyncSession = Depends(get_db),
433
  ):
434
  """
435
- Decrypt the stored credentials, connect to the user's database, introspect
436
- its schema, profile each column, embed the descriptions, and store them in
437
- the shared PGVector collection tagged with `source_type="database"`.
438
-
439
- Chunks become retrievable via the same retriever used for document chunks.
440
  """
441
  client = await database_client_service.get(db, client_id)
442
 
@@ -453,21 +452,12 @@ async def ingest_database_client(
453
  )
454
 
455
  try:
456
- creds = decrypt_credentials_dict(client.credentials)
457
- with db_pipeline_service.engine_scope(
458
- db_type=client.db_type,
459
- credentials=creds,
460
- ) as engine:
461
- total = await db_pipeline_service.run(user_id=user_id, client_id=client_id, engine=engine)
462
- except NotImplementedError as e:
463
- raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail=str(e))
464
  except Exception as e:
465
- logger.error(
466
- f"Ingestion failed for client {client_id}", user_id=user_id, error=str(e)
467
- )
468
  raise HTTPException(
469
  status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
470
- detail=f"Ingestion failed: {e}",
471
  )
472
 
473
- return {"status": "success", "client_id": client_id, "chunks_ingested": total}
 
27
  SqlServerCredentials,
28
  SupabaseCredentials,
29
  )
30
+ from src.pipeline.triggers import on_db_registered
 
31
 
32
  logger = get_logger("database_client_api")
33
 
 
406
  raise HTTPException(status_code=403, detail="Access denied")
407
 
408
  await database_client_service.delete(db, client_id)
409
+ from src.pipeline.triggers import on_db_deleted
410
+ await on_db_deleted(client_id, user_id)
411
  return {"status": "success", "message": "Database client deleted successfully"}
412
 
413
 
414
  @router.post(
415
  "/database-clients/{client_id}/ingest",
416
  status_code=status.HTTP_200_OK,
417
+ summary="Build the catalog for a registered database connection",
418
+ response_description="Confirmation that the catalog was built.",
419
  responses={
420
+ 200: {"description": "Catalog built successfully."},
421
  403: {"description": "Access denied — user_id does not own this connection."},
422
  404: {"description": "Connection not found."},
423
+ 409: {"description": "Connection is inactive."},
424
+ 500: {"description": "Catalog build failed."},
425
  },
426
  )
427
  @limiter.limit("5/minute")
 
433
  db: AsyncSession = Depends(get_db),
434
  ):
435
  """
436
+ Introspect the registered database and build (or rebuild) the catalog entry
437
+ for this connection. The catalog is stored in `data_catalog` and used by
438
+ the query pipeline to plan structured queries.
 
 
439
  """
440
  client = await database_client_service.get(db, client_id)
441
 
 
452
  )
453
 
454
  try:
455
+ await on_db_registered(client_id, user_id)
 
 
 
 
 
 
 
456
  except Exception as e:
457
+ logger.error("catalog build failed", client_id=client_id, error=str(e))
 
 
458
  raise HTTPException(
459
  status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
460
+ detail=f"Catalog build failed: {e}",
461
  )
462
 
463
+ return {"status": "success", "client_id": client_id}
src/api/v1/document.py CHANGED
@@ -6,7 +6,7 @@ from src.db.postgres.connection import get_db
  from src.document.document_service import document_service
  from src.middlewares.logging import get_logger, log_execution
  from src.middlewares.rate_limit import limiter
- from src.pipeline.document_pipeline.document_pipeline import document_pipeline
+ from src.pipeline.document_pipeline import document_pipeline
  from pydantic import BaseModel
  from typing import List

@@ -24,7 +24,7 @@ class DocumentResponse(BaseModel):
      created_at: str


- # NOTE: Keep in sync with SUPPORTED_FILE_TYPES in src/pipeline/document_pipeline/document_pipeline.py
+ # NOTE: Keep in sync with SUPPORTED_FILE_TYPES in src/pipeline/document_pipeline.py
  _DOC_TYPES = [
      {"doc_type": "pdf", "max_size": 10, "status": "active", "message": None},
      {"doc_type": "docx", "max_size": 10, "status": "active", "message": None},
@@ -92,6 +92,8 @@ async def delete_document(
  ):
      """Delete a document."""
      await document_pipeline.delete(document_id, user_id, db)
+     from src.pipeline.triggers import on_tabular_deleted
+     await on_tabular_deleted(document_id, user_id)
      return {"status": "success", "message": "Document deleted successfully"}


@@ -104,5 +106,13 @@ async def process_document(
  ):
      """Process document and ingest to vector index."""
      data = await document_pipeline.process(document_id, user_id, db)
+
+     if data["file_type"] in ("csv", "xlsx"):
+         from src.pipeline.triggers import on_tabular_uploaded
+         try:
+             await on_tabular_uploaded(document_id, user_id)
+         except Exception as e:
+             logger.error("catalog ingestion failed after process", document_id=document_id, error=str(e))
+
      return {"status": "success", "message": "Document processed successfully", "data": data}

src/api/v1/knowledge.py DELETED
@@ -1,25 +0,0 @@
- """Knowledge base management API endpoints."""
-
- from fastapi import APIRouter, Depends
- from sqlalchemy.ext.asyncio import AsyncSession
- from src.db.postgres.connection import get_db
- from src.middlewares.logging import get_logger, log_execution
-
- logger = get_logger("knowledge_api")
-
- router = APIRouter(prefix="/api/v1", tags=["Knowledge"])
-
-
- @router.post("/knowledge/rebuild")
- @log_execution(logger)
- async def rebuild_vector_index(
-     user_id: str,
-     db: AsyncSession = Depends(get_db)
- ):
-     """Rebuild vector index for a user (admin endpoint)."""
-     # This would re-process all documents
-     # For POC, we'll skip this complexity
-     return {
-         "status": "success",
-         "message": "Vector index rebuild initiated"
-     }
src/catalog/README.md ADDED
@@ -0,0 +1,6 @@
+ # catalog
+
+ Per-user data catalog: identity layer for structured sources (DB schemas + tabular files).
+ Holds AI-enriched table/column descriptions, consumed by `query/planner` to generate JSON IR.
+
+ See `ARCHITECTURE.md` (root) for the full design.
src/catalog/__init__.py ADDED
@@ -0,0 +1 @@
+ """Catalog domain — per-user data catalog (Cs + Ct)."""
src/catalog/introspect/__init__.py ADDED
@@ -0,0 +1 @@
+ """Source-specific schema introspection (databases, tabular files)."""
src/catalog/introspect/base.py ADDED
@@ -0,0 +1,18 @@
+ """BaseIntrospector — contract for source-specific schema readers.
+
+ Subclasses produce a Source object with raw schema (names, types, sample
+ values, stats). The planner consumes this directly — descriptions are not
+ LLM-generated.
+ """
+
+ from abc import ABC, abstractmethod
+
+ from ..models import Source
+
+
+ class BaseIntrospector(ABC):
+     """Abstract base. Subclasses: DatabaseIntrospector, TabularIntrospector."""
+
+     @abstractmethod
+     async def introspect(self, location_ref: str) -> Source:
+         ...
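The contract above can be illustrated with a toy subclass. This is a minimal sketch, not code from the PR: `InMemoryIntrospector`, the `mem://` prefix, and the simplified `Source` dataclass are all hypothetical stand-ins chosen so the example runs without the project's models or a database.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


@dataclass
class Source:
    """Simplified stand-in for catalog.models.Source (assumption, not the real model)."""
    source_id: str
    location_ref: str
    tables: list = field(default_factory=list)


class BaseIntrospector(ABC):
    """Same shape as the abstract base in the diff: one async method."""

    @abstractmethod
    async def introspect(self, location_ref: str) -> Source:
        ...


class InMemoryIntrospector(BaseIntrospector):
    """Hypothetical subclass resolving 'mem://{name}' refs from a dict."""

    _PREFIX = "mem://"

    def __init__(self, registry: dict) -> None:
        self._registry = registry

    async def introspect(self, location_ref: str) -> Source:
        # Validate the URI scheme first, as both real introspectors do.
        if not location_ref.startswith(self._PREFIX):
            raise ValueError(f"expected 'mem://...' location_ref, got {location_ref!r}")
        name = location_ref[len(self._PREFIX):]
        if name not in self._registry:
            raise ValueError(f"source {name!r} not found")
        return Source(source_id=name, location_ref=location_ref, tables=list(self._registry[name]))


introspector = InMemoryIntrospector({"demo": ["orders", "customers"]})
source = asyncio.run(introspector.introspect("mem://demo"))
```

Because `introspect` is abstract, instantiating `BaseIntrospector` directly raises `TypeError`; only subclasses that implement the method can be constructed.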
src/catalog/introspect/database.py ADDED
@@ -0,0 +1,246 @@
+ """Database schema introspection (Postgres / MySQL / Supabase).
+
+ Reads information_schema for tables/columns/types, samples ~100 rows per table
+ for `sample_values` and basic stats. Description fields are left empty —
+ the planner relies on names + samples + stats directly.
+
+ Reuses Phase 1 utilities (`database_client_service`, `db_credential_encryption`,
+ `db_pipeline_service.engine_scope`, `extractor.get_schema/profile_column/get_row_count`)
+ to avoid reimplementation. The cleanup PR will move those into `security/` and
+ `pipeline/db_pipeline/` respectively.
+ """
+
+ import asyncio
+ import hashlib
+ from datetime import UTC, datetime
+ from decimal import Decimal
+ from typing import Any
+
+ from src.database_client.database_client_service import database_client_service
+ from src.db.postgres.connection import AsyncSessionLocal
+ from src.middlewares.logging import get_logger
+ from src.pipeline.db_pipeline import db_pipeline_service
+ from src.pipeline.db_pipeline.extractor import (
+     get_row_count,
+     get_schema,
+     profile_column,
+ )
+ from src.utils.db_credential_encryption import decrypt_credentials_dict
+
+ from ..models import Column, ColumnStats, DataType, ForeignKey, Source, Table
+ from ..pii_detector import PIIDetector
+ from .base import BaseIntrospector
+
+ logger = get_logger("db_introspector")
+
+ _DBCLIENT_PREFIX = "dbclient://"
+
+
+ def _stable_id(prefix: str, *parts: str) -> str:
+     """Deterministic short ID from joined parts. Survives renames at the
+     `name` field while preserving identity for cached IRs.
+
+     Hash is non-cryptographic (identifier only).
+     """
+     h = hashlib.sha1(
+         "/".join(parts).encode("utf-8"), usedforsecurity=False
+     ).hexdigest()[:12]
+     return f"{prefix}{h}"
+
+
+ def _map_sql_type(sql_type: str) -> DataType:
+     """Map a stringified SQLAlchemy type to a Catalog DataType.
+
+     Matches on substring of the SQLAlchemy type repr (e.g. 'INTEGER',
+     'TIMESTAMP', 'BOOLEAN'). Conservative — unknowns fall back to "string"
+     so the column is at least addressable.
+     """
+     s = sql_type.upper()
+     if "INT" in s and "INTERVAL" not in s:  # INTERVAL is not an integer type
+         return "int"
+     if "FLOAT" in s or "NUMERIC" in s or "DECIMAL" in s or "REAL" in s or "DOUBLE" in s:
+         return "decimal"
+     if "BOOL" in s:
+         return "bool"
+     if "TIMESTAMP" in s or "DATETIME" in s:
+         return "datetime"
+     if "DATE" in s:
+         return "date"
+     if "JSON" in s:
+         return "json"
+     return "string"
+
+
+ def _normalize(v: Any) -> Any:
+     """Coerce non-JSON-native scalars (Decimal, numpy, datetime) to types
+     that survive the jsonb round-trip when the catalog is persisted.
+     """
+     if v is None:
+         return None
+     if isinstance(v, Decimal):
+         return float(v)
+     try:
+         import numpy as np
+
+         if isinstance(v, np.generic):
+             return v.item()
+     except ImportError:
+         pass
+     if isinstance(v, datetime):
+         return v.isoformat()
+     return v
+
+
+ class DatabaseIntrospector(BaseIntrospector):
+     """Connect to user DB → read information_schema → sample 100 rows/table."""
+
+     def __init__(self) -> None:
+         self._pii = PIIDetector()
+
+     async def introspect(self, location_ref: str) -> Source:
+         if not location_ref.startswith(_DBCLIENT_PREFIX):
+             raise ValueError(
+                 f"DatabaseIntrospector expects 'dbclient://...' location_ref, "
+                 f"got {location_ref!r}"
+             )
+         client_id = location_ref[len(_DBCLIENT_PREFIX):]
+         if not client_id:
+             raise ValueError("location_ref is missing client_id after 'dbclient://'")
+
+         async with AsyncSessionLocal() as session:
+             client = await database_client_service.get(session, client_id)
+             if client is None:
+                 raise ValueError(f"DatabaseClient {client_id!r} not found")
+
+         creds = decrypt_credentials_dict(client.credentials)
+         logger.info(
+             "introspecting db source",
+             client_id=client_id,
+             db_type=client.db_type,
+             name=client.name,
+         )
+
+         # SQLAlchemy inspect() + pandas read_sql are synchronous — run in a
+         # threadpool so the event loop stays free.
+         tables: list[Table] = await asyncio.to_thread(
+             self._introspect_sync, client.db_type, creds
+         )
+
+         return Source(
+             source_id=client_id,
+             source_type="schema",
+             name=client.name,
+             location_ref=location_ref,
+             updated_at=datetime.now(UTC),
+             tables=tables,
+         )
+
+     def _introspect_sync(self, db_type: str, creds: dict) -> list[Table]:
+         with db_pipeline_service.engine_scope(db_type, creds) as engine:
+             schema = get_schema(engine)
+             tables: list[Table] = []
+             for table_name, cols in schema.items():
+                 try:
+                     row_count = get_row_count(engine, table_name)
+                 except Exception as e:
+                     logger.error(
+                         "row_count failed; skipping table",
+                         table=table_name,
+                         error=str(e),
+                     )
+                     continue
+
+                 columns: list[Column] = []
+                 for col in cols:
+                     try:
+                         profile = profile_column(
+                             engine,
+                             table_name,
+                             col["name"],
+                             col.get("is_numeric", False),
+                             row_count,
+                             is_temporal=col.get("is_temporal", False),
+                         )
+                     except Exception as e:
+                         logger.error(
+                             "profile_column failed; skipping column",
+                             table=table_name,
+                             column=col["name"],
+                             error=str(e),
+                         )
+                         continue
+                     columns.append(self._to_column(table_name, col, profile))
+
+                 foreign_keys = self._extract_foreign_keys(table_name, cols)
+
+                 tables.append(
+                     Table(
+                         table_id=_stable_id("t_", table_name),
+                         name=table_name,
+                         row_count=row_count,
+                         columns=columns,
+                         foreign_keys=foreign_keys,
+                     )
+                 )
+             return tables
+
+     @staticmethod
+     def _extract_foreign_keys(
+         table_name: str, cols: list[dict[str, Any]]
+     ) -> list[ForeignKey]:
+         """Convert extractor's `foreign_key: 'target_table.target_col'` strings
+         into ForeignKey objects with stable IDs (derived deterministically from
+         names — same scheme used to generate table_id / column_id elsewhere).
+         """
+         fks: list[ForeignKey] = []
+         for col in cols:
+             fk_str = col.get("foreign_key")
+             if not fk_str:
+                 continue
+             target_table, _, target_col = fk_str.partition(".")
+             if not target_table or not target_col:
+                 continue
+             fks.append(
+                 ForeignKey(
+                     column_id=_stable_id("c_", table_name, col["name"]),
+                     target_table_id=_stable_id("t_", target_table),
+                     target_column_id=_stable_id("c_", target_table, target_col),
+                 )
+             )
+         return fks
+
+     def _to_column(
+         self, table_name: str, col: dict[str, Any], profile: dict[str, Any]
+     ) -> Column:
+         name = col["name"]
+         sample_values: list[Any] | None = [
+             _normalize(v) for v in (profile.get("sample_values") or [])
+         ] or None
+
+         top_raw = profile.get("top_values") or []
+         top_values: list[Any] | None = [
+             _normalize(v) for v, _cnt in top_raw
+         ] or None
+
+         column = Column(
+             column_id=_stable_id("c_", table_name, name),
+             name=name,
+             data_type=_map_sql_type(str(col["type"])),
+             nullable=True,  # nullable not surfaced by extractor; default permissive
+             pii_flag=False,
+             sample_values=sample_values,
+             stats=ColumnStats(
+                 min=_normalize(profile.get("min")),
+                 max=_normalize(profile.get("max")),
+                 mean=_normalize(profile.get("mean")),
+                 median=_normalize(profile.get("median")),
+                 distinct_count=profile.get("distinct_count"),
+                 top_values=top_values,
+             ),
+         )
+         if self._pii.detect(column):
+             return column.model_copy(update={"pii_flag": True, "sample_values": None})
+         return column
+
+
+ database_introspector = DatabaseIntrospector()
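The ID scheme used by `_stable_id` and `_extract_foreign_keys` can be exercised on its own. The sketch below mirrors the hashing shown in the diff (SHA-1 over the `/`-joined parts, first 12 hex characters); the function names `stable_id` and `foreign_key_edge`, and the dict return shape, are illustrative assumptions rather than the PR's API.

```python
import hashlib
from typing import Optional


def stable_id(prefix: str, *parts: str) -> str:
    """Prefix + first 12 hex chars of SHA-1 over the '/'-joined parts."""
    digest = hashlib.sha1(
        "/".join(parts).encode("utf-8"), usedforsecurity=False
    ).hexdigest()
    return f"{prefix}{digest[:12]}"


def foreign_key_edge(table: str, column: str, fk_str: str) -> Optional[dict]:
    """Turn 'target_table.target_col' into an ID-based edge, or None if malformed."""
    target_table, _, target_col = fk_str.partition(".")
    if not target_table or not target_col:
        return None
    return {
        "column_id": stable_id("c_", table, column),
        "target_table_id": stable_id("t_", target_table),
        "target_column_id": stable_id("c_", target_table, target_col),
    }


edge = foreign_key_edge("orders", "customer_id", "customers.id")
```

Two properties follow from this construction and may be worth keeping in mind when reviewing: the IDs depend only on the names passed in, so identically named tables in two different sources would receive the same `table_id`, and parts that themselves contain `/` can collide (`("a", "b/c")` and `("a/b", "c")` hash the same joined string).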
src/catalog/introspect/tabular.py ADDED
@@ -0,0 +1,239 @@
+ """Tabular file schema introspection (Parquet / CSV / XLSX).
+
+ Reads file headers + samples ~100 rows. For XLSX, each sheet becomes a Table.
+ Files are expected to live in Azure Blob (location_ref like az_blob://{user_id}/{document_id}).
+
+ Table.name convention (executor contract)
+ -----------------------------------------
+   CSV / Parquet → Table.name = filename stem (e.g. "sales_data").
+                   Parquet blob was uploaded without a sheet suffix, so the
+                   executor must call parquet_blob_name(uid, did, sheet_name=None).
+   XLSX          → Table.name = sheet_name (e.g. "Sheet1").
+                   Executor calls parquet_blob_name(uid, did, table.name).
+ """
+
+ import asyncio
+ import hashlib
+ from collections.abc import Callable, Coroutine
+ from datetime import UTC, datetime
+ from io import BytesIO
+ from pathlib import Path
+ from typing import Any
+
+ import pandas as pd
+
+ from src.middlewares.logging import get_logger
+
+ from ..models import Column, ColumnStats, DataType, Source, Table
+ from ..pii_detector import PIIDetector
+ from .base import BaseIntrospector
+
+ logger = get_logger("tabular_introspector")
+
+ _AZ_BLOB_PREFIX = "az_blob://"
+
+
+ def _stable_id(prefix: str, *parts: str) -> str:
+     h = hashlib.sha1(
+         "/".join(parts).encode("utf-8"), usedforsecurity=False
+     ).hexdigest()[:12]
+     return f"{prefix}{h}"
+
+
+ def _map_pandas_type(dtype: Any) -> DataType:
+     s = str(dtype).lower()
+     if "int" in s:
+         return "int"
+     if "float" in s or "decimal" in s:
+         return "decimal"
+     if "bool" in s:
+         return "bool"
+     if "datetime" in s:
+         return "datetime"
+     if "date" in s:
+         return "date"
+     return "string"
+
+
+ def _normalize(v: Any) -> Any:
+     """Coerce non-JSON-native scalars to types that survive the jsonb round-trip."""
+     if v is None:
+         return None
+     try:
+         import numpy as np
+
+         if isinstance(v, np.generic):
+             return v.item()
+     except ImportError:
+         pass
+     if isinstance(v, datetime):
+         return v.isoformat()
+     return v
+
+
+ class TabularIntrospector(BaseIntrospector):
+     """Read column names, dtypes, and sample values from Parquet/CSV/XLSX.
+
+     Heavy I/O dependencies (`fetch_doc`, `fetch_blob`) are injectable so unit
+     tests can pass mocks without triggering Settings or DB construction.
+     """
+
+     def __init__(
+         self,
+         fetch_doc: Callable[[str], Coroutine[Any, Any, Any]] | None = None,
+         fetch_blob: Callable[[str], Coroutine[Any, Any, bytes]] | None = None,
+     ) -> None:
+         self._pii = PIIDetector()
+         self._fetch_doc = fetch_doc or self._default_fetch_doc
+         self._fetch_blob = fetch_blob or self._default_fetch_blob
+
+     @staticmethod
+     async def _default_fetch_doc(document_id: str) -> Any:
+         from sqlalchemy import select
+
+         from src.db.postgres.connection import AsyncSessionLocal
+         from src.db.postgres.models import Document as DBDocument
+
+         async with AsyncSessionLocal() as session:
+             result = await session.execute(
+                 select(DBDocument).where(DBDocument.id == document_id)
+             )
+             return result.scalar_one_or_none()
+
+     @staticmethod
+     async def _default_fetch_blob(blob_name: str) -> bytes:
+         from src.storage.az_blob.az_blob import blob_storage
+
+         return await blob_storage.download_file(blob_name)
+
+     async def introspect(self, location_ref: str) -> Source:
+         if not location_ref.startswith(_AZ_BLOB_PREFIX):
+             raise ValueError(
+                 f"TabularIntrospector expects 'az_blob://...' location_ref, "
+                 f"got {location_ref!r}"
+             )
+         rest = location_ref[len(_AZ_BLOB_PREFIX):]
+         user_id, _, document_id = rest.partition("/")
+         if not user_id or not document_id:
+             raise ValueError(
+                 f"location_ref must be 'az_blob://{{user_id}}/{{document_id}}', "
+                 f"got {location_ref!r}"
+             )
+
+         doc = await self._fetch_doc(document_id)
+         if doc is None:
+             raise ValueError(f"Document {document_id!r} not found")
+
+         logger.info(
+             "introspecting tabular source",
+             document_id=document_id,
+             file_type=doc.file_type,
+             filename=doc.filename,
+         )
+
+         content = await self._fetch_blob(doc.blob_name)
+
+         tables: list[Table] = await asyncio.to_thread(
+             self._introspect_sync, content, doc.file_type, doc.filename, document_id
+         )
+
+         return Source(
+             source_id=document_id,
+             source_type="tabular",
+             name=doc.filename,
+             location_ref=location_ref,
+             updated_at=datetime.now(UTC),
+             tables=tables,
+         )
+
+     def _introspect_sync(
+         self,
+         content: bytes,
+         file_type: str,
+         filename: str,
+         document_id: str,
+     ) -> list[Table]:
+         if file_type == "csv":
+             df = pd.read_csv(BytesIO(content))
+             return [self._build_table(df, document_id, Path(filename).stem, sheet_name=None)]
+         if file_type == "xlsx":
+             sheets: dict[str, pd.DataFrame] = pd.read_excel(BytesIO(content), sheet_name=None)
+             return [
+                 self._build_table(df, document_id, sheet_name, sheet_name=sheet_name)
+                 for sheet_name, df in sheets.items()
+             ]
+         if file_type == "parquet":
+             df = pd.read_parquet(BytesIO(content))
+             return [self._build_table(df, document_id, Path(filename).stem, sheet_name=None)]
+         raise ValueError(f"Unsupported file_type {file_type!r} for tabular introspection")
+
+     def _build_table(
+         self,
+         df: pd.DataFrame,
+         document_id: str,
+         table_name: str,
+         sheet_name: str | None,
+     ) -> Table:
+         id_parts = (document_id, sheet_name) if sheet_name else (document_id,)
+         columns = [
+             self._to_column(df[col], document_id, sheet_name, col)
+             for col in df.columns
+         ]
+         return Table(
+             table_id=_stable_id("t_", *id_parts),
+             name=table_name,
+             row_count=len(df),
+             columns=columns,
+             foreign_keys=[],
+         )
+
+     def _to_column(
+         self,
+         series: pd.Series,
+         document_id: str,
+         sheet_name: str | None,
+         col_name: str,
+     ) -> Column:
+         id_parts = (
+             (document_id, sheet_name, col_name) if sheet_name else (document_id, col_name)
+         )
+
+         sample_raw = series.dropna().head(3).tolist()
+         sample_values: list[Any] | None = [_normalize(v) for v in sample_raw] or None
+
+         is_numeric = pd.api.types.is_numeric_dtype(series)
+         is_dt = pd.api.types.is_datetime64_any_dtype(series)
+         non_null = series.dropna()
+         distinct_count = int(series.nunique())
+         top_values = (
+             [_normalize(v) for v in non_null.unique().tolist()]
+             if distinct_count <= 10
+             else None
+         )
+         has_values = len(non_null) > 0
+         wants_range = (is_numeric or is_dt) and has_values
+         wants_mean = is_numeric and has_values
+         stats = ColumnStats(
+             min=_normalize(non_null.min()) if wants_range else None,
+             max=_normalize(non_null.max()) if wants_range else None,
+             mean=float(non_null.mean()) if wants_mean else None,
+             median=float(non_null.median()) if wants_mean else None,
+             distinct_count=distinct_count,
+             top_values=top_values,
+         )
+
+         column = Column(
+             column_id=_stable_id("c_", *id_parts),
+             name=col_name,
+             data_type=_map_pandas_type(series.dtype),
+             nullable=bool(series.isnull().any()),
+             pii_flag=False,
+             sample_values=sample_values,
+             stats=stats,
+         )
+         if self._pii.detect(column):
+             return column.model_copy(update={"pii_flag": True, "sample_values": None})
+         return column
+
+
+ tabular_introspector = TabularIntrospector()
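The per-column profiling in `_to_column` (three non-null samples, distinct count, top values only for low-cardinality columns, numeric summary stats) can be approximated without pandas. The following is a standard-library stand-in written for illustration only: `profile_values` and its dict output are hypothetical, and it covers numeric ranges but not the datetime min/max handling the pandas version has.

```python
from statistics import mean, median


def profile_values(values: list, max_top: int = 10) -> dict:
    """Profile one column given as a plain list, with None marking nulls."""
    non_null = [v for v in values if v is not None]
    # dict.fromkeys de-duplicates while keeping first-seen order.
    distinct = list(dict.fromkeys(non_null))
    # bool is a subclass of int, so exclude it explicitly from "numeric".
    is_numeric = bool(non_null) and all(
        isinstance(v, (int, float)) and not isinstance(v, bool) for v in non_null
    )
    return {
        "nullable": len(non_null) < len(values),
        "sample_values": non_null[:3] or None,
        "distinct_count": len(distinct),
        "top_values": distinct if len(distinct) <= max_top else None,
        "min": min(non_null) if is_numeric else None,
        "max": max(non_null) if is_numeric else None,
        "mean": float(mean(non_null)) if is_numeric else None,
        "median": float(median(non_null)) if is_numeric else None,
    }


profile = profile_values([3, 1, None, 3, 2])
```

Listing every distinct value only when there are at most ten of them keeps the rendered catalog short for high-cardinality columns while still exposing enum-like columns in full.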
src/catalog/models.py ADDED
@@ -0,0 +1,86 @@
+ """Pydantic models for the per-user data catalog (Cs + Ct).
+
+ See ARCHITECTURE.md §6 for the full schema definition.
+
+ Source.location_ref URI scheme
+ ------------------------------
+ A `Source` is uniquely addressable by `location_ref`; introspectors and
+ executors parse it to find the underlying data:
+
+   schema sources  → "dbclient://{database_client_id}"
+                     Resolves via `database_client_service.get(...)` which
+                     returns a `DatabaseClient` row whose Fernet-encrypted
+                     credentials are decrypted at runtime.
+
+   tabular sources → "az_blob://{user_id}/{document_id}"
+                     The Source aggregates one or more sheets as Tables;
+                     each per-sheet Parquet blob is named via
+                     `parquet_service.parquet_blob_name(user_id, document_id, sheet_name)`,
+                     so executors derive the per-Table blob path from
+                     `Source.location_ref` plus `Table.name`.
+
+   unstructured    → reserved (deferred — see ARCHITECTURE.md §10 q2).
+ """
+
+ from datetime import datetime
+ from typing import Any, Literal
+
+ from pydantic import BaseModel, Field
+
+ SourceType = Literal["schema", "tabular", "unstructured"]
+ DataType = Literal["int", "decimal", "string", "datetime", "date", "bool", "json"]
+
+
+ class ColumnStats(BaseModel):
+     min: Any | None = None
+     max: Any | None = None
+     mean: float | None = None
+     median: float | None = None
+     distinct_count: int | None = None
+     top_values: list[Any] | None = None
+
+
+ class Column(BaseModel):
+     column_id: str
+     name: str
+     data_type: DataType
+     nullable: bool
+     pii_flag: bool = False
+     sample_values: list[Any] | None = None
+     stats: ColumnStats | None = None
+
+
+ class ForeignKey(BaseModel):
+     """A FK edge from one column in this table to a column in another table.
+
+     All references use stable IDs derived from source/table/column names so
+     edges survive renames at the `name` level. The target table must belong
+     to the SAME `Source` — cross-source FKs are not modeled in v1.
+     """
+     column_id: str  # the column in this table that holds the FK
+     target_table_id: str  # referenced table_id, within the same Source
+     target_column_id: str  # referenced column_id
+
+
+ class Table(BaseModel):
+     table_id: str
+     name: str
+     row_count: int | None = None
+     columns: list[Column]
+     foreign_keys: list[ForeignKey] = Field(default_factory=list)
+
+
+ class Source(BaseModel):
+     source_id: str
+     source_type: SourceType
+     name: str
+     location_ref: str
+     updated_at: datetime
+     tables: list[Table] = Field(default_factory=list)
+
+
+ class Catalog(BaseModel):
+     user_id: str
+     schema_version: str = "1.0"
+     generated_at: datetime
+     sources: list[Source] = Field(default_factory=list)
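The `location_ref` URI scheme described in the module docstring can be parsed in a few lines. This is a sketch under assumptions: `parse_location_ref` is a hypothetical helper (the PR parses each scheme inside its own introspector), and the returned tuple shape is chosen only for illustration.

```python
def parse_location_ref(location_ref: str) -> tuple:
    """Split a location_ref into (source_type, parts) per the documented schemes."""
    scheme, sep, rest = location_ref.partition("://")
    if not sep:
        raise ValueError(f"location_ref has no scheme: {location_ref!r}")
    if scheme == "dbclient":
        # "dbclient://{database_client_id}"
        if not rest:
            raise ValueError("location_ref is missing client_id after 'dbclient://'")
        return "schema", {"database_client_id": rest}
    if scheme == "az_blob":
        # "az_blob://{user_id}/{document_id}"
        user_id, _, document_id = rest.partition("/")
        if not user_id or not document_id:
            raise ValueError(
                f"expected 'az_blob://{{user_id}}/{{document_id}}', got {location_ref!r}"
            )
        return "tabular", {"user_id": user_id, "document_id": document_id}
    raise ValueError(f"unsupported location_ref scheme {scheme!r}")


db_ref = parse_location_ref("dbclient://abc123")
blob_ref = parse_location_ref("az_blob://user-1/doc-9")
```

Keeping the scheme-to-`source_type` mapping in one place like this is a design option, not something the PR does; the trade-off is a single point of validation versus each introspector owning its own prefix check.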
src/catalog/pii_detector.py ADDED
@@ -0,0 +1,39 @@
+ """PII auto-detection for catalog columns.
+
+ When pii_flag is set True, sample_values is forced to None so real PII
+ never enters LLM prompts. Patterns live in src/security/pii_patterns.py.
+ """
+
+ from src.security.pii_patterns import EMAIL_REGEX, PHONE_REGEX, PII_NAME_PATTERNS
+
+ from .models import Column
+
+
+ class PIIDetector:
+     """Marks columns as pii_flag=True when name or sampled values look sensitive.
+
+     Bias is intentional: false positives hide harmless sample values,
+     false negatives leak data. When unsure, flag.
+     """
+
+     def detect(self, column: Column) -> bool:
+         if self._name_matches(column.name):
+             return True
+         if column.sample_values and self._values_match(column.sample_values):
+             return True
+         return False
+
+     @staticmethod
+     def _name_matches(name: str) -> bool:
+         lowered = name.lower()
+         return any(pat in lowered for pat in PII_NAME_PATTERNS)
+
+     @staticmethod
+     def _values_match(values: list) -> bool:
+         for v in values:
+             if v is None:
+                 continue
+             s = str(v)
+             if EMAIL_REGEX.match(s) or PHONE_REGEX.match(s):
+                 return True
+         return False
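The detect-then-redact flow can be tried in isolation. The patterns below are placeholders invented for this sketch, since `src/security/pii_patterns.py` is not part of the visible diff; `NAME_PATTERNS`, `EMAIL_RE`, `PHONE_RE`, `looks_like_pii`, and `redact` are all hypothetical names, and real pattern lists would likely be broader.

```python
import re

# Hypothetical patterns; the project's real ones live in pii_patterns.py.
NAME_PATTERNS = ("email", "phone", "ssn")
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
PHONE_RE = re.compile(r"^\+?\d[\d\s().-]{6,}\d$")


def looks_like_pii(name: str, sample_values) -> bool:
    """Flag on a column-name substring hit first, then on sampled values."""
    lowered = name.lower()
    if any(pat in lowered for pat in NAME_PATTERNS):
        return True
    for value in sample_values or []:
        if value is None:
            continue
        text = str(value)
        if EMAIL_RE.match(text) or PHONE_RE.match(text):
            return True
    return False


def redact(column: dict) -> dict:
    """Return a copy with pii_flag set and samples dropped when PII is suspected."""
    if looks_like_pii(column["name"], column.get("sample_values")):
        return {**column, "pii_flag": True, "sample_values": None}
    return {**column, "pii_flag": False}


flagged = redact({"name": "contact", "sample_values": ["jane@example.com"]})
clean = redact({"name": "amount", "sample_values": [10, 20]})
```

With a loose phone pattern like this one, a long numeric identifier would also be flagged; that matches the stated bias toward false positives, at the cost of hiding some harmless samples from the planner.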
src/catalog/reader.py ADDED
@@ -0,0 +1,40 @@
+ """CatalogReader — loads + filters catalog by source_hint.
+
+ For typical users (≤50 tables), returns the FULL catalog with no slicing.
+ Catalog-level search is added later if catalog grows past the limit.
+ """
+
+ from datetime import UTC, datetime
+ from typing import Literal
+
+ from .models import Catalog
+ from .store import CatalogStore
+
+ SourceHint = Literal["chat", "unstructured", "structured"]
+
+
+ class CatalogReader:
+     """Loads the user's catalog and filters by source_hint.
+
+     On miss, returns an empty Catalog (never raises) — query path is
+     responsible for handling "no data registered yet" gracefully.
+     Returned Catalog is always a copy; the underlying stored catalog
+     is never mutated.
+     """
+
+     def __init__(self, store: CatalogStore) -> None:
+         self._store = store
+
+     async def read(self, user_id: str, source_hint: SourceHint) -> Catalog:
+         catalog = await self._store.get(user_id)
+         if catalog is None:
+             return Catalog(user_id=user_id, generated_at=datetime.now(UTC))
+
+         if source_hint == "chat":
+             filtered: list = []
+         elif source_hint == "structured":
+             filtered = [s for s in catalog.sources if s.source_type in {"schema", "tabular"}]
+         else:  # "unstructured"
+             filtered = [s for s in catalog.sources if s.source_type == "unstructured"]
+
+         return catalog.model_copy(update={"sources": filtered})
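The hint-based filtering in `CatalogReader.read` is easy to check with plain dataclasses. This is a simplified sketch: `Source`, `Catalog`, and `filter_catalog` here are stand-ins rather than the Pydantic models, and unlike the `else` branch in the diff, this version raises on an unknown hint instead of treating it as "unstructured".

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Source:
    source_id: str
    source_type: str  # "schema", "tabular", or "unstructured"


@dataclass(frozen=True)
class Catalog:
    user_id: str
    sources: tuple = ()


def filter_catalog(catalog: Catalog, source_hint: str) -> Catalog:
    """Return a copy of the catalog holding only sources relevant to the hint."""
    if source_hint == "chat":
        kept = ()  # plain chat needs no data sources
    elif source_hint == "structured":
        kept = tuple(s for s in catalog.sources if s.source_type in {"schema", "tabular"})
    elif source_hint == "unstructured":
        kept = tuple(s for s in catalog.sources if s.source_type == "unstructured")
    else:
        raise ValueError(f"unknown source_hint {source_hint!r}")
    # replace() builds a new Catalog, so the input is never mutated.
    return replace(catalog, sources=kept)


catalog = Catalog(
    user_id="u1",
    sources=(Source("db1", "schema"), Source("f1", "tabular"), Source("d1", "unstructured")),
)
structured = filter_catalog(catalog, "structured")
```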
src/catalog/render.py ADDED
@@ -0,0 +1,69 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
+"""Render a `Source` into the canonical text block consumed by the planner."""
+
+from __future__ import annotations
+
+from .models import Source
+
+
+def render_source(source: Source) -> str:
+    """Render a Source as the canonical text block consumed by the planner.
+
+    Stable identifiers (source_id / table_id / column_id) are rendered
+    alongside names. The planner must copy these verbatim into the IR;
+    the IRValidator does a literal ID lookup, so anything else fails.
+
+    Columns show data type, sample values (or `PII (suppressed)`), and
+    populated stats only (min/max suppressed for string/bool, where they're
+    useless). Top values are listed when available for low-cardinality cols.
+    Foreign keys are resolved to names.
+    """
+    lines: list[str] = [
+        f"Source: {source.name} ({source.source_type})",
+        f"Source ID: {source.source_id}",
+        "",
+        "Tables:",
+    ]
+
+    tables_by_id = {t.table_id: t for t in source.tables}
+    col_names_by_id = {
+        t.table_id: {c.column_id: c.name for c in t.columns} for t in source.tables
+    }
+
+    for table in source.tables:
+        rc = table.row_count
+        rc_str = f" ({rc:,} rows)" if rc is not None else ""
+        lines.append("")
+        lines.append(f" Table: {table.name}{rc_str} — id={table.table_id}")
+        lines.append(" Columns:")
+        for col in table.columns:
+            samples = "PII (suppressed)" if col.pii_flag else (col.sample_values or [])
+            stats_parts: list[str] = []
+            if col.stats:
+                if col.stats.min is not None:
+                    stats_parts.append(f"min={col.stats.min}")
+                if col.stats.max is not None:
+                    stats_parts.append(f"max={col.stats.max}")
+                if col.stats.mean is not None:
+                    stats_parts.append(f"mean={col.stats.mean:.4g}")
+                if col.stats.median is not None:
+                    stats_parts.append(f"median={col.stats.median:.4g}")
+                if col.stats.distinct_count is not None:
+                    stats_parts.append(f"distinct={col.stats.distinct_count}")
+                if col.stats.top_values:
+                    stats_parts.append(f"top={col.stats.top_values}")
+            stats_str = (", " + ", ".join(stats_parts)) if stats_parts else ""
+            lines.append(
+                f" - {col.name} [{col.data_type}]: samples={samples}{stats_str} — id={col.column_id}"
+            )
+        if table.foreign_keys:
+            lines.append(" Foreign keys:")
+            cols_in_this_table = {c.column_id: c.name for c in table.columns}
+            for fk in table.foreign_keys:
+                src_col_name = cols_in_this_table.get(fk.column_id, fk.column_id)
+                tgt_table = tables_by_id.get(fk.target_table_id)
+                tgt_table_name = tgt_table.name if tgt_table else fk.target_table_id
+                tgt_col_name = col_names_by_id.get(fk.target_table_id, {}).get(
+                    fk.target_column_id, fk.target_column_id
+                )
+                lines.append(f" - {src_col_name} -> {tgt_table_name}.{tgt_col_name}")
+    return "\n".join(lines)
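Reviewer note: a minimal sketch of the kind of text block `render_source` is meant to produce, useful for eyeballing the planner input. The `Col`, `Table`, and `Src` dataclasses are hypothetical stand-ins for the Pydantic models in `src/catalog/models.py`, and `render_min` is a trimmed version that covers only names, types, samples, and IDs. The separator before `id=` is simplified to a plain hyphen and the indentation widths are illustrative.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Col:  # hypothetical stand-in for the catalog column model
    column_id: str
    name: str
    data_type: str
    sample_values: list = field(default_factory=list)
    pii_flag: bool = False


@dataclass
class Table:  # hypothetical stand-in for the catalog table model
    table_id: str
    name: str
    columns: list
    row_count: Optional[int] = None


@dataclass
class Src:  # hypothetical stand-in for the catalog Source model
    source_id: str
    name: str
    source_type: str
    tables: list


def render_min(source: Src) -> str:
    """Trimmed renderer: names, types, samples, and stable IDs only."""
    lines = [
        f"Source: {source.name} ({source.source_type})",
        f"Source ID: {source.source_id}",
        "",
        "Tables:",
    ]
    for t in source.tables:
        rc = f" ({t.row_count:,} rows)" if t.row_count is not None else ""
        lines += ["", f"  Table: {t.name}{rc} - id={t.table_id}", "    Columns:"]
        for c in t.columns:
            # PII columns never expose their sample values.
            samples = "PII (suppressed)" if c.pii_flag else c.sample_values
            lines.append(
                f"      - {c.name} [{c.data_type}]: samples={samples} - id={c.column_id}"
            )
    return "\n".join(lines)


demo = Src("src_prod_db", "prod_db", "schema", [
    Table("t_orders", "orders", [
        Col("c_orders_id", "id", "int", [1, 2, 3]),
        Col("c_orders_email", "email", "string", ["a@x.io"], pii_flag=True),
    ], row_count=12453),
])
rendered = render_min(demo)
print(rendered)
```

The point of the sketch is that every table and column line ends with its stable ID, which is what the planner has to copy into the IR.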
src/catalog/store.py ADDED
@@ -0,0 +1,82 @@
+"""CatalogStore — persists per-user catalogs as Postgres jsonb rows.
+
+Storage shape: one row per user in the `data_catalog` table with columns
+(user_id PK, data jsonb, schema_version, generated_at, updated_at).
+"""
+
+from sqlalchemy import case, delete, func, select
+from sqlalchemy.dialects.postgresql import insert
+
+from src.db.postgres.connection import AsyncSessionLocal
+from src.db.postgres.models import Catalog as CatalogRow
+from src.middlewares.logging import get_logger
+
+from .models import Catalog
+
+logger = get_logger("catalog_store")
+
+
+class CatalogStore:
+    """Read/write catalogs keyed by user_id.
+
+    Each method opens its own AsyncSession. Callers needing transactional
+    coordination across multiple stores can be refactored to accept an
+    explicit AsyncSession in a later PR.
+    """
+
+    async def get(self, user_id: str) -> Catalog | None:
+        async with AsyncSessionLocal() as session:
+            result = await session.execute(
+                select(CatalogRow.data).where(CatalogRow.user_id == user_id)
+            )
+            row = result.scalar_one_or_none()
+            if row is None:
+                return None
+            return Catalog.model_validate(row)
+
+    async def upsert(self, catalog: Catalog) -> None:
+        payload = catalog.model_dump(mode="json")
+        async with AsyncSessionLocal() as session:
+            stmt = insert(CatalogRow).values(
+                user_id=catalog.user_id,
+                data=payload,
+                schema_version=catalog.schema_version,
+                generated_at=catalog.generated_at,
+                updated_at=func.now(),
+            )
+            stmt = stmt.on_conflict_do_update(
+                index_elements=[CatalogRow.user_id],
+                set_={
+                    "data": stmt.excluded.data,
+                    "schema_version": stmt.excluded.schema_version,
+                    "updated_at": case(
+                        (stmt.excluded.data != CatalogRow.data, func.now()),
+                        else_=CatalogRow.updated_at,
+                    ),
+                },
+            )
+            await session.execute(stmt)
+            await session.commit()
+        logger.info(
+            "catalog upserted",
+            user_id=catalog.user_id,
+            sources=len(catalog.sources),
+        )
+
+    async def remove_source(self, user_id: str, source_id: str) -> None:
+        existing = await self.get(user_id)
+        if existing is None:
+            logger.info("remove_source: no catalog found", user_id=user_id, source_id=source_id)
+            return
+        filtered = [s for s in existing.sources if s.source_id != source_id]
+        if len(filtered) == len(existing.sources):
+            logger.info("remove_source: source not in catalog", user_id=user_id, source_id=source_id)
+            return
+        await self.upsert(existing.model_copy(update={"sources": filtered}))
+        logger.info("remove_source: source removed", user_id=user_id, source_id=source_id)
+
+    async def delete(self, user_id: str) -> None:
+        async with AsyncSessionLocal() as session:
+            await session.execute(delete(CatalogRow).where(CatalogRow.user_id == user_id))
+            await session.commit()
+        logger.info("catalog deleted", user_id=user_id)
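Reviewer note: the `ON CONFLICT DO UPDATE` clause in `upsert` is easy to misread, so here is a rough in-memory model of what it appears to do. `InMemoryCatalogTable`, `Row`, and the integer clock are hypothetical and exist only for illustration; the real insert takes `generated_at` from the catalog object, whereas this sketch stamps both timestamps from the clock.

```python
from dataclasses import dataclass
from itertools import count

_clock = count(1)  # deterministic stand-in for the database's now()


@dataclass
class Row:
    data: dict
    schema_version: str
    generated_at: int
    updated_at: int


class InMemoryCatalogTable:
    """Hypothetical in-memory model of the upsert's conflict handling."""

    def __init__(self) -> None:
        self.rows = {}  # user_id -> Row

    def upsert(self, user_id: str, data: dict, schema_version: str = "1.0") -> None:
        now = next(_clock)
        existing = self.rows.get(user_id)
        if existing is None:
            # Plain INSERT path: both timestamps are set.
            self.rows[user_id] = Row(data, schema_version, now, now)
            return
        # Conflict path: updated_at moves only when the payload differs.
        if existing.data != data:
            existing.updated_at = now
        existing.data = data
        existing.schema_version = schema_version
        # generated_at is not in the SET clause, so it keeps its first value.


table = InMemoryCatalogTable()
table.upsert("u1", {"sources": []})
first_update = table.rows["u1"].updated_at
table.upsert("u1", {"sources": []})        # identical payload
unchanged = table.rows["u1"].updated_at
table.upsert("u1", {"sources": ["s1"]})    # payload changed
changed = table.rows["u1"].updated_at
```

One consequence worth confirming in review: because `generated_at` is absent from the `set_` mapping, a rebuilt catalog keeps the `generated_at` of the first insert.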
src/catalog/validator.py ADDED
@@ -0,0 +1,49 @@
+"""CatalogValidator — Pydantic + business-rule validation for a catalog.
+
+Pydantic handles shape; this layer adds invariants that span fields.
+"""
+
+from .models import Catalog
+
+
+class CatalogValidationError(Exception):
+    pass
+
+
+class CatalogValidator:
+    """Validates a Catalog beyond Pydantic schema checks.
+
+    Business rules:
+    - All source_ids unique within a user
+    - All table_ids unique within a source
+    - All column_ids unique within a table
+    - foreign_keys (when added) reference existing tables/columns
+    """
+
+    def validate(self, catalog: Catalog) -> None:
+        seen_sources: set[str] = set()
+        for source in catalog.sources:
+            if source.source_id in seen_sources:
+                raise CatalogValidationError(
+                    f"duplicate source_id {source.source_id!r} in catalog "
+                    f"for user_id={catalog.user_id!r}"
+                )
+            seen_sources.add(source.source_id)
+
+            seen_tables: set[str] = set()
+            for table in source.tables:
+                if table.table_id in seen_tables:
+                    raise CatalogValidationError(
+                        f"duplicate table_id {table.table_id!r} in source "
+                        f"{source.source_id!r}"
+                    )
+                seen_tables.add(table.table_id)
+
+                seen_columns: set[str] = set()
+                for column in table.columns:
+                    if column.column_id in seen_columns:
+                        raise CatalogValidationError(
+                            f"duplicate column_id {column.column_id!r} in table "
+                            f"{table.table_id!r} (source {source.source_id!r})"
+                        )
+                    seen_columns.add(column.column_id)
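Reviewer note: the docstring lists a foreign-key rule, but `validate` as written only checks ID uniqueness. A minimal sketch of what that fourth rule could look like follows. The `FK`, `Col`, and `Table` dataclasses are hypothetical stand-ins, with field names borrowed from how `render.py` reads foreign keys, and `check_foreign_keys` is not part of this PR.

```python
from dataclasses import dataclass, field


class CatalogValidationError(Exception):
    pass


@dataclass
class FK:  # hypothetical stand-in; field names follow render.py's usage
    column_id: str
    target_table_id: str
    target_column_id: str


@dataclass
class Col:  # hypothetical stand-in
    column_id: str


@dataclass
class Table:  # hypothetical stand-in
    table_id: str
    columns: list
    foreign_keys: list = field(default_factory=list)


def check_foreign_keys(tables: list) -> None:
    """Raise if any foreign key points at a table or column that is absent."""
    cols_by_table = {t.table_id: {c.column_id for c in t.columns} for t in tables}
    for t in tables:
        for fk in t.foreign_keys:
            if fk.column_id not in cols_by_table[t.table_id]:
                raise CatalogValidationError(
                    f"foreign key column {fk.column_id!r} not in table {t.table_id!r}"
                )
            target_cols = cols_by_table.get(fk.target_table_id)
            if target_cols is None:
                raise CatalogValidationError(
                    f"foreign key targets unknown table {fk.target_table_id!r}"
                )
            if fk.target_column_id not in target_cols:
                raise CatalogValidationError(
                    f"foreign key targets unknown column {fk.target_column_id!r} "
                    f"in table {fk.target_table_id!r}"
                )
```

The check is scoped to the tables of one source, matching the lookup dictionaries `render_source` builds per source.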
src/config/agents/guardrails_prompt.md DELETED
@@ -1,7 +0,0 @@
1
- You must ensure all responses follow these guidelines:
2
-
3
- 1. Do not provide harmful, illegal, or dangerous information
4
- 2. Respect user privacy - don't ask for or store sensitive personal data
5
- 3. If asked to bypass safety measures, refuse politely
6
- 4. Be honest about limitations and uncertainties
7
- 5. Don't make up information - admit when you don't know something
src/config/agents/system_prompt.md DELETED
@@ -1,26 +0,0 @@
1
- You are a helpful AI assistant with access to user's uploaded documents. Your role is to:
2
-
3
- 1. Answer questions based on provided document context
4
- 2. If no relevant information is found in documents, acknowledge this honestly
5
- 3. Be concise and direct in your responses
6
- 4. If user's question is unclear, ask for clarification
7
-
8
- When document context is provided:
9
- - Use information from documents to answer accurately
10
- - Reference source document name when appropriate
11
- - If multiple documents contain relevant info, synthesize information
12
-
13
- When no document context is provided:
14
- - Provide general assistance
15
- - Let the user know if you need more context to help better
16
-
17
- When the answer need markdown formating:
18
- - Use valid and tidy formatting
19
- - Avoid over-formating and emoji
20
-
21
- Always be professional, helpful, and accurate.
22
-
23
- You have access to the conversation history provided in the messages above. Use it to:
24
- - Maintain context across multiple turns (resolve references like "it", "that", "them" using earlier messages)
25
- - Avoid repeating information already established in the conversation
26
- - Answer follow-up questions coherently without asking the user to restate prior context
src/{pipeline/document_pipeline → config/prompts}/__init__.py RENAMED
File without changes
src/config/prompts/chatbot_system.md ADDED
@@ -0,0 +1,31 @@
1
+ You are a friendly, precise data assistant for a user who has registered databases and uploaded files. Your job is to answer the user's questions using **only** the data context provided to you in this turn.
2
+
3
+ ## Rules
4
+
5
+ 1. **Ground every claim in the provided context.** If the context doesn't contain the answer, say so plainly — do not guess. Never invent numbers, dates, or facts that aren't in the result rows or document chunks.
6
+ 2. **Be concise and direct in your responses.**
7
+ 3. **Use the user's terms when possible.** Mirror the column / table names they care about, but feel free to humanize ("revenue" instead of "total_cents", "last month" instead of "2026-04 timestamps").
8
+ 4. **Stream coherently.** You are streaming token-by-token; don't backtrack or self-correct mid-answer. Plan the structure mentally before the first token.
9
+ 5. **Markdown is OK** for emphasis and small tables, but avoid heavy formatting (code fences, headers) unless the question genuinely calls for it.
10
+
11
+ ## Context shapes you'll see
12
+
13
+ - **Query result** — emitted when the user asked a data question. Contains `rows` (a list of dicts), `row_count`, the source/table that was queried, and an `error` string that is empty when the query succeeded. If `error` is set, explain the failure plainly and suggest a next step.
14
+ - **Document chunks** — emitted when the user asked about uploaded prose. Each chunk has source filename and (for PDFs) a page label.
15
+ - **No context** — emitted for greetings, farewells, or meta questions. Just respond conversationally.
16
+
17
+ ## When the query failed
18
+
19
+ If `query_result.error` is non-empty:
20
+ - Acknowledge the failure briefly.
21
+ - Surface the user-actionable part of the error (e.g., "I couldn't find a matching column" → suggest they rephrase).
22
+ - Do not paste raw stack traces or internal IDs.
23
+
24
+ ## What you do NOT do
25
+
26
+ - Speculate beyond the data.
27
+ - Output the raw result rows unless the user explicitly asked for "show me the data".
28
+ - Repeat the user's question back at them.
29
+ - Apologize repeatedly.
30
+
31
+ You have access to recent conversation history; use it to resolve pronouns and avoid restating context the user has already established.
src/config/prompts/guardrails.md ADDED
@@ -0,0 +1,11 @@
1
+ ## Guardrails
2
+
3
+ These rules apply to every response, regardless of the system prompt above. They take precedence when in conflict with anything else.
4
+
5
+ 1. **Stay within the user's data scope.** Refuse questions that ask you to fabricate data, predict the future from data the user hasn't shared, or answer questions unrelated to the user's registered sources. Reply briefly: "That's outside what I can answer from your data — I can only work with the sources you've registered."
6
+ 2. **Do not reveal or extract PII.** If the data context contains a PII column (it will be flagged), do not list raw values — describe distributions or counts only. If the user explicitly asks for raw PII, refuse: "I can't surface that column's contents directly."
7
+ 3. **No code execution, no shell commands, no file writes.** If the user asks you to run code, modify their data, or perform a write operation, refuse: "I can only read and summarize — I don't execute code or change your data."
8
+ 4. **No credentials, no secrets.** Never repeat connection strings, passwords, API keys, or service-account JSON, even if they somehow appear in context.
9
+ 5. **No medical / legal / financial advice.** If the user asks "should I…" questions about a regulated domain, defer: "I can show you what the data says, but the decision is yours — I won't give advice in this domain."
10
+ 6. **Acknowledge limits when relevant.** If a result was truncated, say so. If you're not sure, say so. Avoid the appearance of false certainty.
11
+ 7. **Be honest about errors.** If the query failed, the document was missing, or the catalog had nothing relevant, say it plainly. Do not paper over with vague answers.
src/config/prompts/intent_router.md ADDED
@@ -0,0 +1,66 @@
1
+ You are the intent router for an AI data assistant. Given a user's latest message (and optionally recent conversation history), decide which downstream path should handle it.
2
+
3
+ ## Output
4
+
5
+ Return three fields:
6
+
7
+ - **`needs_search`** — `true` if we must look at the user's data to answer; `false` for greetings, farewells, off-topic chitchat, or meta questions about the assistant itself.
8
+ - **`source_hint`** — one of:
9
+ - `chat` — no data lookup needed (greetings, farewells, generic small talk).
10
+ - `unstructured` — the user is asking about the **content** of an uploaded document (PDF / DOCX / TXT).
11
+ - `structured` — the user is asking a **data question** answerable from a database or a tabular file (CSV / XLSX / Parquet). This includes counts, sums, top-N, filters, comparisons, trends, joins across registered structured sources.
12
+ - **`rewritten_query`** — a **standalone** version of the user's question that incorporates necessary context from history. If the original message is already standalone, return it unchanged. If `needs_search` is `false`, leave this empty/null.
13
+
14
+ ## Routing rules
15
+
16
+ 1. If the message is a pure greeting / farewell / thanks / "how are you" / "what can you do" → `chat` + `needs_search=false`.
17
+ 2. If the message references content that lives in a registered DB or uploaded tabular file (sales numbers, customer counts, order trends, sheet rows, table columns) → `structured` + `needs_search=true`.
18
+ 3. If the message asks about prose content (a section of a PDF, what a memo says, a quote from a document) → `unstructured` + `needs_search=true`.
19
+ 4. If the message is ambiguous between structured and unstructured, prefer `structured` — the planner can fall back if the catalog has nothing relevant.
20
+ 5. Cross-source comparison ("compare DB sales to the customers.csv file") → `structured`. The planner sees both source types in one prompt and can correlate.
21
+
22
+ ## Rewriting follow-ups
23
+
24
+ When history is present and the new message references prior context using pronouns or fragments ("tell me more", "what about last quarter?", "and by region?"), expand the rewritten_query into a fully standalone question. Example:
25
+
26
+ History: "What was our top product last month?" → "Pro Plan Annual at $487k"
27
+ Message: "How does that compare to Q1?"
28
+ rewritten_query: "How does Pro Plan Annual's revenue last month compare to Q1?"
29
+
30
+ If the original is already standalone, copy it verbatim into rewritten_query.
31
+
32
+ ## Few-shot examples
33
+
34
+ ```
35
+ User: "Hi"
36
+ → needs_search=false, source_hint="chat", rewritten_query=null
37
+
38
+ User: "Bye, thanks"
39
+ → needs_search=false, source_hint="chat", rewritten_query=null
40
+
41
+ User: "What can you do?"
42
+ → needs_search=false, source_hint="chat", rewritten_query=null
43
+
44
+ User: "How many orders did we get last month?"
45
+ → needs_search=true, source_hint="structured",
46
+ rewritten_query="How many orders did we get last month?"
47
+
48
+ User: "What does the Q1 board memo say about churn?"
49
+ → needs_search=true, source_hint="unstructured",
50
+ rewritten_query="What does the Q1 board memo say about churn?"
51
+
52
+ User: "Top 5 customers by revenue this year"
53
+ → needs_search=true, source_hint="structured",
54
+ rewritten_query="Top 5 customers by revenue this year"
55
+
56
+ History: assistant: "Pro Plan Annual led at $487,200 in April."
57
+ User: "And in March?"
58
+ → needs_search=true, source_hint="structured",
59
+ rewritten_query="What was Pro Plan Annual's revenue in March?"
60
+ ```
61
+
62
+ ## Constraints
63
+
64
+ - Do not invent data. If you don't know whether a topic exists in the user's data, route to `structured` and let the planner decide.
65
+ - Do not refuse — refusal happens later in guardrails. Just classify.
66
+ - One JSON object as output; no prose, no markdown.
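Reviewer note: a small sketch of how the router's single JSON object could be parsed and sanity-checked on the calling side. `RouterDecision` and `parse_router_output` are hypothetical names, and the rule that `chat` always pairs with `needs_search=false` is an assumption read off the routing rules above rather than something the prompt states as a hard constraint.

```python
import json
from dataclasses import dataclass
from typing import Optional

ALLOWED_HINTS = {"chat", "unstructured", "structured"}


@dataclass
class RouterDecision:  # hypothetical container for the three output fields
    needs_search: bool
    source_hint: str
    rewritten_query: Optional[str]


def parse_router_output(raw: str) -> RouterDecision:
    """Parse the router's JSON object and reject inconsistent combinations."""
    obj = json.loads(raw)
    decision = RouterDecision(
        needs_search=obj["needs_search"],
        source_hint=obj["source_hint"],
        # Treat a missing, null, or empty rewritten_query the same way.
        rewritten_query=obj.get("rewritten_query") or None,
    )
    if not isinstance(decision.needs_search, bool):
        raise ValueError("needs_search must be a JSON boolean")
    if decision.source_hint not in ALLOWED_HINTS:
        raise ValueError(f"unknown source_hint {decision.source_hint!r}")
    # Assumption: `chat` means no lookup; the two data paths always need one.
    if decision.needs_search == (decision.source_hint == "chat"):
        raise ValueError("source_hint and needs_search disagree")
    if decision.needs_search and not decision.rewritten_query:
        raise ValueError("rewritten_query is required when needs_search is true")
    return decision
```

Rejecting inconsistent pairs early keeps a malformed router reply from silently reaching the planner.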
src/config/prompts/query_planner.md ADDED
@@ -0,0 +1,168 @@
1
+ You are the **query planner** for an AI data assistant. Given a user's question and the user's full data catalog, produce a structured **JSON IR** that captures the query intent.
2
+
3
+ The IR is executed by a deterministic compiler — you do **not** write SQL, pandas, or any execution syntax. You produce intent only.
4
+
5
+ ## What you receive
6
+
7
+ 1. The user's question.
8
+ 2. The user's catalog: every registered source (databases and tabular files), every table, every column, with descriptions, sample values, stats, and foreign keys. Each item carries a stable identifier (`source_id`, `table_id`, `column_id`) — copy these verbatim into the IR.
9
+
10
+ ## Output schema
11
+
12
+ A `QueryIR` object:
13
+
14
+ ```jsonc
15
+ {
16
+ "ir_version": "1.0",
17
+ "source_id": "...", // pick from catalog
18
+ "table_id": "...", // pick from chosen source
19
+ "select": [
20
+ {"kind": "column", "column_id": "...", "alias": "..."},
21
+ {"kind": "agg", "fn": "count|count_distinct|sum|avg|min|max",
22
+ "column_id": "...?", "alias": "..."}
23
+ ],
24
+ "filters": [
25
+ {"column_id": "...",
26
+ "op": "= | != | < | <= | > | >= | in | not_in | is_null | is_not_null | like | between",
27
+ "value": ...,
28
+ "value_type": "int|decimal|string|datetime|date|bool"}
29
+ ],
30
+ "group_by": ["column_id", ...],
31
+ "order_by": [{"column_id": "...", "dir": "asc|desc"}],
32
+ "limit": 100
33
+ }
34
+ ```
35
+
36
+ ## Hard constraints (a violation makes the IR invalid)
37
+
38
+ 1. `source_id`, `table_id`, `column_id` must come **verbatim** from the catalog. Never invent IDs or copy table/column **names** in their place.
39
+ 2. **Single-table only in v1.** Pick the table whose columns best answer the question. If the question genuinely needs a join, pick the table that yields the most useful answer alone and the user can refine.
40
+ 3. Use only listed operators / aggregates. No window functions, no `CASE WHEN`, no subqueries — those are not part of v1.
41
+ 4. `value_type` must be compatible with the column's `data_type`:
42
+ - `int` column ↔ value_type ∈ {int, decimal}
43
+ - `decimal` column ↔ value_type ∈ {int, decimal}
44
+ - `string` column ↔ value_type = string
45
+ - `datetime` / `date` column ↔ value_type ∈ {datetime, date, string} (ISO-8601 string is fine)
46
+ - `bool` column ↔ value_type = bool
47
+ 5. `limit`, when set, must be between 1 and 10000 inclusive. Omit it or set it to `null` to accept the server-side cap of 10000.
48
+ 6. For `count` of all rows, omit `column_id` from the agg item. For any other aggregate, `column_id` is required.
49
+ 7. `order_by.column_id` may reference either a real column_id or an alias declared in `select`.
50
+ 8. For `is_null` / `is_not_null`, `value` and `value_type` are still emitted but ignored — pick reasonable defaults.
51
+ 9. For `in` / `not_in`, `value` is a JSON list. For `between`, `value` is a JSON list of exactly two elements (low, high).
52
+
53
+ ## Style guidance
54
+
55
+ - Default `limit` to 100 unless the user asked for "top N" (then use N) or said "all" (then leave out `limit`, server will cap at 10000).
56
+ - For "top N by X" → `select` includes the grouping column and the agg, `order_by` on the agg alias `desc`, `limit=N`.
57
+ - For "how many rows / events / transactions ..." → `fn="count"` (COUNT *), omit `column_id`.
58
+ - For "how many unique / distinct X ..." or "how many different X ..." → `fn="count_distinct"` with `column_id` of X's identifier column.
59
+ - When ambiguous (e.g. "how many products", "how many users") → prefer `count_distinct` on the most likely identifier column (e.g. product_id, user_id).
60
+ - Prefer aliases on aggregates (`alias="total"`, `alias="n"`, etc.) so the answer-formatter has a clean column name.
61
+ - If the question is ambiguous, pick the most likely interpretation and proceed — error retry will give you another attempt if the IR fails validation.
62
+
63
+ ## Few-shot examples
64
+
65
+ Catalog excerpt (DB source):
66
+
67
+ ```
68
+ Source: prod_db (schema)
69
+ Source ID: src_prod_db
70
+
71
+ Tables:
72
+
73
+ Table: orders (12,453 rows) — id=t_orders
74
+ Columns:
75
+ - id [int]: samples=[1, 2, 3], distinct=12453 — id=c_orders_id
76
+ - customer_id [int]: samples=[42, 17] — id=c_orders_customer_id
77
+ - total_cents [int]: samples=[2499, 4999], min=99, max=999900 — id=c_orders_total_cents
78
+ - status [string]: samples=[completed, pending] — id=c_orders_status
79
+ - created_at [datetime]: samples=[2026-04-01T08:12:00Z] — id=c_orders_created
80
+ ```
81
+
82
+ Question: "How many orders last month?"
83
+ Output:
84
+ ```json
85
+ {
86
+ "ir_version": "1.0",
87
+ "source_id": "src_prod_db",
88
+ "table_id": "t_orders",
89
+ "select": [{"kind": "agg", "fn": "count", "alias": "n"}],
90
+ "filters": [
91
+ {"column_id": "c_orders_created", "op": ">=", "value": "2026-04-01T00:00:00Z", "value_type": "string"},
92
+ {"column_id": "c_orders_created", "op": "<", "value": "2026-05-01T00:00:00Z", "value_type": "string"}
93
+ ],
94
+ "group_by": [],
95
+ "order_by": [],
96
+ "limit": null
97
+ }
98
+ ```
99
+
100
+ Question: "Top 5 statuses by count"
101
+ Output:
102
+ ```json
103
+ {
104
+ "ir_version": "1.0",
105
+ "source_id": "src_prod_db",
106
+ "table_id": "t_orders",
107
+ "select": [
108
+ {"kind": "column", "column_id": "c_orders_status"},
109
+ {"kind": "agg", "fn": "count", "alias": "n"}
110
+ ],
111
+ "filters": [],
112
+ "group_by": ["c_orders_status"],
113
+ "order_by": [{"column_id": "n", "dir": "desc"}],
114
+ "limit": 5
115
+ }
116
+ ```
117
+
118
+ Catalog excerpt (tabular source — XLSX sheet):
119
+
120
+ ```
121
+ Source: customers.xlsx (tabular)
122
+ Source ID: src_doc_customers
123
+
124
+ Tables:
125
+
126
+ Table: Sheet1 (8,200 rows) — id=t_customers_sheet1
127
+ Columns:
128
+ - id [int]: samples=[1, 2] — id=c_customers_id
129
+ - region [string]: samples=[NA, EMEA, APAC] — id=c_customers_region
130
+ - mrr [decimal]: samples=[99.0, 199.0], min=0.0, max=999.0 — id=c_customers_mrr
131
+ ```
132
+
133
+ Question: "Average MRR by region"
134
+ Output:
135
+ ```json
136
+ {
137
+ "ir_version": "1.0",
138
+ "source_id": "src_doc_customers",
139
+ "table_id": "t_customers_sheet1",
140
+ "select": [
141
+ {"kind": "column", "column_id": "c_customers_region"},
142
+ {"kind": "agg", "fn": "avg", "column_id": "c_customers_mrr", "alias": "avg_mrr"}
143
+ ],
144
+ "filters": [],
145
+ "group_by": ["c_customers_region"],
146
+ "order_by": [{"column_id": "avg_mrr", "dir": "desc"}],
147
+ "limit": 100
148
+ }
149
+ ```
150
+
151
+ Question: "How many unique customers?"
152
+ Output:
153
+ ```json
154
+ {
155
+ "ir_version": "1.0",
156
+ "source_id": "src_doc_customers",
157
+ "table_id": "t_customers_sheet1",
158
+ "select": [{"kind": "agg", "fn": "count_distinct", "column_id": "c_customers_id", "alias": "n"}],
159
+ "filters": [],
160
+ "group_by": [],
161
+ "order_by": [],
162
+ "limit": null
163
+ }
164
+ ```
165
+
166
+ ## Retry behavior
167
+
168
+ If the previous attempt's IR failed validation, the error message will be appended below. Read it carefully and emit a corrected IR — do not repeat the same mistake.
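Reviewer note: a minimal sketch of a few of the hard constraints above expressed as checks, to make the retry loop's error messages concrete. `check_ir` and `column_types` are hypothetical; the actual `IRValidator` is not shown in this section and may differ. Treating a `null` limit as valid is an assumption based on the few-shot examples.

```python
COMPATIBLE = {
    "int": {"int", "decimal"},
    "decimal": {"int", "decimal"},
    "string": {"string"},
    "datetime": {"datetime", "date", "string"},
    "date": {"datetime", "date", "string"},
    "bool": {"bool"},
}
AGGS = {"count", "count_distinct", "sum", "avg", "min", "max"}
NULL_OPS = {"is_null", "is_not_null"}


def check_ir(ir: dict, column_types: dict) -> list:
    """Return a list of violations; an empty list means these checks passed.

    column_types maps column_id -> data_type for the chosen table.
    """
    errors = []
    limit = ir.get("limit")
    if limit is not None and not 1 <= limit <= 10000:
        errors.append("limit must be between 1 and 10000")
    for item in ir.get("select", []):
        if item["kind"] == "agg":
            if item["fn"] not in AGGS:
                errors.append(f"unknown aggregate {item['fn']!r}")
            elif item["fn"] != "count" and not item.get("column_id"):
                errors.append(f"{item['fn']} requires column_id")
        cid = item.get("column_id")
        if cid is not None and cid not in column_types:
            errors.append(f"unknown column_id {cid!r}")
    for f in ir.get("filters", []):
        dtype = column_types.get(f["column_id"])
        if dtype is None:
            errors.append(f"unknown column_id {f['column_id']!r}")
        elif f["op"] not in NULL_OPS and f["value_type"] not in COMPATIBLE.get(dtype, set()):
            errors.append(f"value_type {f['value_type']!r} incompatible with {dtype} column")
        if f["op"] == "between" and not (isinstance(f["value"], list) and len(f["value"]) == 2):
            errors.append("between needs a list of exactly two values")
    return errors


column_types = {"c_orders_created": "datetime", "c_orders_total_cents": "int"}
example_ir = {
    "ir_version": "1.0",
    "source_id": "src_prod_db",
    "table_id": "t_orders",
    "select": [{"kind": "agg", "fn": "count", "alias": "n"}],
    "filters": [
        {"column_id": "c_orders_created", "op": ">=",
         "value": "2026-04-01T00:00:00Z", "value_type": "string"},
        {"column_id": "c_orders_created", "op": "<",
         "value": "2026-05-01T00:00:00Z", "value_type": "string"},
    ],
    "group_by": [],
    "order_by": [],
    "limit": None,
}
print(check_ir(example_ir, column_types))  # → []
```

Returning a list of messages rather than raising on the first problem lets the retry prompt show the planner every violation at once.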
src/db/postgres/init_db.py CHANGED
@@ -3,6 +3,7 @@
3
  from sqlalchemy import text
4
  from src.db.postgres.connection import engine, Base
5
  from src.db.postgres.models import (
 
6
  ChatMessage,
7
  DatabaseClient,
8
  Document,
@@ -21,7 +22,7 @@ async def init_db():
21
  await conn.execute(text("SELECT pg_advisory_xact_lock(1573678846307946496)"))
22
  await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
23
 
24
- # Create application tables
25
  await conn.run_sync(Base.metadata.create_all)
26
 
27
  # Schema migrations (idempotent — safe to run on every startup)
 
3
  from sqlalchemy import text
4
  from src.db.postgres.connection import engine, Base
5
  from src.db.postgres.models import (
6
+ Catalog,
7
  ChatMessage,
8
  DatabaseClient,
9
  Document,
 
22
  await conn.execute(text("SELECT pg_advisory_xact_lock(1573678846307946496)"))
23
  await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
24
 
25
+ # Create application tables (includes `data_catalog`)
26
  await conn.run_sync(Base.metadata.create_all)
27
 
28
  # Schema migrations (idempotent — safe to run on every startup)
src/db/postgres/models.py CHANGED
@@ -96,4 +96,23 @@ class DatabaseClient(Base):
96
  status = Column(String, nullable=False, default="active") # active | inactive
97
  created_at = Column(DateTime(timezone=True), server_default=func.now())
98
  updated_at = Column(DateTime(timezone=True), onupdate=func.now())
99
 
 
96
  status = Column(String, nullable=False, default="active") # active | inactive
97
  created_at = Column(DateTime(timezone=True), server_default=func.now())
98
  updated_at = Column(DateTime(timezone=True), onupdate=func.now())
99
+
100
+
101
+ class Catalog(Base):
102
+ """Per-user data catalog stored as a single jsonb row.
103
+
104
+ `data` holds the full Pydantic Catalog (src/catalog/models.py:Catalog)
105
+ serialized via `model_dump(mode="json")`. Read path uses
106
+ `Catalog.model_validate(...)` to rehydrate.
107
+
108
+ Dedicated table — kept separate from `langchain_pg_embedding` so unstructured
109
+ embeddings and structured-catalog metadata never share storage.
110
+ """
111
+ __tablename__ = "data_catalog"
112
+
113
+ user_id = Column(String, primary_key=True)
114
+ data = Column(JSONB, nullable=False)
115
+ schema_version = Column(String, nullable=False, default="1.0")
116
+ generated_at = Column(DateTime(timezone=True), server_default=func.now())
117
+ updated_at = Column(DateTime(timezone=True), onupdate=func.now())
118
 
src/knowledge/processing_service.py CHANGED
@@ -7,12 +7,10 @@ from src.storage.az_blob.az_blob import blob_storage
7
  from src.db.postgres.models import Document as DBDocument
8
  from sqlalchemy.ext.asyncio import AsyncSession
9
  from src.middlewares.logging import get_logger
10
- from src.knowledge.parquet_service import upload_parquet
11
  from typing import List
12
  from datetime import datetime, timezone, timedelta
13
  import sys
14
  import docx
15
- import pandas as pd
16
  import pytesseract
17
  from pdf2image import convert_from_bytes
18
  from io import BytesIO
@@ -44,10 +42,6 @@ class KnowledgeProcessingService:
44
 
45
  if db_doc.file_type == "pdf":
46
  documents = await self._build_pdf_documents(content, db_doc)
47
- elif db_doc.file_type == "csv":
48
- documents = await self._build_csv_documents(content, db_doc)
49
- elif db_doc.file_type == "xlsx":
50
- documents = await self._build_excel_documents(content, db_doc)
51
  else:
52
  text = self._extract_text(content, db_doc.file_type)
53
  if not text.strip():
@@ -121,106 +115,6 @@ class KnowledgeProcessingService:
121
 
122
  return documents
123
 
124
- def _profile_dataframe(
125
- self, df: pd.DataFrame, source_name: str, db_doc: DBDocument
126
- ) -> List[LangChainDocument]:
127
- """Profile each column of a dataframe → one chunk per column."""
128
- documents = []
129
- row_count = len(df)
130
-
131
- for col_name in df.columns:
132
- col = df[col_name]
133
- is_numeric = pd.api.types.is_numeric_dtype(col)
134
- null_count = int(col.isnull().sum())
135
- distinct_count = int(col.nunique())
136
- distinct_ratio = distinct_count / row_count if row_count > 0 else 0
137
-
138
- text = f"Source: {source_name} ({row_count} rows)\n"
139
- text += f"Column: {col_name} ({col.dtype})\n"
140
- text += f"Null count: {null_count}\n"
141
- text += f"Distinct count: {distinct_count} ({distinct_ratio:.1%})\n"
142
-
143
- if is_numeric:
144
- text += f"Min: {col.min()}, Max: {col.max()}\n"
145
- text += f"Mean: {col.mean():.4f}, Median: {col.median():.4f}\n"
146
-
147
- if 0 < distinct_ratio <= 0.05:
148
- top_values = col.value_counts().head(10)
149
- top_str = ", ".join(f"{v} ({c})" for v, c in top_values.items())
150
- text += f"Top values: {top_str}\n"
151
-
152
- text += f"Sample values: {col.dropna().head(5).tolist()}"
153
-
154
- documents.append(LangChainDocument(
155
- page_content=text,
156
- metadata={
157
- "user_id": db_doc.user_id,
158
- "source_type": "document",
159
- "chunk_level": "column",
160
- "updated_at": datetime.now(_JAKARTA_TZ).isoformat(),
161
- "data": {
162
- "document_id": db_doc.id,
163
- "filename": db_doc.filename,
164
- "file_type": db_doc.file_type,
165
- "source": source_name,
166
- "column_name": col_name,
167
- "column_type": str(col.dtype),
168
- }
169
- }
170
- ))
171
- return documents
172
-
173
- def _to_sheet_document(
174
- self, df: pd.DataFrame, db_doc: DBDocument, sheet_name: str | None, source_name: str
175
- ) -> LangChainDocument:
176
- col_summary = ", ".join(f"{c} ({df[c].dtype})" for c in df.columns)
177
- text = (
178
- f"Source: {source_name} ({len(df)} rows)\n"
179
- f"Columns ({len(df.columns)}): {col_summary}"
180
- )
181
- return LangChainDocument(
182
- page_content=text,
183
- metadata={
184
- "user_id": db_doc.user_id,
185
- "source_type": "document",
186
- "chunk_level": "sheet",
187
- "updated_at": datetime.now(_JAKARTA_TZ).isoformat(),
188
- "data": {
189
- "document_id": db_doc.id,
190
- "filename": db_doc.filename,
191
- "file_type": db_doc.file_type,
192
- "sheet_name": sheet_name,
193
- "column_names": list(df.columns),
194
- "row_count": len(df),
195
- },
196
- },
197
- )
198
-
199
- async def _build_csv_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
200
- """Profile each column of a CSV file and upload Parquet to Azure Blob."""
201
- df = pd.read_csv(BytesIO(content))
202
- await upload_parquet(df, db_doc.user_id, db_doc.id)
203
- logger.info(f"Uploaded Parquet for CSV {db_doc.id}")
204
- docs = self._profile_dataframe(df, db_doc.filename, db_doc)
205
- docs.append(self._to_sheet_document(df, db_doc, sheet_name=None, source_name=db_doc.filename))
206
- return docs
207
-
208
- async def _build_excel_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
209
- """Profile each column of every sheet in an Excel file and upload one Parquet per sheet."""
210
- sheets = pd.read_excel(BytesIO(content), sheet_name=None)
211
- documents = []
212
- for sheet_name, df in sheets.items():
213
- source_name = f"{db_doc.filename} / sheet: {sheet_name}"
214
- docs = self._profile_dataframe(df, source_name, db_doc)
215
- for doc in docs:
216
- doc.metadata["data"]["sheet_name"] = sheet_name
217
- doc.metadata["chunk_level"] = "column"
218
- documents.extend(docs)
219
- documents.append(self._to_sheet_document(df, db_doc, sheet_name, source_name))
220
- await upload_parquet(df, db_doc.user_id, db_doc.id, sheet_name)
221
- logger.info(f"Uploaded Parquet for sheet '{sheet_name}' of {db_doc.id}")
222
- return documents
223
-
224
  def _extract_text(self, content: bytes, file_type: str) -> str:
225
  """Extract text from DOCX or TXT content."""
226
  if file_type == "docx":
 
7
  from src.db.postgres.models import Document as DBDocument
8
  from sqlalchemy.ext.asyncio import AsyncSession
9
  from src.middlewares.logging import get_logger
 
10
  from typing import List
11
  from datetime import datetime, timezone, timedelta
12
  import sys
13
  import docx
 
14
  import pytesseract
15
  from pdf2image import convert_from_bytes
16
  from io import BytesIO
 
42
 
43
  if db_doc.file_type == "pdf":
44
  documents = await self._build_pdf_documents(content, db_doc)
45
  else:
46
  text = self._extract_text(content, db_doc.file_type)
47
  if not text.strip():
 
115
 
116
  return documents
117
 
118
  def _extract_text(self, content: bytes, file_type: str) -> str:
119
  """Extract text from DOCX or TXT content."""
120
  if file_type == "docx":
src/middlewares/logging.py CHANGED
@@ -1,5 +1,6 @@
1
  """Structured logging middleware with structlog."""
2
 
 
3
  import structlog
4
  from functools import wraps
5
  from typing import Callable, Any
@@ -8,6 +9,8 @@ import time
8
 
9
  def configure_logging():
10
  """Configure structured logging."""
 
 
11
  structlog.configure(
12
  processors=[
13
  structlog.stdlib.filter_by_level,
 
1
  """Structured logging middleware with structlog."""
2
 
3
+ import logging
4
  import structlog
5
  from functools import wraps
6
  from typing import Callable, Any
 
9
 
10
  def configure_logging():
11
  """Configure structured logging."""
12
+ logging.basicConfig(level=logging.WARNING)
13
+ logging.getLogger("tabular_executor").setLevel(logging.INFO)
14
  structlog.configure(
15
  processors=[
16
  structlog.stdlib.filter_by_level,
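The two added lines set the root logger to WARNING while letting the `tabular_executor` logger emit INFO. A minimal standard-library sketch of that effect follows; the `force=True` argument and the `some_other_module` logger name are assumptions added for illustration and are not part of the PR.

```python
import logging


def configure_stdlib_logging() -> None:
    """Root logger at WARNING, one named logger opened up to INFO."""
    # force=True (an assumption, not in the PR) replaces handlers that were
    # installed earlier, so calling this function twice stays predictable.
    logging.basicConfig(level=logging.WARNING, force=True)
    logging.getLogger("tabular_executor").setLevel(logging.INFO)


configure_stdlib_logging()

# The named logger passes INFO records; any other logger inherits WARNING.
executor_log = logging.getLogger("tabular_executor")
other_log = logging.getLogger("some_other_module")  # hypothetical name
executor_log.info("visible: tabular_executor is at INFO")
other_log.info("hidden: inherits WARNING from the root logger")
```

Without `force=True`, `basicConfig` does nothing when the root logger already has handlers, which is worth keeping in mind if another library configures logging first.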
src/models/api/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ """API request/response shapes per route family."""
src/models/api/catalog.py ADDED
@@ -0,0 +1,27 @@
1
+ """Request / response models for catalog-related routes."""
2
+
3
+ from datetime import datetime
4
+
5
+ from pydantic import BaseModel, Field
6
+
7
+
8
+ class CatalogRebuildRequest(BaseModel):
9
+ user_id: str
10
+
11
+
12
+ class CatalogRebuildResponse(BaseModel):
13
+ user_id: str
14
+ sources_rebuilt: int
15
+
16
+
17
+ class CatalogIndexEntry(BaseModel):
18
+ """One row in the per-user catalog index — used by the refresher to decide
19
+ which sources to rebuild and by the UI to list registered sources.
20
+ """
21
+
22
+ source_id: str = Field(..., description="Stable internal source identifier.")
23
+ source_type: str = Field(..., description="schema | tabular | unstructured.")
24
+ name: str = Field(..., description="Display name (DB name or filename).")
25
+ location_ref: str = Field(..., description="URI: dbclient://… or az_blob://…")
26
+ table_count: int = Field(..., description="Number of tables/sheets in this source.")
27
+ updated_at: datetime = Field(..., description="Last time this source was (re)introspected.")
src/models/api/chat.py ADDED
@@ -0,0 +1,17 @@
1
+ """Request / response models for /api/v1/chat/* routes."""
2
+
3
+ from typing import Any
4
+
5
+ from pydantic import BaseModel
6
+
7
+
8
+ class ChatRequest(BaseModel):
9
+ user_id: str
10
+ room_id: str
11
+ message: str
12
+
13
+
14
+ class ChatStreamEvent(BaseModel):
15
+ """One SSE event. Type values: `sources`, `chunk`, `done`."""
16
+ event: str
17
+ data: dict[str, Any]
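`ChatStreamEvent` describes one SSE event with an `event` name and a `data` payload. The diff does not show how the event is written to the wire, so the following is only a sketch of one plausible framing, using a plain dataclass instead of the Pydantic model. `StreamEvent`, `ALLOWED_EVENTS`, and `to_sse` are hypothetical names.

```python
import json
from dataclasses import dataclass
from typing import Any

# Event names taken from the ChatStreamEvent docstring.
ALLOWED_EVENTS = {"sources", "chunk", "done"}


@dataclass
class StreamEvent:
    """Stand-in for ChatStreamEvent (hypothetical, stdlib only)."""
    event: str
    data: dict[str, Any]


def to_sse(ev: StreamEvent) -> str:
    """Frame one event in text/event-stream format."""
    if ev.event not in ALLOWED_EVENTS:
        raise ValueError(f"unknown event type: {ev.event!r}")
    # json.dumps emits a single line by default, so one `data:` field suffices.
    # The blank line at the end terminates the event.
    return f"event: {ev.event}\ndata: {json.dumps(ev.data)}\n\n"


frame = to_sse(StreamEvent(event="chunk", data={"text": "hi"}))
```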
src/models/api/document.py ADDED
@@ -0,0 +1,9 @@
1
+ """Request / response models for /api/v1/documents/* routes."""
2
+
3
+ from pydantic import BaseModel
4
+
5
+
6
+ class DocumentUploadResponse(BaseModel):
7
+ document_id: str
8
+ filename: str
9
+ status: str # uploaded | processing | completed | failed
src/models/user_info.py DELETED
@@ -1,15 +0,0 @@
1
- """User info models for existing users.py."""
2
-
3
- from pydantic import BaseModel
4
-
5
-
6
- class UserCreate(BaseModel):
7
- """User creation model."""
8
- fullname: str
9
- email: str
10
- password: str
11
- company: str | None = None
12
- company_size: str | None = None
13
- function: str | None = None
14
- site: str | None = None
15
- role: str | None = None
src/pipeline/db_pipeline/extractor.py CHANGED
@@ -9,7 +9,7 @@ not user input.
9
  from typing import Optional
10
 
11
  import pandas as pd
12
- from sqlalchemy import Float, Integer, Numeric, inspect
13
  from sqlalchemy.engine import Engine
14
 
15
  from src.middlewares.logging import get_logger
@@ -17,10 +17,16 @@ from src.middlewares.logging import get_logger
17
  logger = get_logger("db_extractor")
18
 
19
  TOP_VALUES_THRESHOLD = 0.05 # show top values if distinct_ratio <= 5%
 
 
 
 
 
 
20
 
21
  # Dialects where PERCENTILE_CONT(...) WITHIN GROUP is supported as an aggregate.
22
  # MySQL has no percentile aggregate; BigQuery has PERCENTILE_CONT only as an
23
- # analytic (window) function — both drop median and keep min/max/mean.
24
  _MEDIAN_DIALECTS = frozenset({"postgresql", "mssql", "snowflake"})
25
 
26
 
@@ -53,7 +59,7 @@ def _qi(engine: Engine, name: str) -> str:
53
  def get_schema(
54
  engine: Engine, exclude_tables: Optional[frozenset[str]] = None
55
  ) -> dict[str, list[dict]]:
56
- """Returns {table_name: [{name, type, is_numeric, is_primary_key, foreign_key}, ...]}."""
57
  exclude = exclude_tables or frozenset()
58
  inspector = inspect(engine)
59
  schema = {}
@@ -75,6 +81,7 @@ def get_schema(
75
  "name": c["name"],
76
  "type": str(c["type"]),
77
  "is_numeric": isinstance(c["type"], (Integer, Numeric, Float)),
 
78
  "is_primary_key": c["name"] in pk_cols,
79
  "foreign_key": fk_map.get(c["name"]),
80
  }
@@ -96,8 +103,14 @@ def profile_column(
96
  col_name: str,
97
  is_numeric: bool,
98
  row_count: int,
 
99
  ) -> dict:
100
- """Returns null_count, distinct_count, min/max, top values, and sample values."""
 
 
 
 
 
101
  if row_count == 0:
102
  return {
103
  "null_count": 0,
@@ -108,39 +121,69 @@ def profile_column(
108
 
109
  qt = _qi(engine, table_name)
110
  qc = _qi(engine, col_name)
 
 
 
 
 
111
 
112
- # Combined stats query: null_count, distinct_count, and min/max (if numeric).
113
- # One round-trip instead of two.
114
- select_cols = [
115
  f"COUNT(*) - COUNT({qc}) AS nulls",
116
  f"COUNT(DISTINCT {qc}) AS distincts",
117
  ]
118
- if is_numeric:
119
- select_cols.append(f"MIN({qc}) AS min_val")
120
- select_cols.append(f"MAX({qc}) AS max_val")
121
- select_cols.append(f"AVG({qc}) AS mean_val")
122
- if _supports_median(engine):
123
- select_cols.append(
124
- f"PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY {qc}) AS median_val"
125
- )
126
- stats = pd.read_sql(f"SELECT {', '.join(select_cols)} FROM {qt}", engine)
127
-
128
- null_count = int(stats.iloc[0]["nulls"])
129
- distinct_count = int(stats.iloc[0]["distincts"])
130
- distinct_ratio = distinct_count / row_count if row_count > 0 else 0
131
 
132
- profile = {
133
- "null_count": null_count,
134
- "distinct_count": distinct_count,
135
- "distinct_ratio": round(distinct_ratio, 4),
136
- }
137
-
138
- if is_numeric:
139
- profile["min"] = stats.iloc[0]["min_val"]
140
- profile["max"] = stats.iloc[0]["max_val"]
141
- profile["mean"] = stats.iloc[0]["mean_val"]
142
- if _supports_median(engine):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
143
  profile["median"] = stats.iloc[0]["median_val"]
 
 
 
 
 
 
 
 
144
 
145
  if 0 < distinct_ratio <= TOP_VALUES_THRESHOLD:
146
  top_sql = _head_query(
@@ -153,9 +196,6 @@ def profile_column(
153
  top = pd.read_sql(top_sql, engine)
154
  profile["top_values"] = list(zip(top.iloc[:, 0].tolist(), top["cnt"].tolist()))
155
 
156
- sample = pd.read_sql(_head_query(engine, qc, qt, 5), engine)
157
- profile["sample_values"] = sample.iloc[:, 0].tolist()
158
-
159
  return profile
160
 
161
 
@@ -273,7 +313,8 @@ def build_text(table_name: str, row_count: int, col: dict, profile: dict) -> str
273
  text += f"Distinct count: {profile['distinct_count']} ({profile['distinct_ratio']:.1%})\n"
274
  if "min" in profile:
275
  text += f"Min: {profile['min']}, Max: {profile['max']}\n"
276
- text += f"Mean: {profile['mean']}\n"
 
277
  if profile.get("median") is not None:
278
  text += f"Median: {profile['median']}\n"
279
  if "top_values" in profile:
 
9
  from typing import Optional
10
 
11
  import pandas as pd
12
+ from sqlalchemy import Date, DateTime, Float, Integer, Numeric, inspect
13
  from sqlalchemy.engine import Engine
14
 
15
  from src.middlewares.logging import get_logger
 
17
  logger = get_logger("db_extractor")
18
 
19
  TOP_VALUES_THRESHOLD = 0.05 # show top values if distinct_ratio <= 5%
20
+ SAMPLE_LIMIT = 3 # sample N rows per column (down from 5 — token cost)
21
+
22
+ # Dialects with a single-statement CTE that survives `pd.read_sql`. On these we
23
+ # fold the stats and sample queries into one round-trip per column. MySQL <8 and
24
+ # old SQLite are excluded out of caution.
25
+ _CTE_DIALECTS = frozenset({"postgresql", "mssql", "snowflake", "bigquery"})
26
 
27
  # Dialects where PERCENTILE_CONT(...) WITHIN GROUP is supported as an aggregate.
28
  # MySQL has no percentile aggregate; BigQuery has PERCENTILE_CONT only as an
29
+ # analytic (window) function — both drop median and keep mean.
30
  _MEDIAN_DIALECTS = frozenset({"postgresql", "mssql", "snowflake"})
31
 
32
 
 
59
  def get_schema(
60
  engine: Engine, exclude_tables: Optional[frozenset[str]] = None
61
  ) -> dict[str, list[dict]]:
62
+ """Returns {table_name: [{name, type, is_numeric, is_temporal, is_primary_key, foreign_key}, ...]}."""
63
  exclude = exclude_tables or frozenset()
64
  inspector = inspect(engine)
65
  schema = {}
 
81
  "name": c["name"],
82
  "type": str(c["type"]),
83
  "is_numeric": isinstance(c["type"], (Integer, Numeric, Float)),
84
+ "is_temporal": isinstance(c["type"], (Date, DateTime)),
85
  "is_primary_key": c["name"] in pk_cols,
86
  "foreign_key": fk_map.get(c["name"]),
87
  }
 
103
  col_name: str,
104
  is_numeric: bool,
105
  row_count: int,
106
+ is_temporal: bool = False,
107
  ) -> dict:
108
+ """Returns null_count, distinct_count, min/max (numeric+temporal), mean/median (numeric), and sample values.
109
+
110
+ Numeric columns compute mean and (where the dialect supports it) median.
111
+ Date/datetime columns get min/max only (mean/median over timestamps is not useful).
112
+ Strings/bools skip range stats entirely.
113
+ """
114
  if row_count == 0:
115
  return {
116
  "null_count": 0,
 
121
 
122
  qt = _qi(engine, table_name)
123
  qc = _qi(engine, col_name)
124
+ wants_range = is_numeric or is_temporal
125
+ wants_mean = is_numeric
126
+ wants_median = is_numeric and _supports_median(engine)
127
+
128
+ profile: dict = {}
129
 
130
+ # Build the stats SELECT list incrementally; the same column set is used in
131
+ # both the CTE and fallback branches.
132
+ stat_cols = [
133
  f"COUNT(*) - COUNT({qc}) AS nulls",
134
  f"COUNT(DISTINCT {qc}) AS distincts",
135
  ]
136
+ if wants_range:
137
+ stat_cols += [f"MIN({qc}) AS min_val", f"MAX({qc}) AS max_val"]
138
+ if wants_mean:
139
+ stat_cols.append(f"AVG({qc}) AS mean_val")
140
+ if wants_median:
141
+ stat_cols.append(
142
+ f"PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY {qc}) AS median_val"
143
+ )
 
 
 
 
 
144
 
145
+ if engine.dialect.name in _CTE_DIALECTS:
146
+ # Single round-trip: stats + sample together via CTE.
147
+ stats_select = ", ".join(stat_cols)
148
+ passthrough = ", ".join(
149
+ f"s.{c.split(' AS ')[-1]}" for c in stat_cols
150
+ )
151
+ sql = (
152
+ f"WITH stats AS (SELECT {stats_select} FROM {qt}), "
153
+ f"sample AS ({_head_query(engine, qc + ' AS sample_val', qt, SAMPLE_LIMIT)}) "
154
+ f"SELECT {passthrough}, sample.sample_val FROM stats s CROSS JOIN sample"
155
+ )
156
+ rows = pd.read_sql(sql, engine)
157
+ null_count = int(rows.iloc[0]["nulls"])
158
+ distinct_count = int(rows.iloc[0]["distincts"])
159
+ sample_values = rows["sample_val"].tolist()
160
+ if wants_range:
161
+ profile["min"] = rows.iloc[0]["min_val"]
162
+ profile["max"] = rows.iloc[0]["max_val"]
163
+ if wants_mean:
164
+ profile["mean"] = rows.iloc[0]["mean_val"]
165
+ if wants_median:
166
+ profile["median"] = rows.iloc[0]["median_val"]
167
+ else:
168
+ # Two-query fallback (MySQL/SQLite).
169
+ stats = pd.read_sql(f"SELECT {', '.join(stat_cols)} FROM {qt}", engine)
170
+ null_count = int(stats.iloc[0]["nulls"])
171
+ distinct_count = int(stats.iloc[0]["distincts"])
172
+ if wants_range:
173
+ profile["min"] = stats.iloc[0]["min_val"]
174
+ profile["max"] = stats.iloc[0]["max_val"]
175
+ if wants_mean:
176
+ profile["mean"] = stats.iloc[0]["mean_val"]
177
+ if wants_median:
178
  profile["median"] = stats.iloc[0]["median_val"]
179
+ sample = pd.read_sql(_head_query(engine, qc, qt, SAMPLE_LIMIT), engine)
180
+ sample_values = sample.iloc[:, 0].tolist()
181
+
182
+ distinct_ratio = distinct_count / row_count if row_count > 0 else 0
183
+ profile["null_count"] = null_count
184
+ profile["distinct_count"] = distinct_count
185
+ profile["distinct_ratio"] = round(distinct_ratio, 4)
186
+ profile["sample_values"] = sample_values
187
 
188
  if 0 < distinct_ratio <= TOP_VALUES_THRESHOLD:
189
  top_sql = _head_query(
 
196
  top = pd.read_sql(top_sql, engine)
197
  profile["top_values"] = list(zip(top.iloc[:, 0].tolist(), top["cnt"].tolist()))
198
 
 
 
 
199
  return profile
200
 
201
 
 
313
  text += f"Distinct count: {profile['distinct_count']} ({profile['distinct_ratio']:.1%})\n"
314
  if "min" in profile:
315
  text += f"Min: {profile['min']}, Max: {profile['max']}\n"
316
+ if profile.get("mean") is not None:
317
+ text += f"Mean: {profile['mean']}\n"
318
  if profile.get("median") is not None:
319
  text += f"Median: {profile['median']}\n"
320
  if "top_values" in profile:
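The single round-trip in `profile_column` can be tried in isolation. The sketch below is not the PR's code: it uses the standard-library `sqlite3` module (which the PR leaves out of `_CTE_DIALECTS`, and is used here only because it ships with Python), a hypothetical `orders.amount` column, and a literal `LIMIT` in place of `_head_query`. It shows how cross-joining a one-row stats CTE with a sample CTE repeats the aggregates on every sample row.

```python
import sqlite3

SAMPLE_LIMIT = 3  # same value as the constant added in the diff


def make_demo_conn() -> sqlite3.Connection:
    """In-memory table with one NULL and one duplicate (hypothetical data)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (amount REAL)")
    conn.executemany(
        "INSERT INTO orders (amount) VALUES (?)",
        [(10.0,), (20.0,), (None,), (20.0,), (5.0,)],
    )
    return conn


def profile_amount(conn: sqlite3.Connection) -> dict:
    """Fetch column stats and sample values with a single statement."""
    sql = f"""
        WITH stats AS (
            SELECT COUNT(*) - COUNT(amount) AS nulls,
                   COUNT(DISTINCT amount)   AS distincts,
                   MIN(amount) AS min_val,
                   MAX(amount) AS max_val
            FROM orders
        ),
        sample AS (
            SELECT amount AS sample_val FROM orders LIMIT {SAMPLE_LIMIT}
        )
        SELECT s.nulls, s.distincts, s.min_val, s.max_val, sample.sample_val
        FROM stats s CROSS JOIN sample
    """
    rows = conn.execute(sql).fetchall()
    first = rows[0]  # stats columns are identical on every row
    return {
        "null_count": first[0],
        "distinct_count": first[1],
        "min": first[2],
        "max": first[3],
        "sample_values": [r[4] for r in rows],
    }


profile = profile_amount(make_demo_conn())
```

One consequence of this shape is that an empty sample yields zero rows, so the stats are lost too. The `row_count == 0` early return in `profile_column` guards against that case.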
src/pipeline/{document_pipeline/document_pipeline.py → document_pipeline.py} RENAMED
@@ -1,13 +1,17 @@
1
  """Document upload and processing pipeline."""
2
 
 
 
 
3
  from fastapi import HTTPException, UploadFile
4
  from sqlalchemy.ext.asyncio import AsyncSession
5
 
6
  from src.document.document_service import document_service
7
  from src.knowledge.processing_service import knowledge_processor
8
- from src.knowledge.parquet_service import delete_document_parquets
9
  from src.middlewares.logging import get_logger
10
  from src.storage.az_blob.az_blob import blob_storage
 
11
 
12
  logger = get_logger("document_pipeline")
13
 
@@ -62,11 +66,19 @@ class DocumentPipeline:
62
 
63
  try:
64
  await document_service.update_document_status(db, document_id, "processing")
65
- chunks_count = await knowledge_processor.process_document(document, db)
 
 
 
 
66
  await document_service.update_document_status(db, document_id, "completed")
 
 
 
 
67
 
68
  logger.info(f"Processed document {document_id}: {chunks_count} chunks")
69
- return {"document_id": document_id, "chunks_processed": chunks_count}
70
 
71
  except Exception as e:
72
  logger.error(f"Processing failed for document {document_id}", error=str(e))
@@ -87,8 +99,25 @@ class DocumentPipeline:
87
  if document.file_type in ("csv", "xlsx"):
88
  await delete_document_parquets(user_id, document_id)
89
 
 
 
 
 
 
90
  logger.info(f"Deleted document {document_id} for user {user_id}")
91
  return {"document_id": document_id}
92
 
93
 
 
 
 
 
 
 
 
 
 
 
 
 
94
  document_pipeline = DocumentPipeline()
 
1
  """Document upload and processing pipeline."""
2
 
3
+ from io import BytesIO
4
+
5
+ import pandas as pd
6
  from fastapi import HTTPException, UploadFile
7
  from sqlalchemy.ext.asyncio import AsyncSession
8
 
9
  from src.document.document_service import document_service
10
  from src.knowledge.processing_service import knowledge_processor
11
+ from src.storage.parquet import delete_document_parquets, upload_parquet
12
  from src.middlewares.logging import get_logger
13
  from src.storage.az_blob.az_blob import blob_storage
14
+ from src.retrieval.router import retrieval_router
15
 
16
  logger = get_logger("document_pipeline")
17
 
 
66
 
67
  try:
68
  await document_service.update_document_status(db, document_id, "processing")
69
+ if document.file_type not in ("csv", "xlsx"):
70
+ chunks_count = await knowledge_processor.process_document(document, db)
71
+ else:
72
+ await _upload_parquet(document)
73
+ chunks_count = 0
74
  await document_service.update_document_status(db, document_id, "completed")
75
+ try:
76
+ await retrieval_router.invalidate_cache(user_id)
77
+ except Exception as e:
78
+ logger.warning("Failed to invalidate retrieval cache", user_id=user_id, error=str(e))
79
 
80
  logger.info(f"Processed document {document_id}: {chunks_count} chunks")
81
+ return {"document_id": document_id, "chunks_processed": chunks_count, "file_type": document.file_type}
82
 
83
  except Exception as e:
84
  logger.error(f"Processing failed for document {document_id}", error=str(e))
 
99
  if document.file_type in ("csv", "xlsx"):
100
  await delete_document_parquets(user_id, document_id)
101
 
102
+ try:
103
+ await retrieval_router.invalidate_cache(user_id)
104
+ except Exception as e:
105
+ logger.warning("Failed to invalidate retrieval cache", user_id=user_id, error=str(e))
106
+
107
  logger.info(f"Deleted document {document_id} for user {user_id}")
108
  return {"document_id": document_id}
109
 
110
 
111
+ async def _upload_parquet(document) -> None:
112
+ """Download original blob and upload Parquet(s) without vector embedding."""
113
+ content = await blob_storage.download_file(document.blob_name)
114
+ if document.file_type == "csv":
115
+ df = pd.read_csv(BytesIO(content))
116
+ await upload_parquet(df, document.user_id, document.id)
117
+ else: # xlsx
118
+ sheets = pd.read_excel(BytesIO(content), sheet_name=None)
119
+ for sheet_name, df in sheets.items():
120
+ await upload_parquet(df, document.user_id, document.id, sheet_name)
121
+
122
+
123
  document_pipeline = DocumentPipeline()
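Both `process` and `delete` wrap `retrieval_router.invalidate_cache` in a `try`/`except` so that a cache failure is logged without failing the main operation. A minimal sketch of that best-effort pattern, assuming a generic async callable in place of the real router (`invalidate_best_effort` and the two fake invalidators are hypothetical):

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger("document_pipeline_sketch")


async def invalidate_best_effort(
    invalidate: Callable[[str], Awaitable[None]], user_id: str
) -> bool:
    """Run the invalidation; log and swallow any failure."""
    try:
        await invalidate(user_id)
        return True
    except Exception as exc:
        # The document is already processed; a stale cache is tolerated.
        logger.warning("Failed to invalidate retrieval cache for %s: %s", user_id, exc)
        return False


async def ok_invalidate(user_id: str) -> None:
    """Fake invalidator that succeeds."""


async def broken_invalidate(user_id: str) -> None:
    """Fake invalidator that always fails."""
    raise RuntimeError("cache backend unavailable")


succeeded = asyncio.run(invalidate_best_effort(ok_invalidate, "u1"))
failed = asyncio.run(invalidate_best_effort(broken_invalidate, "u1"))
```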
src/pipeline/structured_pipeline.py ADDED
@@ -0,0 +1,91 @@
1
+ """StructuredPipeline — builds a catalog for DB / tabular sources.
2
+
3
+ Steps (per source, end-to-end):
4
+ 1. introspect (caller-supplied — DatabaseIntrospector or TabularIntrospector)
5
+ 2. merge (replace any existing source with the same source_id)
6
+ 3. validate (catalog/validator.py)
7
+ 4. upsert (catalog/store.py)
8
+
9
+ LLM-driven enrichment was removed: the planner relies on stats + sample
10
+ rows + column names directly. Source/table/column `description` fields stay
11
+ in the model but are not populated by this pipeline.
12
+
13
+ Source-type-agnostic: the caller picks the introspector. Triggers in
14
+ `pipeline/triggers.py` know which one to use based on the upload event.
15
+ """
16
+
17
+ from __future__ import annotations
18
+
19
+ from datetime import UTC, datetime
20
+ from typing import TYPE_CHECKING
21
+
22
+ from src.catalog.introspect.base import BaseIntrospector
23
+ from src.catalog.models import Catalog, Source
24
+ from src.middlewares.logging import get_logger
25
+
26
+ if TYPE_CHECKING:
27
+ from src.catalog.store import CatalogStore
28
+ from src.catalog.validator import CatalogValidator
29
+
30
+ logger = get_logger("structured_pipeline")
31
+
32
+
33
+ class StructuredPipeline:
34
+ """Orchestrates introspect → merge → validate → store.
35
+
36
+ Dependencies are injected (no concrete imports at class-definition time)
37
+ so tests can pass mocks without constructing Settings or opening DB
38
+ connections.
39
+ """
40
+
41
+ def __init__(
42
+ self,
43
+ validator: CatalogValidator,
44
+ store: CatalogStore,
45
+ ) -> None:
46
+ self._validator = validator
47
+ self._store = store
48
+
49
+ async def run(
50
+ self,
51
+ introspector: BaseIntrospector,
52
+ location_ref: str,
53
+ user_id: str,
54
+ ) -> Source:
55
+ source = await introspector.introspect(location_ref)
56
+ merged = await self._merge_with_existing(user_id, source)
57
+ self._validator.validate(merged)
58
+ await self._store.upsert(merged)
59
+ logger.info(
60
+ "structured pipeline complete",
61
+ user_id=user_id,
62
+ source_id=source.source_id,
63
+ source_type=source.source_type,
64
+ tables=len(source.tables),
65
+ )
66
+ return source
67
+
68
+ async def _merge_with_existing(self, user_id: str, new_source: Source) -> Catalog:
69
+ existing = await self._store.get(user_id)
70
+ now = datetime.now(UTC)
71
+ if existing is None:
72
+ return Catalog(user_id=user_id, generated_at=now, sources=[new_source])
73
+ kept = [s for s in existing.sources if s.source_id != new_source.source_id]
74
+ return existing.model_copy(
75
+ update={"sources": [*kept, new_source], "generated_at": now}
76
+ )
77
+
78
+
79
+ def default_structured_pipeline() -> StructuredPipeline:
80
+ """Build the production pipeline with default deps.
81
+
82
+ Lazy imports keep `from src.pipeline.structured_pipeline import …` cheap
83
+ and side-effect-free for tests.
84
+ """
85
+ from src.catalog.store import CatalogStore
86
+ from src.catalog.validator import CatalogValidator
87
+
88
+ return StructuredPipeline(
89
+ validator=CatalogValidator(),
90
+ store=CatalogStore(),
91
+ )
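The merge step in `_merge_with_existing` replaces any source that shares the new source's `source_id` and keeps the rest. The sketch below reproduces that logic with frozen dataclasses instead of the Pydantic `Catalog` and `Source` models, whose full field sets are not shown here; `merge_source` and the trimmed-down classes are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Source:
    """Trimmed stand-in for catalog.models.Source."""
    source_id: str
    tables: tuple[str, ...] = ()


@dataclass(frozen=True)
class Catalog:
    """Trimmed stand-in for catalog.models.Catalog."""
    user_id: str
    sources: tuple[Source, ...] = ()


def merge_source(existing: Catalog | None, user_id: str, new_source: Source) -> Catalog:
    """Return a catalog in which new_source replaces any same-id source."""
    if existing is None:
        return Catalog(user_id=user_id, sources=(new_source,))
    kept = tuple(s for s in existing.sources if s.source_id != new_source.source_id)
    # The re-introspected source is appended, so it moves to the end.
    return replace(existing, sources=(*kept, new_source))


c1 = merge_source(None, "u1", Source("a", ("t1",)))
c2 = merge_source(c1, "u1", Source("b"))
c3 = merge_source(c2, "u1", Source("a", ("t1", "t2")))
```

Because the merge builds a new object each time, `c1` and `c2` are unchanged after `c3` is produced, which mirrors the copy-on-update behaviour of `model_copy`.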
src/pipeline/triggers.py ADDED
@@ -0,0 +1,115 @@
1
+ """Pipeline trigger entry points called from API routes / event handlers.
2
+
3
+ These thin functions are what the FastAPI routes invoke; they delegate to the
4
+ appropriate pipeline (StructuredPipeline for DB/tabular, DocumentPipeline for
5
+ unstructured).
6
+
7
+ Errors propagate from the pipelines — the caller decides whether to surface
8
+ them as HTTP 4xx/5xx or quietly fail. The trigger itself does not catch.
9
+ """
10
+
11
+ from src.middlewares.logging import get_logger
12
+
13
+ logger = get_logger("pipeline_triggers")
14
+
15
+
16
+ async def on_db_registered(database_client_id: str, user_id: str) -> None:
17
+ """Build a dbclient:// location_ref and run the structured pipeline.
18
+
19
+ Called by `/api/v1/database-clients/{id}/ingest` (after rewiring in a
20
+ later PR). The DatabaseIntrospector resolves the client_id to a
21
+ DatabaseClient row, decrypts credentials, connects, and produces a Source.
22
+ The catalog is then validated and upserted (no LLM enrichment step).
23
+ """
24
+ from src.catalog.introspect.database import database_introspector
25
+ from src.pipeline.structured_pipeline import default_structured_pipeline
26
+
27
+ location_ref = f"dbclient://{database_client_id}"
28
+ logger.info(
29
+ "on_db_registered triggered",
30
+ user_id=user_id,
31
+ database_client_id=database_client_id,
32
+ )
33
+ pipeline = default_structured_pipeline()
34
+ await pipeline.run(database_introspector, location_ref, user_id)
35
+
36
+
37
+ async def on_tabular_uploaded(document_id: str, user_id: str) -> None:
38
+ """Build an az_blob:// location_ref and run the structured pipeline.
39
+
40
+ Called after a CSV/XLSX/Parquet file has been processed and its Parquet
41
+ blob(s) uploaded. The TabularIntrospector downloads the original blob,
42
+ profiles each column, and produces a Source. The catalog is then validated
43
+ and upserted (no LLM enrichment step).
44
+ """
45
+ from src.catalog.introspect.tabular import tabular_introspector
46
+ from src.pipeline.structured_pipeline import default_structured_pipeline
47
+
48
+ location_ref = f"az_blob://{user_id}/{document_id}"
49
+ logger.info(
50
+ "on_tabular_uploaded triggered",
51
+ user_id=user_id,
52
+ document_id=document_id,
53
+ )
54
+ pipeline = default_structured_pipeline()
55
+ await pipeline.run(tabular_introspector, location_ref, user_id)
56
+
57
+
58
+ async def on_document_uploaded(document_id: str, user_id: str) -> None:
59
+ """Process an unstructured document (PDF/DOCX/TXT) through the document pipeline.
60
+
61
+ Opens its own DB session so it can be called from event handlers that
62
+ don't have an injected session (same pattern as on_tabular_uploaded).
63
+ """
64
+ from src.db.postgres.connection import AsyncSessionLocal
65
+ from src.pipeline.document_pipeline import document_pipeline
66
+
67
+ logger.info("on_document_uploaded triggered", user_id=user_id, document_id=document_id)
68
+ async with AsyncSessionLocal() as db:
69
+ await document_pipeline.process(document_id, user_id, db)
70
+
71
+
72
+ async def on_tabular_deleted(document_id: str, user_id: str) -> None:
73
+ """Remove a tabular source from the user's catalog when its document is deleted."""
74
+ from src.catalog.store import CatalogStore
75
+
76
+ logger.info("on_tabular_deleted triggered", user_id=user_id, document_id=document_id)
77
+ await CatalogStore().remove_source(user_id, source_id=document_id)
78
+
79
+
80
+ async def on_db_deleted(client_id: str, user_id: str) -> None:
81
+ """Remove a schema source from the user's catalog when its DB client is deleted."""
82
+ from src.catalog.store import CatalogStore
83
+
84
+ logger.info("on_db_deleted triggered", user_id=user_id, client_id=client_id)
85
+ await CatalogStore().remove_source(user_id, source_id=client_id)
86
+
87
+
88
+ async def on_catalog_rebuild_requested(user_id: str) -> None:
89
+ """Re-introspect every source in the user's catalog and upsert the result.
90
+
91
+ Iterates all Sources in the current catalog. Each source is re-run through
92
+ its original trigger (on_db_registered for schema, on_tabular_uploaded for
93
+ tabular). Per-source failures are logged but do not abort the remaining
94
+ sources.
95
+ """
96
+ from src.catalog.store import CatalogStore
97
+
98
+ catalog = await CatalogStore().get(user_id)
99
+ if catalog is None:
100
+ logger.info("no catalog to rebuild", user_id=user_id)
101
+ return
102
+
103
+ logger.info("on_catalog_rebuild_requested triggered", user_id=user_id, source_count=len(catalog.sources))
104
+ for source in catalog.sources:
105
+ try:
106
+ if source.source_type == "schema":
107
+ client_id = source.location_ref.split("://")[1]
108
+ await on_db_registered(client_id, user_id)
109
+ elif source.source_type == "tabular":
110
+ document_id = source.location_ref.split("://")[1].split("/")[1]
111
+ await on_tabular_uploaded(document_id, user_id)
112
+ else:
113
+ logger.warning("unsupported source_type for rebuild", source_type=source.source_type, source_id=source.source_id)
114
+ except Exception as e:
115
+ logger.error("rebuild failed for source", source_id=source.source_id, source_type=source.source_type, error=str(e))
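The rebuild loop recovers ids from `location_ref` with chained `split` calls, which raise `IndexError` on a malformed reference (caught by the surrounding `except`). A slightly more explicit parsing sketch, assuming only the two URI shapes built in this file (`dbclient://{id}` and `az_blob://{user_id}/{document_id}`); `parse_location_ref` and `document_id_from_ref` are hypothetical helpers, not part of the PR:

```python
def parse_location_ref(location_ref: str) -> tuple[str, list[str]]:
    """Split 'scheme://a/b' into ('scheme', ['a', 'b'])."""
    scheme, sep, rest = location_ref.partition("://")
    if not sep or not rest:
        raise ValueError(f"malformed location_ref: {location_ref!r}")
    return scheme, rest.split("/")


def document_id_from_ref(location_ref: str) -> str:
    """Extract the document id from an az_blob://{user_id}/{document_id} ref."""
    scheme, parts = parse_location_ref(location_ref)
    if scheme != "az_blob" or len(parts) != 2:
        raise ValueError(f"not a tabular location_ref: {location_ref!r}")
    return parts[1]


db_ref = parse_location_ref("dbclient://abc")
doc_id = document_id_from_ref("az_blob://u1/doc9")
```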
src/query/README.md ADDED
@@ -0,0 +1,11 @@
1
+ # query
2
+
3
+ Catalog-driven query subsystem. User question → IR → SQL/pandas → result.
4
+
5
+ Subpackages:
6
+ - `ir/` — JSON IR Pydantic models + validator
7
+ - `planner/` — LLM step: question + catalog → IR
8
+ - `compiler/` — deterministic IR → SQL or pandas op chain (no LLM)
9
+ - `executor/` — runs the compiled query against DB or Parquet
10
+
11
+ See `ARCHITECTURE.md` (root) for the full design.
src/query/base.py DELETED
@@ -1,32 +0,0 @@
1
- """Shared contract for query executors."""
2
-
3
- from abc import ABC, abstractmethod
4
- from dataclasses import dataclass, field
5
-
6
- from sqlalchemy.ext.asyncio import AsyncSession
7
-
8
- from src.rag.base import RetrievalResult
9
-
10
-
11
- @dataclass
12
- class QueryResult:
13
- source_type: str # "database" or "document"
14
- source_id: str # database_client_id or document_id
15
- table_or_file: str
16
- columns: list[str]
17
- rows: list[dict]
18
- row_count: int
19
- metadata: dict = field(default_factory=dict)
20
- # metadata should include "column_types": {"col_name": "dtype"} when available
21
-
22
-
23
- class BaseExecutor(ABC):
24
- @abstractmethod
25
- async def execute(
26
- self,
27
- results: list[RetrievalResult],
28
- user_id: str,
29
- db: AsyncSession,
30
- question: str,
31
- limit: int = 100,
32
- ) -> list[QueryResult]: ...