"""Executor for tabular document sources (source_type="document", file_type csv/xlsx).
Flow:
1. Group RetrievalResult chunks by document_id.
2. For each document: download bytes from Azure Blob -> read with pandas.
3. Filter DataFrame to relevant columns identified by retrieval.
4. Return QueryResult per document.
"""
from sqlalchemy.ext.asyncio import AsyncSession
from src.middlewares.logging import get_logger
from src.query.base import BaseExecutor, QueryResult
from src.rag.base import RetrievalResult
# Module-level logger scoped to this executor.
logger = get_logger("tabular_executor")
# File types this executor is responsible for; chunks with any other file_type
# are meant to be filtered out (step 1 of the TODO flow in execute()).
# NOTE(review): presumably these match the Document.file_type values stored at
# ingestion time — confirm against the document model.
_TABULAR_FILE_TYPES = ("csv", "xlsx")
class TabularExecutor(BaseExecutor):
    """Executor for tabular document sources (csv / xlsx files).

    Not yet implemented — see the module docstring and the step-by-step
    plan in :meth:`execute` for the intended flow.
    """

    async def execute(
        self,
        results: list[RetrievalResult],
        user_id: str,
        db: AsyncSession,
        limit: int = 100,
    ) -> list[QueryResult]:
        """Resolve retrieval hits against their source spreadsheets.

        Args:
            results: Retrieval hits; only those with source_type == "document"
                and a file_type in _TABULAR_FILE_TYPES are relevant here.
            user_id: Id of the requesting user (presumably for access checks
                on the Document lookup — confirm once implemented).
            db: Async DB session used to look up Document rows by document_id.
            limit: Maximum number of rows to return per document.

        Returns:
            One QueryResult per matched document.

        Raises:
            NotImplementedError: Always — this executor is still a stub.
        """
        # TODO: implement
        # 1. filter results where source_type == "document" and file_type in _TABULAR_FILE_TYPES
        # 2. group by document_id -> list of column_names
        # 3. per group: look up Document by document_id -> get blob_name
        # 4. blob_storage.download_file(blob_name) -> pd.read_csv / pd.read_excel
        # 5. df[relevant_columns].head(limit) -> rows as list[dict]
        # 6. return QueryResult per document
        raise NotImplementedError(
            "TabularExecutor.execute is not implemented yet"
        )
# Module-level singleton; import this instance rather than constructing
# a new TabularExecutor per call site.
tabular_executor = TabularExecutor()