File size: 1,340 Bytes
ba550a5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
"""Executor for tabular document sources (source_type="document", file_type csv/xlsx).

Flow:
  1. Group RetrievalResult chunks by document_id.
  2. For each document: download bytes from Azure Blob -> read with pandas.
  3. Filter DataFrame to relevant columns identified by retrieval.
  4. Return QueryResult per document.
"""

from sqlalchemy.ext.asyncio import AsyncSession

from src.middlewares.logging import get_logger
from src.query.base import BaseExecutor, QueryResult
from src.rag.base import RetrievalResult

# Module-level logger namespaced to this executor.
logger = get_logger("tabular_executor")

# Document file types this executor handles (read with pandas per the module docstring).
_TABULAR_FILE_TYPES = ("csv", "xlsx")


class TabularExecutor(BaseExecutor):
    """Executor for tabular document sources (csv/xlsx).

    NOTE: not yet implemented -- ``execute`` currently raises
    ``NotImplementedError``.
    """

    async def execute(
        self,
        results: list[RetrievalResult],
        user_id: str,
        db: AsyncSession,
        limit: int = 100,
    ) -> list[QueryResult]:
        """Resolve retrieval hits into per-document tabular ``QueryResult``s.

        Planned flow (mirrors the module docstring):
          1. Filter ``results`` where ``source_type == "document"`` and
             ``file_type`` is in ``_TABULAR_FILE_TYPES``.
          2. Group by ``document_id`` -> list of column names.
          3. Per group: look up Document by ``document_id`` -> ``blob_name``.
          4. ``blob_storage.download_file(blob_name)`` -> ``pd.read_csv`` /
             ``pd.read_excel``.
          5. ``df[relevant_columns].head(limit)`` -> rows as ``list[dict]``.
          6. Return one ``QueryResult`` per document.

        Args:
            results: Retrieval hits to resolve into tabular rows.
            user_id: Requesting user's id (presumably used for access
                scoping when looking up documents -- TODO confirm).
            db: Async database session for document-metadata lookups.
            limit: Maximum number of rows to return per document.

        Raises:
            NotImplementedError: always, until the flow above is implemented.
        """
        raise NotImplementedError(
            "TabularExecutor.execute is not implemented yet"
        )


tabular_executor = TabularExecutor()