File size: 1,070 Bytes
"""Executor for registered database sources (source_type="database").

Flow:
    1. Group RetrievalResult chunks by database_client_id.
    2. For each client: decrypt creds -> connect -> SELECT relevant columns FROM table LIMIT n.
    3. Return QueryResult per (client_id, table_name).
"""
from src.middlewares.logging import get_logger
from src.query.base import BaseExecutor, QueryResult
from src.rag.base import RetrievalResult
# Module-level logger obtained from the project's logging middleware under the name "db_executor".
logger = get_logger("db_executor")
class DbExecutor(BaseExecutor):
    """Executor for retrieval results that come from registered database sources.

    Placeholder implementation: the query flow has not been written yet, so
    ``execute`` always raises ``NotImplementedError``.
    """

    async def execute(
        self,
        results: list[RetrievalResult],
        user_id: str,
        limit: int = 100,
    ) -> list[QueryResult]:
        """Run the database queries backing ``results`` (not implemented yet).

        Planned flow (TODO):
            1. Keep only the results whose source_type == "database".
            2. Group them by (database_client_id, table_name), collecting
               the column_names for each group.
            3. For every group: look up the DatabaseClient, decrypt its
               creds, and connect via db_pipeline_service.
            4. Issue SELECT <columns> FROM <table> LIMIT limit.
            5. Produce one QueryResult per group.

        Args:
            results: Retrieval results to be turned into database queries.
            user_id: Presumably the ID of the requesting user -- not used
                yet; confirm once the flow is implemented.
            limit: Row cap intended for the LIMIT clause of each query.

        Returns:
            Intended to be one QueryResult per (database_client_id,
            table_name) group; currently nothing is ever returned.

        Raises:
            NotImplementedError: Always, until the flow above is implemented.
        """
        # TODO: implement the planned flow described in the docstring.
        raise NotImplementedError
# Module-level DbExecutor instance, created when this module is imported.
db_executor = DbExecutor()
|