File size: 1,070 Bytes
2c8a3e8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
"""Executor for registered database sources (source_type="database").

Flow:
  1. Group RetrievalResult chunks by database_client_id.
  2. For each client: decrypt creds -> connect -> SELECT relevant columns FROM table LIMIT n.
  3. Return QueryResult per (client_id, table_name).
"""

from src.middlewares.logging import get_logger
from src.query.base import BaseExecutor, QueryResult
from src.rag.base import RetrievalResult

# Module-level logger, obtained from the project's logging middleware under
# the name "db_executor".
logger = get_logger("db_executor")


class DbExecutor(BaseExecutor):
    """Executor for retrieval results that point at registered database sources.

    Placeholder: ``execute`` is not implemented yet and always raises
    ``NotImplementedError``.
    """

    async def execute(
        self, results: list[RetrievalResult], user_id: str, limit: int = 100
    ) -> list[QueryResult]:
        """Run the database query flow for ``results`` (not yet implemented).

        Planned steps, none of which exist yet:
          1. Keep only the results whose source_type is "database".
          2. Bucket them by (database_client_id, table_name), collecting the
             column names for each bucket.
          3. For every bucket, look up the DatabaseClient, decrypt its
             credentials and connect via db_pipeline_service.
          4. Issue ``SELECT <columns> FROM <table> LIMIT limit``.
          5. Produce one QueryResult per bucket.

        Args:
            results: Retrieval results to execute against.
            user_id: Currently unused; presumably identifies the owner of the
                database clients -- TODO confirm when implementing.
            limit: Intended row cap for each SELECT (defaults to 100).

        Returns:
            Intended: one QueryResult per (database_client_id, table_name)
            group. Nothing is returned today.

        Raises:
            NotImplementedError: Always, until the planned flow is written.
        """
        # TODO: implement the steps listed in the docstring.
        raise NotImplementedError()


# Module-level DbExecutor instance, created when this module is imported.
db_executor = DbExecutor()