Commit ·
ce20d89
1
Parent(s): b272cc7
fixes: chain singleton, parquet logger, SSE event consistency, remove double DB fetch
Browse files
- src/api/v1/chat.py +2 -1
- src/api/v1/document.py +1 -2
- src/pipeline/document_pipeline.py +1 -1
- src/query/planner/service.py +11 -1
- src/storage/parquet.py +2 -2
src/api/v1/chat.py
CHANGED
|
@@ -176,7 +176,8 @@ async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
|
|
| 176 |
|
| 177 |
async def stream_direct():
|
| 178 |
yield {"event": "sources", "data": json.dumps([])}
|
| 179 |
-
yield {"event": "
|
|
|
|
| 180 |
|
| 181 |
return EventSourceResponse(stream_direct())
|
| 182 |
|
|
|
|
| 176 |
|
| 177 |
async def stream_direct():
|
| 178 |
yield {"event": "sources", "data": json.dumps([])}
|
| 179 |
+
yield {"event": "chunk", "data": direct}
|
| 180 |
+
yield {"event": "done", "data": ""}
|
| 181 |
|
| 182 |
return EventSourceResponse(stream_direct())
|
| 183 |
|
src/api/v1/document.py
CHANGED
|
@@ -105,8 +105,7 @@ async def process_document(
|
|
| 105 |
"""Process document and ingest to vector index."""
|
| 106 |
data = await document_pipeline.process(document_id, user_id, db)
|
| 107 |
|
| 108 |
-
|
| 109 |
-
if document and document.file_type in ("csv", "xlsx"):
|
| 110 |
from src.pipeline.triggers import on_tabular_uploaded
|
| 111 |
try:
|
| 112 |
await on_tabular_uploaded(document_id, user_id)
|
|
|
|
| 105 |
"""Process document and ingest to vector index."""
|
| 106 |
data = await document_pipeline.process(document_id, user_id, db)
|
| 107 |
|
| 108 |
+
if data["file_type"] in ("csv", "xlsx"):
|
|
|
|
| 109 |
from src.pipeline.triggers import on_tabular_uploaded
|
| 110 |
try:
|
| 111 |
await on_tabular_uploaded(document_id, user_id)
|
src/pipeline/document_pipeline.py
CHANGED
|
@@ -66,7 +66,7 @@ class DocumentPipeline:
|
|
| 66 |
await document_service.update_document_status(db, document_id, "completed")
|
| 67 |
|
| 68 |
logger.info(f"Processed document {document_id}: {chunks_count} chunks")
|
| 69 |
-
return {"document_id": document_id, "chunks_processed": chunks_count}
|
| 70 |
|
| 71 |
except Exception as e:
|
| 72 |
logger.error(f"Processing failed for document {document_id}", error=str(e))
|
|
|
|
| 66 |
await document_service.update_document_status(db, document_id, "completed")
|
| 67 |
|
| 68 |
logger.info(f"Processed document {document_id}: {chunks_count} chunks")
|
| 69 |
+
return {"document_id": document_id, "chunks_processed": chunks_count, "file_type": document.file_type}
|
| 70 |
|
| 71 |
except Exception as e:
|
| 72 |
logger.error(f"Processing failed for document {document_id}", error=str(e))
|
src/query/planner/service.py
CHANGED
|
@@ -55,6 +55,16 @@ def _build_default_chain() -> Runnable:
|
|
| 55 |
return prompt | llm.with_structured_output(QueryIR)
|
| 56 |
|
| 57 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 58 |
class QueryPlannerService:
|
| 59 |
"""Wraps the LLM call with structured-output parsing into QueryIR.
|
| 60 |
|
|
@@ -68,7 +78,7 @@ class QueryPlannerService:
|
|
| 68 |
|
| 69 |
def _ensure_chain(self) -> Runnable:
|
| 70 |
if self._chain is None:
|
| 71 |
-
self._chain = _build_default_chain()
|
| 72 |
return self._chain
|
| 73 |
|
| 74 |
async def plan(
|
|
|
|
| 55 |
return prompt | llm.with_structured_output(QueryIR)
|
| 56 |
|
| 57 |
|
| 58 |
+
_default_chain: Runnable | None = None
|
| 59 |
+
|
| 60 |
+
|
| 61 |
+
def _get_default_chain() -> Runnable:
|
| 62 |
+
global _default_chain
|
| 63 |
+
if _default_chain is None:
|
| 64 |
+
_default_chain = _build_default_chain()
|
| 65 |
+
return _default_chain
|
| 66 |
+
|
| 67 |
+
|
| 68 |
class QueryPlannerService:
|
| 69 |
"""Wraps the LLM call with structured-output parsing into QueryIR.
|
| 70 |
|
|
|
|
| 78 |
|
| 79 |
def _ensure_chain(self) -> Runnable:
|
| 80 |
if self._chain is None:
|
| 81 |
+
self._chain = _get_default_chain()
|
| 82 |
return self._chain
|
| 83 |
|
| 84 |
async def plan(
|
src/storage/parquet.py
CHANGED
|
@@ -18,7 +18,7 @@ import pandas as pd
|
|
| 18 |
from src.middlewares.logging import get_logger
|
| 19 |
from src.storage.az_blob.az_blob import blob_storage
|
| 20 |
|
| 21 |
-
logger = get_logger("
|
| 22 |
|
| 23 |
|
| 24 |
def _safe_sheet_name(sheet_name: str) -> str:
|
|
@@ -27,7 +27,7 @@ def _safe_sheet_name(sheet_name: str) -> str:
|
|
| 27 |
|
| 28 |
def parquet_blob_name(user_id: str, document_id: str, sheet_name: str | None = None) -> str:
|
| 29 |
"""Construct deterministic Parquet blob name."""
|
| 30 |
-
if sheet_name:
|
| 31 |
return f"{user_id}/{document_id}__{_safe_sheet_name(sheet_name)}.parquet"
|
| 32 |
return f"{user_id}/{document_id}.parquet"
|
| 33 |
|
|
|
|
| 18 |
from src.middlewares.logging import get_logger
|
| 19 |
from src.storage.az_blob.az_blob import blob_storage
|
| 20 |
|
| 21 |
+
logger = get_logger("storage.parquet")
|
| 22 |
|
| 23 |
|
| 24 |
def _safe_sheet_name(sheet_name: str) -> str:
|
|
|
|
| 27 |
|
| 28 |
def parquet_blob_name(user_id: str, document_id: str, sheet_name: str | None = None) -> str:
|
| 29 |
"""Construct deterministic Parquet blob name."""
|
| 30 |
+
if sheet_name is not None:
|
| 31 |
return f"{user_id}/{document_id}__{_safe_sheet_name(sheet_name)}.parquet"
|
| 32 |
return f"{user_id}/{document_id}.parquet"
|
| 33 |
|