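"""Executor for registered database sources (source_type="database").

Planned flow:
1. Keep only RetrievalResult chunks whose source_type is "database" and group
   them by (database_client_id, table_name), collecting the column names.
2. For each group: look up the DatabaseClient, decrypt its credentials,
   connect, and run SELECT <columns> FROM <table> LIMIT <limit>.
3. Return one QueryResult per (database_client_id, table_name) group.

Not implemented yet: DbExecutor.execute currently raises NotImplementedError.
"""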
| from src.middlewares.logging import get_logger | |
| from src.query.base import BaseExecutor, QueryResult | |
| from src.rag.base import RetrievalResult | |
# Module-level logger from src.middlewares.logging.get_logger.
# Not referenced by DbExecutor yet, because execute() is still a stub.
logger = get_logger("db_executor")
class DbExecutor(BaseExecutor):
    """Execute retrieval results backed by registered database sources.

    ``execute`` is still a stub: awaiting it raises ``NotImplementedError``.
    """

    async def execute(
        self, results: list[RetrievalResult], user_id: str, limit: int = 100
    ) -> list[QueryResult]:
        """Fetch rows for the database-backed chunks in ``results``.

        Args:
            results: Retrieval results to execute. The plan is to handle only
                those whose ``source_type`` is ``"database"``.
            user_id: Id of the requesting user. The stub does not reference it.
            limit: Row cap intended for each ``SELECT ... LIMIT`` (default 100).

        Returns:
            Intended: one ``QueryResult`` per (database_client_id, table_name)
            group.

        Raises:
            NotImplementedError: Always, until the steps below are implemented.
        """
        # TODO: implement:
        #   1. keep only results with source_type == "database";
        #   2. group them by (database_client_id, table_name), collecting the
        #      column_names of each group;
        #   3. per group: look up the DatabaseClient, decrypt its credentials,
        #      and connect via db_pipeline_service;
        #   4. run SELECT <columns> FROM <table> LIMIT limit;
        #   5. return one QueryResult per group.
        # NOTE(review): table and column names come from retrieval metadata.
        # Quote or whitelist them instead of interpolating them into the SQL
        # text, and bind ``limit`` as a query parameter.
        # NOTE(review): presumably the DatabaseClient lookup should be scoped
        # to ``user_id`` so one user cannot query another's sources. Confirm.
        raise NotImplementedError
# Shared module-level DbExecutor instance. Presumably callers import this
# rather than constructing their own; confirm against the query layer.
db_executor = DbExecutor()