# Phase 1 → Phase 2 Migration Report

A walkthrough of what changed between the original retrieval-style backend (Phase 1) and the current catalog-driven backend (Phase 2). Intended as a hand-off for the lead.

---

## 1. The conceptual change

**Phase 1** was a single retrieval-style RAG pipeline. Every question, whether it pointed at a database, a spreadsheet, or a PDF, went through the same primitive: **chunk + embed + top-K** over PGVector. Schema and tabular columns were embedded as chunks and ranked alongside prose. When the question needed SQL, the LLM **wrote the SQL string directly** (via `query_executor`).

**Phase 2** splits the system into two paths governed by an LLM router:

| Path | Primitive | Why |
|---|---|---|
| Unstructured (PDF / DOCX / TXT) | Dense similarity over prose chunks (PGVector) | Right primitive for free text |
| Structured (DB / CSV / XLSX / Parquet) | **Per-user data catalog** → LLM emits a **JSON IR** of intent → deterministic **compiler** → **executor** (SQL or pandas) | A column lookup shouldn't go through a similarity ranking lottery; the LLM emits intent, never SQL syntax |

Three explicit LLM call sites only:

1. **Intent router** (classifies the user message into `chat` / `unstructured` / `structured`)
2. **Query planner** (turns the question + catalog into a Pydantic-validated `QueryIR`)
3. **Chatbot agent** (formats the final answer, streamed over SSE)

Everything else (IR validation, SQL/pandas compilation, execution) is deterministic Python.
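
To make "the LLM emits intent, never SQL syntax" concrete, here is a minimal sketch of an IR compiled deterministically into parameterized SQL. It is not the project's `QueryIR` or `SqlCompiler`: the field names (`table`, `columns`, `filters`, `limit`), the two-operator table, and `compile_select` are assumptions for illustration, and it uses stdlib dataclasses rather than Pydantic to stay dependency-free.

```python
from dataclasses import dataclass, field

# Hypothetical operator whitelist; the real set lives in src/query/ir/operators.py.
_SQL_OPS = {"eq": "=", "gt": ">"}


@dataclass
class Filter:
    column: str
    op: str
    value: object


@dataclass
class SimpleIR:
    """Illustrative stand-in for QueryIR: structured intent, no SQL text."""
    table: str
    columns: list
    filters: list = field(default_factory=list)
    limit: int = 100


def quote_ident(name: str) -> str:
    # Double-quote an identifier, escaping embedded quotes (Postgres style).
    return '"' + name.replace('"', '""') + '"'


def compile_select(ir: SimpleIR) -> tuple:
    """Deterministically turn the IR into (sql, params); values are never inlined."""
    params = {}
    where = []
    for i, flt in enumerate(ir.filters):
        key = f"p{i}"
        where.append(f"{quote_ident(flt.column)} {_SQL_OPS[flt.op]} :{key}")
        params[key] = flt.value
    cols = ", ".join(quote_ident(c) for c in ir.columns)
    sql = f"SELECT {cols} FROM {quote_ident(ir.table)}"
    if where:
        sql += " WHERE " + " AND ".join(where)
    sql += f" LIMIT {int(ir.limit)}"
    return sql, params


ir = SimpleIR(table="orders", columns=["id", "total"],
              filters=[Filter("total", "gt", 100)], limit=10)
sql, params = compile_select(ir)
print(sql)     # SELECT "id", "total" FROM "orders" WHERE "total" > :p0 LIMIT 10
print(params)  # {'p0': 100}
```

An operator outside the whitelist fails with a `KeyError` before any SQL is produced, which is the property the deterministic compiler is meant to give.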

---

## 2. File-by-file changes

### 2.1 Deleted (Phase 1 only)

| Phase 1 path | Reason it was removed |
|---|---|
| `src/rag/base.py`, `src/rag/retriever.py`, `src/rag/router.py` | Replaced by `src/retrieval/` |
| `src/rag/retrievers/baseline.py`, `schema.py`, `document.py` | Schema retrieval gone (catalog replaces it); document retriever rewritten in `src/retrieval/document.py` |
| `src/tools/search.py` (whole `tools/` folder) | Only consumer was `rag/router.py` |
| `src/query/base.py` | Duplicate of `query/executor/base.py` |
| `src/query/query_executor.py` | Replaced by `src/query/service.py` |
| `src/query/executors/db_executor.py` | Replaced by `src/query/executor/db.py` |
| `src/query/executors/tabular.py` | Replaced by `src/query/executor/tabular.py` |
| `src/agents/chatbot.py` (Phase 1 LangChain chatbot) | Phase 2 `ChatbotAgent` lives at the same path now; see §2.2 |
| `src/api/v1/knowledge.py` | Fake `/knowledge/rebuild` endpoint, never wired |
| `src/config/agents/system_prompt.md`, `guardrails_prompt.md` | Replaced by `src/config/prompts/{chatbot_system,guardrails}.md` |
| `src/models/structured_output.py` (`IntentClassification`) | Replaced by `IntentRouterDecision` Pydantic model inside `agents/orchestration.py` |
| `src/models/sql_query.py` | LLM no longer emits SQL; IR replaces it |
| `src/pipeline/orchestrator.py` (empty stub) | Redundant: `StructuredPipeline` takes the introspector at `run()` time |

### 2.2 Renamed / moved (same role, new home)

| Phase 1 location | Phase 2 location | Notes |
|---|---|---|
| `src/agents/chatbot.py` (Phase 1) → deleted, then `src/agents/answer_agent.py` (`AnswerAgent`) → renamed | `src/agents/chatbot.py::ChatbotAgent` | Final answer formation; streams via `astream` |
| `src/knowledge/parquet_service.py` | `src/storage/parquet.py` | Parquet upload/download helper |
| `src/pipeline/document_pipeline/document_pipeline.py` (folder) | `src/pipeline/document_pipeline.py` (flat) | Single module |
| `src/rag/retrievers/document.py` | `src/retrieval/document.py` | `DocumentRetriever` migrated; tabular file types filtered out of results |
| `src/rag/router.py` | `src/retrieval/router.py` | `RetrievalRouter`, Redis-cached, unstructured-only; dead `db: AsyncSession` + `source_hint` params removed |
| `src/rag/base.py` (`RetrievalResult`, `BaseRetriever`) | `src/retrieval/base.py` | Same dataclass + ABC |

> **Heads-up on the intent router**: the Phase 1 file `src/agents/orchestration.py` and its class `OrchestratorAgent` were **kept in place** for Phase 2, but the body was fully rewritten. The class now emits `IntentRouterDecision(needs_search, source_hint ∈ {chat, unstructured, structured}, rewritten_query)`. The prompt file and test file use the `intent_router` name (`config/prompts/intent_router.md`, `tests/agents/test_intent_router.py`), but **the source module is still `orchestration.py` and the class is still `OrchestratorAgent`**. Existing imports continue to work; only the behavior changed.
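
As a rough illustration of the decision object described in the note above, the sketch below models the three fields and rejects any `source_hint` outside the three documented routes. The field types, the dataclass stand-in for the Pydantic model, and the `pick_path` helper with its labels are assumptions, not the code in `orchestration.py`.

```python
from dataclasses import dataclass

SOURCE_HINTS = ("chat", "unstructured", "structured")


@dataclass(frozen=True)
class IntentRouterDecision:
    """Dataclass stand-in for the Pydantic model; field types are assumed."""
    needs_search: bool
    source_hint: str
    rewritten_query: str

    def __post_init__(self) -> None:
        # Reject anything outside the three documented routes.
        if self.source_hint not in SOURCE_HINTS:
            raise ValueError(f"unknown source_hint: {self.source_hint!r}")


def pick_path(decision: IntentRouterDecision) -> str:
    """Map a decision to the branch that would handle it (labels are illustrative)."""
    return {
        "chat": "answer directly",
        "unstructured": "document retrieval",
        "structured": "catalog + query service",
    }[decision.source_hint]


decision = IntentRouterDecision(True, "structured", "total sales by region in 2023")
print(pick_path(decision))  # catalog + query service
```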

### 2.3 Added (Phase 2 new)

**Catalog subsystem (whole new concept)**

| Path | Role |
|---|---|
| `src/catalog/models.py` | Pydantic: `Catalog → Source[] → Table[] → Column[]`, `ForeignKey`, `ColumnStats.top_values` |
| `src/catalog/introspect/base.py` | `BaseIntrospector` ABC |
| `src/catalog/introspect/database.py` | DB introspector: wraps Phase 1 `db_pipeline/extractor.py` (`get_schema`, `profile_column`, `get_row_count`) |
| `src/catalog/introspect/tabular.py` | CSV / XLSX / Parquet introspector: one `Table` per XLSX sheet |
| `src/catalog/render.py` | Renders a `Source` for the planner prompt |
| `src/catalog/validator.py` | Unique-ID + foreign-key-ref invariants |
| `src/catalog/store.py` | Postgres `jsonb` upsert keyed by `user_id` (table `data_catalog`) |
| `src/catalog/reader.py` | Loads + filters catalog by `source_hint` |
| `src/catalog/pii_detector.py` | Flags PII columns at ingestion → suppresses `sample_values` |
| `src/security/pii_patterns.py` | Name patterns + value regex used by the detector |
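
The PII rows above describe a two-signal check (column-name patterns plus value regexes) that blanks `sample_values` on a match. A minimal sketch of that idea follows; the regexes, the trimmed-down `Column` and `Table` dataclasses, and the `flag_pii` helper are hypothetical and much simpler than whatever `pii_detector.py` and `pii_patterns.py` actually contain.

```python
import re
from dataclasses import dataclass, field
from typing import Optional

# Hypothetical patterns; the real ones live in src/security/pii_patterns.py.
PII_NAME_RE = re.compile(r"(email|phone|ssn|passport)", re.IGNORECASE)
EMAIL_VALUE_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


@dataclass
class Column:
    id: str
    name: str
    sample_values: Optional[list] = None
    pii_flag: bool = False


@dataclass
class Table:
    id: str
    name: str
    columns: list = field(default_factory=list)


def flag_pii(column: Column) -> Column:
    """Flag a column by name or by sampled values, then drop its samples."""
    by_name = bool(PII_NAME_RE.search(column.name))
    by_value = any(
        isinstance(v, str) and EMAIL_VALUE_RE.match(v)
        for v in (column.sample_values or [])
    )
    if by_name or by_value:
        column.pii_flag = True
        column.sample_values = None  # samples never reach the planner prompt
    return column


table = Table(id="t1", name="customers", columns=[
    flag_pii(Column("c1", "customer_email", ["a@example.com"])),
    flag_pii(Column("c2", "country", ["ID", "SG"])),
    flag_pii(Column("c3", "contact", ["b@example.com"])),
])
for col in table.columns:
    print(col.name, col.pii_flag, col.sample_values)
```

Here `contact` is caught by its values even though its name looks harmless, which is why the sketch checks both signals.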

**JSON IR + query subsystem**

| Path | Role |
|---|---|
| `src/query/ir/models.py` | `QueryIR` Pydantic schema |
| `src/query/ir/operators.py` | `ALLOWED_FILTER_OPS`, `ALLOWED_AGG_FNS`, `LIMIT_HARD_CAP`, `TYPE_COMPATIBILITY` |
| `src/query/ir/validator.py` | Catalog-aware IR validation (rejects unknown column ids, bad ops, type mismatches, oversize limits) |
| `src/query/planner/service.py` | `QueryPlannerService.plan(question, catalog, previous_error)`: Azure OpenAI structured output → `QueryIR` |
| `src/query/planner/prompt.py` | Builds the planner prompt from catalog text |
| `src/query/compiler/base.py` | Compiler ABC |
| `src/query/compiler/sql.py` | `SqlCompiler` (Postgres): all 12 filter ops, params as a dict |
| `src/query/compiler/pandas.py` | `PandasCompiler`: returns `CompiledPandas(apply, output_columns)` |
| `src/query/executor/base.py` | `BaseExecutor` + `QueryResult` |
| `src/query/executor/db.py` | `DbExecutor`: sqlglot SELECT-only guard, RO txn, 30 s `statement_timeout`, 10 k row cap |
| `src/query/executor/tabular.py` | `TabularExecutor`: Parquet via blob, `asyncio.to_thread`, 10 k cap |
| `src/query/executor/dispatcher.py` | `ExecutorDispatcher.pick(ir)`: picks by `source.source_type` |
| `src/query/service.py` | `QueryService.run(user_id, question, catalog)`: plan → validate → retry (max 3) → dispatch → execute |
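
For a feel of what catalog-aware IR validation involves, the sketch below checks three of the rejection cases listed for `validator.py` (unknown column ids, disallowed operators, oversize limits). The dict-shaped IR, the operator subset, and the cap value are assumptions for illustration; type-compatibility checks are left out.

```python
# Assumed values for illustration; the real constants live in src/query/ir/operators.py.
ALLOWED_FILTER_OPS = {"eq", "neq", "gt", "lt", "in"}
LIMIT_HARD_CAP = 10_000


def validate_ir(ir: dict, catalog_column_ids: set) -> list:
    """Return a list of human-readable problems; empty means the IR is acceptable."""
    errors = []
    for col in ir.get("select", []):
        if col not in catalog_column_ids:
            errors.append(f"unknown column id: {col}")
    for flt in ir.get("filters", []):
        if flt["column"] not in catalog_column_ids:
            errors.append(f"unknown column id: {flt['column']}")
        if flt["op"] not in ALLOWED_FILTER_OPS:
            errors.append(f"operator not allowed: {flt['op']}")
    if ir.get("limit", 0) > LIMIT_HARD_CAP:
        errors.append(f"limit {ir['limit']} exceeds cap {LIMIT_HARD_CAP}")
    return errors


catalog_ids = {"orders.id", "orders.total"}
bad_ir = {
    "select": ["orders.id", "orders.discount"],
    "filters": [{"column": "orders.total", "op": "regex", "value": ".*"}],
    "limit": 50_000,
}
problems = validate_ir(bad_ir, catalog_ids)
for problem in problems:
    print(problem)
```

Returning messages instead of raising keeps the errors in a form that can be handed back to the planner as `previous_error` on the next attempt.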

**Agents**

| Path | Role |
|---|---|
| `src/agents/orchestration.py` | `OrchestratorAgent`: Phase 1 file/class name preserved; Phase 2 body. Emits `IntentRouterDecision` |
| `src/agents/chatbot.py` | `ChatbotAgent`: formerly `AnswerAgent` in `agents/answer_agent.py`; renamed in Cleanup PR |
| `src/agents/chat_handler.py` | `ChatHandler.handle(...)`: top-level orchestrator; yields `intent` / `chunk` / `done` / `error` SSE events |
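
The four SSE event names can be pictured as an async generator that emits one `intent`, any number of `chunk` events, and then `done`, with failures converted to `error`. The sketch below assumes that ordering; the payload shapes, the `sse` framing helper, and the fake token stream are all invented for illustration and are not taken from `chat_handler.py`.

```python
import asyncio
import json
from typing import AsyncIterator


def sse(event: str, data: dict) -> str:
    """Format one Server-Sent Events frame."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


async def fake_answer_stream() -> AsyncIterator[str]:
    # Stand-in for the streamed LLM answer.
    for token in ["Hello", ", ", "world"]:
        yield token


async def handle(message: str) -> AsyncIterator[str]:
    """Yield intent, then chunks, then done; any failure becomes an error event."""
    try:
        yield sse("intent", {"source_hint": "chat"})
        async for token in fake_answer_stream():
            yield sse("chunk", {"text": token})
        yield sse("done", {})
    except Exception as exc:  # keep the stream well-formed on failure
        yield sse("error", {"message": str(exc)})


async def collect(message: str) -> list:
    return [frame async for frame in handle(message)]


frames = asyncio.run(collect("hi"))
print("".join(frames))
```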

**Pipelines & API**

| Path | Role |
|---|---|
| `src/pipeline/structured_pipeline.py` | DB / tabular ingestion: introspect → merge → validate → upsert |
| `src/pipeline/triggers.py` | `on_db_registered`, `on_tabular_uploaded`, `on_document_uploaded`, `on_catalog_rebuild_requested` |
| `src/api/v1/data_catalog.py` | `GET /api/v1/data-catalog/{user_id}` + `POST /api/v1/data-catalog/rebuild` |
| `src/models/api/catalog.py` | Catalog request/response models |
| `src/config/prompts/intent_router.md`, `query_planner.md`, `chatbot_system.md`, `guardrails.md` | New prompts. `guardrails.md` is appended to `chatbot_system.md` at load time |
| `src/db/postgres/models.py` (added `Catalog` SQLAlchemy class) | Stores the per-user jsonb document in `data_catalog` |

### 2.4 Rewired API endpoints

| Endpoint | Phase 1 wiring | Phase 2 wiring |
|---|---|---|
| `POST /api/v1/chat/stream` | Inline in `chat.py`: `OrchestratorAgent` → `retriever` → `query_executor` → `chatbot` | Delegates to `ChatHandler.handle()`. Redis cache, fast intent, history load, and message persistence stay in the endpoint |
| `POST /api/v1/database-clients/{id}/ingest` | Called `db_pipeline_service.run()` and dual-wrote vectors | Calls **only** `on_db_registered` (catalog build). Failure → HTTP 500 |
| `POST /api/v1/document/process` | Always pushed to vector store | PDF/DOCX/TXT → `knowledge_processor` (vectors); CSV/XLSX → `on_tabular_uploaded` (catalog only, **no vector embedding**) |
| `POST /api/v1/document/upload` | Storage + DB row | Same, plus `on_document_uploaded` trigger |
| `POST /api/v1/data-catalog/rebuild` | n/a | New: iterates all sources, re-runs per-source trigger |
| `GET /api/v1/data-catalog/{user_id}` | n/a | New: returns `list[CatalogIndexEntry]` |

### 2.5 Phase 1 files still in production use

These were **not rewritten** β€” Phase 2 imports them directly:

- `src/database_client/database_client_service.py`
- `src/utils/db_credential_encryption.py` (`decrypt_credentials_dict`); `src/security/credentials.py` is still a stub
- `src/pipeline/db_pipeline/db_pipeline_service.py` (`engine_scope` context manager, used by both the introspector and `DbExecutor`)
- `src/pipeline/db_pipeline/extractor.py` (`get_schema`, `profile_column`, `get_row_count`)
- `src/knowledge/processing_service.py` (PDF / DOCX / TXT extraction + embedding)
- `src/db/postgres/{connection,init_db,vector_store}.py`, `src/storage/az_blob/`, `src/middlewares/`, `src/security/auth.py`

---

## 3. End-to-end flow (current state)

### 3.1 Ingestion

```
User action               Pipeline                                    Storage
──────────────            ────────────────────────────                ─────────────────
upload PDF/DOCX/TXT  →    DocumentPipeline                        →   Azure Blob + PGVector
                          (extract → chunk → embed)                   (table: langchain_pg_embedding)
                          + on_document_uploaded                      + retrieval cache invalidate

upload CSV/XLSX      →    TabularIntrospector                     →   Azure Blob (Parquet)
                          (sheets / columns + sample + stats)         + data_catalog jsonb row
                          → CatalogValidator → CatalogStore           (NO vector store, catalog only)
                          via on_tabular_uploaded

register DB          →    DatabaseIntrospector                    →   data_catalog jsonb row
                          (information_schema + sample + FKs)
                          → validate → store
                          via on_db_registered
```
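The file-type split in the diagram above boils down to a dispatch on the upload's extension. The sketch below shows that rule in isolation; the extension sets come from the diagram, while `route_upload` and its return labels are hypothetical names, not the endpoint's real code.

```python
from pathlib import Path

# Extensions taken from the flow above; the function and return labels are illustrative.
UNSTRUCTURED_EXTS = {".pdf", ".docx", ".txt"}
TABULAR_EXTS = {".csv", ".xlsx"}


def route_upload(filename: str) -> str:
    """Return which ingestion path a file would take."""
    ext = Path(filename).suffix.lower()
    if ext in UNSTRUCTURED_EXTS:
        return "vector_store"   # extract -> chunk -> embed
    if ext in TABULAR_EXTS:
        return "catalog"        # introspect -> validate -> store, no embedding
    raise ValueError(f"unsupported file type: {ext or filename}")


for name in ["report.PDF", "sales.xlsx", "notes.txt"]:
    print(name, "->", route_upload(name))
```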

### 3.2 Query (per user message β†’ SSE stream)

```
POST /api/v1/chat/stream
        │
        ├── Redis cache check (24h TTL): hit returns cached stream
        ├── _fast_intent (greetings / goodbyes): bypass LLM
        ├── load history from chat_messages
        │
        └── ChatHandler.handle(message, user_id, history)         [src/agents/chat_handler.py]
                │
                ├─ OrchestratorAgent.classify()                    [agents/orchestration.py]
                │     → needs_search, source_hint, rewritten_query
                │
                ├── source_hint == "chat"
                │     → ChatbotAgent.astream() → yield chunk events
                │
                ├── source_hint == "unstructured"
                │     → RetrievalRouter.retrieve()                 [retrieval/router.py, Redis-cached]
                │         → DocumentRetriever (PGVector MMR/cosine/etc.)
                │     → ChatbotAgent.astream(chunks=...)
                │
                └── source_hint == "structured"
                      → CatalogReader.read(user_id, "structured")  [catalog/reader.py]
                      → QueryService.run(user_id, question, catalog)   [query/service.py]
                           │
                           ├─ QueryPlannerService.plan(...)        [query/planner/service.py]
                           │    LLM(catalog, question, prev_error?) → QueryIR
                           │
                           ├─ IRValidator.validate(ir, catalog)    [query/ir/validator.py]
                           │    fail → loop back to planner with error context (max 3)
                           │
                           ├─ ExecutorDispatcher.pick(ir)          [query/executor/dispatcher.py]
                           │    schema source  → DbExecutor
                           │    tabular source → TabularExecutor
                           │
                           ├─ DbExecutor.run(ir):                  [query/executor/db.py]
                           │    SqlCompiler → (sql, params)
                           │      → sqlglot SELECT-only guard
                           │      → engine_scope (Phase 1 utility) in asyncio.to_thread
                           │      → RO txn + statement_timeout=30s + 10k cap
                           │
                           ├─ TabularExecutor.run(ir):             [query/executor/tabular.py]
                           │    resolve Parquet blob path
                           │      → download → PandasCompiler.apply(df)
                           │      → asyncio.to_thread → 10k cap
                           │
                           └─ QueryResult { rows, columns, row_count,
                                            truncated, source_id, error?, elapsed_ms }
                      →
                      ChatbotAgent.astream(query_result=...)
                            → yield chunk events
                │
                └── final events: done / error
        │
        ├── persist user + assistant messages to chat_messages
        └── populate Redis cache
```
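The planner/validator loop in the middle of the flow above can be sketched as a small retry function that feeds validation errors back into the next planning call. This is an assumption-laden illustration: "max 3" is read as three attempts in total, the callables stand in for `QueryPlannerService.plan` and `IRValidator.validate`, and raising after the last attempt is a placeholder, since this report does not say how `QueryService` reports exhaustion.

```python
from typing import Callable, Optional

MAX_ATTEMPTS = 3  # "max 3" in the flow above, read here as three attempts total


def plan_with_retry(
    plan: Callable[[str, Optional[str]], dict],
    validate: Callable[[dict], list],
    question: str,
) -> dict:
    """Call the planner, feeding validation errors back in, up to MAX_ATTEMPTS times."""
    previous_error: Optional[str] = None
    for _ in range(MAX_ATTEMPTS):
        ir = plan(question, previous_error)
        errors = validate(ir)
        if not errors:
            return ir
        previous_error = "; ".join(errors)
    raise ValueError(f"planner failed after {MAX_ATTEMPTS} attempts: {previous_error}")


# Fake planner: misspells a column first, then corrects it once it sees the error.
calls = []


def fake_plan(question: str, previous_error: Optional[str]) -> dict:
    calls.append(previous_error)
    return {"select": ["orders.total" if previous_error else "orders.totl"]}


def fake_validate(ir: dict) -> list:
    known = {"orders.total"}
    return [f"unknown column id: {c}" for c in ir["select"] if c not in known]


ir = plan_with_retry(fake_plan, fake_validate, "what is the total?")
print(ir, len(calls))  # {'select': ['orders.total']} 2
```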

**Safety invariants for the structured path** (read-only at every layer):

1. IR validated against the catalog before reaching the compiler
2. Identifiers come from the catalog (trusted; inlined as quoted identifiers)
3. Values from `IR.filters` are always parameterized
4. Compiler is deterministic: no LLM in the hot path
5. sqlglot rejects anything that isn't a pure SELECT
6. DB connection is read-only with a 30 s `statement_timeout`
7. Hard 10 000 row cap on both executors; neither raises, and errors go in `QueryResult.error`
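
Invariant 7 (row cap plus "never raise") can be illustrated with a small wrapper around any row source. This is a sketch, not the executors' code: `run_capped` is a hypothetical helper, and the `QueryResult` here carries only a subset of the fields listed in the flow diagram.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Iterable, Optional

ROW_CAP = 10_000  # hard cap from the invariants above


@dataclass
class QueryResult:
    rows: list = field(default_factory=list)
    row_count: int = 0
    truncated: bool = False
    error: Optional[str] = None
    elapsed_ms: float = 0.0


def run_capped(fetch: Callable[[], Iterable[tuple]], cap: int = ROW_CAP) -> QueryResult:
    """Run fetch(), keep at most `cap` rows, and report failures in the result."""
    start = time.perf_counter()
    result = QueryResult()
    try:
        for row in fetch():
            if len(result.rows) >= cap:
                result.truncated = True  # at least one more row existed
                break
            result.rows.append(row)
        result.row_count = len(result.rows)
    except Exception as exc:
        result.error = f"{type(exc).__name__}: {exc}"
    result.elapsed_ms = (time.perf_counter() - start) * 1000
    return result


def broken_fetch():
    raise RuntimeError("connection refused")


ok = run_capped(lambda: ((i,) for i in range(5)), cap=3)
bad = run_capped(broken_fetch)
print(ok.rows, ok.truncated)  # [(0,), (1,), (2,)] True
print(bad.error)              # RuntimeError: connection refused
```

Because failures land in `error` rather than propagating, the caller can always hand the result to the answer stage and let it explain what went wrong.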

---

## 4. Summary table for review

| Concern | Phase 1: where it lived | Phase 2: where it lives | Change type |
|---|---|---|---|
| Intent classification | `agents/orchestration.py::OrchestratorAgent` (free-text intent) | **Same path + same class name**; body rewritten to emit `IntentRouterDecision` | Body rewrite only |
| Top-level chat orchestration | Inline in `api/v1/chat.py` | `agents/chat_handler.py::ChatHandler` | Extracted to a reusable module |
| Final answer formation | `agents/chatbot.py` (Phase 1 LangChain) | `agents/chatbot.py::ChatbotAgent` (was `AnswerAgent` in `answer_agent.py` mid-cycle) | Rewritten + renamed |
| Schema retrieval (DB / tabular) | `rag/retrievers/schema.py` + PGVector chunks | **Removed**. Replaced by catalog (`catalog/store.py` jsonb) loaded verbatim into planner prompt | Whole concept replaced |
| Doc retrieval (PDF / DOCX / TXT) | `rag/retrievers/document.py`, `rag/router.py` | `retrieval/document.py`, `retrieval/router.py` | Moved; Redis cache restored; tabular files filtered |
| Query writing | `query/query_executor.py` + `models/sql_query.py` (LLM writes SQL) | `query/planner/service.py` (LLM writes IR) + `query/compiler/sql.py` (deterministic) | LLM emits intent, not SQL |
| DB execution | `query/executors/db_executor.py` | `query/executor/db.py::DbExecutor` | Folder renamed (`executors` → `executor`); sqlglot guard + RO txn + 30 s timeout kept |
| Tabular execution | `query/executors/tabular.py` | `query/executor/tabular.py::TabularExecutor` | Parquet-only; pandas compiler split out |
| Executor selection | Hard-coded in `query_executor.py` | `query/executor/dispatcher.py::ExecutorDispatcher` | New; routes by `source.source_type` |
| Catalog (NEW) | n/a | `catalog/` (models, introspect/, validator, store, reader, pii_detector, render) | New subsystem |
| Catalog persistence | (data was embedded in PGVector) | Postgres jsonb table `data_catalog`, keyed by `user_id` | New table |
| Ingestion triggers | Inline in API endpoints | `pipeline/triggers.py` (`on_db_registered`, `on_tabular_uploaded`, `on_document_uploaded`, `on_catalog_rebuild_requested`) | Centralized event entry points |
| Structured pipeline | `pipeline/db_pipeline/db_pipeline_service.py` (still present for `engine_scope` + extractor reuse) | `pipeline/structured_pipeline.py` (orchestrator), which reuses the Phase 1 extractor | New orchestrator wraps Phase 1 introspection helpers |
| Document pipeline | `pipeline/document_pipeline/document_pipeline.py` (folder) | `pipeline/document_pipeline.py` (file) | Flattened; CSV / XLSX now skip the vector store |
| Parquet helper | `knowledge/parquet_service.py` | `storage/parquet.py` | Moved into `storage/` |
| Prompts | `config/agents/system_prompt.md`, `guardrails_prompt.md` | `config/prompts/{intent_router,query_planner,chatbot_system,guardrails}.md` | Folder renamed; split into four files; guardrails appended to `chatbot_system` at load |
| PII detection | n/a | `catalog/pii_detector.py` + `security/pii_patterns.py` | New. Columns flagged `pii_flag=true` get `sample_values: null` so PII never enters prompts |
| Chat endpoint | `api/v1/chat.py` (does everything inline) | `api/v1/chat.py` (cache + history + persistence) → delegates to `ChatHandler` | Slimmed; SSE event shape is `intent` / `chunk` / `done` / `error` |
| DB ingest endpoint | `api/v1/db_client.py::ingest` (Phase 1 `db_pipeline_service.run()`) | `api/v1/db_client.py::ingest` (calls `on_db_registered` only) | Phase 1 dual-write removed |
| Document process endpoint | `api/v1/document.py::process` (always vectorize) | `api/v1/document.py::process` (PDF/DOCX/TXT → vectors; CSV/XLSX → catalog via `on_tabular_uploaded`) | Routing by file type |
| Catalog management API | n/a | `api/v1/data_catalog.py` (GET index + POST rebuild) | New |

**Bottom line.** Every Phase 1 file under `src/rag/`, `src/tools/`, `src/query/executors/`, `src/query/query_executor.py`, `src/query/base.py`, `src/api/v1/knowledge.py`, and `src/config/agents/` is gone. Phase 1 introspection helpers under `src/pipeline/db_pipeline/` and `src/database_client/` are still imported by Phase 2; they were not rewritten, just wrapped. The three LLM call sites are now explicit and the SQL-writing one no longer exists; the planner emits a Pydantic-validated `QueryIR` instead.

The one filename gotcha to remember: the **intent router** still lives at `src/agents/orchestration.py` as class `OrchestratorAgent` (Phase 1 name kept for import-site compatibility, Phase 2 body). The matching prompt and tests use the `intent_router` name, but the source module does not.