#!/usr/bin/env python3
"""
ThoughtSpot Deployment Module
A comprehensive tool for deploying data models to ThoughtSpot:
- Creates Snowflake connections
- Parses DDL and creates tables
- Generates and deploys models
Usage:
from thoughtspot_deployer import ThoughtSpotDeployer
deployer = ThoughtSpotDeployer()
results = deployer.deploy_all(ddl, database, schema)
"""
import os
from supabase_client import get_admin_setting
import re
import yaml
import json
import requests
import snowflake.connector
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from dotenv import load_dotenv
from snowflake_auth import get_snowflake_connection_params
# Load environment variables
load_dotenv()
def _safe_print(*args, **kwargs):
"""Print that ignores BrokenPipeError - prevents crashes when output is closed."""
try:
print(*args, **kwargs)
except BrokenPipeError:
pass
def _apply_naming_style(name: str, style: str = "snake_case") -> str:
"""
Convert column name to specified naming style for ThoughtSpot display.
Args:
name: Original column name
style: Naming style - one of: Regular Case, snake_case, camelCase, PascalCase, UPPER_CASE, original
Examples (for "SHIPPING_MODE"):
Regular Case → Shipping Mode
snake_case → shipping_mode
camelCase → shippingMode
PascalCase → ShippingMode
UPPER_CASE → SHIPPING_MODE
original → SHIPPING_MODE (unchanged)
"""
name = name.strip()
if style == "original":
return name
if style == "UPPER_CASE":
return name.upper().replace(" ", "_")
# Split into words (handle underscores, spaces, and camelCase)
import re
# Split on underscores, spaces, or camelCase boundaries
words = re.split(r'[_\s]+', name)
# Further split camelCase words
expanded_words = []
for word in words:
# Split on camelCase boundaries (e.g., "firstName" -> ["first", "Name"])
parts = re.findall(r'[A-Z]?[a-z]+|[A-Z]+(?=[A-Z][a-z]|\d|\W|$)|\d+', word)
if parts:
expanded_words.extend(parts)
else:
expanded_words.append(word)
words = [w.lower() for w in expanded_words if w]
if not words:
return name.lower()
if style == "Regular Case":
# Title case each word, join with spaces: STATE_ID -> State Id
return " ".join(w.capitalize() for w in words)
if style == "snake_case":
return "_".join(words)
elif style == "camelCase":
# First word lowercase, rest capitalized
return words[0] + "".join(w.capitalize() for w in words[1:])
elif style == "PascalCase":
# All words capitalized
return "".join(w.capitalize() for w in words)
else:
# Default to snake_case
return "_".join(words)
def _to_snake_case(name: str) -> str:
    """Legacy snake_case helper; thin wrapper over _apply_naming_style()."""
    return _apply_naming_style(name, style="snake_case")
class ThoughtSpotDeployer:
"""ThoughtSpot deployment automation"""
def __init__(self, base_url: str = None, username: str = None, secret_key: str = None):
"""
Initialize ThoughtSpot deployer (trusted auth only)
Reads from environment variables if not passed directly.
Env vars are populated from Supabase admin settings at login time.
Raises ValueError if any required setting is missing.
"""
self.base_url = base_url if base_url else get_admin_setting('THOUGHTSPOT_URL')
if not username:
raise ValueError("ThoughtSpotDeployer requires username — pass the logged-in user's email")
self.username = username
if not secret_key:
raise ValueError("ThoughtSpotDeployer requires secret_key — pass the trusted auth key for the selected environment")
self.secret_key = secret_key
# Snowflake connection details from environment (key pair auth)
self.sf_account = get_admin_setting('SNOWFLAKE_ACCOUNT')
self.sf_user = get_admin_setting('SNOWFLAKE_KP_USER')
self.sf_role = get_admin_setting('SNOWFLAKE_ROLE')
self.sf_warehouse = get_admin_setting('SNOWFLAKE_WAREHOUSE')
self.headers = {
'Content-Type': 'application/json',
'X-Requested-By': 'ThoughtSpot'
}
# Use session to maintain cookies between requests
self.session = requests.Session()
self.session.headers.update(self.headers)
# Column naming style for ThoughtSpot model columns
# Options: Regular Case, snake_case, camelCase, PascalCase, UPPER_CASE, original
self.column_naming_style = "Regular Case"
# Per-session prompt logger — set by the chat controller after construction
self.prompt_logger = None
# Validate credentials for trusted auth
if not all([self.base_url, self.username, self.secret_key]):
raise ValueError("Missing ThoughtSpot URL, username, or trusted auth key")
if not all([self.sf_account, self.sf_user, self.sf_role, self.sf_warehouse]):
raise ValueError("Missing required Snowflake credentials in environment variables")
def _get_private_key_for_thoughtspot(self) -> str:
"""Get private key in format suitable for ThoughtSpot TML"""
private_key_raw = get_admin_setting('SNOWFLAKE_KP_PK')
if not private_key_raw:
raise ValueError("SNOWFLAKE_KP_PK environment variable not set")
# ThoughtSpot expects the private key as raw PEM format string
if not private_key_raw.startswith('-----BEGIN'):
# If it's base64 encoded, decode it
import base64
try:
private_key_raw = base64.b64decode(private_key_raw).decode('utf-8')
except Exception:
pass
return private_key_raw
    def authenticate(self) -> bool:
        """Authenticate with ThoughtSpot.

        Trusted (secret-key) authentication is the only supported flow;
        this simply delegates to authenticate_trusted().

        Returns:
            True on successful authentication, False otherwise.
        """
        return self.authenticate_trusted()
