File size: 3,406 Bytes
2a4b15b f4dc602 0d9239a 3fbd26b 2a4b15b e002acf 2a4b15b 0d9239a 2a4b15b 0d9239a 2a4b15b 3fbd26b 2a4b15b 3fbd26b 2a4b15b 06d70ad 2a4b15b 06d70ad 2a4b15b 06d70ad 2a4b15b 06d70ad 2a4b15b 06d70ad 2a4b15b a325dbc 2a4b15b a325dbc 2a4b15b 06d70ad 2a4b15b 06d70ad 2a4b15b a325dbc 2a4b15b 06d70ad a325dbc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
from langchain_core.tools import tool
import os
import duckdb
import pandas as pd
import warnings
# Silence every warning for the whole process (no category or module filter is
# given) so that warning text does not get mixed into tool output.
# NOTE(review): this also hides deprecation warnings raised by dependencies.
warnings.filterwarnings("ignore")
# --- Database Connection Setup ---
def get_md_connection() -> duckdb.DuckDBPyConnection:
    """Open a MotherDuck connection authenticated via ``MOTHERDUCK_TOKEN``.

    The token is read from the process environment and embedded in the
    ``md:`` connection string handed to ``duckdb.connect``.

    Returns:
        The connection object produced by ``duckdb.connect``. The caller is
        responsible for closing it.

    Raises:
        ConnectionError: If ``MOTHERDUCK_TOKEN`` is unset or empty.
    """
    md_token = os.getenv('MOTHERDUCK_TOKEN')
    if md_token:
        # No database name follows "md:"; extend the connection string here
        # if a specific database should be selected.
        return duckdb.connect(f'md:?motherduck_token={md_token}')

    # Reaching this point means there is no usable token.
    raise ConnectionError(
        "MOTHERDUCK_TOKEN environment variable is not set. "
        "Please ensure it is configured in your secrets to connect to the database."
    )
# --- SQL Tools ---
@tool
def run_duckdb_query(query: str) -> str:
    """
    Runs a read-only SQL query against the connected MotherDuck database and returns the results as a string.
    The query must be valid DuckDB SQL. This tool only supports a single SELECT statement per call;
    do not put ';' anywhere in the query except optionally at the very end.
    """
    # Maintainer notes
    # ----------------
    # NOTE(review): langchain's @tool is understood to expose the docstring
    # above as the tool description shown to the model, so it is kept short
    # and model-facing; engineering documentation lives in these comments.
    #
    # query:   SQL text supplied by the agent; treated as untrusted input.
    # returns: always a string, one of
    #            - the result table rendered by DataFrame.to_string(index=False)
    #            - a "no rows were returned" notice
    #            - an "Error: ..." / "Connection Error: ..." /
    #              "DuckDB Query Error: ..." message
    #          Nothing is raised to the caller; failures are reported as text
    #          so the agent can read them and retry.

    # Drop surrounding whitespace and any trailing ';' so that a normally
    # terminated single statement is still accepted.
    statement = query.strip()
    while statement.endswith(';'):
        statement = statement[:-1].rstrip()

    # Validate BEFORE connecting so a rejected query never opens a connection.
    if not statement.lower().startswith('select'):
        return "Error: Only read-only SELECT queries are allowed."

    # DuckDB's Python execute() accepts several ';'-separated statements in
    # one call, so checking only the first keyword would let
    # "SELECT 1; DROP TABLE t" through. Any remaining ';' is rejected. This is
    # deliberately conservative (a ';' inside a string literal or comment is
    # refused as well), because a quote-aware scan can be fooled by
    # alternative quoting styles.
    if ';' in statement:
        return (
            "Error: Only a single SELECT statement is allowed; "
            "remove any ';' from the query."
        )

    conn = None
    try:
        conn = get_md_connection()
        # Execute the query and fetch the results into a pandas DataFrame
        result_df = conn.execute(statement).fetchdf()
        if result_df.empty:
            return "Query executed successfully, but no rows were returned."
        # Return the DataFrame as a string
        return result_df.to_string(index=False)
    except ConnectionError as e:
        # Raised by get_md_connection when the token is missing.
        return f"Connection Error: {e}"
    except Exception as e:
        # Tool boundary: report any DuckDB / SQL failure as text instead of
        # propagating it to the agent runtime.
        return f"DuckDB Query Error: {e}"
    finally:
        # Always close the connection if it was successfully opened
        if conn is not None:
            conn.close()
@tool
def get_table_schema(table_name: str = "my_db.main.masterdataset_v") -> str:
    """
    Returns the schema (column names and data types) for the specified table in the MotherDuck database.
    Defaults to the 'my_db.main.masterdataset_v' table.
    """
    # Maintainer notes
    # ----------------
    # NOTE(review): langchain's @tool is understood to expose the docstring
    # above as the tool description shown to the model, so its wording is left
    # unchanged; engineering documentation lives in these comments.
    #
    # table_name: table to describe; supplied by the agent, so it is treated
    #             as untrusted text.
    # returns:    always a string, one of
    #               - "col TYPE, col TYPE, ..." built from the 'name' and
    #                 'type' columns of PRAGMA table_info
    #               - "Error: Table '...' not found. Available tables: [...]"
    #               - "Connection Error: ..." / "DuckDB Schema Error: ..."
    #             Nothing is raised to the caller.
    conn = None
    try:
        conn = get_md_connection()

        # table_name is spliced into a single-quoted SQL literal. Doubling any
        # embedded single quote keeps the value from terminating that literal
        # early and injecting extra SQL.
        escaped_name = table_name.replace("'", "''")

        try:
            schema_df = conn.execute(
                f"PRAGMA table_info('{escaped_name}')"
            ).fetchdf()
        except duckdb.CatalogException:
            # DuckDB signals an unknown table by raising rather than returning
            # an empty result; route that into the "not found" report below.
            schema_df = None

        if schema_df is None or schema_df.empty:
            # List the table names (first column of SHOW TABLES) so the
            # caller can pick a valid one.
            table_rows = conn.execute('SHOW TABLES;').fetchall()
            available_tables = [row[0] for row in table_rows]
            return f"Error: Table '{table_name}' not found. Available tables: {available_tables}"

        # Format the schema into a simple string: name TYPE, name TYPE, ...
        return ", ".join(
            f"{column} {dtype}"
            for column, dtype in zip(schema_df['name'], schema_df['type'])
        )
    except ConnectionError as e:
        # Raised by get_md_connection when the token is missing.
        return f"Connection Error: {e}"
    except Exception as e:
        # Tool boundary: report any other DuckDB failure as text.
        return f"DuckDB Schema Error: {e}"
    finally:
        if conn is not None:
            conn.close()
|