Progress: Phase 2 catalog-driven build
Persistent tracker mirroring the 42-item ownership table in REPO_CONTEXT.md ("Team: division of work"). Update as PRs land. Future Claude Code sessions read this to know what's already done.
Last updated: 2026-06-12 (Redis Cloud live; R3 closed as won't-do; R5 cache fix; AnalysisRecord persistence landed: PostgresAnalysisStore + analysis_records table)
Current open PR: pr/3 (active).
What just shipped (2026-06-12: AnalysisRecord persistence, Rifqi)
Groundwork for generate_report. The slow path now persists a real, citable
record; the report (next) renders from it.
- Contract gaps closed (agents/slow_path/schemas.py): stage: CrispStage added to TaskResult + TaskSummary and populated at all 3 TaskResult build sites in task_runner.py, plus copied in assembler._build_record, so the report can group its method appendix by CRISP-DM phase. AnalysisRecord gained identity: record_id (auto uuid), analysis_id / user_id (optional; stamped at persist).
- Real store (agents/slow_path/store.py): PostgresAnalysisStore with save() (never-throw, idempotent upsert) + list_for_analysis() (oldest-first, the report's render order). NullAnalysisStore kept (tests / disabled persistence). The AnalysisStore Protocol gained list_for_analysis.
- Table (db/postgres/models.py): analysis_records jsonb table (one row per run, indexed by analysis_id + user_id); registered in init_db.py and created by create_all on startup (no migration, following the data_catalog precedent).
- Wired (agents/chat_handler.py): default store flipped to PostgresAnalysisStore; user_id is stamped onto the record at the save site (where it is in scope).
- Open: analysis_id is NULL until Harry's Analysis State reaches the slow path (a session-ID handoff is needed to group records per analysis).
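A minimal sketch of the store contract above. It uses stdlib sqlite3 as a stand-in for Postgres so it runs anywhere. SqliteAnalysisStore, the trimmed AnalysisRecord fields and the list_for_analysis(analysis_id, user_id) signature are hypothetical; the real PostgresAnalysisStore writes a jsonb row. The sketch shows the two properties generate_report relies on:
- save() never throws, and re-saving the same record_id is an idempotent upsert.
- Listing comes back oldest-first.

```python
import json
import sqlite3
import uuid
from dataclasses import asdict, dataclass, field
from typing import List, Optional


@dataclass
class AnalysisRecord:
    # Hypothetical, trimmed shape; the real model lives in agents/slow_path/schemas.py.
    question: str
    record_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    analysis_id: Optional[str] = None
    user_id: Optional[str] = None


class SqliteAnalysisStore:
    """Stand-in for PostgresAnalysisStore: idempotent upsert + oldest-first listing."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS analysis_records ("
            " record_id TEXT PRIMARY KEY, analysis_id TEXT, user_id TEXT,"
            " payload TEXT NOT NULL)"
        )

    def save(self, record: AnalysisRecord) -> bool:
        """Never throws: returns False on any DB error (the real store would log it)."""
        try:
            with self._conn:  # commits on success, rolls back on error
                self._conn.execute(
                    "INSERT INTO analysis_records (record_id, analysis_id, user_id, payload)"
                    " VALUES (?, ?, ?, ?)"
                    " ON CONFLICT(record_id) DO UPDATE SET analysis_id = excluded.analysis_id,"
                    " user_id = excluded.user_id, payload = excluded.payload",
                    (record.record_id, record.analysis_id, record.user_id,
                     json.dumps(asdict(record))),
                )
            return True
        except sqlite3.Error:
            return False

    def list_for_analysis(self, analysis_id: str, user_id: str) -> List[dict]:
        # rowid is assigned on first insert and kept by DO UPDATE, so ORDER BY rowid is oldest-first.
        rows = self._conn.execute(
            "SELECT payload FROM analysis_records"
            " WHERE analysis_id = ? AND user_id = ? ORDER BY rowid",
            (analysis_id, user_id),
        ).fetchall()
        return [json.loads(payload) for (payload,) in rows]
```

In Postgres the same shape would be INSERT ... ON CONFLICT (record_id) DO UPDATE. An explicit created_at column would likely replace rowid for ordering.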
Principal architecture review (2026-06-10): findings + fix tracker
A full external review landed. It covered the context docs plus the slow path, tool layer, query spine, catalog plumbing, chat endpoint, and config/connection layers. It confirmed the DB-latency diagnosis and surfaced several gaps not previously tracked here. Each finding was verified against the code before logging. Severity: critical / important / nice-to-have.
Runtime / latency (the original problem):
- DB connection handling is the anomaly, NOT cold start.
  - DbExecutor._run_sync (db.py:192): engine_scope does create_engine → connect (TCP+TLS+SCRAM) → 2×SET → dispose on EVERY query.
  - Measured ~6–8s for 60 rows. A 2nd warm-session query was still ~6.6s, so the per-call handshake is never amortized.
  - engine_scope's connect-once-dispose semantics were designed for the ingestion pipeline and wrongly inherited by the query path.
- describe_source (~3.5s) is planner-induced waste. Every few-shot (examples.py) opens with a describe_source task, so the LLM always plans a tool that re-reads from the catalog DB the same catalog already rendered into its prompt. Its impl does 2 sequential full-catalog reads (data_access.py:127-128). Total catalog reads per request: ~5×.
- Azure LLM clients are rebuilt per request. ChatHandler(enable_tracing=True) is constructed per request (chat.py:172) → fresh Orchestrator/Chatbot → fresh AzureChatOpenAI → fresh TLS to Azure on each call. Planner/Assembler correctly use module singletons; the other two don't.
- Tokens (~13k/request) are NORMAL for this design; do not optimize for $.
- Reject the scheduled DB-warmer idea. It is strictly dominated by an engine cache + request-scoped pre-connect, because:
  - it targets cold start (~1.8s slice), not the per-call handshake;
  - it keeps serverless user DBs awake 24/7 (their compute bill);
  - it decrypts every tenant's creds on a cron (attack surface).
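The client-rebuild finding comes down to where construction happens. A sketch of the contrast, assuming a hypothetical FakeLLMClient in place of AzureChatOpenAI (whose constructor is the costly step because it opens a fresh TLS session). functools.lru_cache turns the factory into a per-deployment module singleton, in the spirit of the Planner/Assembler pattern.

```python
import functools
import itertools

_serials = itertools.count(1)


class FakeLLMClient:
    """Hypothetical stand-in for AzureChatOpenAI; constructing one is the costly step (fresh TLS)."""

    def __init__(self, deployment: str) -> None:
        self.deployment = deployment
        self.serial = next(_serials)  # lets tests see how many clients were built


def per_request_client(deployment: str) -> FakeLLMClient:
    # Pattern flagged by the review: a new client (and a new TLS session) on every request.
    return FakeLLMClient(deployment)


@functools.lru_cache(maxsize=None)
def shared_client(deployment: str) -> FakeLLMClient:
    # Module-level singleton per deployment: built on first use, reused for the process lifetime.
    return FakeLLMClient(deployment)
```

The DB1 fix below took the coarser route of one shared module-level ChatHandler, which has the same effect for the Orchestrator/Chatbot clients it owns.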
Fix tracker (new):
| # | Fix | Severity | Owner | Status |
|---|---|---|---|---|
| R1 | AuthN/AuthZ on data endpoints: reject body-supplied user_id/room_id and derive identity from a verified token. /chat/stream has none (chat.py:40,128); tenant isolation rests on client honesty. CORRECTION to the review: security/auth.py is a STUB (all NotImplementedError). The real JWT impl lives in src/users/users.py (encode_jwt/decode_jwt, HS, env-keyed) but is unused: /login (api/v1/users.py) returns the user profile as plain JSON and mints NO token. So R1 is cross-team: (1) /login must issue a JWT, (2) the frontend must send it as a Bearer token, (3) data endpoints validate it. Gates the engine-cache work (DB2). | critical | DB/B + frontend | [ ] |
| R2 | Always compile a LIMIT. sql.py now emits a bound for every query:<br>- An explicit limit is honored (clamped to MAX_RESULT_ROWS=10000).<br>- Unbounded queries get LIMIT cap+1, so an unbounded SELECT can't stream a whole table into memory.<br>CompiledSql.row_cap carries the cap. DbExecutor caps rows and flags truncation from it (its own _ROW_HARD_CAP was dropped). Tests updated (test_sql.py, +3 cases). S608 restored to the tests/** ruff ignore (it had been dropped). | critical | DB | [x] |
| R3 | Commit tests/ + minimal CI: tests/ is gitignored, so the 200+ tests cited as done exist only on laptops (this has already caused rename rot). | critical (process) | shared | [won't do] |
| DB1 | In-memory describe_source (request-scoped MemoizingCatalogReader, reader.py) + LLM-client hoist (shared module-level ChatHandler in chat.py). Measured live: describe_source 3.5s → …. The query_structured handshake is unchanged (DB2's job), so the total slow path stays roughly flat until DB2. Tests: tests/catalog/test_reader.py. | important | agent | [x] |
| DB2 | Keyed engine cache: src/database_client/engine.py::UserEngineCache (process singleton).<br>- Pooled engines are keyed by client_id + creds-hash, so credential rotation auto-invalidates them.<br>- Bounded LRU (50) + 600s idle TTL; pool_pre_ping + pool_recycle=300.<br>- DbExecutor._run_sync reuses the warm connection instead of doing create_engine → connect → dispose per query. This applies to postgres/supabase only; other db_types keep the legacy path, so there is no regression.<br>- Live-measured: warm query_structured went from 6.6–9.4s to ~2.5s. The residual is the per-call catalog-DB client fetch + pre-ping, not the external handshake.<br>- Finding (caught live): Neon's transaction pooler REJECTS default_transaction_read_only as a libpq startup option. Read-only + statement_timeout moved to a per-connection connect event, which is best-effort. The authoritative read-only guarantee is the SELECT-only compiler + sqlglot guard (see R10).<br>- Per-request ownership/active check kept.<br>- Proceeded ahead of R1 per owner decision: marginal security delta over the existing no-auth state; auth tracked separately.<br>Tests: tests/database_client/test_engine.py. The first query per process is still cold (see DB3). | important | DB | [x] |
| DB3 | Speculative pre-connect: DbExecutor.prewarm(catalog, user_id) warms the pooled engine for schema sources (fire-and-forget at slow-path entry), so the cold first-query handshake overlaps the ~4s Planner call. Best-effort, never raises. Gated to the default path (skipped when a coordinator factory is injected). Verified live through ChatHandler.handle. | nice-to-have | DB | [x] |
| R4 | Per-stage progress events: SlowPathCoordinator.run gained an optional progress callback, and ChatHandler bridges it to SSE status events (chat.py forwards them). Live: the stream now shows Planning… → Running N steps… → Composing…. The max wire gap is ~4.6s, down from ~13s of silence, which fixes the proxy idle-timeout and the UX. Deferred: token-streaming the Assembler answer needs it split into a streamed prose call + a structured-record call. That doubles the Assembler LLM calls (cost/latency), so it is a separate decision; the answer is still emitted as one chunk after the (fast, ~2.5s) Assembler. Test: test_chat_handler_wiring.py. | important | agent | [~] |
| R5 | Response cache: key on user_id + catalog version; invalidate on ingest. The key was chat:{room_id}:{message} with a 24h TTL and no user, allowing cross-user replay + stale answers. 2026-06-12 (Rifqi): the key is now chat:{room_id}:{user_id}:{message} via _chat_cache_key(), and the TTL went from 24h to 1h (checkpoint decision). This became urgent once Redis moved to a shared Cloud instance. DELETE /chat/cache gained a required user_id param (frontend heads-up); the room-wide clear pattern is unchanged. Still open: catalog version in the key / invalidate-on-ingest. | important | B | [~] |
| R6 | Hard time budget: wrap coordinator.run() in asyncio.wait_for (60–90s). Constraints.time_budget_seconds is rendered into the Planner prompt but not enforced. | important | agent | [ ] |
| R7 | Root-task-failure short-circuit before the Assembler (templated/fast-path fallback, NOT replanning): stops paying ~2k tokens to narrate an empty RunState. | important | agent | [ ] |
| R8 | Catalog upsert race: concurrent uploads can drop a source. Fix with a per-user advisory lock around the read-merge-upsert (store.py). | important | DB | [ ] |
| R9 | extra="ignore" in settings.py:15 (currently allow, so typo'd env vars are silently swallowed); require Azure keys in prod. | nice-to-have | B | [ ] |
| R10 | Read-only enforcement is session state, not a server role. REPO_CONTEXT.md counts "read-only DB credentials" as a defense layer, but nothing requests or verifies a read-only role. Either request read-only creds at registration (verify via SELECT current_setting(...)) or drop the claim. | important | DB | [ ] |
| R11 | De-duplicate _PLACEHOLDER_RE (task_runner.py:31 vs validator) and _DATA_ACCESS_TOOLS (invoker vs planner registry): import one from the other; comments aren't a sync mechanism.<br>- TAB slice done (90e80f9): canonical DATA_ACCESS_TOOLS now lives once in tools/data_access.py, and invoker.py imports it (it was a duplicated frozenset synced by comment).<br>- Agent slice done (2026-06-10): PLACEHOLDER_RE is single-sourced in planner/schemas.py (part of the ToolCall placeholder convention); validator + task_runner import it.<br>- planner/registry.py keeps local spec bodies (stub pending KM-465 #4) but name-checks them against DATA_ACCESS_TOOLS in _data_access_slice(). An upstream rename/add now raises at default_registry() instead of drifting silently.<br>Registry output unchanged (same 12 tools, same order). | nice-to-have | agent/tool | [x] |
| R12 | Doc/process hygiene:<br>- Some code docstrings cite internal design specs that are not committed to the repo (design docs are kept out of version control), so the references dangle for anyone but the author.<br>- CLAUDE.md lists deleted modules (enricher, pipeline/orchestrator.py).<br>- main is 38 commits behind on a dead architecture. | nice-to-have | agent | [ ] |
| R13 | Pre-existing test failure (found during R2, NOT caused by it): tests/query/planner/test_prompt.py::test_render_catalog_with_sources fails because query/planner/prompt.py::render_catalog now renders stable IDs (src_test_db), which the test asserts are absent. Old query-planner path; confirmed failing on a clean tree. | nice-to-have | DB | [ ] |
| T1 | input_schema is presence-only, not type-checked. The ToolSpec.input_schema comment said "validates ToolCall.args", but TaskRunner._validate_args only enforces required presence. The properties types are documentation and are never validated at runtime. Clarified the contract in tools/contracts.py so nobody assumes type safety: a wrong-typed arg passes validation and surfaces only inside the compute fn. Doc-only, no behavior change (90e80f9). | nice-to-have | TAB | [x] |
| T2 | Dead Python embed path? document_pipeline.process() → knowledge_processor → vector_store.aadd_documents() still writes PDF/DOCX/TXT embeddings to langchain_pg_embedding. That contradicts CLAUDE.md's "Go is sole writer, Python reads only". Verified that the Go service (Orchestrator-Agent-Service/internal/documents) IS a complete ingestion writer to the same tables for all 5 file types (OCR + chunk + embed), so the Python embed branch is very likely redundant. Blocked on one operational fact: does the frontend still upload to /document/process (Python) or to Go? Park until confirmed, because deleting a live ingestion path would break unstructured RAG. The csv/xlsx parquet branch stays regardless (it feeds the catalog/tabular path). | nice-to-have | TAB | [blocked] |
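A sketch of the DB2 cache policy (credentials-hash key + bounded LRU + idle TTL). It uses a pluggable factory/dispose pair so it runs without a database. EngineCacheSketch and creds_hash are hypothetical names, not the real UserEngineCache API. In production the factory would plausibly wrap SQLAlchemy's create_engine(url, pool_pre_ping=True, pool_recycle=300), and dispose would call engine.dispose().

```python
import hashlib
import threading
import time
from collections import OrderedDict
from typing import Any, Callable, Dict, Tuple


def creds_hash(creds: Dict[str, Any]) -> str:
    # Stable digest over sorted fields: a rotated password yields a new cache key,
    # so the stale engine is never handed out again (it ages out via LRU/TTL).
    canonical = "\x1f".join(f"{k}={creds[k]}" for k in sorted(creds))
    return hashlib.sha256(canonical.encode()).hexdigest()


class EngineCacheSketch:
    """Hypothetical LRU + idle-TTL cache of per-user engines keyed by (client_id, creds_hash)."""

    def __init__(
        self,
        factory: Callable[[Dict[str, Any]], Any],
        dispose: Callable[[Any], None] = lambda engine: None,
        max_size: int = 50,
        idle_ttl_s: float = 600.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._factory, self._dispose = factory, dispose
        self._max, self._ttl, self._clock = max_size, idle_ttl_s, clock
        self._entries: "OrderedDict[Tuple[str, str], Tuple[Any, float]]" = OrderedDict()
        self._lock = threading.Lock()  # DbExecutor runs queries in worker threads

    def get(self, client_id: str, creds: Dict[str, Any]) -> Any:
        with self._lock:
            now = self._clock()
            self._evict_idle(now)
            key = (client_id, creds_hash(creds))
            if key in self._entries:
                engine, _ = self._entries.pop(key)  # warm hit
            else:
                engine = self._factory(creds)  # cold: build a new pooled engine
            self._entries[key] = (engine, now)  # (re)insert as most recently used
            while len(self._entries) > self._max:
                _, (oldest, _) = self._entries.popitem(last=False)  # evict least recently used
                self._dispose(oldest)
            return engine

    def _evict_idle(self, now: float) -> None:
        idle = [k for k, (_, last_used) in self._entries.items() if now - last_used > self._ttl]
        for key in idle:
            engine, _ = self._entries.pop(key)
            self._dispose(engine)
```

The factory runs under the lock here for simplicity. A production cache might build engines outside the lock so one slow handshake doesn't serialize every user.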
Slow-path endpoint wiring (2026-06-10): the Orchestrator → slow-path route is now wired
into the live endpoint behind an env flag. settings.enable_slow_path (env
ENABLE_SLOW_PATH, default off) is passed to the shared ChatHandler in
api/v1/chat.py. Flip ENABLE_SLOW_PATH=true to route structured intents through
Planner → TaskRunner → Assembler and test end-to-end from /chat/stream (status progress
events + answer stream). It stays opt-in because BusinessContext is still the stub;
fast/unstructured paths are unchanged. Verified live via ChatHandler.handle.
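Once the slow path is reachable from /chat/stream, R6 (hard time budget, still open) becomes the guard against a hung request. A sketch of the wrapper R6 proposes, assuming a hypothetical run_with_budget helper. The real one would wrap coordinator.run() and would likely take its budget from Constraints.time_budget_seconds.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_with_budget(run: Callable[[], Awaitable[T]], budget_s: float, fallback: T) -> T:
    """Hypothetical R6 wrapper: a hard wall-clock budget around coordinator.run()."""
    try:
        # wait_for cancels the inner awaitable when the budget expires.
        return await asyncio.wait_for(run(), timeout=budget_s)
    except asyncio.TimeoutError:
        # Degrade to a templated answer instead of leaving the SSE stream hanging.
        return fallback
```

Catching asyncio.TimeoutError works on every supported Python (since 3.11 it is an alias of the builtin TimeoutError).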
Architecture verdict: fundamentally sound (catalog-driven IR + deterministic compiler + static plan is the right call). The debt is:
- Transitional duplication (two planners/registries/contract modules), which is documented and owned.
- ChatHandler drifting toward a god object. Extract the slow-path composition root + the SSE _build_sources/_normalize_chunks mappers when convenient.
What just shipped (2026-06-09/10: tool layer, tracing, slow-path wiring)
Big stretch since the slow-path workers landed. The tool layer (teammate-owned) is
now complete and real, the slow path is wired into ChatHandler behind a gate,
and the whole chat pipeline is traced. Fast path still untouched; live behavior
unchanged (flags default off).
Tool layer: COMPLETE (teammate, KM-624–630). src/tools/ was re-created (the
2026-05-11 note about deleting it is superseded). Now teammate-owned:
- src/tools/analytics/: the 8 composite analyze_* computes (descriptive, aggregate, comparison, contribution, profile, correlation, segment, trend) + prompt-style DESCRIPTIONs (KM-624/625).
- src/tools/contracts.py: canonical ToolSpec/ToolRegistry/ToolOutput (KM-627). agents/planner/contracts.py now just re-exports them and keeps the BusinessContext stub (lead's).
- src/tools/registry.py::analytics_registry() (KM-628). src/tools/invoker.py + src/tools/data_access.py: AnalyticsToolInvoker (KM-629), DataAccessToolInvoker and CompositeToolInvoker (KM-630). All never-throw.
- Pattern A confirmed: analyze_* take a ${t<id>} placeholder as data from an upstream query_structured.
- Verified live E2E (2026-06-09): real query_structured against a user's Neon Postgres → analyze_trend → Assembler. analyze_contribution surfaced a real tool bug (Decimal vs float in decomposition.py), and degrade-and-continue held. The tool owner has since fixed it (_coerce_decimals in invoker._materialize, KM-630 / commit 1195870), so the whole analyze_* family is covered in one place.
- Directive: the agent side does NOT modify src/tools/ without confirmation.
Planner: realigned to the real tools (KM-626).
- registry.py::default_registry() composes the real analytics_registry() + a local stub for the 4 data-access tools.
- Few-shots grown to A–D: A analyze_contribution, B analyze_trend, C mixed structured+unstructured (retrieve_documents, independent branch), D analyze_aggregate.
- parallelizable_with removed from Task (schema/validator/examples/prompt). TaskRunner derives parallelism from depends_on alone.
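With parallelizable_with gone, the dependency graph alone determines which tasks can run together. A sketch of grouping tasks into waves (Kahn-style topological levels), assuming tasks are given as a hypothetical {task_id: depends_on} mapping; the real TaskRunner's data shapes may differ. Everything in one wave depends only on earlier waves, so a wave can run concurrently (e.g. with asyncio.gather).

```python
from typing import Dict, List, Sequence, Set


def execution_waves(depends_on: Dict[str, Sequence[str]]) -> List[List[str]]:
    """Hypothetical helper: group task ids into waves using depends_on alone."""
    remaining = {tid: set(deps) for tid, deps in depends_on.items()}
    unknown = {d for deps in remaining.values() for d in deps} - remaining.keys()
    if unknown:
        raise ValueError(f"unknown dependencies: {sorted(unknown)}")
    done: Set[str] = set()
    waves: List[List[str]] = []
    while remaining:
        # A task is ready once all of its dependencies finished in earlier waves.
        ready = sorted(tid for tid, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError(f"dependency cycle among: {sorted(remaining)}")
        waves.append(ready)
        done.update(ready)
        for tid in ready:
            del remaining[tid]
    return waves
```

In the real pipeline the PlannerValidator presumably rejects cycles and unknown ids before execution. The ValueErrors here only keep the sketch total.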
Slow-path wiring: built, GATED OFF (KM-626). agents/chat_handler.py gains a
structured→slow branch behind ChatHandler(enable_slow_path=False). When on, it builds
a per-request CompositeToolInvoker (composition root) + SlowPathCoordinator, streams
chat_answer, and persists the analysis_record. Two seams isolate the remaining blockers:
- agents/planner/business_context.py::get_business_context(user_id): async stub returning BusinessContext; TODO(lead) swap for the real read.
- agents/slow_path/store.py: AnalysisStore Protocol + NullAnalysisStore (logs only). The real store is the analysis_records table in the catalog DB (Neon dataeyond); the table is not created yet.

chat_answer is still emitted as one chunk (not token-streamed).
Observability: Langfuse tracing wired (KM-631).
- src/observability/langfuse/tracing.py: RequestTracer/NullTracer/TracingToolInvoker + _redact.
- One trace per request groups Orchestrator.classify, Planner.plan (each retry is its own generation), Assembler.assemble, Chatbot.astream + tool spans (latency/metadata only).
- Gated: ChatHandler(enable_tracing=False); api/v1/chat.py opts in (=True).
- PII policy:
  - Orchestrator+Planner are unmasked (their inputs are the question + a PII-safe summary).
  - Assembler+Chatbot are masked (they see real rows/chunks).
  - Tool spans carry name + arg keys + row count only.
- Zero added LLM tokens; verified live to US Cloud.
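A sketch of the tool-span policy above (name + arg keys + row count, never values). tool_span_metadata is a hypothetical helper, not the TracingToolInvoker API. The point is that redaction happens by construction: values and row contents are never copied into the span, so nothing has to be scrubbed afterwards.

```python
from typing import Any, Dict, Mapping, Optional, Sequence


def tool_span_metadata(
    name: str,
    args: Mapping[str, Any],
    rows: Optional[Sequence[Any]],
    latency_ms: float,
) -> Dict[str, Any]:
    """Hypothetical span payload: tool name, arg *keys*, row count, latency. No values."""
    return {
        "tool": name,
        "arg_keys": sorted(args.keys()),  # which args were passed, not what they contained
        "row_count": None if rows is None else len(rows),
        "latency_ms": round(latency_ms, 1),
    }
```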
Live evals green (2026-06-09, real Azure 4o): RUN_PLANNER_EVAL=1 and
RUN_SLOW_PATH_EVAL=1 both pass. The Planner emits valid catalog-consistent QueryIR,
wires Pattern A correctly, and self-corrects via retry.
Open follow-ups:
- Real BusinessContext (lead).
- Create the analysis_records table + a real AnalysisStore (Rifqi owns, 2026-06-12; folded into the generate_report work, see CHECKPOINT_PLAN_2026-06-17.md).
- Register data-access ToolSpecs upstream (data_access_registry()) or keep the planner stub.
- 4o → GPT-mini deployment swap.
- Flip enable_slow_path on once BusinessContext is real.

NOTE: 3 pre-existing test files are broken by rename rot: test_chat_handler.py, test_intent_router.py and test_answer_agent.py import the old answer_agent/intent_router module names.
What just shipped (2026-06-10, TAB: tool-layer hardening + DRY)
Owner-side companion to the agent block above. After the live E2E surfaced real-data
edge cases, the tool layer got a round of correctness hardening. All in TAB-owned paths
(src/tools/, src/catalog/); no agent-side or API change.
JSON-safety across the analyze_* family. Real DB rows carry scalar types that
don't survive the jsonb / SSE round-trip:
- [KM-630] coerce DB Decimal → float (commit 1195870).
  - _coerce_decimals in invoker._materialize converts object columns holding decimal.Decimal (asyncpg returns NUMERIC as Decimal) to float64 before any compute runs.
  - Fixes the float + Decimal TypeError in decomposition.analyze_contribution, and the whole family, in one seam.
  - It only touches columns that actually contain a Decimal.
- [KM-624] non-JSON-safe scalars in mode & top_value (commit 6981ed3): normalize numpy / non-native scalars so descriptive + top-value outputs serialize cleanly.
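A dependency-free sketch of the KM-630 coercion. It uses a dict of column lists as a stand-in for the pandas object columns the real _coerce_decimals operates on; coerce_decimals is a hypothetical name. It shows the two properties described above: only Decimal-bearing columns are touched, and afterwards float arithmetic no longer raises.

```python
from decimal import Decimal
from typing import Any, Dict, List


def coerce_decimals(columns: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
    """Hypothetical stand-in for _coerce_decimals: convert only columns that contain a Decimal."""
    out: Dict[str, List[Any]] = {}
    for name, values in columns.items():
        if any(isinstance(v, Decimal) for v in values):
            # NUMERIC → float; None stays None (pandas would hold NaN in a float64 column).
            out[name] = [float(v) if isinstance(v, Decimal) else v for v in values]
        else:
            out[name] = values  # untouched: strings, ints, dates pass through as-is
    return out
```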
Planner–Tools registry alignment + Timestamp keys (commit 4bb7623, fix(tools)):
- registry.py: analyze_descriptive.required corrected from ["data"] to ["data", "column_ids"] to match the compute signature (column_ids has no default). This prevents the Planner from emitting a call that's missing a required arg. analyze_profile stays ["data"] (its column_ids defaults to None).
- aggregation._clean: a group-by over a datetime column produced pd.Timestamp group keys that aren't JSON-safe. They are now normalized with .isoformat() alongside the existing numpy .item() branch.
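A sketch of the aggregation._clean normalization, assuming a hypothetical json_safe helper. pd.Timestamp subclasses datetime.datetime, so a stdlib isinstance check catches it. Numpy scalars are handled through their .item() method, so the sketch itself imports neither pandas nor numpy. The keys matter separately because json.dumps applies its default= hook to values only, never to dict keys.

```python
import datetime as dt
import json
from typing import Any, Dict


def json_safe(value: Any) -> Any:
    """Hypothetical normalizer in the spirit of aggregation._clean."""
    if isinstance(value, (dt.datetime, dt.date)):
        # pd.Timestamp subclasses datetime.datetime, so datetime group keys land here.
        return value.isoformat()
    item = getattr(value, "item", None)
    if callable(item):
        # numpy scalars (np.int64, np.float64, np.bool_) convert to native Python via .item().
        return item()
    return value


def to_json(groups: Dict[Any, Any]) -> str:
    # json.dumps' default= hook never sees dict keys, so keys must be normalized up front.
    return json.dumps({json_safe(k): json_safe(v) for k, v in groups.items()})
```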
DRY: single SAMPLE_LIMIT constant (commit 6d46ba5, [NOTICKET] refactor(catalog)):
- One source of truth in catalog/introspect/base.py: SAMPLE_LIMIT = 3, down from 5 to save tokens, since sample values feed the planner prompt.
- Both introspection paths import it: catalog/introspect/tabular.py and pipeline/db_pipeline/extractor.py (which dropped its own local = 3).
- Dependency direction is pipeline → catalog (no circular import).
- The stale test test_sample_values_capped_at_five was updated to assert the real cap (3).
Audit result: Planner–Tools arg alignment swept end-to-end. 7/8 analyze_* tools
already matched; the 1 mismatch (analyze_descriptive) is the fix above. Pattern A holds
across all of them.
What just shipped (2026-06-08, KM-626: slow-path agent layer)
The rest of the slow path after the Planner (KM-567): TaskRunner, Assembler, and
the coordinator. Built and tested against
mocks; not yet wired into the live ChatHandler (waits on the tool team's real
ToolInvoker + a real BusinessContext). Fast path untouched.
Naming: "Orchestrator" = the entry dispatcher only (agents/orchestration.py).
The slow-path workers live in agents/slow_path/, deliberately NOT named
"orchestrator".
Files added (src/agents/slow_path/):
- schemas.py: TaskResult, RunState; TaskSummary, AnalysisRecord, AssembledOutput, AssemblerNarrative. Reuses ToolOutput.
- invoker.py: ToolInvoker Protocol only; the tool team owns the impl (KM-418).
- errors.py: SlowPathError, AssemblerError.
- task_runner.py: deterministic, 0 LLM calls. Handles wave-based execution, ${t<id>} placeholder resolution, internal validate_args, never-throw invoke, status labeling, and degrade-and-continue → RunState.
- assembler.py + prompt.py + config/prompts/assembler.md: single LLM call → AssemblerNarrative. Code merges it with RunState to build the AnalysisRecord (structured fields copied, never re-authored).
- coordinator.py: SlowPathCoordinator: Planner → TaskRunner → Assembler.
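A sketch of the ${t<id>} placeholder resolution in task_runner.py, under these assumptions:
- Placeholders occupy a whole arg value.
- Task ids look like "t1".
- The regex below is a hypothetical stand-in (per R11, the canonical PLACEHOLDER_RE lives in planner/schemas.py).
- resolve_args and UnresolvedPlaceholder are illustrative names.

```python
import re
from typing import Any, Dict, Mapping

# Hypothetical pattern; the canonical PLACEHOLDER_RE is single-sourced in planner/schemas.py.
PLACEHOLDER_RE = re.compile(r"\$\{t(\d+)\}")


class UnresolvedPlaceholder(Exception):
    """Raised when an upstream output is missing; the runner would mark the task skipped."""


def resolve_args(args: Mapping[str, Any], outputs: Mapping[str, Any]) -> Dict[str, Any]:
    """Replace whole-value placeholders like "${t1}" with the output of task t1."""
    resolved: Dict[str, Any] = {}
    for key, value in args.items():
        match = PLACEHOLDER_RE.fullmatch(value) if isinstance(value, str) else None
        if match is None:
            resolved[key] = value  # literal arg, passed through unchanged
            continue
        task_id = f"t{match.group(1)}"
        if task_id not in outputs:
            # Upstream failed or was skipped: degrade-and-continue instead of crashing the run.
            raise UnresolvedPlaceholder(task_id)
        resolved[key] = outputs[task_id]
    return resolved
```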
Tests added (tests/agents/slow_path/, 12 passing; gitignored):
- schema round-trips + chat_answer-first;
- runner happy/placeholder/parallel/degrade/arg-miss;
- assembler narrative-vs-snapshot + question threading;
- coordinator end-to-end.

ruff clean; tool-agnostic (no src/tools/* import).
Open follow-ups (not blockers): wire SlowPathCoordinator into the expanded
Orchestrator/ChatHandler once the real invoker + BusinessContext exist; swap the
test MockToolInvoker for the tool team's real one (zero agent change, INV-7); 4o →
GPT-mini deployment swap.
What just shipped (2026-06-08: tool taxonomy + ownership revision)
Team decisions after the teammate pushed KM-624 (src/tools/analytics/):
- Composite tools, not atomic. v1 uses composite "family" tools (analyze_*), not the atomic compute_* set the earlier draft assumed. One analyze_* call does a whole analytical job (e.g. analyze_descriptive subsumes median/mode/stddev/percentile; analyze_trend subsumes date_trunc). Tool-taxonomy decision recorded.
- Tool team owns ALL tools: compute, data-access (query_structured, retrieve_documents, list_sources, describe_source), the wrapper/invoker layer (KM-418), and all tool tests. The agent team owns nothing below the registry contract.
- Planner stub realigned to the real tools.
  - registry.py rewritten from the 9 atomic entries to 12 composite entries (4 data-access + 8 analyze_*).
  - examples.py rewritten (Example A → analyze_contribution, Example B → analyze_trend).
  - planner.md bullet updated; planner tests updated.
  - 32 passing + 1 gated, ruff clean.
- Open (tool team's call): Pattern A (analyze_* take a ${t<id>} data placeholder from an upstream query_structured) vs Pattern B (self-fetch by source_id). The stub assumes A and will be reshaped to match once decided (agent code unaffected, INV-7).
- New coupling: the tool team's query_structured/retrieve_documents are expected to call our existing QueryService/RetrievalRouter. query_structured stays inline-QueryIR, so IRValidator still applies. Interface to coordinate.
Next (our scope, all mock-able now): TaskRunner + Assembler against a MockToolInvoker,
then Orchestrator slow-path wiring. Stubs still to retire on integration: contracts.py
(BusinessContext from lead; ToolSpec/ToolRegistry/ToolOutput from tool team) and registry.py
(real registry from tool team). Infra: swap the 4o stand-in for a GPT-mini deployment.
What just shipped (2026-06-05, Phase 3: Planner agent)
First slow-path agent (the Planner). A single LLM
call turns BusinessContext + Catalog + ToolRegistry + question + Constraints into a
validated, static TaskList (DAG of fully-specified tool-call chains). No
replanning (INV-6); tool-agnostic against a registry contract (INV-7). Fast path
(agents/orchestration.py, agents/chatbot.py, query/) untouched.
Files added (src/agents/planner/):
- contracts.py: STUB Pydantic contracts pending reconciliation: BusinessContext (+KeyTerm/DataTableNote/DataColumnNote, lead's), ToolSpec/ToolRegistry (tool team KM-608), ToolOutput envelope.
- schemas.py: CrispStage, ToolCall, Task, TaskList. No replan schemas.
- inputs.py: CatalogSummary (condensed, PII sample_values nulled, from_catalog builder + render) and Constraints (max_tasks=5, modeling_allowed=False).
- registry.py: STUB v1 P0 registry: query_structured, retrieve_documents, list_sources, describe_source, compute_median/stddev/percentile/mode, date_trunc.
- errors.py: PlannerError, PlannerValidationError.
- prompt.py + config/prompts/planner.md: system prompt (INV-1/6/7 + principles) + per-call human content (context + catalog + tools + constraints + few-shots + question).
- examples.py: two few-shots (A exploratory revenue-by-category; B descriptive monthly-trend-by-region with date_trunc), built from the real TaskList schema.
- validator.py: PlannerValidator running the 8 checks; reuses the existing IRValidator for inline query_structured IRs.
- service.py: PlannerService + plan_analysis(...). Chain (mirrors query/planner/service.py) + validate-and-retry loop (max 3, mirrors QueryService).
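A sketch of the validate-and-retry shape used by service.py (and QueryService). plan_with_retry, PlannerErrorSketch and the two injected callables are hypothetical. The LLM call receives the previous validation errors, mirroring the previous_error argument of the query planner. This is retry on *validation* only; it does not replan after execution (INV-6).

```python
from typing import Callable, List, Optional


class PlannerErrorSketch(Exception):
    """Hypothetical stand-in for PlannerError after the last failed attempt."""


def plan_with_retry(
    call_llm: Callable[[Optional[str]], dict],
    validate: Callable[[dict], List[str]],
    max_attempts: int = 3,
) -> dict:
    """Re-prompt with the prior validation errors; give up after max_attempts."""
    previous_error: Optional[str] = None
    for attempt in range(1, max_attempts + 1):
        plan = call_llm(previous_error)  # first call gets None, retries get the error text
        errors = validate(plan)
        if not errors:
            return plan
        previous_error = f"attempt {attempt}: " + "; ".join(errors)
    raise PlannerErrorSketch(previous_error)
```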
Tests added (tests/agents/planner/, 30 passing + 1 gated): test_schemas.py,
test_inputs.py, test_validator.py (one failure per check + happy paths),
test_service.py (_FakeChain + retry), test_golden_questions.py (live eval gated on
RUN_PLANNER_EVAL=1). ruff check clean on planner paths.
Open follow-ups (not blockers): reconcile BusinessContext with the lead and
ToolRegistry/ToolSpec + real tools with teammate (KM-608); "GPT mini" currently uses
the configured 4o deployment (swap azure_deployment when a mini deployment exists). Next:
Orchestrator slow-path expansion + TaskRunner + Assembler.
Legend
- [x] done and merged
- [~] in progress (open PR or active branch)
- [ ] not started
- DB / TAB / B: ownership (from REPO_CONTEXT.md)
PR sequence
| PR | Status | Owner(s) | Scope |
|---|---|---|---|
| PR1 | [x] merged | DB | Contract locks + catalog plumbing + DB introspector + IR validator + tests |
| PR1-tab | [x] shipped | TAB | Tabular introspector + on_tabular_uploaded trigger + 31 unit tests |
| PR2a | [x] merged | DB | CatalogEnricher + StructuredPipeline + on_db_registered trigger + FK extension on Table (enricher later removed in KM-557) |
| KM-557 | [x] shipped | DB | Drop CatalogEnricher entirely (cost cut: planner uses stats + sample rows directly); rename jsonb table catalogs → data_catalog; add GET /api/v1/data-catalog/{user_id} index endpoint for catalog refresher |
| PR2b | [x] shipped | DB-solo (B-review) | IntentRouter + planner prompt + planner LLM service |
| PR3-DB | [x] shipped | DB | SqlCompiler (Postgres) + DbExecutor (sqlglot guard, RO + statement_timeout, asyncio.to_thread) + 36 golden IR→SQL tests |
| PR3-TAB | [x] shipped | TAB | PandasCompiler + TabularExecutor + 43+12 golden IR→DataFrame tests |
| PR4 | [x] | DB-solo (B-review) | ExecutorDispatcher + QueryService + ChatHandler module. API rewired in Cleanup PR. |
| PR5 | [x] shipped | DB-solo (B-review) | Retry/self-correction loop on validation failure (lives in QueryService, max 3 attempts, planner re-prompted with prior error) |
| PR6 | [~] scaffold | DB-solo (B-review) | Eval harness scaffold + 3 DB-targeting golden cases. Skipped without RUN_PLANNER_EVAL=1 env. TAB extends with tabular cases. |
| PR7 | [x] | DB-solo (B-review) | ChatbotAgent (renamed from AnswerAgent) + chatbot_system + guardrails prompts. answer_agent.py → chatbot.py, AnswerAgent → ChatbotAgent. API rewired in Cleanup PR. |
| Cleanup | [x] | B | ChatHandler wired to chat.py; Phase 1 dual-write dropped from /ingest; on_catalog_rebuild_requested + POST /data-catalog/rebuild; dead modules deleted (chatbot Phase 1, orchestrator, query/base, knowledge.py, config/agents/); retrieval cache restored via RetrievalRouter; top_values added to ColumnStats; lifespan migration; knowledge_router removed. |
All items
Contracts (B, shared)
| # | Item | Status | Notes |
|---|---|---|---|
| 1 | Catalog Pydantic models (catalog/models.py) | [x] | PR1 added location_ref URI-scheme docstring; PR2a added ForeignKey model + Table.foreign_keys field |
| 2 | IR Pydantic models (query/ir/models.py) | [x] | Pre-existing scaffold |
| 3 | IR operator whitelists (query/ir/operators.py) | [x] | PR1 filled TYPE_COMPATIBILITY matrix |
| 4 | PII patterns / regex (security/pii_patterns.py) | [x] | Pre-existing |
| - | data_catalog Postgres jsonb table (db/postgres/models.py) | [x] | PR1 added Catalog SQLAlchemy class + init_db.py import. KM-557 renamed __tablename__ from catalogs → data_catalog; created fresh (no migration) |
| - | QueryResult shape (query/executor/base.py) | [x] | Pre-existing scaffold; columns: list[str] added (TAB owner, PR1-tab), and DbExecutor updated to populate it. |
| - | Source.location_ref URI scheme | [x] | PR1 documented in catalog/models.py docstring |
Ingestion: introspection
| # | Item | Owner | Status | Notes |
|---|---|---|---|---|
| 5 | DB introspector (catalog/introspect/database.py) | DB | [x] | PR1: reuses Phase 1 database_client_service, db_credential_encryption, db_pipeline_service.engine_scope, extractor.get_schema/profile_column/get_row_count. PR2a wired FK extraction (it was discarded before). |
| 6 | Tabular introspector (catalog/introspect/tabular.py) | TAB | [x] | PR1-tab: downloads the original blob (CSV/XLSX/Parquet); one Table per sheet (XLSX) or one Table (CSV/Parquet). source_id = document_id. fetch_doc/fetch_blob injectable for unit tests (no Settings). 2026-06-10: the sample cap now imports the shared SAMPLE_LIMIT (=3) from catalog/introspect/base.py, the single source of truth across the tabular + DB introspection paths (commit 6d46ba5). |
| 7 | BaseIntrospector ABC (catalog/introspect/base.py) | B | [x] | Pre-existing; signature locked |
Ingestion: shared catalog plumbing
| # | Item | Owner | Status | Notes |
|---|---|---|---|---|
| 8 | CatalogEnricher (catalog/enricher.py) | B | REMOVED in KM-557 | Cost optimization: the planner reads stats + sample rows + column names directly. catalog/enricher.py + config/prompts/catalog_enricher.md deleted. render_source (the only piece still needed) moved to src/catalog/render.py. Tests moved to tests/catalog/test_render.py. |
| 9 | Catalog validator (catalog/validator.py) | B | [x] | PR1 (DB owner picked up): uniqueness invariants |
| 10 | Catalog store, Postgres jsonb (catalog/store.py) | B | [x] | PR1 (DB owner picked up): INSERT ... ON CONFLICT |
| 11 | Catalog reader (catalog/reader.py) | B | [x] | PR1 (DB owner picked up): filters by source_hint, empty on miss |
| 12 | PII detector (catalog/pii_detector.py) | B | [x] | PR1 (DB owner picked up): name + value matching, bias toward over-flag |
Ingestion: pipelines
| # | Item | Owner | Status | Notes |
|---|---|---|---|---|
| 13 | Structured pipeline (pipeline/structured_pipeline.py) | B | [x] | PR2a (DB owner): source-type-agnostic; the caller supplies the introspector. default_structured_pipeline() factory wires production deps lazily so tests can inject mocks without Settings() construction. KM-557: enrich step removed; the pipeline is now introspect → merge with existing → validate → upsert. Constructor no longer takes an enricher. |
| 14 | Triggers (pipeline/triggers.py) | B | [x] | PR2a: on_db_registered implemented (DB owner). PR1-tab: on_tabular_uploaded implemented (TAB owner). 2026-05-11: on_document_uploaded implemented. 2026-05-12: on_catalog_rebuild_requested implemented; it iterates all Sources in the current catalog and re-runs on_db_registered (schema) or on_tabular_uploaded (tabular) per source. Per-source errors are logged but don't abort. |
| 15 | Ingestion orchestrator (pipeline/orchestrator.py) | B | DELETED | Redundant stub: StructuredPipeline already takes the introspector at run() time. Deleted in Cleanup PR. |
| 16 | Document pipeline (pipeline/document_pipeline.py) | TAB | [x] | Flattened pipeline/document_pipeline/document_pipeline.py (folder) → pipeline/document_pipeline.py (file). Updated import in api/v1/document.py. |
Query: shared spine
| # | Item | Owner | Status | Notes |
|---|---|---|---|---|
| 17 | IR validator (query/ir/validator.py) |
B | [x] |
PR1 (DB owner) β full rule set; descriptive errors for planner retry |
| 18 | Planner LLM service (query/planner/service.py) |
B | [x] |
PR2b β Azure OpenAI structured output β QueryIR. Injectable chain. Supports retry via previous_error argument. |
| 19 | Planner prompt (query/planner/prompt.py, config/prompts/query_planner.md) |
B | [x] |
PR2b β system prompt with hard constraints + few-shot for DB and tabular sources. build_planner_prompt(question, catalog, previous_error) calls catalog.render.render_source (renamed from catalog.enricher.render_source in KM-557). |
| 20 | Intent router (agents/orchestration.py β class OrchestratorAgent; config/prompts/intent_router.md) |
B | [x] |
PR2b β single LLM call β IntentRouterDecision(needs_search, source_hint, rewritten_query). Supports conversation history. NOTE: source filename + class name were kept from Phase 1 for import-site compatibility; only the body is Phase 2. Prompt file and test file use the intent_router name. |
| 21 | Executor base + QueryResult (query/executor/base.py) | B | [x] | Pre-existing scaffold |
| 22 | Executor dispatcher (query/executor/dispatcher.py) | B | [x] | PR4: picks DbExecutor / TabularExecutor by source.source_type. Lazy imports of the production executors keep importing the dispatcher side-effect-free for tests. Caches one executor per source_type. |
| 23 | Compiler base ABC (query/compiler/base.py) | B | [x] | Pre-existing scaffold |
| 24 | Top-level QueryService (query/service.py) | B | [x] | PR4+5: plan → validate → dispatch → execute → QueryResult. Retry loop on validation failure (max 3, planner re-prompted with the prior error). Catches NotImplementedError from the TabularExecutor placeholder gracefully. Never raises. |
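The retry contract in row 24 (re-prompt the planner with the prior validation error, cap the attempts, never raise) can be sketched in plain Python. This is a minimal sketch under stated assumptions: `run_query`, `QueryOutcome`, and the `plan_fn` / `validate_fn` / `execute_fn` callables are hypothetical stand-ins rather than the real `QueryService` signature, and the validator is assumed to return an error string or `None`.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional

MAX_ATTEMPTS = 3  # mirrors the "max 3" cap noted for QueryService


@dataclass
class QueryOutcome:
    """Hypothetical stand-in for QueryResult: rows on success, error text otherwise."""
    rows: list[Any] = field(default_factory=list)
    error: Optional[str] = None
    attempts: int = 0


def run_query(
    question: str,
    plan_fn: Callable[[str, Optional[str]], dict],
    validate_fn: Callable[[dict], Optional[str]],
    execute_fn: Callable[[dict], list[Any]],
) -> QueryOutcome:
    """Plan -> validate (retrying with the prior error) -> execute. Never raises."""
    previous_error: Optional[str] = None
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            ir = plan_fn(question, previous_error)
            previous_error = validate_fn(ir)
        except Exception as exc:  # a planner/validator crash becomes an error result
            return QueryOutcome(error=f"planning failed: {exc}", attempts=attempt)
        if previous_error is not None:
            continue  # re-prompt the planner with this validation error
        try:
            return QueryOutcome(rows=execute_fn(ir), attempts=attempt)
        except NotImplementedError:
            return QueryOutcome(error="executor not implemented", attempts=attempt)
        except Exception as exc:
            return QueryOutcome(error=f"execution failed: {exc}", attempts=attempt)
    return QueryOutcome(
        error=f"no valid plan after {MAX_ATTEMPTS} attempts: {previous_error}",
        attempts=MAX_ATTEMPTS,
    )
```

Feeding the concrete validation message back into the next planning call gives the planner something to correct, rather than a blind retry of the same prompt.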
Query: DB path
| # | Item | Status | Notes |
|---|---|---|---|
| 25 | SQL compiler (query/compiler/sql.py) | [x] | PR3-DB: Postgres dialect (Supabase reuses it); deterministic IR → (sql, named-params dict); double-quoted identifiers from the catalog; all whitelisted ops (=, !=, <, <=, >, >=, in, not_in, is_null, is_not_null, like, between); alias-aware order_by; CompiledSql.params: dict[str, Any] (changed from list). MySQL/BigQuery/Snowflake compilers later. |
| 26 | DB executor (query/executor/db.py) | [x] | PR3-DB: sync engine via db_pipeline_service.engine_scope inside asyncio.to_thread. sqlglot SELECT-only / no-DML guard. Postgres-only session settings: default_transaction_read_only=on + statement_timeout=30000. asyncio.wait_for backstop. Never raises; populates QueryResult.error instead. 10k row hard cap. |
| 27 | Credential encryption (security/credentials.py) | [ ] | Stub exists; PR1 reused Phase 1 utils/db_credential_encryption.py instead. Move in cleanup PR. |
| 28 | User-DB connection management | [x] | PR3-DB reused Phase 1 db_pipeline_service.engine_scope (same as the PR1 introspector); no new helper needed. |
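To make row 25's contract concrete (deterministic IR → SQL text plus a named-params dict, identifiers double-quoted with embedded quotes doubled), here is a reduced sketch. Assumptions: filters arrive as `(column, op, value)` tuples rather than the real IR model, placeholders use the `:name` style accepted by SQLAlchemy `text()`, and `compile_select` / `quote_ident` are hypothetical names; `order_by`, aggregates, and aliases are left out.

```python
from typing import Any

_COMPARISONS = {"=", "!=", "<", "<=", ">", ">="}


def quote_ident(name: str) -> str:
    """Double-quote a Postgres identifier, doubling any embedded double quote."""
    return '"' + name.replace('"', '""') + '"'


def compile_select(
    table: str, columns: list[str], filters: list[tuple[str, str, Any]]
) -> tuple[str, dict[str, Any]]:
    """Compile a single-table SELECT; values only ever travel as bound params."""
    params: dict[str, Any] = {}

    def bind(value: Any) -> str:
        key = f"p{len(params)}"  # p0, p1, ... in the order values are encountered
        params[key] = value
        return f":{key}"

    clauses: list[str] = []
    for column, op, value in filters:
        col = quote_ident(column)
        if op in _COMPARISONS:
            clauses.append(f"{col} {op} {bind(value)}")
        elif op == "like":
            clauses.append(f"{col} LIKE {bind(value)}")
        elif op in ("in", "not_in"):
            if not value:
                raise ValueError(f"{op} needs a non-empty list")
            keyword = "IN" if op == "in" else "NOT IN"
            clauses.append(f"{col} {keyword} ({', '.join(bind(v) for v in value)})")
        elif op == "is_null":
            clauses.append(f"{col} IS NULL")
        elif op == "is_not_null":
            clauses.append(f"{col} IS NOT NULL")
        elif op == "between":
            low, high = value
            clauses.append(f"{col} BETWEEN {bind(low)} AND {bind(high)}")
        else:
            raise ValueError(f"operator not whitelisted: {op!r}")

    sql = f"SELECT {', '.join(quote_ident(c) for c in columns)} FROM {quote_ident(table)}"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    return sql, params
```

Because values never enter the SQL string, the same IR always compiles to the same text, which is what makes literal-string golden tests like those in row 38 practical.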
Query: Tabular path
| # | Item | Status | Notes |
|---|---|---|---|
| 29 | Pandas compiler (query/compiler/pandas.py) | [x] | PR3-TAB: CompiledPandas dataclass; all 12 filter ops; all 6 aggs; group_by via pd.concat of Series; alias-aware order_by; _like_to_regex (% → .*, _ → .); pure module-level helpers. (polars for large files still deferred; see Planned dependencies.) |
| 30 | Tabular executor (query/executor/tabular.py) | [x] | PR3-TAB: fetch_blob injectable for tests; blob path: single-table → {uid}/{did}.parquet, multi-table → {uid}/{did}__{table.name}.parquet; asyncio.to_thread; 10k row hard cap; errors → QueryResult.error. The dispatcher routes to it by source_type. |
| 31 | Parquet upload/download wrapper | [x] | Moved knowledge/parquet_service.py → storage/parquet.py. Updated 4 import sites: pipeline/document_pipeline.py, knowledge/processing_service.py, query/executor/tabular.py, query/executors/tabular.py. |
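Row 29 translates SQL LIKE wildcards into regex (`%` → `.*`, `_` → `.`). A minimal sketch of that translation, assuming the match must cover the whole value (as LIKE does) and that every other character is matched literally; `like_to_regex` here is written from the note, not copied from `query/compiler/pandas.py`, and SQL `ESCAPE` clauses are not handled.

```python
import re


def like_to_regex(pattern: str) -> "re.Pattern[str]":
    """Translate a SQL LIKE pattern into a compiled regex meant for fullmatch()."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")  # any run of characters, including none
        elif ch == "_":
            parts.append(".")  # exactly one character
        else:
            parts.append(re.escape(ch))  # everything else is literal
    # DOTALL lets % and _ match newlines, as SQL LIKE does.
    return re.compile("".join(parts), re.DOTALL)
```

On a pandas column the pattern could be applied with `series.str.fullmatch(rx.pattern, flags=re.DOTALL)`; the escaping step is what stops a literal `.` in a LIKE pattern from acting as a wildcard.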
Agents + chat
| # | Item | Status | Notes |
|---|---|---|---|
| 32 | Chatbot agent + prompt (agents/chatbot.py, config/prompts/chatbot_system.md) | [x] | PR7-bundle: ChatbotAgent (was AnswerAgent) streams tokens and accepts a QueryResult, a list[DocumentChunk], or neither. Cleanup PR: renamed answer_agent.py → chatbot.py and AnswerAgent → ChatbotAgent; the Phase 1 agents/chatbot.py was deleted. |
| 33 | Guardrails prompt (config/prompts/guardrails.md) | [x] | PR7-bundle: appended to chatbot_system.md so guardrails take precedence in a conflict. |
| - | Chat handler / orchestrator (agents/chat_handler.py) | [x] | PR4-bundle: top-level Phase 2 orchestrator. Routes by source_hint: chat → AnswerAgent directly; structured → CatalogReader + QueryService; unstructured → DocumentRetriever placeholder + AnswerAgent. Yields intent / chunk / done / error SSE-style events. Phase 1 chat.py NOT touched; the cleanup PR rewires the API to call this. 2026-06-09: gained the gated structured → slow branch (enable_slow_path=False) + enable_tracing (KM-626/631). |
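The chat handler row describes routing on `source_hint` and emitting intent / chunk / done / error events. A minimal asyncio sketch of that event contract, assuming each route is an async generator of text tokens; the event dict keys (`type`, `content`, `message`) and the `handle` / `routes` / `collect` names are hypothetical, not the real `ChatHandler.handle` payload.

```python
import asyncio
from typing import AsyncIterator, Callable

Route = Callable[[str], AsyncIterator[str]]


async def handle(message: str, source_hint: str, routes: dict[str, Route]) -> AsyncIterator[dict]:
    """Yield one intent event, then chunk events, then exactly one done or error event."""
    yield {"type": "intent", "source_hint": source_hint}
    route = routes.get(source_hint)
    if route is None:
        yield {"type": "error", "message": f"unknown source_hint: {source_hint}"}
        return
    try:
        async for token in route(message):
            yield {"type": "chunk", "content": token}
    except Exception as exc:  # surface failures as an event instead of raising
        yield {"type": "error", "message": str(exc)}
        return
    yield {"type": "done"}


async def collect(events: AsyncIterator[dict]) -> list[dict]:
    """Drain an event stream into a list (handy in tests)."""
    return [event async for event in events]
```

Ending every stream with exactly one terminal event lets the SSE endpoint close the connection on either `done` or `error` without inspecting exceptions.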
Tools: slow-path "Tools" component (TAB)
New scope beyond the original 42-item table, added as the tool layer landed (KM-608/624-631). All rows are TAB-owned (src/tools/), and all tools are never-throw.
| # | Item | Owner | Status | Notes |
|---|---|---|---|---|
| - | Analytics compute fns (tools/analytics/) | TAB | [x] | KM-608/624/625: 8 composite analyze_* fns (descriptive, aggregate, comparison, contribution, profile, correlation, segment, trend) + prompt-style DESCRIPTIONs. Pure pandas, no I/O. JSON-safe outputs (numpy/Decimal/Timestamp normalized; KM-624 + commit 4bb7623). |
| - | Tool contracts (tools/contracts.py) | TAB | [x] | KM-627: canonical ToolSpec / ToolRegistry / ToolOutput. agents/planner/contracts.py re-exports them (and keeps the lead's BusinessContext stub). |
| - | Analytics registry (tools/registry.py) | TAB | [x] | KM-628: analytics_registry(). analyze_descriptive.required = ["data","column_ids"] (aligned to the compute signature, commit 4bb7623). |
| - | Invoker layer (tools/invoker.py) | TAB | [x] | KM-629: AnalyticsToolInvoker (Pattern A: analyze_* take a data ${t<id>} placeholder from an upstream query_structured call; _materialize → DataFrame, _coerce_decimals covers the whole family) + CompositeToolInvoker (routes data-access vs analytics tools by name). |
| - | Data-access tools (tools/data_access.py) | TAB | [x] | KM-630: DataAccessToolInvoker: list_sources / describe_source / query_structured / retrieve_documents. Per-request DI (user_id + CatalogReader). query_structured calls IRValidator + ExecutorDispatcher (planner skipped; the IR is pre-built by the agent Planner). Superseded by KM-642/643: renamed to data_retrieve/knowledge_retrieve, and list_sources + describe_source merged into data_check plus a new knowledge_check; see the taxonomy row below. |
| - | Tool tests (tests/unit/tools/) | TAB | [x] | analytics + data-access + invoker tests (gitignored). Includes regression test_decimal_columns_coerced_for_analyze_contribution. |
| - | Data/knowledge tool taxonomy (tools/data_access.py) | TAB | [x] | KM-642/643 (commits c38c0c2, 4bd5f1e): renamed query_structured → data_retrieve, retrieve_documents → knowledge_retrieve; merged list_sources + describe_source → parameterized data_check (no arg = list structured sources; source_id = that source's schema) + new knowledge_check (unstructured/documents). The split mirrors the catalog's structured/unstructured slices. Planner stub/prompt/validator/few-shots synced; DATA_ACCESS_TOOLS guard kept in lockstep. Note: dated log entries above (e.g. the 2026-06-09 E2E) keep the old names as historical record. |
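The invoker row describes analytics tools receiving a `${t<id>}` placeholder that is swapped for an upstream result, with Decimal values coerced first. A stdlib-only sketch of that hand-off, assuming upstream results are lists of row dicts (the real `_materialize` builds a DataFrame) and that failures are returned rather than raised, in keeping with the never-throw rule; `resolve_data_arg` and `coerce_decimals` are hypothetical names.

```python
import re
from decimal import Decimal
from typing import Any, Optional

_PLACEHOLDER = re.compile(r"\$\{t(\d+)\}")  # matches e.g. "${t3}"

Rows = list[dict[str, Any]]


def coerce_decimals(row: dict[str, Any]) -> dict[str, Any]:
    # NUMERIC columns often arrive as Decimal; numeric analytics expect floats.
    return {k: float(v) if isinstance(v, Decimal) else v for k, v in row.items()}


def resolve_data_arg(arg: Any, task_outputs: dict[int, Rows]) -> tuple[Any, Optional[str]]:
    """Return (value, error). Non-placeholder args pass through unchanged."""
    if not isinstance(arg, str):
        return arg, None
    match = _PLACEHOLDER.fullmatch(arg)
    if match is None:
        return arg, None
    task_id = int(match.group(1))
    if task_id not in task_outputs:
        return None, f"no upstream output for t{task_id}"
    return [coerce_decimals(row) for row in task_outputs[task_id]], None
```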
API surface
| # | Item | Owner | Status | Notes |
|---|---|---|---|---|
| 34 | DB client endpoints (api/v1/db_client.py) | DB | [x] | Cleanup PR: /ingest now calls only on_db_registered. Phase 1 db_pipeline_service.run() + decrypt_credentials_dict removed. A catalog build error now raises HTTP 500 (was a silent log). Response simplified to {"status": "success", "client_id": ...}. |
| 35 | Document/tabular upload endpoints (api/v1/document.py) | TAB | [x] | Rewired /document/process: after processing CSV/XLSX, calls on_tabular_uploaded(document_id, user_id). A catalog ingestion failure is logged but does not fail the request. 2026-05-11: CSV/XLSX are no longer ingested into the vector store (knowledge_processor is skipped for tabular types in document_pipeline.py); they go to the catalog only. |
| 36 | Chat stream endpoint (api/v1/chat.py) | B | [x] | Rewired /chat/stream: replaced query_executor.execute() (Phase 1) with CatalogReader + QueryService (Phase 2). Cleanup PR: fully rewired to ChatHandler.handle(). Inline intent routing, retrieval, and answer generation removed. Redis cache, fast intent, history loading, and message persistence remain in chat.py. The sources event emits [] (retrieval is not yet exposed by ChatHandler). |
| 37 | Room / users endpoints (api/v1/room.py, api/v1/users.py) | B | [ ] | No catalog work; only touch if the auth flow changes. |
| - | Data catalog index endpoint (api/v1/data_catalog.py) | DB | [x] | KM-557: GET /api/v1/data-catalog/{user_id} → list[CatalogIndexEntry]. Cleanup PR: added POST /api/v1/data-catalog/rebuild?user_id=, which calls on_catalog_rebuild_requested; per-source errors are logged but don't fail the request. |
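The rebuild behaviour (re-run the matching trigger per source, log failures, keep going) reduces to a loop with per-source error isolation. A minimal synchronous sketch, assuming triggers are looked up by `source_type` (`"schema"` for DB sources, `"tabular"` for files) and that the returned count feeds something like `CatalogRebuildResponse.sources_rebuilt`; `rebuild_catalog` and `SourceRef` are hypothetical, and the real triggers may be async.

```python
import logging
from dataclasses import dataclass
from typing import Callable

logger = logging.getLogger(__name__)


@dataclass
class SourceRef:
    source_id: str
    source_type: str  # "schema" (DB) or "tabular"


def rebuild_catalog(
    sources: list[SourceRef],
    triggers: dict[str, Callable[[str], None]],
) -> int:
    """Re-run the per-type trigger for every source; log and skip failures. Returns sources rebuilt."""
    rebuilt = 0
    for source in sources:
        trigger = triggers.get(source.source_type)
        if trigger is None:
            logger.warning("no rebuild trigger for source_type=%s", source.source_type)
            continue
        try:
            trigger(source.source_id)
        except Exception:
            # One broken source must not abort the rest of the rebuild.
            logger.exception("rebuild failed for source %s", source.source_id)
            continue
        rebuilt += 1
    return rebuilt
```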
Tests + eval
| # | Item | Owner | Status | Notes |
|---|---|---|---|---|
| 38 | DB compiler golden tests (tests/query/compiler/test_sql.py) | DB | [x] | PR3-DB: 36 tests across all whitelisted ops, identifier quoting, agg / count_distinct / count(*), order_by alias resolution, parameter sequencing, error paths. Pure Python, no LLM, no DB. |
| 39 | Pandas compiler golden tests (tests/unit/query/compiler/test_pandas_compiler.py) | TAB | [x] | PR3-TAB: 43 tests covering all 12 filter ops, all 6 aggs, group_by, order_by, limit, aliases, empty DataFrame, error paths. test_tabular_executor.py adds 12 more (blob name resolution + happy path + error paths). |
| 40 | IR validator tests (tests/query/ir/test_validator.py) | B | [x] | PR1: 19 tests, all rules covered |
| - | PII detector tests (tests/catalog/test_pii_detector.py) | B | [x] | PR1: 26 tests (parametrized) |
| - | Catalog validator tests (tests/catalog/test_validator.py) | B | [x] | PR1: 5 tests |
| - | Catalog render tests (tests/catalog/test_render.py) | B | [x] | KM-557: 5 tests (renamed from test_enricher.py; LLM enrichment tests dropped, render-only tests kept). |
| - | Catalog store integration test (tests/catalog/test_store.py) | DB | [x] | PR1: module-level skip without RUN_INTEGRATION_TESTS=1 |
| - | DB introspector test | DB | [ ] | Deferred to PR2; needs a Postgres testcontainer or fixture infra |
| - | Tabular introspector test | TAB | [x] | PR1-tab: 31 unit tests (CSV/XLSX/Parquet, stats, PII, error paths). No DB/blob I/O; mocks injected via the constructor. |
| 41 | Planner eval (tests/query/planner/) | B | [x] | PR6-scaffold: test_golden_questions.py with 3 DB-targeting cases. TAB added test_golden_tabular.py with 4 tabular cases (group_by+sum, top-N+limit, date range filter, XLSX sheet selection). All 4 passed against real Azure OpenAI. Fix shipped alongside: query/planner/service.py replaced the ("system", text) tuple with SystemMessage; without this, {...} in query_planner.md was parsed as f-string variables and crashed on every real invocation. |
| 42 | E2E smoke tests (tests/e2e/) | B | [ ] | Defer until Phase 2 endpoints are wired (cleanup PR). Component-level orchestration is already covered by test_chat_handler.py + test_service.py. |
| - | Golden IR fixtures (tests/fixtures/golden_irs.json) | B | [~] | PR1 seeded with 5 DB-targeting examples; TAB extends in PR1-tab |
| - | Shared sample_catalog fixture (tests/conftest.py) | B | [x] | PR1: DB-shaped; TAB may add a tabular sibling |
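Row 41's planner crash came from brace handling: LangChain's default f-string prompt templates treat `{...}` as variables, so a system prompt containing a literal JSON example fails at format time, while a `SystemMessage` passes its content through untouched. Python's own `str.format` behaves the same way, which makes the failure easy to reproduce without LangChain. This is an analogy, not the planner code, and `PROMPT` is made-up text.

```python
PROMPT = 'Reply with JSON such as {"source_id": "db1", "limit": 10}.'


def render_as_template(text: str) -> str:
    # f-string-style templating: every {...} is read as a variable slot.
    return text.format()


def escape_braces(text: str) -> str:
    # Doubling braces is the usual way to keep them literal inside such a template.
    return text.replace("{", "{{").replace("}", "}}")


try:
    render_as_template(PROMPT)
    template_crashed = False
except (KeyError, ValueError):
    template_crashed = True
```

Sending the prompt as a message object, as the fix did, avoids having to escape every example in query_planner.md.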
What just shipped (2026-05-12, Cleanup PR)
Phase 1 removal + Phase 2 API rewiring:
- `src/api/v1/chat.py`: fully rewired to `ChatHandler.handle()`. Removed inline IntentRouter, retrieval, and ChatbotAgent calls. Redis cache, fast intent, `load_history`, and `save_messages` stay in `chat.py`.
- `src/api/v1/db_client.py`: `/ingest` now calls only `on_db_registered`. Phase 1 `db_pipeline_service.run()` block removed. Catalog build failure now raises HTTP 500.
- `src/api/v1/data_catalog.py`: added the `POST /api/v1/data-catalog/rebuild` endpoint.
- `src/pipeline/triggers.py`: `on_catalog_rebuild_requested` implemented; it iterates catalog sources, re-runs the appropriate trigger per source type, and logs per-source errors.
Dead modules deleted:
- `src/agents/chatbot.py` (Phase 1 LangChain chatbot)
- `src/pipeline/orchestrator.py` (empty stub)
- `src/query/base.py` (old duplicate of `executor/base.py`)
- `src/api/v1/knowledge.py` (fake `/knowledge/rebuild` endpoint)
- `src/config/agents/` (folder; its prompts were only used by the deleted Phase 1 chatbot)
Renames:
- `src/agents/answer_agent.py` → `src/agents/chatbot.py`; `AnswerAgent` → `ChatbotAgent`; updated all import sites (`chat_handler.py`, `chat.py`)
Fixes + improvements:
- `src/agents/chat_handler.py`: `_get_document_retriever()` now returns `RetrievalRouter` (Redis-cached) instead of `DocumentRetriever` directly; retrieval-level cache restored.
- `src/retrieval/router.py`: removed the dead `db: AsyncSession` and `source_hint` parameters + `_UNSTRUCTURED_HINTS` constant from `retrieve()`. Cache key simplified.
- `src/knowledge/processing_service.py`: removed the dead `_build_csv_documents`, `_build_excel_documents`, `_profile_dataframe`, `_to_sheet_document` methods + the `pandas` and `upload_parquet` imports.
- `src/catalog/models.py`: added `top_values: list[Any] | None` to `ColumnStats`.
- `src/catalog/introspect/tabular.py`: `_to_column` now populates `top_values` for columns with ≤10 distinct values; useful for query planner WHERE clause generation.
- `main.py`: replaced the deprecated `@app.on_event("startup")` with a `lifespan` context manager; removed `knowledge_router`.
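The `top_values` rule from the `_to_column` change (populated only when a column has at most 10 distinct values) can be sketched with `collections.Counter`. Assumptions: nulls are ignored, values are ordered by frequency, and `top_values` / `TOP_VALUES_MAX_DISTINCT` are illustrative names rather than the introspector's code.

```python
from collections import Counter
from typing import Any, Optional

TOP_VALUES_MAX_DISTINCT = 10  # threshold from the Cleanup PR note


def top_values(values: list[Any]) -> Optional[list[Any]]:
    """Distinct non-null values, most frequent first, or None for high-cardinality columns."""
    counts = Counter(v for v in values if v is not None)
    if not counts or len(counts) > TOP_VALUES_MAX_DISTINCT:
        return None
    # most_common() is stable for ties, so equally frequent values keep first-seen order.
    return [value for value, _ in counts.most_common()]
```

A planner that sees `["paid", "sent", "void"]` for a status column can emit `status = 'paid'` instead of guessing the literal's spelling or case.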
What just shipped (KM-557, DB owner)
After lead review of the catalog ingestion cost: dropped LLM enrichment, renamed the storage table, and exposed a lightweight index endpoint for the upcoming catalog refresher.
Files deleted:
- `src/catalog/enricher.py`: entire CatalogEnricher + EnrichmentResponse + apply_descriptions removed
- `src/config/prompts/catalog_enricher.md`: dead prompt
- `tests/catalog/test_enricher.py`: replaced by `test_render.py`
Files added:
- `src/catalog/render.py`: new home for `render_source` (the only piece of the old enricher still needed; consumed by `query/planner/prompt.py`)
- `src/api/v1/data_catalog.py`: `GET /api/v1/data-catalog/{user_id}` returns `list[CatalogIndexEntry]`
- `tests/catalog/test_render.py`: 5 tests (same coverage as the old render block)
Files modified:
- `src/db/postgres/models.py`: `__tablename__ = "data_catalog"` (was `"catalogs"`). Class name unchanged
- `src/pipeline/structured_pipeline.py`: `StructuredPipeline(validator, store)` (was `(enricher, validator, store)`); the pipeline is now introspect → merge → validate → upsert; `default_structured_pipeline()` no longer constructs an enricher
- `src/pipeline/triggers.py`: docstrings updated; the `on_catalog_rebuild_requested` docstring rewritten for the refresher use case
- `src/query/planner/prompt.py`: import is now `from ...catalog.render import render_source`
- `src/catalog/introspect/{base,database,tabular}.py`: docstring scrubs (no behavior changes)
- `src/models/api/catalog.py`: added `CatalogIndexEntry`; simplified `CatalogRebuildResponse` to `sources_rebuilt`
- `main.py`: registered `data_catalog_router`
- `src/security/README.md`: one stale wording fix
No migration: the data_catalog table is created from scratch on first init_db(). The old catalogs table was never deployed against production data, so no rename SQL is needed.
Tests: all 4 test_structured_pipeline.py tests reworked to construct StructuredPipeline(validator=, store=) without enricher. 5 test_render.py tests cover render_source standalone.
Lint: ruff check clean on modified Phase 2 paths.
Open follow-ups left for the lead:
- `on_catalog_rebuild_requested` body: the refresher will iterate the index endpoint and call this trigger per source
- `api/v1/db_client.py` `/ingest` still doesn't call `on_db_registered`; same blocker as before, untouched by KM-557
What just shipped (2026-05-11: retrieval migration + bug fixes)
Files implemented / migrated:
- `src/retrieval/base.py`: `RetrievalResult` dataclass + `BaseRetriever` ABC (was in `src/rag/base.py`)
- `src/retrieval/document.py`: full `DocumentRetriever` migrated from `src/rag/retrievers/document.py`; all retrieval methods (MMR/cosine/euclidean/inner_product/manhattan). Tabular file types are filtered out of results.
- `src/retrieval/router.py`: `RetrievalRouter` (Redis-cached, unstructured-only). `invalidate_cache(user_id)` clears all `retrieval:{user_id}:*` keys.
Deleted (no longer used):
- `src/rag/`: entire folder (`base.py`, `retriever.py`, `router.py`, `retrievers/`)
- `src/tools/`: entire folder (`search.py` was the only real file; it was only called by the deleted `rag/` router)
Bug fixes:
- `src/pipeline/document_pipeline.py`: `retrieval_router.invalidate_cache(user_id)` is called after `process()` and `delete()`. A Redis failure is caught and logged (it does not fail the document op).
- `src/pipeline/document_pipeline.py`: CSV/XLSX now skip `knowledge_processor` (vector store). Tabular files go to the catalog only; no duplicate embeddings.
- `src/pipeline/triggers.py`: `on_document_uploaded` implemented (was `raise NotImplementedError`).
- `src/agents/chat_handler.py`: `_normalize_chunks` now handles `RetrievalResult` objects. Previously they were silently dropped, causing empty context for unstructured queries through ChatHandler.
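The invalidation rules above (clear every `retrieval:{user_id}:*` key after a document is processed or deleted, and never let a Redis error fail the document operation) can be sketched as follows. `scan_iter(match=...)` and `delete(*keys)` mirror redis-py client methods, but `FakeCache`, `invalidate_cache`, and `after_document_change` are hypothetical stand-ins, not the `RetrievalRouter` code.

```python
import fnmatch
import logging

logger = logging.getLogger(__name__)


class FakeCache:
    """In-memory stand-in for a Redis client, keyed like 'retrieval:{user_id}:{hash}'."""

    def __init__(self) -> None:
        self.store: dict[str, str] = {}

    def scan_iter(self, match: str) -> list[str]:
        return [k for k in list(self.store) if fnmatch.fnmatchcase(k, match)]

    def delete(self, *keys: str) -> int:
        return sum(1 for k in keys if self.store.pop(k, None) is not None)


def invalidate_cache(cache, user_id: str) -> int:
    # SCAN-style iteration avoids blocking Redis the way KEYS would on a large keyspace.
    keys = list(cache.scan_iter(match=f"retrieval:{user_id}:*"))
    return cache.delete(*keys) if keys else 0


def after_document_change(cache, user_id: str) -> None:
    """Best-effort invalidation: a cache outage is logged, never propagated."""
    try:
        removed = invalidate_cache(cache, user_id)
        logger.info("cleared %d retrieval cache keys for %s", removed, user_id)
    except Exception:
        logger.exception("cache invalidation failed for %s; continuing", user_id)
```

The trailing `:` in the pattern matters: `retrieval:u1:*` clears user `u1` without touching `u10`.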
Import updates (all changed from `src.rag.*` → `src.retrieval.*`):
- `src/api/v1/chat.py`
- `src/query/base.py`
- `src/query/query_executor.py`
- `src/query/executors/db_executor.py`
- `src/query/executors/tabular.py`
What shipped previously (PR2b/4/5/6/7-bundle: DB owner solo, teammate reviews)
Files implemented:
- `src/agents/orchestration.py`: `OrchestratorAgent.classify(message, history) → IntentRouterDecision`. Pydantic model for structured output. History-aware query rewriting. Phase 1 filename + class name preserved; body fully rewritten for Phase 2.
- `src/agents/answer_agent.py`: `AnswerAgent.astream(...)` streams answer tokens; accepts `QueryResult` and/or `list[DocumentChunk]`. Renamed to `chatbot.py` in the cleanup PR.
- `src/agents/chat_handler.py`: `ChatHandler.handle(message, user_id, history)` returns an `AsyncIterator[dict]` of `intent`/`chunk`/`done`/`error` SSE events. All deps injectable; lazy default builders.
- `src/query/planner/prompt.py`: `render_catalog(catalog)` + `build_planner_prompt(question, catalog, previous_error)`. Reuses `catalog.enricher.render_source` for consistency across LLM call sites.
- `src/query/planner/service.py`: `QueryPlannerService.plan(question, catalog, previous_error)`: Azure OpenAI structured output → `QueryIR`.
- `src/query/executor/dispatcher.py`: `ExecutorDispatcher.pick(ir) → BaseExecutor` by `source.source_type`. Lazy executor imports + per-source-type cache.
- `src/query/service.py`: `QueryService.run(user_id, question, catalog) → QueryResult`. Plan → validate → retry on failure (max 3) → dispatch → execute. Catches `NotImplementedError` from the TabularExecutor placeholder gracefully.
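The dispatcher note (pick an executor by `source.source_type`, import it lazily, cache one instance per type) can be sketched with `importlib`. Assumptions: `pick` takes the source type directly instead of an IR, executors have no-argument constructors, and the module paths in `_DEFAULT_PATHS` are guesses based on the file names in this tracker; `LazyDispatcher` is a hypothetical name, not the real `ExecutorDispatcher`.

```python
import importlib
from typing import Any, Optional

# Guessed module paths; the real dispatcher's import targets may differ.
_DEFAULT_PATHS = {
    "schema": ("src.query.executor.db", "DbExecutor"),
    "tabular": ("src.query.executor.tabular", "TabularExecutor"),
}


class LazyDispatcher:
    def __init__(self, paths: Optional[dict[str, tuple[str, str]]] = None) -> None:
        self._paths = dict(_DEFAULT_PATHS if paths is None else paths)
        self._cache: dict[str, Any] = {}

    def pick(self, source_type: str) -> Any:
        """Return the cached executor for source_type, importing its module on first use."""
        if source_type in self._cache:
            return self._cache[source_type]
        if source_type not in self._paths:
            raise ValueError(f"no executor for source_type={source_type!r}")
        module_name, class_name = self._paths[source_type]
        # Deferred import: constructing the dispatcher never loads DB drivers or blob clients.
        module = importlib.import_module(module_name)
        executor = getattr(module, class_name)()
        self._cache[source_type] = executor
        return executor
```

Passing `paths` in tests lets them substitute lightweight classes without patching imports.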
Prompts written (filled in placeholders):
- `src/config/prompts/intent_router.md`
- `src/config/prompts/query_planner.md`
- `src/config/prompts/chatbot_system.md`
- `src/config/prompts/guardrails.md`
Tests added (46 new; total now 146 + 2 skipped):
- `tests/agents/test_intent_router.py` (4)
- `tests/agents/test_answer_agent.py` (12)
- `tests/agents/test_chat_handler.py` (6)
- `tests/query/planner/test_prompt.py` (7)
- `tests/query/planner/test_service.py` (3)
- `tests/query/executor/test_dispatcher.py` (5)
- `tests/query/test_service.py` (8)
- `tests/query/planner/test_golden_questions.py` (3, skipped by default; eval harness scaffold)
Lint: ruff check clean on all Phase 2 paths. Phase 1 files have pre-existing E501/S608 issues β out of scope for this PR.
Placeholders / blockers for teammate (status as of DB owner's commit, before merge):
- `src/query/executor/tabular.py` (TAB): DB owner's note: "still raises NotImplementedError". Post-merge: TAB shipped this in PR3-TAB; the dispatcher now routes to the real `TabularExecutor`. The `NotImplementedError` catch in `QueryService` stays as a safety net.
- `src/retrieval/document.py`: implemented (2026-05-11). Full `DocumentRetriever` migrated from `src/rag/retrievers/document.py`; supports MMR/cosine/euclidean/manhattan/inner_product. `_normalize_chunks` in `chat_handler.py` now handles `RetrievalResult` → `DocumentChunk` conversion correctly.
- `src/api/v1/chat.py` (Phase 1): NOT touched. Cleanup PR rewires the SSE endpoint to call `ChatHandler.handle(...)`.
- `src/api/v1/db_client.py` (Phase 1): NOT touched. Cleanup PR rewires `/database-clients/{id}/ingest` to call `pipeline.triggers.on_db_registered`.
What shipped previously (PR3-TAB, TAB owner)
Files implemented:
- `src/query/compiler/pandas.py`: `PandasCompiler` + `CompiledPandas(apply, output_columns)` dataclass. Pure helper functions (easier to test in isolation): `_apply_filters` (all 12 ops, `_like_to_regex` for LIKE), `_apply_select` (column pick + rename), `_apply_agg` (scalar + group_by via `pd.concat` of Series → `reset_index`), `_apply_orderby` (alias-aware via `_resolve_order_col`). The closure captures all IR fields explicitly so `apply(df)` is self-contained.
- `src/query/executor/tabular.py`: `TabularExecutor` with injectable `fetch_blob` (same testability pattern as `TabularIntrospector`). Resolves the Parquet blob path from `az_blob://{uid}/{did}` + table: single-table → `{uid}/{did}.parquet`, multi-table → `{uid}/{did}__{table.name}.parquet`. Runs compile → download → `asyncio.to_thread(_load_and_apply)` → 10k hard cap. Never raises; errors populate `QueryResult.error`. Uses `compiled.output_columns` for column labels (safe on an empty DataFrame).
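The Parquet path rule in the `TabularExecutor` bullet can be written as a small pure function. A sketch assuming the source location is an `az_blob://{uid}/{did}` string and that "multi-table" means the source has more than one table (for example an XLSX with several sheets); `resolve_blob_name` and its `table_count` parameter are hypothetical, not the executor's `_resolve_blob_name` signature.

```python
_PREFIX = "az_blob://"


def resolve_blob_name(location_ref: str, table_name: str, table_count: int) -> str:
    """Map az_blob://{uid}/{did} plus a table to the Parquet blob holding that table."""
    if not location_ref.startswith(_PREFIX):
        raise ValueError(f"expected an {_PREFIX} location, got {location_ref!r}")
    uid, _, did = location_ref[len(_PREFIX):].partition("/")
    if not uid or not did:
        raise ValueError(f"malformed location: {location_ref!r}")
    if table_count > 1:
        # Multi-table sources (e.g. XLSX with several sheets) get one blob per table.
        return f"{uid}/{did}__{table_name}.parquet"
    return f"{uid}/{did}.parquet"
```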
Tests added (55 new; the total suite was 86, all passing, at PR3-TAB time):
- `tests/unit/query/compiler/test_pandas_compiler.py`: 43 tests across all 12 filter ops (including `is_null`, `not_in`, `like`, `between`), all 6 agg fns, group_by, order_by asc/desc, limit-after-order, alias round-trip, empty DataFrame, error paths.
- `tests/unit/query/executor/test_tabular_executor.py`: 12 tests: `_resolve_blob_name` (single/multi-table, bad prefix), happy-path `QueryResult` shape (columns, rows, backend, truncated, source_id), wrong source_type → error, blob fetch failure → error, unknown source → error.
Lint: ruff check clean on both files.
What shipped previously (PR1-tab, TAB owner)
Files implemented:
- `src/catalog/introspect/tabular.py`: `TabularIntrospector` reads the original blob (CSV/XLSX/Parquet), profiles each column (dtype, stats, sample values), and runs `PIIDetector`. For XLSX: one `Table` per sheet (`Table.name` = sheet_name); for CSV/Parquet: one `Table` (`Table.name` = filename stem). `fetch_doc`/`fetch_blob` are constructor-injectable for unit tests; no `Settings` or DB required at import time.
- `src/pipeline/triggers.py`: `on_tabular_uploaded` wired (mirrors the `on_db_registered` pattern).
Tests added (31 new):
- `tests/unit/catalog/test_introspect_tabular.py`: CSV / XLSX / Parquet shapes, per-column stats, nullable detection, PII name + value matching, sample capping, all error paths. Pure Python, no network I/O.
Executor contract note: introspector downloads the original blob for schema reading. The tabular executor (PR3-TAB) downloads Parquet blobs for query execution. For CSV/Parquet sources (single table), the executor must call parquet_blob_name(uid, did, sheet_name=None); for XLSX (multi-table), parquet_blob_name(uid, did, table.name).
What shipped previously (PR3-DB, DB owner)
Files implemented:
- `src/query/compiler/sql.py`: `SqlCompiler` for the Postgres dialect; `CompiledSql(sql, params)` dataclass with `params: dict[str, Any]` (changed from `list`); supports all 12 whitelisted filter ops, all 6 aggs, alias-aware order_by; `_qident` escapes embedded double quotes
- `src/query/executor/db.py`: `DbExecutor` with a sqlglot SELECT-only guard, Postgres session-level read-only + 30s `statement_timeout`, `asyncio.wait_for` backstop, 10k row hard cap; rejects a non-`schema` source_type and a `dbclient://` URI mismatch; never raises (populates `QueryResult.error`)
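The `DbExecutor` bullet layers a client-side `asyncio.wait_for` backstop over the server-side `statement_timeout`, caps rows at 10k, and reports errors instead of raising. A minimal sketch of that wrapper, assuming the blocking fetch is a plain callable run through `asyncio.to_thread`; `run_blocking_query` and `ExecResult` are hypothetical names, and the SELECT-only guard and session settings are omitted.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Optional

ROW_CAP = 10_000  # hard cap from the executor note


@dataclass
class ExecResult:
    rows: list[Any] = field(default_factory=list)
    truncated: bool = False
    error: Optional[str] = None
    elapsed_s: float = 0.0


async def run_blocking_query(fetch: Callable[[], list[Any]], timeout_s: float = 30.0) -> ExecResult:
    """Run a sync DB fetch off the event loop; never raises."""
    start = time.perf_counter()
    try:
        rows = await asyncio.wait_for(asyncio.to_thread(fetch), timeout=timeout_s)
    except asyncio.TimeoutError:
        # The worker thread keeps running; statement_timeout is what stops the server side.
        return ExecResult(error=f"query exceeded {timeout_s}s backstop",
                          elapsed_s=time.perf_counter() - start)
    except Exception as exc:
        return ExecResult(error=f"{type(exc).__name__}: {exc}",
                          elapsed_s=time.perf_counter() - start)
    return ExecResult(rows=rows[:ROW_CAP], truncated=len(rows) > ROW_CAP,
                      elapsed_s=time.perf_counter() - start)
```

A production version would likely fetch at most `ROW_CAP + 1` rows instead of slicing a full result, so the cap bounds memory as well as payload size.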
Files extended:
- `src/query/compiler/pandas.py`: fixed pre-existing UP035 (`Callable` import)
- `pyproject.toml`: added `S608` to the `tests/**` ruff ignore (false positive: tests assert literal SQL strings)
Tests added (36 new, all passing; total now 100):
- `tests/query/compiler/test_sql.py`: every filter op, every agg, count(*), count_distinct, order_by alias vs column, multi-filter AND, identifier quoting escape, error paths
Lint: ruff check clean on Phase 2 paths.
Hand-off note for teammate: CompiledSql.params is now dict[str, Any], not list. The pandas compiler will follow the same convention (or document its own); coordinate when PR3-TAB lands.
What shipped previously (PR2a, DB owner)
Files implemented:
- `src/catalog/enricher.py`: Azure OpenAI GPT-4o + structured output (`EnrichmentResponse`), `render_source` (reusable by the planner prompt later), `apply_descriptions` merger, injectable `structured_chain` for tests
- `src/pipeline/structured_pipeline.py`: `StructuredPipeline` orchestrator + `default_structured_pipeline()` factory with lazy production-dep imports
- `src/pipeline/triggers.py`: `on_db_registered` wired; tabular/document/rebuild stubs preserved with implementation notes
Files extended:
- `src/catalog/models.py`: added the `ForeignKey` model and `Table.foreign_keys: list[ForeignKey] = []`
- `src/catalog/introspect/database.py`: `_extract_foreign_keys` populates `Table.foreign_keys` from extractor data
- `src/config/prompts/catalog_enricher.md`: full system prompt with style rules and one few-shot example
Tests added (14 new, all passing; total now 64):
- `tests/catalog/test_enricher.py`: render / apply / end-to-end with a fake chain (10 tests)
- `tests/pipeline/test_structured_pipeline.py`: orchestration with stub deps (4 tests)
Lint: ruff check clean on all Phase 2 paths. Phase 1 files (pipeline/db_pipeline/, pipeline/document_pipeline/) have pre-existing ruff issues β out of scope for this PR.
What shipped previously (PR1, DB owner's first chunk)
Files implemented (was NotImplementedError):
- `src/catalog/pii_detector.py`, `src/catalog/validator.py`, `src/catalog/store.py`, `src/catalog/reader.py`
- `src/catalog/introspect/database.py` (FK extraction added in PR2a)
- `src/query/ir/validator.py`
Files extended:
- `src/query/ir/operators.py`: `TYPE_COMPATIBILITY` matrix
- `src/catalog/models.py`: `location_ref` URI-scheme docstring
- `src/db/postgres/models.py`: `Catalog` SQLAlchemy table; `init_db.py` imports it
Tests: 50 unit tests + 1 integration (gated on RUN_INTEGRATION_TESTS=1).
Reused Phase 1 utilities (cleanup deferred):
- `src/database_client/database_client_service.py:get`
- `src/utils/db_credential_encryption.py:decrypt_credentials_dict`
- `src/pipeline/db_pipeline/db_pipeline_service.py:engine_scope`
- `src/pipeline/db_pipeline/extractor.py:get_schema` / `profile_column` / `get_row_count`
Open contract items (not yet locked)
- Joins in IR: currently single-table only (ARCHITECTURE.md §7); the DB owner accepted the constraint for v1 and will revisit in PR3 if it blocks real queries
- `updated_at` on Source vs `generated_at` on Catalog: the Pydantic models have both; the introspector sets the per-Source value; CatalogStore preserves both
- Catalog refresh trigger (open question §3): default policy is rebuild-on-upload-or-connect; auto-refresh deferred
- Unstructured catalog entries (open question §2): currently an empty filter for `source_hint="unstructured"`; revisit when adding doc descriptions
- PII handling for `sample_values` (open question §5): currently nulls them out (skip); mask/synthesize deferred
- Dialect priority for the SQL compiler: PR3 will land Postgres first, MySQL second; BigQuery/Snowflake/SQL Server later
How to update this file
When a PR lands:
- Flip status from `[ ]` or `[~]` to `[x]`
- Add a short note (file paths, scope cuts, surprises)
- Bump "Last updated" at the top
- If a new contract decision lands, move it from "Open contract items" to the relevant inline note
When opening a PR:
- Flip status to `[~]` and add yourself as the active owner in the PR row
- Don't promise items in the PR description that aren't in the table