def authenticate_trusted(self) -> bool:
"""Authenticate with ThoughtSpot using trusted authentication (secret key)"""
try:
auth_url = f"{self.base_url}/api/rest/2.0/auth/token/full"
print(f" 🔐 Attempting trusted authentication to: {auth_url}")
print(f" 👤 Username: {self.username}")
print(f" 🔑 Using secret key: {self.secret_key[:8]}...{self.secret_key[-4:]}" if self.secret_key and len(self.secret_key) > 12 else " 🔑 Using secret key")
response = self.session.post(
auth_url,
json={
"username": self.username,
"secret_key": self.secret_key,
"validity_time_in_sec": 3600 # 1 hour token
}
)
print(f" 📡 HTTP Status: {response.status_code}")
if response.status_code == 200:
result = response.json()
if 'token' in result:
# Use the token as bearer auth
self.session.headers['Authorization'] = f'Bearer {result["token"]}'
print(" ✅ Trusted authentication successful (bearer token)")
return True
else:
print(f" ❌ No token in response: {result}")
return False
elif response.status_code == 204:
# Session cookie auth
print(" ✅ Trusted authentication successful (session cookies)")
return True
else:
print(f" ❌ HTTP Error {response.status_code}: {response.text}")
return False
except Exception as e:
print(f" 💥 Trusted authentication exception: {e}")
return False
def authenticate_oauth(self, timeout: int = 120) -> bool:
"""
Authenticate with ThoughtSpot using browser-based SSO (Okta, SAML, etc.)
Opens browser to ThoughtSpot login, user authenticates via SSO,
and cookies are captured via a local callback server.
Args:
timeout: Seconds to wait for authentication (default 120)
Returns:
True if authentication successful, False otherwise
"""
import webbrowser
import http.server
import socketserver
import threading
import urllib.parse
print(f" 🔐 Starting OAuth/SSO authentication for: {self.base_url}")
print(f" 👤 User: {self.username or 'SSO user'}")
# Find an available port for the callback server
callback_port = 8765
for port in range(8765, 8800):
try:
with socketserver.TCPServer(("", port), None) as test:
callback_port = port
break
except OSError:
continue
callback_url = f"http://localhost:{callback_port}/callback"
auth_complete = threading.Event()
auth_success = [False] # Use list to allow modification in nested function
class OAuthCallbackHandler(http.server.BaseHTTPRequestHandler):
def log_message(self, format, *args):
pass # Suppress logging
def do_GET(self):
if self.path.startswith('/callback'):
# Authentication completed - show success page
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
# Page that extracts cookies and displays success
html = """
ThoughtSpot Authentication
✓
Authentication Successful!
You can close this window and return to the application.
"""
self.wfile.write(html.encode())
auth_success[0] = True
auth_complete.set()
elif self.path == '/check':
# Health check endpoint
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b'OK')
else:
self.send_response(404)
self.end_headers()
# Start callback server in background thread
server = socketserver.TCPServer(("", callback_port), OAuthCallbackHandler)
server_thread = threading.Thread(target=server.handle_request)
server_thread.daemon = True
server_thread.start()
# Build the SSO login URL
# ThoughtSpot redirects to SSO provider, then back to ThoughtSpot, then to our callback
ts_login_url = f"{self.base_url}/?redirectURL={urllib.parse.quote(callback_url)}"
print(f" 🌐 Opening browser for SSO login...")
print(f" 📍 Callback URL: {callback_url}")
print(f" ⏳ Waiting up to {timeout} seconds for authentication...")
# Open browser to ThoughtSpot login
webbrowser.open(ts_login_url)
# Wait for authentication to complete
if auth_complete.wait(timeout=timeout):
if auth_success[0]:
print(" ✅ Browser authentication completed!")
# Now we need to get the session from ThoughtSpot
# The user authenticated in the browser, so we need to get a session token
# We'll use the session/token endpoint to get a token for API calls
# Try to get a session token using the trusted auth flow
# Since user is now logged in via browser, we attempt to get session info
try:
# Check if session is valid by calling a simple API endpoint
# First, let's try to get current user info
user_response = self.session.get(
f"{self.base_url}/api/rest/2.0/auth/session/user",
timeout=10
)
if user_response.status_code == 200:
user_info = user_response.json()
print(f" ✅ Session active for: {user_info.get('name', 'unknown')}")
return True
else:
# Browser auth completed but we don't have cookies in our session
# This is expected - browser and Python have separate cookie jars
print(" ⚠️ Browser authenticated but Python session needs cookies")
print(" 💡 For full OAuth support, please use the browser-based workflow")
print(" 💡 Or configure trusted authentication on ThoughtSpot")
return False
except Exception as e:
print(f" ⚠️ Could not verify session: {e}")
return False
else:
print(" ❌ Authentication callback received but marked as failed")
return False
else:
print(" ❌ Authentication timed out")
server.shutdown()
return False
def get_model_columns(self, model_guid: str) -> List[Dict]:
"""
Get actual column names from a ThoughtSpot model.
This is important because ThoughtSpot may rename columns to make them unique
(e.g., PROCESSING_FEE becomes gift_processing_fee and tran_processing_fee).
Args:
model_guid: GUID of the ThoughtSpot model
Returns:
List of column dicts with 'name' and 'type' keys
"""
try:
# Export the model TML to get actual column names
export_response = self.session.post(
f"{self.base_url}/api/rest/2.0/metadata/tml/export",
json={
'metadata': [{'identifier': model_guid}],
'export_associated': False
}
)
if export_response.status_code != 200:
print(f" ⚠️ Could not export model TML: HTTP {export_response.status_code}")
return []
tml_data = export_response.json()
if not tml_data or len(tml_data) == 0:
print(f" ⚠️ Empty TML export response")
return []
# Parse YAML TML
tml_str = tml_data[0].get('edoc', '')
model_tml = yaml.safe_load(tml_str)
if not model_tml or 'model' not in model_tml:
print(f" ⚠️ Invalid model TML structure")
return []
# Extract columns with their actual names from the model
columns = []
for col in model_tml.get('model', {}).get('columns', []):
col_name = col.get('name', '')
col_props = col.get('properties', {})
col_type = col_props.get('column_type', 'ATTRIBUTE')
# Map ThoughtSpot column types to SQL-like types for AI understanding
if col_type == 'MEASURE':
sql_type = 'NUMBER' # Measures are numeric
elif col_props.get('calendar'):
sql_type = 'DATE' # Calendar attribute = date column
else:
sql_type = 'VARCHAR' # Other attributes are typically strings
columns.append({
'name': col_name,
'type': sql_type,
'ts_type': col_type # Keep original for reference
})
print(f" 📊 Got {len(columns)} columns from ThoughtSpot model")
return columns
except Exception as e:
print(f" ⚠️ Error getting model columns: {e}")
return []
def parse_ddl(self, ddl: str) -> Tuple[Dict, List]:
"""
Parse DDL to extract table definitions and foreign key relationships
Returns:
Tuple of (tables_dict, foreign_keys_list)
"""
tables = {}
foreign_keys = []
# Find all CREATE TABLE statements
table_pattern = (
r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?'
r'(?:"?([A-Za-z0-9_]+)"?)\s*\((.*?)\)\s*;'
)
for match in re.finditer(table_pattern, ddl, re.IGNORECASE | re.DOTALL):
table_name = match.group(1).upper()
columns_text = match.group(2)
columns = []
# Parse each column definition - PROPERLY FIXED parsing
# Split by comma but be careful of commas inside parentheses
column_lines = []
current_line = ""
paren_count = 0
for char in columns_text:
if char == '(':
paren_count += 1
elif char == ')':
paren_count -= 1
elif char == ',' and paren_count == 0:
column_lines.append(current_line.strip())
current_line = ""
continue
current_line += char
# Add the last line
if current_line.strip():
column_lines.append(current_line.strip())
for line in column_lines:
line = line.strip()
line_upper = line.upper()
def _normalize_table_name(raw_name: str) -> str:
# Handle optional quoting and optional DB/SCHEMA qualifiers.
normalized = raw_name.replace('"', '').strip()
if '.' in normalized:
normalized = normalized.split('.')[-1]
return normalized.upper()
# Parse FK constraints in any common table-level form:
# 1) FOREIGN KEY (COL) REFERENCES TBL(COL)
# 2) CONSTRAINT FK_NAME FOREIGN KEY (COL) REFERENCES TBL(COL)
fk_match = re.search(
r'FOREIGN\s+KEY\s*\((\w+)\)\s*REFERENCES\s+([A-Za-z0-9_".]+)\s*\((\w+)\)',
line,
re.IGNORECASE
)
if fk_match:
from_col = fk_match.group(1).upper()
to_table = _normalize_table_name(fk_match.group(2))
to_col = fk_match.group(3).upper()
foreign_keys.append({
'from_table': table_name,
'from_column': from_col,
'to_table': to_table,
'to_column': to_col
})
print(f" 🔗 Found FK: {table_name}.{from_col} -> {to_table}.{to_col}")
continue
# Parse inline FK form in column definitions:
# COL_NAME REFERENCES TARGET_TABLE(TARGET_COL)
inline_fk_match = re.search(
r'^(\w+)\s+.+?\s+REFERENCES\s+([A-Za-z0-9_".]+)\s*\((\w+)\)',
line,
re.IGNORECASE
)
if inline_fk_match:
from_col = inline_fk_match.group(1).upper()
to_table = _normalize_table_name(inline_fk_match.group(2))
to_col = inline_fk_match.group(3).upper()
foreign_keys.append({
'from_table': table_name,
'from_column': from_col,
'to_table': to_table,
'to_column': to_col
})
print(f" 🔗 Found inline FK: {table_name}.{from_col} -> {to_table}.{to_col}")
if not line_upper.startswith(('PRIMARY KEY', 'CONSTRAINT', 'FOREIGN KEY', 'UNIQUE', 'CHECK', 'INDEX')):
# Parse: COLUMNNAME DATATYPE(params) [IDENTITY] [NOT NULL]
parts = line.split()
if len(parts) >= 2:
col_name_original = parts[0] # Preserve original casing for display name
col_name = parts[0].upper() # Uppercase for DB reference
# Get the FULL data type including parameters - HANDLE IDENTITY!
col_type_match = re.match(r'(\w+(?:\([^)]+\))?)', parts[1])
col_type = col_type_match.group(1).upper() if col_type_match else parts[1].upper()
columns.append({
'name': col_name,
'original_name': col_name_original, # Keep original for naming style
'type': col_type,
'nullable': 'NOT NULL' not in line.upper()
})
tables[table_name] = columns
print(f"📊 Found {len(tables)} tables and {len(foreign_keys)} foreign keys in DDL")
return tables, foreign_keys
    def create_relationships_separately(self, table_relationships: Dict, table_guids: Dict):
        """Create relationships as separate TML objects after tables exist.

        Args:
            table_relationships: Mapping of source table name -> list of
                relationship dicts, each with 'name', 'to_table', 'type' and
                'on' (a list of {'from_column', 'to_column'} pairs).
            table_guids: Mapping of table name -> ThoughtSpot GUID, used to
                reference the already-created tables.

        Side effects:
            POSTs one TML import per relationship and prints the outcome.
            NOTE(review): result[0] assumes the import API returns a non-empty
            list on HTTP 200 — confirm against the REST v2 import contract.
        """
        for table_name, relationships in table_relationships.items():
            for relationship in relationships:
                # Build a standalone relationship TML document; guid None means
                # ThoughtSpot assigns one on import.
                relationship_tml = {
                    'guid': None,
                    'relationship': {
                        'name': relationship['name'],
                        'destination_table': table_guids.get(relationship['to_table']),
                        'source_table': table_guids.get(table_name),
                        'type': relationship['type'],
                        'join_columns': [
                            {
                                'source_column': rel_on['from_column'],
                                'destination_column': rel_on['to_column']
                            }
                            for rel_on in relationship['on']
                        ]
                    }
                }
                relationship_yaml = yaml.dump(relationship_tml, default_flow_style=False, sort_keys=False)
                print(f" 🔗 Creating relationship: {relationship['name']}")
                print(f" 📄 Relationship TML:\n{relationship_yaml}")
                response = self.session.post(
                    f"{self.base_url}/api/rest/2.0/metadata/tml/import",
                    json={
                        "metadata_tmls": [relationship_yaml],
                        "import_policy": "ALL_OR_NONE",
                        "create_new": True
                    }
                )
                if response.status_code == 200:
                    result = response.json()
                    print(f" 📋 Relationship response: {result}")
                    # Per-object import status lives in the first result element.
                    if result[0].get('response', {}).get('status', {}).get('status_code') == 'OK':
                        print(f" ✅ Relationship created: {relationship['name']}")
                    else:
                        error_msg = result[0].get('response', {}).get('status', {}).get('error_message', 'Unknown error')
                        print(f" ❌ Relationship failed: {error_msg}")
                else:
                    print(f" ❌ Relationship API call failed: {response.status_code}")
                    print(f" 📋 Response: {response.text}")
def create_table_tml(self, table_name: str, columns: List, connection_name: str,
database: str, schema: str, all_tables: Dict = None, table_guid: str = None, foreign_keys: List = None) -> str:
"""Generate table TML matching working example structure
Args:
table_guid: If provided, use this GUID (for updating existing tables with joins)
foreign_keys: List of foreign key relationships parsed from DDL
"""
tml_columns = []
# Generate columns with proper typing
for col in columns:
ts_type = self._map_data_type(col['type'])
col_name = col['name'].upper()
# Determine column type - IDs are measures in table TML but not model TML
if ts_type in ['INT64'] and col_name.endswith('ID'):
col_type = 'MEASURE'
properties = {
'column_type': col_type,
'aggregation': 'SUM',
'index_type': 'DONT_INDEX'
}
elif ts_type in ['DOUBLE', 'INT64'] and not col_name.endswith('ID'):
col_type = 'MEASURE'
properties = {
'column_type': col_type,
'aggregation': 'SUM',
'index_type': 'DONT_INDEX'
}
else:
col_type = 'ATTRIBUTE'
properties = {
'column_type': col_type,
'index_type': 'DONT_INDEX'
}
column_def = {
'name': col['name'].upper(),
'db_column_name': col['name'].upper(),
'properties': properties,
'db_column_properties': {
'data_type': ts_type
}
}
tml_columns.append(column_def)
table_tml = {
'guid': table_guid, # Use provided GUID or None for new tables
'table': {
'name': table_name.upper(),
'db': database,
'schema': schema,
'db_table': table_name.upper(),
'connection': {
'name': connection_name
},
'columns': tml_columns,
'properties': {
'sage_config': {
'is_sage_enabled': False
}
}
}
}
# Add joins_with relationships (matching working example)
if all_tables:
joins_with = self._generate_table_joins(table_name, columns, all_tables, foreign_keys)
if joins_with:
table_tml['table']['joins_with'] = joins_with
# Generate YAML with proper formatting
yaml_output = yaml.dump(table_tml, default_flow_style=False, sort_keys=False)
# Keep quotes around 'on' key as shown in working example
return yaml_output
def _generate_table_joins(self, table_name: str, columns: List, all_tables: Dict, foreign_keys: List = None) -> List:
"""Generate joins_with structure based on parsed foreign keys from DDL"""
joins = []
table_name_upper = table_name.upper()
if not foreign_keys:
print(f" ⚠️ No foreign keys provided for {table_name_upper}")
return joins
# Use actual foreign keys from DDL
for fk in foreign_keys:
if fk['from_table'] == table_name_upper:
to_table = fk['to_table']
from_col = fk['from_column']
to_col = fk['to_column']
# Skip self-joins (e.g., EMPLOYEES.MANAGER_ID -> EMPLOYEES.EMPLOYEE_ID)
# ThoughtSpot models don't handle self-referential joins well (causes cycles)
if to_table == table_name_upper:
print(f" ⏭️ Skipping self-join: {table_name_upper}.{from_col} -> {to_table}.{to_col} (self-referential)")
continue
# Check if target table exists in THIS deployment
available_tables_upper = [t.upper() for t in all_tables.keys()]
if to_table in available_tables_upper:
constraint_id = f"SYS_CONSTRAINT_{self._generate_constraint_id()}"
join_def = {
'name': constraint_id,
'destination': {
'name': to_table
},
'on': f"[{table_name_upper}::{from_col}] = [{to_table}::{to_col}]",
'type': 'INNER'
}
joins.append(join_def)
print(f" 🔗 Generated join: {table_name_upper}.{from_col} -> {to_table}.{to_col}")
else:
print(f" ⏭️ Skipping join: {table_name_upper}.{from_col} -> {to_table} (table not in this deployment)")
return joins
def create_connection_tml(self, connection_name: str) -> str:
"""Generate connection TML matching working example"""
connection_tml = {
'guid': None, # Will be generated by ThoughtSpot
'connection': {
'name': connection_name,
'type': 'RDBMS_SNOWFLAKE',
'authentication_type': 'KEY_PAIR',
'properties': [
{'key': 'accountName', 'value': self.sf_account},
{'key': 'user', 'value': self.sf_user},
{'key': 'private_key', 'value': self._get_private_key_for_thoughtspot()},
{'key': 'passphrase', 'value': get_admin_setting('SNOWFLAKE_KP_PASSPHRASE', required=False)},
{'key': 'role', 'value': self.sf_role},
{'key': 'warehouse', 'value': self.sf_warehouse}
],
'description': f'Auto-generated Snowflake connection for {connection_name}'
}
}
yaml_output = yaml.dump(connection_tml, default_flow_style=False, sort_keys=False)
return yaml_output
    def create_actual_model_tml(self, tables: Dict, foreign_keys: List, table_guids: Dict = None,
                                model_name: str = None, connection_name: str = None) -> str:
        """Generate model TML (without explicit joins) matching the boone_test5 working example.

        Tables are referenced by FQN (their GUID) to disambiguate duplicate
        names, joins are omitted so ThoughtSpot can auto-detect them, and
        column display names are de-conflicted globally across all tables.

        Args:
            tables: TABLE_NAME -> column dicts (parse_ddl output).
            foreign_keys: Parsed FK list (not embedded here since joins are omitted).
            table_guids: TABLE_NAME -> GUID of the already-created tables.
            model_name: Display name; defaults to demo_model_<YYYYMMDD>.
            connection_name: Defaults to model_name.

        Returns:
            Validated YAML string for the model TML.

        Raises:
            ValueError: If the generated YAML fails to round-trip through yaml.safe_load.
        """
        if not model_name:
            model_name = f"demo_model_{datetime.now().strftime('%Y%m%d')}"
        if not connection_name:
            connection_name = model_name
        # Create model structure matching working example exactly
        model = {
            'guid': None,  # Will be generated by ThoughtSpot
            'model': {
                'name': model_name,
                'model_tables': [],
                'columns': [],
                'properties': {
                    'is_bypass_rls': False,
                    'join_progressive': True,
                    'spotter_config': {
                        'is_spotter_enabled': True
                    }
                }
            }
        }
        # Track which tables each column name appears in, so conflicts can be
        # resolved deterministically below.
        column_name_counts = {}
        for table_name, columns in tables.items():
            for col in columns:
                col_name = col['name'].upper()
                if col_name not in column_name_counts:
                    column_name_counts[col_name] = []
                column_name_counts[col_name].append(table_name.upper())
        # Add model_tables - START WITH NO JOINS for now (we can add them later)
        print(" 📋 Creating model without explicit joins (ThoughtSpot can auto-detect)")
        for table_name in tables.keys():
            table_name_upper = table_name.upper()
            table_guid = table_guids.get(table_name_upper) if table_guids else None
            # Use FQN to resolve the "multiple data sources with same name" issue;
            # ThoughtSpot explicitly requires this when table names are duplicated.
            table_entry = {
                'name': table_name_upper,
                'fqn': table_guid  # Required to uniquely identify which table to use
            }
            # No explicit joins here — ThoughtSpot auto-detects them.
            model['model']['model_tables'].append(table_entry)
        # Remove diamond join paths - ThoughtSpot rejects models where
        # table A joins to B, A joins to C, and C also joins to B
        self._remove_diamond_joins(model['model']['model_tables'])
        # Add columns with proper global conflict resolution
        used_display_names = set()  # Track used names globally across all columns
        for table_name, columns in tables.items():
            table_name_upper = table_name.upper()
            for col in columns:
                col_name = col['name'].upper()
                original_col_name = col.get('original_name', col['name'])  # Use original casing for display
                # TODO: Later we can exclude ID columns for cleaner model
                # For now, include all columns to get the basic model working
                # Start with basic conflict resolution
                display_name = self._resolve_column_name_conflict(
                    col_name, table_name_upper, column_name_counts,
                    original_name=original_col_name
                )
                # If the display name is still used, make it unique with table prefixes
                original_display_name = display_name
                counter = 1
                while display_name.lower() in used_display_names:
                    # First collision: prepend a short prefix derived from the table name
                    if counter == 1:
                        if len(table_name_upper) <= 4:
                            prefix = table_name_upper.lower()
                        else:
                            prefix = table_name_upper[:4].lower()
                        # Use snake_case: prefix_name
                        display_name = f"{prefix}_{original_display_name}"
                    else:
                        # Fallback: add number
                        display_name = f"{original_display_name}_{counter}"
                    counter += 1
                used_display_names.add(display_name.lower())
                # Determine column type based on data type
                col_type, aggregation = self._determine_column_type(col['type'], col_name)
                column_def = {
                    'name': display_name,
                    'column_id': f"{table_name_upper}::{col_name}",
                    'properties': {
                        'column_type': col_type,
                        'index_type': 'DONT_INDEX'
                    }
                }
                # Add aggregation for measures
                if aggregation:
                    column_def['properties']['aggregation'] = aggregation
                # Add calendar property for DATE columns so ThoughtSpot enables
                # time bucketing (.weekly, .monthly, etc.) on them
                if self._map_data_type(col['type']) == 'DATE':
                    column_def['properties']['calendar'] = 'calendar'
                model['model']['columns'].append(column_def)
        # Generate YAML output with proper formatting
        yaml_output = yaml.dump(model, default_flow_style=False, sort_keys=False,
                                default_style=None, indent=2, width=120)
        # Validate the generated YAML by round-tripping it.
        try:
            yaml.safe_load(yaml_output)
            print(" ✅ Generated YAML is valid")
        except yaml.YAMLError as e:
            print(f" ❌ Generated YAML is invalid: {e}")
            print(" 📄 Invalid YAML:")
            print(yaml_output)
            raise ValueError(f"Generated invalid YAML: {e}")
        return yaml_output
