# Phase 1 → Phase 2 Migration Report
A walkthrough of what changed between the original retrieval-style backend (Phase 1) and the current catalog-driven backend (Phase 2). Intended as a hand-off for the lead.

---
## 1. The conceptual change
**Phase 1** was a single retrieval-style RAG pipeline. Every question, whether it pointed at a database, a spreadsheet, or a PDF, went through the same primitive: **chunk + embed + top-K** over PGVector. Schema and tabular columns were embedded as chunks and ranked alongside prose. When the question needed SQL, the LLM **wrote the SQL string directly** (via `query_executor`).
**Phase 2** splits the system into two paths governed by an LLM router:
| Path | Primitive | Why |
|---|---|---|
| Unstructured (PDF / DOCX / TXT) | Dense similarity over prose chunks (PGVector) | Right primitive for free text |
| Structured (DB / CSV / XLSX / Parquet) | **Per-user data catalog** → LLM emits a **JSON IR** of intent → deterministic **compiler** → **executor** (SQL or pandas) | A column lookup shouldn't go through a similarity ranking lottery; the LLM emits intent, never SQL syntax |
Three explicit LLM call sites only:
1. **Intent router** (classifies the user message into `chat` / `unstructured` / `structured`)
2. **Query planner** (turns the question + catalog into a Pydantic-validated `QueryIR`)
3. **Chatbot agent** (formats the final answer, streamed over SSE)

Everything else (IR validation, SQL/pandas compilation, execution) is deterministic Python.

---
## 2. File-by-file changes
### 2.1 Deleted (Phase 1 only)
| Phase 1 path | Reason it was removed |
|---|---|
| `src/rag/base.py`, `src/rag/retriever.py`, `src/rag/router.py` | Replaced by `src/retrieval/` |
| `src/rag/retrievers/baseline.py`, `schema.py`, `document.py` | Schema retrieval gone (catalog replaces it); document retriever rewritten in `src/retrieval/document.py` |
| `src/tools/search.py` (whole `tools/` folder) | Only consumer was `rag/router.py` |
| `src/query/base.py` | Duplicate of `query/executor/base.py` |
| `src/query/query_executor.py` | Replaced by `src/query/service.py` |
| `src/query/executors/db_executor.py` | Replaced by `src/query/executor/db.py` |
| `src/query/executors/tabular.py` | Replaced by `src/query/executor/tabular.py` |
| `src/agents/chatbot.py` (Phase 1 LangChain chatbot) | Phase 2 `ChatbotAgent` now lives at the same path; see §2.2 |
| `src/api/v1/knowledge.py` | Fake `/knowledge/rebuild` endpoint, never wired |
| `src/config/agents/system_prompt.md`, `guardrails_prompt.md` | Replaced by `src/config/prompts/{chatbot_system,guardrails}.md` |
| `src/models/structured_output.py` (`IntentClassification`) | Replaced by `IntentRouterDecision` Pydantic model inside `agents/orchestration.py` |
| `src/models/sql_query.py` | LLM no longer emits SQL; IR replaces it |
| `src/pipeline/orchestrator.py` (empty stub) | Redundant: `StructuredPipeline` takes the introspector at `run()` time |
### 2.2 Renamed / moved (same role, new home)
| Phase 1 location | Phase 2 location | Notes |
|---|---|---|
| `src/agents/chatbot.py` (Phase 1, deleted), then `src/agents/answer_agent.py` (`AnswerAgent`, renamed) | `src/agents/chatbot.py::ChatbotAgent` | Final answer formation; streams via `astream` |
| `src/knowledge/parquet_service.py` | `src/storage/parquet.py` | Parquet upload/download helper |
| `src/pipeline/document_pipeline/document_pipeline.py` (folder) | `src/pipeline/document_pipeline.py` (flat) | Single module |
| `src/rag/retrievers/document.py` | `src/retrieval/document.py` | `DocumentRetriever` migrated; tabular file types filtered out of results |
| `src/rag/router.py` | `src/retrieval/router.py` | `RetrievalRouter`, Redis-cached, unstructured-only; dead `db: AsyncSession` + `source_hint` params removed |
| `src/rag/base.py` (`RetrievalResult`, `BaseRetriever`) | `src/retrieval/base.py` | Same dataclass + ABC |
> **Heads-up on the intent router**: the Phase 1 file `src/agents/orchestration.py` and its class `OrchestratorAgent` were **kept in place** for Phase 2, but the body was fully rewritten. The class now emits `IntentRouterDecision(needs_search, source_hint ∈ {chat, unstructured, structured}, rewritten_query)`. The prompt file and test file use the `intent_router` name (`config/prompts/intent_router.md`, `tests/agents/test_intent_router.py`), but **the source module is still `orchestration.py` and the class is still `OrchestratorAgent`**. Existing imports continue to work; only the behavior changed.
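
The decision contract and the branch it drives can be sketched as follows. This is a minimal sketch, not the code in `agents/orchestration.py`: a stdlib `dataclass` stands in for the Pydantic model so the example runs without dependencies, the three field names are the ones listed above, and `route()` with its returned labels is a hypothetical helper mirroring the three paths in §3.2.

```python
from dataclasses import dataclass
from typing import Literal, get_args

SourceHint = Literal["chat", "unstructured", "structured"]


@dataclass(frozen=True)
class IntentRouterDecision:
    """Dataclass stand-in for the Pydantic model (field names from this report)."""
    needs_search: bool
    source_hint: SourceHint
    rewritten_query: str

    def __post_init__(self) -> None:
        # Pydantic would reject a value outside the Literal; emulate that check here.
        if self.source_hint not in get_args(SourceHint):
            raise ValueError(f"unknown source_hint: {self.source_hint!r}")


def route(decision: IntentRouterDecision) -> str:
    """Hypothetical branch selection mirroring the three query-flow paths."""
    if decision.source_hint == "chat":
        return "chatbot_only"          # ChatbotAgent.astream() with no retrieved context
    if decision.source_hint == "unstructured":
        return "document_retrieval"    # RetrievalRouter -> DocumentRetriever
    return "structured_query"          # CatalogReader -> QueryService


decision = IntentRouterDecision(True, "structured", "total revenue by region in 2023")
print(route(decision))  # structured_query
```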
### 2.3 Added (Phase 2 new)
**Catalog subsystem (whole new concept)**
| Path | Role |
|---|---|
| `src/catalog/models.py` | Pydantic: `Catalog → Source[] → Table[] → Column[]`, `ForeignKey`, `ColumnStats.top_values` |
| `src/catalog/introspect/base.py` | `BaseIntrospector` ABC |
| `src/catalog/introspect/database.py` | DB introspector; wraps Phase 1 `db_pipeline/extractor.py` (`get_schema`, `profile_column`, `get_row_count`) |
| `src/catalog/introspect/tabular.py` | CSV / XLSX / Parquet introspector; one `Table` per XLSX sheet |
| `src/catalog/render.py` | Renders a `Source` for the planner prompt |
| `src/catalog/validator.py` | Unique-ID + foreign-key-ref invariants |
| `src/catalog/store.py` | Postgres `jsonb` upsert keyed by `user_id` (table `data_catalog`) |
| `src/catalog/reader.py` | Loads + filters catalog by `source_hint` |
| `src/catalog/pii_detector.py` | Flags PII columns at ingestion and suppresses their `sample_values` |
| `src/security/pii_patterns.py` | Name patterns + value regex used by the detector |
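
To make the catalog shape and the two validator invariants concrete, here is a sketch using stdlib dataclasses. The class names follow `src/catalog/models.py` as listed above, but the fields (`id`, `data_type`, `pii_flag`, `sample_values`, foreign keys expressed as column ids) and the `validate_catalog()` helper are assumptions for illustration, not the real schema. Foreign keys are checked only after every column id has been collected, so a reference to a table that appears later in the catalog still resolves.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ForeignKey:
    column_id: str       # referencing column id (id format is an assumption)
    ref_column_id: str   # referenced column id


@dataclass
class Column:
    id: str
    name: str
    data_type: str
    pii_flag: bool = False
    sample_values: Optional[list[str]] = None  # stays None for PII columns


@dataclass
class Table:
    id: str
    name: str
    columns: list[Column] = field(default_factory=list)
    foreign_keys: list[ForeignKey] = field(default_factory=list)


@dataclass
class Source:
    id: str
    source_type: str  # e.g. "schema" (database) or "tabular"
    tables: list[Table] = field(default_factory=list)


@dataclass
class Catalog:
    user_id: str
    sources: list[Source] = field(default_factory=list)


def validate_catalog(catalog: Catalog) -> list[str]:
    """Hypothetical validator: every id is unique and every FK endpoint resolves."""
    errors: list[str] = []
    seen: set[str] = set()
    column_ids: set[str] = set()
    for source in catalog.sources:
        ids = [source.id]
        for table in source.tables:
            ids.append(table.id)
            ids.extend(c.id for c in table.columns)
            column_ids.update(c.id for c in table.columns)
        for obj_id in ids:
            if obj_id in seen:
                errors.append(f"duplicate id: {obj_id}")
            seen.add(obj_id)
    # Second pass: all column ids are known now, so FK order does not matter.
    for source in catalog.sources:
        for table in source.tables:
            for fk in table.foreign_keys:
                for ref in (fk.column_id, fk.ref_column_id):
                    if ref not in column_ids:
                        errors.append(f"dangling foreign key ref: {ref}")
    return errors
```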

**JSON IR + query subsystem**
| Path | Role |
|---|---|
| `src/query/ir/models.py` | `QueryIR` Pydantic schema |
| `src/query/ir/operators.py` | `ALLOWED_FILTER_OPS`, `ALLOWED_AGG_FNS`, `LIMIT_HARD_CAP`, `TYPE_COMPATIBILITY` |
| `src/query/ir/validator.py` | Catalog-aware IR validation (rejects unknown column ids, bad ops, type mismatches, oversize limits) |
| `src/query/planner/service.py` | `QueryPlannerService.plan(question, catalog, previous_error)` → Azure OpenAI structured output → `QueryIR` |
| `src/query/planner/prompt.py` | Builds the planner prompt from catalog text |
| `src/query/compiler/base.py` | Compiler ABC |
| `src/query/compiler/sql.py` | `SqlCompiler` (Postgres): all 12 filter ops, params as a dict |
| `src/query/compiler/pandas.py` | `PandasCompiler`: returns `CompiledPandas(apply, output_columns)` |
| `src/query/executor/base.py` | `BaseExecutor` + `QueryResult` |
| `src/query/executor/db.py` | `DbExecutor`: sqlglot SELECT-only guard, RO txn, 30 s `statement_timeout`, 10k row cap |
| `src/query/executor/tabular.py` | `TabularExecutor`: Parquet via blob, `asyncio.to_thread`, 10k row cap |
| `src/query/executor/dispatcher.py` | `ExecutorDispatcher.pick(ir)`: picks by `source.source_type` |
| `src/query/service.py` | `QueryService.run(user_id, question, catalog)`: plan → validate → retry (max 3) → dispatch → execute |
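
A rough sketch of what catalog-aware IR validation involves. The constant names come from `src/query/ir/operators.py`, but their contents here are illustrative (this op set is not the real 12), `LIMIT_HARD_CAP = 10_000` is an assumption chosen to match the executor cap, and the `QueryIR` fields and `validate_ir()` signature are hypothetical. The validator collects every problem rather than stopping at the first, which is the kind of error context a retry loop can hand back to the planner.

```python
from dataclasses import dataclass, field

# Illustrative contents only; the real values live in src/query/ir/operators.py.
ALLOWED_FILTER_OPS = {"eq", "neq", "gt", "gte", "lt", "lte", "in", "like"}
ALLOWED_AGG_FNS = {"count", "sum", "avg", "min", "max"}
LIMIT_HARD_CAP = 10_000  # assumed to match the executors' 10k row cap
# Catalog column type -> Python value types a filter on that column may carry.
TYPE_COMPATIBILITY = {
    "int": (int,),
    "float": (int, float),
    "text": (str,),
    "bool": (bool,),
}


@dataclass
class Filter:
    column_id: str
    op: str
    value: object


@dataclass
class Aggregate:
    fn: str
    column_id: str


@dataclass
class QueryIR:  # hypothetical field set
    source_id: str
    select: list[str]
    filters: list[Filter] = field(default_factory=list)
    aggregates: list[Aggregate] = field(default_factory=list)
    limit: int = 100


def validate_ir(ir: QueryIR, column_types: dict[str, str]) -> list[str]:
    """Return every problem found; an empty list means the IR may be compiled.

    `column_types` maps catalog column id -> catalog type for the IR's source.
    """
    errors: list[str] = []
    referenced = ir.select + [f.column_id for f in ir.filters] + [a.column_id for a in ir.aggregates]
    for column_id in referenced:
        if column_id not in column_types:
            errors.append(f"unknown column id: {column_id}")
    for f in ir.filters:
        if f.op not in ALLOWED_FILTER_OPS:
            errors.append(f"filter op not allowed: {f.op}")
        col_type = column_types.get(f.column_id)
        if col_type is None:
            continue  # already reported as unknown
        allowed = TYPE_COMPATIBILITY.get(col_type, ())
        values = f.value if isinstance(f.value, list) else [f.value]
        # Exact type check, so True is not accepted where an int is expected.
        if not all(type(v) in allowed for v in values):
            errors.append(f"value type mismatch for {f.column_id} ({col_type})")
    for a in ir.aggregates:
        if a.fn not in ALLOWED_AGG_FNS:
            errors.append(f"aggregate not allowed: {a.fn}")
    if not 1 <= ir.limit <= LIMIT_HARD_CAP:
        errors.append(f"limit {ir.limit} outside 1..{LIMIT_HARD_CAP}")
    return errors
```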

**Agents**
| Path | Role |
|---|---|
| `src/agents/orchestration.py` | `OrchestratorAgent`: Phase 1 file/class name preserved, Phase 2 body. Emits `IntentRouterDecision` |
| `src/agents/chatbot.py` | `ChatbotAgent`, formerly `AnswerAgent` in `agents/answer_agent.py`; renamed in the Cleanup PR |
| `src/agents/chat_handler.py` | `ChatHandler.handle(...)`: top-level orchestrator; yields `intent` / `chunk` / `done` / `error` SSE events |

**Pipelines & API**
| Path | Role |
|---|---|
| `src/pipeline/structured_pipeline.py` | DB / tabular ingestion: introspect → merge → validate → upsert |
| `src/pipeline/triggers.py` | `on_db_registered`, `on_tabular_uploaded`, `on_document_uploaded`, `on_catalog_rebuild_requested` |
| `src/api/v1/data_catalog.py` | `GET /api/v1/data-catalog/{user_id}` + `POST /api/v1/data-catalog/rebuild` |
| `src/models/api/catalog.py` | Catalog request/response models |
| `src/config/prompts/intent_router.md`, `query_planner.md`, `chatbot_system.md`, `guardrails.md` | New prompts. `guardrails.md` is appended to `chatbot_system.md` at load time |
| `src/db/postgres/models.py` (added `Catalog` SQLAlchemy class) | Stores the per-user jsonb document in `data_catalog` |
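
The one-row-per-user upsert behind `CatalogStore` can be sketched with the standard-library `sqlite3` module so it runs without a Postgres server. SQLite keeps the document as JSON text rather than `jsonb`, and the column names `user_id` and `catalog` are assumptions; Postgres supports the same `INSERT ... ON CONFLICT (user_id) DO UPDATE` form, including the `excluded` pseudo-table.

```python
import json
import sqlite3
from typing import Optional

SCHEMA = """
CREATE TABLE IF NOT EXISTS data_catalog (
    user_id TEXT PRIMARY KEY,
    catalog TEXT NOT NULL  -- JSON document; jsonb in the real Postgres table
)
"""

UPSERT = """
INSERT INTO data_catalog (user_id, catalog) VALUES (?, ?)
ON CONFLICT (user_id) DO UPDATE SET catalog = excluded.catalog
"""


def upsert_catalog(conn: sqlite3.Connection, user_id: str, catalog: dict) -> None:
    """Insert the user's catalog, or replace it if a row already exists."""
    with conn:  # commits on success, rolls back on error
        conn.execute(UPSERT, (user_id, json.dumps(catalog)))


def load_catalog(conn: sqlite3.Connection, user_id: str) -> Optional[dict]:
    row = conn.execute(
        "SELECT catalog FROM data_catalog WHERE user_id = ?", (user_id,)
    ).fetchone()
    return json.loads(row[0]) if row else None


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
upsert_catalog(conn, "u1", {"sources": []})
upsert_catalog(conn, "u1", {"sources": [{"id": "s1"}]})  # updates in place, no second row
print(load_catalog(conn, "u1"))  # {'sources': [{'id': 's1'}]}
```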
### 2.4 Rewired API endpoints
| Endpoint | Phase 1 wiring | Phase 2 wiring |
|---|---|---|
| `POST /api/v1/chat/stream` | Inline in `chat.py`: `OrchestratorAgent` → `retriever` → `query_executor` → `chatbot` | Delegates to `ChatHandler.handle()`. Redis cache, fast intent, history load, and message persistence stay in the endpoint |
| `POST /api/v1/database-clients/{id}/ingest` | Called `db_pipeline_service.run()` and dual-wrote vectors | Calls **only** `on_db_registered` (catalog build). Failure → HTTP 500 |
| `POST /api/v1/document/process` | Always pushed to vector store | PDF/DOCX/TXT → `knowledge_processor` (vectors); CSV/XLSX → `on_tabular_uploaded` (catalog only, **no vector embedding**) |
| `POST /api/v1/document/upload` | Storage + DB row | Same, plus `on_document_uploaded` trigger |
| `POST /api/v1/data-catalog/rebuild` | n/a | New: iterates all sources, re-runs per-source trigger |
| `GET /api/v1/data-catalog/{user_id}` | n/a | New: returns `list[CatalogIndexEntry]` |
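
The file-type split in `POST /api/v1/document/process` amounts to a small dispatch. A minimal sketch, assuming the decision is made from the filename extension (the real endpoint may use the MIME type instead); the returned labels are hypothetical, and types the table does not list, including Parquet, are rejected here.

```python
from pathlib import PurePath

VECTOR_TYPES = {".pdf", ".docx", ".txt"}   # -> knowledge_processor (PGVector)
CATALOG_TYPES = {".csv", ".xlsx"}          # -> on_tabular_uploaded (catalog only)


def process_route(filename: str) -> str:
    """Pick the ingestion path for an uploaded file (labels are illustrative)."""
    suffix = PurePath(filename).suffix.lower()
    if suffix in VECTOR_TYPES:
        return "vector_store"
    if suffix in CATALOG_TYPES:
        return "catalog"  # tabular files are never embedded
    raise ValueError(f"unsupported file type: {suffix or filename}")


print(process_route("Q3_report.PDF"))  # vector_store
print(process_route("sales.xlsx"))     # catalog
```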
### 2.5 Phase 1 files still in production use
These were **not rewritten**; Phase 2 imports them directly:
- `src/database_client/database_client_service.py`
- `src/utils/db_credential_encryption.py` (`decrypt_credentials_dict`); `src/security/credentials.py` is still a stub
- `src/pipeline/db_pipeline/db_pipeline_service.py` (`engine_scope` context manager, used by both the introspector and `DbExecutor`)
- `src/pipeline/db_pipeline/extractor.py` (`get_schema`, `profile_column`, `get_row_count`)
- `src/knowledge/processing_service.py` (PDF / DOCX / TXT extraction + embedding)
- `src/db/postgres/{connection,init_db,vector_store}.py`, `src/storage/az_blob/`, `src/middlewares/`, `src/security/auth.py`
---
## 3. End-to-end flow (current state)
### 3.1 Ingestion
```
User action             Pipeline                               Storage
─────────────────────   ────────────────────────────────────   ─────────────────────────────────
upload PDF/DOCX/TXT  →  DocumentPipeline                    →  Azure Blob + PGVector
                        (extract → chunk → embed)              (table: langchain_pg_embedding)
                        + on_document_uploaded                 + retrieval cache invalidate

upload CSV/XLSX      →  TabularIntrospector                 →  Azure Blob (Parquet)
                        (sheets / columns + sample + stats)    + data_catalog jsonb row
                        → CatalogValidator → CatalogStore      (NO vector store; catalog only)
                        via on_tabular_uploaded

register DB          →  DatabaseIntrospector                →  data_catalog jsonb row
                        (information_schema + sample + FKs)
                        → validate → store
                        via on_db_registered
```
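
The CSV branch of the tabular introspection step, combined with the PII rule, can be sketched with the standard library alone. Everything here is an assumption for illustration: the name pattern and email regex stand in for `security/pii_patterns.py`, the type inference is deliberately crude, XLSX (one table per sheet) is not handled, and the output dicts only approximate `Column` / `ColumnStats.top_values`. Flagged columns get `None` for both samples and top values, since either would carry raw values into the planner prompt.

```python
import csv
import io
import re
from collections import Counter

# Hypothetical patterns; the real ones live in src/security/pii_patterns.py.
PII_NAME = re.compile(r"e-?mail|phone|ssn|passport|birth", re.IGNORECASE)
EMAIL_VALUE = re.compile(r"^[^@\s]+@[^@\s]+\.[A-Za-z]{2,}$")


def infer_type(values: list[str]) -> str:
    """Crude type inference over non-empty cell values."""
    def parses_as(cast) -> bool:
        try:
            for v in values:
                cast(v)
        except ValueError:
            return False
        return True

    if values and parses_as(int):
        return "int"
    if values and parses_as(float):
        return "float"
    return "text"


def profile_csv(text: str, sample_size: int = 3, top_k: int = 3) -> list[dict]:
    """Profile each CSV column; PII columns expose no sample or top values."""
    reader = csv.DictReader(io.StringIO(text))
    rows = list(reader)
    profiles = []
    for name in reader.fieldnames or []:
        values = [r[name] for r in rows if r[name] not in ("", None)]
        # Flag by column name or by any value that looks like an email address.
        pii = bool(PII_NAME.search(name)) or any(EMAIL_VALUE.match(v) for v in values)
        top = [v for v, _ in Counter(values).most_common(top_k)]
        profiles.append({
            "name": name,
            "data_type": infer_type(values),
            "pii_flag": pii,
            "sample_values": None if pii else values[:sample_size],
            "top_values": None if pii else top,  # assumed: suppressed as well
        })
    return profiles
```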
### 3.2 Query (per user message β SSE stream)
```
POST /api/v1/chat/stream
  │
  ├── Redis cache check (24h TTL) → hit returns cached stream
  ├── _fast_intent (greetings / goodbyes) → bypass LLM
  ├── load history from chat_messages
  │
  ├── ChatHandler.handle(message, user_id, history)           [src/agents/chat_handler.py]
  │     │
  │     ├── OrchestratorAgent.classify()                      [agents/orchestration.py]
  │     │     → needs_search, source_hint, rewritten_query
  │     │
  │     ├── source_hint == "chat"
  │     │     → ChatbotAgent.astream() → yield chunk events
  │     │
  │     ├── source_hint == "unstructured"
  │     │     → RetrievalRouter.retrieve()                    [retrieval/router.py, Redis-cached]
  │     │     → DocumentRetriever (PGVector MMR/cosine/etc.)
  │     │     → ChatbotAgent.astream(chunks=...)
  │     │
  │     ├── source_hint == "structured"
  │     │     → CatalogReader.read(user_id, "structured")     [catalog/reader.py]
  │     │     → QueryService.run(user_id, question, catalog)  [query/service.py]
  │     │         │
  │     │         ├── QueryPlannerService.plan(...)           [query/planner/service.py]
  │     │         │     LLM(catalog, question, prev_error?) → QueryIR
  │     │         │
  │     │         ├── IRValidator.validate(ir, catalog)       [query/ir/validator.py]
  │     │         │     fail → loop back to planner with error context (max 3)
  │     │         │
  │     │         ├── ExecutorDispatcher.pick(ir)             [query/executor/dispatcher.py]
  │     │         │     schema source  → DbExecutor
  │     │         │     tabular source → TabularExecutor
  │     │         │
  │     │         ├── DbExecutor.run(ir):                     [query/executor/db.py]
  │     │         │     SqlCompiler → (sql, params)
  │     │         │     → sqlglot SELECT-only guard
  │     │         │     → engine_scope (Phase 1 utility) in asyncio.to_thread
  │     │         │     → RO txn + statement_timeout=30s + 10k cap
  │     │         │
  │     │         ├── TabularExecutor.run(ir):                [query/executor/tabular.py]
  │     │         │     resolve Parquet blob path
  │     │         │     → download → PandasCompiler.apply(df)
  │     │         │     → asyncio.to_thread → 10k cap
  │     │         │
  │     │         └── QueryResult { rows, columns, row_count,
  │     │                           truncated, source_id, error?, elapsed_ms }
  │     │
  │     │     → ChatbotAgent.astream(query_result=...)
  │     │     → yield chunk events
  │     │
  │     └── final events: done / error
  │
  ├── persist user + assistant messages to chat_messages
  └── populate Redis cache
```
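
The structured branch of `QueryService.run` (plan → validate → retry → dispatch → execute) can be sketched with the planner, validator, and executors injected as plain callables, so only the control flow remains. The callable signatures, the `source_type` key on the IR, and this `QueryResult` field set are assumptions modeled on the diagram. The sketch applies the 10k cap in the service for brevity, whereas the report places it in each executor, and it converts any executor exception into `QueryResult.error` rather than letting it propagate.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Optional

MAX_ATTEMPTS = 3
ROW_CAP = 10_000


@dataclass
class QueryResult:
    rows: list = field(default_factory=list)
    columns: list = field(default_factory=list)
    row_count: int = 0
    truncated: bool = False
    source_id: Optional[str] = None
    error: Optional[str] = None
    elapsed_ms: float = 0.0


Planner = Callable[[str, Optional[str]], dict]  # (question, previous_error) -> IR
Validator = Callable[[dict], list]              # IR -> list of error strings
Executor = Callable[[dict], tuple]              # IR -> (columns, rows)


def run_query(question: str, plan: Planner, validate: Validator,
              executors: dict[str, Executor]) -> QueryResult:
    start = time.perf_counter()
    previous_error: Optional[str] = None
    for _ in range(MAX_ATTEMPTS):
        ir = plan(question, previous_error)
        errors = validate(ir)
        if errors:
            previous_error = "; ".join(errors)  # fed back to the planner next attempt
            continue
        executor = executors.get(ir["source_type"])  # dispatch by source type
        if executor is None:
            return QueryResult(source_id=ir.get("source_id"),
                               error=f"no executor for {ir['source_type']}")
        try:
            columns, rows = executor(ir)
        except Exception as exc:  # report the failure in the result instead of raising
            return QueryResult(source_id=ir.get("source_id"), error=str(exc))
        return QueryResult(
            rows=rows[:ROW_CAP],
            columns=columns,
            row_count=min(len(rows), ROW_CAP),
            truncated=len(rows) > ROW_CAP,
            source_id=ir.get("source_id"),
            elapsed_ms=(time.perf_counter() - start) * 1000,
        )
    return QueryResult(error=f"IR invalid after {MAX_ATTEMPTS} attempts: {previous_error}")
```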
**Safety invariants for the structured path** (read-only at every layer):
1. IR validated against the catalog before reaching the compiler
2. Identifiers come from the catalog (trusted; inlined as quoted identifiers)
3. Values from `IR.filters` are always parameterized
4. Compiler is deterministic; no LLM in the hot path
5. sqlglot rejects anything that isn't a pure SELECT
6. DB connection is read-only with a 30 s `statement_timeout`
7. Hard 10 000 row cap on both executors; neither raises, and errors are returned in `QueryResult.error`
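
Invariants 2 and 3 are what make assembling SQL by string formatting safe: names come only from the catalog and are double-quoted, while every filter value becomes a named bind parameter in a dict. A minimal sketch, assuming a flat `(column, op, value)` filter shape and a subset of comparison ops; the `:p0` placeholders follow the `:name` style that SQLAlchemy's `text()` accepts, and none of these helper names are taken from `SqlCompiler` itself. The hostile value in the example ends up only in `params`, never in the SQL text.

```python
# Illustrative subset of comparison ops; the real SqlCompiler supports 12 filter ops.
SQL_OPS = {"eq": "=", "neq": "<>", "gt": ">", "gte": ">=", "lt": "<", "lte": "<="}


def quote_ident(name: str) -> str:
    """Quote one identifier Postgres-style: wrap in double quotes, double embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def quote_qualified(name: str) -> str:
    """Quote a dotted name such as schema.table part by part (assumes no dots inside parts)."""
    return ".".join(quote_ident(part) for part in name.split("."))


def compile_select(table: str, columns: list[str],
                   filters: list[tuple[str, str, object]], limit: int) -> tuple[str, dict]:
    """Return (sql, params). Names must come from the catalog; values only become params."""
    params: dict[str, object] = {}
    clauses = []
    for i, (column, op, value) in enumerate(filters):
        key = f"p{i}"
        params[key] = value
        clauses.append(f"{quote_ident(column)} {SQL_OPS[op]} :{key}")
    sql = f"SELECT {', '.join(quote_ident(c) for c in columns)} FROM {quote_qualified(table)}"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    sql += f" LIMIT {int(limit)}"  # limit already bounded by the IR validator
    return sql, params


sql, params = compile_select(
    "public.orders", ["region", "amount"],
    [("amount", "gt", 100), ("region", "eq", "EU'; DROP TABLE orders; --")], 50,
)
print(sql)
# SELECT "region", "amount" FROM "public"."orders" WHERE "amount" > :p0 AND "region" = :p1 LIMIT 50
print(params)
# {'p0': 100, 'p1': "EU'; DROP TABLE orders; --"}
```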
---
## 4. Summary table for review
| Concern | Phase 1 β where it lived | Phase 2 β where it lives | Change type |
|---|---|---|---|
| Intent classification | `agents/orchestration.py::OrchestratorAgent` (free-text intent) | **Same path + same class name**; body rewritten to emit `IntentRouterDecision` | Body rewrite only |
| Top-level chat orchestration | Inline in `api/v1/chat.py` | `agents/chat_handler.py::ChatHandler` | Extracted to a reusable module |
| Final answer formation | `agents/chatbot.py` (Phase 1 LangChain) | `agents/chatbot.py::ChatbotAgent` (was `AnswerAgent` in `answer_agent.py` mid-cycle) | Rewritten + renamed |
| Schema retrieval (DB / tabular) | `rag/retrievers/schema.py` + PGVector chunks | **Removed**. Replaced by catalog (`catalog/store.py` jsonb) loaded verbatim into planner prompt | Whole concept replaced |
| Doc retrieval (PDF / DOCX / TXT) | `rag/retrievers/document.py`, `rag/router.py` | `retrieval/document.py`, `retrieval/router.py` | Moved; Redis cache restored; tabular files filtered |
| Query writing | `query/query_executor.py` + `models/sql_query.py` (LLM writes SQL) | `query/planner/service.py` (LLM writes IR) + `query/compiler/sql.py` (deterministic) | LLM emits intent, not SQL |
| DB execution | `query/executors/db_executor.py` | `query/executor/db.py::DbExecutor` | Folder renamed (`executors` → `executor`); sqlglot guard + RO txn + 30 s timeout kept |
| Tabular execution | `query/executors/tabular.py` | `query/executor/tabular.py::TabularExecutor` | Parquet-only; pandas compiler split out |
| Executor selection | Hard-coded in `query_executor.py` | `query/executor/dispatcher.py::ExecutorDispatcher` | New; routes by `source.source_type` |
| Catalog (NEW) | n/a | `catalog/` (models, introspect/, validator, store, reader, pii_detector, render) | New subsystem |
| Catalog persistence | (data was embedded in PGVector) | Postgres jsonb table `data_catalog`, keyed by `user_id` | New table |
| Ingestion triggers | Inline in API endpoints | `pipeline/triggers.py` (`on_db_registered`, `on_tabular_uploaded`, `on_document_uploaded`, `on_catalog_rebuild_requested`) | Centralized event entry points |
| Structured pipeline | `pipeline/db_pipeline/db_pipeline_service.py` (still present for `engine_scope` + extractor reuse) | `pipeline/structured_pipeline.py` (orchestrator) β reuses Phase 1 extractor | New orchestrator wraps Phase 1 introspection helpers |
| Document pipeline | `pipeline/document_pipeline/document_pipeline.py` (folder) | `pipeline/document_pipeline.py` (file) | Flattened; CSV / XLSX now skip the vector store |
| Parquet helper | `knowledge/parquet_service.py` | `storage/parquet.py` | Moved into `storage/` |
| Prompts | `config/agents/system_prompt.md`, `guardrails_prompt.md` | `config/prompts/{intent_router,query_planner,chatbot_system,guardrails}.md` | Folder renamed; split into four files; guardrails appended to `chatbot_system` at load |
| PII detection | n/a | `catalog/pii_detector.py` + `security/pii_patterns.py` | New. Columns flagged `pii_flag=true` get `sample_values: null` so PII never enters prompts |
| Chat endpoint | `api/v1/chat.py` (does everything inline) | `api/v1/chat.py` (cache + history + persistence), delegating to `ChatHandler` | Slimmed; SSE event shape is `intent` / `chunk` / `done` / `error` |
| DB ingest endpoint | `api/v1/db_client.py::ingest` (Phase 1 `db_pipeline_service.run()`) | `api/v1/db_client.py::ingest` (calls `on_db_registered` only) | Phase 1 dual-write removed |
| Document process endpoint | `api/v1/document.py::process` (always vectorize) | `api/v1/document.py::process` (PDF/DOCX/TXT → vectors; CSV/XLSX → catalog via `on_tabular_uploaded`) | Routing by file type |
| Catalog management API | n/a | `api/v1/data_catalog.py` (GET index + POST rebuild) | New |

**Bottom line.** Every Phase 1 file under `src/rag/`, `src/tools/`, `src/query/executors/`, and `src/config/agents/` is gone, as are `src/query/query_executor.py`, `src/query/base.py`, and `src/api/v1/knowledge.py`. Phase 1 introspection helpers under `src/pipeline/db_pipeline/` and `src/database_client/` are still imported by Phase 2; they were not rewritten, just wrapped. The three LLM call sites are now explicit, and none of them writes SQL: the Phase 1 SQL-writing call is gone, and the planner emits a Pydantic-validated `QueryIR` instead.
The one filename gotcha to remember: the **intent router** still lives at `src/agents/orchestration.py` as class `OrchestratorAgent` (Phase 1 name kept for import-site compatibility, Phase 2 body). The matching prompt and tests use the `intent_router` name, but the source module does not.