"""Executor for tabular document sources.

Handles retrieval results whose ``source_type`` is ``"document"`` and whose
``file_type`` is one of ``csv`` / ``xlsx``.

Planned flow (not implemented yet -- see ``TabularExecutor.execute``):

1. Group ``RetrievalResult`` chunks by ``document_id``.
2. For each document: download the bytes from Azure Blob and read them
   with pandas.
3. Filter the DataFrame to the relevant columns identified by retrieval.
4. Return one ``QueryResult`` per document.
"""

from sqlalchemy.ext.asyncio import AsyncSession

from src.middlewares.logging import get_logger
from src.query.base import BaseExecutor, QueryResult
from src.rag.base import RetrievalResult

logger = get_logger("tabular_executor")

# File types this executor is intended to handle (see TODO step 1 in execute).
_TABULAR_FILE_TYPES = ("csv", "xlsx")


class TabularExecutor(BaseExecutor):
    """Query executor for CSV/XLSX documents (stub; not implemented yet)."""

    async def execute(
        self,
        results: list[RetrievalResult],
        user_id: str,
        db: AsyncSession,
        limit: int = 100,
    ) -> list[QueryResult]:
        """Build query results from tabular document chunks.

        Not implemented yet: every call raises ``NotImplementedError``.

        Args:
            results: Retrieval chunks. The planned implementation keeps only
                those with ``source_type == "document"`` and a ``file_type``
                in ``_TABULAR_FILE_TYPES``.
            user_id: Requesting user's id. Unused so far -- NOTE(review):
                presumably intended for access scoping; confirm.
            db: Async SQLAlchemy session, planned for looking up each
                ``Document`` by ``document_id`` to get its ``blob_name``.
            limit: Planned maximum number of rows returned per document
                (``df.head(limit)``).

        Returns:
            One ``QueryResult`` per document (once implemented).

        Raises:
            NotImplementedError: Always, until the steps below are written.
        """
        # TODO: implement
        # 1. filter results where source_type == "document" and file_type in _TABULAR_FILE_TYPES
        # 2. group by document_id -> list of column_names
        # 3. per group: look up Document by document_id -> get blob_name
        # 4. blob_storage.download_file(blob_name) -> pd.read_csv / pd.read_excel
        # 5. df[relevant_columns].head(limit) -> rows as list[dict]
        # 6. return QueryResult per document
        # A descriptive message makes the stub easy to spot in logs/tracebacks.
        raise NotImplementedError(
            f"{type(self).__name__}.execute is not implemented yet"
        )


tabular_executor = TabularExecutor()