def _is_foreign_key_column(self, col_name: str, table_name: str, foreign_keys: List) -> bool:
"""Check if column is a foreign key (used only for joins, not analytics)"""
for fk in foreign_keys:
if (fk.get('source_table', '').upper() == table_name and
fk.get('source_column', '').upper() == col_name):
return True
return False
def _is_surrogate_primary_key(self, col: Dict, col_name: str) -> bool:
"""Check if column is a meaningless surrogate key (numeric ID)"""
# Common patterns: ID, _ID, ID_, ends with 'id'
if col_name.upper().endswith('ID'):
# Check if it's numeric (INT, BIGINT, NUMBER)
col_type = col.get('type', '').upper()
if any(t in col_type for t in ['INT', 'NUMBER', 'NUMERIC', 'BIGINT']):
return True
return False
    def _create_model_with_constraints(self, tables: Dict, foreign_keys: List, table_guids: Dict,
                                       table_constraints: Dict, model_name: str, connection_name: str) -> str:
        """Generate model TML with explicit constraint-based joins.

        Unlike create_actual_model_tml (which omits joins), this variant embeds
        LEFT_OUTER / MANY_TO_ONE joins derived from the parsed foreign keys,
        prunes cyclic join paths, and fixes YAML's quoting of the 'on' key.

        Args:
            tables: TABLE_NAME -> column dicts (parse_ddl output).
            foreign_keys: FK dicts with from_table/from_column/to_table/to_column.
            table_guids: TABLE_NAME -> GUID; used as each model table's FQN.
            table_constraints: Not read here — joins are rebuilt from
                foreign_keys, which proved more reliable than constraint
                extraction. Kept for interface stability.
            model_name: Model display name.
            connection_name: Not embedded in the TML; kept for interface parity.

        Returns:
            Validated YAML string.

        Raises:
            ValueError: If the generated YAML fails to round-trip through yaml.safe_load.
        """
        print(" 📋 Creating model with constraint references")
        # Track which tables each column name appears in, for conflict resolution.
        column_name_counts = {}
        for table_name, columns in tables.items():
            for col in columns:
                col_name = col['name'].upper()
                if col_name not in column_name_counts:
                    column_name_counts[col_name] = []
                column_name_counts[col_name].append(table_name.upper())
        model = {
            'guid': None,
            'model': {
                'name': model_name,
                'model_tables': [],
                'columns': [],
                'properties': {
                    'is_bypass_rls': False,
                    'join_progressive': True,
                    'spotter_config': {
                        'is_spotter_enabled': True
                    }
                }
            }
        }
        # Add model_tables with FQNs and constraint-based joins
        for table_name in tables.keys():
            table_name_upper = table_name.upper()
            table_guid = table_guids.get(table_name_upper)
            table_entry = {
                'name': table_name_upper,
                'fqn': table_guid
            }
            # Build joins from foreign_keys list (more reliable than constraint extraction)
            table_joins = []
            for fk in foreign_keys:
                if fk['from_table'].upper() == table_name_upper:
                    to_table = fk['to_table'].upper()
                    # Skip self-joins (e.g., EMPLOYEES.MANAGER_ID -> EMPLOYEES.EMPLOYEE_ID)
                    # ThoughtSpot models don't handle self-referential joins well (causes cycles)
                    if to_table == table_name_upper:
                        print(f" ⏭️ Skipping self-join in model: {table_name_upper}.{fk['from_column']} -> {to_table}")
                        continue
                    # Only join to tables that are part of this deployment.
                    if to_table in [t.upper() for t in tables.keys()]:
                        # ThoughtSpot on clause format: [SOURCE::COL] = [DEST::COL]
                        from_col = fk['from_column'].upper()
                        to_col = fk['to_column'].upper()
                        on_clause = f"[{table_name_upper}::{from_col}] = [{to_table}::{to_col}]"
                        join_entry = {
                            'with': to_table,
                            'on': on_clause,
                            'type': 'LEFT_OUTER',
                            'cardinality': 'MANY_TO_ONE'  # Fact to dimension is many-to-one
                        }
                        table_joins.append(join_entry)
                        print(f" 🔗 Added join: {table_name_upper}.{from_col} -> {to_table}.{to_col}")
            if table_joins:
                table_entry['joins'] = table_joins
            model['model']['model_tables'].append(table_entry)
        # Remove diamond join paths - ThoughtSpot rejects models where
        # table A joins to B, A joins to C, and C also joins to B
        self._remove_diamond_joins(model['model']['model_tables'])
        # Add columns with proper global conflict resolution (same as working version)
        used_display_names = set()
        for table_name, columns in tables.items():
            table_name_upper = table_name.upper()
            for col in columns:
                col_name = col['name'].upper()
                original_col_name = col.get('original_name', col['name'])  # Use original casing for display
                # NOTE: FK/PK columns are deliberately NOT skipped — ThoughtSpot
                # requires the join columns to be present in the model's columns
                # section for the joins to work, even though users never search
                # on raw surrogate keys.
                # Start with basic conflict resolution
                display_name = self._resolve_column_name_conflict(
                    col_name, table_name_upper, column_name_counts,
                    original_name=original_col_name
                )
                # If the display name is still used, make it unique with table prefixes
                original_display_name = display_name
                counter = 1
                while display_name.lower() in used_display_names:
                    # First collision: short hard-coded table prefix (snake_case);
                    # unknown tables fall back to the first 4 chars of their name.
                    if counter == 1:
                        if table_name_upper == 'CUSTOMERS':
                            display_name = f"cust_{original_display_name}"
                        elif table_name_upper == 'PRODUCTS':
                            display_name = f"prod_{original_display_name}"
                        elif table_name_upper == 'ORDERS':
                            display_name = f"order_{original_display_name}"
                        elif table_name_upper == 'ORDERITEMS':
                            display_name = f"item_{original_display_name}"
                        elif table_name_upper == 'SALES':
                            display_name = f"sale_{original_display_name}"
                        elif table_name_upper == 'SALESREPS':
                            display_name = f"rep_{original_display_name}"
                        else:
                            display_name = f"{table_name_upper[:4].lower()}_{original_display_name}"
                    else:
                        display_name = f"{original_display_name}_{counter}"
                    counter += 1
                used_display_names.add(display_name.lower())
                # Determine column type based on data type
                col_type, aggregation = self._determine_column_type(col['type'], col_name)
                column_def = {
                    'name': display_name,
                    'column_id': f"{table_name_upper}::{col_name}",
                    'properties': {
                        'column_type': col_type,
                        'index_type': 'DONT_INDEX'
                    }
                }
                if aggregation:
                    column_def['properties']['aggregation'] = aggregation
                # Add calendar property for DATE columns so ThoughtSpot enables
                # time bucketing (.weekly, .monthly, etc.) on them
                if self._map_data_type(col['type']) == 'DATE':
                    column_def['properties']['calendar'] = 'calendar'
                model['model']['columns'].append(column_def)
        # Generate YAML output with validation
        yaml_output = yaml.dump(model, default_flow_style=False, sort_keys=False,
                                default_style=None, indent=2, width=120)
        # Fix YAML reserved word quoting - 'on' gets quoted because it's a YAML boolean
        # ThoughtSpot needs it unquoted
        yaml_output = yaml_output.replace("'on':", "on:")
        # Validate the generated YAML by round-tripping it.
        try:
            yaml.safe_load(yaml_output)
            print(" ✅ Generated YAML is valid")
        except yaml.YAMLError as e:
            print(f" ❌ Generated YAML is invalid: {e}")
            raise ValueError(f"Generated invalid YAML: {e}")
        return yaml_output
