import os import re import pandas as pd from typing import Optional from utils.config import AppConfig from utils.tracing import Tracer class SQLTool: def __init__(self, cfg: AppConfig, tracer: Tracer): self.cfg = cfg self.tracer = tracer self.backend = cfg.sql_backend # "bigquery" or "motherduck" if self.backend == "bigquery": from google.cloud import bigquery from google.oauth2 import service_account key_json = os.getenv("GCP_SERVICE_ACCOUNT_JSON") if not key_json: raise RuntimeError("Missing GCP_SERVICE_ACCOUNT_JSON secret") # Accept full JSON string from Space Secret if key_json.strip().startswith("{"): import json info = json.loads(key_json) else: info = {} creds = service_account.Credentials.from_service_account_info(info) self.client = bigquery.Client(credentials=creds, project=cfg.gcp_project) elif self.backend == "motherduck": import duckdb token = self.cfg.motherduck_token or os.getenv("MOTHERDUCK_TOKEN") db_name = self.cfg.motherduck_db or "default" if not token: raise RuntimeError("Missing MOTHERDUCK_TOKEN") # Plain DuckDB connection self.client = duckdb.connect() # Ensure MotherDuck extension is available and loaded self.client.execute("INSTALL motherduck;") self.client.execute("LOAD motherduck;") # Attach the remote MotherDuck database and use it self.client.execute(f"SET motherduck_token='{token}';") self.client.execute(f"ATTACH 'md:/{db_name}' AS md;") self.client.execute("USE md;") # subsequent queries run against 'md' by default else: raise RuntimeError("Unknown SQL backend") def _nl_to_sql(self, message: str) -> str: """ Minimal NL2SQL heuristic; replace with your own mapping or LLM prompt. Expect users to include table names. Example: "avg metric by month from analytics.events" """ m = message.lower() # Very basic template example (edit to your tables/columns) if "avg" in m and " by " in m: return ( "-- Example template; edit me\n" "SELECT DATE_TRUNC('month', date_col) AS month, " "AVG(metric) AS avg_metric " "FROM analytics.table " "GROUP BY 1 ORDER BY 1;" ) # Pass-through if the user typed SQL explicitly if re.match(r"^\s*select ", m): return message # Fallback return "SELECT * FROM analytics.table LIMIT 100;" def run(self, message: str) -> pd.DataFrame: sql = self._nl_to_sql(message) self.tracer.trace_event("sql_query", {"sql": sql, "backend": self.backend}) if self.backend == "bigquery": df = self.client.query(sql).to_dataframe() else: # DuckDB (MotherDuck): fetch_df returns a pandas DataFrame df = self.client.execute(sql).fetch_df() return df