|
|
from langchain_core.tools import tool |
|
|
import os |
|
|
import duckdb |
|
|
import pandas as pd |
|
|
import warnings |
|
|
|
|
|
|
|
|
# Install a blanket "ignore" filter at import time: with no message, category
# or module argument this matches every warning raised in the process.
# NOTE(review): this also hides DeprecationWarning/FutureWarning from any
# imported library — confirm that a process-wide silence is really intended.
warnings.filterwarnings("ignore")
|
|
|
|
|
|
|
|
|
|
|
def get_md_connection() -> duckdb.DuckDBPyConnection:
    """
    Open a MotherDuck connection authenticated with the MOTHERDUCK_TOKEN environment variable.

    Returns:
        The object returned by ``duckdb.connect``. This function never closes
        it; that is left to the caller.

    Raises:
        ConnectionError: If MOTHERDUCK_TOKEN is unset or empty.
    """
    token = os.environ.get('MOTHERDUCK_TOKEN')

    if token:
        # The token is passed as a query parameter of the "md:" connection string.
        dsn = f'md:?motherduck_token={token}'
        return duckdb.connect(dsn)

    # Reached for both a missing variable and an empty string.
    raise ConnectionError(
        "MOTHERDUCK_TOKEN environment variable is not set. "
        "Please ensure it is configured in your secrets to connect to the database."
    )
|
|
|
|
|
|
|
|
|
|
|
@tool
def run_duckdb_query(query: str) -> str:
    """
    Runs a read-only SQL query against the connected MotherDuck database and returns the results as a string.
    The query must be valid DuckDB SQL. This tool only supports SELECT queries.
    Submit exactly one statement: a ';' anywhere except at the very end of the query is rejected.
    """
    # NOTE: langchain's @tool presumably exposes the docstring above to the
    # model as the tool description, so it is kept short and caller-facing;
    # maintainer notes live in comments instead.
    #
    # Contract: always returns a string. Success yields the result table
    # rendered by DataFrame.to_string (or a "no rows" notice); every failure
    # is reported as an "Error: ..." / "... Error: ..." message rather than
    # raised, so the calling agent can read it and retry.
    conn = None
    try:
        # Normalise first: surrounding whitespace and trailing ';' are
        # harmless, so "SELECT 1;" is still accepted.
        statement = query.strip().rstrip("; \t\r\n")

        # Validate BEFORE connecting so rejected input never costs a network
        # round trip and is never masked by a connection problem.
        if not statement.lower().startswith('select'):
            return "Error: Only read-only SELECT queries are allowed."

        # A prefix check alone lets "SELECT 1; DROP TABLE t" through, and
        # DuckDB executes every statement in such a string. Any remaining ';'
        # is therefore refused. This is intentionally conservative: a ';'
        # inside a string literal or comment is rejected as well.
        # NOTE(review): this is a best-effort guard, not a sandbox. Real
        # read-only enforcement belongs in the credentials the token grants.
        if ';' in statement:
            return (
                "Error: Only a single SELECT statement is allowed; "
                "remove any ';' from inside the query."
            )

        conn = get_md_connection()
        result_df = conn.execute(statement).fetchdf()

        if result_df.empty:
            return "Query executed successfully, but no rows were returned."

        return result_df.to_string(index=False)

    except ConnectionError as e:
        # Raised by get_md_connection() when the token is missing.
        return f"Connection Error: {e}"
    except Exception as e:
        # Tool boundary: surface any DuckDB/pandas failure as text instead of
        # crashing the agent loop.
        return f"DuckDB Query Error: {e}"
    finally:
        # conn stays None when validation fails or connecting raises.
        if conn is not None:
            conn.close()
|
|
|
|
|
@tool
def get_table_schema(table_name: str = "my_db.main.masterdataset_v") -> str:
    """
    Returns the schema (column names and data types) for the specified table in the MotherDuck database.
    Defaults to the 'my_db.main.masterdataset_v' table.
    """
    # Contract: always returns a string. On success it is a comma-separated
    # list of "<column> <type>" pairs; every failure is reported as an
    # "Error: ..." / "... Error: ..." message rather than raised.
    conn = None
    try:
        conn = get_md_connection()

        # table_name is embedded in a single-quoted SQL literal, so embedded
        # quotes must be doubled; otherwise a name containing "'" could
        # terminate the literal and append arbitrary SQL.
        safe_name = table_name.replace("'", "''")
        schema_df = conn.execute(f"PRAGMA table_info('{safe_name}')").fetchdf()

        if schema_df.empty:
            # fetchall() yields one tuple per row of SHOW TABLES; the first
            # column holds the table name.
            table_rows = conn.execute('SHOW TABLES;').fetchall()
            available_tables = [row[0] for row in table_rows]
            return f"Error: Table '{table_name}' not found. Available tables: {available_tables}"

        # Pair the two columns directly instead of materialising a Series per
        # row with iterrows().
        return ", ".join(
            f"{column} {dtype}"
            for column, dtype in zip(schema_df['name'], schema_df['type'])
        )

    except ConnectionError as e:
        # Raised by get_md_connection() when the token is missing.
        return f"Connection Error: {e}"
    except Exception as e:
        # Tool boundary: DuckDB errors (including an unknown table, if the
        # PRAGMA raises for it) come back as text for the agent to read.
        return f"DuckDB Schema Error: {e}"
    finally:
        # conn stays None when connecting itself failed.
        if conn is not None:
            conn.close()
|
|
|