def _remove_diamond_joins(self, model_tables: list):
"""Remove join cycles that ThoughtSpot rejects using a spanning tree.
ThoughtSpot requires exactly ONE path between any two tables in a model.
Uses Kruskal's algorithm: add edges from highest-priority (fact) tables first,
skip any edge that would create a cycle. Priority order ensures fact table
joins are preserved and dimension-to-dimension "snowflake" joins are pruned.
"""
def edge_key(src_name: str, join_def: dict):
return (
src_name,
join_def.get('with'),
join_def.get('on', ''),
join_def.get('type', ''),
join_def.get('cardinality', ''),
)
all_edges = []
for t in model_tables:
src_name = t['name']
for j in t.get('joins', []):
all_edges.append((src_name, j.get('with'), j, edge_key(src_name, j)))
if not all_edges:
print(f" ✅ No joins to check for cycles")
return
out_degree = {}
for t in model_tables:
out_degree[t['name']] = len(t.get('joins', []))
in_degree = {t['name']: 0 for t in model_tables}
for src, dst, _, _ in all_edges:
in_degree[dst] = in_degree.get(dst, 0) + 1
all_edges.sort(key=lambda e: (-out_degree.get(e[0], 0), -in_degree.get(e[1], 0), e[0], e[1]))
parent = {t['name']: t['name'] for t in model_tables}
def find(x):
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def union(a, b):
ra, rb = find(a), find(b)
if ra == rb:
return False
parent[ra] = rb
return True
kept_edge_keys = set()
removed = []
for src, dst, join_def, e_key in all_edges:
if union(src, dst):
kept_edge_keys.add(e_key)
else:
removed.append(f"{src}->{dst} ({join_def.get('on', '')})")
for t in model_tables:
if 'joins' not in t:
continue
src_name = t['name']
t['joins'] = [j for j in t['joins'] if edge_key(src_name, j) in kept_edge_keys]
for t in model_tables:
if 'joins' in t and not t['joins']:
del t['joins']
if removed:
print(f" 🔶 Removed {len(removed)} redundant joins (cycle prevention):")
for r in removed:
print(f" - {r}")
else:
print(f" ✅ No join cycles detected")
def _generate_constraint_id(self) -> str:
"""Generate a constraint ID similar to ThoughtSpot's system constraints"""
import uuid
return str(uuid.uuid4())
def validate_foreign_key_references(self, tables: Dict, foreign_keys: List = None) -> List[str]:
    """
    Validate that foreign key columns reference tables that exist in the schema.
    Uses explicit FK constraints from DDL - not heuristics.

    Args:
        tables: Dictionary of table definitions
        foreign_keys: List of FK relationships parsed from DDL
            Each FK is: {'from_table': str, 'from_column': str,
                         'to_table': str, 'to_column': str}

    Returns:
        List of warning messages about missing referenced tables
    """
    if not foreign_keys:
        # No explicit FKs defined, nothing to validate
        return []

    known_tables = {name.upper() for name in tables.keys()}
    messages = []
    for fk in foreign_keys:
        referenced = fk.get('to_table', '')
        # A join whose destination is absent from the schema cannot be built.
        if referenced and referenced.upper() not in known_tables:
            messages.append(
                f"⚠️ {fk.get('from_table', '')}.{fk.get('from_column', '')} references {fk.get('to_table')}, "
                f"but {fk.get('to_table')} is not in this schema. "
                f"The join will be skipped during deployment."
            )
    return messages
def _resolve_column_name_conflict(self, col_name: str, table_name: str,
                                  column_name_counts: Dict,
                                  original_name: str = None) -> str:
    """
    Resolve column name conflicts using configured naming style and prefixes.

    Examples (snake_case):
        SHIPPING_MODE → shipping_mode
        DAYS_TO_SHIP → days_to_ship
        ORDER_DATE (conflict) → order_order_date, cust_order_date, etc.

    Args:
        col_name: Uppercase column name (for conflict detection)
        table_name: Table name for prefix generation
        column_name_counts: Dict tracking column name occurrences
        original_name: Original casing of column name (for proper camelCase detection)
    """
    # Prefer the caller-supplied original casing so camelCase word
    # boundaries survive the style conversion.
    source_name = original_name if original_name else col_name
    styled = _apply_naming_style(source_name, self.column_naming_style)

    # Column name unique across tables: no prefix needed.
    if len(column_name_counts.get(col_name, [])) <= 1:
        return styled

    # Conflict: derive a short, readable prefix from the table name.
    lowered = table_name.lower()
    if len(table_name) <= 4:
        prefix = lowered
    elif 'sales' in lowered and 'rep' in lowered:
        prefix = 'rep'      # special case for readability
    elif 'customer' in lowered:
        prefix = 'cust'     # common abbreviation
    elif 'product' in lowered:
        prefix = 'prod'     # common abbreviation
    elif 'order' in lowered:
        prefix = 'order'    # common abbreviation
    else:
        prefix = table_name[:4].lower()  # first 4 characters

    combined = f"{prefix}_{styled}" if styled else prefix
    return _apply_naming_style(combined, self.column_naming_style)
def _get_table_prefix(self, table_name: str) -> str:
"""Get appropriate prefix for table to avoid column conflicts"""
# Generate prefix dynamically based on table name patterns
table_lower = table_name.lower()
if 'customer' in table_lower:
return '' # Primary table gets no prefix for readability
elif 'sales' in table_lower and 'rep' in table_lower:
return 'Rep'
elif 'sales' in table_lower:
return 'Sale'
elif 'order' in table_lower and 'item' in table_lower:
return 'Item'
elif 'order' in table_lower:
return 'Order'
elif 'product' in table_lower:
return 'Product'
else:
# Use first 3-4 characters as prefix, capitalize first letter
prefix = table_name[:4] if len(table_name) > 3 else table_name
return prefix.capitalize()
def _determine_column_type(self, data_type: str, col_name: str) -> tuple:
"""Determine if column should be ATTRIBUTE or MEASURE"""
base_type = data_type.upper().split('(')[0]
col_upper = col_name.upper()
# SALEID is special - it's treated as a measure in the working example
if col_upper == 'SALEID':
return 'MEASURE', 'SUM'
# Numeric types should be measures (unless they're IDs or keys)
if base_type in ['NUMBER', 'DECIMAL', 'FLOAT', 'DOUBLE', 'INT', 'INTEGER', 'BIGINT']:
# Skip ID/KEY columns - they're join keys, not analytics columns
if col_upper.endswith('ID') or col_upper.endswith('KEY') or col_upper.endswith('_CODE'):
return 'ATTRIBUTE', None
# All other numeric columns are measures
# Determine aggregation based on column name patterns
if any(word in col_upper for word in ['QUANTITY', 'QTY', 'COUNT', 'SOLD']):
return 'MEASURE', 'SUM'
elif any(word in col_upper for word in ['PRICE', 'COST', 'REVENUE', 'AMOUNT', 'TOTAL', 'PROFIT', 'DISCOUNT', 'SHIPPING', 'TAX']):
return 'MEASURE', 'SUM'
elif any(word in col_upper for word in ['RATING', 'SCORE', 'MARGIN', 'PERCENT', 'RATE']):
return 'MEASURE', 'AVERAGE'
else:
# Default: numeric = measure with SUM
return 'MEASURE', 'SUM'
# Everything else is an attribute (strings, dates, booleans, etc.)
return 'ATTRIBUTE', None
def _build_table_relationships(self, tables: Dict, foreign_keys: List) -> Dict:
    """Build table relationships for joins.

    NOTE(review): a second method with this exact name is defined later in
    this class, so this earlier definition is shadowed and never invoked at
    runtime — consider removing one of the two. Unlike the later version,
    this one ignores the ``foreign_keys`` argument entirely and always
    auto-detects via naming heuristics.

    Heuristic: a column named ``<X>ID`` (other than the table's own id
    column) is treated as a foreign key to a table named ``<X>S``, if such
    a table exists (e.g. CUSTOMERID -> CUSTOMERS).

    Returns:
        Dict mapping upper-cased table name to a list of
        ``{'to_table': str, 'on_column': str}`` entries.
    """
    relationships = {}
    # Auto-detect relationships based on common ID patterns
    table_names = list(tables.keys())
    for table_name in table_names:
        table_name_upper = table_name.upper()
        table_cols = [col['name'].upper() for col in tables[table_name]]
        # Find foreign key relationships
        for col_name in table_cols:
            if col_name.endswith('ID') and col_name != f"{table_name_upper}ID":
                # This looks like a foreign key
                target_table = col_name[:-2] + 'S'  # CUSTOMERID -> CUSTOMERS
                if target_table in [t.upper() for t in table_names]:
                    if table_name_upper not in relationships:
                        relationships[table_name_upper] = []
                    relationships[table_name_upper].append({
                        'to_table': target_table,
                        'on_column': col_name
                    })
    return relationships
def _create_model_level_joins(self, tables, foreign_keys):
"""Create joins at model level using the format from working example"""
joins = []
# Auto-detect joins if no explicit foreign keys
if len(tables) > 1:
table_names = list(tables.keys())
for i, table1 in enumerate(table_names):
table1_upper = table1.upper()
table1_cols = [col['name'].upper() for col in tables[table1]]
for j, table2 in enumerate(table_names):
if i >= j: # Avoid duplicates and self-joins
continue
table2_upper = table2.upper()
table2_cols = [col['name'].upper() for col in tables[table2]]
# Look for matching ID columns
for col1 in table1_cols:
if col1.endswith('ID') and col1 in table2_cols:
join_entry = {
'name': f"{table1_upper.lower()}_{table2_upper.lower()}",
'source': table1_upper,
'destination': table2_upper,
'type': 'INNER',
'on': f"{table1_upper}.{col1} = {table2_upper}.{col1}"
}
joins.append(join_entry)
print(f" 🔗 Model-level join: {table1_upper} -> {table2_upper} on {col1}")
break
return joins
def _add_joins_to_tables(self, model_tables, tables, foreign_keys):
    """Add joins to individual tables (not as separate section).

    NOTE(review): both join-building branches below are deliberately
    disabled with ``if False`` / ``elif False`` guards (the inline comments
    say "Skip joins for now - test basic model creation first"), so this
    method currently leaves ``model_tables`` unchanged. Re-enable by
    removing the ``False and`` prefixes once basic model creation works.

    Args:
        model_tables: List of table-entry dicts for the model TML; entries
            matched by name would receive a 'joins' key.
        tables: Dict of table name -> column definition list.
        foreign_keys: Explicit FK dicts with from/to table and column keys.
    """
    # Build join relationships
    table_joins = {}
    # Skip joins for now - test basic model creation first
    if False and foreign_keys:  # disabled: explicit-FK join path
        for fk in foreign_keys:
            from_table = fk['from_table'].upper()
            to_table = fk['to_table'].upper()
            if from_table not in table_joins:
                table_joins[from_table] = []
            join_entry = {
                'with': to_table,
                'on': f"[{from_table}].[{fk['from_column'].upper()}] = [{to_table}].[{fk['to_column'].upper()}]",
                'type': 'INNER',
                'cardinality': 'MANY_TO_ONE'
            }
            table_joins[from_table].append(join_entry)
            print(f" 🔗 Adding join: {from_table} -> {to_table}")
    # Skip joins for now - test basic model creation first
    elif False and len(tables) > 1:  # disabled: heuristic shared-ID join path
        table_names = list(tables.keys())
        for i, table1 in enumerate(table_names):
            table1_upper = table1.upper()
            table1_cols = [col['name'].upper() for col in tables[table1]]
            for j, table2 in enumerate(table_names[i+1:], i+1):
                table2_upper = table2.upper()
                table2_cols = [col['name'].upper() for col in tables[table2]]
                # Look for matching ID columns
                for col1 in table1_cols:
                    if col1.endswith('ID') and col1 in table2_cols:
                        if table1_upper not in table_joins:
                            table_joins[table1_upper] = []
                        join_entry = {
                            'with': table2_upper,
                            'on': f"[{table1_upper}].[{col1}] = [{table2_upper}].[{col1}]",
                            'type': 'INNER',
                            'cardinality': 'MANY_TO_ONE'
                        }
                        table_joins[table1_upper].append(join_entry)
                        print(f" 🔗 Auto-detected join: {table1_upper} -> {table2_upper} on {col1}")
                        break
    # Apply joins to model_tables (no-op while the branches above are disabled)
    for table_entry in model_tables:
        table_name = table_entry['name']
        if table_name in table_joins:
            table_entry['joins'] = table_joins[table_name]
