#!/usr/bin/env python3 """ ThoughtSpot Deployment Module A comprehensive tool for deploying data models to ThoughtSpot: - Creates Snowflake connections - Parses DDL and creates tables - Generates and deploys models Usage: from thoughtspot_deployer import ThoughtSpotDeployer deployer = ThoughtSpotDeployer() results = deployer.deploy_all(ddl, database, schema) """ import os from supabase_client import get_admin_setting import re import yaml import json import requests import snowflake.connector from datetime import datetime from typing import Dict, List, Optional, Tuple from dotenv import load_dotenv from snowflake_auth import get_snowflake_connection_params # Load environment variables load_dotenv() def _safe_print(*args, **kwargs): """Print that ignores BrokenPipeError - prevents crashes when output is closed.""" try: print(*args, **kwargs) except BrokenPipeError: pass def _apply_naming_style(name: str, style: str = "snake_case") -> str: """ Convert column name to specified naming style for ThoughtSpot display. 
Args: name: Original column name style: Naming style - one of: Regular Case, snake_case, camelCase, PascalCase, UPPER_CASE, original Examples (for "SHIPPING_MODE"): Regular Case → Shipping Mode snake_case → shipping_mode camelCase → shippingMode PascalCase → ShippingMode UPPER_CASE → SHIPPING_MODE original → SHIPPING_MODE (unchanged) """ name = name.strip() if style == "original": return name if style == "UPPER_CASE": return name.upper().replace(" ", "_") # Split into words (handle underscores, spaces, and camelCase) import re # Split on underscores, spaces, or camelCase boundaries words = re.split(r'[_\s]+', name) # Further split camelCase words expanded_words = [] for word in words: # Split on camelCase boundaries (e.g., "firstName" -> ["first", "Name"]) parts = re.findall(r'[A-Z]?[a-z]+|[A-Z]+(?=[A-Z][a-z]|\d|\W|$)|\d+', word) if parts: expanded_words.extend(parts) else: expanded_words.append(word) words = [w.lower() for w in expanded_words if w] if not words: return name.lower() if style == "Regular Case": # Title case each word, join with spaces: STATE_ID -> State Id return " ".join(w.capitalize() for w in words) if style == "snake_case": return "_".join(words) elif style == "camelCase": # First word lowercase, rest capitalized return words[0] + "".join(w.capitalize() for w in words[1:]) elif style == "PascalCase": # All words capitalized return "".join(w.capitalize() for w in words) else: # Default to snake_case return "_".join(words) def _to_snake_case(name: str) -> str: """ Legacy function - converts to snake_case. Use _apply_naming_style() for more options. """ return _apply_naming_style(name, "snake_case") class ThoughtSpotDeployer: """ThoughtSpot deployment automation""" def __init__(self, base_url: str = None, username: str = None, secret_key: str = None): """ Initialize ThoughtSpot deployer (trusted auth only) Reads from environment variables if not passed directly. Env vars are populated from Supabase admin settings at login time. 
Raises ValueError if any required setting is missing. """ self.base_url = base_url if base_url else get_admin_setting('THOUGHTSPOT_URL') if not username: raise ValueError("ThoughtSpotDeployer requires username — pass the logged-in user's email") self.username = username if not secret_key: raise ValueError("ThoughtSpotDeployer requires secret_key — pass the trusted auth key for the selected environment") self.secret_key = secret_key # Snowflake connection details from environment (key pair auth) self.sf_account = get_admin_setting('SNOWFLAKE_ACCOUNT') self.sf_user = get_admin_setting('SNOWFLAKE_KP_USER') self.sf_role = get_admin_setting('SNOWFLAKE_ROLE') self.sf_warehouse = get_admin_setting('SNOWFLAKE_WAREHOUSE') self.headers = { 'Content-Type': 'application/json', 'X-Requested-By': 'ThoughtSpot' } # Use session to maintain cookies between requests self.session = requests.Session() self.session.headers.update(self.headers) # Column naming style for ThoughtSpot model columns # Options: Regular Case, snake_case, camelCase, PascalCase, UPPER_CASE, original self.column_naming_style = "Regular Case" # Per-session prompt logger — set by the chat controller after construction self.prompt_logger = None # Validate credentials for trusted auth if not all([self.base_url, self.username, self.secret_key]): raise ValueError("Missing ThoughtSpot URL, username, or trusted auth key") if not all([self.sf_account, self.sf_user, self.sf_role, self.sf_warehouse]): raise ValueError("Missing required Snowflake credentials in environment variables") def _get_private_key_for_thoughtspot(self) -> str: """Get private key in format suitable for ThoughtSpot TML""" private_key_raw = get_admin_setting('SNOWFLAKE_KP_PK') if not private_key_raw: raise ValueError("SNOWFLAKE_KP_PK environment variable not set") # ThoughtSpot expects the private key as raw PEM format string if not private_key_raw.startswith('-----BEGIN'): # If it's base64 encoded, decode it import base64 try: private_key_raw = 
base64.b64decode(private_key_raw).decode('utf-8') except Exception: pass return private_key_raw def authenticate(self) -> bool: """Authenticate with ThoughtSpot using trusted authentication""" return self.authenticate_trusted() def authenticate_trusted(self) -> bool: """Authenticate with ThoughtSpot using trusted authentication (secret key)""" try: auth_url = f"{self.base_url}/api/rest/2.0/auth/token/full" print(f" 🔐 Attempting trusted authentication to: {auth_url}") print(f" 👤 Username: {self.username}") print(f" 🔑 Using secret key: {self.secret_key[:8]}...{self.secret_key[-4:]}" if self.secret_key and len(self.secret_key) > 12 else " 🔑 Using secret key") response = self.session.post( auth_url, json={ "username": self.username, "secret_key": self.secret_key, "validity_time_in_sec": 3600 # 1 hour token } ) print(f" 📡 HTTP Status: {response.status_code}") if response.status_code == 200: result = response.json() if 'token' in result: # Use the token as bearer auth self.session.headers['Authorization'] = f'Bearer {result["token"]}' print(" ✅ Trusted authentication successful (bearer token)") return True else: print(f" ❌ No token in response: {result}") return False elif response.status_code == 204: # Session cookie auth print(" ✅ Trusted authentication successful (session cookies)") return True else: print(f" ❌ HTTP Error {response.status_code}: {response.text}") return False except Exception as e: print(f" 💥 Trusted authentication exception: {e}") return False def authenticate_oauth(self, timeout: int = 120) -> bool: """ Authenticate with ThoughtSpot using browser-based SSO (Okta, SAML, etc.) Opens browser to ThoughtSpot login, user authenticates via SSO, and cookies are captured via a local callback server. 
Args: timeout: Seconds to wait for authentication (default 120) Returns: True if authentication successful, False otherwise """ import webbrowser import http.server import socketserver import threading import urllib.parse print(f" 🔐 Starting OAuth/SSO authentication for: {self.base_url}") print(f" 👤 User: {self.username or 'SSO user'}") # Find an available port for the callback server callback_port = 8765 for port in range(8765, 8800): try: with socketserver.TCPServer(("", port), None) as test: callback_port = port break except OSError: continue callback_url = f"http://localhost:{callback_port}/callback" auth_complete = threading.Event() auth_success = [False] # Use list to allow modification in nested function class OAuthCallbackHandler(http.server.BaseHTTPRequestHandler): def log_message(self, format, *args): pass # Suppress logging def do_GET(self): if self.path.startswith('/callback'): # Authentication completed - show success page self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() # Page that extracts cookies and displays success html = """ ThoughtSpot Authentication

Authentication Successful!

You can close this window and return to the application.

