Rifqi Hafizuddin
[KM-512] create folder for querying from bd/tabular docs
2c8a3e8
raw
history blame
1.07 kB
"""Executor for registered database sources (source_type="database").
Flow:
1. Group RetrievalResult chunks by database_client_id.
2. For each client: decrypt creds -> connect -> SELECT relevant columns FROM table LIMIT n.
3. Return QueryResult per (client_id, table_name).
"""
from src.middlewares.logging import get_logger
from src.query.base import BaseExecutor, QueryResult
from src.rag.base import RetrievalResult
# Module-level logger, obtained from the project's get_logger under the name "db_executor".
logger = get_logger("db_executor")
class DbExecutor(BaseExecutor):
    """Executor for retrieval results that come from registered database sources.

    Placeholder only: the query logic has not been written yet, so
    ``execute`` always raises ``NotImplementedError``.
    """

    async def execute(
        self,
        results: list[RetrievalResult],
        user_id: str,
        limit: int = 100,
    ) -> list[QueryResult]:
        """Query database sources for the given retrieval results (not implemented).

        Planned steps, once implemented:
            1. Keep only the results where source_type == "database".
            2. Group them by (database_client_id, table_name), collecting
               the column names for each group.
            3. Per group: look up the DatabaseClient, decrypt its creds and
               connect via db_pipeline_service.
            4. Run SELECT <columns> FROM <table> LIMIT limit.
            5. Return one QueryResult per group.

        Args:
            results: Retrieval results to turn into queries.
            user_id: Identifier of the requesting user (currently unused).
            limit: Row cap intended for the LIMIT clause; defaults to 100.

        Raises:
            NotImplementedError: Always, until the steps above are implemented.
        """
        # TODO: implement the planned steps described in the docstring.
        raise NotImplementedError()
# Module-level DbExecutor instance, constructed when this module is imported.
# NOTE(review): presumably the instance other modules import — confirm against callers.
db_executor = DbExecutor()