Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| ThoughtSpot Deployment Module | |
| A comprehensive tool for deploying data models to ThoughtSpot: | |
| - Creates Snowflake connections | |
| - Parses DDL and creates tables | |
| - Generates and deploys models | |
| Usage: | |
| from thoughtspot_deployer import ThoughtSpotDeployer | |
| deployer = ThoughtSpotDeployer() | |
| results = deployer.deploy_all(ddl, database, schema) | |
| """ | |
| import os | |
| from supabase_client import get_admin_setting | |
| import re | |
| import yaml | |
| import json | |
| import requests | |
| import snowflake.connector | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Tuple | |
| from dotenv import load_dotenv | |
| from snowflake_auth import get_snowflake_connection_params | |
| # Load environment variables | |
| load_dotenv() | |
| def _safe_print(*args, **kwargs): | |
| """Print that ignores BrokenPipeError - prevents crashes when output is closed.""" | |
| try: | |
| print(*args, **kwargs) | |
| except BrokenPipeError: | |
| pass | |
| def _apply_naming_style(name: str, style: str = "snake_case") -> str: | |
| """ | |
| Convert column name to specified naming style for ThoughtSpot display. | |
| Args: | |
| name: Original column name | |
| style: Naming style - one of: Regular Case, snake_case, camelCase, PascalCase, UPPER_CASE, original | |
| Examples (for "SHIPPING_MODE"): | |
| Regular Case β Shipping Mode | |
| snake_case β shipping_mode | |
| camelCase β shippingMode | |
| PascalCase β ShippingMode | |
| UPPER_CASE β SHIPPING_MODE | |
| original β SHIPPING_MODE (unchanged) | |
| """ | |
| name = name.strip() | |
| if style == "original": | |
| return name | |
| if style == "UPPER_CASE": | |
| return name.upper().replace(" ", "_") | |
| # Split into words (handle underscores, spaces, and camelCase) | |
| import re | |
| # Split on underscores, spaces, or camelCase boundaries | |
| words = re.split(r'[_\s]+', name) | |
| # Further split camelCase words | |
| expanded_words = [] | |
| for word in words: | |
| # Split on camelCase boundaries (e.g., "firstName" -> ["first", "Name"]) | |
| parts = re.findall(r'[A-Z]?[a-z]+|[A-Z]+(?=[A-Z][a-z]|\d|\W|$)|\d+', word) | |
| if parts: | |
| expanded_words.extend(parts) | |
| else: | |
| expanded_words.append(word) | |
| words = [w.lower() for w in expanded_words if w] | |
| if not words: | |
| return name.lower() | |
| if style == "Regular Case": | |
| # Title case each word, join with spaces: STATE_ID -> State Id | |
| return " ".join(w.capitalize() for w in words) | |
| if style == "snake_case": | |
| return "_".join(words) | |
| elif style == "camelCase": | |
| # First word lowercase, rest capitalized | |
| return words[0] + "".join(w.capitalize() for w in words[1:]) | |
| elif style == "PascalCase": | |
| # All words capitalized | |
| return "".join(w.capitalize() for w in words) | |
| else: | |
| # Default to snake_case | |
| return "_".join(words) | |
def _to_snake_case(name: str) -> str:
    """
    Backwards-compatible shim that converts a name to snake_case.
    Prefer _apply_naming_style() directly when other styles are needed.
    """
    return _apply_naming_style(name, style="snake_case")
| class ThoughtSpotDeployer: | |
| """ThoughtSpot deployment automation""" | |
| def __init__(self, base_url: str = None, username: str = None, secret_key: str = None): | |
| """ | |
| Initialize ThoughtSpot deployer (trusted auth only) | |
| Reads from environment variables if not passed directly. | |
| Env vars are populated from Supabase admin settings at login time. | |
| Raises ValueError if any required setting is missing. | |
| """ | |
| self.base_url = base_url if base_url else get_admin_setting('THOUGHTSPOT_URL') | |
| if not username: | |
| raise ValueError("ThoughtSpotDeployer requires username β pass the logged-in user's email") | |
| self.username = username | |
| if not secret_key: | |
| raise ValueError("ThoughtSpotDeployer requires secret_key β pass the trusted auth key for the selected environment") | |
| self.secret_key = secret_key | |
| # Snowflake connection details from environment (key pair auth) | |
| self.sf_account = get_admin_setting('SNOWFLAKE_ACCOUNT') | |
| self.sf_user = get_admin_setting('SNOWFLAKE_KP_USER') | |
| self.sf_role = get_admin_setting('SNOWFLAKE_ROLE') | |
| self.sf_warehouse = get_admin_setting('SNOWFLAKE_WAREHOUSE') | |
| self.headers = { | |
| 'Content-Type': 'application/json', | |
| 'X-Requested-By': 'ThoughtSpot' | |
| } | |
| # Use session to maintain cookies between requests | |
| self.session = requests.Session() | |
| self.session.headers.update(self.headers) | |
| # Column naming style for ThoughtSpot model columns | |
| # Options: Regular Case, snake_case, camelCase, PascalCase, UPPER_CASE, original | |
| self.column_naming_style = "Regular Case" | |
| # Per-session prompt logger β set by the chat controller after construction | |
| self.prompt_logger = None | |
| # Validate credentials for trusted auth | |
| if not all([self.base_url, self.username, self.secret_key]): | |
| raise ValueError("Missing ThoughtSpot URL, username, or trusted auth key") | |
| if not all([self.sf_account, self.sf_user, self.sf_role, self.sf_warehouse]): | |
| raise ValueError("Missing required Snowflake credentials in environment variables") | |
| def _get_private_key_for_thoughtspot(self) -> str: | |
| """Get private key in format suitable for ThoughtSpot TML""" | |
| private_key_raw = get_admin_setting('SNOWFLAKE_KP_PK') | |
| if not private_key_raw: | |
| raise ValueError("SNOWFLAKE_KP_PK environment variable not set") | |
| # ThoughtSpot expects the private key as raw PEM format string | |
| if not private_key_raw.startswith('-----BEGIN'): | |
| # If it's base64 encoded, decode it | |
| import base64 | |
| try: | |
| private_key_raw = base64.b64decode(private_key_raw).decode('utf-8') | |
| except Exception: | |
| pass | |
| return private_key_raw | |
| def authenticate(self) -> bool: | |
| """Authenticate with ThoughtSpot using trusted authentication""" | |
| return self.authenticate_trusted() | |
    def authenticate_trusted(self) -> bool:
        """Authenticate with ThoughtSpot using trusted authentication (secret key).

        Posts the username/secret key to the v2.0 full-token endpoint and, on
        success, installs the returned bearer token on the shared session (or
        relies on session cookies when the server answers 204).

        Returns:
            bool: True if authentication succeeded, False otherwise. All
            failures (HTTP errors, network exceptions) are printed, never raised.
        """
        try:
            auth_url = f"{self.base_url}/api/rest/2.0/auth/token/full"
            print(f" π Attempting trusted authentication to: {auth_url}")
            print(f" π€ Username: {self.username}")
            # Only show a redacted key preview when the key is long enough
            print(f" π Using secret key: {self.secret_key[:8]}...{self.secret_key[-4:]}" if self.secret_key and len(self.secret_key) > 12 else " π Using secret key")
            response = self.session.post(
                auth_url,
                json={
                    "username": self.username,
                    "secret_key": self.secret_key,
                    "validity_time_in_sec": 3600  # 1 hour token
                }
            )
            print(f" π‘ HTTP Status: {response.status_code}")
            if response.status_code == 200:
                result = response.json()
                if 'token' in result:
                    # Use the token as bearer auth for all subsequent requests
                    self.session.headers['Authorization'] = f'Bearer {result["token"]}'
                    print(" β Trusted authentication successful (bearer token)")
                    return True
                else:
                    print(f" β No token in response: {result}")
                    return False
            elif response.status_code == 204:
                # 204: server set session cookies on our session instead of
                # returning a token body
                print(" β Trusted authentication successful (session cookies)")
                return True
            else:
                print(f" β HTTP Error {response.status_code}: {response.text}")
                return False
        except Exception as e:
            # Network/parse failures are reported, not raised, so callers can
            # treat authentication as a simple boolean outcome
            print(f" π₯ Trusted authentication exception: {e}")
            return False
    def authenticate_oauth(self, timeout: int = 120) -> bool:
        """
        Authenticate with ThoughtSpot using browser-based SSO (Okta, SAML, etc.)

        Opens browser to ThoughtSpot login, user authenticates via SSO,
        and cookies are captured via a local callback server.

        Args:
            timeout: Seconds to wait for authentication (default 120)

        Returns:
            True if authentication successful, False otherwise

        NOTE(review): the browser's cookies never reach this process's
        requests session (separate cookie jars), so the post-callback session
        check below is expected to fail unless cookies were obtained some
        other way - see the warning branch near the end.
        """
        # Lazy imports: only this SSO flow needs a browser and a local HTTP server
        import webbrowser
        import http.server
        import socketserver
        import threading
        import urllib.parse
        print(f" π Starting OAuth/SSO authentication for: {self.base_url}")
        print(f" π€ User: {self.username or 'SSO user'}")
        # Find an available port for the callback server by trial-binding.
        # NOTE(review): the probe server gets no handler class (None) and the
        # port is released then re-bound below - a small TOCTOU race window.
        callback_port = 8765
        for port in range(8765, 8800):
            try:
                with socketserver.TCPServer(("", port), None) as test:
                    callback_port = port
                    break
            except OSError:
                continue
        callback_url = f"http://localhost:{callback_port}/callback"
        auth_complete = threading.Event()
        auth_success = [False]  # Use list to allow modification in nested function
        class OAuthCallbackHandler(http.server.BaseHTTPRequestHandler):
            def log_message(self, format, *args):
                pass  # Suppress logging
            def do_GET(self):
                if self.path.startswith('/callback'):
                    # Authentication completed - show success page
                    self.send_response(200)
                    self.send_header('Content-type', 'text/html')
                    self.end_headers()
                    # Page that extracts cookies and displays success
                    html = """
                    <!DOCTYPE html>
                    <html>
                    <head>
                        <title>ThoughtSpot Authentication</title>
                        <style>
                            body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
                                   display: flex; justify-content: center; align-items: center;
                                   height: 100vh; margin: 0; background: #f5f5f5; }
                            .container { text-align: center; background: white; padding: 40px;
                                         border-radius: 10px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }
                            .success { color: #28a745; font-size: 48px; }
                            h1 { color: #333; }
                            p { color: #666; }
                        </style>
                    </head>
                    <body>
                        <div class="container">
                            <div class="success">β</div>
                            <h1>Authentication Successful!</h1>
                            <p>You can close this window and return to the application.</p>
                        </div>
                    </body>
                    </html>
                    """
                    self.wfile.write(html.encode())
                    auth_success[0] = True
                    auth_complete.set()
                elif self.path == '/check':
                    # Health check endpoint
                    self.send_response(200)
                    self.send_header('Content-type', 'text/plain')
                    self.end_headers()
                    self.wfile.write(b'OK')
                else:
                    self.send_response(404)
                    self.end_headers()
        # Start callback server in background thread. handle_request serves
        # exactly ONE request (the /callback hit) and then returns.
        # NOTE(review): server.server_close() is never called on the success
        # path, so the listening socket is leaked until process exit.
        server = socketserver.TCPServer(("", callback_port), OAuthCallbackHandler)
        server_thread = threading.Thread(target=server.handle_request)
        server_thread.daemon = True
        server_thread.start()
        # Build the SSO login URL
        # ThoughtSpot redirects to SSO provider, then back to ThoughtSpot, then to our callback
        ts_login_url = f"{self.base_url}/?redirectURL={urllib.parse.quote(callback_url)}"
        print(f" π Opening browser for SSO login...")
        print(f" π Callback URL: {callback_url}")
        print(f" β³ Waiting up to {timeout} seconds for authentication...")
        # Open browser to ThoughtSpot login
        webbrowser.open(ts_login_url)
        # Wait for authentication to complete
        if auth_complete.wait(timeout=timeout):
            if auth_success[0]:
                print(" β Browser authentication completed!")
                # The user authenticated in the browser; verify whether this
                # process's session happens to be valid too.
                try:
                    # Check if session is valid by calling a simple API endpoint
                    user_response = self.session.get(
                        f"{self.base_url}/api/rest/2.0/auth/session/user",
                        timeout=10
                    )
                    if user_response.status_code == 200:
                        user_info = user_response.json()
                        print(f" β Session active for: {user_info.get('name', 'unknown')}")
                        return True
                    else:
                        # Browser auth completed but we don't have cookies in our session
                        # This is expected - browser and Python have separate cookie jars
                        print(" β οΈ Browser authenticated but Python session needs cookies")
                        print(" π‘ For full OAuth support, please use the browser-based workflow")
                        print(" π‘ Or configure trusted authentication on ThoughtSpot")
                        return False
                except Exception as e:
                    print(f" β οΈ Could not verify session: {e}")
                    return False
            else:
                print(" β Authentication callback received but marked as failed")
                return False
        else:
            print(" β Authentication timed out")
            # NOTE(review): shutdown() only stops serve_forever loops; it does
            # not interrupt a blocked handle_request(), and server_close() is
            # still not called here either.
            server.shutdown()
            return False
    def get_model_columns(self, model_guid: str) -> List[Dict]:
        """
        Get actual column names from a ThoughtSpot model.

        This is important because ThoughtSpot may rename columns to make them
        unique (e.g., PROCESSING_FEE becomes gift_processing_fee and
        tran_processing_fee).

        Args:
            model_guid: GUID of the ThoughtSpot model

        Returns:
            List of column dicts with 'name', 'type' (SQL-like) and 'ts_type'
            (original ThoughtSpot type) keys. Returns an empty list on any
            export or parse failure; errors are printed, never raised.
        """
        try:
            # Export the model TML to get actual column names
            export_response = self.session.post(
                f"{self.base_url}/api/rest/2.0/metadata/tml/export",
                json={
                    'metadata': [{'identifier': model_guid}],
                    'export_associated': False
                }
            )
            if export_response.status_code != 200:
                print(f" β οΈ Could not export model TML: HTTP {export_response.status_code}")
                return []
            tml_data = export_response.json()
            if not tml_data or len(tml_data) == 0:
                print(f" β οΈ Empty TML export response")
                return []
            # The TML document is returned as a YAML string in 'edoc'
            tml_str = tml_data[0].get('edoc', '')
            model_tml = yaml.safe_load(tml_str)
            if not model_tml or 'model' not in model_tml:
                print(f" β οΈ Invalid model TML structure")
                return []
            # Extract columns with their actual names from the model
            columns = []
            for col in model_tml.get('model', {}).get('columns', []):
                col_name = col.get('name', '')
                col_props = col.get('properties', {})
                col_type = col_props.get('column_type', 'ATTRIBUTE')
                # Map ThoughtSpot column types to SQL-like types for AI understanding
                if col_type == 'MEASURE':
                    sql_type = 'NUMBER'  # Measures are numeric
                elif col_props.get('calendar'):
                    sql_type = 'DATE'  # Calendar attribute = date column
                else:
                    sql_type = 'VARCHAR'  # Other attributes are typically strings
                columns.append({
                    'name': col_name,
                    'type': sql_type,
                    'ts_type': col_type  # Keep original for reference
                })
            print(f" π Got {len(columns)} columns from ThoughtSpot model")
            return columns
        except Exception as e:
            print(f" β οΈ Error getting model columns: {e}")
            return []
| def parse_ddl(self, ddl: str) -> Tuple[Dict, List]: | |
| """ | |
| Parse DDL to extract table definitions and foreign key relationships | |
| Returns: | |
| Tuple of (tables_dict, foreign_keys_list) | |
| """ | |
| tables = {} | |
| foreign_keys = [] | |
| # Find all CREATE TABLE statements | |
| table_pattern = ( | |
| r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?' | |
| r'(?:"?([A-Za-z0-9_]+)"?)\s*\((.*?)\)\s*;' | |
| ) | |
| for match in re.finditer(table_pattern, ddl, re.IGNORECASE | re.DOTALL): | |
| table_name = match.group(1).upper() | |
| columns_text = match.group(2) | |
| columns = [] | |
| # Parse each column definition - PROPERLY FIXED parsing | |
| # Split by comma but be careful of commas inside parentheses | |
| column_lines = [] | |
| current_line = "" | |
| paren_count = 0 | |
| for char in columns_text: | |
| if char == '(': | |
| paren_count += 1 | |
| elif char == ')': | |
| paren_count -= 1 | |
| elif char == ',' and paren_count == 0: | |
| column_lines.append(current_line.strip()) | |
| current_line = "" | |
| continue | |
| current_line += char | |
| # Add the last line | |
| if current_line.strip(): | |
| column_lines.append(current_line.strip()) | |
| for line in column_lines: | |
| line = line.strip() | |
| line_upper = line.upper() | |
| def _normalize_table_name(raw_name: str) -> str: | |
| # Handle optional quoting and optional DB/SCHEMA qualifiers. | |
| normalized = raw_name.replace('"', '').strip() | |
| if '.' in normalized: | |
| normalized = normalized.split('.')[-1] | |
| return normalized.upper() | |
| # Parse FK constraints in any common table-level form: | |
| # 1) FOREIGN KEY (COL) REFERENCES TBL(COL) | |
| # 2) CONSTRAINT FK_NAME FOREIGN KEY (COL) REFERENCES TBL(COL) | |
| fk_match = re.search( | |
| r'FOREIGN\s+KEY\s*\((\w+)\)\s*REFERENCES\s+([A-Za-z0-9_".]+)\s*\((\w+)\)', | |
| line, | |
| re.IGNORECASE | |
| ) | |
| if fk_match: | |
| from_col = fk_match.group(1).upper() | |
| to_table = _normalize_table_name(fk_match.group(2)) | |
| to_col = fk_match.group(3).upper() | |
| foreign_keys.append({ | |
| 'from_table': table_name, | |
| 'from_column': from_col, | |
| 'to_table': to_table, | |
| 'to_column': to_col | |
| }) | |
| print(f" π Found FK: {table_name}.{from_col} -> {to_table}.{to_col}") | |
| continue | |
| # Parse inline FK form in column definitions: | |
| # COL_NAME <TYPE...> REFERENCES TARGET_TABLE(TARGET_COL) | |
| inline_fk_match = re.search( | |
| r'^(\w+)\s+.+?\s+REFERENCES\s+([A-Za-z0-9_".]+)\s*\((\w+)\)', | |
| line, | |
| re.IGNORECASE | |
| ) | |
| if inline_fk_match: | |
| from_col = inline_fk_match.group(1).upper() | |
| to_table = _normalize_table_name(inline_fk_match.group(2)) | |
| to_col = inline_fk_match.group(3).upper() | |
| foreign_keys.append({ | |
| 'from_table': table_name, | |
| 'from_column': from_col, | |
| 'to_table': to_table, | |
| 'to_column': to_col | |
| }) | |
| print(f" π Found inline FK: {table_name}.{from_col} -> {to_table}.{to_col}") | |
| if not line_upper.startswith(('PRIMARY KEY', 'CONSTRAINT', 'FOREIGN KEY', 'UNIQUE', 'CHECK', 'INDEX')): | |
| # Parse: COLUMNNAME DATATYPE(params) [IDENTITY] [NOT NULL] | |
| parts = line.split() | |
| if len(parts) >= 2: | |
| col_name_original = parts[0] # Preserve original casing for display name | |
| col_name = parts[0].upper() # Uppercase for DB reference | |
| # Get the FULL data type including parameters - HANDLE IDENTITY! | |
| col_type_match = re.match(r'(\w+(?:\([^)]+\))?)', parts[1]) | |
| col_type = col_type_match.group(1).upper() if col_type_match else parts[1].upper() | |
| columns.append({ | |
| 'name': col_name, | |
| 'original_name': col_name_original, # Keep original for naming style | |
| 'type': col_type, | |
| 'nullable': 'NOT NULL' not in line.upper() | |
| }) | |
| tables[table_name] = columns | |
| print(f"π Found {len(tables)} tables and {len(foreign_keys)} foreign keys in DDL") | |
| return tables, foreign_keys | |
    def create_relationships_separately(self, table_relationships: Dict, table_guids: Dict):
        """Create relationships as separate TML objects after tables exist.

        Args:
            table_relationships: Mapping of source table name -> list of
                relationship dicts with 'name', 'to_table', 'type' and 'on'
                (a list of {'from_column', 'to_column'} pairs).
            table_guids: Mapping of table name -> ThoughtSpot GUID used to
                resolve the source/destination tables.

        Outcomes are printed only; per-relationship failures do not raise.
        """
        for table_name, relationships in table_relationships.items():
            for relationship in relationships:
                # Build a standalone relationship TML document
                relationship_tml = {
                    'guid': None,
                    'relationship': {
                        'name': relationship['name'],
                        'destination_table': table_guids.get(relationship['to_table']),
                        'source_table': table_guids.get(table_name),
                        'type': relationship['type'],
                        'join_columns': [
                            {
                                'source_column': rel_on['from_column'],
                                'destination_column': rel_on['to_column']
                            }
                            for rel_on in relationship['on']
                        ]
                    }
                }
                relationship_yaml = yaml.dump(relationship_tml, default_flow_style=False, sort_keys=False)
                print(f" π Creating relationship: {relationship['name']}")
                print(f" π Relationship TML:\n{relationship_yaml}")
                response = self.session.post(
                    f"{self.base_url}/api/rest/2.0/metadata/tml/import",
                    json={
                        "metadata_tmls": [relationship_yaml],
                        "import_policy": "ALL_OR_NONE",
                        "create_new": True
                    }
                )
                if response.status_code == 200:
                    result = response.json()
                    print(f" π Relationship response: {result}")
                    # The import API returns 200 even for per-object failures;
                    # the real outcome is in the embedded status code.
                    if result[0].get('response', {}).get('status', {}).get('status_code') == 'OK':
                        print(f" β Relationship created: {relationship['name']}")
                    else:
                        error_msg = result[0].get('response', {}).get('status', {}).get('error_message', 'Unknown error')
                        print(f" β Relationship failed: {error_msg}")
                else:
                    print(f" β Relationship API call failed: {response.status_code}")
                    print(f" π Response: {response.text}")
| def create_table_tml(self, table_name: str, columns: List, connection_name: str, | |
| database: str, schema: str, all_tables: Dict = None, table_guid: str = None, foreign_keys: List = None) -> str: | |
| """Generate table TML matching working example structure | |
| Args: | |
| table_guid: If provided, use this GUID (for updating existing tables with joins) | |
| foreign_keys: List of foreign key relationships parsed from DDL | |
| """ | |
| tml_columns = [] | |
| # Generate columns with proper typing | |
| for col in columns: | |
| ts_type = self._map_data_type(col['type']) | |
| col_name = col['name'].upper() | |
| # Determine column type - IDs are measures in table TML but not model TML | |
| if ts_type in ['INT64'] and col_name.endswith('ID'): | |
| col_type = 'MEASURE' | |
| properties = { | |
| 'column_type': col_type, | |
| 'aggregation': 'SUM', | |
| 'index_type': 'DONT_INDEX' | |
| } | |
| elif ts_type in ['DOUBLE', 'INT64'] and not col_name.endswith('ID'): | |
| col_type = 'MEASURE' | |
| properties = { | |
| 'column_type': col_type, | |
| 'aggregation': 'SUM', | |
| 'index_type': 'DONT_INDEX' | |
| } | |
| else: | |
| col_type = 'ATTRIBUTE' | |
| properties = { | |
| 'column_type': col_type, | |
| 'index_type': 'DONT_INDEX' | |
| } | |
| column_def = { | |
| 'name': col['name'].upper(), | |
| 'db_column_name': col['name'].upper(), | |
| 'properties': properties, | |
| 'db_column_properties': { | |
| 'data_type': ts_type | |
| } | |
| } | |
| tml_columns.append(column_def) | |
| table_tml = { | |
| 'guid': table_guid, # Use provided GUID or None for new tables | |
| 'table': { | |
| 'name': table_name.upper(), | |
| 'db': database, | |
| 'schema': schema, | |
| 'db_table': table_name.upper(), | |
| 'connection': { | |
| 'name': connection_name | |
| }, | |
| 'columns': tml_columns, | |
| 'properties': { | |
| 'sage_config': { | |
| 'is_sage_enabled': False | |
| } | |
| } | |
| } | |
| } | |
| # Add joins_with relationships (matching working example) | |
| if all_tables: | |
| joins_with = self._generate_table_joins(table_name, columns, all_tables, foreign_keys) | |
| if joins_with: | |
| table_tml['table']['joins_with'] = joins_with | |
| # Generate YAML with proper formatting | |
| yaml_output = yaml.dump(table_tml, default_flow_style=False, sort_keys=False) | |
| # Keep quotes around 'on' key as shown in working example | |
| return yaml_output | |
| def _generate_table_joins(self, table_name: str, columns: List, all_tables: Dict, foreign_keys: List = None) -> List: | |
| """Generate joins_with structure based on parsed foreign keys from DDL""" | |
| joins = [] | |
| table_name_upper = table_name.upper() | |
| if not foreign_keys: | |
| print(f" β οΈ No foreign keys provided for {table_name_upper}") | |
| return joins | |
| # Use actual foreign keys from DDL | |
| for fk in foreign_keys: | |
| if fk['from_table'] == table_name_upper: | |
| to_table = fk['to_table'] | |
| from_col = fk['from_column'] | |
| to_col = fk['to_column'] | |
| # Skip self-joins (e.g., EMPLOYEES.MANAGER_ID -> EMPLOYEES.EMPLOYEE_ID) | |
| # ThoughtSpot models don't handle self-referential joins well (causes cycles) | |
| if to_table == table_name_upper: | |
| print(f" βοΈ Skipping self-join: {table_name_upper}.{from_col} -> {to_table}.{to_col} (self-referential)") | |
| continue | |
| # Check if target table exists in THIS deployment | |
| available_tables_upper = [t.upper() for t in all_tables.keys()] | |
| if to_table in available_tables_upper: | |
| constraint_id = f"SYS_CONSTRAINT_{self._generate_constraint_id()}" | |
| join_def = { | |
| 'name': constraint_id, | |
| 'destination': { | |
| 'name': to_table | |
| }, | |
| 'on': f"[{table_name_upper}::{from_col}] = [{to_table}::{to_col}]", | |
| 'type': 'INNER' | |
| } | |
| joins.append(join_def) | |
| print(f" π Generated join: {table_name_upper}.{from_col} -> {to_table}.{to_col}") | |
| else: | |
| print(f" βοΈ Skipping join: {table_name_upper}.{from_col} -> {to_table} (table not in this deployment)") | |
| return joins | |
| def create_connection_tml(self, connection_name: str) -> str: | |
| """Generate connection TML matching working example""" | |
| connection_tml = { | |
| 'guid': None, # Will be generated by ThoughtSpot | |
| 'connection': { | |
| 'name': connection_name, | |
| 'type': 'RDBMS_SNOWFLAKE', | |
| 'authentication_type': 'KEY_PAIR', | |
| 'properties': [ | |
| {'key': 'accountName', 'value': self.sf_account}, | |
| {'key': 'user', 'value': self.sf_user}, | |
| {'key': 'private_key', 'value': self._get_private_key_for_thoughtspot()}, | |
| {'key': 'passphrase', 'value': get_admin_setting('SNOWFLAKE_KP_PASSPHRASE', required=False)}, | |
| {'key': 'role', 'value': self.sf_role}, | |
| {'key': 'warehouse', 'value': self.sf_warehouse} | |
| ], | |
| 'description': f'Auto-generated Snowflake connection for {connection_name}' | |
| } | |
| } | |
| yaml_output = yaml.dump(connection_tml, default_flow_style=False, sort_keys=False) | |
| return yaml_output | |
    def create_actual_model_tml(self, tables: Dict, foreign_keys: List, table_guids: Dict = None,
                                model_name: str = None, connection_name: str = None) -> str:
        """Generate proper model TML matching the boone_test5 working example.

        Args:
            tables: Mapping of table name -> column dicts (from parse_ddl()).
            foreign_keys: Parsed FK list. Not used in this variant - joins are
                deliberately omitted so ThoughtSpot can auto-detect them.
            table_guids: Optional mapping of UPPERCASE table name -> GUID,
                emitted as 'fqn' to disambiguate duplicate table names.
            model_name: Defaults to demo_model_<YYYYMMDD>.
            connection_name: Defaults to model_name.

        Returns:
            The model TML as a YAML string.

        Raises:
            ValueError: If the generated YAML fails to round-trip parse.
        """
        if not model_name:
            model_name = f"demo_model_{datetime.now().strftime('%Y%m%d')}"
        if not connection_name:
            connection_name = model_name
        # Create model structure matching working example exactly
        model = {
            'guid': None,  # Will be generated by ThoughtSpot
            'model': {
                'name': model_name,
                'model_tables': [],
                'columns': [],
                'properties': {
                    'is_bypass_rls': False,
                    'join_progressive': True,
                    'spotter_config': {
                        'is_spotter_enabled': True
                    }
                }
            }
        }
        # Record which tables share each column name so display names can be
        # disambiguated during conflict resolution below.
        column_name_counts = {}
        for table_name, columns in tables.items():
            for col in columns:
                col_name = col['name'].upper()
                if col_name not in column_name_counts:
                    column_name_counts[col_name] = []
                column_name_counts[col_name].append(table_name.upper())
        # Add model_tables - START WITH NO JOINS for now (we can add them later)
        print(" π Creating model without explicit joins (ThoughtSpot can auto-detect)")
        for table_name in tables.keys():
            table_name_upper = table_name.upper()
            table_guid = table_guids.get(table_name_upper) if table_guids else None
            # Use FQN to resolve "multiple data sources with same name" issue;
            # ThoughtSpot explicitly requires this when there are duplicate table names
            table_entry = {
                'name': table_name_upper,
                'fqn': table_guid  # Required to uniquely identify which table to use
            }
            # For now, don't add explicit joins - let ThoughtSpot auto-detect
            model['model']['model_tables'].append(table_entry)
        # Remove diamond join paths - ThoughtSpot rejects models where
        # table A joins to B, A joins to C, and C also joins to B
        self._remove_diamond_joins(model['model']['model_tables'])
        # Add columns with proper global conflict resolution
        used_display_names = set()  # Track used names globally across all columns
        for table_name, columns in tables.items():
            table_name_upper = table_name.upper()
            for col in columns:
                col_name = col['name'].upper()
                original_col_name = col.get('original_name', col['name'])  # Use original casing for display
                # TODO: Later we can exclude ID columns for cleaner model
                # For now, include all columns to get the basic model working
                # Start with basic conflict resolution
                display_name = self._resolve_column_name_conflict(
                    col_name, table_name_upper, column_name_counts,
                    original_name=original_col_name
                )
                # If the display name is still taken, make it unique: first try
                # a short table-name prefix, then fall back to numeric suffixes.
                original_display_name = display_name
                counter = 1
                while display_name.lower() in used_display_names:
                    if counter == 1:
                        # Generate consistent short prefix (<= 4 chars) from table name
                        if len(table_name_upper) <= 4:
                            prefix = table_name_upper.lower()
                        else:
                            prefix = table_name_upper[:4].lower()
                        # Use snake_case: prefix_name
                        display_name = f"{prefix}_{original_display_name}"
                    else:
                        # Fallback: add number
                        display_name = f"{original_display_name}_{counter}"
                    counter += 1
                used_display_names.add(display_name.lower())
                # Determine column type based on data type
                col_type, aggregation = self._determine_column_type(col['type'], col_name)
                column_def = {
                    'name': display_name,
                    'column_id': f"{table_name_upper}::{col_name}",
                    'properties': {
                        'column_type': col_type,
                        'index_type': 'DONT_INDEX'
                    }
                }
                # Add aggregation for measures
                if aggregation:
                    column_def['properties']['aggregation'] = aggregation
                # Add calendar property for DATE columns so ThoughtSpot enables
                # time bucketing (.weekly, .monthly, etc.) on them
                if self._map_data_type(col['type']) == 'DATE':
                    column_def['properties']['calendar'] = 'calendar'
                model['model']['columns'].append(column_def)
        # Generate YAML output with proper formatting
        yaml_output = yaml.dump(model, default_flow_style=False, sort_keys=False,
                                default_style=None, indent=2, width=120)
        # Sanity check: the YAML must round-trip, otherwise the import would fail
        try:
            yaml.safe_load(yaml_output)
            print(" β Generated YAML is valid")
        except yaml.YAMLError as e:
            print(f" β Generated YAML is invalid: {e}")
            print(" π Invalid YAML:")
            print(yaml_output)
            raise ValueError(f"Generated invalid YAML: {e}")
        return yaml_output
| def _is_foreign_key_column(self, col_name: str, table_name: str, foreign_keys: List) -> bool: | |
| """Check if column is a foreign key (used only for joins, not analytics)""" | |
| for fk in foreign_keys: | |
| if (fk.get('source_table', '').upper() == table_name and | |
| fk.get('source_column', '').upper() == col_name): | |
| return True | |
| return False | |
| def _is_surrogate_primary_key(self, col: Dict, col_name: str) -> bool: | |
| """Check if column is a meaningless surrogate key (numeric ID)""" | |
| # Common patterns: ID, _ID, ID_, ends with 'id' | |
| if col_name.upper().endswith('ID'): | |
| # Check if it's numeric (INT, BIGINT, NUMBER) | |
| col_type = col.get('type', '').upper() | |
| if any(t in col_type for t in ['INT', 'NUMBER', 'NUMERIC', 'BIGINT']): | |
| return True | |
| return False | |
    def _create_model_with_constraints(self, tables: Dict, foreign_keys: List, table_guids: Dict,
                                      table_constraints: Dict, model_name: str, connection_name: str) -> str:
        """Generate model TML with constraint references like our successful test.

        Builds the model skeleton, wires per-table joins from explicit FK
        metadata, prunes join cycles via _remove_diamond_joins, then emits
        columns with globally unique display names.

        Note: table_constraints and connection_name are accepted but not
        referenced in this body; joins come solely from foreign_keys.

        Returns:
            The model TML as a YAML string.

        Raises:
            ValueError: If the generated YAML fails to parse back with yaml.safe_load.
        """
        print(" π Creating model with constraint references")
        # Build column name conflict tracking: column name -> list of owning tables.
        # Used by _resolve_column_name_conflict to decide when prefixes are needed.
        column_name_counts = {}
        for table_name, columns in tables.items():
            for col in columns:
                col_name = col['name'].upper()
                if col_name not in column_name_counts:
                    column_name_counts[col_name] = []
                column_name_counts[col_name].append(table_name.upper())
        model = {
            'guid': None,
            'model': {
                'name': model_name,
                'model_tables': [],
                'columns': [],
                'properties': {
                    'is_bypass_rls': False,
                    'join_progressive': True,
                    'spotter_config': {
                        'is_spotter_enabled': True
                    }
                }
            }
        }
        # Add model_tables with FQNs and constraint-based joins
        for table_name in tables.keys():
            table_name_upper = table_name.upper()
            table_guid = table_guids.get(table_name_upper)
            table_entry = {
                'name': table_name_upper,
                'fqn': table_guid
            }
            # Build joins from foreign_keys list (more reliable than constraint extraction)
            table_joins = []
            for fk in foreign_keys:
                if fk['from_table'].upper() == table_name_upper:
                    to_table = fk['to_table'].upper()
                    # Skip self-joins (e.g., EMPLOYEES.MANAGER_ID -> EMPLOYEES.EMPLOYEE_ID)
                    # ThoughtSpot models don't handle self-referential joins well (causes cycles)
                    if to_table == table_name_upper:
                        print(f" βοΈ Skipping self-join in model: {table_name_upper}.{fk['from_column']} -> {to_table}")
                        continue
                    # Only join to tables actually present in this deployment
                    if to_table in [t.upper() for t in tables.keys()]:
                        # ThoughtSpot on clause format: [SOURCE::COL] = [DEST::COL]
                        from_col = fk['from_column'].upper()
                        to_col = fk['to_column'].upper()
                        on_clause = f"[{table_name_upper}::{from_col}] = [{to_table}::{to_col}]"
                        join_entry = {
                            'with': to_table,
                            'on': on_clause,
                            'type': 'LEFT_OUTER',
                            'cardinality': 'MANY_TO_ONE'  # Fact to dimension is many-to-one
                        }
                        table_joins.append(join_entry)
                        print(f" π Added join: {table_name_upper}.{from_col} -> {to_table}.{to_col}")
            if table_joins:
                table_entry['joins'] = table_joins
            model['model']['model_tables'].append(table_entry)
        # Remove diamond join paths - ThoughtSpot rejects models where
        # table A joins to B, A joins to C, and C also joins to B
        self._remove_diamond_joins(model['model']['model_tables'])
        # Add columns with proper global conflict resolution (same as working version)
        used_display_names = set()
        for table_name, columns in tables.items():
            table_name_upper = table_name.upper()
            for col in columns:
                col_name = col['name'].upper()
                original_col_name = col.get('original_name', col['name'])  # Use original casing for display
                # NOTE: We used to skip FK/PK columns, but ThoughtSpot requires them for joins
                # Even though users don't search "customer 23455", the join columns must be present
                # in the model's columns section for the joins to work properly.
                #
                # SKIP foreign key columns - they're join keys, not analytics columns
                # if self._is_foreign_key_column(col_name, table_name_upper, foreign_keys):
                #     print(f" βοΈ Skipping FK column: {table_name_upper}.{col_name}")
                #     continue
                #
                # SKIP surrogate primary keys (numeric IDs) - nobody searches "customer 23455"
                # if self._is_surrogate_primary_key(col, col_name):
                #     print(f" βοΈ Skipping surrogate PK: {table_name_upper}.{col_name}")
                #     continue
                # Start with basic conflict resolution (styled name, prefixed on conflicts)
                display_name = self._resolve_column_name_conflict(
                    col_name, table_name_upper, column_name_counts,
                    original_name=original_col_name
                )
                # If the display name is still used, make it unique with table prefixes;
                # first retry uses a per-table prefix, subsequent retries append a counter.
                original_display_name = display_name
                counter = 1
                while display_name.lower() in used_display_names:
                    # Use snake_case: prefix_name
                    if counter == 1:
                        if table_name_upper == 'CUSTOMERS':
                            display_name = f"cust_{original_display_name}"
                        elif table_name_upper == 'PRODUCTS':
                            display_name = f"prod_{original_display_name}"
                        elif table_name_upper == 'ORDERS':
                            display_name = f"order_{original_display_name}"
                        elif table_name_upper == 'ORDERITEMS':
                            display_name = f"item_{original_display_name}"
                        elif table_name_upper == 'SALES':
                            display_name = f"sale_{original_display_name}"
                        elif table_name_upper == 'SALESREPS':
                            display_name = f"rep_{original_display_name}"
                        else:
                            display_name = f"{table_name_upper[:4].lower()}_{original_display_name}"
                    else:
                        display_name = f"{original_display_name}_{counter}"
                    counter += 1
                used_display_names.add(display_name.lower())
                # Determine column type based on data type
                col_type, aggregation = self._determine_column_type(col['type'], col_name)
                column_def = {
                    'name': display_name,
                    'column_id': f"{table_name_upper}::{col_name}",
                    'properties': {
                        'column_type': col_type,
                        'index_type': 'DONT_INDEX'
                    }
                }
                if aggregation:
                    column_def['properties']['aggregation'] = aggregation
                # Add calendar property for DATE columns so ThoughtSpot enables
                # time bucketing (.weekly, .monthly, etc.) on them
                if self._map_data_type(col['type']) == 'DATE':
                    column_def['properties']['calendar'] = 'calendar'
                model['model']['columns'].append(column_def)
        # Generate YAML output with validation
        yaml_output = yaml.dump(model, default_flow_style=False, sort_keys=False,
                                default_style=None, indent=2, width=120)
        # Fix YAML reserved word quoting - 'on' gets quoted because it's a YAML boolean
        # ThoughtSpot needs it unquoted
        yaml_output = yaml_output.replace("'on':", "on:")
        # Validate the generated YAML by round-tripping it through the parser
        try:
            yaml.safe_load(yaml_output)
            print(" β Generated YAML is valid")
        except yaml.YAMLError as e:
            print(f" β Generated YAML is invalid: {e}")
            raise ValueError(f"Generated invalid YAML: {e}")
        return yaml_output
    def _remove_diamond_joins(self, model_tables: list):
        """Remove join cycles that ThoughtSpot rejects using a spanning tree.

        ThoughtSpot requires exactly ONE path between any two tables in a model.
        Uses Kruskal's algorithm: add edges from highest-priority (fact) tables first,
        skip any edge that would create a cycle. Priority order ensures fact table
        joins are preserved and dimension-to-dimension "snowflake" joins are pruned.

        Mutates model_tables in place: pruned joins are removed and empty
        'joins' lists are deleted entirely.

        NOTE(review): assumes every join's 'with' target is itself an entry in
        model_tables - find() would KeyError on an unknown target. Confirm the
        caller guarantees this.
        """
        def edge_key(src_name: str, join_def: dict):
            # Identity tuple for a join edge, used to match kept edges back to
            # the join dicts they came from during the final filter pass.
            return (
                src_name,
                join_def.get('with'),
                join_def.get('on', ''),
                join_def.get('type', ''),
                join_def.get('cardinality', ''),
            )
        # Flatten every (source, destination, join-dict, key) edge in the model.
        all_edges = []
        for t in model_tables:
            src_name = t['name']
            for j in t.get('joins', []):
                all_edges.append((src_name, j.get('with'), j, edge_key(src_name, j)))
        if not all_edges:
            print(f" β No joins to check for cycles")
            return
        # Priority heuristic: tables with many outgoing joins (facts) first,
        # then heavily-referenced targets, then names for a deterministic order.
        out_degree = {}
        for t in model_tables:
            out_degree[t['name']] = len(t.get('joins', []))
        in_degree = {t['name']: 0 for t in model_tables}
        for src, dst, _, _ in all_edges:
            in_degree[dst] = in_degree.get(dst, 0) + 1
        all_edges.sort(key=lambda e: (-out_degree.get(e[0], 0), -in_degree.get(e[1], 0), e[0], e[1]))
        # Union-find over table names (find uses path halving).
        parent = {t['name']: t['name'] for t in model_tables}
        def find(x):
            while parent[x] != x:
                parent[x] = parent[parent[x]]
                x = parent[x]
            return x
        def union(a, b):
            # Returns False when a and b are already connected, i.e. the edge
            # would close a cycle and must be dropped.
            ra, rb = find(a), find(b)
            if ra == rb:
                return False
            parent[ra] = rb
            return True
        kept_edge_keys = set()
        removed = []
        for src, dst, join_def, e_key in all_edges:
            if union(src, dst):
                kept_edge_keys.add(e_key)
            else:
                removed.append(f"{src}->{dst} ({join_def.get('on', '')})")
        # Keep only the spanning-tree joins on each table.
        for t in model_tables:
            if 'joins' not in t:
                continue
            src_name = t['name']
            t['joins'] = [j for j in t['joins'] if edge_key(src_name, j) in kept_edge_keys]
        # Drop now-empty joins lists so the emitted TML stays clean.
        for t in model_tables:
            if 'joins' in t and not t['joins']:
                del t['joins']
        if removed:
            print(f" πΆ Removed {len(removed)} redundant joins (cycle prevention):")
            for r in removed:
                print(f" - {r}")
        else:
            print(f" β No join cycles detected")
| def _generate_constraint_id(self) -> str: | |
| """Generate a constraint ID similar to ThoughtSpot's system constraints""" | |
| import uuid | |
| return str(uuid.uuid4()) | |
| def validate_foreign_key_references(self, tables: Dict, foreign_keys: List = None) -> List[str]: | |
| """ | |
| Validate that foreign key columns reference tables that exist in the schema. | |
| Uses explicit FK constraints from DDL - not heuristics. | |
| Args: | |
| tables: Dictionary of table definitions | |
| foreign_keys: List of FK relationships parsed from DDL | |
| Each FK is: {'from_table': str, 'from_column': str, | |
| 'to_table': str, 'to_column': str} | |
| Returns: | |
| List of warning messages about missing referenced tables | |
| """ | |
| warnings = [] | |
| if not foreign_keys: | |
| return warnings # No explicit FKs defined, nothing to validate | |
| table_names_upper = [t.upper() for t in tables.keys()] | |
| for fk in foreign_keys: | |
| target_table = fk.get('to_table', '').upper() | |
| from_table = fk.get('from_table', '') | |
| from_column = fk.get('from_column', '') | |
| # Check if the target table exists in this schema | |
| if target_table and target_table not in table_names_upper: | |
| warnings.append( | |
| f"β οΈ {from_table}.{from_column} references {fk.get('to_table')}, " | |
| f"but {fk.get('to_table')} is not in this schema. " | |
| f"The join will be skipped during deployment." | |
| ) | |
| return warnings | |
| def _resolve_column_name_conflict(self, col_name: str, table_name: str, | |
| column_name_counts: Dict, | |
| original_name: str = None) -> str: | |
| """ | |
| Resolve column name conflicts using configured naming style and prefixes. | |
| Examples (snake_case): | |
| SHIPPING_MODE β shipping_mode | |
| DAYS_TO_SHIP β days_to_ship | |
| ORDER_DATE (conflict) β order_order_date, cust_order_date, etc. | |
| Args: | |
| col_name: Uppercase column name (for conflict detection) | |
| table_name: Table name for prefix generation | |
| column_name_counts: Dict tracking column name occurrences | |
| original_name: Original casing of column name (for proper camelCase detection) | |
| """ | |
| # Use original name if provided (preserves camelCase boundaries) | |
| name_for_styling = original_name if original_name else col_name | |
| # Apply configured naming style | |
| styled_name = _apply_naming_style(name_for_styling, self.column_naming_style) | |
| if len(column_name_counts.get(col_name, [])) <= 1: | |
| # No conflict - use styled name directly | |
| return styled_name | |
| # For conflicts, generate prefix dynamically from table name | |
| if len(table_name) <= 4: | |
| prefix = table_name.lower() | |
| elif 'sales' in table_name.lower() and 'rep' in table_name.lower(): | |
| prefix = 'rep' # Special case for readability | |
| elif 'customer' in table_name.lower(): | |
| prefix = 'cust' # Common abbreviation | |
| elif 'product' in table_name.lower(): | |
| prefix = 'prod' # Common abbreviation | |
| elif 'order' in table_name.lower(): | |
| prefix = 'order' # Common abbreviation | |
| else: | |
| prefix = table_name[:4].lower() # First 4 characters | |
| # Apply naming style to prefix + name combination | |
| prefixed_name = f"{prefix}_{styled_name}" if styled_name else prefix | |
| return _apply_naming_style(prefixed_name, self.column_naming_style) | |
| def _get_table_prefix(self, table_name: str) -> str: | |
| """Get appropriate prefix for table to avoid column conflicts""" | |
| # Generate prefix dynamically based on table name patterns | |
| table_lower = table_name.lower() | |
| if 'customer' in table_lower: | |
| return '' # Primary table gets no prefix for readability | |
| elif 'sales' in table_lower and 'rep' in table_lower: | |
| return 'Rep' | |
| elif 'sales' in table_lower: | |
| return 'Sale' | |
| elif 'order' in table_lower and 'item' in table_lower: | |
| return 'Item' | |
| elif 'order' in table_lower: | |
| return 'Order' | |
| elif 'product' in table_lower: | |
| return 'Product' | |
| else: | |
| # Use first 3-4 characters as prefix, capitalize first letter | |
| prefix = table_name[:4] if len(table_name) > 3 else table_name | |
| return prefix.capitalize() | |
| def _determine_column_type(self, data_type: str, col_name: str) -> tuple: | |
| """Determine if column should be ATTRIBUTE or MEASURE""" | |
| base_type = data_type.upper().split('(')[0] | |
| col_upper = col_name.upper() | |
| # SALEID is special - it's treated as a measure in the working example | |
| if col_upper == 'SALEID': | |
| return 'MEASURE', 'SUM' | |
| # Numeric types should be measures (unless they're IDs or keys) | |
| if base_type in ['NUMBER', 'DECIMAL', 'FLOAT', 'DOUBLE', 'INT', 'INTEGER', 'BIGINT']: | |
| # Skip ID/KEY columns - they're join keys, not analytics columns | |
| if col_upper.endswith('ID') or col_upper.endswith('KEY') or col_upper.endswith('_CODE'): | |
| return 'ATTRIBUTE', None | |
| # All other numeric columns are measures | |
| # Determine aggregation based on column name patterns | |
| if any(word in col_upper for word in ['QUANTITY', 'QTY', 'COUNT', 'SOLD']): | |
| return 'MEASURE', 'SUM' | |
| elif any(word in col_upper for word in ['PRICE', 'COST', 'REVENUE', 'AMOUNT', 'TOTAL', 'PROFIT', 'DISCOUNT', 'SHIPPING', 'TAX']): | |
| return 'MEASURE', 'SUM' | |
| elif any(word in col_upper for word in ['RATING', 'SCORE', 'MARGIN', 'PERCENT', 'RATE']): | |
| return 'MEASURE', 'AVERAGE' | |
| else: | |
| # Default: numeric = measure with SUM | |
| return 'MEASURE', 'SUM' | |
| # Everything else is an attribute (strings, dates, booleans, etc.) | |
| return 'ATTRIBUTE', None | |
    def _build_table_relationships(self, tables: Dict, foreign_keys: List) -> Dict:
        """Build table relationships for joins from *ID naming heuristics.

        NOTE(review): dead code - this method is shadowed by a later definition
        of the same name in this class, so this version is never the one
        callers reach. It also ignores the foreign_keys argument entirely and
        relies purely on the <NAME>ID -> <NAME>S naming pattern.
        """
        relationships = {}
        # Auto-detect relationships based on common ID patterns
        table_names = list(tables.keys())
        for table_name in table_names:
            table_name_upper = table_name.upper()
            table_cols = [col['name'].upper() for col in tables[table_name]]
            # Find foreign key relationships: any *ID column that is not this
            # table's own primary key is assumed to point at another table
            for col_name in table_cols:
                if col_name.endswith('ID') and col_name != f"{table_name_upper}ID":
                    # This looks like a foreign key
                    target_table = col_name[:-2] + 'S'  # CUSTOMERID -> CUSTOMERS
                    if target_table in [t.upper() for t in table_names]:
                        if table_name_upper not in relationships:
                            relationships[table_name_upper] = []
                        relationships[table_name_upper].append({
                            'to_table': target_table,
                            'on_column': col_name
                        })
        return relationships
| def _create_model_level_joins(self, tables, foreign_keys): | |
| """Create joins at model level using the format from working example""" | |
| joins = [] | |
| # Auto-detect joins if no explicit foreign keys | |
| if len(tables) > 1: | |
| table_names = list(tables.keys()) | |
| for i, table1 in enumerate(table_names): | |
| table1_upper = table1.upper() | |
| table1_cols = [col['name'].upper() for col in tables[table1]] | |
| for j, table2 in enumerate(table_names): | |
| if i >= j: # Avoid duplicates and self-joins | |
| continue | |
| table2_upper = table2.upper() | |
| table2_cols = [col['name'].upper() for col in tables[table2]] | |
| # Look for matching ID columns | |
| for col1 in table1_cols: | |
| if col1.endswith('ID') and col1 in table2_cols: | |
| join_entry = { | |
| 'name': f"{table1_upper.lower()}_{table2_upper.lower()}", | |
| 'source': table1_upper, | |
| 'destination': table2_upper, | |
| 'type': 'INNER', | |
| 'on': f"{table1_upper}.{col1} = {table2_upper}.{col1}" | |
| } | |
| joins.append(join_entry) | |
| print(f" π Model-level join: {table1_upper} -> {table2_upper} on {col1}") | |
| break | |
| return joins | |
    def _add_joins_to_tables(self, model_tables, tables, foreign_keys):
        """Add joins to individual tables (not as separate section).

        NOTE(review): both join-building branches below are deliberately
        disabled with 'if False' guards ("skip joins for now - test basic
        model creation first"), so this method is currently a no-op:
        table_joins stays empty and no table_entry is modified. The dead
        branches are kept as scaffolding for re-enabling join generation.
        """
        # Build join relationships: table name -> list of join dicts
        table_joins = {}
        # Skip joins for now - test basic model creation first
        if False and foreign_keys:
            # (disabled) explicit FK-driven joins
            for fk in foreign_keys:
                from_table = fk['from_table'].upper()
                to_table = fk['to_table'].upper()
                if from_table not in table_joins:
                    table_joins[from_table] = []
                join_entry = {
                    'with': to_table,
                    'on': f"[{from_table}].[{fk['from_column'].upper()}] = [{to_table}].[{fk['to_column'].upper()}]",
                    'type': 'INNER',
                    'cardinality': 'MANY_TO_ONE'
                }
                table_joins[from_table].append(join_entry)
                print(f" π Adding join: {from_table} -> {to_table}")
        # Skip joins for now - test basic model creation first
        elif False and len(tables) > 1:
            # (disabled) heuristic joins on *ID columns shared between table pairs
            table_names = list(tables.keys())
            for i, table1 in enumerate(table_names):
                table1_upper = table1.upper()
                table1_cols = [col['name'].upper() for col in tables[table1]]
                for j, table2 in enumerate(table_names[i+1:], i+1):
                    table2_upper = table2.upper()
                    table2_cols = [col['name'].upper() for col in tables[table2]]
                    # Look for matching ID columns
                    for col1 in table1_cols:
                        if col1.endswith('ID') and col1 in table2_cols:
                            if table1_upper not in table_joins:
                                table_joins[table1_upper] = []
                            join_entry = {
                                'with': table2_upper,
                                'on': f"[{table1_upper}].[{col1}] = [{table2_upper}].[{col1}]",
                                'type': 'INNER',
                                'cardinality': 'MANY_TO_ONE'
                            }
                            table_joins[table1_upper].append(join_entry)
                            print(f" π Auto-detected join: {table1_upper} -> {table2_upper} on {col1}")
                            break
        # Apply joins to model_tables (no-op while the branches above are disabled)
        for table_entry in model_tables:
            table_name = table_entry['name']
            if table_name in table_joins:
                table_entry['joins'] = table_joins[table_name]
| def _build_table_relationships(self, tables: Dict, foreign_keys: List) -> Dict: | |
| """Build relationships for each table based on foreign keys""" | |
| table_relationships = {} | |
| if foreign_keys: | |
| for fk in foreign_keys: | |
| from_table = fk['from_table'].upper() | |
| to_table = fk['to_table'].upper() | |
| from_column = fk['from_column'].upper() | |
| to_column = fk['to_column'].upper() | |
| # Add relationship to the from_table | |
| if from_table not in table_relationships: | |
| table_relationships[from_table] = [] | |
| relationship = { | |
| 'name': f"{from_table}_{to_table}_{from_column}", | |
| 'to_table': to_table, | |
| 'type': 'many_to_one', # Assuming FK relationships are many-to-one | |
| 'on': [ | |
| { | |
| 'from_column': from_column, | |
| 'to_column': to_column | |
| } | |
| ] | |
| } | |
| table_relationships[from_table].append(relationship) | |
| print(f" π Relationship: {from_table}.{from_column} -> {to_table}.{to_column}") | |
| # Auto-detect relationships if no explicit foreign keys | |
| elif len(tables) > 1: | |
| table_names = list(tables.keys()) | |
| for i, table1 in enumerate(table_names): | |
| table1_upper = table1.upper() | |
| table1_cols = [col['name'].upper() for col in tables[table1]] | |
| for j, table2 in enumerate(table_names[i+1:], i+1): | |
| table2_upper = table2.upper() | |
| table2_cols = [col['name'].upper() for col in tables[table2]] | |
| # Look for matching ID columns | |
| for col1 in table1_cols: | |
| if col1.endswith('ID') and col1 in table2_cols: | |
| if table1_upper not in table_relationships: | |
| table_relationships[table1_upper] = [] | |
| relationship = { | |
| 'name': f"{table1_upper}_{table2_upper}_{col1}", | |
| 'to_table': table2_upper, | |
| 'type': 'many_to_one', | |
| 'on': [ | |
| { | |
| 'from_column': col1, | |
| 'to_column': col1 | |
| } | |
| ] | |
| } | |
| table_relationships[table1_upper].append(relationship) | |
| print(f" π Auto-detected relationship: {table1_upper}.{col1} -> {table2_upper}.{col1}") | |
| break | |
| return table_relationships | |
| def create_model_tml(self, tables: Dict, foreign_keys: List, table_guids: Dict = None, | |
| model_name: str = None) -> str: | |
| """Generate worksheet TML (ORIGINAL APPROACH - keeping for comparison)""" | |
| if not model_name: | |
| model_name = f"demo_worksheet_{datetime.now().strftime('%Y%m%d')}" | |
| worksheet = { | |
| 'guid': None, | |
| 'worksheet': { | |
| 'name': model_name, | |
| 'description': 'Auto-generated worksheet from DDL', | |
| 'tables': [], | |
| 'worksheet_columns': [], # Adding back - but with GUID references | |
| 'properties': { | |
| 'is_bypass_rls': False, | |
| 'join_progressive': True, | |
| 'spotter_config': { | |
| 'is_spotter_enabled': True | |
| } | |
| } | |
| } | |
| } | |
| # Add tables with joins | |
| for table_name in tables.keys(): | |
| table_entry = {'name': table_name.upper()} | |
| # Add FQN (GUID) if available to resolve multiple tables with same name | |
| if table_guids and table_name.upper() in table_guids: | |
| table_entry['fqn'] = table_guids[table_name.upper()] | |
| joins = [] | |
| for fk in foreign_keys: | |
| if fk['source_table'] == table_name: | |
| joins.append({ | |
| 'with': fk['target_table'].upper(), | |
| 'referencing_join': f"FK_{table_name.upper()}_{fk['target_table'].upper()}" | |
| }) | |
| if joins: | |
| table_entry['joins'] = joins | |
| # Just populate the required 'tables' field with GUID reference | |
| worksheet['worksheet']['tables'].append({ | |
| 'name': table_name.upper(), | |
| 'fqn': table_guids.get(table_name.upper()) if table_guids else f"table_{table_name.lower()}" | |
| }) | |
| # Add columns using table GUIDs in expressions | |
| for table_name, columns in tables.items(): | |
| table_guid = table_guids.get(table_name.upper()) if table_guids else None | |
| for col in columns: | |
| col_type = 'MEASURE' if 'DECIMAL' in col['type'] else 'ATTRIBUTE' | |
| # Use GUID in expression if available | |
| if table_guid: | |
| expr = f"[{table_guid}].[{col['name']}]" | |
| else: | |
| expr = f"[{table_name.upper()}].[{col['name']}]" | |
| column_def = { | |
| 'name': col['name'].upper(), | |
| 'data_type': col_type, | |
| 'expr': expr | |
| } | |
| worksheet['worksheet']['worksheet_columns'].append(column_def) | |
| return yaml.dump(worksheet, default_flow_style=False, sort_keys=False) | |
| def _map_data_type(self, sql_type: str) -> str: | |
| """Map SQL data types to ThoughtSpot types""" | |
| sql_type = sql_type.upper() | |
| # DEBUG: Print what we're mapping (commented out for cleaner output) | |
| # print(f" π Mapping data type: '{sql_type}'") | |
| # Handle NUMBER with precision/scale intelligently | |
| if sql_type.startswith('NUMBER'): | |
| # Extract precision and scale from NUMBER(precision,scale) | |
| if '(' in sql_type and ')' in sql_type: | |
| params = sql_type[sql_type.find('(')+1:sql_type.find(')')].split(',') | |
| if len(params) >= 2: | |
| scale = int(params[1].strip()) | |
| result = 'INT64' if scale == 0 else 'DOUBLE' | |
| # print(f" β NUMBER({params[0].strip()},{scale}) β {result}") | |
| return result | |
| else: | |
| # print(f" β NUMBER({params[0].strip()}) β INT64") | |
| return 'INT64' # NUMBER(x) defaults to integer | |
| else: | |
| # print(f" β Plain NUMBER β DOUBLE") | |
| return 'DOUBLE' # Plain NUMBER defaults to double | |
| type_mapping = { | |
| 'INT64': 'INT64', | |
| 'INT': 'INT64', # FIXED: INT should map to INT64 | |
| 'INTEGER': 'INT64', | |
| 'BIGINT': 'INT64', | |
| 'VARCHAR': 'VARCHAR', | |
| 'TEXT': 'VARCHAR', | |
| 'STRING': 'VARCHAR', | |
| 'DATE': 'DATE', | |
| 'TIMESTAMP': 'DATE', # Try DATE for TIMESTAMP - DATE fields worked fine | |
| 'TIMESTAMP_NTZ': 'DATE', # Try DATE for TIMESTAMP_NTZ - we know DATE works | |
| 'DECIMAL': 'DOUBLE', | |
| 'FLOAT': 'DOUBLE', | |
| 'BOOLEAN': 'BOOL' | |
| } | |
| for sql_key, ts_type in type_mapping.items(): | |
| if sql_key in sql_type: | |
| return ts_type | |
| return 'VARCHAR' # Default fallback | |
    def get_connection_by_name(self, connection_name: str) -> Dict:
        """Check if a connection with this name already exists.

        Returns the first matching connection record, or None when nothing
        matches or the lookup fails.

        NOTE(review): this sends a nested 'metadata' structure as GET query
        params; requests will stringify the inner dict rather than JSON-encode
        it, and ThoughtSpot's v2 metadata/search is normally a POST with a
        JSON body - confirm this request shape actually returns matches.
        """
        try:
            response = self.session.get(
                f"{self.base_url}/api/rest/2.0/metadata/search",
                params={
                    "metadata": [{"type": "DATA_SOURCE", "identifier": connection_name}],
                    "record_size": 10
                }
            )
            if response.status_code == 200:
                data = response.json()
                # Assumes the endpoint returns a JSON list of matches - TODO confirm
                if data and len(data) > 0:
                    return data[0]  # Return first matching connection
            return None
        except Exception as e:
            print(f" β οΈ Could not check existing connections: {e}")
            return None
| def create_snowflake_schema(self, database: str, schema: str): | |
| """Create schema in Snowflake via ThoughtSpot connection""" | |
| try: | |
| print(f" ποΈ Creating schema {database}.{schema}...") | |
| # Use ThoughtSpot's SQL execution API to create schema | |
| create_schema_sql = f"CREATE SCHEMA IF NOT EXISTS {database}.{schema}" | |
| response = self.session.post( | |
| f"{self.base_url}/api/rest/2.0/database/executeQuery", | |
| json={ | |
| "sql_query": create_schema_sql, | |
| "connection_guid": self.sf_connection_guid if hasattr(self, 'sf_connection_guid') else None | |
| } | |
| ) | |
| if response.status_code == 200: | |
| print(f" β Schema {database}.{schema} created/verified") | |
| else: | |
| print(f" β οΈ Schema creation response: {response.status_code} - {response.text}") | |
| print(f" π Will proceed assuming schema exists or will be created by table operations") | |
| except Exception as e: | |
| print(f" β οΈ Could not create schema: {e}") | |
| print(f" π Will proceed assuming schema exists or will be created by table operations") | |
| def ensure_tag_exists(self, tag_name: str) -> bool: | |
| """ | |
| Check if a tag exists, create it if it doesn't. | |
| Args: | |
| tag_name: Name of the tag | |
| Returns: | |
| True if tag exists or was created, False on error | |
| """ | |
| if not tag_name: | |
| # No tag name provided - skip silently | |
| return True | |
| try: | |
| # First, try to get the tag to see if it exists | |
| print(f"[ThoughtSpot] π Checking if tag '{tag_name}' exists...", flush=True) | |
| search_response = self.session.post( | |
| f"{self.base_url}/api/rest/2.0/tags/search", | |
| json={"tag_identifier": tag_name} | |
| ) | |
| print(f"[ThoughtSpot] π Tag search response: {search_response.status_code}", flush=True) | |
| if search_response.status_code == 200: | |
| tags = search_response.json() | |
| print(f"[ThoughtSpot] π Tags found: {len(tags) if tags else 0}", flush=True) | |
| if tags and len(tags) > 0: | |
| # Tag exists | |
| tag_id = tags[0].get('id', 'unknown') | |
| print(f"[ThoughtSpot] β Tag '{tag_name}' exists (ID: {tag_id})", flush=True) | |
| return True | |
| elif search_response.status_code == 400: | |
| # 400 might mean tag not found in some ThoughtSpot versions | |
| print(f"[ThoughtSpot] π Tag search returned 400 - tag likely doesn't exist", flush=True) | |
| else: | |
| print(f"[ThoughtSpot] β οΈ Tag search error: {search_response.status_code}", flush=True) | |
| try: | |
| print(f"[ThoughtSpot] β οΈ Response: {search_response.text[:200]}", flush=True) | |
| except: | |
| pass | |
| # Tag doesn't exist - create it | |
| print(f"[ThoughtSpot] π Creating tag '{tag_name}'...", flush=True) | |
| create_response = self.session.post( | |
| f"{self.base_url}/api/rest/2.0/tags/create", | |
| json={"name": tag_name} | |
| ) | |
| print(f"[ThoughtSpot] π Create tag response: {create_response.status_code}", flush=True) | |
| if create_response.status_code in [200, 201]: | |
| try: | |
| result = create_response.json() | |
| tag_id = result.get('id', 'unknown') | |
| print(f"[ThoughtSpot] β Tag '{tag_name}' created (ID: {tag_id})", flush=True) | |
| except: | |
| print(f"[ThoughtSpot] β Tag '{tag_name}' created", flush=True) | |
| return True | |
| else: | |
| print(f"[ThoughtSpot] β οΈ Could not create tag: {create_response.status_code}", flush=True) | |
| try: | |
| print(f"[ThoughtSpot] β οΈ Response: {create_response.text[:200]}", flush=True) | |
| except: | |
| pass | |
| # Return False - don't silently proceed | |
| return False | |
| except Exception as e: | |
| import traceback | |
| print(f"[ThoughtSpot] β οΈ Tag check/create error: {str(e)}", flush=True) | |
| print(f"[ThoughtSpot] β οΈ Traceback: {traceback.format_exc()}", flush=True) | |
| return False | |
| def assign_tags_to_objects(self, object_guids: List[str], object_type: str, tag_name: str) -> bool: | |
| """ | |
| Assign tags to ThoughtSpot objects using REST API v1. | |
| Auto-creates the tag if it doesn't exist. | |
| Args: | |
| object_guids: List of object GUIDs to tag | |
| object_type: Type of objects (LOGICAL_TABLE for tables/models, PINBOARD_ANSWER_BOOK for liveboards) | |
| tag_name: Tag name to assign | |
| Returns: | |
| True if successful, False otherwise | |
| """ | |
| if not tag_name: | |
| # No tag name provided - skip silently (this is expected behavior) | |
| return True | |
| if not object_guids: | |
| return False | |
| try: | |
| import json as json_module | |
| # Ensure tag exists (create if needed) | |
| tag_ready = self.ensure_tag_exists(tag_name) | |
| if not tag_ready: | |
| print(f"[ThoughtSpot] β οΈ Could not ensure tag exists, skipping assignment", flush=True) | |
| return False | |
| # Use V1 API which actually works | |
| assign_response = self.session.post( | |
| f"{self.base_url}/tspublic/v1/metadata/assigntag", | |
| data={ | |
| 'id': json_module.dumps(object_guids), | |
| 'type': object_type, | |
| 'tagname': json_module.dumps([tag_name]) | |
| }, | |
| headers={ | |
| 'X-Requested-By': 'ThoughtSpot', | |
| 'Content-Type': 'application/x-www-form-urlencoded' | |
| } | |
| ) | |
| if assign_response.status_code in [200, 204]: | |
| print(f"[ThoughtSpot] β Tagged {len(object_guids)} {object_type} objects with '{tag_name}'", flush=True) | |
| return True | |
| else: | |
| print(f"[ThoughtSpot] β οΈ Tag assignment failed: {assign_response.status_code}", flush=True) | |
| print(f"[ThoughtSpot] DEBUG: Response text: {assign_response.text[:500]}", flush=True) | |
| print(f"[ThoughtSpot] DEBUG: Object GUIDs: {object_guids}", flush=True) | |
| print(f"[ThoughtSpot] DEBUG: Object type: {object_type}", flush=True) | |
| return False | |
| except Exception as e: | |
| print(f"[ThoughtSpot] β οΈ Tag assignment error: {str(e)}", flush=True) | |
| return False | |
| def share_objects(self, object_guids: List[str], object_type: str, share_with: str) -> bool: | |
| """ | |
| Share ThoughtSpot objects with a user or group (can_edit / MODIFY). | |
| Args: | |
| object_guids: GUIDs to share | |
| object_type: 'LOGICAL_TABLE' for models/tables, 'LIVEBOARD' for liveboards | |
| share_with: user email (contains '@') or group name | |
| """ | |
| if not share_with or not object_guids: | |
| return True | |
| principal_type = "USER" if '@' in share_with else "USER_GROUP" | |
| try: | |
| response = self.session.post( | |
| f"{self.base_url}/api/rest/2.0/security/metadata/share", | |
| json={ | |
| "permissions": [ | |
| { | |
| "principal": { | |
| "identifier": share_with, | |
| "type": principal_type | |
| }, | |
| "share_mode": "MODIFY" | |
| } | |
| ], | |
| "metadata": [ | |
| {"identifier": guid, "type": object_type} | |
| for guid in object_guids | |
| ] | |
| } | |
| ) | |
| if response.status_code in [200, 204]: | |
| print(f"[ThoughtSpot] β Shared {len(object_guids)} {object_type} with {principal_type} '{share_with}'", flush=True) | |
| return True | |
| else: | |
| print(f"[ThoughtSpot] β οΈ Share failed: {response.status_code} - {response.text[:200]}", flush=True) | |
| return False | |
| except Exception as e: | |
| print(f"[ThoughtSpot] β οΈ Share error: {str(e)}", flush=True) | |
| return False | |
| def _generate_demo_names(self, company_name: str = None, use_case: str = None): | |
| """Generate standardized demo names using DM convention""" | |
| from datetime import datetime | |
| import re | |
| # Get timestamp components | |
| now = datetime.now() | |
| yymmdd = now.strftime('%y%m%d') | |
| hhmmss = now.strftime('%H%M%S') | |
| # Clean and truncate company name (5 chars) | |
| if company_name: | |
| company_clean = re.sub(r'[^a-zA-Z0-9]', '', company_name.upper())[:5] | |
| else: | |
| company_clean = 'DEMO'[:5] | |
| # Clean and truncate use case (3 chars) | |
| if use_case: | |
| usecase_clean = re.sub(r'[^a-zA-Z0-9]', '', use_case.upper())[:3] | |
| else: | |
| usecase_clean = 'GEN'[:3] | |
| # Generate names | |
| base_name = f"DM{yymmdd}_{hhmmss}_{company_clean}_{usecase_clean}" | |
| return { | |
| 'schema': base_name, | |
| 'connection': f"{base_name}_conn", | |
| 'model': f"{base_name}_model", | |
| 'base': base_name | |
| } | |
def deploy_all(self, ddl: str, database: str, schema: str, base_name: str,
               connection_name: str = None, company_name: str = None,
               use_case: str = None, liveboard_name: str = None,
               llm_model: str = None, tag_name: str = None,
               liveboard_method: str = None, share_with: str = None,
               company_research: str = None,
               progress_callback=None) -> Dict:
    """
    Deploy a complete data model to ThoughtSpot: connection, tables
    (two-phase batch TML import), semantic model, Spotter enablement with
    optional LLM semantic enrichment, and an auto-generated liveboard.

    Args:
        ddl: Data Definition Language statements
        database: Target database name
        schema: Target schema name (pre-created by the demo_prep tool)
        base_name: Pre-generated demo base name; connection/model names are
            derived from it as "<base>_conn" / "<base>_mdl"
        connection_name: Optional connection name (auto-generated from
            base_name if not provided)
        company_name: Optional company name used for liveboard display
        use_case: Optional use case (drives liveboard question selection)
        liveboard_name: Optional explicit liveboard name
        llm_model: Optional LLM model identifier passed to generators
        tag_name: Optional tag assigned to connection, tables, model, liveboard
        liveboard_method: Currently ignored; HYBRID is forced below
        share_with: Optional user email (contains '@') or group name; falls
            back to the SHARE_WITH admin setting
        company_research: Optional research text; when present, the model is
            enriched with a description and per-column semantics
        progress_callback: Optional callable invoked with progress messages
    Returns:
        Dict with deployment results and names/GUIDs of created objects;
        'success' is True only when no errors were recorded along the way.
    """
    results = {
        'success': False,
        'connection': None,
        'tables': [],
        'model': None,
        'errors': []
    }
    table_guids = {}  # Store table GUIDs for model creation

    def log_progress(message):
        """Helper to log progress both to console and callback"""
        # ALWAYS print to console FIRST
        import sys
        print(f"[ThoughtSpot] {message}", flush=True)
        sys.stdout.flush()  # Force flush
        # Then call callback if provided; callback failures are non-fatal
        if progress_callback:
            try:
                progress_callback(message)
            except Exception as e:
                print(f"[Warning] Callback error: {e}", flush=True)

    try:
        import time
        start_time = time.time()
        # STEP 0: Authenticate first!
        log_progress("Authenticating...")
        if not self.authenticate():
            raise Exception("ThoughtSpot authentication failed")
        auth_time = time.time() - start_time
        log_progress(f"[OK] Auth complete ({auth_time:.1f}s)")
        # Parse DDL
        tables, foreign_keys = self.parse_ddl(ddl)
        if not tables:
            raise Exception("No tables found in DDL")
        # Validate foreign key references before deployment (uses explicit FKs from DDL)
        fk_warnings = self.validate_foreign_key_references(tables, foreign_keys)
        if fk_warnings:
            log_progress(f"[WARN] {len(fk_warnings)} FK warning(s) - joins to missing tables will be skipped")
            for warning in fk_warnings:
                log_progress(f" {warning}")
        # Step 1: Create connection using base name.
        # base_name is like "DEMO_AMA_12111207_X4R"; `schema` is the matching
        # pre-created schema name derived from the same base.
        demo_names = {
            'schema': schema,
            'connection': f"{base_name}_conn",
            'model': f"{base_name}_mdl",
            'base': base_name
        }
        if not connection_name:
            connection_name = demo_names['connection']
        log_progress(f"Creating connection: {connection_name}...")
        # Check if connection already exists first (idempotent re-runs)
        existing_connection = self.get_connection_by_name(connection_name)
        if existing_connection:
            connection_guid = existing_connection['header']['id_guid']
            connection_fqn = connection_guid
            results['connection'] = connection_name
            results['connection_guid'] = connection_guid
            log_progress(f"[OK] Connection ready")
            # Assign tag to existing connection
            if tag_name and connection_guid:
                log_progress(f"Assigning tag '{tag_name}' to connection...")
                self.assign_tags_to_objects([connection_guid], 'DATA_SOURCE', tag_name)
        else:
            log_progress(f"Creating new connection: {connection_name}")
            # base_name already has random suffix (e.g., _UZM) so no need for additional uniqueness
            print(f"π Creating connection: {connection_name}")
            print(f" Account: '{self.sf_account}' (length: {len(self.sf_account)})")
            print(f" User: '{self.sf_user}'")
            print(f" Database: '{database}'")
            connection_tml_yaml = self.create_connection_tml(connection_name)
            # Filter out sensitive info for display - use regex for private key
            import re
            display_tml = re.sub(r'-----BEGIN ENCRYPTED PRIVATE KEY-----.*?-----END ENCRYPTED PRIVATE KEY-----', '[PRIVATE_KEY_REDACTED]', connection_tml_yaml, flags=re.DOTALL)
            passphrase = get_admin_setting('SNOWFLAKE_KP_PASSPHRASE', required=False)
            if passphrase:
                display_tml = display_tml.replace(passphrase, "[PASSPHRASE_REDACTED]")
            print(f"\nπ TML being sent:\n{display_tml}")
            # connection_tml_yaml is already a YAML string from create_connection_tml()
            response = self.session.post(
                f"{self.base_url}/api/rest/2.0/metadata/tml/import",
                json={
                    "metadata_tmls": [connection_tml_yaml],
                    "import_policy": "PARTIAL"
                }
            )
            print(f" Response status: {response.status_code}")
            if response.status_code != 200:
                # Print the actual error response from ThoughtSpot
                try:
                    error_response = response.json()
                    print(f"β Error response: {error_response}")
                except:
                    print(f"β Error response (raw): {response.text}")
            if response.status_code == 200:
                result = response.json()
                print(f"π Connection response: {result}")
                print(f"π Response type: {type(result)}")
                # Handle both dict and list responses
                if isinstance(result, list):
                    # Response is a list
                    if len(result) > 0 and result[0].get('response', {}).get('status', {}).get('status_code') == 'OK':
                        connection_guid = result[0].get('response', {}).get('header', {}).get('id_guid')
                        print(f"β Connection created: {connection_name} (GUID: {connection_guid})")
                        results['connection'] = connection_name
                        results['connection_guid'] = connection_guid
                        # Store connection GUID for table creation
                        connection_fqn = connection_guid
                    else:
                        error_msg = result[0].get('response', {}).get('status', {}).get('error_message', 'Unknown error') if result else 'Empty response'
                        raise Exception(f"Connection creation failed: {error_msg}")
                elif isinstance(result, dict) and result.get('object') and len(result['object']) > 0:
                    obj = result['object'][0]
                    if obj.get('status', {}).get('status_code') == 'OK':
                        connection_guid = obj.get('header', {}).get('id_guid')
                        print(f"β Connection created: {connection_name} (GUID: {connection_guid})")
                        results['connection'] = connection_name
                        results['connection_guid'] = connection_guid
                        # Store connection GUID for table creation
                        connection_fqn = connection_guid
                    else:
                        error_msg = obj.get('status', {}).get('error_message', 'Unknown error')
                        raise Exception(f"Connection creation failed: {error_msg}")
                else:
                    raise Exception("Connection creation failed: No object in response")
            else:
                raise Exception(f"Connection creation failed: HTTP {response.status_code}")
            # Assign tag to (newly created) connection
            if tag_name and connection_guid:
                log_progress(f"Assigning tag '{tag_name}' to connection...")
                self.assign_tags_to_objects([connection_guid], 'DATA_SOURCE', tag_name)
        # Step 1.5: Schema should already exist (created by demo_prep tool)
        print("\n1οΈβ£.5 Using existing schema in Snowflake...")
        # Step 2: Build relationships for tables
        # NOTE(review): the step label in this print duplicates "1.5" - cosmetic only.
        print("\n1οΈβ£.5 Building relationships...")
        table_relationships = self._build_table_relationships(tables, foreign_keys)
        # Step 2: TWO-PHASE TABLE CREATION (to avoid dependency order issues)
        table_count = len(tables)
        batch1_start = time.time()
        log_progress(f"Batch 1/2: Creating {table_count} tables...")
        # PHASE 1: Create all tables WITHOUT joins in ONE batch API call
        # Build array of all table TMLs
        table_tmls_batch1 = []
        table_names_order = []  # Track order for matching response
        for table_name, columns in tables.items():
            print(f"[ThoughtSpot] Preparing {table_name.upper()}...", flush=True)
            table_tml = self.create_table_tml(table_name, columns, connection_name, database, schema, all_tables=None, foreign_keys=foreign_keys)
            table_tmls_batch1.append(table_tml)
            table_names_order.append(table_name.upper())
        # Send all tables in ONE API call
        log_progress(f" Sending batch request for {len(table_tmls_batch1)} tables...")
        response = self.session.post(
            f"{self.base_url}/api/rest/2.0/metadata/tml/import",
            json={
                "metadata_tmls": table_tmls_batch1,
                "import_policy": "PARTIAL",
                "create_new": True
            }
        )
        if response.status_code == 200:
            result = response.json()
            # Handle both response formats (list or dict with 'object' key)
            if isinstance(result, list):
                objects = result
            elif isinstance(result, dict) and 'object' in result:
                objects = result['object']
            else:
                error = f"Batch 1 failed: Unexpected response format: {type(result)}"
                log_progress(f" β {error}")
                results['errors'].append(error)
                return results
            # Process each table result; response order matches request order
            for idx, obj in enumerate(objects):
                table_name = table_names_order[idx] if idx < len(table_names_order) else f"TABLE_{idx}"
                if obj.get('response', {}).get('status', {}).get('status_code') == 'OK':
                    table_guid = obj.get('response', {}).get('header', {}).get('id_guid')
                    print(f"[ThoughtSpot] β {table_name} created", flush=True)
                    results['tables'].append(table_name)
                    table_guids[table_name] = table_guid
                else:
                    error_msg = obj.get('response', {}).get('status', {}).get('error_message', 'Unknown error')
                    error = f"Table {table_name} failed: {error_msg}"
                    print(f"[ThoughtSpot] β {table_name} failed: {error_msg}", flush=True)
                    results['errors'].append(error)
        else:
            error = f"Batch 1 HTTP error: {response.status_code} - {response.text}"
            log_progress(f" β {error}")
            results['errors'].append(error)
            return results
        # Check if we created any tables successfully
        if not table_guids:
            log_progress(" β No tables were created successfully in Batch 1")
            return results
        # Assign tags to tables
        table_guid_list = list(table_guids.values())
        print(f"π DEBUG BEFORE TAG CALL: tag_name='{tag_name}', table_guid_list={table_guid_list}")
        log_progress(f"Assigning tag '{tag_name}' to {len(table_guid_list)} tables...")
        self.assign_tags_to_objects(table_guid_list, 'LOGICAL_TABLE', tag_name)
        batch1_time = time.time() - batch1_start
        log_progress(f"[OK] Batch 1 complete: {len(table_guids)} tables created ({batch1_time:.1f}s)")
        # PHASE 2: Update tables WITH joins in ONE batch API call
        batch2_start = time.time()
        log_progress(f"Batch 2/2: Adding joins to {len(table_guids)} tables...")
        # Build array of all table update TMLs (with joins)
        table_tmls_batch2 = []
        table_names_order_batch2 = []
        for table_name, columns in tables.items():
            table_name_upper = table_name.upper()
            # Only add joins if the table was created successfully in Phase 1
            if table_name_upper not in table_guids:
                print(f"[ThoughtSpot] Skipping {table_name_upper} (not created)", flush=True)
                continue
            # Get the GUID for this table
            table_guid = table_guids[table_name_upper]
            print(f"[ThoughtSpot] Preparing joins for {table_name_upper}...", flush=True)
            # Create table TML WITH joins_with section AND the table GUID
            table_tml = self.create_table_tml(
                table_name, columns, connection_name, database, schema,
                all_tables=tables, table_guid=table_guid, foreign_keys=foreign_keys
            )
            table_tmls_batch2.append(table_tml)
            table_names_order_batch2.append(table_name_upper)
        # Send all table updates in ONE API call
        if table_tmls_batch2:
            log_progress(f" Sending batch request to add joins to {len(table_tmls_batch2)} tables...")
            response = self.session.post(
                f"{self.base_url}/api/rest/2.0/metadata/tml/import",
                json={
                    "metadata_tmls": table_tmls_batch2,
                    "import_policy": "PARTIAL",
                    "create_new": False  # Update existing tables
                }
            )
            if response.status_code == 200:
                result = response.json()
                # Handle both response formats
                if isinstance(result, list):
                    objects = result
                elif isinstance(result, dict) and 'object' in result:
                    objects = result['object']
                else:
                    objects = []
                # Process each result; join failures are recorded but non-fatal
                for idx, obj in enumerate(objects):
                    table_name = table_names_order_batch2[idx] if idx < len(table_names_order_batch2) else f"TABLE_{idx}"
                    if obj.get('response', {}).get('status', {}).get('status_code') == 'OK':
                        print(f"[ThoughtSpot] β {table_name} joins added", flush=True)
                    else:
                        error_msg = obj.get('response', {}).get('status', {}).get('error_message', 'Unknown error')
                        print(f"[ThoughtSpot] β οΈ {table_name} joins failed: {error_msg}", flush=True)
                        results['errors'].append(f"Joins for {table_name} failed: {error_msg}")
            else:
                log_progress(f" β οΈ Batch 2 HTTP error: {response.status_code}")
        batch2_time = time.time() - batch2_start
        log_progress(f"[OK] Batch 2 complete: Joins added ({batch2_time:.1f}s)")
        actual_constraint_ids = {}  # We'll generate these for the model
        # Skip separate relationship creation for now
        # print("\n2οΈβ£.5 Creating relationships separately...")
        # self.create_relationships_separately(table_relationships, table_guids)
        # Step 3: Extract constraint IDs from created tables by exporting
        # each table's TML and reading back the server-assigned join names.
        table_constraints = {}
        for table_name, table_guid in table_guids.items():
            print(f"[ThoughtSpot] Extracting joins from {table_name}...", flush=True)
            # Export table TML to get constraint IDs
            export_response = self.session.post(
                f"{self.base_url}/api/rest/2.0/metadata/tml/export",
                json={
                    "metadata": [{"identifier": table_guid, "type": "LOGICAL_TABLE"}],
                    "export_associated": False,
                    "format_type": "YAML"
                }
            )
            if export_response.status_code == 200:
                tml_data = export_response.json()
                if tml_data and 'edoc' in tml_data[0]:
                    import json
                    tml_json = json.loads(tml_data[0]['edoc'])
                    # Extract joins_with constraint IDs
                    joins_with = tml_json.get('table', {}).get('joins_with', [])
                    if joins_with:
                        table_constraints[table_name] = []
                        for join in joins_with:
                            constraint_id = join.get('name')
                            destination = join.get('destination', {}).get('name')
                            if constraint_id and destination:
                                table_constraints[table_name].append({
                                    'constraint_id': constraint_id,
                                    'destination': destination
                                })
        # Step 4: Create model (semantic layer) with constraint references
        model_start = time.time()
        model_name = demo_names['model']
        log_progress(f"Creating model: {model_name}...")
        # Use the enhanced model creation that includes constraint references
        model_tml = self._create_model_with_constraints(tables, foreign_keys, table_guids, table_constraints, model_name, connection_name)
        print(f"\nπ Model TML being sent:\n{model_tml}")
        response = self.session.post(
            f"{self.base_url}/api/rest/2.0/metadata/tml/import",
            json={
                "metadata_tmls": [model_tml],
                "import_policy": "ALL_OR_NONE",
                "create_new": True
            }
        )
        # NOTE(review): a non-200 HTTP status on this model import is not
        # handled by any else-branch below; the final success flag depends
        # only on whether errors were appended to results['errors'].
        if response.status_code == 200:
            result = response.json()
            # Handle both response formats (list or dict with 'object' key)
            if isinstance(result, list):
                objects = result
            elif isinstance(result, dict) and 'object' in result:
                objects = result['object']
            else:
                error = f"Model failed: Unexpected response format: {type(result)}"
                log_progress(f" β {error}")
                results['errors'].append(error)
                objects = []
            if objects and len(objects) > 0:
                if objects[0].get('response', {}).get('status', {}).get('status_code') == 'OK':
                    model_guid = objects[0].get('response', {}).get('header', {}).get('id_guid')
                    model_time = time.time() - model_start
                    log_progress(f"[OK] Model created ({model_time:.1f}s)")
                    results['model'] = model_name
                    results['model_guid'] = model_guid
                    # Assign tag to model
                    print(f"π DEBUG BEFORE TAG CALL: tag_name='{tag_name}', model_guid='{model_guid}'")
                    log_progress(f"Assigning tag '{tag_name}' to model...")
                    self.assign_tags_to_objects([model_guid], 'LOGICAL_TABLE', tag_name)
                    # Share model (explicit recipient wins over admin setting)
                    _effective_share = share_with or get_admin_setting('SHARE_WITH', required=False)
                    if _effective_share:
                        log_progress(f"Sharing model with '{_effective_share}'...")
                        self.share_objects([model_guid], 'LOGICAL_TABLE', _effective_share)
                    # Step 3.5: Enable Spotter + enrich model semantics in a single exportβupdateβreimport
                    # (create_new=True import ignores spotter_config, so we always re-import here)
                    try:
                        export_resp = self.session.post(
                            f"{self.base_url}/api/rest/2.0/metadata/tml/export",
                            json={
                                "metadata": [{"identifier": model_guid, "type": "LOGICAL_TABLE"}],
                                "export_associated": False,
                                "format_type": "YAML"
                            }
                        )
                        if export_resp.status_code == 200:
                            export_data = export_resp.json()
                            if export_data and 'edoc' in export_data[0]:
                                model_tml_dict = json.loads(export_data[0]['edoc'])
                                # Enable Spotter
                                model_tml_dict.setdefault('model', {}).setdefault('properties', {})['spotter_config'] = {'is_spotter_enabled': True}
                                # Enrich with description, synonyms, and AI context
                                if company_research:
                                    try:
                                        from model_semantic_updater import ModelSemanticUpdater
                                        log_progress("Generating model description, synonyms, and AI context...")
                                        sem_start = time.time()
                                        updater = ModelSemanticUpdater(self, llm_model=llm_model)
                                        model_description = updater.generate_model_description(
                                            company_research=company_research,
                                            use_case=use_case or '',
                                            company_name=company_name or '',
                                            model_name=model_name,
                                        )
                                        column_semantics = updater.generate_column_semantics(
                                            company_research=company_research,
                                            model_tml=model_tml_dict,
                                            use_case=use_case or '',
                                            company_name=company_name or '',
                                        )
                                        # Apply to TML dict in-place (returns YAML string)
                                        enriched_yaml = updater.apply_to_model_tml(
                                            model_tml_dict, column_semantics, model_description
                                        )
                                        # Parse back so we can still dump consistently below
                                        model_tml_dict = yaml.safe_load(enriched_yaml)
                                        sem_time = time.time() - sem_start
                                        log_progress(f"[OK] Semantics generated: {len(column_semantics)} columns enriched ({sem_time:.1f}s)")
                                    except Exception as sem_err:
                                        log_progress(f"[WARN] Semantic enrichment failed (non-fatal): {sem_err}")
                                updated_tml = yaml.dump(model_tml_dict, allow_unicode=True, sort_keys=False)
                                update_resp = self.session.post(
                                    f"{self.base_url}/api/rest/2.0/metadata/tml/import",
                                    json={
                                        "metadata_tmls": [updated_tml],
                                        "import_policy": "ALL_OR_NONE",
                                        "create_new": False
                                    }
                                )
                                if update_resp.status_code == 200:
                                    log_progress("π€ Spotter enabled + model semantics applied")
                                else:
                                    log_progress(f"π€ Model update failed: HTTP {update_resp.status_code} β {update_resp.text[:200]}")
                            else:
                                log_progress("π€ Spotter enable: export returned no edoc")
                        else:
                            log_progress(f"π€ Spotter enable: export failed HTTP {export_resp.status_code}")
                    except Exception as spotter_error:
                        log_progress(f"π€ Spotter/semantics exception: {spotter_error}")
                    # Step 4: Auto-create Liveboard from model
                    lb_start = time.time()
                    # HYBRID is the only supported liveboard creation method
                    # (the liveboard_method parameter is intentionally ignored here)
                    method = 'HYBRID'
                    log_progress(f"Creating liveboard ({method} method)...")
                    try:
                        # Build company data from parameters
                        # Clean company name for display (strip .com, .org, etc)
                        clean_company = company_name.split('.')[0].title() if company_name and '.' in company_name else (company_name or 'Demo Company')
                        company_data = {
                            'name': clean_company,
                            'use_case': use_case or 'General Analytics'
                        }
                        if method == 'HYBRID':
                            # HYBRID: MCP creates liveboard, TML post-processes
                            from liveboard_creator import create_liveboard_from_model_mcp, create_liveboard_from_model, enhance_mcp_liveboard
                            # Get actual column names from ThoughtSpot model (not DDL)
                            # This is critical because TS may rename columns to make them unique
                            # e.g., PROCESSING_FEE becomes gift_processing_fee and tran_processing_fee
                            model_columns = self.get_model_columns(model_guid)
                            if not model_columns:
                                log_progress(f" β οΈ Could not get model columns, falling back to DDL")
                                model_columns = []
                                for table_name, columns_list in tables.items():
                                    for col in columns_list:
                                        model_columns.append(col)
                            # Get liveboard questions from the verticalΓfunction config
                            outlier_dicts = []
                            try:
                                from demo_personas import parse_use_case, get_use_case_config
                                uc_vertical, uc_function = parse_use_case(use_case or '')
                                uc_config = get_use_case_config(uc_vertical or "Generic", uc_function or "Generic")
                                lq = uc_config.get("liveboard_questions", [])
                                required_qs = [q for q in lq if q.get("required")]
                                optional_qs = [q for q in lq if not q.get("required")]
                                # All required questions get a KPI companion; at
                                # most two optional questions are added without one.
                                for q in required_qs:
                                    outlier_dicts.append({
                                        'title': q['title'],
                                        'insight': q.get('insight', ''),
                                        'viz_type': q['viz_type'],
                                        'show_me_query': q['viz_question'],
                                        'kpi_companion': True,
                                        'spotter_questions': q.get('spotter_qs', []),
                                    })
                                for q in optional_qs[:2]:
                                    outlier_dicts.append({
                                        'title': q['title'],
                                        'insight': q.get('insight', ''),
                                        'viz_type': q['viz_type'],
                                        'show_me_query': q['viz_question'],
                                        'kpi_companion': False,
                                        'spotter_questions': q.get('spotter_qs', []),
                                    })
                                if outlier_dicts:
                                    log_progress(f" [MCP] Using {len(outlier_dicts)} liveboard questions from {uc_vertical}Γ{uc_function}")
                            except Exception as outlier_err:
                                log_progress(f" [MCP] Liveboard questions loading skipped: {outlier_err}")
                            # MCP creates liveboard
                            if method == 'HYBRID':
                                log_progress(f" Step 1/2: MCP creating initial liveboard...")
                                log_progress(f" [MCP] Model: {model_name}, GUID: {model_guid}")
                                log_progress(f" [MCP] Use case: {use_case or 'General Analytics'}")
                                log_progress(f" [MCP] Using {len(model_columns)} columns from ThoughtSpot model")
                                try:
                                    liveboard_result = create_liveboard_from_model_mcp(
                                        ts_client=self,
                                        model_id=model_guid,
                                        model_name=model_name,
                                        company_data=company_data,
                                        use_case=use_case or 'General Analytics',
                                        num_visualizations=10,
                                        liveboard_name=liveboard_name,
                                        outliers=outlier_dicts if outlier_dicts else None,
                                        llm_model=llm_model,
                                        model_columns=model_columns,
                                        prompt_logger=self.prompt_logger
                                    )
                                except Exception as mcp_error:
                                    import traceback
                                    error_trace = traceback.format_exc()
                                    log_progress(f" [MCP ERROR] {type(mcp_error).__name__}: {str(mcp_error)}")
                                    liveboard_result = {'success': False, 'error': str(mcp_error), 'traceback': error_trace}
                                # Log result; on MCP failure fall back to direct TML creation
                                if liveboard_result.get('success'):
                                    log_progress(f" [MCP] Liveboard created: {liveboard_result.get('liveboard_guid')}")
                                else:
                                    log_progress(f" [MCP FAILED] {liveboard_result.get('error', 'Unknown error')}")
                                    log_progress(" [FALLBACK] Trying direct TML liveboard creation...")
                                    try:
                                        liveboard_result = create_liveboard_from_model(
                                            ts_client=self,
                                            model_id=model_guid,
                                            model_name=model_name,
                                            company_data=company_data,
                                            use_case=use_case or 'General Analytics',
                                            num_visualizations=10,
                                            liveboard_name=liveboard_name,
                                            llm_model=llm_model,
                                            outliers=outlier_dicts if outlier_dicts else None,
                                            model_columns=model_columns,
                                            prompt_logger=self.prompt_logger
                                        )
                                        if liveboard_result.get('success'):
                                            log_progress(f" [FALLBACK OK] Liveboard created: {liveboard_result.get('liveboard_guid')}")
                                        else:
                                            log_progress(f" [FALLBACK FAILED] {liveboard_result.get('error', 'Unknown error')}")
                                    except Exception as fallback_error:
                                        import traceback
                                        fallback_trace = traceback.format_exc()
                                        log_progress(f" [FALLBACK ERROR] {type(fallback_error).__name__}: {str(fallback_error)}")
                                        liveboard_result = {
                                            'success': False,
                                            'error': f"MCP failed and fallback failed: {fallback_error}",
                                            'traceback': fallback_trace
                                        }
                            # HYBRID: Add TML enhancement if MCP succeeded
                            if method == 'HYBRID' and liveboard_result.get('success') and liveboard_result.get('liveboard_guid'):
                                log_progress(f" Step 2/2: Enhancing with TML post-processing...")
                                enhance_result = enhance_mcp_liveboard(
                                    liveboard_guid=liveboard_result['liveboard_guid'],
                                    company_data=company_data,
                                    ts_client=self,
                                    add_groups=True,
                                    fix_kpis=True,
                                    apply_brand_colors=True
                                )
                                if enhance_result.get('success'):
                                    log_progress(f" [OK] Enhancement applied")
                                else:
                                    enhance_err = f"TML enhancement failed: {enhance_result.get('message', 'unknown')[:100]}"
                                    log_progress(f" [ERROR] {enhance_err}")
                                    results['errors'].append(enhance_err)
                        # Check result
                        print(f"π DEBUG: Liveboard result received: {liveboard_result}")
                        print(f"π DEBUG: Success flag: {liveboard_result.get('success')}")
                        if liveboard_result.get('success'):
                            lb_time = time.time() - lb_start
                            log_progress(f"[OK] Liveboard created via {method} ({lb_time:.1f}s)")
                            results['liveboard'] = liveboard_result.get('liveboard_name')
                            results['liveboard_guid'] = liveboard_result.get('liveboard_guid')
                            results['liveboard_method'] = method
                            if liveboard_result.get('liveboard_url'):
                                results['liveboard_url'] = liveboard_result.get('liveboard_url')
                            # Assign tag to liveboard
                            if tag_name and liveboard_result.get('liveboard_guid'):
                                log_progress(f"Assigning tag '{tag_name}' to liveboard...")
                                self.assign_tags_to_objects([liveboard_result['liveboard_guid']], 'PINBOARD_ANSWER_BOOK', tag_name)
                            # Share liveboard
                            _effective_share = share_with or get_admin_setting('SHARE_WITH', required=False)
                            if _effective_share and liveboard_result.get('liveboard_guid'):
                                log_progress(f"Sharing liveboard with '{_effective_share}'...")
                                self.share_objects([liveboard_result['liveboard_guid']], 'LIVEBOARD', _effective_share)
                        else:
                            error = f"Liveboard creation failed: {liveboard_result.get('error', 'Unknown error')}"
                            print(f"β DEBUG: Liveboard creation failed! Error: {error}")
                            results['errors'].append(error)
                            log_progress(f"[ERROR] {error}")
                    except Exception as lb_error:
                        error = f"Liveboard creation exception: {str(lb_error)}"
                        results['errors'].append(error)
                        log_progress(f"[ERROR] {error}")
                else:
                    # Model import rejected: extract detailed error information
                    obj_response = objects[0].get('response', {})
                    status = obj_response.get('status', {})
                    error_message = status.get('error_message', 'Unknown error')
                    # Clean HTML tags from error message (ThoughtSpot sometimes returns HTML)
                    error_message = re.sub(r'<[^>]+>', '', error_message).strip()
                    if not error_message:
                        error_message = 'Schema validation failed (no details provided)'
                    error_code = status.get('error_code', 'N/A')
                    # Try to extract additional error details from various response fields
                    error_details = []
                    # Check for detailed error messages in different response structures
                    if 'error_details' in status:
                        error_details.append(f"Error details: {status.get('error_details')}")
                    if 'validation_errors' in obj_response:
                        error_details.append(f"Validation errors: {obj_response.get('validation_errors')}")
                    if 'warnings' in obj_response:
                        error_details.append(f"Warnings: {obj_response.get('warnings')}")
                    # Check header for additional info
                    header = obj_response.get('header', {})
                    if 'error' in header:
                        error_details.append(f"Header error: {header.get('error')}")
                    # Get any additional error details
                    full_response = json.dumps(objects[0], indent=2)
                    # Save the TML that failed for debugging
                    import tempfile
                    # os is already imported at module level
                    try:
                        debug_dir = os.path.join(tempfile.gettempdir(), 'thoughtspot_debug')
                        os.makedirs(debug_dir, exist_ok=True)
                        failed_tml_path = os.path.join(debug_dir, f'failed_model_{datetime.now().strftime("%Y%m%d_%H%M%S")}.tml')
                        with open(failed_tml_path, 'w') as f:
                            f.write(model_tml)
                        log_progress(f"πΎ Failed TML saved to: {failed_tml_path}")
                        print(f"πΎ Failed TML saved to: {failed_tml_path}")
                    except Exception as save_error:
                        log_progress(f"[WARN] Could not save failed TML: {save_error}")
                    # Build comprehensive error message
                    error = f"Model validation failed: {error_message}"
                    if error_code != 'N/A':
                        error += f" (Error code: {error_code})"
                    if error_details:
                        error += f"\n\nAdditional details:\n" + "\n".join(error_details)
                    print(f"π Full model response: {full_response}")  # DEBUG: Show full response
                    print(f" β {error}")
                    log_progress(f" β {error}")
                    log_progress(f" π Full response details:")
                    log_progress(f"{full_response}")
                    # Log full TML for debugging
                    log_progress(f"\nπ TML that was sent:\n{model_tml}")
                    results['errors'].append(error)
                    results['errors'].append(f"Full API response: {full_response}")
                    results['errors'].append(f"Failed TML saved to: {failed_tml_path if 'failed_tml_path' in locals() else 'N/A'}")
            else:
                error = "Model failed: No objects in response"
                log_progress(f" β {error}")
                results['errors'].append(error)
        # Mark as successful if we got this far with no recorded errors
        results['success'] = len(results['errors']) == 0
    except Exception as e:
        import traceback
        error_msg = str(e)
        full_trace = traceback.format_exc()
        # Log to console with full details
        print(f"\n{'='*60}")
        print(f"β DEPLOYMENT EXCEPTION")
        print(f"{'='*60}")
        print(f"Error: {error_msg}")
        print(f"\nFull traceback:")
        print(full_trace)
        print(f"{'='*60}\n")
        # Log through callback too
        log_progress(f"[ERROR] Deployment failed: {error_msg}")
        log_progress(f"Traceback: {full_trace}")
        results['errors'].append(error_msg)
        results['errors'].append(f"Traceback: {full_trace}")
    return results
def deploy_to_thoughtspot(ddl: str, database: str, schema: str,
                          connection_name: str = None, company_name: str = None,
                          use_case: str = None, progress_callback=None) -> Dict:
    """
    Convenience function for deploying to ThoughtSpot

    Args:
        ddl: Data Definition Language statements
        database: Target database name
        schema: Target schema name
        connection_name: Optional connection name (auto-generated if omitted)
        company_name: Optional company name (used to generate demo names)
        use_case: Optional use case label
        progress_callback: Optional callback for progress updates
    Returns:
        Dict with deployment results
    """
    deployer = ThoughtSpotDeployer()
    # BUG FIX: deploy_all() takes base_name as its 4th positional argument.
    # The previous positional call shifted every argument by one slot
    # (connection_name landed in base_name, company_name in connection_name,
    # and progress_callback in use_case, so progress updates were lost).
    # Generate a base name explicitly and pass everything by keyword.
    base_name = deployer._generate_demo_names(company_name, use_case)['base']
    return deployer.deploy_all(
        ddl, database, schema,
        base_name=base_name,
        connection_name=connection_name,
        company_name=company_name,
        use_case=use_case,
        progress_callback=progress_callback,
    )
if __name__ == "__main__":
    # Example usage: manual smoke test deploying a minimal single-table model.
    test_ddl = """
    CREATE TABLE CUSTOMERS (
        CUSTOMERID INT64 PRIMARY KEY,
        NAME VARCHAR(255)
    );
    """
    # Test deployment - using a schema that exists
    results = deploy_to_thoughtspot(
        ddl=test_ddl,
        database="DEMOBUILD",  # Use the actual Snowflake database
        schema="THOUGHTSPO_SALESA_20250915_193303"  # Use the working schema from your working table
    )
    # Dump the full result dict so failures are visible at a glance.
    print("\n" + "=" * 60)
    print("π DEPLOYMENT RESULTS:")
    print("=" * 60)
    print(json.dumps(results, indent=2))