def _build_table_relationships(self, tables: Dict, foreign_keys: List) -> Dict:
"""Build relationships for each table based on foreign keys"""
table_relationships = {}
if foreign_keys:
for fk in foreign_keys:
from_table = fk['from_table'].upper()
to_table = fk['to_table'].upper()
from_column = fk['from_column'].upper()
to_column = fk['to_column'].upper()
# Add relationship to the from_table
if from_table not in table_relationships:
table_relationships[from_table] = []
relationship = {
'name': f"{from_table}_{to_table}_{from_column}",
'to_table': to_table,
'type': 'many_to_one', # Assuming FK relationships are many-to-one
'on': [
{
'from_column': from_column,
'to_column': to_column
}
]
}
table_relationships[from_table].append(relationship)
print(f" 🔗 Relationship: {from_table}.{from_column} -> {to_table}.{to_column}")
# Auto-detect relationships if no explicit foreign keys
elif len(tables) > 1:
table_names = list(tables.keys())
for i, table1 in enumerate(table_names):
table1_upper = table1.upper()
table1_cols = [col['name'].upper() for col in tables[table1]]
for j, table2 in enumerate(table_names[i+1:], i+1):
table2_upper = table2.upper()
table2_cols = [col['name'].upper() for col in tables[table2]]
# Look for matching ID columns
for col1 in table1_cols:
if col1.endswith('ID') and col1 in table2_cols:
if table1_upper not in table_relationships:
table_relationships[table1_upper] = []
relationship = {
'name': f"{table1_upper}_{table2_upper}_{col1}",
'to_table': table2_upper,
'type': 'many_to_one',
'on': [
{
'from_column': col1,
'to_column': col1
}
]
}
table_relationships[table1_upper].append(relationship)
print(f" 🔗 Auto-detected relationship: {table1_upper}.{col1} -> {table2_upper}.{col1}")
break
return table_relationships
def create_model_tml(self, tables: Dict, foreign_keys: List, table_guids: Dict = None,
                     model_name: str = None) -> str:
    """Generate worksheet TML (ORIGINAL APPROACH - keeping for comparison).

    Builds a legacy 'worksheet' TML document (tables + worksheet_columns)
    and returns it as a YAML string.

    NOTE(review): the join loop below reads ``fk['source_table']`` /
    ``fk['target_table']``, but every other method in this file uses
    ``from_table`` / ``to_table`` keys — if called with FKs from the same
    parser, this would raise KeyError. Confirm the expected FK shape before
    reusing this method.

    NOTE(review): ``table_entry`` (including any joins collected for it) is
    built but never appended to the worksheet — a separate minimal dict is
    appended instead — so joins never reach the output. Looks like dead
    code left from an earlier iteration.

    Args:
        tables: Dict of table name -> column definition list.
        foreign_keys: FK dicts (see key-name caveat above).
        table_guids: Optional map of upper-cased table name -> GUID, used
            as 'fqn' to disambiguate same-named tables.
        model_name: Optional worksheet name; defaults to a dated demo name.

    Returns:
        YAML string of the worksheet TML.
    """
    if not model_name:
        model_name = f"demo_worksheet_{datetime.now().strftime('%Y%m%d')}"
    worksheet = {
        'guid': None,
        'worksheet': {
            'name': model_name,
            'description': 'Auto-generated worksheet from DDL',
            'tables': [],
            'worksheet_columns': [],  # Adding back - but with GUID references
            'properties': {
                'is_bypass_rls': False,
                'join_progressive': True,
                'spotter_config': {
                    'is_spotter_enabled': True
                }
            }
        }
    }
    # Add tables with joins
    for table_name in tables.keys():
        table_entry = {'name': table_name.upper()}
        # Add FQN (GUID) if available to resolve multiple tables with same name
        if table_guids and table_name.upper() in table_guids:
            table_entry['fqn'] = table_guids[table_name.upper()]
        joins = []
        for fk in foreign_keys:
            # NOTE(review): non-standard FK key names — see docstring.
            if fk['source_table'] == table_name:
                joins.append({
                    'with': fk['target_table'].upper(),
                    'referencing_join': f"FK_{table_name.upper()}_{fk['target_table'].upper()}"
                })
        if joins:
            table_entry['joins'] = joins
        # Just populate the required 'tables' field with GUID reference
        # (table_entry above is discarded — see docstring NOTE).
        worksheet['worksheet']['tables'].append({
            'name': table_name.upper(),
            'fqn': table_guids.get(table_name.upper()) if table_guids else f"table_{table_name.lower()}"
        })
    # Add columns using table GUIDs in expressions
    for table_name, columns in tables.items():
        table_guid = table_guids.get(table_name.upper()) if table_guids else None
        for col in columns:
            col_type = 'MEASURE' if 'DECIMAL' in col['type'] else 'ATTRIBUTE'
            # Use GUID in expression if available
            if table_guid:
                expr = f"[{table_guid}].[{col['name']}]"
            else:
                expr = f"[{table_name.upper()}].[{col['name']}]"
            column_def = {
                'name': col['name'].upper(),
                'data_type': col_type,
                'expr': expr
            }
            worksheet['worksheet']['worksheet_columns'].append(column_def)
    return yaml.dump(worksheet, default_flow_style=False, sort_keys=False)
def _map_data_type(self, sql_type: str) -> str:
"""Map SQL data types to ThoughtSpot types"""
sql_type = sql_type.upper()
# DEBUG: Print what we're mapping (commented out for cleaner output)
# print(f" 🔍 Mapping data type: '{sql_type}'")
# Handle NUMBER with precision/scale intelligently
if sql_type.startswith('NUMBER'):
# Extract precision and scale from NUMBER(precision,scale)
if '(' in sql_type and ')' in sql_type:
params = sql_type[sql_type.find('(')+1:sql_type.find(')')].split(',')
if len(params) >= 2:
scale = int(params[1].strip())
result = 'INT64' if scale == 0 else 'DOUBLE'
# print(f" → NUMBER({params[0].strip()},{scale}) → {result}")
return result
else:
# print(f" → NUMBER({params[0].strip()}) → INT64")
return 'INT64' # NUMBER(x) defaults to integer
else:
# print(f" → Plain NUMBER → DOUBLE")
return 'DOUBLE' # Plain NUMBER defaults to double
type_mapping = {
'INT64': 'INT64',
'INT': 'INT64', # FIXED: INT should map to INT64
'INTEGER': 'INT64',
'BIGINT': 'INT64',
'VARCHAR': 'VARCHAR',
'TEXT': 'VARCHAR',
'STRING': 'VARCHAR',
'DATE': 'DATE',
'TIMESTAMP': 'DATE', # Try DATE for TIMESTAMP - DATE fields worked fine
'TIMESTAMP_NTZ': 'DATE', # Try DATE for TIMESTAMP_NTZ - we know DATE works
'DECIMAL': 'DOUBLE',
'FLOAT': 'DOUBLE',
'BOOLEAN': 'BOOL'
}
for sql_key, ts_type in type_mapping.items():
if sql_key in sql_type:
return ts_type
return 'VARCHAR' # Default fallback
def get_connection_by_name(self, connection_name: str) -> Dict:
    """Check if a connection with this name already exists.

    Returns the first matching metadata record, or None when nothing
    matches or the lookup fails (errors are logged and swallowed so the
    caller can fall back to creating a new connection).

    NOTE(review): this sends the nested ``metadata`` list-of-dicts as GET
    query params; ``requests`` does not serialize nested structures in
    ``params`` into anything the v2 metadata/search API can parse. Other
    calls in this file use POST with a JSON body — confirm this request
    actually returns matches rather than always falling through to None.
    """
    try:
        response = self.session.get(
            f"{self.base_url}/api/rest/2.0/metadata/search",
            params={
                "metadata": [{"type": "DATA_SOURCE", "identifier": connection_name}],
                "record_size": 10
            }
        )
        if response.status_code == 200:
            data = response.json()
            if data and len(data) > 0:
                return data[0]  # Return first matching connection
        return None
    except Exception as e:
        print(f" ⚠️ Could not check existing connections: {e}")
        return None
def create_snowflake_schema(self, database: str, schema: str):
    """Best-effort creation of the target schema via ThoughtSpot's SQL API.

    Issues ``CREATE SCHEMA IF NOT EXISTS`` through the executeQuery
    endpoint. All failures are logged and swallowed: downstream table
    operations may create the schema themselves, so this never raises.
    """
    try:
        print(f" 🏗️ Creating schema {database}.{schema}...")
        # Use ThoughtSpot's SQL execution API to create the schema.
        payload = {
            "sql_query": f"CREATE SCHEMA IF NOT EXISTS {database}.{schema}",
            # getattr mirrors the original hasattr guard for the GUID.
            "connection_guid": getattr(self, 'sf_connection_guid', None),
        }
        response = self.session.post(
            f"{self.base_url}/api/rest/2.0/database/executeQuery",
            json=payload
        )
        if response.status_code == 200:
            print(f" ✅ Schema {database}.{schema} created/verified")
        else:
            print(f" ⚠️ Schema creation response: {response.status_code} - {response.text}")
            print(f" 📝 Will proceed assuming schema exists or will be created by table operations")
    except Exception as e:
        print(f" ⚠️ Could not create schema: {e}")
        print(f" 📝 Will proceed assuming schema exists or will be created by table operations")
def ensure_tag_exists(self, tag_name: str) -> bool:
    """
    Check if a tag exists, create it if it doesn't.

    Args:
        tag_name: Name of the tag

    Returns:
        True if tag exists or was created (or no name was given),
        False on error
    """
    if not tag_name:
        # No tag name provided - skip silently
        return True
    try:
        # First, try to get the tag to see if it exists
        print(f"[ThoughtSpot] 🔍 Checking if tag '{tag_name}' exists...", flush=True)
        search_response = self.session.post(
            f"{self.base_url}/api/rest/2.0/tags/search",
            json={"tag_identifier": tag_name}
        )
        print(f"[ThoughtSpot] 🔍 Tag search response: {search_response.status_code}", flush=True)
        if search_response.status_code == 200:
            tags = search_response.json()
            print(f"[ThoughtSpot] 🔍 Tags found: {len(tags) if tags else 0}", flush=True)
            if tags and len(tags) > 0:
                # Tag exists
                tag_id = tags[0].get('id', 'unknown')
                print(f"[ThoughtSpot] ✅ Tag '{tag_name}' exists (ID: {tag_id})", flush=True)
                return True
        elif search_response.status_code == 400:
            # 400 might mean tag not found in some ThoughtSpot versions
            print(f"[ThoughtSpot] 🔍 Tag search returned 400 - tag likely doesn't exist", flush=True)
        else:
            print(f"[ThoughtSpot] ⚠️ Tag search error: {search_response.status_code}", flush=True)
            try:
                print(f"[ThoughtSpot] ⚠️ Response: {search_response.text[:200]}", flush=True)
            except Exception:
                # FIX: narrowed from bare `except:` so KeyboardInterrupt /
                # SystemExit aren't swallowed by best-effort logging.
                pass
        # Tag doesn't exist - create it
        print(f"[ThoughtSpot] 📎 Creating tag '{tag_name}'...", flush=True)
        create_response = self.session.post(
            f"{self.base_url}/api/rest/2.0/tags/create",
            json={"name": tag_name}
        )
        print(f"[ThoughtSpot] 📎 Create tag response: {create_response.status_code}", flush=True)
        if create_response.status_code in [200, 201]:
            try:
                result = create_response.json()
                tag_id = result.get('id', 'unknown')
                print(f"[ThoughtSpot] ✅ Tag '{tag_name}' created (ID: {tag_id})", flush=True)
            except Exception:
                # FIX: narrowed from bare `except:`; body parse is optional.
                print(f"[ThoughtSpot] ✅ Tag '{tag_name}' created", flush=True)
            return True
        else:
            print(f"[ThoughtSpot] ⚠️ Could not create tag: {create_response.status_code}", flush=True)
            try:
                print(f"[ThoughtSpot] ⚠️ Response: {create_response.text[:200]}", flush=True)
            except Exception:
                # FIX: narrowed from bare `except:`.
                pass
            # Return False - don't silently proceed
            return False
    except Exception as e:
        import traceback
        print(f"[ThoughtSpot] ⚠️ Tag check/create error: {str(e)}", flush=True)
        print(f"[ThoughtSpot] ⚠️ Traceback: {traceback.format_exc()}", flush=True)
        return False