""" self.wfile.write(html.encode()) auth_success[0] = True auth_complete.set() elif self.path == '/check': # Health check endpoint self.send_response(200) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(b'OK') else: self.send_response(404) self.end_headers() # Start callback server in background thread server = socketserver.TCPServer(("", callback_port), OAuthCallbackHandler) server_thread = threading.Thread(target=server.handle_request) server_thread.daemon = True server_thread.start() # Build the SSO login URL # ThoughtSpot redirects to SSO provider, then back to ThoughtSpot, then to our callback ts_login_url = f"{self.base_url}/?redirectURL={urllib.parse.quote(callback_url)}" print(f" 🌐 Opening browser for SSO login...") print(f" 📍 Callback URL: {callback_url}") print(f" ⏳ Waiting up to {timeout} seconds for authentication...") # Open browser to ThoughtSpot login webbrowser.open(ts_login_url) # Wait for authentication to complete if auth_complete.wait(timeout=timeout): if auth_success[0]: print(" ✅ Browser authentication completed!") # Now we need to get the session from ThoughtSpot # The user authenticated in the browser, so we need to get a session token # We'll use the session/token endpoint to get a token for API calls # Try to get a session token using the trusted auth flow # Since user is now logged in via browser, we attempt to get session info try: # Check if session is valid by calling a simple API endpoint # First, let's try to get current user info user_response = self.session.get( f"{self.base_url}/api/rest/2.0/auth/session/user", timeout=10 ) if user_response.status_code == 200: user_info = user_response.json() print(f" ✅ Session active for: {user_info.get('name', 'unknown')}") return True else: # Browser auth completed but we don't have cookies in our session # This is expected - browser and Python have separate cookie jars print(" ⚠️ Browser authenticated but Python session needs cookies") print(" 💡 For full 
OAuth support, please use the browser-based workflow") print(" 💡 Or configure trusted authentication on ThoughtSpot") return False except Exception as e: print(f" ⚠️ Could not verify session: {e}") return False else: print(" ❌ Authentication callback received but marked as failed") return False else: print(" ❌ Authentication timed out") server.shutdown() return False def get_model_columns(self, model_guid: str) -> List[Dict]: """ Get actual column names from a ThoughtSpot model. This is important because ThoughtSpot may rename columns to make them unique (e.g., PROCESSING_FEE becomes gift_processing_fee and tran_processing_fee). Args: model_guid: GUID of the ThoughtSpot model Returns: List of column dicts with 'name' and 'type' keys """ try: # Export the model TML to get actual column names export_response = self.session.post( f"{self.base_url}/api/rest/2.0/metadata/tml/export", json={ 'metadata': [{'identifier': model_guid}], 'export_associated': False } ) if export_response.status_code != 200: print(f" ⚠️ Could not export model TML: HTTP {export_response.status_code}") return [] tml_data = export_response.json() if not tml_data or len(tml_data) == 0: print(f" ⚠️ Empty TML export response") return [] # Parse YAML TML tml_str = tml_data[0].get('edoc', '') model_tml = yaml.safe_load(tml_str) if not model_tml or 'model' not in model_tml: print(f" ⚠️ Invalid model TML structure") return [] # Extract columns with their actual names from the model columns = [] for col in model_tml.get('model', {}).get('columns', []): col_name = col.get('name', '') col_props = col.get('properties', {}) col_type = col_props.get('column_type', 'ATTRIBUTE') # Map ThoughtSpot column types to SQL-like types for AI understanding if col_type == 'MEASURE': sql_type = 'NUMBER' # Measures are numeric elif col_props.get('calendar'): sql_type = 'DATE' # Calendar attribute = date column else: sql_type = 'VARCHAR' # Other attributes are typically strings columns.append({ 'name': col_name, 'type': 
sql_type, 'ts_type': col_type # Keep original for reference }) print(f" 📊 Got {len(columns)} columns from ThoughtSpot model") return columns except Exception as e: print(f" ⚠️ Error getting model columns: {e}") return [] def parse_ddl(self, ddl: str) -> Tuple[Dict, List]: """ Parse DDL to extract table definitions and foreign key relationships Returns: Tuple of (tables_dict, foreign_keys_list) """ tables = {} foreign_keys = [] # Find all CREATE TABLE statements table_pattern = ( r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?' r'(?:"?([A-Za-z0-9_]+)"?)\s*\((.*?)\)\s*;' ) for match in re.finditer(table_pattern, ddl, re.IGNORECASE | re.DOTALL): table_name = match.group(1).upper() columns_text = match.group(2) columns = [] # Parse each column definition - PROPERLY FIXED parsing # Split by comma but be careful of commas inside parentheses column_lines = [] current_line = "" paren_count = 0 for char in columns_text: if char == '(': paren_count += 1 elif char == ')': paren_count -= 1 elif char == ',' and paren_count == 0: column_lines.append(current_line.strip()) current_line = "" continue current_line += char # Add the last line if current_line.strip(): column_lines.append(current_line.strip()) for line in column_lines: line = line.strip() line_upper = line.upper() def _normalize_table_name(raw_name: str) -> str: # Handle optional quoting and optional DB/SCHEMA qualifiers. normalized = raw_name.replace('"', '').strip() if '.' 
in normalized: normalized = normalized.split('.')[-1] return normalized.upper() # Parse FK constraints in any common table-level form: # 1) FOREIGN KEY (COL) REFERENCES TBL(COL) # 2) CONSTRAINT FK_NAME FOREIGN KEY (COL) REFERENCES TBL(COL) fk_match = re.search( r'FOREIGN\s+KEY\s*\((\w+)\)\s*REFERENCES\s+([A-Za-z0-9_".]+)\s*\((\w+)\)', line, re.IGNORECASE ) if fk_match: from_col = fk_match.group(1).upper() to_table = _normalize_table_name(fk_match.group(2)) to_col = fk_match.group(3).upper() foreign_keys.append({ 'from_table': table_name, 'from_column': from_col, 'to_table': to_table, 'to_column': to_col }) print(f" 🔗 Found FK: {table_name}.{from_col} -> {to_table}.{to_col}") continue # Parse inline FK form in column definitions: # COL_NAME REFERENCES TARGET_TABLE(TARGET_COL) inline_fk_match = re.search( r'^(\w+)\s+.+?\s+REFERENCES\s+([A-Za-z0-9_".]+)\s*\((\w+)\)', line, re.IGNORECASE ) if inline_fk_match: from_col = inline_fk_match.group(1).upper() to_table = _normalize_table_name(inline_fk_match.group(2)) to_col = inline_fk_match.group(3).upper() foreign_keys.append({ 'from_table': table_name, 'from_column': from_col, 'to_table': to_table, 'to_column': to_col }) print(f" 🔗 Found inline FK: {table_name}.{from_col} -> {to_table}.{to_col}") if not line_upper.startswith(('PRIMARY KEY', 'CONSTRAINT', 'FOREIGN KEY', 'UNIQUE', 'CHECK', 'INDEX')): # Parse: COLUMNNAME DATATYPE(params) [IDENTITY] [NOT NULL] parts = line.split() if len(parts) >= 2: col_name_original = parts[0] # Preserve original casing for display name col_name = parts[0].upper() # Uppercase for DB reference # Get the FULL data type including parameters - HANDLE IDENTITY! 
col_type_match = re.match(r'(\w+(?:\([^)]+\))?)', parts[1]) col_type = col_type_match.group(1).upper() if col_type_match else parts[1].upper() columns.append({ 'name': col_name, 'original_name': col_name_original, # Keep original for naming style 'type': col_type, 'nullable': 'NOT NULL' not in line.upper() }) tables[table_name] = columns print(f"📊 Found {len(tables)} tables and {len(foreign_keys)} foreign keys in DDL") return tables, foreign_keys def create_relationships_separately(self, table_relationships: Dict, table_guids: Dict): """Create relationships as separate TML objects after tables exist""" for table_name, relationships in table_relationships.items(): for relationship in relationships: # Create relationship TML relationship_tml = { 'guid': None, 'relationship': { 'name': relationship['name'], 'destination_table': table_guids.get(relationship['to_table']), 'source_table': table_guids.get(table_name), 'type': relationship['type'], 'join_columns': [ { 'source_column': rel_on['from_column'], 'destination_column': rel_on['to_column'] } for rel_on in relationship['on'] ] } } relationship_yaml = yaml.dump(relationship_tml, default_flow_style=False, sort_keys=False) print(f" 🔗 Creating relationship: {relationship['name']}") print(f" 📄 Relationship TML:\n{relationship_yaml}") response = self.session.post( f"{self.base_url}/api/rest/2.0/metadata/tml/import", json={ "metadata_tmls": [relationship_yaml], "import_policy": "ALL_OR_NONE", "create_new": True } ) if response.status_code == 200: result = response.json() print(f" 📋 Relationship response: {result}") if result[0].get('response', {}).get('status', {}).get('status_code') == 'OK': print(f" ✅ Relationship created: {relationship['name']}") else: error_msg = result[0].get('response', {}).get('status', {}).get('error_message', 'Unknown error') print(f" ❌ Relationship failed: {error_msg}") else: print(f" ❌ Relationship API call failed: {response.status_code}") print(f" 📋 Response: {response.text}") def 
create_table_tml(self, table_name: str, columns: List, connection_name: str, database: str, schema: str, all_tables: Dict = None, table_guid: str = None, foreign_keys: List = None) -> str: """Generate table TML matching working example structure Args: table_guid: If provided, use this GUID (for updating existing tables with joins) foreign_keys: List of foreign key relationships parsed from DDL """ tml_columns = [] # Generate columns with proper typing for col in columns: ts_type = self._map_data_type(col['type']) col_name = col['name'].upper() # Determine column type - IDs are measures in table TML but not model TML if ts_type in ['INT64'] and col_name.endswith('ID'): col_type = 'MEASURE' properties = { 'column_type': col_type, 'aggregation': 'SUM', 'index_type': 'DONT_INDEX' } elif ts_type in ['DOUBLE', 'INT64'] and not col_name.endswith('ID'): col_type = 'MEASURE' properties = { 'column_type': col_type, 'aggregation': 'SUM', 'index_type': 'DONT_INDEX' } else: col_type = 'ATTRIBUTE' properties = { 'column_type': col_type, 'index_type': 'DONT_INDEX' } column_def = { 'name': col['name'].upper(), 'db_column_name': col['name'].upper(), 'properties': properties, 'db_column_properties': { 'data_type': ts_type } } tml_columns.append(column_def) table_tml = { 'guid': table_guid, # Use provided GUID or None for new tables 'table': { 'name': table_name.upper(), 'db': database, 'schema': schema, 'db_table': table_name.upper(), 'connection': { 'name': connection_name }, 'columns': tml_columns, 'properties': { 'sage_config': { 'is_sage_enabled': False } } } } # Add joins_with relationships (matching working example) if all_tables: joins_with = self._generate_table_joins(table_name, columns, all_tables, foreign_keys) if joins_with: table_tml['table']['joins_with'] = joins_with # Generate YAML with proper formatting yaml_output = yaml.dump(table_tml, default_flow_style=False, sort_keys=False) # Keep quotes around 'on' key as shown in working example return yaml_output def 
_generate_table_joins(self, table_name: str, columns: List, all_tables: Dict, foreign_keys: List = None) -> List: """Generate joins_with structure based on parsed foreign keys from DDL""" joins = [] table_name_upper = table_name.upper() if not foreign_keys: print(f" ⚠️ No foreign keys provided for {table_name_upper}") return joins # Use actual foreign keys from DDL for fk in foreign_keys: if fk['from_table'] == table_name_upper: to_table = fk['to_table'] from_col = fk['from_column'] to_col = fk['to_column'] # Skip self-joins (e.g., EMPLOYEES.MANAGER_ID -> EMPLOYEES.EMPLOYEE_ID) # ThoughtSpot models don't handle self-referential joins well (causes cycles) if to_table == table_name_upper: print(f" ⏭️ Skipping self-join: {table_name_upper}.{from_col} -> {to_table}.{to_col} (self-referential)") continue # Check if target table exists in THIS deployment available_tables_upper = [t.upper() for t in all_tables.keys()] if to_table in available_tables_upper: constraint_id = f"SYS_CONSTRAINT_{self._generate_constraint_id()}" join_def = { 'name': constraint_id, 'destination': { 'name': to_table }, 'on': f"[{table_name_upper}::{from_col}] = [{to_table}::{to_col}]", 'type': 'INNER' } joins.append(join_def) print(f" 🔗 Generated join: {table_name_upper}.{from_col} -> {to_table}.{to_col}") else: print(f" ⏭️ Skipping join: {table_name_upper}.{from_col} -> {to_table} (table not in this deployment)") return joins def create_connection_tml(self, connection_name: str) -> str: """Generate connection TML matching working example""" connection_tml = { 'guid': None, # Will be generated by ThoughtSpot 'connection': { 'name': connection_name, 'type': 'RDBMS_SNOWFLAKE', 'authentication_type': 'KEY_PAIR', 'properties': [ {'key': 'accountName', 'value': self.sf_account}, {'key': 'user', 'value': self.sf_user}, {'key': 'private_key', 'value': self._get_private_key_for_thoughtspot()}, {'key': 'passphrase', 'value': get_admin_setting('SNOWFLAKE_KP_PASSPHRASE', required=False)}, {'key': 'role', 
'value': self.sf_role}, {'key': 'warehouse', 'value': self.sf_warehouse} ], 'description': f'Auto-generated Snowflake connection for {connection_name}' } } yaml_output = yaml.dump(connection_tml, default_flow_style=False, sort_keys=False) return yaml_output def create_actual_model_tml(self, tables: Dict, foreign_keys: List, table_guids: Dict = None, model_name: str = None, connection_name: str = None) -> str: """Generate proper model TML matching boone_test5 working example""" if not model_name: model_name = f"demo_model_{datetime.now().strftime('%Y%m%d')}" if not connection_name: connection_name = model_name # Create model structure matching working example exactly model = { 'guid': None, # Will be generated by ThoughtSpot 'model': { 'name': model_name, 'model_tables': [], 'columns': [], 'properties': { 'is_bypass_rls': False, 'join_progressive': True, 'spotter_config': { 'is_spotter_enabled': True } } } } # Build column name conflict resolution column_name_counts = {} for table_name, columns in tables.items(): for col in columns: col_name = col['name'].upper() if col_name not in column_name_counts: column_name_counts[col_name] = [] column_name_counts[col_name].append(table_name.upper()) # Add model_tables - START WITH NO JOINS for now (we can add them later) print(" 📋 Creating model without explicit joins (ThoughtSpot can auto-detect)") for table_name in tables.keys(): table_name_upper = table_name.upper() table_guid = table_guids.get(table_name_upper) if table_guids else None # Use FQN to resolve "multiple data sources with same name" issue # ThoughtSpot explicitly requires this when there are duplicate table names table_entry = { 'name': table_name_upper, 'fqn': table_guid # Required to uniquely identify which table to use } # For now, don't add explicit joins - let ThoughtSpot auto-detect # This matches the pattern where some tables in your working example don't have joins model['model']['model_tables'].append(table_entry) # Remove diamond join paths - 
ThoughtSpot rejects models where
        # table A joins to B, A joins to C, and C also joins to B
        self._remove_diamond_joins(model['model']['model_tables'])

        # Add columns with proper global conflict resolution
        used_display_names = set()  # Track used names globally across all columns
        for table_name, columns in tables.items():
            table_name_upper = table_name.upper()
            for col in columns:
                col_name = col['name'].upper()
                original_col_name = col.get('original_name', col['name'])  # Use original casing for display

                # TODO: Later we can exclude ID columns for cleaner model
                # For now, include all columns to get the basic model working

                # Start with basic conflict resolution
                display_name = self._resolve_column_name_conflict(
                    col_name, table_name_upper, column_name_counts,
                    original_name=original_col_name
                )

                # If the display name is still used, make it unique with table prefixes.
                # Uniqueness is checked case-insensitively (names are tracked
                # lowercased in used_display_names).
                original_display_name = display_name
                counter = 1
                while display_name.lower() in used_display_names:
                    # Use table prefix for all tables consistently
                    if counter == 1:
                        # Generate consistent short prefix from table name
                        # (whole name if <= 4 chars, else first 4 chars, lowercased)
                        if len(table_name_upper) <= 4:
                            prefix = table_name_upper.lower()
                        else:
                            prefix = table_name_upper[:4].lower()
                        # Use snake_case: prefix_name
                        display_name = f"{prefix}_{original_display_name}"
                    else:
                        # Fallback: add number (name_2, name_3, ... until unique)
                        display_name = f"{original_display_name}_{counter}"
                    counter += 1
                used_display_names.add(display_name.lower())

                # Determine column type based on data type
                col_type, aggregation = self._determine_column_type(col['type'], col_name)

                column_def = {
                    'name': display_name,
                    # column_id ties the model column back to TABLE::COLUMN
                    'column_id': f"{table_name_upper}::{col_name}",
                    'properties': {
                        'column_type': col_type,
                        'index_type': 'DONT_INDEX'
                    }
                }

                # Add aggregation for measures
                if aggregation:
                    column_def['properties']['aggregation'] = aggregation

                # Add calendar property for DATE columns so ThoughtSpot enables
                # time bucketing (.weekly, .monthly, etc.) 
on them
                if self._map_data_type(col['type']) == 'DATE':
                    column_def['properties']['calendar'] = 'calendar'

                model['model']['columns'].append(column_def)

        # Generate YAML output with proper formatting
        yaml_output = yaml.dump(model, default_flow_style=False, sort_keys=False,
                                default_style=None, indent=2, width=120)

        # Validate the generated YAML
        try:
            # Test if the YAML can be parsed back
            yaml.safe_load(yaml_output)
            print(" ✅ Generated YAML is valid")
        except yaml.YAMLError as e:
            print(f" ❌ Generated YAML is invalid: {e}")
            print(" 📄 Invalid YAML:")
            print(yaml_output)
            raise ValueError(f"Generated invalid YAML: {e}")

        return yaml_output

    def _is_foreign_key_column(self, col_name: str, table_name: str, foreign_keys: List) -> bool:
        """Check if column is a foreign key (used only for joins, not analytics).

        NOTE(review): this looks up 'source_table'/'source_column' keys, but
        parse_ddl() emits FKs with 'from_table'/'from_column' keys — so this
        always returns False for FKs produced by parse_ddl. Its only call
        sites in this file are commented out; confirm intended key names
        before reusing it.
        """
        for fk in foreign_keys:
            if (fk.get('source_table', '').upper() == table_name and
                    fk.get('source_column', '').upper() == col_name):
                return True
        return False

    def _is_surrogate_primary_key(self, col: Dict, col_name: str) -> bool:
        """Check if column is a meaningless surrogate key (numeric ID):
        name ends with 'ID' AND declared type contains a numeric keyword."""
        # Common patterns: ID, _ID, ID_, ends with 'id'
        if col_name.upper().endswith('ID'):
            # Check if it's numeric (INT, BIGINT, NUMBER)
            col_type = col.get('type', '').upper()
            if any(t in col_type for t in ['INT', 'NUMBER', 'NUMERIC', 'BIGINT']):
                return True
        return False

    def _create_model_with_constraints(self, tables: Dict, foreign_keys: List, table_guids: Dict,
                                       table_constraints: Dict, model_name: str,
                                       connection_name: str) -> str:
        """Generate model TML with constraint references like our successful test"""
        print(" 📋 Creating model with constraint references")

        # Build column name conflict tracking
        column_name_counts = {}
        for table_name, columns in tables.items():
            for col in columns:
                col_name = col['name'].upper()
                if col_name not in column_name_counts:
                    column_name_counts[col_name] = []
                column_name_counts[col_name].append(table_name.upper())

        model = {
            'guid': None,
            'model': {
                'name': model_name,
                'model_tables': [],
                'columns': [],
                'properties': {
                    'is_bypass_rls': False,
                    'join_progressive': True,
                    'spotter_config': {
                        'is_spotter_enabled': True
                    }
                }
            }
        }

        # Add model_tables with FQNs and constraint-based joins
        for table_name in tables.keys():
            table_name_upper = table_name.upper()
            table_guid = table_guids.get(table_name_upper)

            table_entry = {
                'name': table_name_upper,
                'fqn': table_guid
            }

            # Build joins from foreign_keys list (more reliable than constraint extraction)
            table_joins = []
            for fk in foreign_keys:
                if fk['from_table'].upper() == table_name_upper:
                    to_table = fk['to_table'].upper()

                    # Skip self-joins (e.g., EMPLOYEES.MANAGER_ID -> EMPLOYEES.EMPLOYEE_ID)
                    # ThoughtSpot models don't handle self-referential joins well (causes cycles)
                    if to_table == table_name_upper:
                        print(f" ⏭️ Skipping self-join in model: {table_name_upper}.{fk['from_column']} -> {to_table}")
                        continue

                    # Check if target table exists in this deployment
                    if to_table in [t.upper() for t in tables.keys()]:
                        # ThoughtSpot on clause format: [SOURCE::COL] = [DEST::COL]
                        from_col = fk['from_column'].upper()
                        to_col = fk['to_column'].upper()
                        on_clause = f"[{table_name_upper}::{from_col}] = [{to_table}::{to_col}]"

                        join_entry = {
                            'with': to_table,
                            'on': on_clause,
                            'type': 'LEFT_OUTER',
                            'cardinality': 'MANY_TO_ONE'  # Fact to dimension is many-to-one
                        }
                        table_joins.append(join_entry)
                        print(f" 🔗 Added join: {table_name_upper}.{from_col} -> {to_table}.{to_col}")

            if table_joins:
                table_entry['joins'] = table_joins

            model['model']['model_tables'].append(table_entry)

        # Remove diamond join paths - ThoughtSpot rejects models where
        # table A joins to B, A joins to C, and C also joins to B
        self._remove_diamond_joins(model['model']['model_tables'])

        # Add columns with proper global conflict resolution (same as working version)
        used_display_names = set()
        for table_name, columns in tables.items():
            table_name_upper = table_name.upper()
            for col in columns:
                col_name = col['name'].upper()
                original_col_name = col.get('original_name', col['name'])  # Use original casing for display

                # NOTE: We used to skip FK/PK columns, but ThoughtSpot requires them for joins
                # Even though users don't search "customer 23455", the join columns must be present
                # in the model's columns section for the joins to work properly.
                #
                # SKIP foreign key columns - they're join keys, not analytics columns
                # if self._is_foreign_key_column(col_name, table_name_upper, foreign_keys):
                #     print(f" ⏭️ Skipping FK column: {table_name_upper}.{col_name}")
                #     continue
                #
                # SKIP surrogate primary keys (numeric IDs) - nobody searches "customer 23455"
                # if self._is_surrogate_primary_key(col, col_name):
                #     print(f" ⏭️ Skipping surrogate PK: {table_name_upper}.{col_name}")
                #     continue

                # Start with basic conflict resolution
                display_name = self._resolve_column_name_conflict(
                    col_name, table_name_upper, column_name_counts,
                    original_name=original_col_name
                )

                # If the display name is still used, make it unique with table prefixes
                original_display_name = display_name
                counter = 1
                while display_name.lower() in used_display_names:
                    # Use snake_case: prefix_name
                    # NOTE(review): these prefixes are hard-coded for one demo
                    # schema; unknown tables fall back to the generic 4-char
                    # prefix used by create_actual_model_tml. Consider unifying.
                    if counter == 1:
                        if table_name_upper == 'CUSTOMERS':
                            display_name = f"cust_{original_display_name}"
                        elif table_name_upper == 'PRODUCTS':
                            display_name = f"prod_{original_display_name}"
                        elif table_name_upper == 'ORDERS':
                            display_name = f"order_{original_display_name}"
                        elif table_name_upper == 'ORDERITEMS':
                            display_name = f"item_{original_display_name}"
                        elif table_name_upper == 'SALES':
                            display_name = f"sale_{original_display_name}"
                        elif table_name_upper == 'SALESREPS':
                            display_name = f"rep_{original_display_name}"
                        else:
                            display_name = f"{table_name_upper[:4].lower()}_{original_display_name}"
                    else:
                        display_name = f"{original_display_name}_{counter}"
                    counter += 1
                used_display_names.add(display_name.lower())

                # Determine column type based on data type
                col_type, aggregation = self._determine_column_type(col['type'], col_name)

                column_def = {
                    'name': display_name,
                    'column_id': f"{table_name_upper}::{col_name}",
                    'properties': {
                        'column_type': col_type,
                        'index_type': 'DONT_INDEX'
                    }
                }

                if aggregation:
                    column_def['properties']['aggregation'] = aggregation

                # Add calendar property for DATE columns so ThoughtSpot enables
                # time bucketing (.weekly, .monthly, etc.) on them
                if self._map_data_type(col['type']) == 'DATE':
                    column_def['properties']['calendar'] = 'calendar'

                model['model']['columns'].append(column_def)

        # Generate YAML output with validation
        yaml_output = yaml.dump(model, default_flow_style=False, sort_keys=False,
                                default_style=None, indent=2, width=120)

        # Fix YAML reserved word quoting - 'on' gets quoted because it's a YAML boolean
        # ThoughtSpot needs it unquoted
        yaml_output = yaml_output.replace("'on':", "on:")

        # Validate the generated YAML
        try:
            yaml.safe_load(yaml_output)
            print(" ✅ Generated YAML is valid")
        except yaml.YAMLError as e:
            print(f" ❌ Generated YAML is invalid: {e}")
            raise ValueError(f"Generated invalid YAML: {e}")

        return yaml_output

    def _remove_diamond_joins(self, model_tables: list):
        """Remove join cycles that ThoughtSpot rejects using a spanning tree.

        ThoughtSpot requires exactly ONE path between any two tables in a model.
        Uses Kruskal's algorithm: add edges from highest-priority (fact) tables
        first, skip any edge that would create a cycle. Priority order ensures
        fact table joins are preserved and dimension-to-dimension "snowflake"
        joins are pruned.
