| """Executor for tabular document sources (source_type="document", file_type csv/xlsx). | |
| Flow: | |
| 1. Group RetrievalResult chunks by document_id. | |
| 2. For each document: download bytes from Azure Blob -> read with pandas. | |
| 3. Filter DataFrame to relevant columns identified by retrieval. | |
| 4. Return QueryResult per document. | |
| """ | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from src.middlewares.logging import get_logger | |
| from src.query.base import BaseExecutor, QueryResult | |
| from src.rag.base import RetrievalResult | |
# Module-level logger for this executor.
logger = get_logger("tabular_executor")
# File types this executor handles; per the TODO in execute(), retrieval
# hits whose file_type is not in this tuple are to be filtered out.
_TABULAR_FILE_TYPES = ("csv", "xlsx")
class TabularExecutor(BaseExecutor):
    """Executor for tabular document sources (CSV / XLSX files).

    Currently a stub: :meth:`execute` raises ``NotImplementedError``.
    The intended flow is documented step by step in the method body.
    """

    async def execute(
        self,
        results: list[RetrievalResult],
        user_id: str,
        db: AsyncSession,
        limit: int = 100,
    ) -> list[QueryResult]:
        """Produce one QueryResult per tabular document referenced by *results*.

        Planned implementation (not yet written):
          1. Keep only hits where source_type == "document" and file_type
             is one of _TABULAR_FILE_TYPES.
          2. Group the remaining hits by document_id, collecting each
             group's column_names.
          3. For each group, look up the Document row to obtain blob_name.
          4. Download the blob via blob_storage.download_file and parse the
             bytes with pandas (pd.read_csv / pd.read_excel).
          5. Restrict the DataFrame to the relevant columns and take the
             first *limit* rows as a list of dicts.
          6. Wrap each document's rows in a QueryResult.

        Raises:
            NotImplementedError: always, until the flow above is implemented.
        """
        raise NotImplementedError
# Module-level singleton instance of the executor.
tabular_executor = TabularExecutor()