/fix analysis + report_inputs and storage update

#9
by rhbt6767 - opened
DEV_PLAN.md CHANGED
@@ -173,7 +173,7 @@ Status legend: ⬜ not started · 🔄 in progress · ✅ done · ⛔ blocked ·
173
  | 22 | Finalize `report_inputs` schema → hand to Harry for the dedorch migration | Rifqi → Harry | 🔄 | **DDL ready** (uuid `id`/`analysis_id` + FK→`analyses(id)`; `user_id`/`plan_id` text; `data` jsonb = serialized `AnalysisRecord`, shape documented). dedorch has empty `analysis_records` → rename. Resolves #16. **Action: send Harry the DDL + `data` shape** |
174
  | 23 | Report markdown formatting: tables, **bold**, *italic*, horizontal separators | Sofhia | ✅ | Done 2026-06-25. Added `---` separators between header + each section in `_render_markdown`. Tables (EDA) / bold (method labels) / italic (meta + citations) already emitted. Relaxed `report_summary.md` to allow inline `**bold**`/`*italic*` for emphasis (kept no-headings/no-bullets so it doesn't duplicate the section structure / Key Findings). Compile + ruff clean |
175
  | 24 | Clarify report input contract: records table (+ `last_report` for edit mode?) | Rifqi/Sofhia ↔ Harry | ⬜ new | Edit-mode input left open at the checkpoint |
176
- | 25 | Migrate Python chat path to Go `analyses_messages` (+ `analyses`) | Rifqi ↔ Harry | | **Bigger than "confirm" (verified 2026-06-25):** dedorch `rooms` + `chat_messages` are **deprecated** (`zdeprecated_*`). Python's `Room`/`ChatMessage` models + `chat.py` `load_history`/`save_messages` target them → **break post-cutover**. Move history read/write to `analyses_messages` before the conn-string cutover |
177
  | 26 | **Charts (DEFERRED):** store Plotly JSON in a future `chart` table (not matplotlib PNG) | — | ⏸️ | After the markdown path is done end-to-end |
178
  | 27 | **Images (DEFERRED):** image table (id, analysis_id, msg/report ref, order) + originals in a bucket | — | ⏸️ | Maintenance-heavy; parked |
179
  | 28 | **UI research** (FE): new-analysis form, knowledge menu (user vs analysis level), report artifacts + version selector | Team | ⬜ new | No dedicated UI person; interview + old analysis UI removed |
 
173
  | 22 | Finalize `report_inputs` schema → hand to Harry for the dedorch migration | Rifqi → Harry | 🔄 | **DDL ready** (uuid `id`/`analysis_id` + FK→`analyses(id)`; `user_id`/`plan_id` text; `data` jsonb = serialized `AnalysisRecord`, shape documented). dedorch has empty `analysis_records` → rename. Resolves #16. **Action: send Harry the DDL + `data` shape** |
174
  | 23 | Report markdown formatting: tables, **bold**, *italic*, horizontal separators | Sofhia | ✅ | Done 2026-06-25. Added `---` separators between header + each section in `_render_markdown`. Tables (EDA) / bold (method labels) / italic (meta + citations) already emitted. Relaxed `report_summary.md` to allow inline `**bold**`/`*italic*` for emphasis (kept no-headings/no-bullets so it doesn't duplicate the section structure / Key Findings). Compile + ruff clean |
175
  | 24 | Clarify report input contract: records table (+ `last_report` for edit mode?) | Rifqi/Sofhia ↔ Harry | ⬜ new | Edit-mode input left open at the checkpoint |
176
+ | 25 | Migrate Python chat path to Go `analyses_messages` (+ `analyses`) | Rifqi ↔ Harry | | Done 2026-07-02. Read path already on `analyses_messages` (commit `0066161`). This change makes Python **read-only**: removed the `save_messages` calls from `/api/v2/chat/stream` so **Go is the sole writer**, which fixes the double-write that Go and Python were both producing. `load_history` still reads `analyses_messages`. v1 `/chat/stream` is unwired, so it is left untouched |
177
  | 26 | **Charts (DEFERRED):** store Plotly JSON in a future `chart` table (not matplotlib PNG) | — | ⏸️ | After the markdown path is done end-to-end |
178
  | 27 | **Images (DEFERRED):** image table (id, analysis_id, msg/report ref, order) + originals in a bucket | — | ⏸️ | Maintenance-heavy; parked |
179
  | 28 | **UI research** (FE): new-analysis form, knowledge menu (user vs analysis level), report artifacts + version selector | Team | ⬜ new | No dedicated UI person; interview + old analysis UI removed |
REPO_STATUS.md CHANGED
@@ -129,8 +129,10 @@ GET /report/{analysis_id}/{ver} → fetch one version
129
  ```
130
 
131
  Two facts to internalise:
132
- - **Records only exist on the slow path.** With `ENABLE_SLOW_PATH=false` (the default) no
133
- records accumulate, so generation 409s by design; this is not a bug.
 
 
134
  - **dedorch `reports` stores markdown only.** Structured report fields are computed at
135
  generation, rendered into `rendered_markdown`, and only the markdown is persisted; on
136
  read-back the structured fields come back empty.
@@ -289,7 +291,7 @@ only.
289
 
290
  | Flag | Where | Default | Effect |
291
  |---|---|---|---|
292
- | `ENABLE_SLOW_PATH` | `settings.enable_slow_path` | **off** | Route `structured_flow` through Planner/TaskRunner/Assembler (vs single-query `QueryService`). Records persist only on the slow path, so reports require this flag to be on. |
293
  | `ENABLE_GATE` | `settings.enable_gate` | **off** | **Deprecated 2026-06-25** — gate neutered; the flag has no effect. Kept to avoid `.env` churn. |
294
  | `SKIP_INIT_DB` | `settings.skip_init_db` (.env/env) | **on** | Skip `init_db()` on startup — the dedorch cutover switch. **Defaults TRUE** (Go owns the dedorch schema); set `false` only for a local Python-owned DB. |
295
  | `enable_tracing` | hardcoded `True` in `chat.py` | on (endpoint) | Langfuse tracing. |
 
129
  ```
130
 
131
  Two facts to internalise:
132
+ - **Records only exist on the slow path.** The slow path is now **always on** for
133
+ `structured_flow` (the `ENABLE_SLOW_PATH` flag was removed 2026-07-02), so every
134
+ structured question persists a record. Reports still 409 until at least one `analyze_*`
135
+ task has actually succeeded (chat/help/check/unstructured turns write no record).
136
  - **dedorch `reports` stores markdown only.** Structured report fields are computed at
137
  generation, rendered into `rendered_markdown`, and only the markdown is persisted; on
138
  read-back the structured fields come back empty.
 
291
 
292
  | Flag | Where | Default | Effect |
293
  |---|---|---|---|
294
+ | ~~`ENABLE_SLOW_PATH`~~ | | **removed 2026-07-02** | Flag deleted. `structured_flow` now **always** runs Planner/TaskRunner/Assembler (the single-query `QueryService` fast path was retired from the chat handler), so records always persist. `extra="allow"` ignores a stale `ENABLE_SLOW_PATH` left in any `.env`. |
295
  | `ENABLE_GATE` | `settings.enable_gate` | **off** | **Deprecated 2026-06-25** — gate neutered; the flag has no effect. Kept to avoid `.env` churn. |
296
  | `SKIP_INIT_DB` | `settings.skip_init_db` (.env/env) | **on** | Skip `init_db()` on startup — the dedorch cutover switch. **Defaults TRUE** (Go owns the dedorch schema); set `false` only for a local Python-owned DB. |
297
  | `enable_tracing` | hardcoded `True` in `chat.py` | on (endpoint) | Langfuse tracing. |
src/agents/chat_handler.py CHANGED
@@ -5,7 +5,8 @@ End-to-end flow per user message:
5
  1. `OrchestratorAgent.classify` → RouterDecision (one of six intents).
6
  2. Route by intent:
7
  - `chat` → no context. Pass straight to ChatbotAgent.
8
- - `structured_flow` → CatalogReader → slow path / QueryService.
 
9
  - `unstructured_flow` → DocumentRetriever (RAG over PGVector) →
10
  list[DocumentChunk].
11
  - `check` → check_data / check_knowledge tool → rendered table.
@@ -48,7 +49,6 @@ from .orchestration import OrchestratorAgent
48
 
49
  if TYPE_CHECKING:
50
  from ..catalog.reader import CatalogReader
51
- from ..query.service import QueryService
52
  from ..retrieval.router import RetrievalRouter
53
  from .gate import AnalysisState
54
  from .slow_path.coordinator import SlowPathCoordinator
@@ -75,10 +75,8 @@ class ChatHandler:
75
  intent_router: OrchestratorAgent | None = None,
76
  answer_agent: ChatbotAgent | None = None,
77
  catalog_reader: CatalogReader | None = None,
78
- query_service: QueryService | None = None,
79
  document_retriever: RetrievalRouter | None = None,
80
  *,
81
- enable_slow_path: bool = False,
82
  slow_path_coordinator_factory: (
83
  Callable[[str], SlowPathCoordinator] | None
84
  ) = None,
@@ -94,16 +92,13 @@ class ChatHandler:
94
  self._intent_router = intent_router
95
  self._answer_agent = answer_agent
96
  self._catalog_reader = catalog_reader
97
- self._query_service = query_service
98
  self._document_retriever = document_retriever
99
  # Langfuse tracing (tokens + latency). OFF by default so tests never hit
100
  # Langfuse; the live endpoint opts in with ChatHandler(enable_tracing=True).
101
  self._enable_tracing = enable_tracing
102
- # Slow analytical path (Planner -> TaskRunner -> Assembler). OFF by default:
103
- # gated until the lead's real BusinessContext lands. When True, `structured`
104
- # intents route here instead of the single-query QueryService path. The
105
  # factory + store are injectable for tests.
106
- self._enable_slow_path = enable_slow_path
107
  self._slow_path_factory = slow_path_coordinator_factory
108
  self._analysis_store = analysis_store
109
  # `check` skill: builds the data-access invoker (check_data/check_knowledge)
@@ -144,13 +139,6 @@ class ChatHandler:
144
  self._catalog_reader = CatalogReader(CatalogStore())
145
  return self._catalog_reader
146
 
147
- def _get_query_service(self) -> QueryService:
148
- if self._query_service is None:
149
- from ..query.service import QueryService
150
-
151
- self._query_service = QueryService()
152
- return self._query_service
153
-
154
  def _get_document_retriever(self) -> RetrievalRouter:
155
  if self._document_retriever is None:
156
  from ..retrieval.router import RetrievalRouter
@@ -351,15 +339,13 @@ class ChatHandler:
351
  bound = await self._bound_source_ids(analysis_id)
352
  reader = _ScopedCatalogReader(req_reader, bound) if bound else req_reader
353
  catalog = await reader.read(user_id, "structured")
354
- if self._enable_slow_path:
355
- async for event in self._run_slow_path(
356
- user_id, rewritten, catalog, tracer, reader, analysis_id
357
- ):
358
- yield event
359
- return
360
- query_result = await self._get_query_service().run(
361
- user_id, rewritten, catalog
362
- )
363
  except Exception as e:
364
  logger.error(
365
  "structured route failed",
 
5
  1. `OrchestratorAgent.classify` → RouterDecision (one of six intents).
6
  2. Route by intent:
7
  - `chat` → no context. Pass straight to ChatbotAgent.
8
+ - `structured_flow` → CatalogReader → slow analytical path
9
+ (Planner → TaskRunner → Assembler).
10
  - `unstructured_flow` → DocumentRetriever (RAG over PGVector) →
11
  list[DocumentChunk].
12
  - `check` → check_data / check_knowledge tool → rendered table.
 
49
 
50
  if TYPE_CHECKING:
51
  from ..catalog.reader import CatalogReader
 
52
  from ..retrieval.router import RetrievalRouter
53
  from .gate import AnalysisState
54
  from .slow_path.coordinator import SlowPathCoordinator
 
75
  intent_router: OrchestratorAgent | None = None,
76
  answer_agent: ChatbotAgent | None = None,
77
  catalog_reader: CatalogReader | None = None,
 
78
  document_retriever: RetrievalRouter | None = None,
79
  *,
 
80
  slow_path_coordinator_factory: (
81
  Callable[[str], SlowPathCoordinator] | None
82
  ) = None,
 
92
  self._intent_router = intent_router
93
  self._answer_agent = answer_agent
94
  self._catalog_reader = catalog_reader
 
95
  self._document_retriever = document_retriever
96
  # Langfuse tracing (tokens + latency). OFF by default so tests never hit
97
  # Langfuse; the live endpoint opts in with ChatHandler(enable_tracing=True).
98
  self._enable_tracing = enable_tracing
99
+ # Slow analytical path (Planner -> TaskRunner -> Assembler): the only route for
100
+ # `structured_flow` now (the ENABLE_SLOW_PATH flag was removed 2026-07-02). The
 
101
  # factory + store are injectable for tests.
 
102
  self._slow_path_factory = slow_path_coordinator_factory
103
  self._analysis_store = analysis_store
104
  # `check` skill: builds the data-access invoker (check_data/check_knowledge)
 
139
  self._catalog_reader = CatalogReader(CatalogStore())
140
  return self._catalog_reader
141
 
 
 
 
 
 
 
 
142
  def _get_document_retriever(self) -> RetrievalRouter:
143
  if self._document_retriever is None:
144
  from ..retrieval.router import RetrievalRouter
 
339
  bound = await self._bound_source_ids(analysis_id)
340
  reader = _ScopedCatalogReader(req_reader, bound) if bound else req_reader
341
  catalog = await reader.read(user_id, "structured")
342
+ # structured_flow always runs the slow analytical path (the
343
+ # ENABLE_SLOW_PATH flag was removed 2026-07-02).
344
+ async for event in self._run_slow_path(
345
+ user_id, rewritten, catalog, tracer, reader, analysis_id
346
+ ):
347
+ yield event
348
+ return
 
 
349
  except Exception as e:
350
  logger.error(
351
  "structured route failed",
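The structured route above now forwards every slow-path event and then returns, with no fallback to a single-query path. A minimal sketch of that delegation pattern, assuming hypothetical stand-ins `run_slow_path` and `handle_structured` (neither is the real `ChatHandler` method, and the event payloads are illustrative only):

```python
import asyncio
from typing import AsyncIterator


async def run_slow_path(question: str) -> AsyncIterator[dict]:
    # Hypothetical stand-in for the Planner -> TaskRunner -> Assembler pipeline.
    yield {"event": "status", "data": "planning"}
    yield {"event": "status", "data": "running tasks"}
    yield {"event": "done", "data": f"answer for {question!r}"}


async def handle_structured(question: str) -> AsyncIterator[dict]:
    # Forward each slow-path event unchanged, then stop: there is no
    # feature flag to check and no second route to fall through to.
    try:
        async for event in run_slow_path(question):
            yield event
        return
    except Exception as exc:  # mirrors the broad except in the diff
        yield {"event": "error", "data": str(exc)}


def collect(question: str) -> list[str]:
    # Drain the async generator and keep only the event names.
    async def _run() -> list[str]:
        return [e["event"] async for e in handle_structured(question)]

    return asyncio.run(_run())
```

Because the generator returns right after the inner loop, any code placed after the delegation inside the `try` would never run for structured intents, which is why the old `QueryService` call could be deleted outright.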
src/api/v1/chat.py CHANGED
@@ -26,11 +26,10 @@ router = APIRouter(prefix="/api/v1", tags=["Chat"])
26
  # is passed into handle()), and lazily builds + caches the Orchestrator/Chatbot
27
  # chains — so reusing it keeps the Azure OpenAI clients (and their httpx/TLS pools)
28
  # warm across requests instead of re-handshaking on the first call of every request.
29
- # enable_slow_path is env-gated (ENABLE_SLOW_PATH): when on, structured intents route
30
- # Orchestrator -> Planner -> TaskRunner -> Assembler so the team can test e2e here.
31
  _chat_handler = ChatHandler(
32
  enable_tracing=True,
33
- enable_slow_path=settings.enable_slow_path,
34
  enable_gate=settings.enable_gate,
35
  )
36
 
 
26
  # is passed into handle()), and lazily builds + caches the Orchestrator/Chatbot
27
  # chains — so reusing it keeps the Azure OpenAI clients (and their httpx/TLS pools)
28
  # warm across requests instead of re-handshaking on the first call of every request.
29
+ # Structured intents always route Orchestrator -> Planner -> TaskRunner -> Assembler
30
+ # (the analytical slow path); the ENABLE_SLOW_PATH flag was removed 2026-07-02.
31
  _chat_handler = ChatHandler(
32
  enable_tracing=True,
 
33
  enable_gate=settings.enable_gate,
34
  )
35
 
src/api/v2/chat.py CHANGED
@@ -12,10 +12,11 @@
12
  Only chat moves to v2; the tools group + observability stay on `/api/v1` (contract:
13
  API_ENDPOINTS_RESTRUCTURE.md §1).
14
 
15
- ⚠️ Persistence (transitional). This mirrors v1: it still load/saves turn history via the
16
- analysis-keyed message tables so multi-turn context works in the playground. Moving the
17
- read/write to Go-owned `analyses_messages` (and making Python read-only) is DEV_PLAN #25.
18
- Note Sofhia's `/tools/help` is already generative-only; align chat with that under #25.
 
19
  """
20
 
21
  import json
@@ -38,7 +39,6 @@ from src.api.v1.chat import (
38
  cache_response,
39
  get_cached_response,
40
  load_history,
41
- save_messages,
42
  )
43
  from src.db.postgres.connection import get_db
44
  from src.db.redis.connection import get_redis
@@ -88,7 +88,6 @@ async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
88
  logger.info("Returning cached response")
89
  cached_text = cached["response"]
90
  cached_sources = cached["sources"]
91
- await save_messages(db, analysis_id, request.user_id, request.message, cached_text)
92
 
93
  async def stream_cached():
94
  yield {"event": "sources", "data": json.dumps(cached_sources)}
@@ -103,7 +102,6 @@ async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
103
  direct = _fast_intent(request.message)
104
  if direct:
105
  await cache_response(redis, cache_key, direct, sources=[])
106
- await save_messages(db, analysis_id, request.user_id, request.message, direct)
107
 
108
  async def stream_direct():
109
  yield {"event": "sources", "data": json.dumps([])}
@@ -142,12 +140,8 @@ async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
142
  # Only cache stateless `chat` replies (see _CACHEABLE_INTENTS).
143
  if effective_intent in _CACHEABLE_INTENTS:
144
  await cache_response(redis, cache_key, full_response, sources=sources)
145
- try:
146
- await save_messages(
147
- db, analysis_id, request.user_id, request.message, full_response
148
- )
149
- except Exception as e:
150
- logger.error("save_messages failed", analysis_id=analysis_id, error=str(e))
151
  yield done_event
152
  elif event["event"] == "status":
153
  # slow-path progress: forward so the client shows activity.
 
12
  Only chat moves to v2; the tools group + observability stay on `/api/v1` (contract:
13
  API_ENDPOINTS_RESTRUCTURE.md §1).
14
 
15
+ Persistence (DEV_PLAN #25 done). Python is **read-only** on the Go-owned
16
+ `analyses_messages` table: it *reads* turn history (so multi-turn context works) but no
17
+ longer writes the user/AI turns; Go is the sole writer. This removes the double-write
18
+ that appeared when both Go and Python's stream persisted the same turn. Aligns chat with
19
+ `/tools/help`, which was already generative-only.
20
  """
21
 
22
  import json
 
39
  cache_response,
40
  get_cached_response,
41
  load_history,
 
42
  )
43
  from src.db.postgres.connection import get_db
44
  from src.db.redis.connection import get_redis
 
88
  logger.info("Returning cached response")
89
  cached_text = cached["response"]
90
  cached_sources = cached["sources"]
 
91
 
92
  async def stream_cached():
93
  yield {"event": "sources", "data": json.dumps(cached_sources)}
 
102
  direct = _fast_intent(request.message)
103
  if direct:
104
  await cache_response(redis, cache_key, direct, sources=[])
 
105
 
106
  async def stream_direct():
107
  yield {"event": "sources", "data": json.dumps([])}
 
140
  # Only cache stateless `chat` replies (see _CACHEABLE_INTENTS).
141
  if effective_intent in _CACHEABLE_INTENTS:
142
  await cache_response(redis, cache_key, full_response, sources=sources)
143
+ # Persistence is Go's job now (DEV_PLAN #25): Python reads history but
144
+ # no longer writes turns, so Go stays the sole writer of analyses_messages.
 
 
 
 
145
  yield done_event
146
  elif event["event"] == "status":
147
  # slow-path progress: forward so the client shows activity.
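The double-write that this change removes can be illustrated with a toy model. This is only a sketch: the in-memory list stands in for `analyses_messages`, and `persist_turn` and `rows_after_turn` are hypothetical helpers, not functions from the codebase.

```python
def persist_turn(table: list, writer: str, user_msg: str, ai_msg: str) -> None:
    # One chat turn is stored as two rows: the user message and the AI reply.
    table.append((writer, "user", user_msg))
    table.append((writer, "assistant", ai_msg))


def rows_after_turn(python_writes: bool) -> int:
    # The list is a stand-in for the Go-owned messages table.
    table: list = []
    persist_turn(table, "go", "hi", "hello")  # Go always persists the turn
    if python_writes:
        # Old behaviour: the Python stream persisted the same turn again.
        persist_turn(table, "python", "hi", "hello")
    return len(table)
```

With both services writing, one turn yields four rows instead of two, so history loaded on the next request would contain each message twice.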
src/config/settings.py CHANGED
@@ -17,12 +17,10 @@ class Settings(BaseSettings):
17
  )
18
 
19
  # Feature flags
20
- # Route `structured` chat intents through the analytical SLOW PATH
21
- # (Planner -> TaskRunner -> Assembler) instead of the single-query QueryService.
22
- # Off by default; the team flips ENABLE_SLOW_PATH=true to test end-to-end from
23
- # the /chat/stream endpoint. BusinessContext is still a stub until the lead's
24
- # real source lands, so this stays opt-in.
25
- enable_slow_path: bool = Field(alias="enable_slow_path", default=False)
26
 
27
  # DEPRECATED 2026-06-24: the problem_validated gate was removed (the goal is now
28
  # user-entered objective + business_questions, no agent validation). This flag no
@@ -65,6 +63,21 @@ class Settings(BaseSettings):
65
  azureai_container_name: str = Field(alias="azureai__container__name", default="")
66
  azureai_container_account_name: str = Field(alias="azureai__container__account__name", default="")
67
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
  # Langfuse
69
  LANGFUSE_PUBLIC_KEY: str
70
  LANGFUSE_SECRET_KEY: str
 
17
  )
18
 
19
  # Feature flags
20
+ # REMOVED 2026-07-02: the former ENABLE_SLOW_PATH flag is gone. `structured_flow`
21
+ # now always runs the analytical slow path (Planner -> TaskRunner -> Assembler), so
22
+ # every structured question persists a report_inputs record (reports no longer
23
+ # depend on flipping a flag). extra="allow" ignores a stale ENABLE_SLOW_PATH in .env.
 
 
24
 
25
  # DEPRECATED 2026-06-24: the problem_validated gate was removed (the goal is now
26
  # user-entered objective + business_questions, no agent validation). This flag no
 
63
  azureai_container_name: str = Field(alias="azureai__container__name", default="")
64
  azureai_container_account_name: str = Field(alias="azureai__container__account__name", default="")
65
 
66
+ # Object storage provider toggle. Mirrors the Go data plane's `storage.provider`
67
+ # (see Orchestrator config: azure_blob | supabase_s3). Blank falls back to azure_blob.
68
+ # Tabular query execution reads the processed Parquet from whichever backend is active.
69
+ storage_provider: str = Field(alias="STORAGE_PROVIDER", default="azure_blob")
70
+
71
+ # Supabase S3-compatible object storage (active when storage_provider == "supabase_s3").
72
+ # BRIDGE (Option C): lets Python read the same bucket Go writes to, until Go exposes a
73
+ # source-data endpoint (see docs/proposal_go_source_data_endpoint.md) — then this client
74
+ # is removed and Python stops touching storage directly.
75
+ supabase_s3_bucket: str = Field(alias="SUPABASE_S3_BUCKET", default="")
76
+ supabase_s3_endpoint: str = Field(alias="SUPABASE_S3_ENDPOINT", default="")
77
+ supabase_s3_region: str = Field(alias="SUPABASE_S3_REGION", default="")
78
+ supabase_s3_access_key_id: str = Field(alias="SUPABASE_S3_ACCESS_KEY_ID", default="")
79
+ supabase_s3_secret_access_key: str = Field(alias="SUPABASE_S3_SECRET_ACCESS_KEY", default="")
80
+
81
  # Langfuse
82
  LANGFUSE_PUBLIC_KEY: str
83
  LANGFUSE_SECRET_KEY: str
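The `storage_provider` toggle is read with a blank-or-unknown fallback to Azure Blob in the tabular executor. A minimal sketch of that normalisation, assuming a hypothetical helper `resolve_storage_provider` (the real code inlines this logic rather than exposing a function):

```python
from __future__ import annotations


def resolve_storage_provider(raw: str | None) -> str:
    # Normalise the configured value: tolerate None, stray whitespace, and case.
    provider = (raw or "").strip().lower()
    if provider == "supabase_s3":
        return "supabase_s3"
    # Blank or unrecognised values fall back to the pre-migration default.
    return "azure_blob"
```

One consequence of this design is that a typo in `STORAGE_PROVIDER` silently selects Azure Blob instead of failing at startup, which may be worth keeping in mind when debugging a missing-object error.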
src/query/executor/tabular.py CHANGED
@@ -29,13 +29,18 @@ from .base import BaseExecutor, QueryResult
29
  logger = get_logger("tabular_executor")
30
 
31
  _AZ_BLOB_PREFIX = "az_blob://"
 
 
 
 
32
  _ROW_HARD_CAP = 10_000
33
 
34
 
35
  class TabularExecutor(BaseExecutor):
36
  """Executes compiled pandas chain on a Parquet blob.
37
 
38
- `fetch_blob` is injectable for tests — defaults to AzureBlobStorage.
 
39
  """
40
 
41
  def __init__(
@@ -49,6 +54,16 @@ class TabularExecutor(BaseExecutor):
49
 
50
  @staticmethod
51
  async def _default_fetch_blob(blob_name: str) -> bytes:
 
 
 
 
 
 
 
 
 
 
52
  from ...storage.az_blob.az_blob import blob_storage
53
 
54
  return await blob_storage.download_file(blob_name)
@@ -157,15 +172,19 @@ def _resolve_blob_name(source: Source, table: Table) -> str:
157
  on the upload pipeline preserving the file extension, which it does today
158
  because `Document.filename` is set once at upload and never renamed.
159
  """
160
- if not source.location_ref.startswith(_AZ_BLOB_PREFIX):
 
 
 
 
161
  raise ValueError(
162
- f"TabularExecutor expects 'az_blob://...' location_ref, "
163
- f"got {source.location_ref!r}"
164
  )
165
- path = source.location_ref[len(_AZ_BLOB_PREFIX):]
166
  parts = path.split("/", 1)
167
  if len(parts) != 2 or not parts[0] or not parts[1]:
168
- raise ValueError(f"Malformed az_blob location_ref: {source.location_ref!r}")
169
  user_id, document_id = parts
170
  is_xlsx = source.name.lower().endswith(".xlsx")
171
  sheet_name = table.name if is_xlsx else None
 
29
  logger = get_logger("tabular_executor")
30
 
31
  _AZ_BLOB_PREFIX = "az_blob://"
32
+ # Go's Supabase S3 data plane writes location_ref with this prefix instead of az_blob://.
33
+ # Both encode the same path structure after the prefix: {user_id}/{document_id}.
34
+ _OBJECT_STORAGE_PREFIX = "object_storage://"
35
+ _LOCATION_REF_PREFIXES = (_AZ_BLOB_PREFIX, _OBJECT_STORAGE_PREFIX)
36
  _ROW_HARD_CAP = 10_000
37
 
38
 
39
  class TabularExecutor(BaseExecutor):
40
  """Executes compiled pandas chain on a Parquet blob.
41
 
42
+ `fetch_blob` is injectable for tests — defaults to the storage backend
43
+ selected by `settings.storage_provider` (azure_blob | supabase_s3).
44
  """
45
 
46
  def __init__(
 
54
 
55
  @staticmethod
56
  async def _default_fetch_blob(blob_name: str) -> bytes:
57
+ # Pick the storage backend by the same toggle the Go data plane uses.
58
+ # Blank/unknown falls back to Azure Blob (the pre-migration default).
59
+ from ...config.settings import settings
60
+
61
+ provider = (settings.storage_provider or "").strip().lower()
62
+ if provider == "supabase_s3":
63
+ from ...storage.object_storage import object_storage
64
+
65
+ return await object_storage.download_file(blob_name)
66
+
67
  from ...storage.az_blob.az_blob import blob_storage
68
 
69
  return await blob_storage.download_file(blob_name)
 
172
  on the upload pipeline preserving the file extension, which it does today
173
  because `Document.filename` is set once at upload and never renamed.
174
  """
175
+ matched_prefix = next(
176
+ (p for p in _LOCATION_REF_PREFIXES if source.location_ref.startswith(p)),
177
+ None,
178
+ )
179
+ if matched_prefix is None:
180
  raise ValueError(
181
+ f"TabularExecutor expects 'az_blob://...' or 'object_storage://...' "
182
+ f"location_ref, got {source.location_ref!r}"
183
  )
184
+ path = source.location_ref[len(matched_prefix):]
185
  parts = path.split("/", 1)
186
  if len(parts) != 2 or not parts[0] or not parts[1]:
187
+ raise ValueError(f"Malformed location_ref: {source.location_ref!r}")
188
  user_id, document_id = parts
189
  is_xlsx = source.name.lower().endswith(".xlsx")
190
  sheet_name = table.name if is_xlsx else None
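The prefix handling in `_resolve_blob_name` can be exercised on its own. The sketch below copies the matching and splitting logic from the diff into a hypothetical standalone function, `split_location_ref`, which takes the raw string rather than `Source` and `Table` objects:

```python
from __future__ import annotations

_AZ_BLOB_PREFIX = "az_blob://"
_OBJECT_STORAGE_PREFIX = "object_storage://"
_LOCATION_REF_PREFIXES = (_AZ_BLOB_PREFIX, _OBJECT_STORAGE_PREFIX)


def split_location_ref(location_ref: str) -> tuple[str, str]:
    """Return (user_id, document_id) from a prefixed location_ref."""
    # Pick the first known prefix the reference starts with, if any.
    matched_prefix = next(
        (p for p in _LOCATION_REF_PREFIXES if location_ref.startswith(p)),
        None,
    )
    if matched_prefix is None:
        raise ValueError(f"Unsupported location_ref: {location_ref!r}")
    # Both prefixes encode the same path after them: {user_id}/{document_id}.
    path = location_ref[len(matched_prefix):]
    parts = path.split("/", 1)
    if len(parts) != 2 or not parts[0] or not parts[1]:
        raise ValueError(f"Malformed location_ref: {location_ref!r}")
    return parts[0], parts[1]
```

Both `az_blob://u1/d1` and `object_storage://u1/d1` resolve to the same pair, so the downstream blob-name construction does not need to know which backend wrote the reference.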
src/storage/object_storage/__init__.py ADDED
@@ -0,0 +1,13 @@
1
+ """Supabase S3-compatible object storage (read side).
2
+
3
+ BRIDGE MODULE (Option C) — self-contained on purpose. It exists only so the
4
+ tabular query path can read the processed Parquet from the same bucket the Go
5
+ data plane writes to, after Go switched storage provider Azure Blob -> Supabase S3.
6
+
7
+ Once Go exposes a source-data endpoint (docs/proposal_go_source_data_endpoint.md),
8
+ delete this whole package and point TabularExecutor at that endpoint instead.
9
+ """
10
+
11
+ from .supabase_s3 import object_storage
12
+
13
+ __all__ = ["object_storage"]
src/storage/object_storage/supabase_s3.py ADDED
@@ -0,0 +1,85 @@
1
+ """Supabase S3-compatible object storage client (read side).
2
+
3
+ Mirrors the Go data plane's Supabase S3 client (Orchestrator
4
+ `internal/documents/supabase_s3.go`): path-style addressing against a custom
5
+ endpoint, static credentials, single bucket.
6
+
7
+ boto3 is synchronous; download runs in a worker thread so the async call sites
8
+ (TabularExecutor._fetch_blob) stay non-blocking. Only the read path is
9
+ implemented — Python is read-only here; Go owns all writes.
10
+ """
11
+
12
+ from __future__ import annotations
13
+
14
+ import asyncio
15
+
16
+ from src.config.settings import settings
17
+ from src.middlewares.logging import get_logger
18
+
19
+ logger = get_logger("supabase_s3")
20
+
21
+
22
+ class SupabaseS3Storage:
23
+ """Read-only client for the Supabase S3 bucket Go writes to."""
24
+
25
+ def __init__(self) -> None:
26
+ self._bucket = settings.supabase_s3_bucket
27
+ self._endpoint = settings.supabase_s3_endpoint.rstrip("/")
28
+ self._region = settings.supabase_s3_region
29
+ self._access_key_id = settings.supabase_s3_access_key_id
30
+ self._secret_access_key = settings.supabase_s3_secret_access_key
31
+ self._client = None # lazy — avoid import-time failure on Azure deployments
32
+
33
+ def _is_configured(self) -> bool:
34
+ return all(
35
+ (
36
+ self._bucket,
37
+ self._endpoint,
38
+ self._region,
39
+ self._access_key_id,
40
+ self._secret_access_key,
41
+ )
42
+ )
43
+
44
+ def _get_client(self):
45
+ if self._client is None:
46
+ if not self._is_configured():
47
+ raise RuntimeError(
48
+ "Supabase S3 storage is not fully configured "
49
+ "(set STORAGE_PROVIDER=supabase_s3 and SUPABASE_S3_* env vars)"
50
+ )
51
+ import boto3
52
+ from botocore.config import Config
53
+
54
+ self._client = boto3.client(
55
+ "s3",
56
+ endpoint_url=self._endpoint,
57
+ region_name=self._region,
58
+ aws_access_key_id=self._access_key_id,
59
+ aws_secret_access_key=self._secret_access_key,
60
+ config=Config(
61
+ signature_version="s3v4",
62
+ s3={"addressing_style": "path"}, # path-style, like Go's UsePathStyle
63
+ ),
64
+ )
65
+ return self._client
66
+
67
+ def _download_sync(self, object_name: str) -> bytes:
68
+ client = self._get_client()
69
+ resp = client.get_object(Bucket=self._bucket, Key=object_name)
70
+ return resp["Body"].read()
71
+
72
+ async def download_file(self, object_name: str) -> bytes:
73
+ """Download an object's bytes. Interface-compatible with AzureBlobStorage.download_file."""
74
+ try:
75
+ logger.info(f"Downloading object {object_name}")
76
+ content = await asyncio.to_thread(self._download_sync, object_name)
77
+ logger.info(f"Successfully downloaded {object_name} ({len(content)} bytes)")
78
+ return content
79
+ except Exception as e:
80
+ logger.error(f"Failed to download object {object_name}", error=str(e))
81
+ raise
82
+
83
+
84
+ # Singleton (lazy client construction — safe to import even when not configured)
85
+ object_storage = SupabaseS3Storage()
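The new client combines two ideas: the blocking SDK client is built lazily on first use, and the blocking download is pushed to a worker thread with `asyncio.to_thread`. A minimal sketch of that shape, using a hypothetical `FakeSyncClient` in place of boto3 so it runs without credentials (`ReadOnlyStorage`, `get_bytes`, and `demo` are illustrative names, not part of the PR):

```python
from __future__ import annotations

import asyncio
from typing import Callable


class FakeSyncClient:
    """Hypothetical stand-in for a blocking S3 client (not boto3)."""

    def __init__(self, objects: dict[str, bytes]) -> None:
        self._objects = objects

    def get_bytes(self, key: str) -> bytes:
        return self._objects[key]


class ReadOnlyStorage:
    def __init__(self, factory: Callable[[], FakeSyncClient]) -> None:
        self._factory = factory
        self._client: FakeSyncClient | None = None  # built on first use
        self.builds = 0  # counts client constructions, for the demo only

    def _get_client(self) -> FakeSyncClient:
        if self._client is None:
            self._client = self._factory()
            self.builds += 1
        return self._client

    def _download_sync(self, key: str) -> bytes:
        return self._get_client().get_bytes(key)

    async def download_file(self, key: str) -> bytes:
        # Run the blocking call in a worker thread so the event loop stays free.
        return await asyncio.to_thread(self._download_sync, key)


def demo() -> tuple[bytes, int]:
    storage = ReadOnlyStorage(lambda: FakeSyncClient({"u1/d1": b"parquet-bytes"}))

    async def run() -> bytes:
        await storage.download_file("u1/d1")
        return await storage.download_file("u1/d1")

    return asyncio.run(run()), storage.builds
```

Two sequential downloads construct the client once. If the first downloads were issued concurrently, two threads could both see `self._client is None`; the sketch, like the code in the diff, does not guard against that, so a lock would be needed if duplicate construction ever became a problem.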