/fix analysis + report_inputs and storage update
#9
by rhbt6767 - opened
- DEV_PLAN.md +1 -1
- REPO_STATUS.md +5 -3
- src/agents/chat_handler.py +11 -25
- src/api/v1/chat.py +2 -3
- src/api/v2/chat.py +7 -13
- src/config/settings.py +19 -6
- src/query/executor/tabular.py +25 -6
- src/storage/object_storage/__init__.py +13 -0
- src/storage/object_storage/supabase_s3.py +85 -0
DEV_PLAN.md
CHANGED
|
@@ -173,7 +173,7 @@ Status legend: ⬜ not started · 🔄 in progress · ✅ done · ⛔ blocked ·
|
|
| 173 |
| 22 | Finalize `report_inputs` schema → hand to Harry for the dedorch migration | Rifqi → Harry | 🔄 | **DDL ready** (uuid `id`/`analysis_id` + FK→`analyses(id)`; `user_id`/`plan_id` text; `data` jsonb = serialized `AnalysisRecord`, shape documented). dedorch has empty `analysis_records` → rename. Resolves #16. **Action: send Harry the DDL + `data` shape** |
|
| 174 |
| 23 | Report markdown formatting: tables, **bold**, *italic*, horizontal separators | Sofhia | ✅ | Done 2026-06-25. Added `---` separators between header + each section in `_render_markdown`. Tables (EDA) / bold (method labels) / italic (meta + citations) already emitted. Relaxed `report_summary.md` to allow inline `**bold**`/`*italic*` for emphasis (kept no-headings/no-bullets so it doesn't duplicate the section structure / Key Findings). Compile + ruff clean |
|
| 175 |
| 24 | Clarify report input contract: records table (+ `last_report` for edit mode?) | Rifqi/Sofhia ↔ Harry | ⬜ new | Edit-mode input left open at the checkpoint |
|
| 176 |
-
| 25 | Migrate Python chat path to Go `analyses_messages` (+ `analyses`) | Rifqi ↔ Harry |
|
| 177 |
| 26 | **Charts (DEFERRED):** store Plotly JSON in a future `chart` table (not matplotlib PNG) | — | ⏸️ | After the markdown path is done end-to-end |
|
| 178 |
| 27 | **Images (DEFERRED):** image table (id, analysis_id, msg/report ref, order) + originals in a bucket | — | ⏸️ | Maintenance-heavy; parked |
|
| 179 |
| 28 | **UI research** (FE): new-analysis form, knowledge menu (user vs analysis level), report artifacts + version selector | Team | ⬜ new | No dedicated UI person; interview + old analysis UI removed |
|
|
|
|
| 173 |
| 22 | Finalize `report_inputs` schema → hand to Harry for the dedorch migration | Rifqi → Harry | 🔄 | **DDL ready** (uuid `id`/`analysis_id` + FK→`analyses(id)`; `user_id`/`plan_id` text; `data` jsonb = serialized `AnalysisRecord`, shape documented). dedorch has empty `analysis_records` → rename. Resolves #16. **Action: send Harry the DDL + `data` shape** |
|
| 174 |
| 23 | Report markdown formatting: tables, **bold**, *italic*, horizontal separators | Sofhia | ✅ | Done 2026-06-25. Added `---` separators between header + each section in `_render_markdown`. Tables (EDA) / bold (method labels) / italic (meta + citations) already emitted. Relaxed `report_summary.md` to allow inline `**bold**`/`*italic*` for emphasis (kept no-headings/no-bullets so it doesn't duplicate the section structure / Key Findings). Compile + ruff clean |
|
| 175 |
| 24 | Clarify report input contract: records table (+ `last_report` for edit mode?) | Rifqi/Sofhia ↔ Harry | ⬜ new | Edit-mode input left open at the checkpoint |
|
| 176 |
+
| 25 | Migrate Python chat path to Go `analyses_messages` (+ `analyses`) | Rifqi ↔ Harry | ✅ | Done 2026-07-02. Read path already on `analyses_messages` (commit `0066161`). This change makes Python **read-only**: removed the `save_messages` calls from `/api/v2/chat/stream` so **Go is the sole writer** — fixes the double-write both Go+Python were producing. `load_history` still reads `analyses_messages`. v1 `/chat/stream` is unwired so left untouched |
|
| 177 |
| 26 | **Charts (DEFERRED):** store Plotly JSON in a future `chart` table (not matplotlib PNG) | — | ⏸️ | After the markdown path is done end-to-end |
|
| 178 |
| 27 | **Images (DEFERRED):** image table (id, analysis_id, msg/report ref, order) + originals in a bucket | — | ⏸️ | Maintenance-heavy; parked |
|
| 179 |
| 28 | **UI research** (FE): new-analysis form, knowledge menu (user vs analysis level), report artifacts + version selector | Team | ⬜ new | No dedicated UI person; interview + old analysis UI removed |
|
REPO_STATUS.md
CHANGED
|
@@ -129,8 +129,10 @@ GET /report/{analysis_id}/{ver} → fetch one version
|
|
| 129 |
```
|
| 130 |
|
| 131 |
Two facts to internalise:
|
| 132 |
-
- **Records only exist on the slow path.**
|
| 133 |
-
|
|
|
|
|
|
|
| 134 |
- **dedorch `reports` stores markdown only.** Structured report fields are computed at
|
| 135 |
generation, rendered into `rendered_markdown`, and only the markdown is persisted; on
|
| 136 |
read-back the structured fields come back empty.
|
|
@@ -289,7 +291,7 @@ only.
|
|
| 289 |
|
| 290 |
| Flag | Where | Default | Effect |
|
| 291 |
|---|---|---|---|
|
| 292 |
-
| `ENABLE_SLOW_PATH` |
|
| 293 |
| `ENABLE_GATE` | `settings.enable_gate` | **off** | **Deprecated 2026-06-25** — gate neutered; the flag has no effect. Kept to avoid `.env` churn. |
|
| 294 |
| `SKIP_INIT_DB` | `settings.skip_init_db` (.env/env) | **on** | Skip `init_db()` on startup — the dedorch cutover switch. **Defaults TRUE** (Go owns the dedorch schema); set `false` only for a local Python-owned DB. |
|
| 295 |
| `enable_tracing` | hardcoded `True` in `chat.py` | on (endpoint) | Langfuse tracing. |
|
|
|
|
| 129 |
```
|
| 130 |
|
| 131 |
Two facts to internalise:
|
| 132 |
+
- **Records only exist on the slow path.** The slow path is now **always on** for
|
| 133 |
+
`structured_flow` (the `ENABLE_SLOW_PATH` flag was removed 2026-07-02), so every
|
| 134 |
+
structured question persists a record. Reports still 409 until at least one `analyze_*`
|
| 135 |
+
task has actually succeeded (chat/help/check/unstructured turns write no record).
|
| 136 |
- **dedorch `reports` stores markdown only.** Structured report fields are computed at
|
| 137 |
generation, rendered into `rendered_markdown`, and only the markdown is persisted; on
|
| 138 |
read-back the structured fields come back empty.
|
|
|
|
| 291 |
|
| 292 |
| Flag | Where | Default | Effect |
|
| 293 |
|---|---|---|---|
|
| 294 |
+
| ~~`ENABLE_SLOW_PATH`~~ | — | **removed 2026-07-02** | Flag deleted. `structured_flow` now **always** runs Planner/TaskRunner/Assembler (the single-query `QueryService` fast path was retired from the chat handler), so records always persist. `extra="allow"` ignores a stale `ENABLE_SLOW_PATH` left in any `.env`. |
|
| 295 |
| `ENABLE_GATE` | `settings.enable_gate` | **off** | **Deprecated 2026-06-25** — gate neutered; the flag has no effect. Kept to avoid `.env` churn. |
|
| 296 |
| `SKIP_INIT_DB` | `settings.skip_init_db` (.env/env) | **on** | Skip `init_db()` on startup — the dedorch cutover switch. **Defaults TRUE** (Go owns the dedorch schema); set `false` only for a local Python-owned DB. |
|
| 297 |
| `enable_tracing` | hardcoded `True` in `chat.py` | on (endpoint) | Langfuse tracing. |
|
src/agents/chat_handler.py
CHANGED
|
@@ -5,7 +5,8 @@ End-to-end flow per user message:
|
|
| 5 |
1. `OrchestratorAgent.classify` → RouterDecision (one of six intents).
|
| 6 |
2. Route by intent:
|
| 7 |
- `chat` → no context. Pass straight to ChatbotAgent.
|
| 8 |
-
- `structured_flow` → CatalogReader → slow
|
|
|
|
| 9 |
- `unstructured_flow` → DocumentRetriever (RAG over PGVector) →
|
| 10 |
list[DocumentChunk].
|
| 11 |
- `check` → check_data / check_knowledge tool → rendered table.
|
|
@@ -48,7 +49,6 @@ from .orchestration import OrchestratorAgent
|
|
| 48 |
|
| 49 |
if TYPE_CHECKING:
|
| 50 |
from ..catalog.reader import CatalogReader
|
| 51 |
-
from ..query.service import QueryService
|
| 52 |
from ..retrieval.router import RetrievalRouter
|
| 53 |
from .gate import AnalysisState
|
| 54 |
from .slow_path.coordinator import SlowPathCoordinator
|
|
@@ -75,10 +75,8 @@ class ChatHandler:
|
|
| 75 |
intent_router: OrchestratorAgent | None = None,
|
| 76 |
answer_agent: ChatbotAgent | None = None,
|
| 77 |
catalog_reader: CatalogReader | None = None,
|
| 78 |
-
query_service: QueryService | None = None,
|
| 79 |
document_retriever: RetrievalRouter | None = None,
|
| 80 |
*,
|
| 81 |
-
enable_slow_path: bool = False,
|
| 82 |
slow_path_coordinator_factory: (
|
| 83 |
Callable[[str], SlowPathCoordinator] | None
|
| 84 |
) = None,
|
|
@@ -94,16 +92,13 @@ class ChatHandler:
|
|
| 94 |
self._intent_router = intent_router
|
| 95 |
self._answer_agent = answer_agent
|
| 96 |
self._catalog_reader = catalog_reader
|
| 97 |
-
self._query_service = query_service
|
| 98 |
self._document_retriever = document_retriever
|
| 99 |
# Langfuse tracing (tokens + latency). OFF by default so tests never hit
|
| 100 |
# Langfuse; the live endpoint opts in with ChatHandler(enable_tracing=True).
|
| 101 |
self._enable_tracing = enable_tracing
|
| 102 |
-
# Slow analytical path (Planner -> TaskRunner -> Assembler)
|
| 103 |
-
#
|
| 104 |
-
# intents route here instead of the single-query QueryService path. The
|
| 105 |
# factory + store are injectable for tests.
|
| 106 |
-
self._enable_slow_path = enable_slow_path
|
| 107 |
self._slow_path_factory = slow_path_coordinator_factory
|
| 108 |
self._analysis_store = analysis_store
|
| 109 |
# `check` skill: builds the data-access invoker (check_data/check_knowledge)
|
|
@@ -144,13 +139,6 @@ class ChatHandler:
|
|
| 144 |
self._catalog_reader = CatalogReader(CatalogStore())
|
| 145 |
return self._catalog_reader
|
| 146 |
|
| 147 |
-
def _get_query_service(self) -> QueryService:
|
| 148 |
-
if self._query_service is None:
|
| 149 |
-
from ..query.service import QueryService
|
| 150 |
-
|
| 151 |
-
self._query_service = QueryService()
|
| 152 |
-
return self._query_service
|
| 153 |
-
|
| 154 |
def _get_document_retriever(self) -> RetrievalRouter:
|
| 155 |
if self._document_retriever is None:
|
| 156 |
from ..retrieval.router import RetrievalRouter
|
|
@@ -351,15 +339,13 @@ class ChatHandler:
|
|
| 351 |
bound = await self._bound_source_ids(analysis_id)
|
| 352 |
reader = _ScopedCatalogReader(req_reader, bound) if bound else req_reader
|
| 353 |
catalog = await reader.read(user_id, "structured")
|
| 354 |
-
|
| 355 |
-
|
| 356 |
-
|
| 357 |
-
|
| 358 |
-
|
| 359 |
-
|
| 360 |
-
|
| 361 |
-
user_id, rewritten, catalog
|
| 362 |
-
)
|
| 363 |
except Exception as e:
|
| 364 |
logger.error(
|
| 365 |
"structured route failed",
|
|
|
|
| 5 |
1. `OrchestratorAgent.classify` → RouterDecision (one of six intents).
|
| 6 |
2. Route by intent:
|
| 7 |
- `chat` → no context. Pass straight to ChatbotAgent.
|
| 8 |
+
- `structured_flow` → CatalogReader → slow analytical path
|
| 9 |
+
(Planner → TaskRunner → Assembler).
|
| 10 |
- `unstructured_flow` → DocumentRetriever (RAG over PGVector) →
|
| 11 |
list[DocumentChunk].
|
| 12 |
- `check` → check_data / check_knowledge tool → rendered table.
|
|
|
|
| 49 |
|
| 50 |
if TYPE_CHECKING:
|
| 51 |
from ..catalog.reader import CatalogReader
|
|
|
|
| 52 |
from ..retrieval.router import RetrievalRouter
|
| 53 |
from .gate import AnalysisState
|
| 54 |
from .slow_path.coordinator import SlowPathCoordinator
|
|
|
|
| 75 |
intent_router: OrchestratorAgent | None = None,
|
| 76 |
answer_agent: ChatbotAgent | None = None,
|
| 77 |
catalog_reader: CatalogReader | None = None,
|
|
|
|
| 78 |
document_retriever: RetrievalRouter | None = None,
|
| 79 |
*,
|
|
|
|
| 80 |
slow_path_coordinator_factory: (
|
| 81 |
Callable[[str], SlowPathCoordinator] | None
|
| 82 |
) = None,
|
|
|
|
| 92 |
self._intent_router = intent_router
|
| 93 |
self._answer_agent = answer_agent
|
| 94 |
self._catalog_reader = catalog_reader
|
|
|
|
| 95 |
self._document_retriever = document_retriever
|
| 96 |
# Langfuse tracing (tokens + latency). OFF by default so tests never hit
|
| 97 |
# Langfuse; the live endpoint opts in with ChatHandler(enable_tracing=True).
|
| 98 |
self._enable_tracing = enable_tracing
|
| 99 |
+
# Slow analytical path (Planner -> TaskRunner -> Assembler): the only route for
|
| 100 |
+
# `structured_flow` now (the ENABLE_SLOW_PATH flag was removed 2026-07-02). The
|
|
|
|
| 101 |
# factory + store are injectable for tests.
|
|
|
|
| 102 |
self._slow_path_factory = slow_path_coordinator_factory
|
| 103 |
self._analysis_store = analysis_store
|
| 104 |
# `check` skill: builds the data-access invoker (check_data/check_knowledge)
|
|
|
|
| 139 |
self._catalog_reader = CatalogReader(CatalogStore())
|
| 140 |
return self._catalog_reader
|
| 141 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 142 |
def _get_document_retriever(self) -> RetrievalRouter:
|
| 143 |
if self._document_retriever is None:
|
| 144 |
from ..retrieval.router import RetrievalRouter
|
|
|
|
| 339 |
bound = await self._bound_source_ids(analysis_id)
|
| 340 |
reader = _ScopedCatalogReader(req_reader, bound) if bound else req_reader
|
| 341 |
catalog = await reader.read(user_id, "structured")
|
| 342 |
+
# structured_flow always runs the slow analytical path (the
|
| 343 |
+
# ENABLE_SLOW_PATH flag was removed 2026-07-02).
|
| 344 |
+
async for event in self._run_slow_path(
|
| 345 |
+
user_id, rewritten, catalog, tracer, reader, analysis_id
|
| 346 |
+
):
|
| 347 |
+
yield event
|
| 348 |
+
return
|
|
|
|
|
|
|
| 349 |
except Exception as e:
|
| 350 |
logger.error(
|
| 351 |
"structured route failed",
|
src/api/v1/chat.py
CHANGED
|
@@ -26,11 +26,10 @@ router = APIRouter(prefix="/api/v1", tags=["Chat"])
|
|
| 26 |
# is passed into handle()), and lazily builds + caches the Orchestrator/Chatbot
|
| 27 |
# chains — so reusing it keeps the Azure OpenAI clients (and their httpx/TLS pools)
|
| 28 |
# warm across requests instead of re-handshaking on the first call of every request.
|
| 29 |
-
#
|
| 30 |
-
#
|
| 31 |
_chat_handler = ChatHandler(
|
| 32 |
enable_tracing=True,
|
| 33 |
-
enable_slow_path=settings.enable_slow_path,
|
| 34 |
enable_gate=settings.enable_gate,
|
| 35 |
)
|
| 36 |
|
|
|
|
| 26 |
# is passed into handle()), and lazily builds + caches the Orchestrator/Chatbot
|
| 27 |
# chains — so reusing it keeps the Azure OpenAI clients (and their httpx/TLS pools)
|
| 28 |
# warm across requests instead of re-handshaking on the first call of every request.
|
| 29 |
+
# Structured intents always route Orchestrator -> Planner -> TaskRunner -> Assembler
|
| 30 |
+
# (the analytical slow path); the ENABLE_SLOW_PATH flag was removed 2026-07-02.
|
| 31 |
_chat_handler = ChatHandler(
|
| 32 |
enable_tracing=True,
|
|
|
|
| 33 |
enable_gate=settings.enable_gate,
|
| 34 |
)
|
| 35 |
|
src/api/v2/chat.py
CHANGED
|
@@ -12,10 +12,11 @@
|
|
| 12 |
Only chat moves to v2; the tools group + observability stay on `/api/v1` (contract:
|
| 13 |
API_ENDPOINTS_RESTRUCTURE.md §1).
|
| 14 |
|
| 15 |
-
|
| 16 |
-
|
| 17 |
-
|
| 18 |
-
|
|
|
|
| 19 |
"""
|
| 20 |
|
| 21 |
import json
|
|
@@ -38,7 +39,6 @@ from src.api.v1.chat import (
|
|
| 38 |
cache_response,
|
| 39 |
get_cached_response,
|
| 40 |
load_history,
|
| 41 |
-
save_messages,
|
| 42 |
)
|
| 43 |
from src.db.postgres.connection import get_db
|
| 44 |
from src.db.redis.connection import get_redis
|
|
@@ -88,7 +88,6 @@ async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
|
|
| 88 |
logger.info("Returning cached response")
|
| 89 |
cached_text = cached["response"]
|
| 90 |
cached_sources = cached["sources"]
|
| 91 |
-
await save_messages(db, analysis_id, request.user_id, request.message, cached_text)
|
| 92 |
|
| 93 |
async def stream_cached():
|
| 94 |
yield {"event": "sources", "data": json.dumps(cached_sources)}
|
|
@@ -103,7 +102,6 @@ async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
|
|
| 103 |
direct = _fast_intent(request.message)
|
| 104 |
if direct:
|
| 105 |
await cache_response(redis, cache_key, direct, sources=[])
|
| 106 |
-
await save_messages(db, analysis_id, request.user_id, request.message, direct)
|
| 107 |
|
| 108 |
async def stream_direct():
|
| 109 |
yield {"event": "sources", "data": json.dumps([])}
|
|
@@ -142,12 +140,8 @@ async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
|
|
| 142 |
# Only cache stateless `chat` replies (see _CACHEABLE_INTENTS).
|
| 143 |
if effective_intent in _CACHEABLE_INTENTS:
|
| 144 |
await cache_response(redis, cache_key, full_response, sources=sources)
|
| 145 |
-
|
| 146 |
-
|
| 147 |
-
db, analysis_id, request.user_id, request.message, full_response
|
| 148 |
-
)
|
| 149 |
-
except Exception as e:
|
| 150 |
-
logger.error("save_messages failed", analysis_id=analysis_id, error=str(e))
|
| 151 |
yield done_event
|
| 152 |
elif event["event"] == "status":
|
| 153 |
# slow-path progress: forward so the client shows activity.
|
|
|
|
| 12 |
Only chat moves to v2; the tools group + observability stay on `/api/v1` (contract:
|
| 13 |
API_ENDPOINTS_RESTRUCTURE.md §1).
|
| 14 |
|
| 15 |
+
Persistence (DEV_PLAN #25 — done). Python is **read-only** on the Go-owned
|
| 16 |
+
`analyses_messages` table: it *reads* turn history (so multi-turn context works) but no
|
| 17 |
+
longer writes the user/AI turns — Go is the sole writer. This removes the double-write
|
| 18 |
+
that appeared when both Go and Python's stream persisted the same turn. Aligns chat with
|
| 19 |
+
`/tools/help`, which was already generative-only.
|
| 20 |
"""
|
| 21 |
|
| 22 |
import json
|
|
|
|
| 39 |
cache_response,
|
| 40 |
get_cached_response,
|
| 41 |
load_history,
|
|
|
|
| 42 |
)
|
| 43 |
from src.db.postgres.connection import get_db
|
| 44 |
from src.db.redis.connection import get_redis
|
|
|
|
| 88 |
logger.info("Returning cached response")
|
| 89 |
cached_text = cached["response"]
|
| 90 |
cached_sources = cached["sources"]
|
|
|
|
| 91 |
|
| 92 |
async def stream_cached():
|
| 93 |
yield {"event": "sources", "data": json.dumps(cached_sources)}
|
|
|
|
| 102 |
direct = _fast_intent(request.message)
|
| 103 |
if direct:
|
| 104 |
await cache_response(redis, cache_key, direct, sources=[])
|
|
|
|
| 105 |
|
| 106 |
async def stream_direct():
|
| 107 |
yield {"event": "sources", "data": json.dumps([])}
|
|
|
|
| 140 |
# Only cache stateless `chat` replies (see _CACHEABLE_INTENTS).
|
| 141 |
if effective_intent in _CACHEABLE_INTENTS:
|
| 142 |
await cache_response(redis, cache_key, full_response, sources=sources)
|
| 143 |
+
# Persistence is Go's job now (DEV_PLAN #25): Python reads history but
|
| 144 |
+
# no longer writes turns, so Go stays the sole writer of analyses_messages.
|
|
|
|
|
|
|
|
|
|
|
|
|
| 145 |
yield done_event
|
| 146 |
elif event["event"] == "status":
|
| 147 |
# slow-path progress: forward so the client shows activity.
|
src/config/settings.py
CHANGED
|
@@ -17,12 +17,10 @@ class Settings(BaseSettings):
|
|
| 17 |
)
|
| 18 |
|
| 19 |
# Feature flags
|
| 20 |
-
#
|
| 21 |
-
# (Planner -> TaskRunner -> Assembler)
|
| 22 |
-
#
|
| 23 |
-
#
|
| 24 |
-
# real source lands, so this stays opt-in.
|
| 25 |
-
enable_slow_path: bool = Field(alias="enable_slow_path", default=False)
|
| 26 |
|
| 27 |
# DEPRECATED 2026-06-24: the problem_validated gate was removed (the goal is now
|
| 28 |
# user-entered objective + business_questions, no agent validation). This flag no
|
|
@@ -65,6 +63,21 @@ class Settings(BaseSettings):
|
|
| 65 |
azureai_container_name: str = Field(alias="azureai__container__name", default="")
|
| 66 |
azureai_container_account_name: str = Field(alias="azureai__container__account__name", default="")
|
| 67 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 68 |
# Langfuse
|
| 69 |
LANGFUSE_PUBLIC_KEY: str
|
| 70 |
LANGFUSE_SECRET_KEY: str
|
|
|
|
| 17 |
)
|
| 18 |
|
| 19 |
# Feature flags
|
| 20 |
+
# REMOVED 2026-07-02: the former ENABLE_SLOW_PATH flag is gone. `structured_flow`
|
| 21 |
+
# now always runs the analytical slow path (Planner -> TaskRunner -> Assembler), so
|
| 22 |
+
# every structured question persists a report_inputs record (reports no longer
|
| 23 |
+
# depend on flipping a flag). extra="allow" ignores a stale ENABLE_SLOW_PATH in .env.
|
|
|
|
|
|
|
| 24 |
|
| 25 |
# DEPRECATED 2026-06-24: the problem_validated gate was removed (the goal is now
|
| 26 |
# user-entered objective + business_questions, no agent validation). This flag no
|
|
|
|
| 63 |
azureai_container_name: str = Field(alias="azureai__container__name", default="")
|
| 64 |
azureai_container_account_name: str = Field(alias="azureai__container__account__name", default="")
|
| 65 |
|
| 66 |
+
# Object storage provider toggle. Mirrors the Go data plane's `storage.provider`
|
| 67 |
+
# (see Orchestrator config: azure_blob | supabase_s3). Blank falls back to azure_blob.
|
| 68 |
+
# Tabular query execution reads the processed Parquet from whichever backend is active.
|
| 69 |
+
storage_provider: str = Field(alias="STORAGE_PROVIDER", default="azure_blob")
|
| 70 |
+
|
| 71 |
+
# Supabase S3-compatible object storage (active when storage_provider == "supabase_s3").
|
| 72 |
+
# BRIDGE (Option C): lets Python read the same bucket Go writes to, until Go exposes a
|
| 73 |
+
# source-data endpoint (see docs/proposal_go_source_data_endpoint.md) — then this client
|
| 74 |
+
# is removed and Python stops touching storage directly.
|
| 75 |
+
supabase_s3_bucket: str = Field(alias="SUPABASE_S3_BUCKET", default="")
|
| 76 |
+
supabase_s3_endpoint: str = Field(alias="SUPABASE_S3_ENDPOINT", default="")
|
| 77 |
+
supabase_s3_region: str = Field(alias="SUPABASE_S3_REGION", default="")
|
| 78 |
+
supabase_s3_access_key_id: str = Field(alias="SUPABASE_S3_ACCESS_KEY_ID", default="")
|
| 79 |
+
supabase_s3_secret_access_key: str = Field(alias="SUPABASE_S3_SECRET_ACCESS_KEY", default="")
|
| 80 |
+
|
| 81 |
# Langfuse
|
| 82 |
LANGFUSE_PUBLIC_KEY: str
|
| 83 |
LANGFUSE_SECRET_KEY: str
|
src/query/executor/tabular.py
CHANGED
|
@@ -29,13 +29,18 @@ from .base import BaseExecutor, QueryResult
|
|
| 29 |
logger = get_logger("tabular_executor")
|
| 30 |
|
| 31 |
_AZ_BLOB_PREFIX = "az_blob://"
|
|
|
|
|
|
|
|
|
|
|
|
|
| 32 |
_ROW_HARD_CAP = 10_000
|
| 33 |
|
| 34 |
|
| 35 |
class TabularExecutor(BaseExecutor):
|
| 36 |
"""Executes compiled pandas chain on a Parquet blob.
|
| 37 |
|
| 38 |
-
`fetch_blob` is injectable for tests — defaults to
|
|
|
|
| 39 |
"""
|
| 40 |
|
| 41 |
def __init__(
|
|
@@ -49,6 +54,16 @@ class TabularExecutor(BaseExecutor):
|
|
| 49 |
|
| 50 |
@staticmethod
|
| 51 |
async def _default_fetch_blob(blob_name: str) -> bytes:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 52 |
from ...storage.az_blob.az_blob import blob_storage
|
| 53 |
|
| 54 |
return await blob_storage.download_file(blob_name)
|
|
@@ -157,15 +172,19 @@ def _resolve_blob_name(source: Source, table: Table) -> str:
|
|
| 157 |
on the upload pipeline preserving the file extension, which it does today
|
| 158 |
because `Document.filename` is set once at upload and never renamed.
|
| 159 |
"""
|
| 160 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 161 |
raise ValueError(
|
| 162 |
-
f"TabularExecutor expects 'az_blob://...'
|
| 163 |
-
f"got {source.location_ref!r}"
|
| 164 |
)
|
| 165 |
-
path = source.location_ref[len(
|
| 166 |
parts = path.split("/", 1)
|
| 167 |
if len(parts) != 2 or not parts[0] or not parts[1]:
|
| 168 |
-
raise ValueError(f"Malformed
|
| 169 |
user_id, document_id = parts
|
| 170 |
is_xlsx = source.name.lower().endswith(".xlsx")
|
| 171 |
sheet_name = table.name if is_xlsx else None
|
|
|
|
| 29 |
logger = get_logger("tabular_executor")
|
| 30 |
|
| 31 |
_AZ_BLOB_PREFIX = "az_blob://"
|
| 32 |
+
# Go's Supabase S3 data plane writes location_ref with this prefix instead of az_blob://.
|
| 33 |
+
# Both encode the same path structure after the prefix: {user_id}/{document_id}.
|
| 34 |
+
_OBJECT_STORAGE_PREFIX = "object_storage://"
|
| 35 |
+
_LOCATION_REF_PREFIXES = (_AZ_BLOB_PREFIX, _OBJECT_STORAGE_PREFIX)
|
| 36 |
_ROW_HARD_CAP = 10_000
|
| 37 |
|
| 38 |
|
| 39 |
class TabularExecutor(BaseExecutor):
|
| 40 |
"""Executes compiled pandas chain on a Parquet blob.
|
| 41 |
|
| 42 |
+
`fetch_blob` is injectable for tests — defaults to the storage backend
|
| 43 |
+
selected by `settings.storage_provider` (azure_blob | supabase_s3).
|
| 44 |
"""
|
| 45 |
|
| 46 |
def __init__(
|
|
|
|
| 54 |
|
| 55 |
@staticmethod
|
| 56 |
async def _default_fetch_blob(blob_name: str) -> bytes:
|
| 57 |
+
# Pick the storage backend by the same toggle the Go data plane uses.
|
| 58 |
+
# Blank/unknown falls back to Azure Blob (the pre-migration default).
|
| 59 |
+
from ...config.settings import settings
|
| 60 |
+
|
| 61 |
+
provider = (settings.storage_provider or "").strip().lower()
|
| 62 |
+
if provider == "supabase_s3":
|
| 63 |
+
from ...storage.object_storage import object_storage
|
| 64 |
+
|
| 65 |
+
return await object_storage.download_file(blob_name)
|
| 66 |
+
|
| 67 |
from ...storage.az_blob.az_blob import blob_storage
|
| 68 |
|
| 69 |
return await blob_storage.download_file(blob_name)
|
|
|
|
| 172 |
on the upload pipeline preserving the file extension, which it does today
|
| 173 |
because `Document.filename` is set once at upload and never renamed.
|
| 174 |
"""
|
| 175 |
+
matched_prefix = next(
|
| 176 |
+
(p for p in _LOCATION_REF_PREFIXES if source.location_ref.startswith(p)),
|
| 177 |
+
None,
|
| 178 |
+
)
|
| 179 |
+
if matched_prefix is None:
|
| 180 |
raise ValueError(
|
| 181 |
+
f"TabularExecutor expects 'az_blob://...' or 'object_storage://...' "
|
| 182 |
+
f"location_ref, got {source.location_ref!r}"
|
| 183 |
)
|
| 184 |
+
path = source.location_ref[len(matched_prefix):]
|
| 185 |
parts = path.split("/", 1)
|
| 186 |
if len(parts) != 2 or not parts[0] or not parts[1]:
|
| 187 |
+
raise ValueError(f"Malformed location_ref: {source.location_ref!r}")
|
| 188 |
user_id, document_id = parts
|
| 189 |
is_xlsx = source.name.lower().endswith(".xlsx")
|
| 190 |
sheet_name = table.name if is_xlsx else None
|
src/storage/object_storage/__init__.py
ADDED
|
@@ -0,0 +1,13 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""Supabase S3-compatible object storage (read side).
|
| 2 |
+
|
| 3 |
+
BRIDGE MODULE (Option C) — self-contained on purpose. It exists only so the
|
| 4 |
+
tabular query path can read the processed Parquet from the same bucket the Go
|
| 5 |
+
data plane writes to, after Go switched storage provider Azure Blob -> Supabase S3.
|
| 6 |
+
|
| 7 |
+
Once Go exposes a source-data endpoint (docs/proposal_go_source_data_endpoint.md),
|
| 8 |
+
delete this whole package and point TabularExecutor at that endpoint instead.
|
| 9 |
+
"""
|
| 10 |
+
|
| 11 |
+
from .supabase_s3 import object_storage
|
| 12 |
+
|
| 13 |
+
__all__ = ["object_storage"]
|
src/storage/object_storage/supabase_s3.py
ADDED
|
@@ -0,0 +1,85 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""Supabase S3-compatible object storage client (read side).
|
| 2 |
+
|
| 3 |
+
Mirrors the Go data plane's Supabase S3 client (Orchestrator
|
| 4 |
+
`internal/documents/supabase_s3.go`): path-style addressing against a custom
|
| 5 |
+
endpoint, static credentials, single bucket.
|
| 6 |
+
|
| 7 |
+
boto3 is synchronous; download runs in a worker thread so the async call sites
|
| 8 |
+
(TabularExecutor._fetch_blob) stay non-blocking. Only the read path is
|
| 9 |
+
implemented — Python is read-only here; Go owns all writes.
|
| 10 |
+
"""
|
| 11 |
+
|
| 12 |
+
from __future__ import annotations
|
| 13 |
+
|
| 14 |
+
import asyncio
|
| 15 |
+
|
| 16 |
+
from src.config.settings import settings
|
| 17 |
+
from src.middlewares.logging import get_logger
|
| 18 |
+
|
| 19 |
+
logger = get_logger("supabase_s3")
|
| 20 |
+
|
| 21 |
+
|
| 22 |
+
class SupabaseS3Storage:
|
| 23 |
+
"""Read-only client for the Supabase S3 bucket Go writes to."""
|
| 24 |
+
|
| 25 |
+
def __init__(self) -> None:
|
| 26 |
+
self._bucket = settings.supabase_s3_bucket
|
| 27 |
+
self._endpoint = settings.supabase_s3_endpoint.rstrip("/")
|
| 28 |
+
self._region = settings.supabase_s3_region
|
| 29 |
+
self._access_key_id = settings.supabase_s3_access_key_id
|
| 30 |
+
self._secret_access_key = settings.supabase_s3_secret_access_key
|
| 31 |
+
self._client = None # lazy — avoid import-time failure on Azure deployments
|
| 32 |
+
|
| 33 |
+
def _is_configured(self) -> bool:
|
| 34 |
+
return all(
|
| 35 |
+
(
|
| 36 |
+
self._bucket,
|
| 37 |
+
self._endpoint,
|
| 38 |
+
self._region,
|
| 39 |
+
self._access_key_id,
|
| 40 |
+
self._secret_access_key,
|
| 41 |
+
)
|
| 42 |
+
)
|
| 43 |
+
|
| 44 |
+
def _get_client(self):
|
| 45 |
+
if self._client is None:
|
| 46 |
+
if not self._is_configured():
|
| 47 |
+
raise RuntimeError(
|
| 48 |
+
"Supabase S3 storage is not fully configured "
|
| 49 |
+
"(set STORAGE_PROVIDER=supabase_s3 and SUPABASE_S3_* env vars)"
|
| 50 |
+
)
|
| 51 |
+
import boto3
|
| 52 |
+
from botocore.config import Config
|
| 53 |
+
|
| 54 |
+
self._client = boto3.client(
|
| 55 |
+
"s3",
|
| 56 |
+
endpoint_url=self._endpoint,
|
| 57 |
+
region_name=self._region,
|
| 58 |
+
aws_access_key_id=self._access_key_id,
|
| 59 |
+
aws_secret_access_key=self._secret_access_key,
|
| 60 |
+
config=Config(
|
| 61 |
+
signature_version="s3v4",
|
| 62 |
+
s3={"addressing_style": "path"}, # path-style, like Go's UsePathStyle
|
| 63 |
+
),
|
| 64 |
+
)
|
| 65 |
+
return self._client
|
| 66 |
+
|
| 67 |
+
def _download_sync(self, object_name: str) -> bytes:
|
| 68 |
+
client = self._get_client()
|
| 69 |
+
resp = client.get_object(Bucket=self._bucket, Key=object_name)
|
| 70 |
+
return resp["Body"].read()
|
| 71 |
+
|
| 72 |
+
async def download_file(self, object_name: str) -> bytes:
|
| 73 |
+
"""Download an object's bytes. Interface-compatible with AzureBlobStorage.download_file."""
|
| 74 |
+
try:
|
| 75 |
+
logger.info(f"Downloading object {object_name}")
|
| 76 |
+
content = await asyncio.to_thread(self._download_sync, object_name)
|
| 77 |
+
logger.info(f"Successfully downloaded {object_name} ({len(content)} bytes)")
|
| 78 |
+
return content
|
| 79 |
+
except Exception as e:
|
| 80 |
+
logger.error(f"Failed to download object {object_name}", error=str(e))
|
| 81 |
+
raise
|
| 82 |
+
|
| 83 |
+
|
| 84 |
+
# Singleton (lazy client construction — safe to import even when not configured)
|
| 85 |
+
object_storage = SupabaseS3Storage()
|