def assign_tags_to_objects(self, object_guids: List[str], object_type: str, tag_name: str) -> bool:
    """
    Assign tags to ThoughtSpot objects using REST API v1.
    Auto-creates the tag if it doesn't exist.

    Args:
        object_guids: List of object GUIDs to tag
        object_type: Type of objects (LOGICAL_TABLE for tables/models, PINBOARD_ANSWER_BOOK for liveboards)
        tag_name: Tag name to assign

    Returns:
        True if successful, False otherwise
    """
    if not tag_name:
        # No tag name provided - skip silently (this is expected behavior)
        return True
    if not object_guids:
        return False
    try:
        import json as json_module
        # Make sure the tag exists before trying to assign it.
        if not self.ensure_tag_exists(tag_name):
            print(f"[ThoughtSpot] ⚠️ Could not ensure tag exists, skipping assignment", flush=True)
            return False
        # The v1 form-encoded endpoint is the one that actually works.
        form_payload = {
            'id': json_module.dumps(object_guids),
            'type': object_type,
            'tagname': json_module.dumps([tag_name])
        }
        assign_response = self.session.post(
            f"{self.base_url}/tspublic/v1/metadata/assigntag",
            data=form_payload,
            headers={
                'X-Requested-By': 'ThoughtSpot',
                'Content-Type': 'application/x-www-form-urlencoded'
            }
        )
        if assign_response.status_code in (200, 204):
            print(f"[ThoughtSpot] ✅ Tagged {len(object_guids)} {object_type} objects with '{tag_name}'", flush=True)
            return True
        print(f"[ThoughtSpot] ⚠️ Tag assignment failed: {assign_response.status_code}", flush=True)
        print(f"[ThoughtSpot] DEBUG: Response text: {assign_response.text[:500]}", flush=True)
        print(f"[ThoughtSpot] DEBUG: Object GUIDs: {object_guids}", flush=True)
        print(f"[ThoughtSpot] DEBUG: Object type: {object_type}", flush=True)
        return False
    except Exception as e:
        print(f"[ThoughtSpot] ⚠️ Tag assignment error: {str(e)}", flush=True)
        return False
def share_objects(self, object_guids: List[str], object_type: str, share_with: str) -> bool:
    """
    Share ThoughtSpot objects with a user or group (can_edit / MODIFY).

    Args:
        object_guids: GUIDs to share
        object_type: 'LOGICAL_TABLE' for models/tables, 'LIVEBOARD' for liveboards
        share_with: user email (contains '@') or group name

    Returns:
        True on success (or nothing to share), False on any failure.
    """
    if not share_with or not object_guids:
        return True  # nothing requested — treated as success
    # Emails identify users; anything else is assumed to be a group name.
    principal_type = "USER" if '@' in share_with else "USER_GROUP"
    permissions = [{
        "principal": {
            "identifier": share_with,
            "type": principal_type
        },
        "share_mode": "MODIFY"
    }]
    metadata = [{"identifier": guid, "type": object_type} for guid in object_guids]
    try:
        response = self.session.post(
            f"{self.base_url}/api/rest/2.0/security/metadata/share",
            json={"permissions": permissions, "metadata": metadata}
        )
        if response.status_code in (200, 204):
            print(f"[ThoughtSpot] ✅ Shared {len(object_guids)} {object_type} with {principal_type} '{share_with}'", flush=True)
            return True
        print(f"[ThoughtSpot] ⚠️ Share failed: {response.status_code} - {response.text[:200]}", flush=True)
        return False
    except Exception as e:
        print(f"[ThoughtSpot] ⚠️ Share error: {str(e)}", flush=True)
        return False
