Rifqi Hafizuddin
[KM-564] fix source, now shows name instead of id. added diff retrieval vs catalog
96598f8

# Phase 1 → Phase 2 Migration Report

A walkthrough of what changed between the original retrieval-style backend (Phase 1) and the current catalog-driven backend (Phase 2). Intended as a hand-off for the lead.

---

## 1. The conceptual change

**Phase 1** was a single retrieval-style RAG pipeline. Every question (whether it pointed at a database, a spreadsheet, or a PDF) went through the same primitive: **chunk + embed + top-K** over PGVector. Schema and tabular columns were embedded as chunks and ranked alongside prose. When the question needed SQL, the LLM **wrote the SQL string directly** (via `query_executor`).

**Phase 2** splits the system into two paths governed by an LLM router:

| Path | Primitive | Why |
|---|---|---|
| Unstructured (PDF / DOCX / TXT) | Dense similarity over prose chunks (PGVector) | The right primitive for free text |
| Structured (DB / CSV / XLSX / Parquet) | **Per-user data catalog** → LLM emits a **JSON IR** of intent → deterministic **compiler** → **executor** (SQL or pandas) | A column lookup shouldn't depend on a similarity-ranking lottery; the LLM emits intent, never SQL syntax |

There are only three explicit LLM call sites:

1. **Intent router** (classifies the user message as `chat` / `unstructured` / `structured`)
2. **Query planner** (turns the question + catalog into a Pydantic-validated `QueryIR`)
3. **Chatbot agent** (formats the final answer, streamed over SSE)

Everything else (IR validation, SQL/pandas compilation, execution) is deterministic Python.
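
The "intent, not SQL" split can be made concrete with a minimal sketch of the structured path's first two steps. First, the planner's JSON output is parsed into an IR. Second, that IR is compiled deterministically into parameterized SQL.

Everything in the sketch is a hypothetical stand-in:

- The `Filter` / `QueryIR` dataclasses replace the real Pydantic `QueryIR`.
- The `CATALOG` dict replaces the real catalog lookup.
- Only three illustrative filter ops are shown, not all 12.
- The `%(name)s` placeholders assume a psycopg-style (pyformat) driver.

```python
import json
from dataclasses import dataclass, field


# Hypothetical, simplified stand-ins for the Pydantic QueryIR models.
@dataclass
class Filter:
    column_id: str
    op: str
    value: object


@dataclass
class QueryIR:
    table_id: str
    select: list[str]
    filters: list[Filter] = field(default_factory=list)
    limit: int = 100


# Hypothetical catalog lookup: IR ids -> trusted physical identifiers.
CATALOG = {"orders": "orders", "orders.status": "status", "orders.amount": "amount"}

# Illustrative subset of the allowed filter ops (assumed names).
OPS = {"eq": "=", "gt": ">", "lt": "<"}


def quote_ident(name: str) -> str:
    """Quote a Postgres identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def compile_sql(ir: QueryIR) -> tuple[str, dict]:
    """Deterministically turn an IR into (sql, params); no LLM involved."""
    cols = ", ".join(quote_ident(CATALOG[c]) for c in ir.select)
    sql = f"SELECT {cols} FROM {quote_ident(CATALOG[ir.table_id])}"
    clauses, params = [], {}
    for i, flt in enumerate(ir.filters):
        key = f"p{i}"
        # Identifiers come from the catalog; values only ever become params.
        clauses.append(f"{quote_ident(CATALOG[flt.column_id])} {OPS[flt.op]} %({key})s")
        params[key] = flt.value
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    return sql + f" LIMIT {int(ir.limit)}", params


# What the planner might emit: intent as JSON, never SQL syntax.
raw = (
    '{"table_id": "orders", "select": ["orders.status", "orders.amount"],'
    ' "filters": [{"column_id": "orders.amount", "op": "gt", "value": 100}], "limit": 50}'
)
data = json.loads(raw)
ir = QueryIR(
    table_id=data["table_id"],
    select=data["select"],
    filters=[Filter(**f) for f in data["filters"]],
    limit=data["limit"],
)
sql, params = compile_sql(ir)
```

Here `sql` comes out as `SELECT "status", "amount" FROM "orders" WHERE "amount" > %(p0)s LIMIT 50` with `params == {"p0": 100}`. A hostile filter value can only ever land in `params`, so it is never spliced into the SQL text.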

---

## 2. File-by-file changes

### 2.1 Deleted (Phase 1 only)

| Phase 1 path | Reason it was removed |
|---|---|
| `src/rag/base.py`, `src/rag/retriever.py`, `src/rag/router.py` | Replaced by `src/retrieval/` |
| `src/rag/retrievers/baseline.py`, `schema.py`, `document.py` | Schema retrieval removed (the catalog replaces it); document retriever rewritten in `src/retrieval/document.py` |
| `src/tools/search.py` (whole `tools/` folder) | Only consumer was `rag/router.py` |
| `src/query/base.py` | Duplicate of `query/executor/base.py` |
| `src/query/query_executor.py` | Replaced by `src/query/service.py` |
| `src/query/executors/db_executor.py` | Replaced by `src/query/executor/db.py` |
| `src/query/executors/tabular.py` | Replaced by `src/query/executor/tabular.py` |
| `src/agents/chatbot.py` (Phase 1 LangChain chatbot) | Phase 2 `ChatbotAgent` now lives at the same path; see §2.2 |
| `src/api/v1/knowledge.py` | Fake `/knowledge/rebuild` endpoint, never wired |
| `src/config/agents/system_prompt.md`, `guardrails_prompt.md` | Replaced by `src/config/prompts/{chatbot_system,guardrails}.md` |
| `src/models/structured_output.py` (`IntentClassification`) | Replaced by the `IntentRouterDecision` Pydantic model inside `agents/orchestration.py` |
| `src/models/sql_query.py` | LLM no longer emits SQL; the IR replaces it |
| `src/pipeline/orchestrator.py` (empty stub) | Redundant: `StructuredPipeline` takes the introspector at `run()` time |

### 2.2 Renamed / moved (same role, new home)

| Phase 1 location | Phase 2 location | Notes |
|---|---|---|
| `src/agents/chatbot.py` (Phase 1) → deleted; then `src/agents/answer_agent.py` (`AnswerAgent`) → renamed | `src/agents/chatbot.py::ChatbotAgent` | Final answer formation; streams via `astream` |
| `src/knowledge/parquet_service.py` | `src/storage/parquet.py` | Parquet upload/download helper |
| `src/pipeline/document_pipeline/document_pipeline.py` (folder) | `src/pipeline/document_pipeline.py` (flat) | Single module |
| `src/rag/retrievers/document.py` | `src/retrieval/document.py` | `DocumentRetriever` migrated; tabular file types filtered out of results |
| `src/rag/router.py` | `src/retrieval/router.py` | `RetrievalRouter`, Redis-cached, unstructured-only; dead `db: AsyncSession` + `source_hint` params removed |
| `src/rag/base.py` (`RetrievalResult`, `BaseRetriever`) | `src/retrieval/base.py` | Same dataclass + ABC |

> **Heads-up on the intent router**: the Phase 1 file `src/agents/orchestration.py` and its class `OrchestratorAgent` were **kept in place** for Phase 2, but the body was fully rewritten. The class now emits `IntentRouterDecision(needs_search, source_hint ∈ {chat, unstructured, structured}, rewritten_query)`. The prompt file and test file use the `intent_router` name (`config/prompts/intent_router.md`, `tests/agents/test_intent_router.py`), but **the source module is still `orchestration.py` and the class is still `OrchestratorAgent`**. Existing imports continue to work; only the behavior changed.
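
A minimal sketch of the router's output contract follows, assuming a plain frozen dataclass in place of the real Pydantic `IntentRouterDecision`. The field names and the three `source_hint` values come from the note above.

`parse_decision` and `route` are hypothetical helpers. The strings `route` returns are only labels for the three downstream paths.

```python
import json
from dataclasses import dataclass

ALLOWED_SOURCE_HINTS = frozenset({"chat", "unstructured", "structured"})


@dataclass(frozen=True)
class IntentRouterDecision:
    """Simplified stand-in for the Pydantic model in agents/orchestration.py."""

    needs_search: bool
    source_hint: str
    rewritten_query: str

    def __post_init__(self) -> None:
        # Reject anything outside the three supported paths.
        if self.source_hint not in ALLOWED_SOURCE_HINTS:
            raise ValueError(f"unsupported source_hint: {self.source_hint!r}")


def parse_decision(raw_json: str) -> IntentRouterDecision:
    """Build a decision from the LLM's JSON output (hypothetical helper)."""
    return IntentRouterDecision(**json.loads(raw_json))


def route(decision: IntentRouterDecision) -> str:
    """Name the component that handles each source_hint (hypothetical helper)."""
    return {
        "chat": "ChatbotAgent",
        "unstructured": "RetrievalRouter",
        "structured": "QueryService",
    }[decision.source_hint]


decision = parse_decision(
    '{"needs_search": true, "source_hint": "structured",'
    ' "rewritten_query": "total order amount per month in 2024"}'
)
```

Validating `source_hint` at construction time means a malformed router reply fails before any downstream component is chosen.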

### 2.3 Added (Phase 2 new)

**Catalog subsystem (whole new concept)**

| Path | Role |
|---|---|
| `src/catalog/models.py` | Pydantic: `Catalog → Source[] → Table[] → Column[]`, `ForeignKey`, `ColumnStats.top_values` |
| `src/catalog/introspect/base.py` | `BaseIntrospector` ABC |
| `src/catalog/introspect/database.py` | DB introspector; wraps Phase 1 `db_pipeline/extractor.py` (`get_schema`, `profile_column`, `get_row_count`) |
| `src/catalog/introspect/tabular.py` | CSV / XLSX / Parquet introspector; one `Table` per XLSX sheet |
| `src/catalog/render.py` | Renders a `Source` for the planner prompt |
| `src/catalog/validator.py` | Unique-ID + foreign-key-ref invariants |
| `src/catalog/store.py` | Postgres `jsonb` upsert keyed by `user_id` (table `data_catalog`) |
| `src/catalog/reader.py` | Loads + filters catalog by `source_hint` |
| `src/catalog/pii_detector.py` | Flags PII columns at ingestion → suppresses `sample_values` |
| `src/security/pii_patterns.py` | Name patterns + value regex used by the detector |
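
The catalog shape and its two ingestion-time guarantees can be sketched with stdlib dataclasses standing in for the Pydantic models. The two guarantees are PII suppression and the unique-ID / foreign-key invariants.

Several details are illustrative assumptions:

- The field names beyond `pii_flag` / `sample_values`.
- The single column-name regex and the email-only value regex.
- Omitting `ColumnStats`.

The real rules live in `src/security/pii_patterns.py` and `src/catalog/validator.py`.

```python
import re
from dataclasses import dataclass, field
from typing import Optional


# Hypothetical, simplified stand-ins for src/catalog/models.py.
@dataclass
class Column:
    column_id: str
    name: str
    data_type: str
    sample_values: Optional[list] = None
    pii_flag: bool = False


@dataclass
class ForeignKey:
    column_id: str  # referencing column
    ref_column_id: str  # referenced column


@dataclass
class Table:
    table_id: str
    name: str
    columns: list[Column] = field(default_factory=list)
    foreign_keys: list[ForeignKey] = field(default_factory=list)


@dataclass
class Source:
    source_id: str
    source_type: str  # e.g. "schema" for a DB, "tabular" for CSV / XLSX / Parquet
    tables: list[Table] = field(default_factory=list)


@dataclass
class Catalog:
    user_id: str
    sources: list[Source] = field(default_factory=list)


# Illustrative patterns only (assumed); the real ones live in src/security/pii_patterns.py.
PII_NAME = re.compile(r"email|phone|ssn|address|birth", re.IGNORECASE)
EMAIL_VALUE = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")


def flag_pii(column: Column) -> None:
    """Flag PII by column name or sample value; flagged columns lose their samples."""
    by_name = PII_NAME.search(column.name) is not None
    by_value = any(
        isinstance(v, str) and EMAIL_VALUE.fullmatch(v) for v in column.sample_values or []
    )
    if by_name or by_value:
        column.pii_flag = True
        column.sample_values = None  # samples never reach the planner prompt


def validate_catalog(catalog: Catalog) -> list[str]:
    """Return invariant violations: duplicate ids and dangling foreign-key refs."""
    errors: list[str] = []
    seen: set[str] = set()
    column_ids: set[str] = set()

    def check_unique(obj_id: str) -> None:
        if obj_id in seen:
            errors.append(f"duplicate id: {obj_id}")
        seen.add(obj_id)

    for source in catalog.sources:
        check_unique(source.source_id)
        for table in source.tables:
            check_unique(table.table_id)
            for column in table.columns:
                check_unique(column.column_id)
                column_ids.add(column.column_id)

    # Foreign keys may only point at columns that exist somewhere in the catalog.
    for source in catalog.sources:
        for table in source.tables:
            for fk in table.foreign_keys:
                for ref in (fk.column_id, fk.ref_column_id):
                    if ref not in column_ids:
                        errors.append(f"dangling foreign-key ref: {ref}")
    return errors
```

Returning a list of violations rather than raising on the first one lets an ingestion run report every broken invariant at once.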

**JSON IR + query subsystem**

| Path | Role |
|---|---|
| `src/query/ir/models.py` | `QueryIR` Pydantic schema |
| `src/query/ir/operators.py` | `ALLOWED_FILTER_OPS`, `ALLOWED_AGG_FNS`, `LIMIT_HARD_CAP`, `TYPE_COMPATIBILITY` |
| `src/query/ir/validator.py` | Catalog-aware IR validation (rejects unknown column IDs, bad ops, type mismatches, oversize limits) |
| `src/query/planner/service.py` | `QueryPlannerService.plan(question, catalog, previous_error)` → Azure OpenAI structured output → `QueryIR` |
| `src/query/planner/prompt.py` | Builds the planner prompt from catalog text |
| `src/query/compiler/base.py` | Compiler ABC |
| `src/query/compiler/sql.py` | `SqlCompiler` (Postgres): all 12 filter ops, params as a dict |
| `src/query/compiler/pandas.py` | `PandasCompiler`: returns `CompiledPandas(apply, output_columns)` |
| `src/query/executor/base.py` | `BaseExecutor` + `QueryResult` |
| `src/query/executor/db.py` | `DbExecutor`: sqlglot SELECT-only guard, RO txn, 30 s `statement_timeout`, 10k row cap |
| `src/query/executor/tabular.py` | `TabularExecutor`: Parquet via blob, `asyncio.to_thread`, 10k row cap |
| `src/query/executor/dispatcher.py` | `ExecutorDispatcher.pick(ir)`: picks by `source.source_type` |
| `src/query/service.py` | `QueryService.run(user_id, question, catalog)`: plan → validate → retry (max 3) → dispatch → execute |
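
The plan → validate → retry (max 3) sequence in `QueryService.run` is a bounded feedback loop: each validation failure is handed back to the planner as `previous_error`. A minimal sketch with the planner and validator injected as plain callables follows.

Three choices are assumptions, not the real service's API:

- The names `plan_with_retry` and `PlanOutcome`.
- The dict-shaped IR.
- Returning the last error after the final attempt instead of raising.

```python
from dataclasses import dataclass
from typing import Callable, Optional

MAX_ATTEMPTS = 3  # matches "retry (max 3)"

Planner = Callable[[str, dict, Optional[str]], dict]  # (question, catalog, previous_error) -> IR
Validator = Callable[[dict, dict], list]  # (ir, catalog) -> list of error strings


@dataclass
class PlanOutcome:
    ir: Optional[dict]
    error: Optional[str]
    attempts: int


def plan_with_retry(
    question: str,
    catalog: dict,
    plan: Planner,
    validate: Validator,
    max_attempts: int = MAX_ATTEMPTS,
) -> PlanOutcome:
    previous_error: Optional[str] = None
    for attempt in range(1, max_attempts + 1):
        ir = plan(question, catalog, previous_error)
        errors = validate(ir, catalog)
        if not errors:
            return PlanOutcome(ir=ir, error=None, attempts=attempt)
        # Feed the validator's complaints back so the next plan can correct them.
        previous_error = "; ".join(errors)
    return PlanOutcome(ir=None, error=previous_error, attempts=max_attempts)


# Toy planner: guesses a wrong column id first, fixes it once it sees an error.
def toy_plan(question: str, catalog: dict, previous_error: Optional[str]) -> dict:
    return {"select": ["orders.amount" if previous_error else "orders.amt"]}


def toy_validate(ir: dict, catalog: dict) -> list:
    return [f"unknown column id: {c}" for c in ir["select"] if c not in catalog["column_ids"]]


outcome = plan_with_retry(
    "total order amount", {"column_ids": {"orders.amount"}}, toy_plan, toy_validate
)
```

In this toy run the first plan fails validation and the second succeeds, so `outcome.attempts == 2`. Keeping the planner behind a callable is also what makes the loop testable without an LLM.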

**Agents**

| Path | Role |
|---|---|
| `src/agents/orchestration.py` | `OrchestratorAgent`: Phase 1 file/class name preserved; Phase 2 body. Emits `IntentRouterDecision` |
| `src/agents/chatbot.py` | `ChatbotAgent`: formerly `AnswerAgent` in `agents/answer_agent.py`; renamed in the Cleanup PR |
| `src/agents/chat_handler.py` | `ChatHandler.handle(...)`: top-level orchestrator; yields `intent` / `chunk` / `done` / `error` SSE events |

**Pipelines & API**

| Path | Role |
|---|---|
| `src/pipeline/structured_pipeline.py` | DB / tabular ingestion: introspect → merge → validate → upsert |
| `src/pipeline/triggers.py` | `on_db_registered`, `on_tabular_uploaded`, `on_document_uploaded`, `on_catalog_rebuild_requested` |
| `src/api/v1/data_catalog.py` | `GET /api/v1/data-catalog/{user_id}` + `POST /api/v1/data-catalog/rebuild` |
| `src/models/api/catalog.py` | Catalog request/response models |
| `src/config/prompts/intent_router.md`, `query_planner.md`, `chatbot_system.md`, `guardrails.md` | New prompts. `guardrails.md` is appended to `chatbot_system.md` at load time |
| `src/db/postgres/models.py` (added `Catalog` SQLAlchemy class) | Stores the per-user jsonb document in `data_catalog` |
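
The load-time prompt composition noted above (`guardrails.md` appended to `chatbot_system.md`) amounts to a few lines. This sketch assumes both files sit in one directory and are joined with a blank line. The function name and separator are hypothetical, and a temporary directory stands in for `src/config/prompts/`.

```python
import tempfile
from pathlib import Path


def load_chatbot_system_prompt(prompt_dir: Path) -> str:
    """Hypothetical loader: chatbot_system.md with guardrails.md appended after a blank line."""
    system = (prompt_dir / "chatbot_system.md").read_text(encoding="utf-8")
    guardrails = (prompt_dir / "guardrails.md").read_text(encoding="utf-8")
    # The blank-line separator is an assumption, not the documented format.
    return system.rstrip() + "\n\n" + guardrails.strip() + "\n"


# Usage against a throwaway directory standing in for src/config/prompts/.
with tempfile.TemporaryDirectory() as tmp:
    prompt_dir = Path(tmp)
    (prompt_dir / "chatbot_system.md").write_text(
        "You answer questions about the user's data.\n", encoding="utf-8"
    )
    (prompt_dir / "guardrails.md").write_text("Never reveal credentials.\n", encoding="utf-8")
    prompt = load_chatbot_system_prompt(prompt_dir)
```

Appending the guardrails last keeps them as the final instructions the model reads.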

### 2.4 Rewired API endpoints

| Endpoint | Phase 1 wiring | Phase 2 wiring |
|---|---|---|
| `POST /api/v1/chat/stream` | Inline in `chat.py`: `OrchestratorAgent` → `retriever` → `query_executor` → `chatbot` | Delegates to `ChatHandler.handle()`. Redis cache, fast intent, history load, and message persistence stay in the endpoint |
| `POST /api/v1/database-clients/{id}/ingest` | Called `db_pipeline_service.run()` and dual-wrote vectors | Calls **only** `on_db_registered` (catalog build). Failure → HTTP 500 |
| `POST /api/v1/document/process` | Always pushed to the vector store | PDF/DOCX/TXT → `knowledge_processor` (vectors); CSV/XLSX → `on_tabular_uploaded` (catalog only, **no vector embedding**) |
| `POST /api/v1/document/upload` | Storage + DB row | Same, plus the `on_document_uploaded` trigger |
| `POST /api/v1/data-catalog/rebuild` | n/a | New: iterates all sources and re-runs the per-source trigger |
| `GET /api/v1/data-catalog/{user_id}` | n/a | New: returns `list[CatalogIndexEntry]` |
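
The `/document/process` rewiring boils down to a dispatch on file extension, and the property that matters is that tabular files never reach the vector store. A sketch follows.

- `route_processing` is a hypothetical helper.
- The strings it returns are the handler names from the table.
- Rejecting unknown extensions is an assumption.

```python
from pathlib import PurePath

# Extensions per the endpoint table: prose goes to vectors, tables to the catalog.
VECTOR_TYPES = frozenset({".pdf", ".docx", ".txt"})
CATALOG_TYPES = frozenset({".csv", ".xlsx"})


def route_processing(filename: str) -> str:
    """Name the handler for an uploaded file (hypothetical helper)."""
    suffix = PurePath(filename).suffix.lower()
    if suffix in VECTOR_TYPES:
        return "knowledge_processor"  # extract -> chunk -> embed into PGVector
    if suffix in CATALOG_TYPES:
        return "on_tabular_uploaded"  # catalog only, no vector embedding
    raise ValueError(f"unsupported file type: {filename!r}")


handler = route_processing("Q3 Report.PDF")  # "knowledge_processor"
```

Lower-casing the suffix keeps `Report.PDF` and `report.pdf` on the same path.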

### 2.5 Phase 1 files still in production use

These were **not rewritten**; Phase 2 imports them directly:

- `src/database_client/database_client_service.py`
- `src/utils/db_credential_encryption.py` (`decrypt_credentials_dict`); `src/security/credentials.py` is still a stub
- `src/pipeline/db_pipeline/db_pipeline_service.py` (`engine_scope` context manager, used by both the introspector and `DbExecutor`)
- `src/pipeline/db_pipeline/extractor.py` (`get_schema`, `profile_column`, `get_row_count`)
- `src/knowledge/processing_service.py` (PDF / DOCX / TXT extraction + embedding)
- `src/db/postgres/{connection,init_db,vector_store}.py`, `src/storage/az_blob/`, `src/middlewares/`, `src/security/auth.py`

---

## 3. End-to-end flow (current state)

### 3.1 Ingestion

```
User action            Pipeline                               Storage
─────────────────────  ────────────────────────────────────   ───────────────────────────────
upload PDF/DOCX/TXT →  DocumentPipeline                    →  Azure Blob + PGVector
                       (extract → chunk → embed)              (table: langchain_pg_embedding)
                       + on_document_uploaded                 + retrieval cache invalidate

upload CSV/XLSX     →  TabularIntrospector                 →  Azure Blob (Parquet)
                       (sheets / columns + sample + stats)    + data_catalog jsonb row
                       → CatalogValidator → CatalogStore      (NO vector store: catalog only)
                       via on_tabular_uploaded

register DB         →  DatabaseIntrospector                →  data_catalog jsonb row
                       (information_schema + sample + FKs)
                       → validate → store
                       via on_db_registered
```

### 3.2 Query (per user message → SSE stream)

```
POST /api/v1/chat/stream
  │
  ├── Redis cache check (24h TTL) → hit returns cached stream
  ├── _fast_intent (greetings / goodbyes) → bypass LLM
  ├── load history from chat_messages
  │
  ├── ChatHandler.handle(message, user_id, history)           [src/agents/chat_handler.py]
  │     │
  │     ├── OrchestratorAgent.classify()                      [agents/orchestration.py]
  │     │     → needs_search, source_hint, rewritten_query
  │     │
  │     ├── source_hint == "chat"
  │     │     → ChatbotAgent.astream() → yield chunk events
  │     │
  │     ├── source_hint == "unstructured"
  │     │     → RetrievalRouter.retrieve()                    [retrieval/router.py, Redis-cached]
  │     │     → DocumentRetriever (PGVector MMR/cosine/etc.)
  │     │     → ChatbotAgent.astream(chunks=...)
  │     │
  │     ├── source_hint == "structured"
  │     │     → CatalogReader.read(user_id, "structured")     [catalog/reader.py]
  │     │     → QueryService.run(user_id, question, catalog)  [query/service.py]
  │     │         │
  │     │         ├── QueryPlannerService.plan(...)           [query/planner/service.py]
  │     │         │     LLM(catalog, question, prev_error?) → QueryIR
  │     │         │
  │     │         ├── IRValidator.validate(ir, catalog)       [query/ir/validator.py]
  │     │         │     fail → loop back to planner with error context (max 3)
  │     │         │
  │     │         ├── ExecutorDispatcher.pick(ir)             [query/executor/dispatcher.py]
  │     │         │     schema source  → DbExecutor
  │     │         │     tabular source → TabularExecutor
  │     │         │
  │     │         ├── DbExecutor.run(ir):                     [query/executor/db.py]
  │     │         │     SqlCompiler → (sql, params)
  │     │         │     → sqlglot SELECT-only guard
  │     │         │     → engine_scope (Phase 1 utility) in asyncio.to_thread
  │     │         │     → RO txn + statement_timeout=30s + 10k cap
  │     │         │
  │     │         ├── TabularExecutor.run(ir):                [query/executor/tabular.py]
  │     │         │     resolve Parquet blob path
  │     │         │     → download → PandasCompiler.apply(df)
  │     │         │     → asyncio.to_thread → 10k cap
  │     │         │
  │     │         └── QueryResult { rows, columns, row_count,
  │     │                           truncated, source_id, error?, elapsed_ms }
  │     │
  │     │     → ChatbotAgent.astream(query_result=...)
  │     │     → yield chunk events
  │     │
  │     └── final events: done / error
  │
  ├── persist user + assistant messages to chat_messages
  └── populate Redis cache
```
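
The event contract at the bottom of this flow can be sketched as an async generator: an `intent` frame, then `chunk` frames, then `done`, or a single `error` frame on failure. This is a hypothetical simplification of `ChatHandler.handle`:

- Classification and answering are injected callables rather than the real agents.
- The JSON payload shapes are assumptions.
- Only the framing (an `event:` line and a `data:` line, terminated by a blank line) follows the Server-Sent Events format.

```python
import asyncio
import json
from typing import AsyncIterator, Awaitable, Callable


def sse(event: str, data: dict) -> str:
    """Format one Server-Sent Events frame."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


async def handle(
    message: str,
    classify: Callable[[str], Awaitable[str]],
    answer: Callable[[str, str], AsyncIterator[str]],
) -> AsyncIterator[str]:
    """Hypothetical handler: yield intent -> chunk* -> done, or an error frame."""
    try:
        source_hint = await classify(message)
        yield sse("intent", {"source_hint": source_hint})
        async for token in answer(message, source_hint):
            yield sse("chunk", {"text": token})
        yield sse("done", {})
    except Exception as exc:  # surface failures as an event instead of breaking the stream
        yield sse("error", {"message": str(exc)})


# Toy stand-ins for OrchestratorAgent.classify() and ChatbotAgent.astream().
async def toy_classify(message: str) -> str:
    return "chat"


async def toy_answer(message: str, source_hint: str) -> AsyncIterator[str]:
    for token in ("Hello", "!"):
        yield token


async def collect(message: str) -> list[str]:
    return [frame async for frame in handle(message, toy_classify, toy_answer)]


frames = asyncio.run(collect("hi"))
```

Emitting `error` as a frame, rather than letting the exception escape, lets the client tell a failed answer apart from a dropped connection.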

**Safety invariants for the structured path** (read-only at every layer):

1. The IR is validated against the catalog before it reaches the compiler
2. Identifiers come from the catalog (trusted; inlined as quoted identifiers)
3. Values from `IR.filters` are always parameterized
4. The compiler is deterministic; there is no LLM in the hot path
5. sqlglot rejects anything that isn't a pure SELECT
6. The DB connection is read-only with a 30 s `statement_timeout`
7. A hard 10,000-row cap applies to both executors; neither raises, and errors are returned in `QueryResult.error`
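
Invariants 6 and 7 cover read-only execution, a hard row cap, and errors reported rather than raised. They can be illustrated with the stdlib `sqlite3` module standing in for Postgres. This is an analogy, not the real `DbExecutor`:

- `PRAGMA query_only` plays the role of the read-only transaction.
- The sqlglot guard and `statement_timeout` are not reproduced.
- `run_readonly` is a hypothetical name.
- SQLite's `:name` placeholders stand in for whatever style the Postgres driver uses.

```python
import sqlite3
import time
from dataclasses import dataclass, field
from typing import Optional

ROW_CAP = 10_000


@dataclass
class QueryResult:
    rows: list = field(default_factory=list)
    columns: list = field(default_factory=list)
    row_count: int = 0
    truncated: bool = False
    source_id: str = ""
    error: Optional[str] = None
    elapsed_ms: float = 0.0


def run_readonly(
    conn: sqlite3.Connection, sql: str, params: dict, source_id: str, cap: int = ROW_CAP
) -> QueryResult:
    """SQLite analogy of the executor contract: read-only, capped, never raises."""
    start = time.perf_counter()
    try:
        conn.execute("PRAGMA query_only = ON")  # any write now fails at the SQLite level
        cursor = conn.execute(sql, params)
        rows = cursor.fetchmany(cap + 1)  # one extra row reveals whether we truncated
        columns = [d[0] for d in cursor.description]
        return QueryResult(
            rows=rows[:cap],
            columns=columns,
            row_count=min(len(rows), cap),
            truncated=len(rows) > cap,
            source_id=source_id,
            elapsed_ms=(time.perf_counter() - start) * 1000,
        )
    except Exception as exc:  # deliberately broad: the executor contract is "never raise"
        return QueryResult(
            source_id=source_id,
            error=str(exc),
            elapsed_ms=(time.perf_counter() - start) * 1000,
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, amount REAL)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(i, i * 1.5) for i in range(5)])
conn.commit()

capped = run_readonly(
    conn,
    "SELECT id, amount FROM orders WHERE amount > :min_amount ORDER BY id",
    {"min_amount": 2.0},
    "s1",
    cap=2,
)
blocked = run_readonly(conn, "DELETE FROM orders", {}, "s1")
```

Fetching `cap + 1` rows is a cheap way to set `truncated` without a separate `COUNT(*)` query. The rejected `DELETE` comes back as a populated `error` field instead of an exception.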

---

## 4. Summary table for review

| Concern | Phase 1: where it lived | Phase 2: where it lives | Change type |
|---|---|---|---|
| Intent classification | `agents/orchestration.py::OrchestratorAgent` (free-text intent) | **Same path + same class name**; body rewritten to emit `IntentRouterDecision` | Body rewrite only |
| Top-level chat orchestration | Inline in `api/v1/chat.py` | `agents/chat_handler.py::ChatHandler` | Extracted to a reusable module |
| Final answer formation | `agents/chatbot.py` (Phase 1 LangChain) | `agents/chatbot.py::ChatbotAgent` (was `AnswerAgent` in `answer_agent.py` mid-cycle) | Rewritten + renamed |
| Schema retrieval (DB / tabular) | `rag/retrievers/schema.py` + PGVector chunks | **Removed**. Replaced by the catalog (`catalog/store.py` jsonb), loaded verbatim into the planner prompt | Whole concept replaced |
| Doc retrieval (PDF / DOCX / TXT) | `rag/retrievers/document.py`, `rag/router.py` | `retrieval/document.py`, `retrieval/router.py` | Moved; Redis cache restored; tabular files filtered |
| Query writing | `query/query_executor.py` + `models/sql_query.py` (LLM writes SQL) | `query/planner/service.py` (LLM writes IR) + `query/compiler/sql.py` (deterministic) | LLM emits intent, not SQL |
| DB execution | `query/executors/db_executor.py` | `query/executor/db.py::DbExecutor` | Folder renamed (`executors` → `executor`); sqlglot guard + RO txn + 30 s timeout kept |
| Tabular execution | `query/executors/tabular.py` | `query/executor/tabular.py::TabularExecutor` | Parquet-only; pandas compiler split out |
| Executor selection | Hard-coded in `query_executor.py` | `query/executor/dispatcher.py::ExecutorDispatcher` | New; routes by `source.source_type` |
| Catalog (NEW) | n/a | `catalog/` (models, introspect/, validator, store, reader, pii_detector, render) | New subsystem |
| Catalog persistence | (data was embedded in PGVector) | Postgres jsonb table `data_catalog`, keyed by `user_id` | New table |
| Ingestion triggers | Inline in API endpoints | `pipeline/triggers.py` (`on_db_registered`, `on_tabular_uploaded`, `on_document_uploaded`, `on_catalog_rebuild_requested`) | Centralized event entry points |
| Structured pipeline | `pipeline/db_pipeline/db_pipeline_service.py` (still present for `engine_scope` + extractor reuse) | `pipeline/structured_pipeline.py` (orchestrator); reuses the Phase 1 extractor | New orchestrator wraps Phase 1 introspection helpers |
| Document pipeline | `pipeline/document_pipeline/document_pipeline.py` (folder) | `pipeline/document_pipeline.py` (file) | Flattened; CSV / XLSX now skip the vector store |
| Parquet helper | `knowledge/parquet_service.py` | `storage/parquet.py` | Moved into `storage/` |
| Prompts | `config/agents/system_prompt.md`, `guardrails_prompt.md` | `config/prompts/{intent_router,query_planner,chatbot_system,guardrails}.md` | Folder renamed; split into four files; guardrails appended to `chatbot_system` at load |
| PII detection | n/a | `catalog/pii_detector.py` + `security/pii_patterns.py` | New. Columns flagged `pii_flag=true` get `sample_values: null`, so PII never enters prompts |
| Chat endpoint | `api/v1/chat.py` (does everything inline) | `api/v1/chat.py` (cache + history + persistence) → delegates to `ChatHandler` | Slimmed; SSE event shape is `intent` / `chunk` / `done` / `error` |
| DB ingest endpoint | `api/v1/db_client.py::ingest` (Phase 1 `db_pipeline_service.run()`) | `api/v1/db_client.py::ingest` (calls `on_db_registered` only) | Phase 1 dual-write removed |
| Document process endpoint | `api/v1/document.py::process` (always vectorize) | `api/v1/document.py::process` (PDF/DOCX/TXT → vectors; CSV/XLSX → catalog via `on_tabular_uploaded`) | Routing by file type |
| Catalog management API | n/a | `api/v1/data_catalog.py` (GET index + POST rebuild) | New |

**Bottom line.** Every Phase 1 file under `src/rag/`, `src/tools/`, `src/query/executors/`, `src/query/query_executor.py`, `src/query/base.py`, `src/api/v1/knowledge.py`, and `src/config/agents/` is gone. Phase 1 introspection helpers under `src/pipeline/db_pipeline/` and `src/database_client/` are still imported by Phase 2; they were not rewritten, just wrapped. The three LLM call sites are now explicit, and none of them writes SQL: the planner emits a Pydantic-validated `QueryIR` instead.

The one filename gotcha to remember: the **intent router** still lives at `src/agents/orchestration.py` as class `OrchestratorAgent` (Phase 1 name kept for import-site compatibility, Phase 2 body). The matching prompt and tests use the `intent_router` name, but the source module does not.