""" def edge_key(src_name: str, join_def: dict): return ( src_name, join_def.get('with'), join_def.get('on', ''), join_def.get('type', ''), join_def.get('cardinality', ''), ) all_edges = [] for t in model_tables: src_name = t['name'] for j in t.get('joins', []): all_edges.append((src_name, j.get('with'), j, edge_key(src_name, j))) if not all_edges: print(f" ✅ No joins to check for cycles") return out_degree = {} for t in model_tables: out_degree[t['name']] = len(t.get('joins', [])) in_degree = {t['name']: 0 for t in model_tables} for src, dst, _, _ in all_edges: in_degree[dst] = in_degree.get(dst, 0) + 1 all_edges.sort(key=lambda e: (-out_degree.get(e[0], 0), -in_degree.get(e[1], 0), e[0], e[1])) parent = {t['name']: t['name'] for t in model_tables} def find(x): while parent[x] != x: parent[x] = parent[parent[x]] x = parent[x] return x def union(a, b): ra, rb = find(a), find(b) if ra == rb: return False parent[ra] = rb return True kept_edge_keys = set() removed = [] for src, dst, join_def, e_key in all_edges: if union(src, dst): kept_edge_keys.add(e_key) else: removed.append(f"{src}->{dst} ({join_def.get('on', '')})") for t in model_tables: if 'joins' not in t: continue src_name = t['name'] t['joins'] = [j for j in t['joins'] if edge_key(src_name, j) in kept_edge_keys] for t in model_tables: if 'joins' in t and not t['joins']: del t['joins'] if removed: print(f" 🔶 Removed {len(removed)} redundant joins (cycle prevention):") for r in removed: print(f" - {r}") else: print(f" ✅ No join cycles detected") def _generate_constraint_id(self) -> str: """Generate a constraint ID similar to ThoughtSpot's system constraints""" import uuid return str(uuid.uuid4()) def validate_foreign_key_references(self, tables: Dict, foreign_keys: List = None) -> List[str]: """ Validate that foreign key columns reference tables that exist in the schema. Uses explicit FK constraints from DDL - not heuristics. 
Args: tables: Dictionary of table definitions foreign_keys: List of FK relationships parsed from DDL Each FK is: {'from_table': str, 'from_column': str, 'to_table': str, 'to_column': str} Returns: List of warning messages about missing referenced tables """ warnings = [] if not foreign_keys: return warnings # No explicit FKs defined, nothing to validate table_names_upper = [t.upper() for t in tables.keys()] for fk in foreign_keys: target_table = fk.get('to_table', '').upper() from_table = fk.get('from_table', '') from_column = fk.get('from_column', '') # Check if the target table exists in this schema if target_table and target_table not in table_names_upper: warnings.append( f"⚠️ {from_table}.{from_column} references {fk.get('to_table')}, " f"but {fk.get('to_table')} is not in this schema. " f"The join will be skipped during deployment." ) return warnings def _resolve_column_name_conflict(self, col_name: str, table_name: str, column_name_counts: Dict, original_name: str = None) -> str: """ Resolve column name conflicts using configured naming style and prefixes. Examples (snake_case): SHIPPING_MODE → shipping_mode DAYS_TO_SHIP → days_to_ship ORDER_DATE (conflict) → order_order_date, cust_order_date, etc. 
Args: col_name: Uppercase column name (for conflict detection) table_name: Table name for prefix generation column_name_counts: Dict tracking column name occurrences original_name: Original casing of column name (for proper camelCase detection) """ # Use original name if provided (preserves camelCase boundaries) name_for_styling = original_name if original_name else col_name # Apply configured naming style styled_name = _apply_naming_style(name_for_styling, self.column_naming_style) if len(column_name_counts.get(col_name, [])) <= 1: # No conflict - use styled name directly return styled_name # For conflicts, generate prefix dynamically from table name if len(table_name) <= 4: prefix = table_name.lower() elif 'sales' in table_name.lower() and 'rep' in table_name.lower(): prefix = 'rep' # Special case for readability elif 'customer' in table_name.lower(): prefix = 'cust' # Common abbreviation elif 'product' in table_name.lower(): prefix = 'prod' # Common abbreviation elif 'order' in table_name.lower(): prefix = 'order' # Common abbreviation else: prefix = table_name[:4].lower() # First 4 characters # Apply naming style to prefix + name combination prefixed_name = f"{prefix}_{styled_name}" if styled_name else prefix return _apply_naming_style(prefixed_name, self.column_naming_style) def _get_table_prefix(self, table_name: str) -> str: """Get appropriate prefix for table to avoid column conflicts""" # Generate prefix dynamically based on table name patterns table_lower = table_name.lower() if 'customer' in table_lower: return '' # Primary table gets no prefix for readability elif 'sales' in table_lower and 'rep' in table_lower: return 'Rep' elif 'sales' in table_lower: return 'Sale' elif 'order' in table_lower and 'item' in table_lower: return 'Item' elif 'order' in table_lower: return 'Order' elif 'product' in table_lower: return 'Product' else: # Use first 3-4 characters as prefix, capitalize first letter prefix = table_name[:4] if len(table_name) > 3 else table_name 
return prefix.capitalize() def _determine_column_type(self, data_type: str, col_name: str) -> tuple: """Determine if column should be ATTRIBUTE or MEASURE""" base_type = data_type.upper().split('(')[0] col_upper = col_name.upper() # SALEID is special - it's treated as a measure in the working example if col_upper == 'SALEID': return 'MEASURE', 'SUM' # Numeric types should be measures (unless they're IDs or keys) if base_type in ['NUMBER', 'DECIMAL', 'FLOAT', 'DOUBLE', 'INT', 'INTEGER', 'BIGINT']: # Skip ID/KEY columns - they're join keys, not analytics columns if col_upper.endswith('ID') or col_upper.endswith('KEY') or col_upper.endswith('_CODE'): return 'ATTRIBUTE', None # All other numeric columns are measures # Determine aggregation based on column name patterns if any(word in col_upper for word in ['QUANTITY', 'QTY', 'COUNT', 'SOLD']): return 'MEASURE', 'SUM' elif any(word in col_upper for word in ['PRICE', 'COST', 'REVENUE', 'AMOUNT', 'TOTAL', 'PROFIT', 'DISCOUNT', 'SHIPPING', 'TAX']): return 'MEASURE', 'SUM' elif any(word in col_upper for word in ['RATING', 'SCORE', 'MARGIN', 'PERCENT', 'RATE']): return 'MEASURE', 'AVERAGE' else: # Default: numeric = measure with SUM return 'MEASURE', 'SUM' # Everything else is an attribute (strings, dates, booleans, etc.) 
return 'ATTRIBUTE', None def _build_table_relationships(self, tables: Dict, foreign_keys: List) -> Dict: """Build table relationships for joins""" relationships = {} # Auto-detect relationships based on common ID patterns table_names = list(tables.keys()) for table_name in table_names: table_name_upper = table_name.upper() table_cols = [col['name'].upper() for col in tables[table_name]] # Find foreign key relationships for col_name in table_cols: if col_name.endswith('ID') and col_name != f"{table_name_upper}ID": # This looks like a foreign key target_table = col_name[:-2] + 'S' # CUSTOMERID -> CUSTOMERS if target_table in [t.upper() for t in table_names]: if table_name_upper not in relationships: relationships[table_name_upper] = [] relationships[table_name_upper].append({ 'to_table': target_table, 'on_column': col_name }) return relationships def _create_model_level_joins(self, tables, foreign_keys): """Create joins at model level using the format from working example""" joins = [] # Auto-detect joins if no explicit foreign keys if len(tables) > 1: table_names = list(tables.keys()) for i, table1 in enumerate(table_names): table1_upper = table1.upper() table1_cols = [col['name'].upper() for col in tables[table1]] for j, table2 in enumerate(table_names): if i >= j: # Avoid duplicates and self-joins continue table2_upper = table2.upper() table2_cols = [col['name'].upper() for col in tables[table2]] # Look for matching ID columns for col1 in table1_cols: if col1.endswith('ID') and col1 in table2_cols: join_entry = { 'name': f"{table1_upper.lower()}_{table2_upper.lower()}", 'source': table1_upper, 'destination': table2_upper, 'type': 'INNER', 'on': f"{table1_upper}.{col1} = {table2_upper}.{col1}" } joins.append(join_entry) print(f" 🔗 Model-level join: {table1_upper} -> {table2_upper} on {col1}") break return joins def _add_joins_to_tables(self, model_tables, tables, foreign_keys): """Add joins to individual tables (not as separate section)""" # Build join 
    def _add_joins_to_tables(self, model_tables, tables, foreign_keys):
        """Add joins to individual tables (not as separate section)

        NOTE(review): both join-building branches below are hard-disabled with
        ``if False`` / ``elif False`` ("skip joins for now"), so ``table_joins``
        always stays empty and this method currently mutates nothing.
        Left exactly as written — re-enabling is a deliberate product decision.
        """
        # Build join relationships
        table_joins = {}
        # Skip joins for now - test basic model creation first
        if False and foreign_keys:
            # Disabled explicit-FK path: one MANY_TO_ONE join per FK constraint.
            for fk in foreign_keys:
                from_table = fk['from_table'].upper()
                to_table = fk['to_table'].upper()
                if from_table not in table_joins:
                    table_joins[from_table] = []
                join_entry = {
                    'with': to_table,
                    'on': f"[{from_table}].[{fk['from_column'].upper()}] = [{to_table}].[{fk['to_column'].upper()}]",
                    'type': 'INNER',
                    'cardinality': 'MANY_TO_ONE'
                }
                table_joins[from_table].append(join_entry)
                print(f" 🔗 Adding join: {from_table} -> {to_table}")
        # Skip joins for now - test basic model creation first
        elif False and len(tables) > 1:
            # Disabled auto-detect path: join tables that share an *ID column.
            table_names = list(tables.keys())
            for i, table1 in enumerate(table_names):
                table1_upper = table1.upper()
                table1_cols = [col['name'].upper() for col in tables[table1]]
                for j, table2 in enumerate(table_names[i+1:], i+1):
                    table2_upper = table2.upper()
                    table2_cols = [col['name'].upper() for col in tables[table2]]
                    # Look for matching ID columns
                    for col1 in table1_cols:
                        if col1.endswith('ID') and col1 in table2_cols:
                            if table1_upper not in table_joins:
                                table_joins[table1_upper] = []
                            join_entry = {
                                'with': table2_upper,
                                'on': f"[{table1_upper}].[{col1}] = [{table2_upper}].[{col1}]",
                                'type': 'INNER',
                                'cardinality': 'MANY_TO_ONE'
                            }
                            table_joins[table1_upper].append(join_entry)
                            print(f" 🔗 Auto-detected join: {table1_upper} -> {table2_upper} on {col1}")
                            break
        # Apply joins to model_tables (no-op while both branches are disabled)
        for table_entry in model_tables:
            table_name = table_entry['name']
            if table_name in table_joins:
                table_entry['joins'] = table_joins[table_name]
table_relationships: table_relationships[from_table] = [] relationship = { 'name': f"{from_table}_{to_table}_{from_column}", 'to_table': to_table, 'type': 'many_to_one', # Assuming FK relationships are many-to-one 'on': [ { 'from_column': from_column, 'to_column': to_column } ] } table_relationships[from_table].append(relationship) print(f" 🔗 Relationship: {from_table}.{from_column} -> {to_table}.{to_column}") # Auto-detect relationships if no explicit foreign keys elif len(tables) > 1: table_names = list(tables.keys()) for i, table1 in enumerate(table_names): table1_upper = table1.upper() table1_cols = [col['name'].upper() for col in tables[table1]] for j, table2 in enumerate(table_names[i+1:], i+1): table2_upper = table2.upper() table2_cols = [col['name'].upper() for col in tables[table2]] # Look for matching ID columns for col1 in table1_cols: if col1.endswith('ID') and col1 in table2_cols: if table1_upper not in table_relationships: table_relationships[table1_upper] = [] relationship = { 'name': f"{table1_upper}_{table2_upper}_{col1}", 'to_table': table2_upper, 'type': 'many_to_one', 'on': [ { 'from_column': col1, 'to_column': col1 } ] } table_relationships[table1_upper].append(relationship) print(f" 🔗 Auto-detected relationship: {table1_upper}.{col1} -> {table2_upper}.{col1}") break return table_relationships def create_model_tml(self, tables: Dict, foreign_keys: List, table_guids: Dict = None, model_name: str = None) -> str: """Generate worksheet TML (ORIGINAL APPROACH - keeping for comparison)""" if not model_name: model_name = f"demo_worksheet_{datetime.now().strftime('%Y%m%d')}" worksheet = { 'guid': None, 'worksheet': { 'name': model_name, 'description': 'Auto-generated worksheet from DDL', 'tables': [], 'worksheet_columns': [], # Adding back - but with GUID references 'properties': { 'is_bypass_rls': False, 'join_progressive': True, 'spotter_config': { 'is_spotter_enabled': True } } } } # Add tables with joins for table_name in tables.keys(): table_entry 
= {'name': table_name.upper()} # Add FQN (GUID) if available to resolve multiple tables with same name if table_guids and table_name.upper() in table_guids: table_entry['fqn'] = table_guids[table_name.upper()] joins = [] for fk in foreign_keys: if fk['source_table'] == table_name: joins.append({ 'with': fk['target_table'].upper(), 'referencing_join': f"FK_{table_name.upper()}_{fk['target_table'].upper()}" }) if joins: table_entry['joins'] = joins # Just populate the required 'tables' field with GUID reference worksheet['worksheet']['tables'].append({ 'name': table_name.upper(), 'fqn': table_guids.get(table_name.upper()) if table_guids else f"table_{table_name.lower()}" }) # Add columns using table GUIDs in expressions for table_name, columns in tables.items(): table_guid = table_guids.get(table_name.upper()) if table_guids else None for col in columns: col_type = 'MEASURE' if 'DECIMAL' in col['type'] else 'ATTRIBUTE' # Use GUID in expression if available if table_guid: expr = f"[{table_guid}].[{col['name']}]" else: expr = f"[{table_name.upper()}].[{col['name']}]" column_def = { 'name': col['name'].upper(), 'data_type': col_type, 'expr': expr } worksheet['worksheet']['worksheet_columns'].append(column_def) return yaml.dump(worksheet, default_flow_style=False, sort_keys=False) def _map_data_type(self, sql_type: str) -> str: """Map SQL data types to ThoughtSpot types""" sql_type = sql_type.upper() # DEBUG: Print what we're mapping (commented out for cleaner output) # print(f" 🔍 Mapping data type: '{sql_type}'") # Handle NUMBER with precision/scale intelligently if sql_type.startswith('NUMBER'): # Extract precision and scale from NUMBER(precision,scale) if '(' in sql_type and ')' in sql_type: params = sql_type[sql_type.find('(')+1:sql_type.find(')')].split(',') if len(params) >= 2: scale = int(params[1].strip()) result = 'INT64' if scale == 0 else 'DOUBLE' # print(f" → NUMBER({params[0].strip()},{scale}) → {result}") return result else: # print(f" → 
NUMBER({params[0].strip()}) → INT64") return 'INT64' # NUMBER(x) defaults to integer else: # print(f" → Plain NUMBER → DOUBLE") return 'DOUBLE' # Plain NUMBER defaults to double type_mapping = { 'INT64': 'INT64', 'INT': 'INT64', # FIXED: INT should map to INT64 'INTEGER': 'INT64', 'BIGINT': 'INT64', 'VARCHAR': 'VARCHAR', 'TEXT': 'VARCHAR', 'STRING': 'VARCHAR', 'DATE': 'DATE', 'TIMESTAMP': 'DATE', # Try DATE for TIMESTAMP - DATE fields worked fine 'TIMESTAMP_NTZ': 'DATE', # Try DATE for TIMESTAMP_NTZ - we know DATE works 'DECIMAL': 'DOUBLE', 'FLOAT': 'DOUBLE', 'BOOLEAN': 'BOOL' } for sql_key, ts_type in type_mapping.items(): if sql_key in sql_type: return ts_type return 'VARCHAR' # Default fallback def get_connection_by_name(self, connection_name: str) -> Dict: """Check if a connection with this name already exists""" try: response = self.session.get( f"{self.base_url}/api/rest/2.0/metadata/search", params={ "metadata": [{"type": "DATA_SOURCE", "identifier": connection_name}], "record_size": 10 } ) if response.status_code == 200: data = response.json() if data and len(data) > 0: return data[0] # Return first matching connection return None except Exception as e: print(f" ⚠️ Could not check existing connections: {e}") return None def create_snowflake_schema(self, database: str, schema: str): """Create schema in Snowflake via ThoughtSpot connection""" try: print(f" 🏗️ Creating schema {database}.{schema}...") # Use ThoughtSpot's SQL execution API to create schema create_schema_sql = f"CREATE SCHEMA IF NOT EXISTS {database}.{schema}" response = self.session.post( f"{self.base_url}/api/rest/2.0/database/executeQuery", json={ "sql_query": create_schema_sql, "connection_guid": self.sf_connection_guid if hasattr(self, 'sf_connection_guid') else None } ) if response.status_code == 200: print(f" ✅ Schema {database}.{schema} created/verified") else: print(f" ⚠️ Schema creation response: {response.status_code} - {response.text}") print(f" 📝 Will proceed assuming schema exists 
or will be created by table operations") except Exception as e: print(f" ⚠️ Could not create schema: {e}") print(f" 📝 Will proceed assuming schema exists or will be created by table operations") def ensure_tag_exists(self, tag_name: str) -> bool: """ Check if a tag exists, create it if it doesn't. Args: tag_name: Name of the tag Returns: True if tag exists or was created, False on error """ if not tag_name: # No tag name provided - skip silently return True try: # First, try to get the tag to see if it exists print(f"[ThoughtSpot] 🔍 Checking if tag '{tag_name}' exists...", flush=True) search_response = self.session.post( f"{self.base_url}/api/rest/2.0/tags/search", json={"tag_identifier": tag_name} ) print(f"[ThoughtSpot] 🔍 Tag search response: {search_response.status_code}", flush=True) if search_response.status_code == 200: tags = search_response.json() print(f"[ThoughtSpot] 🔍 Tags found: {len(tags) if tags else 0}", flush=True) if tags and len(tags) > 0: # Tag exists tag_id = tags[0].get('id', 'unknown') print(f"[ThoughtSpot] ✅ Tag '{tag_name}' exists (ID: {tag_id})", flush=True) return True elif search_response.status_code == 400: # 400 might mean tag not found in some ThoughtSpot versions print(f"[ThoughtSpot] 🔍 Tag search returned 400 - tag likely doesn't exist", flush=True) else: print(f"[ThoughtSpot] ⚠️ Tag search error: {search_response.status_code}", flush=True) try: print(f"[ThoughtSpot] ⚠️ Response: {search_response.text[:200]}", flush=True) except: pass # Tag doesn't exist - create it print(f"[ThoughtSpot] 📎 Creating tag '{tag_name}'...", flush=True) create_response = self.session.post( f"{self.base_url}/api/rest/2.0/tags/create", json={"name": tag_name} ) print(f"[ThoughtSpot] 📎 Create tag response: {create_response.status_code}", flush=True) if create_response.status_code in [200, 201]: try: result = create_response.json() tag_id = result.get('id', 'unknown') print(f"[ThoughtSpot] ✅ Tag '{tag_name}' created (ID: {tag_id})", flush=True) except: 
print(f"[ThoughtSpot] ✅ Tag '{tag_name}' created", flush=True) return True else: print(f"[ThoughtSpot] ⚠️ Could not create tag: {create_response.status_code}", flush=True) try: print(f"[ThoughtSpot] ⚠️ Response: {create_response.text[:200]}", flush=True) except: pass # Return False - don't silently proceed return False except Exception as e: import traceback print(f"[ThoughtSpot] ⚠️ Tag check/create error: {str(e)}", flush=True) print(f"[ThoughtSpot] ⚠️ Traceback: {traceback.format_exc()}", flush=True) return False def assign_tags_to_objects(self, object_guids: List[str], object_type: str, tag_name: str) -> bool: """ Assign tags to ThoughtSpot objects using REST API v1. Auto-creates the tag if it doesn't exist. Args: object_guids: List of object GUIDs to tag object_type: Type of objects (LOGICAL_TABLE for tables/models, PINBOARD_ANSWER_BOOK for liveboards) tag_name: Tag name to assign Returns: True if successful, False otherwise """ if not tag_name: # No tag name provided - skip silently (this is expected behavior) return True if not object_guids: return False try: import json as json_module # Ensure tag exists (create if needed) tag_ready = self.ensure_tag_exists(tag_name) if not tag_ready: print(f"[ThoughtSpot] ⚠️ Could not ensure tag exists, skipping assignment", flush=True) return False # Use V1 API which actually works assign_response = self.session.post( f"{self.base_url}/tspublic/v1/metadata/assigntag", data={ 'id': json_module.dumps(object_guids), 'type': object_type, 'tagname': json_module.dumps([tag_name]) }, headers={ 'X-Requested-By': 'ThoughtSpot', 'Content-Type': 'application/x-www-form-urlencoded' } ) if assign_response.status_code in [200, 204]: print(f"[ThoughtSpot] ✅ Tagged {len(object_guids)} {object_type} objects with '{tag_name}'", flush=True) return True else: print(f"[ThoughtSpot] ⚠️ Tag assignment failed: {assign_response.status_code}", flush=True) print(f"[ThoughtSpot] DEBUG: Response text: {assign_response.text[:500]}", flush=True) 
print(f"[ThoughtSpot] DEBUG: Object GUIDs: {object_guids}", flush=True) print(f"[ThoughtSpot] DEBUG: Object type: {object_type}", flush=True) return False except Exception as e: print(f"[ThoughtSpot] ⚠️ Tag assignment error: {str(e)}", flush=True) return False def share_objects(self, object_guids: List[str], object_type: str, share_with: str) -> bool: """ Share ThoughtSpot objects with a user or group (can_edit / MODIFY). Args: object_guids: GUIDs to share object_type: 'LOGICAL_TABLE' for models/tables, 'LIVEBOARD' for liveboards share_with: user email (contains '@') or group name """ if not share_with or not object_guids: return True principal_type = "USER" if '@' in share_with else "USER_GROUP" try: response = self.session.post( f"{self.base_url}/api/rest/2.0/security/metadata/share", json={ "permissions": [ { "principal": { "identifier": share_with, "type": principal_type }, "share_mode": "MODIFY" } ], "metadata": [ {"identifier": guid, "type": object_type} for guid in object_guids ] } ) if response.status_code in [200, 204]: print(f"[ThoughtSpot] ✅ Shared {len(object_guids)} {object_type} with {principal_type} '{share_with}'", flush=True) return True else: print(f"[ThoughtSpot] ⚠️ Share failed: {response.status_code} - {response.text[:200]}", flush=True) return False except Exception as e: print(f"[ThoughtSpot] ⚠️ Share error: {str(e)}", flush=True) return False def _generate_demo_names(self, company_name: str = None, use_case: str = None): """Generate standardized demo names using DM convention""" from datetime import datetime import re # Get timestamp components now = datetime.now() yymmdd = now.strftime('%y%m%d') hhmmss = now.strftime('%H%M%S') # Clean and truncate company name (5 chars) if company_name: company_clean = re.sub(r'[^a-zA-Z0-9]', '', company_name.upper())[:5] else: company_clean = 'DEMO'[:5] # Clean and truncate use case (3 chars) if use_case: usecase_clean = re.sub(r'[^a-zA-Z0-9]', '', use_case.upper())[:3] else: usecase_clean = 'GEN'[:3] # 
Generate names base_name = f"DM{yymmdd}_{hhmmss}_{company_clean}_{usecase_clean}" return { 'schema': base_name, 'connection': f"{base_name}_conn", 'model': f"{base_name}_model", 'base': base_name } def deploy_all(self, ddl: str, database: str, schema: str, base_name: str, connection_name: str = None, company_name: str = None, use_case: str = None, liveboard_name: str = None, llm_model: str = None, tag_name: str = None, liveboard_method: str = None, share_with: str = None, company_research: str = None, progress_callback=None) -> Dict: """ Deploy complete data model to ThoughtSpot Args: ddl: Data Definition Language statements database: Target database name schema: Target schema name connection_name: Optional connection name (auto-generated if not provided) Returns: Dict with deployment results and names of created objects """ results = { 'success': False, 'connection': None, 'tables': [], 'model': None, 'errors': [] } table_guids = {} # Store table GUIDs for model creation def log_progress(message): """Helper to log progress both to console and callback""" # ALWAYS print to console FIRST import sys print(f"[ThoughtSpot] {message}", flush=True) sys.stdout.flush() # Force flush # Then call callback if provided if progress_callback: try: progress_callback(message) except Exception as e: print(f"[Warning] Callback error: {e}", flush=True) try: import time start_time = time.time() # STEP 0: Authenticate first! 
log_progress("Authenticating...") if not self.authenticate(): raise Exception("ThoughtSpot authentication failed") auth_time = time.time() - start_time log_progress(f"[OK] Auth complete ({auth_time:.1f}s)") # Parse DDL tables, foreign_keys = self.parse_ddl(ddl) if not tables: raise Exception("No tables found in DDL") # Validate foreign key references before deployment (uses explicit FKs from DDL) fk_warnings = self.validate_foreign_key_references(tables, foreign_keys) if fk_warnings: log_progress(f"[WARN] {len(fk_warnings)} FK warning(s) - joins to missing tables will be skipped") for warning in fk_warnings: log_progress(f" {warning}") # Step 1: Create connection using base name # base_name is like "DEMO_AMA_12111207_X4R" # schema is like "DEMO_ # 2111207_X4R_sch" demo_names = { 'schema': schema, 'connection': f"{base_name}_conn", 'model': f"{base_name}_mdl", 'base': base_name } if not connection_name: connection_name = demo_names['connection'] log_progress(f"Creating connection: {connection_name}...") # Check if connection already exists first existing_connection = self.get_connection_by_name(connection_name) if existing_connection: connection_guid = existing_connection['header']['id_guid'] connection_fqn = connection_guid results['connection'] = connection_name results['connection_guid'] = connection_guid log_progress(f"[OK] Connection ready") # Assign tag to existing connection if tag_name and connection_guid: log_progress(f"Assigning tag '{tag_name}' to connection...") self.assign_tags_to_objects([connection_guid], 'DATA_SOURCE', tag_name) else: log_progress(f"Creating new connection: {connection_name}") # base_name already has random suffix (e.g., _UZM) so no need for additional uniqueness print(f"🔗 Creating connection: {connection_name}") print(f" Account: '{self.sf_account}' (length: {len(self.sf_account)})") print(f" User: '{self.sf_user}'") print(f" Database: '{database}'") connection_tml_yaml = self.create_connection_tml(connection_name) # Filter out 
sensitive info for display - use regex for private key import re display_tml = re.sub(r'-----BEGIN ENCRYPTED PRIVATE KEY-----.*?-----END ENCRYPTED PRIVATE KEY-----', '[PRIVATE_KEY_REDACTED]', connection_tml_yaml, flags=re.DOTALL) passphrase = get_admin_setting('SNOWFLAKE_KP_PASSPHRASE', required=False) if passphrase: display_tml = display_tml.replace(passphrase, "[PASSPHRASE_REDACTED]") print(f"\n📄 TML being sent:\n{display_tml}") # connection_tml_yaml is already a YAML string from create_connection_tml() response = self.session.post( f"{self.base_url}/api/rest/2.0/metadata/tml/import", json={ "metadata_tmls": [connection_tml_yaml], "import_policy": "PARTIAL" } ) print(f" Response status: {response.status_code}") if response.status_code != 200: # Print the actual error response from ThoughtSpot try: error_response = response.json() print(f"❌ Error response: {error_response}") except: print(f"❌ Error response (raw): {response.text}") if response.status_code == 200: result = response.json() print(f"📋 Connection response: {result}") print(f"📋 Response type: {type(result)}") # Handle both dict and list responses if isinstance(result, list): # Response is a list if len(result) > 0 and result[0].get('response', {}).get('status', {}).get('status_code') == 'OK': connection_guid = result[0].get('response', {}).get('header', {}).get('id_guid') print(f"✅ Connection created: {connection_name} (GUID: {connection_guid})") results['connection'] = connection_name results['connection_guid'] = connection_guid # Store connection GUID for table creation connection_fqn = connection_guid else: error_msg = result[0].get('response', {}).get('status', {}).get('error_message', 'Unknown error') if result else 'Empty response' raise Exception(f"Connection creation failed: {error_msg}") elif isinstance(result, dict) and result.get('object') and len(result['object']) > 0: obj = result['object'][0] if obj.get('status', {}).get('status_code') == 'OK': connection_guid = obj.get('header', 
{}).get('id_guid') print(f"✅ Connection created: {connection_name} (GUID: {connection_guid})") results['connection'] = connection_name results['connection_guid'] = connection_guid # Store connection GUID for table creation connection_fqn = connection_guid else: error_msg = obj.get('status', {}).get('error_message', 'Unknown error') raise Exception(f"Connection creation failed: {error_msg}") else: raise Exception("Connection creation failed: No object in response") else: raise Exception(f"Connection creation failed: HTTP {response.status_code}") # Assign tag to connection if tag_name and connection_guid: log_progress(f"Assigning tag '{tag_name}' to connection...") self.assign_tags_to_objects([connection_guid], 'DATA_SOURCE', tag_name) # Step 1.5: Schema should already exist (created by demo_prep tool) print("\n1️⃣.5 Using existing schema in Snowflake...") # Step 2: Build relationships for tables print("\n1️⃣.5 Building relationships...") table_relationships = self._build_table_relationships(tables, foreign_keys) # Step 2: TWO-PHASE TABLE CREATION (to avoid dependency order issues) table_count = len(tables) batch1_start = time.time() log_progress(f"Batch 1/2: Creating {table_count} tables...") # PHASE 1: Create all tables WITHOUT joins in ONE batch API call # Build array of all table TMLs table_tmls_batch1 = [] table_names_order = [] # Track order for matching response for table_name, columns in tables.items(): print(f"[ThoughtSpot] Preparing {table_name.upper()}...", flush=True) table_tml = self.create_table_tml(table_name, columns, connection_name, database, schema, all_tables=None, foreign_keys=foreign_keys) table_tmls_batch1.append(table_tml) table_names_order.append(table_name.upper()) # Send all tables in ONE API call log_progress(f" Sending batch request for {len(table_tmls_batch1)} tables...") response = self.session.post( f"{self.base_url}/api/rest/2.0/metadata/tml/import", json={ "metadata_tmls": table_tmls_batch1, "import_policy": "PARTIAL", "create_new": 
True } ) if response.status_code == 200: result = response.json() # Handle both response formats (list or dict with 'object' key) if isinstance(result, list): objects = result elif isinstance(result, dict) and 'object' in result: objects = result['object'] else: error = f"Batch 1 failed: Unexpected response format: {type(result)}" log_progress(f" ❌ {error}") results['errors'].append(error) return results # Process each table result for idx, obj in enumerate(objects): table_name = table_names_order[idx] if idx < len(table_names_order) else f"TABLE_{idx}" if obj.get('response', {}).get('status', {}).get('status_code') == 'OK': table_guid = obj.get('response', {}).get('header', {}).get('id_guid') print(f"[ThoughtSpot] ✅ {table_name} created", flush=True) results['tables'].append(table_name) table_guids[table_name] = table_guid else: error_msg = obj.get('response', {}).get('status', {}).get('error_message', 'Unknown error') error = f"Table {table_name} failed: {error_msg}" print(f"[ThoughtSpot] ❌ {table_name} failed: {error_msg}", flush=True) results['errors'].append(error) else: error = f"Batch 1 HTTP error: {response.status_code} - {response.text}" log_progress(f" ❌ {error}") results['errors'].append(error) return results # Check if we created any tables successfully if not table_guids: log_progress(" ❌ No tables were created successfully in Batch 1") return results # Assign tags to tables table_guid_list = list(table_guids.values()) print(f"🔍 DEBUG BEFORE TAG CALL: tag_name='{tag_name}', table_guid_list={table_guid_list}") log_progress(f"Assigning tag '{tag_name}' to {len(table_guid_list)} tables...") self.assign_tags_to_objects(table_guid_list, 'LOGICAL_TABLE', tag_name) batch1_time = time.time() - batch1_start log_progress(f"[OK] Batch 1 complete: {len(table_guids)} tables created ({batch1_time:.1f}s)") # PHASE 2: Update tables WITH joins in ONE batch API call batch2_start = time.time() log_progress(f"Batch 2/2: Adding joins to {len(table_guids)} tables...") # 
Build array of all table update TMLs (with joins) table_tmls_batch2 = [] table_names_order_batch2 = [] for table_name, columns in tables.items(): table_name_upper = table_name.upper() # Only add joins if the table was created successfully in Phase 1 if table_name_upper not in table_guids: print(f"[ThoughtSpot] Skipping {table_name_upper} (not created)", flush=True) continue # Get the GUID for this table table_guid = table_guids[table_name_upper] print(f"[ThoughtSpot] Preparing joins for {table_name_upper}...", flush=True) # Create table TML WITH joins_with section AND the table GUID table_tml = self.create_table_tml( table_name, columns, connection_name, database, schema, all_tables=tables, table_guid=table_guid, foreign_keys=foreign_keys ) table_tmls_batch2.append(table_tml) table_names_order_batch2.append(table_name_upper) # Send all table updates in ONE API call if table_tmls_batch2: log_progress(f" Sending batch request to add joins to {len(table_tmls_batch2)} tables...") response = self.session.post( f"{self.base_url}/api/rest/2.0/metadata/tml/import", json={ "metadata_tmls": table_tmls_batch2, "import_policy": "PARTIAL", "create_new": False # Update existing tables } ) if response.status_code == 200: result = response.json() # Handle both response formats if isinstance(result, list): objects = result elif isinstance(result, dict) and 'object' in result: objects = result['object'] else: objects = [] # Process each result for idx, obj in enumerate(objects): table_name = table_names_order_batch2[idx] if idx < len(table_names_order_batch2) else f"TABLE_{idx}" if obj.get('response', {}).get('status', {}).get('status_code') == 'OK': print(f"[ThoughtSpot] ✅ {table_name} joins added", flush=True) else: error_msg = obj.get('response', {}).get('status', {}).get('error_message', 'Unknown error') print(f"[ThoughtSpot] ⚠️ {table_name} joins failed: {error_msg}", flush=True) results['errors'].append(f"Joins for {table_name} failed: {error_msg}") else: log_progress(f" ⚠️ 
Batch 2 HTTP error: {response.status_code}") batch2_time = time.time() - batch2_start log_progress(f"[OK] Batch 2 complete: Joins added ({batch2_time:.1f}s)") actual_constraint_ids = {} # We'll generate these for the model # Skip separate relationship creation for now # print("\n2️⃣.5 Creating relationships separately...") # self.create_relationships_separately(table_relationships, table_guids) # Step 3: Extract constraint IDs from created tables table_constraints = {} for table_name, table_guid in table_guids.items(): print(f"[ThoughtSpot] Extracting joins from {table_name}...", flush=True) # Export table TML to get constraint IDs export_response = self.session.post( f"{self.base_url}/api/rest/2.0/metadata/tml/export", json={ "metadata": [{"identifier": table_guid, "type": "LOGICAL_TABLE"}], "export_associated": False, "format_type": "YAML" } ) if export_response.status_code == 200: tml_data = export_response.json() if tml_data and 'edoc' in tml_data[0]: import json tml_json = json.loads(tml_data[0]['edoc']) # Extract joins_with constraint IDs joins_with = tml_json.get('table', {}).get('joins_with', []) if joins_with: table_constraints[table_name] = [] for join in joins_with: constraint_id = join.get('name') destination = join.get('destination', {}).get('name') if constraint_id and destination: table_constraints[table_name].append({ 'constraint_id': constraint_id, 'destination': destination }) # Step 4: Create model (semantic layer) with constraint references model_start = time.time() model_name = demo_names['model'] log_progress(f"Creating model: {model_name}...") # Use the enhanced model creation that includes constraint references model_tml = self._create_model_with_constraints(tables, foreign_keys, table_guids, table_constraints, model_name, connection_name) print(f"\n📄 Model TML being sent:\n{model_tml}") response = self.session.post( f"{self.base_url}/api/rest/2.0/metadata/tml/import", json={ "metadata_tmls": [model_tml], "import_policy": "ALL_OR_NONE", 
"create_new": True } ) if response.status_code == 200: result = response.json() # Handle both response formats (list or dict with 'object' key) if isinstance(result, list): objects = result elif isinstance(result, dict) and 'object' in result: objects = result['object'] else: error = f"Model failed: Unexpected response format: {type(result)}" log_progress(f" ❌ {error}") results['errors'].append(error) objects = [] if objects and len(objects) > 0: if objects[0].get('response', {}).get('status', {}).get('status_code') == 'OK': model_guid = objects[0].get('response', {}).get('header', {}).get('id_guid') model_time = time.time() - model_start log_progress(f"[OK] Model created ({model_time:.1f}s)") results['model'] = model_name results['model_guid'] = model_guid # Assign tag to model print(f"🔍 DEBUG BEFORE TAG CALL: tag_name='{tag_name}', model_guid='{model_guid}'") log_progress(f"Assigning tag '{tag_name}' to model...") self.assign_tags_to_objects([model_guid], 'LOGICAL_TABLE', tag_name) # Share model _effective_share = share_with or get_admin_setting('SHARE_WITH', required=False) if _effective_share: log_progress(f"Sharing model with '{_effective_share}'...") self.share_objects([model_guid], 'LOGICAL_TABLE', _effective_share) # Step 3.5: Enable Spotter + enrich model semantics in a single export→update→reimport # (create_new=True import ignores spotter_config, so we always re-import here) try: export_resp = self.session.post( f"{self.base_url}/api/rest/2.0/metadata/tml/export", json={ "metadata": [{"identifier": model_guid, "type": "LOGICAL_TABLE"}], "export_associated": False, "format_type": "YAML" } ) if export_resp.status_code == 200: export_data = export_resp.json() if export_data and 'edoc' in export_data[0]: model_tml_dict = json.loads(export_data[0]['edoc']) # Enable Spotter model_tml_dict.setdefault('model', {}).setdefault('properties', {})['spotter_config'] = {'is_spotter_enabled': True} # Enrich with description, synonyms, and AI context if company_research: 
try: from model_semantic_updater import ModelSemanticUpdater log_progress("Generating model description, synonyms, and AI context...") sem_start = time.time() updater = ModelSemanticUpdater(self, llm_model=llm_model) model_description = updater.generate_model_description( company_research=company_research, use_case=use_case or '', company_name=company_name or '', model_name=model_name, ) column_semantics = updater.generate_column_semantics( company_research=company_research, model_tml=model_tml_dict, use_case=use_case or '', company_name=company_name or '', ) # Apply to TML dict in-place (returns YAML string) enriched_yaml = updater.apply_to_model_tml( model_tml_dict, column_semantics, model_description ) # Parse back so we can still dump consistently below model_tml_dict = yaml.safe_load(enriched_yaml) sem_time = time.time() - sem_start log_progress(f"[OK] Semantics generated: {len(column_semantics)} columns enriched ({sem_time:.1f}s)") except Exception as sem_err: log_progress(f"[WARN] Semantic enrichment failed (non-fatal): {sem_err}") updated_tml = yaml.dump(model_tml_dict, allow_unicode=True, sort_keys=False) update_resp = self.session.post( f"{self.base_url}/api/rest/2.0/metadata/tml/import", json={ "metadata_tmls": [updated_tml], "import_policy": "ALL_OR_NONE", "create_new": False } ) if update_resp.status_code == 200: log_progress("🤖 Spotter enabled + model semantics applied") else: log_progress(f"🤖 Model update failed: HTTP {update_resp.status_code} — {update_resp.text[:200]}") else: log_progress("🤖 Spotter enable: export returned no edoc") else: log_progress(f"🤖 Spotter enable: export failed HTTP {export_resp.status_code}") except Exception as spotter_error: log_progress(f"🤖 Spotter/semantics exception: {spotter_error}") # Step 4: Auto-create Liveboard from model lb_start = time.time() # HYBRID is the only supported liveboard creation method method = 'HYBRID' log_progress(f"Creating liveboard ({method} method)...") try: # Build company data from 
parameters # Clean company name for display (strip .com, .org, etc) clean_company = company_name.split('.')[0].title() if company_name and '.' in company_name else (company_name or 'Demo Company') company_data = { 'name': clean_company, 'use_case': use_case or 'General Analytics' } if method == 'HYBRID': # HYBRID: MCP creates liveboard, TML post-processes from liveboard_creator import create_liveboard_from_model_mcp, create_liveboard_from_model, enhance_mcp_liveboard # Get actual column names from ThoughtSpot model (not DDL) # This is critical because TS may rename columns to make them unique # e.g., PROCESSING_FEE becomes gift_processing_fee and tran_processing_fee model_columns = self.get_model_columns(model_guid) if not model_columns: log_progress(f" ⚠️ Could not get model columns, falling back to DDL") model_columns = [] for table_name, columns_list in tables.items(): for col in columns_list: model_columns.append(col) # Get liveboard questions from the vertical×function config outlier_dicts = [] try: from demo_personas import parse_use_case, get_use_case_config uc_vertical, uc_function = parse_use_case(use_case or '') uc_config = get_use_case_config(uc_vertical or "Generic", uc_function or "Generic") lq = uc_config.get("liveboard_questions", []) required_qs = [q for q in lq if q.get("required")] optional_qs = [q for q in lq if not q.get("required")] for q in required_qs: outlier_dicts.append({ 'title': q['title'], 'insight': q.get('insight', ''), 'viz_type': q['viz_type'], 'show_me_query': q['viz_question'], 'kpi_companion': True, 'spotter_questions': q.get('spotter_qs', []), }) for q in optional_qs[:2]: outlier_dicts.append({ 'title': q['title'], 'insight': q.get('insight', ''), 'viz_type': q['viz_type'], 'show_me_query': q['viz_question'], 'kpi_companion': False, 'spotter_questions': q.get('spotter_qs', []), }) if outlier_dicts: log_progress(f" [MCP] Using {len(outlier_dicts)} liveboard questions from {uc_vertical}×{uc_function}") except Exception as 
outlier_err: log_progress(f" [MCP] Liveboard questions loading skipped: {outlier_err}") # MCP creates liveboard if method == 'HYBRID': log_progress(f" Step 1/2: MCP creating initial liveboard...") log_progress(f" [MCP] Model: {model_name}, GUID: {model_guid}") log_progress(f" [MCP] Use case: {use_case or 'General Analytics'}") log_progress(f" [MCP] Using {len(model_columns)} columns from ThoughtSpot model") try: liveboard_result = create_liveboard_from_model_mcp( ts_client=self, model_id=model_guid, model_name=model_name, company_data=company_data, use_case=use_case or 'General Analytics', num_visualizations=10, liveboard_name=liveboard_name, outliers=outlier_dicts if outlier_dicts else None, llm_model=llm_model, model_columns=model_columns, prompt_logger=self.prompt_logger ) except Exception as mcp_error: import traceback error_trace = traceback.format_exc() log_progress(f" [MCP ERROR] {type(mcp_error).__name__}: {str(mcp_error)}") liveboard_result = {'success': False, 'error': str(mcp_error), 'traceback': error_trace} # Log result if liveboard_result.get('success'): log_progress(f" [MCP] Liveboard created: {liveboard_result.get('liveboard_guid')}") else: log_progress(f" [MCP FAILED] {liveboard_result.get('error', 'Unknown error')}") log_progress(" [FALLBACK] Trying direct TML liveboard creation...") try: liveboard_result = create_liveboard_from_model( ts_client=self, model_id=model_guid, model_name=model_name, company_data=company_data, use_case=use_case or 'General Analytics', num_visualizations=10, liveboard_name=liveboard_name, llm_model=llm_model, outliers=outlier_dicts if outlier_dicts else None, model_columns=model_columns, prompt_logger=self.prompt_logger ) if liveboard_result.get('success'): log_progress(f" [FALLBACK OK] Liveboard created: {liveboard_result.get('liveboard_guid')}") else: log_progress(f" [FALLBACK FAILED] {liveboard_result.get('error', 'Unknown error')}") except Exception as fallback_error: import traceback fallback_trace = 
traceback.format_exc() log_progress(f" [FALLBACK ERROR] {type(fallback_error).__name__}: {str(fallback_error)}") liveboard_result = { 'success': False, 'error': f"MCP failed and fallback failed: {fallback_error}", 'traceback': fallback_trace } # HYBRID: Add TML enhancement if MCP succeeded if method == 'HYBRID' and liveboard_result.get('success') and liveboard_result.get('liveboard_guid'): log_progress(f" Step 2/2: Enhancing with TML post-processing...") enhance_result = enhance_mcp_liveboard( liveboard_guid=liveboard_result['liveboard_guid'], company_data=company_data, ts_client=self, add_groups=True, fix_kpis=True, apply_brand_colors=True ) if enhance_result.get('success'): log_progress(f" [OK] Enhancement applied") else: enhance_err = f"TML enhancement failed: {enhance_result.get('message', 'unknown')[:100]}" log_progress(f" [ERROR] {enhance_err}") results['errors'].append(enhance_err) # Check result print(f"🔍 DEBUG: Liveboard result received: {liveboard_result}") print(f"🔍 DEBUG: Success flag: {liveboard_result.get('success')}") if liveboard_result.get('success'): lb_time = time.time() - lb_start log_progress(f"[OK] Liveboard created via {method} ({lb_time:.1f}s)") results['liveboard'] = liveboard_result.get('liveboard_name') results['liveboard_guid'] = liveboard_result.get('liveboard_guid') results['liveboard_method'] = method if liveboard_result.get('liveboard_url'): results['liveboard_url'] = liveboard_result.get('liveboard_url') # Assign tag to liveboard if tag_name and liveboard_result.get('liveboard_guid'): log_progress(f"Assigning tag '{tag_name}' to liveboard...") self.assign_tags_to_objects([liveboard_result['liveboard_guid']], 'PINBOARD_ANSWER_BOOK', tag_name) # Share liveboard _effective_share = share_with or get_admin_setting('SHARE_WITH', required=False) if _effective_share and liveboard_result.get('liveboard_guid'): log_progress(f"Sharing liveboard with '{_effective_share}'...") self.share_objects([liveboard_result['liveboard_guid']], 'LIVEBOARD', 
_effective_share) else: error = f"Liveboard creation failed: {liveboard_result.get('error', 'Unknown error')}" print(f"❌ DEBUG: Liveboard creation failed! Error: {error}") results['errors'].append(error) log_progress(f"[ERROR] {error}") except Exception as lb_error: error = f"Liveboard creation exception: {str(lb_error)}" results['errors'].append(error) log_progress(f"[ERROR] {error}") else: # Extract detailed error information obj_response = objects[0].get('response', {}) status = obj_response.get('status', {}) error_message = status.get('error_message', 'Unknown error') # Clean HTML tags from error message (ThoughtSpot sometimes returns HTML) error_message = re.sub(r'<[^>]+>', '', error_message).strip() if not error_message: error_message = 'Schema validation failed (no details provided)' error_code = status.get('error_code', 'N/A') # Try to extract additional error details from various response fields error_details = [] # Check for detailed error messages in different response structures if 'error_details' in status: error_details.append(f"Error details: {status.get('error_details')}") if 'validation_errors' in obj_response: error_details.append(f"Validation errors: {obj_response.get('validation_errors')}") if 'warnings' in obj_response: error_details.append(f"Warnings: {obj_response.get('warnings')}") # Check header for additional info header = obj_response.get('header', {}) if 'error' in header: error_details.append(f"Header error: {header.get('error')}") # Get any additional error details full_response = json.dumps(objects[0], indent=2) # Save the TML that failed for debugging import tempfile # os is already imported at module level try: debug_dir = os.path.join(tempfile.gettempdir(), 'thoughtspot_debug') os.makedirs(debug_dir, exist_ok=True) failed_tml_path = os.path.join(debug_dir, f'failed_model_{datetime.now().strftime("%Y%m%d_%H%M%S")}.tml') with open(failed_tml_path, 'w') as f: f.write(model_tml) log_progress(f"💾 Failed TML saved to: {failed_tml_path}") 
print(f"💾 Failed TML saved to: {failed_tml_path}") except Exception as save_error: log_progress(f"[WARN] Could not save failed TML: {save_error}") # Build comprehensive error message error = f"Model validation failed: {error_message}" if error_code != 'N/A': error += f" (Error code: {error_code})" if error_details: error += f"\n\nAdditional details:\n" + "\n".join(error_details) print(f"📋 Full model response: {full_response}") # DEBUG: Show full response print(f" ❌ {error}") log_progress(f" ❌ {error}") log_progress(f" 📋 Full response details:") log_progress(f"{full_response}") # Log full TML for debugging log_progress(f"\n📄 TML that was sent:\n{model_tml}") results['errors'].append(error) results['errors'].append(f"Full API response: {full_response}") results['errors'].append(f"Failed TML saved to: {failed_tml_path if 'failed_tml_path' in locals() else 'N/A'}") else: error = "Model failed: No objects in response" log_progress(f" ❌ {error}") results['errors'].append(error) # Mark as successful if we got this far results['success'] = len(results['errors']) == 0 except Exception as e: import traceback error_msg = str(e) full_trace = traceback.format_exc() # Log to console with full details print(f"\n{'='*60}") print(f"❌ DEPLOYMENT EXCEPTION") print(f"{'='*60}") print(f"Error: {error_msg}") print(f"\nFull traceback:") print(full_trace) print(f"{'='*60}\n") # Log through callback too log_progress(f"[ERROR] Deployment failed: {error_msg}") log_progress(f"Traceback: {full_trace}") results['errors'].append(error_msg) results['errors'].append(f"Traceback: {full_trace}") return results def deploy_to_thoughtspot(ddl: str, database: str, schema: str, connection_name: str = None, company_name: str = None, use_case: str = None, progress_callback=None) -> Dict: """ Convenience function for deploying to ThoughtSpot Args: ddl: Data Definition Language statements database: Target database name schema: Target schema name connection_name: Optional connection name progress_callback: 
Optional callback for progress updates Returns: Dict with deployment results """ deployer = ThoughtSpotDeployer() return deployer.deploy_all(ddl, database, schema, connection_name, company_name, use_case, progress_callback) if __name__ == "__main__": # Example usage test_ddl = """ CREATE TABLE CUSTOMERS ( CUSTOMERID INT64 PRIMARY KEY, NAME VARCHAR(255) ); """ # Test deployment - using a schema that exists results = deploy_to_thoughtspot( ddl=test_ddl, database="DEMOBUILD", # Use the actual Snowflake database schema="THOUGHTSPO_SALESA_20250915_193303" # Use the working schema from your working table ) print("\n" + "=" * 60) print("📊 DEPLOYMENT RESULTS:") print("=" * 60) print(json.dumps(results, indent=2))