def _generate_demo_names(self, company_name: str = None, use_case: str = None):
"""Generate standardized demo names using DM convention"""
from datetime import datetime
import re
# Get timestamp components
now = datetime.now()
yymmdd = now.strftime('%y%m%d')
hhmmss = now.strftime('%H%M%S')
# Clean and truncate company name (5 chars)
if company_name:
company_clean = re.sub(r'[^a-zA-Z0-9]', '', company_name.upper())[:5]
else:
company_clean = 'DEMO'[:5]
# Clean and truncate use case (3 chars)
if use_case:
usecase_clean = re.sub(r'[^a-zA-Z0-9]', '', use_case.upper())[:3]
else:
usecase_clean = 'GEN'[:3]
# Generate names
base_name = f"DM{yymmdd}_{hhmmss}_{company_clean}_{usecase_clean}"
return {
'schema': base_name,
'connection': f"{base_name}_conn",
'model': f"{base_name}_model",
'base': base_name
}
def deploy_all(self, ddl: str, database: str, schema: str, base_name: str,
connection_name: str = None, company_name: str = None,
use_case: str = None, liveboard_name: str = None,
llm_model: str = None, tag_name: str = None,
liveboard_method: str = None, share_with: str = None,
company_research: str = None,
progress_callback=None) -> Dict:
"""
Deploy complete data model to ThoughtSpot
Args:
ddl: Data Definition Language statements
database: Target database name
schema: Target schema name
connection_name: Optional connection name (auto-generated if not provided)
Returns:
Dict with deployment results and names of created objects
"""
results = {
'success': False,
'connection': None,
'tables': [],
'model': None,
'errors': []
}
table_guids = {} # Store table GUIDs for model creation
def log_progress(message):
"""Helper to log progress both to console and callback"""
# ALWAYS print to console FIRST
import sys
print(f"[ThoughtSpot] {message}", flush=True)
sys.stdout.flush() # Force flush
# Then call callback if provided
if progress_callback:
try:
progress_callback(message)
except Exception as e:
print(f"[Warning] Callback error: {e}", flush=True)
try:
import time
start_time = time.time()
# STEP 0: Authenticate first!
log_progress("Authenticating...")
if not self.authenticate():
raise Exception("ThoughtSpot authentication failed")
auth_time = time.time() - start_time
log_progress(f"[OK] Auth complete ({auth_time:.1f}s)")
# Parse DDL
tables, foreign_keys = self.parse_ddl(ddl)
if not tables:
raise Exception("No tables found in DDL")
# Validate foreign key references before deployment (uses explicit FKs from DDL)
fk_warnings = self.validate_foreign_key_references(tables, foreign_keys)
if fk_warnings:
log_progress(f"[WARN] {len(fk_warnings)} FK warning(s) - joins to missing tables will be skipped")
for warning in fk_warnings:
log_progress(f" {warning}")
# Step 1: Create connection using base name
# base_name is like "DEMO_AMA_12111207_X4R"
# schema is like "DEMO_
# 2111207_X4R_sch"
demo_names = {
'schema': schema,
'connection': f"{base_name}_conn",
'model': f"{base_name}_mdl",
'base': base_name
}
if not connection_name:
connection_name = demo_names['connection']
log_progress(f"Creating connection: {connection_name}...")
# Check if connection already exists first
existing_connection = self.get_connection_by_name(connection_name)
if existing_connection:
connection_guid = existing_connection['header']['id_guid']
connection_fqn = connection_guid
results['connection'] = connection_name
results['connection_guid'] = connection_guid
log_progress(f"[OK] Connection ready")
# Assign tag to existing connection
if tag_name and connection_guid:
log_progress(f"Assigning tag '{tag_name}' to connection...")
self.assign_tags_to_objects([connection_guid], 'DATA_SOURCE', tag_name)
else:
log_progress(f"Creating new connection: {connection_name}")
# base_name already has random suffix (e.g., _UZM) so no need for additional uniqueness
print(f"🔗 Creating connection: {connection_name}")
print(f" Account: '{self.sf_account}' (length: {len(self.sf_account)})")
print(f" User: '{self.sf_user}'")
print(f" Database: '{database}'")
connection_tml_yaml = self.create_connection_tml(connection_name)
# Filter out sensitive info for display - use regex for private key
import re
display_tml = re.sub(r'-----BEGIN ENCRYPTED PRIVATE KEY-----.*?-----END ENCRYPTED PRIVATE KEY-----', '[PRIVATE_KEY_REDACTED]', connection_tml_yaml, flags=re.DOTALL)
passphrase = get_admin_setting('SNOWFLAKE_KP_PASSPHRASE', required=False)
if passphrase:
display_tml = display_tml.replace(passphrase, "[PASSPHRASE_REDACTED]")
print(f"\n📄 TML being sent:\n{display_tml}")
# connection_tml_yaml is already a YAML string from create_connection_tml()
response = self.session.post(
f"{self.base_url}/api/rest/2.0/metadata/tml/import",
json={
"metadata_tmls": [connection_tml_yaml],
"import_policy": "PARTIAL"
}
)
print(f" Response status: {response.status_code}")
if response.status_code != 200:
# Print the actual error response from ThoughtSpot
try:
error_response = response.json()
print(f"❌ Error response: {error_response}")
except:
print(f"❌ Error response (raw): {response.text}")
if response.status_code == 200:
result = response.json()
print(f"📋 Connection response: {result}")
print(f"📋 Response type: {type(result)}")
# Handle both dict and list responses
if isinstance(result, list):
# Response is a list
if len(result) > 0 and result[0].get('response', {}).get('status', {}).get('status_code') == 'OK':
connection_guid = result[0].get('response', {}).get('header', {}).get('id_guid')
print(f"✅ Connection created: {connection_name} (GUID: {connection_guid})")
results['connection'] = connection_name
results['connection_guid'] = connection_guid
# Store connection GUID for table creation
connection_fqn = connection_guid
else:
error_msg = result[0].get('response', {}).get('status', {}).get('error_message', 'Unknown error') if result else 'Empty response'
raise Exception(f"Connection creation failed: {error_msg}")
elif isinstance(result, dict) and result.get('object') and len(result['object']) > 0:
obj = result['object'][0]
if obj.get('status', {}).get('status_code') == 'OK':
connection_guid = obj.get('header', {}).get('id_guid')
print(f"✅ Connection created: {connection_name} (GUID: {connection_guid})")
results['connection'] = connection_name
results['connection_guid'] = connection_guid
# Store connection GUID for table creation
connection_fqn = connection_guid
else:
error_msg = obj.get('status', {}).get('error_message', 'Unknown error')
raise Exception(f"Connection creation failed: {error_msg}")
else:
raise Exception("Connection creation failed: No object in response")
else:
raise Exception(f"Connection creation failed: HTTP {response.status_code}")
# Assign tag to connection
if tag_name and connection_guid:
log_progress(f"Assigning tag '{tag_name}' to connection...")
self.assign_tags_to_objects([connection_guid], 'DATA_SOURCE', tag_name)
# Step 1.5: Schema should already exist (created by demo_prep tool)
print("\n1️⃣.5 Using existing schema in Snowflake...")
# Step 2: Build relationships for tables
print("\n1️⃣.5 Building relationships...")
table_relationships = self._build_table_relationships(tables, foreign_keys)
# Step 2: TWO-PHASE TABLE CREATION (to avoid dependency order issues)
table_count = len(tables)
batch1_start = time.time()
log_progress(f"Batch 1/2: Creating {table_count} tables...")
# PHASE 1: Create all tables WITHOUT joins in ONE batch API call
# Build array of all table TMLs
table_tmls_batch1 = []
table_names_order = [] # Track order for matching response
for table_name, columns in tables.items():
print(f"[ThoughtSpot] Preparing {table_name.upper()}...", flush=True)
table_tml = self.create_table_tml(table_name, columns, connection_name, database, schema, all_tables=None, foreign_keys=foreign_keys)
table_tmls_batch1.append(table_tml)
table_names_order.append(table_name.upper())
# Send all tables in ONE API call
log_progress(f" Sending batch request for {len(table_tmls_batch1)} tables...")
response = self.session.post(
f"{self.base_url}/api/rest/2.0/metadata/tml/import",
json={
"metadata_tmls": table_tmls_batch1,
"import_policy": "PARTIAL",
"create_new": True
}
)
if response.status_code == 200:
result = response.json()
# Handle both response formats (list or dict with 'object' key)
if isinstance(result, list):
objects = result
elif isinstance(result, dict) and 'object' in result:
objects = result['object']
else:
error = f"Batch 1 failed: Unexpected response format: {type(result)}"
log_progress(f" ❌ {error}")
results['errors'].append(error)
return results
# Process each table result
for idx, obj in enumerate(objects):
table_name = table_names_order[idx] if idx < len(table_names_order) else f"TABLE_{idx}"
if obj.get('response', {}).get('status', {}).get('status_code') == 'OK':
table_guid = obj.get('response', {}).get('header', {}).get('id_guid')
print(f"[ThoughtSpot] ✅ {table_name} created", flush=True)
results['tables'].append(table_name)
table_guids[table_name] = table_guid
else:
error_msg = obj.get('response', {}).get('status', {}).get('error_message', 'Unknown error')
error = f"Table {table_name} failed: {error_msg}"
print(f"[ThoughtSpot] ❌ {table_name} failed: {error_msg}", flush=True)
results['errors'].append(error)
else:
error = f"Batch 1 HTTP error: {response.status_code} - {response.text}"
log_progress(f" ❌ {error}")
results['errors'].append(error)
return results
# Check if we created any tables successfully
if not table_guids:
log_progress(" ❌ No tables were created successfully in Batch 1")
return results
# Assign tags to tables
table_guid_list = list(table_guids.values())
print(f"🔍 DEBUG BEFORE TAG CALL: tag_name='{tag_name}', table_guid_list={table_guid_list}")
log_progress(f"Assigning tag '{tag_name}' to {len(table_guid_list)} tables...")
self.assign_tags_to_objects(table_guid_list, 'LOGICAL_TABLE', tag_name)
batch1_time = time.time() - batch1_start
log_progress(f"[OK] Batch 1 complete: {len(table_guids)} tables created ({batch1_time:.1f}s)")
# PHASE 2: Update tables WITH joins in ONE batch API call
batch2_start = time.time()
log_progress(f"Batch 2/2: Adding joins to {len(table_guids)} tables...")
# Build array of all table update TMLs (with joins)
table_tmls_batch2 = []
table_names_order_batch2 = []
for table_name, columns in tables.items():
table_name_upper = table_name.upper()
# Only add joins if the table was created successfully in Phase 1
if table_name_upper not in table_guids:
print(f"[ThoughtSpot] Skipping {table_name_upper} (not created)", flush=True)
continue
# Get the GUID for this table
table_guid = table_guids[table_name_upper]
print(f"[ThoughtSpot] Preparing joins for {table_name_upper}...", flush=True)
# Create table TML WITH joins_with section AND the table GUID
table_tml = self.create_table_tml(
table_name, columns, connection_name, database, schema,
all_tables=tables, table_guid=table_guid, foreign_keys=foreign_keys
)
table_tmls_batch2.append(table_tml)
table_names_order_batch2.append(table_name_upper)
# Send all table updates in ONE API call
if table_tmls_batch2:
log_progress(f" Sending batch request to add joins to {len(table_tmls_batch2)} tables...")
response = self.session.post(
f"{self.base_url}/api/rest/2.0/metadata/tml/import",
json={
"metadata_tmls": table_tmls_batch2,
"import_policy": "PARTIAL",
"create_new": False # Update existing tables
}
)
if response.status_code == 200:
result = response.json()
# Handle both response formats
if isinstance(result, list):
objects = result
elif isinstance(result, dict) and 'object' in result:
objects = result['object']
else:
objects = []
# Process each result
for idx, obj in enumerate(objects):
table_name = table_names_order_batch2[idx] if idx < len(table_names_order_batch2) else f"TABLE_{idx}"
if obj.get('response', {}).get('status', {}).get('status_code') == 'OK':
print(f"[ThoughtSpot] ✅ {table_name} joins added", flush=True)
else:
error_msg = obj.get('response', {}).get('status', {}).get('error_message', 'Unknown error')
print(f"[ThoughtSpot] ⚠️ {table_name} joins failed: {error_msg}", flush=True)
results['errors'].append(f"Joins for {table_name} failed: {error_msg}")
else:
log_progress(f" ⚠️ Batch 2 HTTP error: {response.status_code}")
batch2_time = time.time() - batch2_start
log_progress(f"[OK] Batch 2 complete: Joins added ({batch2_time:.1f}s)")
actual_constraint_ids = {} # We'll generate these for the model
# Skip separate relationship creation for now
# print("\n2️⃣.5 Creating relationships separately...")
# self.create_relationships_separately(table_relationships, table_guids)
# Step 3: Extract constraint IDs from created tables
table_constraints = {}
for table_name, table_guid in table_guids.items():
print(f"[ThoughtSpot] Extracting joins from {table_name}...", flush=True)
# Export table TML to get constraint IDs
export_response = self.session.post(
f"{self.base_url}/api/rest/2.0/metadata/tml/export",
json={
"metadata": [{"identifier": table_guid, "type": "LOGICAL_TABLE"}],
"export_associated": False,
"format_type": "YAML"
}
)
if export_response.status_code == 200:
tml_data = export_response.json()
if tml_data and 'edoc' in tml_data[0]:
import json
tml_json = json.loads(tml_data[0]['edoc'])
# Extract joins_with constraint IDs
joins_with = tml_json.get('table', {}).get('joins_with', [])
if joins_with:
table_constraints[table_name] = []
for join in joins_with:
constraint_id = join.get('name')
destination = join.get('destination', {}).get('name')
if constraint_id and destination:
table_constraints[table_name].append({
'constraint_id': constraint_id,
'destination': destination
})
# Step 4: Create model (semantic layer) with constraint references
model_start = time.time()
model_name = demo_names['model']
log_progress(f"Creating model: {model_name}...")
# Use the enhanced model creation that includes constraint references
model_tml = self._create_model_with_constraints(tables, foreign_keys, table_guids, table_constraints, model_name, connection_name)
print(f"\n📄 Model TML being sent:\n{model_tml}")
response = self.session.post(
f"{self.base_url}/api/rest/2.0/metadata/tml/import",
json={
"metadata_tmls": [model_tml],
"import_policy": "ALL_OR_NONE",
"create_new": True
}
)
if response.status_code == 200:
result = response.json()
# Handle both response formats (list or dict with 'object' key)
if isinstance(result, list):
objects = result
elif isinstance(result, dict) and 'object' in result:
objects = result['object']
else:
error = f"Model failed: Unexpected response format: {type(result)}"
log_progress(f" ❌ {error}")
results['errors'].append(error)
objects = []
if objects and len(objects) > 0:
if objects[0].get('response', {}).get('status', {}).get('status_code') == 'OK':
model_guid = objects[0].get('response', {}).get('header', {}).get('id_guid')
model_time = time.time() - model_start
log_progress(f"[OK] Model created ({model_time:.1f}s)")
results['model'] = model_name
results['model_guid'] = model_guid
# Assign tag to model
print(f"🔍 DEBUG BEFORE TAG CALL: tag_name='{tag_name}', model_guid='{model_guid}'")
log_progress(f"Assigning tag '{tag_name}' to model...")
self.assign_tags_to_objects([model_guid], 'LOGICAL_TABLE', tag_name)
# Share model
_effective_share = share_with or get_admin_setting('SHARE_WITH', required=False)
if _effective_share:
log_progress(f"Sharing model with '{_effective_share}'...")
self.share_objects([model_guid], 'LOGICAL_TABLE', _effective_share)
# Step 3.5: Enable Spotter + enrich model semantics in a single export→update→reimport
# (create_new=True import ignores spotter_config, so we always re-import here)
try:
export_resp = self.session.post(
f"{self.base_url}/api/rest/2.0/metadata/tml/export",
json={
"metadata": [{"identifier": model_guid, "type": "LOGICAL_TABLE"}],
"export_associated": False,
"format_type": "YAML"
}
)
if export_resp.status_code == 200:
export_data = export_resp.json()
if export_data and 'edoc' in export_data[0]:
model_tml_dict = json.loads(export_data[0]['edoc'])
# Enable Spotter
model_tml_dict.setdefault('model', {}).setdefault('properties', {})['spotter_config'] = {'is_spotter_enabled': True}
# Enrich with description, synonyms, and AI context
if company_research:
try:
from model_semantic_updater import ModelSemanticUpdater
log_progress("Generating model description, synonyms, and AI context...")
sem_start = time.time()
updater = ModelSemanticUpdater(self, llm_model=llm_model)
model_description = updater.generate_model_description(
company_research=company_research,
use_case=use_case or '',
company_name=company_name or '',
model_name=model_name,
)
column_semantics = updater.generate_column_semantics(
company_research=company_research,
model_tml=model_tml_dict,
use_case=use_case or '',
company_name=company_name or '',
)
# Apply to TML dict in-place (returns YAML string)
enriched_yaml = updater.apply_to_model_tml(
model_tml_dict, column_semantics, model_description
)
# Parse back so we can still dump consistently below
model_tml_dict = yaml.safe_load(enriched_yaml)
sem_time = time.time() - sem_start
log_progress(f"[OK] Semantics generated: {len(column_semantics)} columns enriched ({sem_time:.1f}s)")
except Exception as sem_err:
log_progress(f"[WARN] Semantic enrichment failed (non-fatal): {sem_err}")
updated_tml = yaml.dump(model_tml_dict, allow_unicode=True, sort_keys=False)
update_resp = self.session.post(
f"{self.base_url}/api/rest/2.0/metadata/tml/import",
json={
"metadata_tmls": [updated_tml],
"import_policy": "ALL_OR_NONE",
"create_new": False
}
)
if update_resp.status_code == 200:
log_progress("🤖 Spotter enabled + model semantics applied")
else:
log_progress(f"🤖 Model update failed: HTTP {update_resp.status_code} — {update_resp.text[:200]}")
else:
log_progress("🤖 Spotter enable: export returned no edoc")
else:
log_progress(f"🤖 Spotter enable: export failed HTTP {export_resp.status_code}")
except Exception as spotter_error:
log_progress(f"🤖 Spotter/semantics exception: {spotter_error}")
# Step 4: Auto-create Liveboard from model
lb_start = time.time()
# HYBRID is the only supported liveboard creation method
method = 'HYBRID'
log_progress(f"Creating liveboard ({method} method)...")
try:
# Build company data from parameters
# Clean company name for display (strip .com, .org, etc)
clean_company = company_name.split('.')[0].title() if company_name and '.' in company_name else (company_name or 'Demo Company')
company_data = {
'name': clean_company,
'use_case': use_case or 'General Analytics'
}
if method == 'HYBRID':
# HYBRID: MCP creates liveboard, TML post-processes
from liveboard_creator import create_liveboard_from_model_mcp, create_liveboard_from_model, enhance_mcp_liveboard
# Get actual column names from ThoughtSpot model (not DDL)
# This is critical because TS may rename columns to make them unique
# e.g., PROCESSING_FEE becomes gift_processing_fee and tran_processing_fee
model_columns = self.get_model_columns(model_guid)
if not model_columns:
log_progress(f" ⚠️ Could not get model columns, falling back to DDL")
model_columns = []
for table_name, columns_list in tables.items():
for col in columns_list:
model_columns.append(col)
# Get liveboard questions from the vertical×function config
outlier_dicts = []
try:
from demo_personas import parse_use_case, get_use_case_config
uc_vertical, uc_function = parse_use_case(use_case or '')
uc_config = get_use_case_config(uc_vertical or "Generic", uc_function or "Generic")
lq = uc_config.get("liveboard_questions", [])
required_qs = [q for q in lq if q.get("required")]
optional_qs = [q for q in lq if not q.get("required")]
for q in required_qs:
outlier_dicts.append({
'title': q['title'],
'insight': q.get('insight', ''),
'viz_type': q['viz_type'],
'show_me_query': q['viz_question'],
'kpi_companion': True,
'spotter_questions': q.get('spotter_qs', []),
})
for q in optional_qs[:2]:
outlier_dicts.append({
'title': q['title'],
'insight': q.get('insight', ''),
'viz_type': q['viz_type'],
'show_me_query': q['viz_question'],
'kpi_companion': False,
'spotter_questions': q.get('spotter_qs', []),
})
if outlier_dicts:
log_progress(f" [MCP] Using {len(outlier_dicts)} liveboard questions from {uc_vertical}×{uc_function}")
except Exception as outlier_err:
log_progress(f" [MCP] Liveboard questions loading skipped: {outlier_err}")
# MCP creates liveboard
if method == 'HYBRID':
log_progress(f" Step 1/2: MCP creating initial liveboard...")
log_progress(f" [MCP] Model: {model_name}, GUID: {model_guid}")
log_progress(f" [MCP] Use case: {use_case or 'General Analytics'}")
log_progress(f" [MCP] Using {len(model_columns)} columns from ThoughtSpot model")
try:
liveboard_result = create_liveboard_from_model_mcp(
ts_client=self,
model_id=model_guid,
model_name=model_name,
company_data=company_data,
use_case=use_case or 'General Analytics',
num_visualizations=10,
liveboard_name=liveboard_name,
outliers=outlier_dicts if outlier_dicts else None,
llm_model=llm_model,
model_columns=model_columns,
prompt_logger=self.prompt_logger
)
except Exception as mcp_error:
import traceback
error_trace = traceback.format_exc()
log_progress(f" [MCP ERROR] {type(mcp_error).__name__}: {str(mcp_error)}")
liveboard_result = {'success': False, 'error': str(mcp_error), 'traceback': error_trace}
# Log result
if liveboard_result.get('success'):
log_progress(f" [MCP] Liveboard created: {liveboard_result.get('liveboard_guid')}")
else:
log_progress(f" [MCP FAILED] {liveboard_result.get('error', 'Unknown error')}")
log_progress(" [FALLBACK] Trying direct TML liveboard creation...")
try:
liveboard_result = create_liveboard_from_model(
ts_client=self,
model_id=model_guid,
model_name=model_name,
company_data=company_data,
use_case=use_case or 'General Analytics',
num_visualizations=10,
liveboard_name=liveboard_name,
llm_model=llm_model,
outliers=outlier_dicts if outlier_dicts else None,
model_columns=model_columns,
prompt_logger=self.prompt_logger
)
if liveboard_result.get('success'):
log_progress(f" [FALLBACK OK] Liveboard created: {liveboard_result.get('liveboard_guid')}")
else:
log_progress(f" [FALLBACK FAILED] {liveboard_result.get('error', 'Unknown error')}")
except Exception as fallback_error:
import traceback
fallback_trace = traceback.format_exc()
log_progress(f" [FALLBACK ERROR] {type(fallback_error).__name__}: {str(fallback_error)}")
liveboard_result = {
'success': False,
'error': f"MCP failed and fallback failed: {fallback_error}",
'traceback': fallback_trace
}
# HYBRID: Add TML enhancement if MCP succeeded
if method == 'HYBRID' and liveboard_result.get('success') and liveboard_result.get('liveboard_guid'):
log_progress(f" Step 2/2: Enhancing with TML post-processing...")
enhance_result = enhance_mcp_liveboard(
liveboard_guid=liveboard_result['liveboard_guid'],
company_data=company_data,
ts_client=self,
add_groups=True,
fix_kpis=True,
apply_brand_colors=True
)
if enhance_result.get('success'):
log_progress(f" [OK] Enhancement applied")
else:
enhance_err = f"TML enhancement failed: {enhance_result.get('message', 'unknown')[:100]}"
log_progress(f" [ERROR] {enhance_err}")
results['errors'].append(enhance_err)
# Check result
print(f"🔍 DEBUG: Liveboard result received: {liveboard_result}")
print(f"🔍 DEBUG: Success flag: {liveboard_result.get('success')}")
if liveboard_result.get('success'):
lb_time = time.time() - lb_start
log_progress(f"[OK] Liveboard created via {method} ({lb_time:.1f}s)")
results['liveboard'] = liveboard_result.get('liveboard_name')
results['liveboard_guid'] = liveboard_result.get('liveboard_guid')
results['liveboard_method'] = method
if liveboard_result.get('liveboard_url'):
results['liveboard_url'] = liveboard_result.get('liveboard_url')
# Assign tag to liveboard
if tag_name and liveboard_result.get('liveboard_guid'):
log_progress(f"Assigning tag '{tag_name}' to liveboard...")
self.assign_tags_to_objects([liveboard_result['liveboard_guid']], 'PINBOARD_ANSWER_BOOK', tag_name)
# Share liveboard
_effective_share = share_with or get_admin_setting('SHARE_WITH', required=False)
if _effective_share and liveboard_result.get('liveboard_guid'):
log_progress(f"Sharing liveboard with '{_effective_share}'...")
self.share_objects([liveboard_result['liveboard_guid']], 'LIVEBOARD', _effective_share)
else:
error = f"Liveboard creation failed: {liveboard_result.get('error', 'Unknown error')}"
print(f"❌ DEBUG: Liveboard creation failed! Error: {error}")
results['errors'].append(error)
log_progress(f"[ERROR] {error}")
except Exception as lb_error:
error = f"Liveboard creation exception: {str(lb_error)}"
results['errors'].append(error)
log_progress(f"[ERROR] {error}")
else:
# Extract detailed error information
obj_response = objects[0].get('response', {})
status = obj_response.get('status', {})
error_message = status.get('error_message', 'Unknown error')
# Clean HTML tags from error message (ThoughtSpot sometimes returns HTML)
error_message = re.sub(r'<[^>]+>', '', error_message).strip()
if not error_message:
error_message = 'Schema validation failed (no details provided)'
error_code = status.get('error_code', 'N/A')
# Try to extract additional error details from various response fields
error_details = []
# Check for detailed error messages in different response structures
if 'error_details' in status:
error_details.append(f"Error details: {status.get('error_details')}")
if 'validation_errors' in obj_response:
error_details.append(f"Validation errors: {obj_response.get('validation_errors')}")
if 'warnings' in obj_response:
error_details.append(f"Warnings: {obj_response.get('warnings')}")
# Check header for additional info
header = obj_response.get('header', {})
if 'error' in header:
error_details.append(f"Header error: {header.get('error')}")
# Get any additional error details
full_response = json.dumps(objects[0], indent=2)
# Save the TML that failed for debugging
import tempfile
# os is already imported at module level
try:
debug_dir = os.path.join(tempfile.gettempdir(), 'thoughtspot_debug')
os.makedirs(debug_dir, exist_ok=True)
failed_tml_path = os.path.join(debug_dir, f'failed_model_{datetime.now().strftime("%Y%m%d_%H%M%S")}.tml')
with open(failed_tml_path, 'w') as f:
f.write(model_tml)
log_progress(f"💾 Failed TML saved to: {failed_tml_path}")
print(f"💾 Failed TML saved to: {failed_tml_path}")
except Exception as save_error:
log_progress(f"[WARN] Could not save failed TML: {save_error}")
# Build comprehensive error message
error = f"Model validation failed: {error_message}"
if error_code != 'N/A':
error += f" (Error code: {error_code})"
if error_details:
error += f"\n\nAdditional details:\n" + "\n".join(error_details)
print(f"📋 Full model response: {full_response}") # DEBUG: Show full response
print(f" ❌ {error}")
log_progress(f" ❌ {error}")
log_progress(f" 📋 Full response details:")
log_progress(f"{full_response}")
# Log full TML for debugging
log_progress(f"\n📄 TML that was sent:\n{model_tml}")
results['errors'].append(error)
results['errors'].append(f"Full API response: {full_response}")
results['errors'].append(f"Failed TML saved to: {failed_tml_path if 'failed_tml_path' in locals() else 'N/A'}")
else:
error = "Model failed: No objects in response"
log_progress(f" ❌ {error}")
results['errors'].append(error)
# Mark as successful if we got this far
results['success'] = len(results['errors']) == 0
except Exception as e:
import traceback
error_msg = str(e)
full_trace = traceback.format_exc()
# Log to console with full details
print(f"\n{'='*60}")
print(f"❌ DEPLOYMENT EXCEPTION")
print(f"{'='*60}")
print(f"Error: {error_msg}")
print(f"\nFull traceback:")
print(full_trace)
print(f"{'='*60}\n")
# Log through callback too
log_progress(f"[ERROR] Deployment failed: {error_msg}")
log_progress(f"Traceback: {full_trace}")
results['errors'].append(error_msg)
results['errors'].append(f"Traceback: {full_trace}")
return results
def deploy_to_thoughtspot(ddl: str, database: str, schema: str,
                          connection_name: Optional[str] = None, company_name: Optional[str] = None,
                          use_case: Optional[str] = None, progress_callback=None) -> Dict:
    """
    Convenience function for deploying to ThoughtSpot.

    Creates a ThoughtSpotDeployer and runs the full deployment pipeline
    (connection, tables, model, liveboard) in a single call.

    Args:
        ddl: Data Definition Language statements to parse and deploy
        database: Target database name
        schema: Target schema name
        connection_name: Optional ThoughtSpot connection name
        company_name: Optional company name (used for naming/branding downstream)
        use_case: Optional use-case string (drives liveboard question selection)
        progress_callback: Optional callable invoked with progress updates

    Returns:
        Dict with deployment results (includes 'success' flag and 'errors' list)
    """
    deployer = ThoughtSpotDeployer()
    return deployer.deploy_all(ddl, database, schema, connection_name,
                               company_name, use_case, progress_callback)
if __name__ == "__main__":
    # Demo entry point: deploy a minimal single-table model end to end.
    sample_ddl = """
CREATE TABLE CUSTOMERS (
CUSTOMERID INT64 PRIMARY KEY,
NAME VARCHAR(255)
);
"""
    # Run against a schema known to exist in the target Snowflake account.
    deployment_results = deploy_to_thoughtspot(
        ddl=sample_ddl,
        database="DEMOBUILD",  # Use the actual Snowflake database
        schema="THOUGHTSPO_SALESA_20250915_193303"  # Use the working schema from your working table
    )
    # Pretty-print the deployment outcome.
    banner = "=" * 60
    print("\n" + banner)
    print("📊 DEPLOYMENT RESULTS:")
    print(banner)
    print(json.dumps(deployment_results, indent=2))