Spaces:
Running
Running
| """ | |
| Liveboard Creator Module | |
| Creates ThoughtSpot Liveboards from company research data and existing models. | |
| Uses reference TML files as templates and AI to generate appropriate visualizations. | |
| PHASE 1 ENHANCEMENTS: | |
| - Outlier-driven visualization generation | |
| - Natural language to ThoughtSpot search query translation | |
| - Intelligent chart type inference | |
| - Support for diverse chart types (12+ types) | |
| """ | |
| import json | |
| import yaml | |
| import os | |
| from supabase_client import get_admin_setting | |
| import re | |
| import requests | |
| from typing import Dict, List, Optional | |
| from llm_config import DEFAULT_LLM_MODEL, map_llm_display_to_provider | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| # Cache for direct API auth token | |
| _direct_api_token = None | |
| _direct_api_session = None | |
| def _clean_viz_title(title: str) -> str: | |
| """ | |
| Clean up visualization titles to be more readable. | |
| Examples: | |
| 'shipping_cost by month last 18 months' → 'Shipping Cost by Month' | |
| 'Top 15 product_name by quantity_shipped' → 'Top 15 Products by Quantity Shipped' | |
| 'total_revenue weekly' → 'Total Revenue Weekly' | |
| """ | |
| if not title: | |
| return title | |
| # Remove date filter suffixes | |
| date_filters = [ | |
| ' last 18 months', ' last 12 months', ' last 2 years', ' last year', | |
| ' last 6 months', ' last 3 months', ' last 30 days', ' last 90 days' | |
| ] | |
| for filter_str in date_filters: | |
| if title.lower().endswith(filter_str): | |
| title = title[:-len(filter_str)] | |
| # Replace underscores with spaces | |
| title = title.replace('_', ' ') | |
| # Clean up common column name patterns | |
| replacements = { | |
| 'product name': 'Products', | |
| 'supplier name': 'Suppliers', | |
| 'warehouse name': 'Warehouses', | |
| 'customer name': 'Customers', | |
| 'brand name': 'Brands', | |
| 'store name': 'Stores', | |
| 'category name': 'Categories', | |
| 'region name': 'Regions', | |
| } | |
| title_lower = title.lower() | |
| for old, new in replacements.items(): | |
| if old in title_lower: | |
| # Case-insensitive replace | |
| import re | |
| title = re.sub(re.escape(old), new, title, flags=re.IGNORECASE) | |
| # Title case the result, but preserve words like 'by', 'vs', 'and' | |
| words = title.split() | |
| result = [] | |
| small_words = {'by', 'vs', 'and', 'or', 'the', 'a', 'an', 'of', 'in', 'on', 'to'} | |
| for i, word in enumerate(words): | |
| if i == 0 or word.lower() not in small_words: | |
| result.append(word.capitalize()) | |
| else: | |
| result.append(word.lower()) | |
| return ' '.join(result) | |
def _get_direct_api_session(username: str, secret_key: str):
    """
    Create an authenticated session for direct ThoughtSpot API calls.
    Uses trusted auth — authenticates as the given username.

    Args:
        username: ThoughtSpot username to authenticate as.
        secret_key: Cluster secret key for trusted authentication.

    Returns:
        (session, token) tuple, or (None, None) on failure.
    """
    # get_admin_setting may return None — guard before rstrip to avoid AttributeError
    base_url = (get_admin_setting('THOUGHTSPOT_URL') or '').rstrip('/')
    if not all([base_url, username, secret_key]):
        print(f"⚠️ Direct API: Missing settings (URL={bool(base_url)}, USER={bool(username)}, KEY={bool(secret_key)})")
        return None, None
    # Get auth token
    auth_url = f"{base_url}/api/rest/2.0/auth/token/full"
    try:
        # Bounded wait so a hung cluster cannot block the caller forever.
        resp = requests.post(auth_url, json={
            "username": username,
            "secret_key": secret_key,
            "validity_time_in_sec": 3600
        }, timeout=30)
    except requests.RequestException as e:
        # Network-level failure — honor the documented (None, None) contract
        # instead of letting the exception escape to the caller.
        print(f"⚠️ Direct API auth request failed: {e}")
        return None, None
    if resp.status_code != 200:
        print(f"⚠️ Direct API auth failed: {resp.status_code}")
        return None, None
    token = resp.json().get('token')
    if not token:
        print(f"⚠️ Direct API: No token in response")
        return None, None
    session = requests.Session()
    session.headers['Authorization'] = f'Bearer {token}'
    session.headers['Content-Type'] = 'application/json'
    session.headers['Accept'] = 'application/json'
    print(f"✅ Direct API: Authenticated as {username} to {base_url}")
    return session, token
def _get_answer_direct(question: str, model_id: str, username: str, secret_key: str) -> dict:
    """
    Fetch an AI-generated answer straight from the ThoughtSpot REST API.

    Going direct bypasses MCP's agent.thoughtspot.app proxy, which may have
    issues with certain clusters (e.g., Spotter Agent v3 clusters).

    Args:
        question: Natural language question.
        model_id: ThoughtSpot model GUID.
        username: ThoughtSpot username to authenticate as.
        secret_key: Cluster secret key for trusted authentication.

    Returns:
        dict with answer data (session_identifier, tokens, question, etc.),
        or None on failure.
    """
    session, _ = _get_direct_api_session(username, secret_key)
    if not session:
        return None
    base_url = get_admin_setting('THOUGHTSPOT_URL').rstrip('/')
    endpoint = f"{base_url}/api/rest/2.0/ai/answer/create"
    request_body = {
        "query": question,
        "metadata_identifier": model_id
    }
    try:
        response = session.post(endpoint, json=request_body, timeout=60)
        if response.status_code != 200:
            detail = response.text[:500] if response.text else f"Status {response.status_code}"
            print(f" ⚠️ Direct API answer FAILED — HTTP {response.status_code}: {detail}", flush=True)
            return None
        data = response.json()
        # Trace the payload shape — useful when clusters return different fields.
        print(f" [direct API] HTTP 200 — keys: {list(data.keys())}", flush=True)
        print(f" [direct API] session_identifier: {data.get('session_identifier')}", flush=True)
        print(f" [direct API] visualization_type: {data.get('visualization_type')}", flush=True)
        print(f" [direct API] message_type: {data.get('message_type')}", flush=True)
        print(f" [direct API] tokens present: {bool(data.get('tokens'))}", flush=True)
        return {
            'question': question,
            'session_identifier': data.get('session_identifier'),
            'tokens': data.get('tokens'),
            'display_tokens': data.get('display_tokens'),
            'visualization_type': data.get('visualization_type', 'Chart'),
            'generation_number': data.get('generation_number', 1),
            'message_type': data.get('message_type', 'TSAnswer')
        }
    except Exception as e:
        print(f" ⚠️ Direct API exception: {str(e)}")
        return None
| def _strip_chart_type_hints(text: str) -> str: | |
| """ | |
| Strip chart type hints like (line chart), (bar chart), (KPI) from question text. | |
| These hints help AI infer chart type but shouldn't appear in visualization titles. | |
| Examples: | |
| "Monthly sales - last 12 months (line chart)" → "Monthly sales - last 12 months" | |
| "Top 10 products (bar chart)" → "Top 10 products" | |
| "Total revenue (KPI)" → "Total revenue" | |
| "Sales by region - bar chart" → "Sales by region" | |
| """ | |
| # Pattern matches common chart type hints - both (in parens) and - dash prefix | |
| patterns = [ | |
| # Parentheses format | |
| r'\s*\(line chart\)\s*$', | |
| r'\s*\(bar chart\)\s*$', | |
| r'\s*\(horizontal bar chart\)\s*$', | |
| r'\s*\(stacked column chart\)\s*$', | |
| r'\s*\(stacked column\)\s*$', | |
| r'\s*\(stacked bar chart\)\s*$', | |
| r'\s*\(stacked bar\)\s*$', | |
| r'\s*\(column chart\)\s*$', | |
| r'\s*\(pie chart\)\s*$', | |
| r'\s*\(donut chart\)\s*$', | |
| r'\s*\(area chart\)\s*$', | |
| r'\s*\(scatter chart\)\s*$', | |
| r'\s*\(geo map\)\s*$', | |
| r'\s*\(table\)\s*$', | |
| r'\s*\(KPI\)\s*$', | |
| r'\s*\(single number\)\s*$', | |
| # Dash format (no parens) | |
| r'\s*-\s*line chart\s*$', | |
| r'\s*-\s*bar chart\s*$', | |
| r'\s*-\s*horizontal bar chart\s*$', | |
| r'\s*-\s*stacked column chart\s*$', | |
| r'\s*-\s*stacked column\s*$', | |
| r'\s*-\s*stacked bar chart\s*$', | |
| r'\s*-\s*stacked bar\s*$', | |
| r'\s*-\s*column chart\s*$', | |
| r'\s*-\s*pie chart\s*$', | |
| r'\s*-\s*donut chart\s*$', | |
| r'\s*-\s*area chart\s*$', | |
| r'\s*-\s*KPI\s*$', | |
| ] | |
| result = text | |
| for pattern in patterns: | |
| result = re.sub(pattern, '', result, flags=re.IGNORECASE) | |
| return result.strip() | |
# Chart type definitions with use cases.
# Each entry's 'best_for' and 'requires' strings are fed verbatim into the
# LLM prompt built by QueryTranslator.infer_chart_type, and the keys are the
# valid answers that prompt accepts. 'ts_type' is the ThoughtSpot
# visualization type identifier (note GRID_TABLE below — it can differ from
# the friendly key).
CHART_TYPES = {
    'KPI': {
        'best_for': 'single metric with trend, sparkline, and comparison',
        'requires': 'one measure, optional time dimension',
        'ts_type': 'KPI'
    },
    'LINE': {
        'best_for': 'trends over time, showing patterns',
        'requires': 'measure + time dimension',
        'ts_type': 'LINE'
    },
    'COLUMN': {
        'best_for': 'comparing categories, discrete values',
        'requires': 'measure + 1-2 dimensions',
        'ts_type': 'COLUMN'
    },
    'BAR': {
        'best_for': 'ranking, top N lists, horizontal comparison',
        'requires': 'measure + dimension',
        'ts_type': 'BAR'
    },
    'STACKED_COLUMN': {
        'best_for': 'part-to-whole over categories',
        'requires': 'measure + 2 dimensions (x-axis + color)',
        'ts_type': 'STACKED_COLUMN'
    },
    'AREA': {
        'best_for': 'cumulative trends over time',
        'requires': 'measure + time dimension',
        'ts_type': 'AREA'
    },
    'PIE': {
        'best_for': 'simple part-to-whole for few categories',
        'requires': 'measure + dimension (max 7 values)',
        'ts_type': 'PIE'
    },
    'SCATTER': {
        'best_for': 'correlation between two measures',
        'requires': '2 measures + optional category dimension',
        'ts_type': 'SCATTER'
    },
    'TABLE': {
        'best_for': 'detailed data with multiple columns',
        'requires': 'any combination of measures and dimensions',
        'ts_type': 'GRID_TABLE'  # ThoughtSpot uses GRID_TABLE, not TABLE
    },
    'GEO_AREA': {
        'best_for': 'geographic regions on a map',
        'requires': 'measure + geographic dimension (region, country, state)',
        'ts_type': 'GEO_AREA'
    },
    'GEO_BUBBLE': {
        'best_for': 'geographic points with size',
        'requires': 'measure + lat/long or city',
        'ts_type': 'GEO_BUBBLE'
    },
    'PIVOT_TABLE': {
        'best_for': 'multi-dimensional data breakdown with drill-down',
        'requires': 'multiple dimensions + measures',
        'ts_type': 'PIVOT_TABLE'
    },
    'SANKEY': {
        'best_for': 'flow visualization between categories',
        'requires': '2+ dimensions showing hierarchy + measure',
        'ts_type': 'SANKEY'
    },
    'BUBBLE': {
        'best_for': 'three-variable comparison (x, y, size)',
        'requires': '2 measures + size measure + optional dimension',
        'ts_type': 'BUBBLE'
    },
    'TREEMAP': {
        'best_for': 'hierarchical data with size comparison',
        'requires': 'dimension hierarchy + measure',
        'ts_type': 'TREEMAP'
    },
    'TEXT': {
        'best_for': 'text descriptions, titles, annotations with colored backgrounds',
        'requires': 'no data, just static text content',
        'ts_type': 'TEXT'
    },
    'HEATMAP': {
        'best_for': 'matrix visualization with color intensity',
        'requires': '2 dimensions + measure for color',
        'ts_type': 'HEATMAP'
    }
}
class OutlierParser:
    """Parse demo outliers from population scripts"""

    @staticmethod
    def parse_demo_outliers(population_script: str) -> List[Dict]:
        """
        Extract structured outlier comments from a population script.

        Declared @staticmethod because the original definition took the
        script text as its first parameter (no self) — instance calls
        (OutlierParser().parse_demo_outliers(script)) would previously raise
        TypeError; class-level calls continue to work unchanged.

        Format expected:
        # DEMO_OUTLIER: High-Value Customers at Risk
        # INSIGHT: Top 5 customers (>$50K LTV) showing declining satisfaction
        # SHOW_ME: "Show customers where lifetime_value > 50000 and satisfaction < 3"
        # IMPACT: $250K annual revenue at risk
        # TALKING_POINT: Notice how ThoughtSpot instantly surfaces...

        Args:
            population_script: Full text of the data-population script.

        Returns:
            List of outlier dictionaries with keys: title, insight,
            show_me_query, impact, talking_point.
        """
        outliers = []
        # Multi-line regex to capture structured comments. DOTALL lets each
        # lazy group run up to the next "# TAG:" line; the optional quote
        # classes strip surrounding quotes from the SHOW_ME value.
        pattern = r'# DEMO_OUTLIER: (.*?)\n# INSIGHT: (.*?)\n# SHOW_ME: ["\']?(.*?)["\']?\n# IMPACT: (.*?)\n# TALKING_POINT: (.*?)(?:\n|$)'
        matches = re.findall(pattern, population_script, re.MULTILINE | re.DOTALL)
        for match in matches:
            outliers.append({
                'title': match[0].strip(),
                'insight': match[1].strip(),
                'show_me_query': match[2].strip(),
                'impact': match[3].strip(),
                'talking_point': match[4].strip()
            })
        return outliers
def clean_viz_title(question: str) -> str:
    """
    Derive a short, presentable title from a verbose question.

    Examples:
        "What is the total revenue? Show only..." -> "Total Revenue"
        "Show the top 10 products by revenue..." -> "Top 10 Products by Revenue"
    """
    # Keep only the text before the first '?' or '.' — trailing
    # instructions are not part of the title.
    question = question.split('?')[0].strip()
    question = question.split('.')[0].strip()
    # Leading question/command phrases and company-specific suffixes to drop.
    patterns = (
        (r'^What is the ', ''),
        (r'^What are the ', ''),
        (r'^Show me the ', ''),
        (r'^Show me ', ''),
        (r'^Show the ', ''),
        (r'^Show ', ''),
        (r'^Create a detailed table showing ', ''),
        (r'^How (?:does|do) ', ''),
        (r' for Amazon\.com', ''),
        (r' for the company', ''),
    )
    cleaned = question
    for pat, repl in patterns:
        cleaned = re.sub(pat, repl, cleaned, flags=re.IGNORECASE)
    # Capitalize only the first letter; the rest keeps its original casing.
    if cleaned:
        cleaned = cleaned[0].upper() + cleaned[1:]
    # Cap at 80 characters with an ellipsis.
    if len(cleaned) > 80:
        cleaned = cleaned[:77] + "..."
    return cleaned or question[:80]
def extract_brand_colors_from_css(css_url: str) -> Optional[List[str]]:
    """
    Extract up to 5 vibrant brand colors from a remote CSS file.

    Downloads the stylesheet, collects hex colors, drops very dark/light
    values (usually backgrounds), and ranks the rest by saturation then
    brightness.

    Args:
        css_url: URL of the CSS file to fetch.

    Returns:
        List of '#rrggbb' strings (most vibrant first), or None if the fetch
        fails, the response is non-200, or no suitable colors are found.
        (Annotation corrected to Optional — None was always a possible return.)
    """
    try:
        response = requests.get(css_url, timeout=5)
        if response.status_code == 200:
            css_content = response.text
            # Find all 3- or 6-digit hex colors
            hex_colors = re.findall(r'#([0-9A-Fa-f]{6}|[0-9A-Fa-f]{3})\b', css_content)
            color_data = []
            for color in hex_colors:
                # Expand shorthand #abc to #aabbcc
                if len(color) == 3:
                    color = ''.join([c*2 for c in color])
                # Perceived brightness (299/587/114 luma weights)
                r, g, b = int(color[0:2], 16), int(color[2:4], 16), int(color[4:6], 16)
                brightness = (r * 299 + g * 587 + b * 114) / 1000
                # Filter out too dark (< 30) or too light (> 240) colors —
                # these are usually backgrounds, not brand colors
                if 30 < brightness < 240:
                    max_c = max(r, g, b)
                    min_c = min(r, g, b)
                    saturation = (max_c - min_c) / max_c if max_c > 0 else 0
                    color_data.append({
                        'color': f'#{color}',
                        'brightness': brightness,
                        'saturation': saturation
                    })
            # Deduplicate, preserving first-seen order
            seen = set()
            unique_colors = []
            for c in color_data:
                if c['color'] not in seen:
                    seen.add(c['color'])
                    unique_colors.append(c)
            # Sort by saturation (vibrant colors first) then brightness
            unique_colors.sort(key=lambda x: (x['saturation'], x['brightness']), reverse=True)
            # Take top 5 most vibrant colors
            brand_colors = [c['color'] for c in unique_colors[:5]]
            return brand_colors if brand_colors else None
    except Exception as e:
        # Narrowed from a bare `except:` which also swallowed
        # KeyboardInterrupt/SystemExit; still best-effort, but now logged.
        print(f"⚠️ Could not extract brand colors from {css_url}: {e}")
    return None
def create_branded_note_tile(company_data: Dict, use_case: str, answers: List[Dict],
                             source_info: List[str], outlier_section: str = "") -> str:
    """
    Build the branded HTML note tile matching the golden demo format.

    Uses ThoughtSpot's native theme classes for proper in-product rendering.

    Args:
        company_data: Company information (reads 'name' and, as a fallback
            for the use case, 'use_case').
        use_case: Use case name; falls back to company_data['use_case'].
        answers: List of visualization answers (only the count is shown).
        source_info: List of source information strings (currently unused here).
        outlier_section: HTML for outlier highlights section (currently unused here).

    Returns:
        HTML string for the note tile.
    """
    company_name = company_data.get('name', 'Company')
    use_case_display = use_case if use_case else company_data.get('use_case', 'Analytics')
    viz_count = len(answers)
    # Cleaner note tile — simple single-string format like the golden demo.
    return f"""<h2 class="theme-module__editor-h2" dir="ltr"><span style="color: rgb(255, 255, 255); white-space: pre-wrap;">{company_name} {use_case_display}</span></h2><hr><p class="theme-module__editor-paragraph" dir="ltr"><span style="color: rgb(255, 255, 255); white-space: pre-wrap;">Featuring </span><b><strong class="theme-module__editor-text-bold" style="color: rgb(64, 193, 192); white-space: pre-wrap;">{viz_count} visualizations</strong></b><span style="color: rgb(255, 255, 255); white-space: pre-wrap;"> powered by ThoughtSpot AI</span></p><p class="theme-module__editor-paragraph"><br></p><p class="theme-module__editor-paragraph" dir="ltr"><span style="color: rgb(255, 255, 255); white-space: pre-wrap;">Built with </span><b><strong class="theme-module__editor-text-bold" style="color: rgb(64, 193, 192); white-space: pre-wrap;">Demo Wire</strong></b></p>"""
class QueryTranslator:
    """Translate natural language queries to ThoughtSpot search syntax"""

    def __init__(self, llm_researcher):
        # llm_researcher: object exposing make_request(messages, temperature=...,
        # max_tokens=..., stream=...) returning the model's text response
        # (a MultiLLMResearcher is passed in by LiveboardCreator).
        self.llm_researcher = llm_researcher

    def convert_natural_to_search(
        self,
        natural_query: str,
        model_columns: List[Dict]
    ) -> str:
        """
        Convert natural language to ThoughtSpot search query

        Args:
            natural_query: Natural language query (e.g., "Show customers where lifetime_value > 50000")
            model_columns: Available columns from model

        Returns:
            ThoughtSpot search query string
        """
        # Extract column names (exact names required by TS search syntax)
        column_names = [col.get('name', '') for col in model_columns if col.get('name')]
        columns_text = ", ".join(sorted(column_names))
        # Prompt text is part of runtime behavior — do not edit casually.
        prompt = f"""Convert this natural language query to ThoughtSpot search syntax.
Natural Query: {natural_query}
AVAILABLE COLUMNS (use EXACT names):
{columns_text}
CRITICAL RULES:
1. You MUST use EXACT column names from the list above - no fuzzy matching or abbreviations
2. If query mentions "product", find the EXACT column name (e.g., "Productname" or "prodProductid")
3. If query mentions "customer", find the EXACT column name (e.g., "Name", "custCustomerid")
4. If query mentions "location", find the EXACT column name (e.g., "Storename", "locaLocationid")
5. ThoughtSpot auto-aggregates - DO NOT use sum(), count(), avg() functions
6. Use [column name] syntax with exact column name: [Productname] not [product]
7. TIME GRANULARITY: NEVER use bare "weekly", "monthly", "daily" as standalone tokens.
Always attach granularity to the date column: [Date Column].weekly, [Date Column].monthly
Example: "sales weekly" → "[Sales Amount] [Order Date].weekly"
Example: "revenue by month" → "[Revenue] [Transaction Date].monthly"
8. If no date column is needed, just use the measure and dimension columns.
Examples:
- "Show products where stock > 900" → "[Productname] [Stocklevel] [Stocklevel] > 900"
- "Show customers where lifetime value > 50000" → "[Name] [Lifetimevalue] [Lifetimevalue] > 50000"
- "Show sales by region" → "[Salesamount] [Region]"
- "Show ASP weekly trend" → "[Asp Amount] [Order Date].weekly"
- "Revenue by month last 12 months" → "[Revenue] [Date].monthly [Date].'last 12 months'"
Convert the natural query above to ThoughtSpot search syntax using EXACT column names.
Return ONLY the search query string, nothing else."""
        try:
            messages = [{"role": "user", "content": prompt}]
            # Low temperature: we want a deterministic syntax translation.
            search_query = self.llm_researcher.make_request(messages, temperature=0.1, max_tokens=200, stream=False).strip()
            # Remove any quotes or extra formatting the model may add
            search_query = search_query.strip('"\'`')
            return search_query
        except Exception as e:
            print(f"⚠️ Error converting query: {e}")
            # Fallback: try simple pattern matching instead of failing outright
            return self._fallback_conversion(natural_query, model_columns)

    def _fallback_conversion(self, natural_query: str, model_columns: List[Dict]) -> str:
        """Simple fallback conversion if AI fails"""
        # Collect columns whose (underscore-as-space) name appears in the query
        query_lower = natural_query.lower()
        found_columns = []
        for col in model_columns:
            col_name = col.get('name', '')
            if col_name.lower().replace('_', ' ') in query_lower:
                found_columns.append(f"[{col_name}]")
        if found_columns:
            return " ".join(found_columns)
        return "[data]"  # Minimal fallback

    def infer_chart_type(
        self,
        search_query: str,
        model_columns: List[Dict],
        outlier_context: Optional[Dict] = None
    ) -> str:
        """
        Infer best chart type for a search query

        Args:
            search_query: ThoughtSpot search query
            model_columns: Available model columns
            outlier_context: Optional outlier context for better inference

        Returns:
            Chart type name (e.g., 'KPI', 'LINE', 'COLUMN'); falls back to
            'COLUMN' if the model returns an unknown type or errors out.
        """
        # Build the menu of valid chart types from the module-level registry
        chart_types_desc = "\n".join([
            f"- {name}: {info['best_for']} (requires: {info['requires']})"
            for name, info in CHART_TYPES.items()
        ])
        context = f"Search Query: {search_query}"
        if outlier_context:
            context += f"\nOutlier Context: {outlier_context.get('title')} - {outlier_context.get('insight')}"
        prompt = f"""{context}
Available Chart Types:
{chart_types_desc}
What is the BEST chart type for this query?
Guidelines:
- If showing a single number or metric over time → KPI
- If showing trend over time (date/month/year) → LINE or AREA
- If comparing categories or groups → COLUMN or BAR
- If showing part-to-whole → PIE or STACKED_COLUMN
- If showing correlation between two numbers → SCATTER
- If showing detailed data → TABLE
Return ONLY the chart type name (e.g., "LINE" or "COLUMN"), nothing else."""
        try:
            messages = [{"role": "user", "content": prompt}]
            chart_type = self.llm_researcher.make_request(messages, temperature=0.1, max_tokens=20, stream=False).strip().upper()
            # Validate it's a known type before trusting the model's answer
            if chart_type in CHART_TYPES:
                return chart_type
            # If not valid, return default
            return 'COLUMN'
        except Exception as e:
            print(f"⚠️ Error inferring chart type: {e}")
            return 'COLUMN'  # Safe default
| class LiveboardCreator: | |
| """Create Liveboards from company research and model metadata""" | |
| def __init__(self, ts_client, model_id: str, model_name: str, llm_model: str = None): | |
| """ | |
| Initialize LiveboardCreator | |
| Args: | |
| ts_client: ThoughtSpotDeployer instance (already authenticated) | |
| model_id: GUID of the ThoughtSpot model to build Liveboard on | |
| model_name: Display name of the model | |
| llm_model: LLM model to use (e.g., 'gpt-5.4', 'gpt-5.2') | |
| """ | |
| self.ts_client = ts_client | |
| self.model_id = model_id | |
| self.model_name = model_name | |
| self.model_columns = self._fetch_model_columns() | |
| # Use selected LLM model instead of hardcoded OpenAI | |
| from main_research import MultiLLMResearcher | |
| model_to_use = llm_model or DEFAULT_LLM_MODEL | |
| provider_name, model_name_llm = map_llm_display_to_provider(model_to_use) | |
| self.llm_researcher = MultiLLMResearcher(provider=provider_name, model=model_name_llm) | |
| self.query_translator = QueryTranslator(self.llm_researcher) | |
| def _fetch_model_columns(self) -> List[Dict]: | |
| """Get available columns from the model via TML export""" | |
| try: | |
| import yaml | |
| response = self.ts_client.session.post( | |
| f"{self.ts_client.base_url}/api/rest/2.0/metadata/tml/export", | |
| headers=self.ts_client.headers, | |
| json={"metadata": [{"identifier": self.model_id}]} | |
| ) | |
| if response.status_code == 200: | |
| result = response.json() | |
| if isinstance(result, list) and len(result) > 0: | |
| edoc = result[0].get('edoc') | |
| if edoc: | |
| tml = yaml.safe_load(edoc) | |
| # Get columns from model TML | |
| if 'model' in tml: | |
| columns = tml['model'].get('columns', []) | |
| # Convert to simpler format | |
| return [{ | |
| 'name': col.get('name'), | |
| 'type': col.get('properties', {}).get('column_type', 'ATTRIBUTE'), | |
| 'column_id': col.get('column_id') | |
| } for col in columns] | |
| print(f"Warning: Could not parse model columns") | |
| return [] | |
| else: | |
| print(f"Warning: Could not fetch model TML: {response.status_code}") | |
| return [] | |
| except Exception as e: | |
| print(f"Error fetching model columns: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return [] | |
| def load_reference_tml(self, reference_type: str = "simple") -> Dict: | |
| """ | |
| Load reference TML from project folders | |
| Args: | |
| reference_type: 'simple' or 'demogold' | |
| - simple: dev_notes/tml_examples/liveboard_simple/Global Retail Apparel Sales.liveboard.tml | |
| - demogold: dev_notes/tml_examples/liveboard_demogold/rl test.liveboard.tml | |
| Returns: | |
| Parsed TML dictionary | |
| """ | |
| if reference_type == "simple": | |
| tml_path = "dev_notes/tml_examples/liveboard_simple/Global Retail Apparel Sales.liveboard.tml" | |
| else: | |
| tml_path = "dev_notes/tml_examples/liveboard_demogold/rl test.liveboard.tml" | |
| try: | |
| with open(tml_path, 'r') as f: | |
| return yaml.safe_load(f) | |
| except Exception as e: | |
| print(f"Error loading reference TML: {e}") | |
| return None | |
| def generate_search_query(self, viz_config: Dict) -> str: | |
| """ | |
| Generate ThoughtSpot search query from visualization intent | |
| Examples from golden demo TML: | |
| - KPI: "[sales] [date].weekly [date].'last 8 quarters'" (note: date appears TWICE!) | |
| - KPI: "[sales] [date].monthly [date].'last 8 quarters'" | |
| - Top N: "[sales] top 1 [product] [date].'last quarter'" | |
| - Compare: "[sales] by [city] [date].2023 vs [date].2024" | |
| IMPORTANT FOR KPIs WITH SPARKLINES: | |
| - Must include time_column with granularity (e.g., [date].monthly) | |
| - Should include a time filter (e.g., [date].'last 8 quarters') | |
| - The time column appears twice: once for granularity, once for filter | |
| Args: | |
| viz_config: Dictionary with keys: | |
| - measure: Measure column name | |
| - time_column: Optional time dimension | |
| - granularity: Optional time granularity (daily, weekly, monthly) | |
| - dimensions: List of grouping dimensions | |
| - filters: List of filter expressions | |
| - top_n: Optional top N limit | |
| - comparison: Optional comparison (e.g., "2023 vs 2024") | |
| Returns: | |
| ThoughtSpot search query string | |
| """ | |
| query_parts = [] | |
| chart_type = viz_config.get('chart_type', 'COLUMN') | |
| # Add measure | |
| if 'measure' in viz_config: | |
| query_parts.append(f"[{viz_config['measure']}]") | |
| # Add top N if specified (goes right after measure) | |
| if 'top_n' in viz_config: | |
| query_parts.append(f"top {viz_config['top_n']}") | |
| # Add time dimension with granularity | |
| time_column = viz_config.get('time_column') | |
| granularity = viz_config.get('granularity') | |
| if time_column: | |
| time_part = f"[{time_column}]" | |
| if granularity: | |
| time_part += f".{granularity}" # .daily, .weekly, .monthly | |
| # Add comparison if specified | |
| if 'comparison' in viz_config: | |
| time_part += f" {viz_config['comparison']}" # "2023 vs 2024" | |
| query_parts.append(time_part) | |
| # Add grouping dimensions | |
| if 'dimensions' in viz_config: | |
| for dim in viz_config['dimensions']: | |
| query_parts.append(f"[{dim}]") | |
| # Add filters | |
| filters = viz_config.get('filters', []) | |
| # For KPIs with time_column but no time filter, add a default time filter | |
| # This is CRITICAL for sparklines and comparisons to work! | |
| if chart_type == 'KPI' and time_column and granularity: | |
| has_time_filter = any(time_column.lower() in f.lower() for f in filters) | |
| if not has_time_filter: | |
| # Add default time filter based on granularity | |
| default_time_filter = { | |
| 'daily': f"[{time_column}].'last 30 days'", | |
| 'weekly': f"[{time_column}].'last 8 quarters'", | |
| 'monthly': f"[{time_column}].'last 8 quarters'", | |
| 'quarterly': f"[{time_column}].'last 2 years'", | |
| 'yearly': f"[{time_column}].'last 5 years'" | |
| } | |
| filters = list(filters) + [default_time_filter.get(granularity, f"[{time_column}].'last 12 months'")] | |
| for filter_spec in filters: | |
| # Filters should already be in format: "[date].'last quarter'" | |
| query_parts.append(filter_spec) | |
| return " ".join(query_parts) | |
| def build_advanced_query(self, measures: List[str], dimensions: List[str], | |
| chart_type: str, time_config: Dict = None, | |
| filters: List[str] = None) -> str: | |
| """ | |
| Build advanced ThoughtSpot search query with time comparisons and filters | |
| Args: | |
| measures: List of measure columns | |
| dimensions: List of dimension columns | |
| chart_type: Type of chart for context | |
| time_config: Optional time configuration: | |
| - granularity: 'weekly', 'monthly', 'quarterly', 'yearly' | |
| - comparison: 'vs_previous', 'vs_year_ago' | |
| - range: 'last 12 months', 'this year', 'last quarter' | |
| filters: Optional list of filter conditions | |
| Returns: | |
| ThoughtSpot search query string | |
| """ | |
| query_parts = [] | |
| # Handle special chart types that need different query structures | |
| if chart_type == 'PIVOT_TABLE' and len(dimensions) > 1: | |
| # For pivot tables, structure as: [measures] [dim1] [dim2] [time] | |
| for measure in measures: | |
| query_parts.append(f"sum [{measure}]") | |
| # Add time granularity if specified | |
| if time_config and 'granularity' in time_config: | |
| date_col = self.map_semantic_to_column('date') or 'date' | |
| query_parts.append(f"[{date_col}].{time_config['granularity']}") | |
| # Add dimensions | |
| for dim in dimensions: | |
| query_parts.append(f"[{dim}]") | |
| elif chart_type == 'SANKEY' and len(dimensions) >= 2: | |
| # For Sankey, need hierarchical dimensions | |
| for measure in measures: | |
| query_parts.append(f"sum [{measure}]") | |
| # Add dimensions in hierarchical order | |
| for dim in dimensions[:3]: # Limit to 3 levels | |
| query_parts.append(f"[{dim}]") | |
| elif chart_type in ['BUBBLE', 'SCATTER'] and len(measures) >= 2: | |
| # For bubble/scatter, need multiple measures | |
| for measure in measures[:3]: # x, y, and optionally size | |
| query_parts.append(f"sum [{measure}]") | |
| # Add category dimension if present | |
| if dimensions: | |
| query_parts.append("by") | |
| query_parts.append(f"[{dimensions[0]}]") | |
| else: | |
| # Standard query building with aggregation | |
| for measure in measures: | |
| # Check if growth calculation is needed | |
| if time_config and time_config.get('growth'): | |
| query_parts.append(f"growth of [{measure}]") | |
| else: | |
| query_parts.append(f"sum [{measure}]") | |
| # Add dimensions with "by" keyword | |
| if dimensions and chart_type != 'KPI': | |
| query_parts.append("by") | |
| for dim in dimensions: | |
| query_parts.append(f"[{dim}]") | |
| # Add time comparisons (e.g., [date].2023 vs [date].2022) | |
| if time_config: | |
| date_col = self.map_semantic_to_column('date') or 'date' | |
| if 'comparison' in time_config: | |
| if time_config['comparison'] == 'vs_year_ago': | |
| current_year = 2024 # Could make this dynamic | |
| query_parts.append(f"[{date_col}].{current_year} vs [{date_col}].{current_year-1}") | |
| elif time_config['comparison'] == 'vs_previous': | |
| query_parts.append(f"[{date_col}].'this quarter' vs [{date_col}].'last quarter'") | |
| # Add time range filter | |
| if 'range' in time_config: | |
| query_parts.append(f"[{date_col}].'{time_config['range']}'") | |
| # Add any additional filters | |
| if filters: | |
| for filter_condition in filters: | |
| query_parts.append(filter_condition) | |
| # Add top N for non-geographic dimensions | |
| if dimensions and chart_type in ['COLUMN', 'BAR']: | |
| is_geo = any(geo_term in dim.lower() for dim in dimensions | |
| for geo_term in ['region', 'country', 'state', 'city', 'location']) | |
| if not is_geo: | |
| query_parts.append("top 10") | |
| return ' '.join(query_parts) | |
def map_semantic_to_column(self, semantic_type: str) -> Optional[str]:
    """
    Resolve a semantic field description to a real column in the model.

    Delegates the fuzzy matching to the LLM: it is shown the full list of
    model column names and asked to pick the single best match (or "NONE").
    The reply is validated against the actual column list before being
    trusted, so a hallucinated column name can never leak out.

    Args:
        semantic_type: Semantic description such as 'sales_amount',
            'product_name', 'customer_name', 'geographic_region'.

    Returns:
        A column name that exists in ``self.model_columns``, or ``None``
        when no trustworthy match was found (or the LLM call failed).
    """
    # Collect real column names once; this list is both the LLM's menu
    # and the validation whitelist for its answer.
    available = [c.get('name', '') for c in self.model_columns if c.get('name')]
    columns_text = ", ".join(sorted(available))
    prompt = f"""Find the best matching column name for this semantic type.
Semantic Type: {semantic_type}
Available Columns: {columns_text}
MATCHING RULES:
1. Look for exact or partial matches (e.g., 'sales_amount' matches 'Salesamount' or 'Sales')
2. Consider common abbreviations (e.g., 'customer_name' matches 'Name' if in customer context)
3. Geographic terms: 'geographic_region' matches 'Region', 'State', 'Territory', etc.
4. Time terms: 'time_period' matches 'Date', 'Month', 'Quarter', 'Year'
5. If no good match exists, return "NONE"
Return ONLY the exact column name from the available list, or "NONE" if no match.
Examples:
- Input: "sales_amount" → Output: "Salesamount"
- Input: "product_name" → Output: "Productname"
- Input: "customer_name" → Output: "Name"
- Input: "foo_bar_baz" (no match) → Output: "NONE"
"""
    try:
        reply = self.llm_researcher.make_request(
            [{"role": "user", "content": prompt}],
            temperature=0.1, max_tokens=30, stream=False
        ).strip()
        # Some models echo the example format; peel off the "Output:" prefix
        # and any surrounding quotes.
        if reply.startswith('Output:'):
            reply = reply.replace('Output:', '').strip().strip('"').strip("'")
        # Surface (but do not use) answers that are not real columns.
        if reply != "NONE" and reply not in available:
            print(f" ⚠️ AI returned '{reply}' but it's not in column list")
            print(f" Available: {', '.join(available[:10])}...")
        # Only a validated, non-"NONE" answer is returned to the caller.
        if reply != "NONE" and reply in available:
            return reply
        return None
    except Exception as e:
        # Best-effort mapping: a failed LLM call simply yields no match.
        print(f" ⚠️ Error mapping semantic type '{semantic_type}': {e}")
        return None
def create_outlier_visualizations(self, outliers: List[Dict]) -> List[Dict]:
    """
    Create visualization configs from demo outliers.

    Each outlier is processed independently; a failure on one outlier is
    logged and skipped so the rest of the liveboard can still be built.

    Args:
        outliers: List of outlier dictionaries with either:
            OLD FORMAT:
            - title, insight, show_me_query, impact, talking_point
            NEW FORMAT (semantic):
            - title, insight, viz_type, viz_measure_types, viz_dimension_types,
              show_me_query, kpi_metric, impact, talking_point
            The presence of 'viz_measure_types' / 'viz_dimension_types' is what
            selects the NEW-format path below.

    Returns:
        List of visualization config dictionaries, each with keys
        id / name / description / chart_type / search_query_direct /
        outlier_context.
    """
    viz_configs = []
    # Monotonic counter so every emitted viz (including companion KPIs)
    # gets a unique 'Viz_N' id across both format paths.
    viz_counter = 1
    print(f"\n🎨 Generating outlier-driven visualizations...")
    for i, outlier in enumerate(outliers):
        print(f" {i+1}. {outlier['title']}...")
        try:
            # Check if this uses NEW semantic format
            if 'viz_measure_types' in outlier or 'viz_dimension_types' in outlier:
                # NEW FORMAT: Use semantic mapping
                print(f" Using semantic mapping...")
                # Map semantic types to actual columns
                measure_types = outlier.get('viz_measure_types', [])
                dimension_types = outlier.get('viz_dimension_types', [])
                # Both fields may arrive as a comma-separated string instead
                # of a list (depends on how the LLM emitted the outlier).
                if isinstance(measure_types, str):
                    measure_types = [m.strip() for m in measure_types.split(',')]
                if isinstance(dimension_types, str):
                    dimension_types = [d.strip() for d in dimension_types.split(',')]
                # Semantic names that fail to map are silently dropped, so
                # the resulting query only references real columns.
                measure_cols = []
                for semantic in measure_types:
                    col = self.map_semantic_to_column(semantic)
                    if col:
                        measure_cols.append(col)
                        print(f" {semantic} → {col}")
                dimension_cols = []
                for semantic in dimension_types:
                    col = self.map_semantic_to_column(semantic)
                    if col:
                        dimension_cols.append(col)
                        print(f" {semantic} → {col}")
                # Build search query from mapped columns
                # Format: sum [measure] by [dimension] by [dimension2] ...
                # CRITICAL: Must use explicit aggregation function (sum, avg, etc.) for proper aggregation!
                query_parts = []
                chart_type = outlier.get('viz_type', 'COLUMN')
                # Add measures with explicit aggregation function
                for measure in measure_cols:
                    # Use 'sum' as default aggregation for all measures
                    # KPIs also need explicit aggregation when there are multiple rows
                    query_parts.append(f"sum [{measure}]")
                # Add dimensions with "by" keyword for aggregation (except for KPI without dimensions)
                if dimension_cols and chart_type != 'KPI':
                    query_parts.append("by")
                    for dim in dimension_cols:
                        query_parts.append(f"[{dim}]")
                    # Add top N filter for non-geographic dimensions to avoid overwhelming charts
                    is_geo = any(geo_term in dim.lower() for dim in dimension_cols
                                 for geo_term in ['region', 'country', 'state', 'city', 'location'])
                    # For STACKED_COLUMN, limit to top 5-7 for readability
                    if chart_type == 'STACKED_COLUMN':
                        query_parts.append("top 5")
                    elif not is_geo and chart_type in ['COLUMN', 'BAR', 'LINE']:
                        query_parts.append("top 10")
                # Extract filters from show_me query if present
                # NOTE(review): heuristic string-splitting — only handles the
                # first '>' or '<' in show_me; '=' is detected but never parsed.
                show_me = outlier.get('show_me_query', '')
                if '>' in show_me or '<' in show_me or '=' in show_me:
                    # Try to extract filter condition
                    for measure in measure_cols:
                        if measure.lower() in show_me.lower():
                            if '>' in show_me:
                                threshold = show_me.split('>')[1].split()[0].strip()
                                query_parts.append(f"[{measure}] > {threshold}")
                            elif '<' in show_me:
                                threshold = show_me.split('<')[1].split()[0].strip()
                                query_parts.append(f"[{measure}] < {threshold}")
                search_query = ' '.join(query_parts)
                # Auto-select GEO_AREA for geographic dimensions
                is_geo = any(geo_term in dim.lower() for dim in dimension_cols
                             for geo_term in ['region', 'country', 'state', 'city', 'location'])
                if is_geo and chart_type in ['COLUMN', 'BAR']:
                    chart_type = 'GEO_AREA'
                    print(f" 🗺️ Auto-selected GEO_AREA chart type for geographic dimension")
                print(f" Search: {search_query}")
                print(f" Chart: {chart_type}")
                # Create main visualization
                viz_configs.append({
                    'id': f'Viz_{viz_counter}',
                    'name': outlier['title'],
                    'description': outlier['insight'],
                    'chart_type': chart_type,
                    'search_query_direct': search_query,
                    'outlier_context': outlier
                })
                viz_counter += 1
                # Optionally create companion KPI
                if outlier.get('kpi_companion') and measure_cols:
                    kpi_query = f"[{measure_cols[0]}]"
                    # NOTE(review): query_parts[-1].split('[')[0] is '' when the
                    # filter starts with '[measure]' — verify the intended filter
                    # text actually survives this split.
                    if query_parts and ('>' in query_parts[-1] or '<' in query_parts[-1]):
                        kpi_query += ' ' + query_parts[-1].split('[')[0]  # Add filter
                    viz_configs.append({
                        'id': f'Viz_{viz_counter}',
                        'name': f"{outlier['title']} - Total",
                        'description': outlier.get('kpi_metric', 'Summary metric'),
                        'chart_type': 'KPI',
                        'search_query_direct': kpi_query,
                        'outlier_context': outlier
                    })
                    viz_counter += 1
                    print(f" + KPI: {kpi_query}")
            else:
                # OLD FORMAT: Use natural language translation
                print(f" Using NL translation...")
                search_query = self.query_translator.convert_natural_to_search(
                    outlier['show_me_query'],
                    self.model_columns
                )
                print(f" Search: {search_query}")
                # Validate all [Column] tokens exist in the model — skip if any are missing
                import re as _re  # local alias; module-level `re` also exists
                col_names_lower = {col.get('name', '').lower() for col in self.model_columns if col.get('name')}
                # Extract tokens (strip granularity suffix like .weekly)
                unknown_tokens = [
                    t for t in _re.findall(r'\[([^\]]+)\]', search_query)
                    if t.split('.')[0].strip().lower() not in col_names_lower
                ]
                if unknown_tokens:
                    print(f" ⚠️ Skipping '{outlier['title']}' — columns not in model: {unknown_tokens}")
                    continue
                chart_type = self.query_translator.infer_chart_type(
                    search_query,
                    self.model_columns,
                    outlier_context=outlier
                )
                print(f" Chart: {chart_type}")
                viz_configs.append({
                    'id': f'Viz_{viz_counter}',
                    'name': outlier['title'],
                    'description': outlier['insight'],
                    'chart_type': chart_type,
                    'search_query_direct': search_query,
                    'outlier_context': outlier
                })
                viz_counter += 1
        except Exception as e:
            print(f" ⚠️ Skipping '{outlier.get('title', 'unknown')}' due to error: {e}")
            # Don't add a broken viz — skip and continue
    print(f" Generated {len(viz_configs)} visualizations total")
    return viz_configs
| def _classify_visualizations_for_groups(self, viz_configs: List[Dict]) -> Dict[str, List[str]]: | |
| """ | |
| Classify visualizations into logical groups based on chart type and content. | |
| Groups: | |
| - KPI Overview: All KPI charts (for sparklines, big numbers) | |
| - Trends: Line, Area charts (time-based) | |
| - Comparisons: Bar, Column, Stacked charts | |
| - Details: Tables, Pivot tables | |
| - Geo: Maps | |
| Returns: | |
| Dict mapping group names to lists of viz IDs | |
| """ | |
| groups = { | |
| 'KPI Overview': [], | |
| 'Trends': [], | |
| 'Comparisons': [], | |
| 'Details': [], | |
| 'Geo': [] | |
| } | |
| for config in viz_configs: | |
| viz_id = config.get('id', '') | |
| chart_type = config.get('chart_type', 'COLUMN') | |
| if chart_type == 'KPI': | |
| groups['KPI Overview'].append(viz_id) | |
| elif chart_type in ['LINE', 'AREA']: | |
| groups['Trends'].append(viz_id) | |
| elif chart_type in ['GEO_AREA', 'GEO_BUBBLE', 'GEO_HEATMAP']: | |
| groups['Geo'].append(viz_id) | |
| elif chart_type in ['TABLE', 'PIVOT_TABLE']: | |
| groups['Details'].append(viz_id) | |
| else: | |
| # Bar, Column, Stacked, Pie, etc. | |
| groups['Comparisons'].append(viz_id) | |
| # Remove empty groups | |
| return {k: v for k, v in groups.items() if v} | |
| def _create_groups_structure(self, viz_configs: List[Dict]) -> List[Dict]: | |
| """ | |
| Create Groups (containers) for visualizations - like tabs within a tab. | |
| Golden demo structure: | |
| groups: | |
| - id: Group_1 | |
| name: Sales Overview | |
| visualizations: | |
| - Viz_3 | |
| - Viz_10 | |
| group_guid: null | |
| Returns: | |
| List of group TML structures | |
| """ | |
| classified = self._classify_visualizations_for_groups(viz_configs) | |
| groups = [] | |
| # Create a group for KPIs if we have 2+ KPIs | |
| kpis = classified.get('KPI Overview', []) | |
| if len(kpis) >= 2: | |
| groups.append({ | |
| 'id': 'Group_1', | |
| 'name': '📊 KPI Overview', | |
| 'visualizations': kpis, | |
| 'group_guid': None | |
| }) | |
| # Create a group for trends if we have 2+ trend charts | |
| trends = classified.get('Trends', []) | |
| if len(trends) >= 2: | |
| groups.append({ | |
| 'id': f'Group_{len(groups) + 1}', | |
| 'name': '📈 Trends', | |
| 'visualizations': trends, | |
| 'group_guid': None | |
| }) | |
| return groups | |
| def _create_group_layout(self, group_id: str, viz_ids: List[str], viz_configs: List[Dict]) -> Dict: | |
| """ | |
| Create layout for tiles inside a group. | |
| Groups have their own internal 12-column grid. | |
| """ | |
| tiles = [] | |
| current_y = 0 | |
| current_x = 0 | |
| for viz_id in viz_ids: | |
| # Find the config for this viz | |
| config = next((c for c in viz_configs if c.get('id') == viz_id), {}) | |
| chart_type = config.get('chart_type', 'COLUMN') | |
| # KPIs in groups are typically 6 cols wide, 5 tall | |
| if chart_type == 'KPI': | |
| width = 6 | |
| height = 5 | |
| else: | |
| width = 6 | |
| height = 5 | |
| # Check if fits in row | |
| if current_x + width > 12: | |
| current_y += height | |
| current_x = 0 | |
| tiles.append({ | |
| 'visualization_id': viz_id, | |
| 'x': current_x, | |
| 'y': current_y, | |
| 'height': height, | |
| 'width': width | |
| }) | |
| current_x += width | |
| return { | |
| 'id': group_id, | |
| 'tiles': tiles | |
| } | |
def _create_tabbed_layout(self, visualizations: List[Dict], viz_configs: List[Dict], groups: List[Dict]) -> Dict:
    """
    Create a tabbed layout structure like the golden demo.

    Groups are placed first (top of the tab), then all visualizations
    that do not belong to any group. Placement uses a shared (x, y)
    cursor on a 12-column grid.

    Structure:
        layout:
          tabs:
          - name: 📈 Sales
            tiles: [...]
            group_layouts: [...]
          - name: 💵 Finance
            tiles: [...]

    Returns:
        Layout dict with a single '📊 Overview' tab; 'group_layouts' is
        omitted entirely when there are no groups.
    """
    import uuid  # tab ids must be unique per generated liveboard
    # Get viz IDs that are in groups
    grouped_viz_ids = set()
    for group in groups:
        grouped_viz_ids.update(group.get('visualizations', []))
    # Main tab tiles - groups and ungrouped visualizations
    main_tiles = []
    group_layouts = []
    current_y = 0
    current_x = 0
    # Add groups as tiles first (they take priority at top)
    for group in groups:
        group_id = group['id']
        group_viz_ids = group.get('visualizations', [])
        # Group tiles are typically wide
        width = 8 if len(group_viz_ids) > 2 else 6
        height = 6
        # Wrap before placing if this group tile would overflow the row.
        if current_x + width > 12:
            current_y += height
            current_x = 0
        main_tiles.append({
            'visualization_id': group_id,
            'x': current_x,
            'y': current_y,
            'height': height,
            'width': width
        })
        current_x += width
        # Eagerly start a new row as soon as the row is exactly full.
        # NOTE(review): combined with the pre-placement wrap above, a row
        # that fills to exactly 12 advances current_y here AND would not
        # trigger the wrap check next iteration — confirm rows don't skip.
        if current_x >= 12:
            current_y += height
            current_x = 0
        # Create internal layout for this group
        group_layouts.append(
            self._create_group_layout(group_id, group_viz_ids, viz_configs)
        )
    # Move to next row for ungrouped vizzes
    if current_x > 0:
        current_y += 6
        current_x = 0
    # Add ungrouped visualizations
    for config in viz_configs:
        viz_id = config.get('id', '')
        if viz_id in grouped_viz_ids:
            continue  # Skip grouped viz
        chart_type = config.get('chart_type', 'COLUMN')
        # Determine size
        if chart_type == 'KPI':
            width = 3
            height = 3
        elif chart_type in ['GEO_AREA', 'GEO_BUBBLE']:
            width = 12
            height = 7
        elif chart_type in ['TABLE', 'PIVOT_TABLE']:
            width = 8
            height = 5
        else:
            width = 6
            height = 5
        # NOTE(review): wrapping advances current_y by the NEW tile's
        # height, not the tallest tile in the finished row (contrast
        # _create_smart_layout, which tracks max_row_height) — mixed-height
        # rows may overlap; confirm whether ThoughtSpot re-flows tiles.
        if current_x + width > 12:
            current_y += height
            current_x = 0
        main_tiles.append({
            'visualization_id': viz_id,
            'x': current_x,
            'y': current_y,
            'height': height,
            'width': width
        })
        current_x += width
    # Single tab for now (can expand later)
    layout = {
        'tabs': [{
            'name': '📊 Overview',
            'description': '',
            'tiles': main_tiles,
            'id': str(uuid.uuid4()),
            'group_layouts': group_layouts if group_layouts else None
        }]
    }
    # Remove None values
    if not layout['tabs'][0].get('group_layouts'):
        del layout['tabs'][0]['group_layouts']
    return layout
| def _create_style_properties(self, groups: List[Dict], viz_configs: List[Dict]) -> Dict: | |
| """ | |
| Create style properties for brand colors and appearance. | |
| From golden demo: | |
| style: | |
| style_properties: | |
| - name: lb_border_type | |
| value: CURVED | |
| - name: lb_brand_color | |
| value: LBC_C | |
| overrides: | |
| - object_id: Group_1 | |
| style_properties: | |
| - name: group_brand_color | |
| value: GBC_B | |
| """ | |
| style = { | |
| 'style_properties': [ | |
| {'name': 'lb_border_type', 'value': 'CURVED'}, | |
| {'name': 'lb_brand_color', 'value': 'LBC_C'}, | |
| {'name': 'hide_group_title', 'value': 'false'}, | |
| {'name': 'hide_group_description', 'value': 'true'}, | |
| {'name': 'hide_tile_description', 'value': 'false'} | |
| ], | |
| 'overrides': [] | |
| } | |
| # Brand colors for groups (GBC_A through GBC_J) | |
| group_colors = ['GBC_B', 'GBC_C', 'GBC_D', 'GBC_E', 'GBC_F'] | |
| for i, group in enumerate(groups): | |
| style['overrides'].append({ | |
| 'object_id': group['id'], | |
| 'style_properties': [ | |
| {'name': 'group_brand_color', 'value': group_colors[i % len(group_colors)]}, | |
| {'name': 'hide_group_description', 'value': 'true'} | |
| ] | |
| }) | |
| # KPI tiles get special colors (TBC_I is cyan) | |
| for config in viz_configs: | |
| if config.get('chart_type') == 'KPI': | |
| style['overrides'].append({ | |
| 'object_id': config.get('id'), | |
| 'style_properties': [ | |
| {'name': 'tile_brand_color', 'value': 'TBC_I'} | |
| ] | |
| }) | |
| return style | |
| def _create_smart_layout(self, visualizations: List[Dict], viz_configs: List[Dict]) -> List[Dict]: | |
| """ | |
| Create intelligent layout based on visualization types | |
| Uses a 12-column grid system with variable heights based on chart type: | |
| - KPIs: 2-3 units height, can be narrow (3-4 cols) | |
| - Maps: Full width (12 cols), 5-8 units height | |
| - Scatter/Bubble: 6 cols width, 6-8 units height | |
| - Tables/Pivots: 6-12 cols, 4-6 units height | |
| - Standard charts: 6 cols, 5 units height | |
| """ | |
| tiles = [] | |
| current_y = 0 | |
| current_row_x = 0 | |
| max_row_height = 0 | |
| for i, (viz, config) in enumerate(zip(visualizations, viz_configs)): | |
| chart_type = config.get('chart_type', 'COLUMN') | |
| # Determine size based on chart type | |
| if chart_type == 'TEXT': | |
| # Text tiles are small - like in reference | |
| width = 2 | |
| height = 2 if 'Global' in config.get('name', '') else 4 | |
| elif chart_type == 'KPI': | |
| # KPIs are compact - can fit 3-4 across | |
| width = 3 | |
| height = 3 | |
| elif chart_type in ['GEO_AREA', 'GEO_BUBBLE']: | |
| # Maps need full width | |
| width = 12 | |
| height = 7 | |
| elif chart_type in ['SCATTER', 'BUBBLE']: | |
| # Scatter/bubble need more space for detail | |
| width = 6 | |
| height = 7 | |
| elif chart_type in ['PIVOT_TABLE', 'TABLE']: | |
| # Tables can be wider | |
| width = 8 | |
| height = 5 | |
| elif chart_type == 'SANKEY': | |
| # Sankey needs width for flow | |
| width = 6 | |
| height = 8 | |
| elif chart_type in ['STACKED_COLUMN', 'TREEMAP']: | |
| # Complex charts need more height | |
| width = 6 | |
| height = 6 | |
| else: | |
| # Default for standard charts | |
| width = 6 | |
| height = 5 | |
| # Check if this fits in the current row | |
| if current_row_x + width > 12: | |
| # Move to next row | |
| current_y += max_row_height | |
| current_row_x = 0 | |
| max_row_height = 0 | |
| tiles.append({ | |
| 'visualization_id': viz['id'], | |
| 'x': current_row_x, | |
| 'y': current_y, | |
| 'height': height, | |
| 'width': width | |
| }) | |
| current_row_x += width | |
| max_row_height = max(max_row_height, height) | |
| # Add layout configuration to the liveboard | |
| layout = { | |
| 'tabs': [{ | |
| 'name': '📊 Dashboard', | |
| 'tiles': tiles | |
| }] | |
| } | |
| return tiles | |
| def _build_axis_configs(self, x_axis_cols: List[str], y_axis_cols: List[str], chart_type: str) -> List[Dict]: | |
| """Build axis configuration based on chart type""" | |
| if chart_type == 'KPI': | |
| # KPIs need both x (time) and y (measure) for sparklines | |
| if x_axis_cols and y_axis_cols: | |
| return [{'x': x_axis_cols, 'y': y_axis_cols}] | |
| elif y_axis_cols: | |
| return [{'y': y_axis_cols}] | |
| else: | |
| return [] | |
| elif chart_type == 'BUBBLE' and len(y_axis_cols) >= 2: | |
| # Bubble charts need x, y, and optionally size | |
| config = {'x': [y_axis_cols[0]], 'y': [y_axis_cols[1]]} | |
| if len(y_axis_cols) > 2: | |
| config['size'] = y_axis_cols[2] | |
| if x_axis_cols: | |
| config['category'] = x_axis_cols | |
| return [config] | |
| elif chart_type == 'SANKEY' and x_axis_cols: | |
| # Sankey needs hierarchical x-axis configuration | |
| return [{'x': x_axis_cols[:3], 'y': y_axis_cols}] | |
| elif x_axis_cols and y_axis_cols: | |
| # Charts with both dimensions and measures | |
| return [{'x': x_axis_cols, 'y': y_axis_cols}] | |
| elif y_axis_cols: | |
| # Only measures (no dimensions) | |
| return [{'y': y_axis_cols}] | |
| else: | |
| return [] | |
def _build_client_state_v2(self, chart_type: str, viz_config: Dict) -> str:
    """
    Build the client_state_v2 JSON string carrying per-chart display
    settings (sparklines, gridlines, series colors, map viewport).

    Args:
        chart_type: Liveboard chart type (e.g. 'KPI', 'SCATTER',
            'STACKED_COLUMN', 'GEO_AREA', 'COLUMN').
        viz_config: Visualization config; 'name', and optionally
            'show_anomalies' / 'show_bounds' / 'colors', are read here.

    Returns:
        A JSON-encoded string (ThoughtSpot stores this state serialized).
    """
    import json  # local import; module-level json also exists
    base_state = {
        "version": "V4DOT2",
        "chartProperties": {}
    }
    # Define vibrant color palette (like reference dashboard)
    COLOR_PALETTE = {
        'purple': '#9c6ade',
        'teal': '#40c1c0',
        'yellow': '#fed109',
        'blue': '#009eec',
        'pink': '#85016b',
        'green': '#06BF7F',
        'orange': '#FCC838',
        'light_blue': '#48D1E0',
        'violet': '#8C62F5',
        'coral': '#FF8142'
    }
    # Add KPI-specific settings for sparklines, comparisons, anomalies
    if chart_type == 'KPI':
        base_state["chartProperties"]["responsiveLayoutPreference"] = "AUTO_ON"
        base_state["chartProperties"]["chartSpecific"] = {
            # customProps is itself a JSON string nested inside the outer
            # JSON — double-encoded on purpose.
            "customProps": json.dumps({
                "showLabel": True,
                "showComparison": True,
                "showSparkline": True,
                "showAnomalies": viz_config.get('show_anomalies', False),
                "showBounds": viz_config.get('show_bounds', True),
                "customCompare": "PREV_AVAILABLE",
                "showOnlyLatestAnomaly": False
            })
        }
        # Add specific colors for KPIs
        # Color choice keys off the viz name ('Monthly' / 'Weekly' substrings).
        if 'Monthly' in viz_config.get('name', ''):
            base_state["seriesColors"] = [
                {"serieName": "Total Salesamount", "color": COLOR_PALETTE['teal']},
                {"serieName": "Total sales", "color": COLOR_PALETTE['teal']}
            ]
            base_state["customColorSelectorArray"] = [COLOR_PALETTE['green'], COLOR_PALETTE['coral'], COLOR_PALETTE['coral'], COLOR_PALETTE['teal']]
        elif 'Weekly' in viz_config.get('name', ''):
            base_state["seriesColors"] = [
                {"serieName": "Total Salesamount", "color": COLOR_PALETTE['purple']},
                {"serieName": "Total sales", "color": COLOR_PALETTE['purple']}
            ]
            base_state["customColorSelectorArray"] = [COLOR_PALETTE['green'], COLOR_PALETTE['coral'], COLOR_PALETTE['coral'], COLOR_PALETTE['purple']]
        else:
            # Default KPI colors
            base_state["seriesColors"] = [
                {"serieName": "Total Salesamount", "color": COLOR_PALETTE['blue']},
                {"serieName": "Total sales", "color": COLOR_PALETTE['blue']}
            ]
    # Add scatter/bubble color configurations
    elif chart_type in ['SCATTER', 'BUBBLE']:
        base_state["chartProperties"]["gridLines"] = {
            "xGridlineEnabled": True,
            "yGridlineEnabled": False
        }
        base_state["chartProperties"]["responsiveLayoutPreference"] = "AUTO_ON"
    # Add stacked column settings with vibrant colors
    elif chart_type == 'STACKED_COLUMN':
        base_state["chartProperties"]["showStackedLabels"] = True
        base_state["chartProperties"]["allLabels"] = True
        # Add diverse colors for regions and products
        # NOTE(review): serie names assume a US-region dimension — series
        # with other names fall back to ThoughtSpot's default colors.
        base_state["seriesColors"] = [
            {"serieName": "East", "color": COLOR_PALETTE['pink']},
            {"serieName": "West", "color": COLOR_PALETTE['purple']},
            {"serieName": "Midwest", "color": COLOR_PALETTE['teal']},
            {"serieName": "South", "color": COLOR_PALETTE['yellow']},
            {"serieName": "Southwest", "color": COLOR_PALETTE['blue']},
            {"serieName": "North", "color": COLOR_PALETTE['green']},
            {"serieName": "Northeast", "color": COLOR_PALETTE['orange']},
            {"serieName": "Northwest", "color": COLOR_PALETTE['coral']},
            {"serieName": "Southeast", "color": COLOR_PALETTE['violet']},
            {"serieName": "Central", "color": COLOR_PALETTE['light_blue']}
        ]
        base_state["systemSeriesColors"] = base_state["seriesColors"]
    # Add geo map settings with gradient colors
    elif chart_type == 'GEO_AREA':
        base_state["chartProperties"]["mapviewport"] = {
            "center": [-1.076e7, 5.192e6],
            "zoomLevel": 3.88
        }
        # Add heat map gradient colors
        base_state["systemMultiColorSeriesColors"] = [{
            "serieName": "Total sales",
            "colorMap": [{
                "serieName": "state",
                "color": ["#ffffb2", "#fddd87", "#fba35d", "#f75534", "#f9140a", "#d70315", "#b10026"]
            }]
        }]
    # Add column chart colors
    elif chart_type == 'COLUMN':
        base_state["seriesColors"] = [
            {"serieName": "Total sales", "color": COLOR_PALETTE['blue']},
            {"serieName": "Total Salesamount", "color": COLOR_PALETTE['green']}
        ]
    # Add custom colors if specified in config
    # This override applies to every chart type, replacing any colors set above.
    if 'colors' in viz_config:
        base_state["seriesColors"] = viz_config['colors']
    return json.dumps(base_state)
| def _create_text_tile(self, viz_config: Dict) -> Dict: | |
| """ | |
| Create a TEXT tile visualization (colored text box with markdown content) | |
| Args: | |
| viz_config: Dictionary with: | |
| - id: Viz ID | |
| - name: Title | |
| - text_content: Markdown text to display | |
| - background_color: Hex color (e.g., '#85016b') | |
| Returns: | |
| Text tile TML dictionary | |
| """ | |
| text_content = viz_config.get('text_content', viz_config.get('name', '')) | |
| bg_color = viz_config.get('background_color', '#2E3D4D') # Default dark background | |
| # TEXT tiles in ThoughtSpot need tables field even though they don't query data | |
| text_tml = { | |
| 'id': viz_config['id'], | |
| 'answer': { | |
| 'name': viz_config.get('name', 'Text'), | |
| 'description': viz_config.get('description', ''), | |
| 'tables': [{ | |
| 'id': self.model_name, # Use model NAME for TML (not GUID!) | |
| 'name': self.model_name | |
| }], | |
| 'text_tile': { | |
| 'text': text_content, | |
| 'background_color': bg_color | |
| } | |
| }, | |
| 'viz_guid': None | |
| } | |
| return text_tml | |
def create_visualization_tml(self, viz_config: Dict) -> Dict:
    """
    Create single visualization TML structure.

    Uses MINIMAL TML - only specifies essentials and lets ThoughtSpot figure out
    column names, axis configs, etc. This is more reliable than trying to predict
    what column names ThoughtSpot will generate (e.g., "Total sales" vs "sales").

    Args:
        viz_config: Dictionary with visualization specifications
            - id: Viz_1, Viz_2, etc.
            - name: Display name
            - chart_type: LINE, COLUMN, KPI, BAR, STACKED_COLUMN, TEXT, etc.
            - search_query_direct: Direct search query (optional, takes precedence)
            - text_content: For TEXT charts, the markdown content to display
            - background_color: For TEXT charts, hex color for background
            - measure, time_column, dimensions, filters (for search query generation)
            - display_mode: CHART_MODE or TABLE_MODE (default: CHART_MODE)

    Returns:
        Visualization TML dictionary.

    Side effects:
        For KPI charts, sets viz_config['_needs_kpi_conversion'] = True so
        deploy_liveboard() can post-process them.
    """
    # Fix: chart_type was previously fetched twice (before the TEXT check and
    # again after the query was built); a single lookup is used throughout.
    chart_type = viz_config.get('chart_type', 'COLUMN')
    # Handle TEXT charts specially - they don't use search queries
    if chart_type == 'TEXT':
        return self._create_text_tile(viz_config)
    # Use direct search query if provided, otherwise generate from config
    if 'search_query_direct' in viz_config:
        search_query = viz_config['search_query_direct']
    else:
        search_query = self.generate_search_query(viz_config)
    # Translate our chart type into ThoughtSpot's type name; unknown types
    # pass through unchanged.
    ts_chart_type = CHART_TYPES.get(chart_type, {}).get('ts_type', chart_type)
    # Create MINIMAL visualization TML - let ThoughtSpot fill in everything
    # This is more reliable than trying to predict exact column names or chart configs
    viz_tml = {
        'id': viz_config['id'],
        'answer': {
            'name': viz_config['name'],
            'description': viz_config.get('description', ''),
            'tables': [{
                'id': self.model_name,  # Use model NAME for TML (not GUID!)
                'name': self.model_name
            }],
            'search_query': search_query
        }
        # NOTE: Do NOT include viz_guid for new visualizations
        # NOTE: Do NOT include answer_columns, table_columns - let ThoughtSpot derive them
    }
    # Add chart type hint for non-KPI charts
    # KPIs need special post-processing, so we skip them here and handle them in deploy_liveboard()
    if ts_chart_type != 'KPI':
        viz_tml['answer']['chart'] = {'type': ts_chart_type}
    else:
        # Mark this viz as needing KPI conversion (stored in config, used later)
        viz_config['_needs_kpi_conversion'] = True
    return viz_tml
| def generate_visualizations_from_research( | |
| self, | |
| company_data: Dict, | |
| use_case: str, | |
| num_visualizations: int = 6 | |
| ) -> List[Dict]: | |
| """ | |
| Generate visualization configs based on company research and use case | |
| Uses AI to determine what visualizations would be valuable | |
| Args: | |
| company_data: Dictionary with company research data (name, description, etc.) | |
| use_case: Use case name (Merchandising, Sales AI Analyst, etc.) | |
| num_visualizations: Number of visualizations to generate (default: 6) | |
| Returns: | |
| List of visualization config dictionaries | |
| """ | |
| # Extract available columns from model (supports both metadata and TML-export formats) | |
| def _col_name(col: Dict) -> str: | |
| return col.get('header', {}).get('name', col.get('name', '')) | |
| def _is_measure(col: Dict) -> bool: | |
| col_type = str(col.get('type', '')).upper() | |
| ts_type = str(col.get('ts_type', '')).upper() | |
| return ( | |
| col_type in ['MEASURE', 'INT_MEASURE', 'FLOAT_MEASURE', 'NUMBER', 'INT64', 'INT32', 'DOUBLE', 'FLOAT', 'DECIMAL', 'INTEGER', 'BIGINT'] | |
| or ts_type == 'MEASURE' | |
| ) | |
| def _is_date(col: Dict) -> bool: | |
| col_type = str(col.get('type', '')).upper() | |
| ts_type = str(col.get('ts_type', '')).upper() | |
| name_lower = _col_name(col).lower() | |
| if col_type in ['DATE', 'DATE_TIME', 'TIMESTAMP', 'TIME'] or ts_type in ['DATE', 'DATE_TIME', 'TIMESTAMP', 'TIME']: | |
| return True | |
| # TML export doesn't include native date types; infer from attribute names. | |
| if ts_type != 'MEASURE' and any(tok in name_lower for tok in ['date', 'timestamp', 'time']) and not name_lower.endswith(' key'): | |
| return True | |
| return False | |
| available_measures = [_col_name(col) for col in self.model_columns if _is_measure(col)] | |
| available_dimensions = [_col_name(col) for col in self.model_columns if not _is_measure(col)] | |
| date_columns = [_col_name(col) for col in self.model_columns if _is_date(col)] | |
| # Find numeric columns that LOOK like dates but are NOT (to warn the AI) | |
| fake_date_columns = [ | |
| _col_name(col) | |
| for col in self.model_columns | |
| if _is_measure(col) and any(d in _col_name(col).lower() for d in ['month', 'year', 'quarter', 'week', 'day', 'date']) | |
| ] | |
| prompt = f"""Based on this company: {company_data.get('name', 'Unknown Company')} | |
| Use case: {use_case} | |
| Available measures (numeric columns): {available_measures} | |
| Available dimensions (text/date columns): {available_dimensions} | |
| **ACTUAL DATE columns** (USE THESE for time_column and granularity): {date_columns} | |
| **DO NOT USE these for granularity** (they are integers, not dates): {fake_date_columns} | |
| Generate {num_visualizations} visualization configurations for a compelling demo dashboard. | |
| For each visualization, create a JSON object with: | |
| - name: Clear, business-friendly title (e.g., "Revenue Trend Last 12 Months") | |
| - description: Optional 1-sentence description | |
| - chart_type: Choose from: KPI, LINE, COLUMN, BAR, STACKED_COLUMN, AREA, PIE | |
| - measure: Which measure to visualize (from available measures) | |
| - time_column: MUST be an actual DATE column from the list above (NOT month/year/quarter integers!) | |
| - granularity: If time_column used, specify: daily, weekly, monthly, quarterly, yearly | |
| - dimensions: Array of dimensions to group by (from available dimensions, can be empty) | |
| - filters: Array of filter expressions like "[date].'last quarter'", "[date].'last 12 months'", "[date].'this year'" (optional) | |
| - top_n: If showing top N items, specify number (e.g., 5, 10) (optional) | |
| Guidelines: | |
| - Mix chart types (don't use all the same type) | |
| - Include at least 1-2 KPI charts for key metrics | |
| * IMPORTANT: For KPIs, ALWAYS include time_column and granularity (monthly/quarterly/yearly) | |
| * CRITICAL: time_column MUST be an actual DATE type column like 'full_date', NOT 'month' or 'year'! | |
| * This enables sparklines and percent change comparisons (MoM/QoQ/YoY) | |
| * Example KPI: measure="total_sales", time_column="full_date", granularity="monthly" | |
| - Include trend analysis with LINE or AREA charts | |
| - Include comparisons with COLUMN or BAR charts | |
| - Use appropriate time filters for business context | |
| - Make visualizations relevant to the use case | |
| Return ONLY a valid JSON object with structure: | |
| {{ | |
| "visualizations": [ | |
| {{ | |
| "name": "...", | |
| "chart_type": "...", | |
| "measure": "...", | |
| ... | |
| }} | |
| ] | |
| }}""" | |
| try: | |
| messages = [{"role": "user", "content": prompt}] | |
| response_text = self.llm_researcher.make_request(messages, temperature=0.7, max_tokens=4000, stream=False) | |
| # Debug: Check what we got back | |
| print(f"🔍 DEBUG: AI response type: {type(response_text)}") | |
| print(f"🔍 DEBUG: AI response length: {len(response_text) if response_text else 0}") | |
| if response_text: | |
| print(f"🔍 DEBUG: AI response first 200 chars: {response_text[:200]}") | |
| else: | |
| print(f"❌ ERROR: AI returned empty response!") | |
| return self._generate_fallback_visualizations(available_measures, date_columns) | |
| # Strip markdown code fences if present | |
| response_text = response_text.strip() | |
| if response_text.startswith('```'): | |
| # Remove opening fence (```json or ```) | |
| lines = response_text.split('\n') | |
| response_text = '\n'.join(lines[1:]) | |
| # Remove closing fence | |
| if response_text.endswith('```'): | |
| response_text = response_text[:-3].strip() | |
| result = json.loads(response_text) | |
| viz_results = result.get('visualizations', []) | |
| # Strictly sanitize to actual model columns to prevent invalid TML imports. | |
| viz_results = self._sanitize_visualization_configs( | |
| viz_results, | |
| available_measures, | |
| available_dimensions, | |
| date_columns | |
| ) | |
| if not viz_results: | |
| print(" ⚠️ AI returned unusable visualizations, using fallback set") | |
| return self._generate_fallback_visualizations(available_measures, date_columns) | |
| # Enforce variety: at least 2 KPIs, no duplicate chart types | |
| dimensions = [col.get('header', {}).get('name', col.get('name', '')) for col in self.model_columns if col.get('type') in ['ATTRIBUTE', 'STRING']] | |
| viz_results = self._enforce_visualization_variety(viz_results, available_measures, date_columns, dimensions) | |
| return self._sanitize_visualization_configs( | |
| viz_results, | |
| available_measures, | |
| available_dimensions, | |
| date_columns | |
| ) | |
| except Exception as e: | |
| print(f"Error generating visualizations: {e}") | |
| print(f" Response text was: {response_text[:500] if response_text else 'None'}") | |
| # Return fallback simple visualizations | |
| return self._generate_fallback_visualizations(available_measures, date_columns) | |
| def _normalize_col_name(name: str) -> str: | |
| """Normalize column names for fuzzy matching.""" | |
| if not name: | |
| return "" | |
| return re.sub(r'[^a-z0-9]', '', name.lower()) | |
def _match_column_name(self, requested: str, allowed: List[str]) -> Optional[str]:
    """Resolve an AI-supplied column name to an exact model column name.

    Matching is attempted in order of strictness: exact string match,
    then normalized (case/punctuation-insensitive) equality, then
    substring containment on the normalized forms.

    Args:
        requested: Column name as produced by the LLM.
        allowed: Exact column names that exist in the model.

    Returns:
        The matching entry from ``allowed``, or None when nothing matches.
    """
    if not requested or not allowed:
        return None
    if requested in allowed:
        return requested
    target = self._normalize_col_name(requested)
    if not target:
        return None
    # Precompute normalized forms once; candidate order is preserved so
    # earlier model columns win ties, same as before.
    candidates = [(name, self._normalize_col_name(name)) for name in allowed]
    for name, norm in candidates:
        if norm == target:
            return name
    for name, norm in candidates:
        if target in norm or norm in target:
            return name
    return None
def _sanitize_visualization_configs(
    self,
    viz_configs: List[Dict],
    measures: List[str],
    dimensions: List[str],
    date_columns: List[str]
) -> List[Dict]:
    """Sanitize AI configs so all columns exist in the model.

    Repairs or drops LLM-generated visualization configs so every
    referenced column resolves to a real model column:
    - unknown chart types fall back to COLUMN
    - measure/dimension/time columns are fuzzy-matched via
      _match_column_name; an unmatched measure falls back to the first
      available measure
    - categorical charts with no dimensions get a default dimension
    - KPI/LINE/AREA charts get a default time column, and granularity
      is validated (defaults to 'monthly')
    - top_n is coerced to a positive int or removed

    Args:
        viz_configs: Raw config dicts produced by the LLM.
        measures: Exact measure column names in the model.
        dimensions: Exact dimension column names in the model.
        date_columns: Exact date column names in the model.

    Returns:
        List of cleaned config dicts (possibly empty).
    """
    if not viz_configs or not measures:
        return []
    # CHART_TYPES is a module-level registry of supported chart types
    # (defined elsewhere in this file).
    valid_chart_types = set(CHART_TYPES.keys())
    fallback_measure = measures[0]
    fallback_time = date_columns[0] if date_columns else None
    # Prefer a non-date dimension as the default grouping column.
    fallback_dimension = next((d for d in dimensions if d not in date_columns), dimensions[0] if dimensions else None)
    sanitized = []
    for idx, raw in enumerate(viz_configs, 1):
        if not isinstance(raw, dict):
            continue
        # Shallow copy so the caller's dicts are not mutated.
        cfg = dict(raw)
        cfg['id'] = cfg.get('id') or f'Viz_{idx}'
        chart_type = str(cfg.get('chart_type', 'COLUMN')).upper()
        if chart_type not in valid_chart_types:
            chart_type = 'COLUMN'
        cfg['chart_type'] = chart_type
        measure = self._match_column_name(cfg.get('measure'), measures) or fallback_measure
        if not measure:
            continue
        cfg['measure'] = measure
        dims = []
        raw_dims = cfg.get('dimensions') or []
        if isinstance(raw_dims, list):
            for dim in raw_dims:
                matched = self._match_column_name(dim, dimensions)
                # Skip duplicates and anything colliding with the measure.
                if matched and matched not in dims and matched != measure:
                    dims.append(matched)
        # Categorical chart types need at least one grouping dimension.
        if chart_type in ['BAR', 'COLUMN', 'STACKED_COLUMN', 'PIE'] and not dims and fallback_dimension:
            dims = [fallback_dimension]
        if dims:
            cfg['dimensions'] = dims
        else:
            cfg.pop('dimensions', None)
        matched_time = self._match_column_name(cfg.get('time_column'), date_columns) if cfg.get('time_column') else None
        # Time-series chart types always need a time column (sparklines/trends).
        if chart_type in ['KPI', 'LINE', 'AREA'] and not matched_time:
            matched_time = fallback_time
        if matched_time:
            cfg['time_column'] = matched_time
        granularity = str(cfg.get('granularity', 'monthly')).lower()
        if granularity not in {'daily', 'weekly', 'monthly', 'quarterly', 'yearly'}:
            granularity = 'monthly'
        cfg['granularity'] = granularity
    else:
        cfg.pop('time_column', None)
        cfg.pop('granularity', None)
        top_n = cfg.get('top_n')
        if top_n is not None:
            try:
                cfg['top_n'] = max(1, int(top_n))
            except Exception:
                # Non-numeric top_n from the AI: drop it rather than guess.
                cfg.pop('top_n', None)
        sanitized.append(cfg)
    return sanitized
| def _enforce_visualization_variety( | |
| self, | |
| viz_configs: List[Dict], | |
| measures: List[str], | |
| date_columns: List[str], | |
| dimensions: List[str] | |
| ) -> List[Dict]: | |
| """ | |
| Post-process AI-generated visualizations to ensure variety. | |
| Enforces: at least 2 KPIs, no more than 2 of same chart type. | |
| """ | |
| if not viz_configs: | |
| return viz_configs | |
| # Count chart types | |
| chart_type_counts = {} | |
| for viz in viz_configs: | |
| ct = viz.get('chart_type', 'LINE') | |
| chart_type_counts[ct] = chart_type_counts.get(ct, 0) + 1 | |
| kpi_count = chart_type_counts.get('KPI', 0) | |
| # If we have fewer than 2 KPIs and have measures/dates, add them | |
| if kpi_count < 2 and measures and date_columns: | |
| print(f" ⚠️ Only {kpi_count} KPIs found, adding to ensure 2 minimum") | |
| # Find vizs we can convert to KPI (line charts showing single measure) | |
| converted = 0 | |
| for viz in viz_configs: | |
| if converted >= (2 - kpi_count): | |
| break | |
| if viz.get('chart_type') in ['LINE', 'AREA'] and not viz.get('dimensions'): | |
| old_type = viz['chart_type'] | |
| viz['chart_type'] = 'KPI' | |
| if not viz.get('time_column') and date_columns: | |
| viz['time_column'] = date_columns[0] | |
| if not viz.get('granularity'): | |
| viz['granularity'] = 'monthly' | |
| print(f" Converted '{viz.get('name')}' from {old_type} to KPI") | |
| converted += 1 | |
| # If we still need more KPIs, add new ones | |
| kpi_count_now = sum(1 for v in viz_configs if v.get('chart_type') == 'KPI') | |
| while kpi_count_now < 2 and measures: | |
| measure_idx = kpi_count_now % len(measures) | |
| new_kpi = { | |
| 'id': f'Viz_KPI_{kpi_count_now + 1}', | |
| 'name': f'Total {measures[measure_idx]}', | |
| 'chart_type': 'KPI', | |
| 'measure': measures[measure_idx], | |
| 'time_column': date_columns[0] if date_columns else None, | |
| 'granularity': 'monthly' | |
| } | |
| viz_configs.insert(0, new_kpi) | |
| print(f" Added KPI: {new_kpi['name']}") | |
| kpi_count_now += 1 | |
| # Check for duplicate chart types (more than 2 of same non-KPI type) | |
| chart_type_counts = {} | |
| for viz in viz_configs: | |
| ct = viz.get('chart_type', 'LINE') | |
| chart_type_counts[ct] = chart_type_counts.get(ct, 0) + 1 | |
| for chart_type, count in chart_type_counts.items(): | |
| if chart_type != 'KPI' and count > 2: | |
| print(f" ⚠️ {count} {chart_type} charts found, diversifying...") | |
| converted = 0 | |
| alternative_types = ['BAR', 'COLUMN', 'STACKED_COLUMN', 'AREA'] | |
| alternative_types = [t for t in alternative_types if t != chart_type] | |
| for viz in viz_configs: | |
| if converted >= (count - 2): | |
| break | |
| if viz.get('chart_type') == chart_type: | |
| new_type = alternative_types[converted % len(alternative_types)] | |
| viz['chart_type'] = new_type | |
| print(f" Converted '{viz.get('name')}' from {chart_type} to {new_type}") | |
| converted += 1 | |
| return viz_configs | |
| def _generate_fallback_visualizations( | |
| self, | |
| measures: List[str], | |
| date_columns: List[str] | |
| ) -> List[Dict]: | |
| """Generate fallback visualizations if AI generation fails - now creates 6 visualizations""" | |
| fallback_viz = [] | |
| # Get available dimensions (non-date attributes) | |
| dimensions = [ | |
| col.get('header', {}).get('name', col.get('name', '')) | |
| for col in self.model_columns | |
| if col.get('type') in ['ATTRIBUTE', 'STRING'] | |
| ] | |
| if measures and date_columns: | |
| # 1. KPI of first measure (weekly for sparkline) | |
| fallback_viz.append({ | |
| 'id': 'Viz_1', | |
| 'name': f'Total {measures[0]}', | |
| 'chart_type': 'KPI', | |
| 'measure': measures[0], | |
| 'time_column': date_columns[0], | |
| 'granularity': 'weekly' | |
| }) | |
| # 2. Line trend of first measure | |
| fallback_viz.append({ | |
| 'id': 'Viz_2', | |
| 'name': f'{measures[0]} Trend - Last 12 Months', | |
| 'chart_type': 'LINE', | |
| 'measure': measures[0], | |
| 'time_column': date_columns[0], | |
| 'granularity': 'monthly', | |
| 'filters': [f"[{date_columns[0]}].'last 12 months'"] | |
| }) | |
| if len(measures) > 1: | |
| # 3. Second measure KPI (monthly for sparkline) | |
| fallback_viz.append({ | |
| 'id': 'Viz_3', | |
| 'name': f'Total {measures[1]}', | |
| 'chart_type': 'KPI', | |
| 'measure': measures[1], | |
| 'time_column': date_columns[0], | |
| 'granularity': 'monthly' | |
| }) | |
| # 4. Line trend of second measure | |
| fallback_viz.append({ | |
| 'id': 'Viz_4', | |
| 'name': f'{measures[1]} Trend - Last 12 Months', | |
| 'chart_type': 'LINE', | |
| 'measure': measures[1], | |
| 'time_column': date_columns[0], | |
| 'granularity': 'monthly', | |
| 'filters': [f"[{date_columns[0]}].'last 12 months'"] | |
| }) | |
| # 5. Bar chart by dimension (if we have dimensions) | |
| if dimensions: | |
| fallback_viz.append({ | |
| 'id': f'Viz_{len(fallback_viz) + 1}', | |
| 'name': f'{measures[0]} by {dimensions[0]}', | |
| 'chart_type': 'BAR', | |
| 'measure': measures[0], | |
| 'dimensions': [dimensions[0]], | |
| 'top_n': 10 | |
| }) | |
| # 6. Column chart by month | |
| fallback_viz.append({ | |
| 'id': f'Viz_{len(fallback_viz) + 1}', | |
| 'name': f'Monthly {measures[0]}', | |
| 'chart_type': 'COLUMN', | |
| 'measure': measures[0], | |
| 'time_column': date_columns[0], | |
| 'granularity': 'monthly', | |
| 'filters': [f"[{date_columns[0]}].'last 6 months'"] | |
| }) | |
| print(f" ⚠️ AI generation failed, using {len(fallback_viz)} fallback visualizations") | |
| return fallback_viz | |
def create_liveboard_tml(
    self,
    company_data: Dict,
    use_case: str,
    num_visualizations: int = 6,
    liveboard_name: str = None,
    outliers: Optional[List[Dict]] = None
) -> str:
    """
    Create complete Liveboard TML YAML.

    Builds visualization configs — outlier-driven when outliers are
    supplied (topped up with AI-generated ones if fewer than
    num_visualizations), otherwise fully AI-generated — renders each
    config to visualization TML, optionally assembles Groups/Tabs/Style
    (USE_ADVANCED_TML env flag, default on), and serializes everything
    to YAML.

    Side effect: records the IDs of KPI visualizations on
    ``self._kpi_viz_ids`` so deploy_liveboard() can convert them to
    KPIs with sparklines after import (see _convert_to_kpis).

    Args:
        company_data: Company research data
        use_case: Use case name
        num_visualizations: Number of visualizations to create
        liveboard_name: Optional custom name for the liveboard
        outliers: Optional list of demo outliers to create targeted visualizations
    Returns:
        Complete Liveboard TML as YAML string
    """
    # Generate visualization configs
    visualizations = []
    tiles = []  # NOTE(review): appears unused in this method — confirm before removing
    # Priority 1: Create visualizations from outliers if provided
    if outliers and len(outliers) > 0:
        print("🎯 Using outlier-driven visualization generation")
        viz_configs = self.create_outlier_visualizations(outliers)
        # If we need more visualizations, generate additional ones with AI
        remaining_viz = num_visualizations - len(viz_configs)
        if remaining_viz > 0:
            print(f" + Generating {remaining_viz} additional visualizations...")
            additional_viz = self.generate_visualizations_from_research(
                company_data,
                use_case,
                remaining_viz
            )
            # Adjust IDs for additional viz so they continue the numbering
            # after the outlier-driven ones
            for i, viz in enumerate(additional_viz):
                viz['id'] = f'Viz_{len(viz_configs) + i + 1}'
            viz_configs.extend(additional_viz)
    # Fallback: Generate visualizations using AI if no outliers
    elif num_visualizations > 0:
        print("🤖 Using AI-driven visualization generation")
        viz_configs = self.generate_visualizations_from_research(
            company_data,
            use_case,
            num_visualizations
        )
        # Assign sequential IDs before rendering to TML
        for i, viz_config in enumerate(viz_configs):
            viz_config['id'] = f'Viz_{i+1}'
    else:
        viz_configs = []
    # TEMPORARILY DISABLED - Text tiles causing TML import errors
    # Add text tiles for context (like in sample liveboard)
    text_tiles = []
    # text_tiles = [
    #     {
    #         'id': 'Text_1',
    #         'name': '📊 Dashboard Overview',
    #         'chart_type': 'TEXT',
    #         'text_content': f"## {company_data.get('name', 'Company')} Analytics\n\n{use_case} insights and metrics",
    #         'background_color': '#2E3D4D'  # Dark blue-gray
    #     },
    #     {
    #         'id': 'Text_2',
    #         'name': 'Key Insights',
    #         'chart_type': 'TEXT',
    #         'text_content': "💡 **Key Performance Indicators**\n\nTrack trends and identify opportunities",
    #         'background_color': '#85016b'  # Pink (from sample)
    #     }
    # ]
    # Create text tile visualizations
    # for text_config in text_tiles:
    #     viz_tml = self.create_visualization_tml(text_config)
    #     visualizations.append(viz_tml)
    # Create visualization TML objects
    # Track which visualizations need KPI conversion (done in post-processing
    # by deploy_liveboard via _convert_to_kpis)
    self._kpi_viz_ids = []
    if viz_configs:
        for viz_config in viz_configs:
            # Ensure ID is set
            if 'id' not in viz_config:
                viz_config['id'] = f'Viz_{len(visualizations) + 1}'
            viz_tml = self.create_visualization_tml(viz_config)
            visualizations.append(viz_tml)
            # Track KPIs for post-processing
            if viz_config.get('_needs_kpi_conversion') or viz_config.get('chart_type') == 'KPI':
                self._kpi_viz_ids.append(viz_config['id'])
    # Use custom name if provided, otherwise generate default
    if not liveboard_name:
        liveboard_name = f"{company_data.get('name', 'Demo')} - {use_case} Liveboard"
    # Check if we should use advanced TML features (Groups, Tabs, Style)
    # Set USE_ADVANCED_TML=true in .env to enable Groups/Tabs structure
    # (defaults to true when unset)
    use_advanced_tml = os.getenv('USE_ADVANCED_TML', 'true').lower() == 'true'
    if use_advanced_tml:
        # === ADVANCED: Create Groups structure (like golden demo) ===
        # Groups are containers that hold related visualizations
        groups = self._create_groups_structure(viz_configs) if viz_configs else []
        # === ADVANCED: Create tabbed layout with groups ===
        layout = self._create_tabbed_layout(visualizations, viz_configs, groups) if viz_configs else None
        # === ADVANCED: Create style properties (brand colors, borders) ===
        style = self._create_style_properties(groups, viz_configs) if viz_configs else None
    else:
        groups = []
        layout = None
        style = None
        print(" ℹ️ Using basic TML structure (set USE_ADVANCED_TML=true to enable Groups)")
    # Assemble complete Liveboard TML
    liveboard_tml = {
        'guid': None,  # Will be generated by ThoughtSpot
        'liveboard': {
            'name': liveboard_name,
            'visualizations': visualizations
        }
    }
    # Add groups if we created any
    if groups:
        liveboard_tml['liveboard']['groups'] = groups
        print(f" 📦 Created {len(groups)} groups: {[g['name'] for g in groups]}")
    # Add layout with tabs (only if using advanced TML)
    if layout:
        liveboard_tml['liveboard']['layout'] = layout
        print(f" 📐 Created tabbed layout with {len(layout.get('tabs', []))} tab(s)")
    # Add style properties (only if using advanced TML)
    if style:
        liveboard_tml['liveboard']['style'] = style
        print(f" 🎨 Added brand colors and style properties")
    # Convert to YAML format (TML is YAML, not JSON)
    return yaml.dump(liveboard_tml, default_flow_style=False, sort_keys=False)
def _convert_to_kpis(self, liveboard_id: str, kpi_viz_ids: List[str]) -> bool:
    """
    Convert specified visualizations to KPIs with sparkline settings.

    This is done via an export/modify/import cycle because ThoughtSpot's
    TML import fails with an "Index: 0" error when creating new KPIs
    directly. The cycle: export the liveboard TML, set chart type and
    KPI client state on each matching visualization, then re-import
    with create_new=False so the existing liveboard is updated in place.

    Args:
        liveboard_id: GUID of the liveboard to modify
        kpi_viz_ids: List of visualization IDs to convert (e.g., ['Viz_1', 'Viz_2'])
    Returns:
        True if successful, False otherwise
    """
    import json
    try:
        # Export the liveboard
        export_response = self.ts_client.session.post(
            f"{self.ts_client.base_url}/api/rest/2.0/metadata/tml/export",
            headers=self.ts_client.headers,
            json={'metadata': [{'type': 'LIVEBOARD', 'identifier': liveboard_id}], 'export_fqn': False}
        )
        if export_response.status_code != 200:
            print(f" Export failed: {export_response.status_code}")
            return False
        export_data = export_response.json()
        # Older/newer API versions key the TML body differently.
        tml_key = 'edoc' if 'edoc' in export_data[0] else 'object'
        lb_tml = yaml.safe_load(export_data[0][tml_key])
        # KPI sparkline settings - enables sparkline, comparison, and period-over-period.
        # Note the double json.dumps: customProps is itself a JSON-encoded
        # string inside the client_state_v2 JSON payload.
        kpi_client_state_v2 = json.dumps({
            'version': 'V4DOT2',
            'chartProperties': {
                'responsiveLayoutPreference': 'AUTO_ON',
                'chartSpecific': {
                    'customProps': json.dumps({
                        'showLabel': True,
                        'showComparison': True,
                        'showSparkline': True,
                        'showAnomalies': False,
                        'showBounds': True,
                        'customCompare': 'PREV_AVAILABLE',
                        'showOnlyLatestAnomaly': False
                    })
                }
            }
        })
        # Convert specified visualizations to KPIs
        converted_count = 0
        for viz in lb_tml['liveboard']['visualizations']:
            viz_id = viz.get('id', '')
            if viz_id in kpi_viz_ids:
                if 'chart' not in viz.get('answer', {}):
                    viz['answer']['chart'] = {}
                viz['answer']['chart']['type'] = 'KPI'
                viz['answer']['chart']['client_state_v2'] = kpi_client_state_v2
                converted_count += 1
                print(f" Converting {viz_id} ({viz.get('answer', {}).get('name', '?')}) to KPI")
        if converted_count == 0:
            print(f" No matching visualizations found to convert")
            return False
        # Re-import the modified TML (create_new=False updates in place)
        modified_tml = yaml.dump(lb_tml, default_flow_style=False, sort_keys=False)
        import_response = self.ts_client.session.post(
            f"{self.ts_client.base_url}/api/rest/2.0/metadata/tml/import",
            headers=self.ts_client.headers,
            json={'metadata_tmls': [modified_tml], 'import_policy': 'ALL_OR_NONE', 'create_new': False}
        )
        if import_response.status_code != 200:
            print(f" Import failed: {import_response.status_code}")
            return False
        result = import_response.json()
        status = result[0].get('response', {}).get('status', {}).get('status_code', '')
        if status == 'OK':
            return True
        else:
            error = result[0].get('response', {}).get('status', {}).get('error_message', 'Unknown')
            print(f" Import error: {error[:100]}")
            return False
    except Exception as e:
        print(f" KPI conversion error: {str(e)}")
        return False
def _check_liveboard_errors(self, liveboard_id: str) -> Dict:
    """
    Probe a liveboard's data endpoint to detect visualization errors.

    Args:
        liveboard_id: GUID of the liveboard to inspect.

    Returns:
        Dict with 'has_errors' (bool) and 'errors' (list of dicts, each
        carrying 'viz_name' and 'error' keys). Any request failure or
        exception is itself reported as an error entry.
    """
    try:
        # Ask ThoughtSpot to render the liveboard's data; per-viz
        # problems surface in the response contents.
        response = self.ts_client.session.post(
            f"{self.ts_client.base_url}/api/rest/2.0/metadata/liveboard/data",
            headers=self.ts_client.headers,
            json={'metadata_identifier': liveboard_id}
        )
        if response.status_code != 200:
            return {'has_errors': True, 'errors': [{'viz_name': 'Unknown', 'error': f'HTTP {response.status_code}'}]}
        payload = response.json()
        problems = []
        for content in payload.get('contents', []):
            name = content.get('visualization_name', 'Unknown')
            if 'error' in content or 'error_message' in content:
                problems.append({
                    'viz_name': name,
                    'error': content.get('error_message', content.get('error', 'Unknown error'))
                })
            elif content.get('returned_data_row_count', 0) == 0 and content.get('available_data_row_count', 0) == 0:
                # Zero rows can be legitimate for some visualizations,
                # so this is deliberately not flagged as an error.
                pass
        return {'has_errors': len(problems) > 0, 'errors': problems}
    except Exception as e:
        return {'has_errors': True, 'errors': [{'viz_name': 'Check failed', 'error': str(e)}]}
def export_liveboard_tml(self, liveboard_id: str) -> Optional[Dict]:
    """Export a liveboard's TML from ThoughtSpot to verify what was created.

    Args:
        liveboard_id: GUID of the liveboard to export.

    Returns:
        Parsed TML dict when the exported edoc is JSON, {'raw': <text>}
        when it is YAML/other text, or None on any failure.
    """
    try:
        response = self.ts_client.session.post(
            f"{self.ts_client.base_url}/api/rest/2.0/metadata/tml/export",
            headers=self.ts_client.headers,
            json={
                'metadata': [{'identifier': liveboard_id}],
                'export_associated': False,
                'export_fqn': False
            }
        )
        if response.status_code != 200:
            print(f" ⚠️ Export failed: {response.status_code}")
            return None
        payload = response.json()
        if not payload:
            return None
        tml_content = payload[0].get('edoc', '')
        if not tml_content:
            return None
        # json is imported at module level.
        try:
            return json.loads(tml_content)
        except Exception:
            # The edoc may be YAML rather than JSON; hand back raw text.
            return {'raw': tml_content}
    except Exception as e:
        print(f" ⚠️ Export error: {str(e)}")
        return None
def deploy_liveboard(self, liveboard_tml_json: str) -> Dict:
    """
    Deploy Liveboard TML to ThoughtSpot.

    Imports the TML document via the v2 metadata/tml/import endpoint,
    walks several response shapes to extract the new liveboard GUID,
    verifies the result (TML export + per-viz error check), and runs
    the KPI post-processing step when create_liveboard_tml() recorded
    IDs on ``self._kpi_viz_ids``.

    Args:
        liveboard_tml_json: Complete Liveboard TML as a string.
            NOTE(review): despite the parameter name, the caller
            (create_liveboard_tml) produces YAML, not JSON — the import
            endpoint accepts both.
    Returns:
        Dictionary with deployment result:
        - success: Boolean
        - liveboard_id: GUID of created liveboard
        - url: Direct URL to liveboard in ThoughtSpot
        - error: Error message if failed
    """
    try:
        # Debug: Log the TML being sent
        print(f"🔍 DEBUG: Sending TML to ThoughtSpot")
        print(f"🔍 DEBUG: TML length: {len(liveboard_tml_json)}")
        print(f"🔍 DEBUG: TML first 500 chars:\n{liveboard_tml_json[:500]}")
        response = self.ts_client.session.post(
            f"{self.ts_client.base_url}/api/rest/2.0/metadata/tml/import",
            headers=self.ts_client.headers,
            json={
                "metadata_tmls": [liveboard_tml_json],
                "import_policy": "PARTIAL",
                "create_new": True
            }
        )
        if response.status_code == 200:
            result = response.json()
            # Extract liveboard ID from response
            liveboard_id = None
            # Debug: print response structure
            print(f" API Response type: {type(result)}")
            if isinstance(result, list) and len(result) > 0:
                print(f" First item keys: {list(result[0].keys())}")
                response_obj = result[0].get('response', {})
                print(f" Response obj type: {type(response_obj)}, keys: {list(response_obj.keys()) if isinstance(response_obj, dict) else 'not a dict'}")
                status_obj = response_obj.get('status', {})
                print(f" Status: {status_obj}")
                # If error, check if there's more info in the full result
                if status_obj.get('status_code') == 'ERROR':
                    print(f" Full error details:")
                    import json
                    print(json.dumps(result[0], indent=2)[:2000])
                # Navigate response structure; the GUID's location varies
                # across ThoughtSpot versions, so try several paths.
                liveboard_id = result[0].get('response', {}).get('header', {}).get('id_guid')
                if not liveboard_id:
                    # Try alternate paths
                    liveboard_id = result[0].get('response', {}).get('header', {}).get('id')
                # Check for visualization errors after creation
                if liveboard_id and status_obj.get('status_code') == 'OK':
                    print(f"\n ✅ Liveboard created, verifying TML and checking errors...")
                    # Export the TML to see what was actually created
                    exported_tml = self.export_liveboard_tml(liveboard_id)
                    if exported_tml and 'raw' not in exported_tml:
                        print(f"\n 📋 Exported TML verification:")
                        visualizations = exported_tml.get('liveboard', {}).get('visualizations', [])
                        for i, viz in enumerate(visualizations[:3]):  # Show first 3
                            viz_name = viz.get('answer', {}).get('name', 'Unknown')
                            search_query = viz.get('answer', {}).get('search_query', 'No query')
                            print(f" Viz {i+1}: {viz_name}")
                            print(f" Query: {search_query}")
                    error_check = self._check_liveboard_errors(liveboard_id)
                    if error_check['has_errors']:
                        print(f"\n ⚠️ Found {len(error_check['errors'])} visualization errors:")
                        for err in error_check['errors']:
                            print(f" - {err['viz_name']}: {err['error']}")
                    else:
                        print(f" ✅ All visualizations rendering successfully!")
                    # Post-process: Convert KPI visualizations with sparkline settings
                    # (IDs were recorded by create_liveboard_tml)
                    if hasattr(self, '_kpi_viz_ids') and self._kpi_viz_ids:
                        print(f"\n 🔄 Converting {len(self._kpi_viz_ids)} visualizations to KPIs...")
                        kpi_result = self._convert_to_kpis(liveboard_id, self._kpi_viz_ids)
                        if kpi_result:
                            print(f" ✅ KPI conversion successful!")
                        else:
                            print(f" ⚠️ KPI conversion failed - visualizations will display as auto-selected chart types")
                if not liveboard_id:
                    liveboard_id = result[0].get('response', {}).get('id')
                if not liveboard_id:
                    # Try edoc_id
                    liveboard_id = result[0].get('response', {}).get('edoc_header', {}).get('id_guid')
            elif isinstance(result, dict):
                print(f" Dict keys: {list(result.keys())}")
                if result.get('object') and len(result['object']) > 0:
                    liveboard_id = result['object'][0].get('response', {}).get('header', {}).get('id')
            print(f" Extracted liveboard ID: {liveboard_id}")
            # Build liveboard name from model or TML
            lb_name = f"{self.model_name} Liveboard"
            # Check if TML import actually succeeded
            # BUG FIX: Don't return success=True if liveboard_id is None or status is ERROR
            if isinstance(result, list) and len(result) > 0:
                status_code = result[0].get('response', {}).get('status', {}).get('status_code', '')
                error_message = result[0].get('response', {}).get('status', {}).get('error_message', '')
                if status_code == 'ERROR' or not liveboard_id:
                    print(f" ❌ TML import failed: {error_message or 'No liveboard ID returned'}")
                    return {
                        'success': False,
                        'liveboard_id': None,
                        'liveboard_guid': None,
                        'liveboard_name': lb_name,
                        'liveboard_url': None,
                        'error': error_message or 'TML import failed - no liveboard ID returned',
                        'response': result
                    }
            return {
                'success': True,
                'liveboard_id': liveboard_id,
                'liveboard_guid': liveboard_id,  # Alias for compatibility with deployer
                'liveboard_name': lb_name,
                'liveboard_url': f"{self.ts_client.base_url}/#/pinboard/{liveboard_id}" if liveboard_id else None,
                'url': f"{self.ts_client.base_url}/#/pinboard/{liveboard_id}" if liveboard_id else None,
                'response': result
            }
        else:
            return {
                'success': False,
                'error': f"API returned status {response.status_code}: {response.text}"
            }
    except Exception as e:
        return {
            'success': False,
            'error': str(e)
        }
def create_liveboard_from_model(
    ts_client,
    model_id: str,
    model_name: str,
    company_data: Dict,
    use_case: str,
    num_visualizations: int = 6,
    liveboard_name: str = None,
    llm_model: str = None,
    outliers: Optional[List[Dict]] = None,
    model_columns: Optional[List[Dict]] = None,
    prompt_logger=None,
) -> Dict:
    """
    Create and deploy a Liveboard via TML (Spotter Viz path).

    Direct API approach: builds a complete Liveboard TML document with
    LiveboardCreator and imports it into ThoughtSpot.

    Args:
        ts_client: Authenticated ThoughtSpotDeployer instance
        model_id: GUID of ThoughtSpot model
        model_name: Display name of model
        company_data: Company research data
        use_case: Use case name
        num_visualizations: Number of visualizations to create
        liveboard_name: Optional custom name for the liveboard
        llm_model: LLM model to use for AI-driven viz generation
        outliers: Optional list of outlier pattern dicts for targeted viz generation
        model_columns: Optional list of column metadata from TS model
        prompt_logger: Accepted for interface compatibility; not used here.
    Returns:
        Deployment result dictionary with:
        - success: bool
        - liveboard_guid: str
        - liveboard_name: str
        - liveboard_url: str
    """
    banner = [
        f"🚀 Starting Spotter Viz liveboard creation...",
        f" - Model: {model_name}",
        f" - Use case: {use_case}",
        f" - Visualizations: {num_visualizations}",
    ]
    if outliers:
        banner.append(f" - Outlier patterns: {len(outliers)}")
    if model_columns:
        banner.append(f" - Model columns: {len(model_columns)}")
    for line in banner:
        print(line, flush=True)
    creator = LiveboardCreator(ts_client, model_id, model_name, llm_model)
    if model_columns:
        # Caller-supplied column metadata overrides the auto-fetched set.
        creator.model_columns = model_columns
    tml_document = creator.create_liveboard_tml(
        company_data, use_case, num_visualizations, liveboard_name, outliers
    )
    # Deploy and get result
    outcome = creator.deploy_liveboard(tml_document)
    # Ensure liveboard_name is present on successful results.
    if outcome.get('success') and not outcome.get('liveboard_name'):
        outcome['liveboard_name'] = (
            liveboard_name
            or f"{company_data.get('name', 'Demo')} - {use_case} Liveboard"
        )
    return outcome
| def _convert_outlier_to_mcp_question(outlier: Dict) -> str: | |
| """ | |
| Convert outlier metadata to natural language question for MCP. | |
| Args: | |
| outlier: Dictionary with outlier metadata including 'show_me_query' | |
| Returns: | |
| Natural language question suitable for MCP getAnswer | |
| """ | |
| # Get the SHOW_ME query (already in natural language) | |
| show_me = outlier.get('show_me_query', '') | |
| if show_me: | |
| # Clean up the query - remove quotes, extra formatting | |
| question = show_me.replace('"', '').replace("'", '').strip() | |
| # Capitalize first letter if needed | |
| question = question[0].upper() + question[1:] if question else question | |
| return question | |
| # Fallback: use title directly | |
| return outlier.get('title', 'data') | |
| def _create_kpi_question_from_outlier(outlier: Dict) -> Optional[str]: | |
| """ | |
| Create companion KPI question from outlier if KPI metric is specified. | |
| For sparklines, KPI questions need time dimension! | |
| Example: "What is the total revenue by week over the last 8 quarters?" | |
| Args: | |
| outlier: Dictionary with outlier metadata | |
| Returns: | |
| KPI question string or None if no KPI specified | |
| """ | |
| if not outlier.get('kpi_companion'): | |
| return None | |
| kpi_metric = outlier.get('kpi_metric', '') | |
| if kpi_metric: | |
| # Include time dimension for sparkline visualization | |
| return f"What is the total {kpi_metric} by week over the last 8 quarters?" | |
| # Fallback: extract first measure from viz_measure_types | |
| measure_types = outlier.get('viz_measure_types', '') | |
| if measure_types: | |
| first_measure = measure_types.split(',')[0].strip() | |
| return f"What is the total {first_measure} by week over the last 8 quarters?" | |
| return None | |
def _generate_smart_questions_with_ai(
    company_data: Dict,
    use_case: str,
    num_questions: int,
    llm_model: str = None,
    model_columns: List[Dict] = None,
    prompt_logger=None,
) -> List[str]:
    """
    Use AI to generate smart, targeted questions for the use case.

    Builds a prompt that lists the model's actual columns (classified into
    measures, dimensions and dates by their SQL type) so the LLM phrases
    questions with real column names, then parses the LLM's JSON response.
    Post-processes the result so the first two questions are short KPI-style
    questions when possible. On any failure (LLM error, empty response,
    unparseable JSON), falls back to template questions built from
    model_columns, and finally to generic retail-style questions.

    Args:
        company_data: Company research data (a 'name' key is used in the prompt)
        use_case: Use case name
        num_questions: Number of questions to generate
        llm_model: LLM model to use (e.g., 'gpt-5.4', 'gpt-5.2');
            defaults to DEFAULT_LLM_MODEL when None
        model_columns: List of column dicts with 'name' and 'type' from DDL
        prompt_logger: Optional logger forwarded to log_researcher_call
    Returns:
        List of natural language questions (at most num_questions)
    """
    try:
        # Use the selected LLM model
        from main_research import MultiLLMResearcher
        model_to_use = llm_model or DEFAULT_LLM_MODEL
        provider_name, model_name = map_llm_display_to_provider(model_to_use)
        print(f" 🤖 Using {provider_name}/{model_name} for AI question generation")
        researcher = MultiLLMResearcher(provider=provider_name, model=model_name)
        # Build column context for the AI (forces it to use real column names)
        column_context = ""
        if model_columns:
            measures = []
            dimensions = []
            dates = []
            for col in model_columns:
                col_name = col.get('name', '')
                col_type = col.get('type', '').upper()
                # Classify columns: date-like types first, then numeric types,
                # everything else is treated as a categorical dimension.
                # NOTE: substring matching, so e.g. DATETIME matches 'DATE'.
                if any(t in col_type for t in ['DATE', 'TIMESTAMP', 'TIME']):
                    dates.append(col_name)
                elif any(t in col_type for t in ['INT', 'DECIMAL', 'NUMBER', 'FLOAT', 'DOUBLE', 'BIGINT']):
                    # Numeric - likely measure unless it ends with _ID
                    if col_name.endswith('_ID') or col_name.endswith('ID'):
                        dimensions.append(col_name)
                    else:
                        measures.append(col_name)
                else:
                    dimensions.append(col_name)
            column_context = f"""
AVAILABLE COLUMNS IN THE DATA MODEL (USE THESE EXACT NAMES!):
- Measures (numeric): {', '.join(measures) if measures else 'None found'}
- Dimensions (categorical): {', '.join(dimensions) if dimensions else 'None found'}
- Date columns: {', '.join(dates) if dates else 'None found'}
CRITICAL: You MUST use the actual column names above in your questions!
For example, if measures are [IMPRESSIONS, CLICKS, CONVERSIONS], use those - NOT generic "revenue" or "sales"!
"""
        prompt = f"""Generate {num_questions} business intelligence questions for a demo dashboard.
Company: {company_data.get('name', 'Unknown Company')}
Use Case: {use_case}
{column_context}
The liveboard already has KPI cards showing single metrics. These fill questions should surface INTERESTING BUSINESS INSIGHTS that a senior executive would find compelling — comparisons, rankings, and breakdowns that tell a story.
WHAT MAKES A GREAT FILL QUESTION:
- Reveals which dimension is driving performance: "Top 10 [DIMENSION] by [MEASURE] last 12 months"
- Compares performance across a meaningful grouping: "[MEASURE] by [DIMENSION] last 12 months"
- Shows how a breakdown has shifted: "[MEASURE] by [DIMENSION1] and [DIMENSION2] last 18 months"
- Surfaces a ranking with business stakes: "Top 5 [DIMENSION] by [MEASURE] vs prior year"
DO NOT generate simple single-metric questions like "[MEASURE] by month" — those become redundant KPI cards and the liveboard already has enough of those. Every question here should involve at least one dimension to compare against.
TIME FILTER RULES:
- Use "last 12 months" or "last 18 months" — never "this year" or "this quarter" (too restrictive with generated data)
- Use "last 2 years" for broad comparisons
CRITICAL:
- Use the EXACT column names from the data model above
- Do NOT mention chart types — ThoughtSpot picks automatically
- Each question should make a business leader say "that's interesting" — not just confirm a number
Return ONLY a JSON object with this exact structure (no other text):
{{
"questions": [
"question 1 using actual column names",
"question 2 using actual column names",
"question 3 using actual column names"
]
}}"""
        messages = [{"role": "user", "content": prompt}]
        # Time the LLM call for the prompt log entry.
        import time as _time
        _q_start = _time.time()
        response_text = researcher.make_request(messages, temperature=0.7, max_tokens=2000, stream=False)
        _q_ms = int((_time.time() - _q_start) * 1000)
        # Log the prompt/response
        from prompt_logger import log_researcher_call
        log_researcher_call("liveboard_questions", researcher, messages, response_text or "", duration_ms=_q_ms, logger=prompt_logger)
        # Parse the response
        content = response_text
        if not content or content.strip() == '':
            print(f" ⚠️ AI returned empty content")
            raise ValueError("Empty AI response")
        # Try to extract JSON if AI added extra text
        content = content.strip()
        if not content.startswith('{') and not content.startswith('['):
            # Look for JSON in the response (greedy: grabs first '{' through
            # last '}', tolerating prose before/after the JSON object)
            json_match = re.search(r'(\{[\s\S]*\})', content)
            if json_match:
                content = json_match.group(1)
            else:
                print(f" ⚠️ No JSON found in AI response")
                raise ValueError("No JSON in response")
        result = json.loads(content)
        # Handle different response formats: expected {"questions": [...]},
        # but tolerate a bare JSON list as well.
        if 'questions' in result and isinstance(result['questions'], list):
            questions = result['questions']
        elif isinstance(result, list):
            questions = result
        else:
            print(f" ⚠️ Unexpected AI response format: {list(result.keys())}")
            questions = []
        # Ensure at least 2 questions are in KPI format
        # KPI format: "[measure] weekly" or "[measure] monthly" (very short, 2-4 words)
        kpi_count = 0
        for q in questions[:2]:  # Check first 2
            words = q.lower().split()
            if len(words) <= 4 and any(kw in words for kw in ['weekly', 'monthly', 'quarterly', 'yearly']):
                kpi_count += 1
        if kpi_count < 2 and model_columns:
            # Find measures to create KPI questions (numeric, non-ID columns)
            measures = [col['name'] for col in model_columns
                        if any(t in col.get('type', '').upper() for t in ['INT', 'DECIMAL', 'NUMBER', 'FLOAT'])
                        and not col['name'].endswith('ID') and not col['name'].endswith('_ID')]
            if len(measures) >= 2:
                # Insert proper KPI questions at the start
                kpi_questions = [f"{measures[0].lower().replace('_', ' ')} weekly",
                                 f"{measures[1].lower().replace('_', ' ')} monthly"]
                print(f" 🔧 Inserting KPI questions: {kpi_questions}")
                # Drop any AI question containing a time-granularity keyword so
                # the injected KPI pair doesn't duplicate an existing question.
                questions = kpi_questions + [q for q in questions if not any(kw in q.lower() for kw in ['weekly', 'monthly', 'quarterly', 'yearly'])]
        return questions[:num_questions]
    except Exception as e:
        print(f" ⚠️ AI question generation failed: {e}")
        # response_text only exists if the LLM call was reached; guard with locals()
        print(f" 🔍 DEBUG: AI response was: {response_text[:500] if 'response_text' in locals() else 'N/A'}")
        # Fallback: try to use actual column names from model_columns
        if model_columns:
            measures = [col['name'] for col in model_columns
                        if any(t in col.get('type', '').upper() for t in ['INT', 'DECIMAL', 'NUMBER', 'FLOAT'])
                        and not col['name'].endswith('ID') and not col['name'].endswith('_ID')]
            # NOTE(review): only VARCHAR columns count as dimensions here,
            # unlike the broader classification above — confirm intentional.
            dimensions = [col['name'] for col in model_columns
                          if col.get('type', '').upper() == 'VARCHAR' and not col['name'].endswith('ID')]
            if measures and dimensions:
                m1 = measures[0].lower().replace('_', ' ')
                # Reuse m1 when only one measure exists (may duplicate questions).
                m2 = measures[1].lower().replace('_', ' ') if len(measures) > 1 else m1
                d1 = dimensions[0].lower().replace('_', ' ') if dimensions else 'category'
                return [
                    f"{m1} by week",
                    f"{m2} by month",
                    f"top 10 {d1} by {m1}",
                    f"{m1} by {d1}",
                    f"{m1} by month last 12 months",
                    f"{m2} by {d1}"
                ][:num_questions]
        # Ultimate fallback: generic retail-style questions
        return [
            "revenue by week",
            "sales by month",
            "top 10 products by revenue",
            "revenue by category",
            "revenue by month last 12 months",
            "sales by region"
        ][:num_questions]
| def create_liveboard_from_model_mcp( | |
| ts_client, | |
| model_id: str, | |
| model_name: str, | |
| company_data: Dict, | |
| use_case: str, | |
| num_visualizations: int = 8, | |
| liveboard_name: str = None, | |
| outliers: Optional[List[Dict]] = None, | |
| llm_model: str = None, | |
| model_columns: List[Dict] = None, | |
| prompt_logger=None, | |
| session_logger=None, | |
| ) -> Dict: | |
| """ | |
| Create and deploy a Liveboard using ThoughtSpot's Model Context Protocol (MCP) | |
| This is an alternative to the TML-based approach that uses MCP's AI-driven | |
| question generation and answer retrieval to build liveboards. | |
| PHASE 1 ENHANCEMENT: Now supports outlier-driven question generation for | |
| more targeted, demo-ready visualizations. | |
| Args: | |
| ts_client: Authenticated ThoughtSpotDeployer instance (used for base_url) | |
| model_id: GUID of ThoughtSpot model/datasource | |
| model_name: Display name of model | |
| company_data: Company research data | |
| use_case: Use case name | |
| num_visualizations: Number of visualizations to create (default: 6) | |
| liveboard_name: Optional custom name for the liveboard | |
| outliers: Optional list of demo outliers to create targeted visualizations | |
| llm_model: LLM model for AI question generation | |
| model_columns: List of column dicts with 'name' and 'type' from DDL | |
| Returns: | |
| Deployment result dictionary with structure: | |
| { | |
| 'success': bool, | |
| 'liveboard_name': str, | |
| 'liveboard_guid': str, | |
| 'liveboard_url': str, | |
| 'error': str (if failed) | |
| } | |
| """ | |
| _slog = session_logger | |
| import time as _time | |
| _t_lb = _slog.log_start("liveboard") if _slog else None | |
| print(f"🚀 Starting MCP liveboard creation...", flush=True) | |
| print(f" [MCP] Model: {model_name}", flush=True) | |
| print(f" [MCP] Model ID: {model_id}", flush=True) | |
| print(f" [MCP] Use case: {use_case}", flush=True) | |
| print(f" [MCP] Visualizations: {num_visualizations}", flush=True) | |
| print(f" [MCP] Endpoint: https://agent.thoughtspot.app/bearer/mcp (bearer auth)", flush=True) | |
| import asyncio | |
| import sys | |
| async def _create_mcp_liveboard(): | |
| """Inner async function to handle MCP workflow""" | |
| print(f"[MCP] Starting async MCP liveboard creation...") | |
| try: | |
| print(f"[MCP] Importing MCP modules...") | |
| from mcp import ClientSession | |
| from mcp.client.streamable_http import streamablehttp_client | |
| print(f"[MCP] MCP modules imported successfully (streamable HTTP transport)") | |
| # Get trusted auth token for bearer auth | |
| print(f"[MCP] Getting trusted auth token for {ts_client.username}...") | |
| _, mcp_token = _get_direct_api_session(ts_client.username, ts_client.secret_key) | |
| if not mcp_token: | |
| return { | |
| 'success': False, | |
| 'error': 'Failed to get trusted auth token. Check THOUGHTSPOT_TRUSTED_AUTH_KEY in .env' | |
| } | |
| # Build bearer auth header: {token}@{host} | |
| ts_url = get_admin_setting('THOUGHTSPOT_URL').rstrip('/') | |
| ts_host = ts_url.replace('https://', '').replace('http://', '') | |
| mcp_endpoint = "https://agent.thoughtspot.app/bearer/mcp" | |
| headers = { | |
| "Authorization": f"Bearer {mcp_token}@{ts_host}" | |
| } | |
| print(f"[MCP] Connecting to {mcp_endpoint} (bearer auth, host={ts_host})") | |
| async with streamablehttp_client(mcp_endpoint, headers=headers) as (read, write, _): | |
| async with ClientSession(read, write) as session: | |
| await session.initialize() | |
| # Verify connection with ping | |
| print(f"Pinging MCP server...") | |
| ping_result = await session.call_tool("ping", {}) | |
| print(f"✅ MCP connection established: {ping_result.content[0].text}") | |
| # Step 2: Generate questions intelligently | |
| print(f"🤔 Generating questions for: {use_case}") | |
| questions_to_ask = [] | |
| # Priority 1: Use outlier-driven questions if available | |
| if outliers and len(outliers) > 0: | |
| print(f"🎯 Using {len(outliers)} outlier-driven questions", flush=True) | |
| for outlier in outliers: | |
| # Convert outlier to MCP question | |
| question = _convert_outlier_to_mcp_question(outlier) | |
| questions_to_ask.append({ | |
| 'question': question, | |
| 'source': 'outlier', | |
| 'outlier_title': outlier.get('title', '') | |
| }) | |
| print(f" 📌 {outlier.get('title', 'Outlier')}: {question}", flush=True) | |
| # Add companion KPI if specified | |
| kpi_question = _create_kpi_question_from_outlier(outlier) | |
| if kpi_question: | |
| questions_to_ask.append({ | |
| 'question': kpi_question, | |
| 'source': 'kpi', | |
| 'outlier_title': outlier.get('title', '') + ' - KPI' | |
| }) | |
| print(f" 📊 KPI: {kpi_question}", flush=True) | |
| # Priority 2: If we need more questions, use AI to generate them | |
| remaining_viz = num_visualizations - len(questions_to_ask) | |
| if remaining_viz > 0: | |
| print(f"🤖 Generating {remaining_viz} additional AI-driven questions...", flush=True) | |
| if model_columns: | |
| print(f" 📋 Using {len(model_columns)} columns from DDL for question generation", flush=True) | |
| ai_questions = _generate_smart_questions_with_ai( | |
| company_data, | |
| use_case, | |
| remaining_viz, | |
| llm_model=llm_model, | |
| model_columns=model_columns, | |
| prompt_logger=prompt_logger, | |
| ) | |
| for q in ai_questions: | |
| questions_to_ask.append({ | |
| 'question': q, | |
| 'source': 'ai' | |
| }) | |
| print(f" 🧠 AI: {q}", flush=True) | |
| # Fallback: If still no questions, use MCP's getRelevantQuestions | |
| if not questions_to_ask: | |
| print(f"⚠️ No outliers or AI questions, falling back to MCP getRelevantQuestions", flush=True) | |
| simple_query = use_case.lower() | |
| questions_result = await session.call_tool("getRelevantQuestions", { | |
| "query": simple_query, | |
| "datasourceIds": [model_id] | |
| }) | |
| questions_data = json.loads(questions_result.content[0].text) | |
| mcp_questions = questions_data.get('questions', []) | |
| for q in mcp_questions[:num_visualizations]: | |
| questions_to_ask.append({ | |
| 'question': q['question'], | |
| 'source': 'mcp' | |
| }) | |
| if not questions_to_ask: | |
| print(f"❌ No questions generated!", flush=True) | |
| return { | |
| 'success': False, | |
| 'error': 'No questions generated. Model may be empty or incompatible.' | |
| } | |
| print(f"💡 Total questions to answer: {len(questions_to_ask)}", flush=True) | |
| # Step 3: Get answers for each question | |
| # Use direct API instead of MCP's getAnswer (bypasses agent.thoughtspot.app proxy) | |
| use_direct_api = os.getenv('MCP_USE_DIRECT_API', 'true').lower() == 'true' | |
| print(f"📊 Getting answers for {len(questions_to_ask)} questions...", flush=True) | |
| print(f" Using: {'Direct ThoughtSpot API' if use_direct_api else 'MCP getAnswer'}", flush=True) | |
| answers = [] | |
| for i, q_obj in enumerate(questions_to_ask, 1): | |
| try: | |
| # Strip chart type hints from question - they're for AI inference, not viz titles | |
| question_text = _strip_chart_type_hints(q_obj['question']) | |
| source = q_obj.get('source', 'unknown') | |
| print(f" {i}/{len(questions_to_ask)}: [{source}] {question_text}", flush=True) | |
| if use_direct_api: | |
| # Use direct ThoughtSpot API (bypasses MCP proxy issues) | |
| answer_data = _get_answer_direct(question_text, model_id, ts_client.username, ts_client.secret_key) | |
| if answer_data: | |
| # Clean up the viz title | |
| if 'question' in answer_data: | |
| answer_data['question'] = _clean_viz_title(answer_data['question']) | |
| print(f" 🔍 DEBUG: Direct API answer keys: {list(answer_data.keys())}") | |
| answers.append(answer_data) | |
| print(f" ✅ Answer retrieved (direct API)", flush=True) | |
| else: | |
| print(f" ⚠️ Direct API returned no answer, trying MCP fallback...", flush=True) | |
| # Fallback to MCP if direct API fails | |
| answer_result = await session.call_tool("getAnswer", { | |
| "question": question_text, | |
| "datasourceId": model_id | |
| }) | |
| answer_data = json.loads(answer_result.content[0].text) | |
| # Clean up the viz title | |
| if 'question' in answer_data: | |
| answer_data['question'] = _clean_viz_title(answer_data['question']) | |
| answers.append(answer_data) | |
| print(f" ✅ Answer retrieved (MCP fallback)", flush=True) | |
| else: | |
| # Use MCP's getAnswer (original method) | |
| answer_result = await session.call_tool("getAnswer", { | |
| "question": question_text, | |
| "datasourceId": model_id | |
| }) | |
| print(f" 🔍 DEBUG: answer_result type: {type(answer_result)}") | |
| print(f" 🔍 DEBUG: answer_result.content: {answer_result.content}") | |
| if answer_result.content and len(answer_result.content) > 0: | |
| print(f" 🔍 DEBUG: answer_result.content[0].text: '{answer_result.content[0].text}'") | |
| # Parse answer data | |
| answer_data = json.loads(answer_result.content[0].text) | |
| # Clean up the viz title | |
| if 'question' in answer_data: | |
| answer_data['question'] = _clean_viz_title(answer_data['question']) | |
| print(f" 🔍 DEBUG: Answer keys: {list(answer_data.keys())}") | |
| answers.append(answer_data) | |
| print(f" ✅ Answer retrieved", flush=True) | |
| except Exception as e: | |
| print(f" ⚠️ Failed to get answer: {str(e)}", flush=True) | |
| print(f" 🔍 DEBUG: Exception type: {type(e).__name__}") | |
| import traceback | |
| print(f" 🔍 DEBUG: Traceback: {traceback.format_exc()}") | |
| # Continue with other questions | |
| continue | |
| if not answers: | |
| print(f"❌ DEBUG: No answers retrieved!") | |
| print(f" Total questions attempted: {len(questions_to_ask)}") | |
| return { | |
| 'success': False, | |
| 'error': 'Failed to get answers for any questions. Model may not contain data.' | |
| } | |
| print(f"✅ Successfully retrieved {len(answers)} answers") | |
| # Step 4: Create liveboard with rich note tile | |
| # Use custom name if provided, otherwise generate default | |
| final_liveboard_name = liveboard_name | |
| if not final_liveboard_name: | |
| final_liveboard_name = f"📊 {company_data.get('name', 'Company')} - {use_case}" | |
| print(f"🔍 DEBUG: Liveboard name to use: '{final_liveboard_name}'") | |
| print(f"🔍 DEBUG: Company data: {company_data}") | |
| # Generate rich HTML note tile with outlier highlights | |
| outlier_section = "" | |
| if outliers and len(outliers) > 0: | |
| outlier_section = """ | |
| <div style="margin-top: 25px; padding: 25px; background: rgba(255,255,255,0.1); | |
| border-radius: 12px; border-left: 4px solid #fbbf24;"> | |
| <h3 style="margin: 0 0 15px 0; font-size: 18px;">🎯 Strategic Insights</h3> | |
| <ul style="margin: 0; padding-left: 20px; font-size: 14px; line-height: 1.8;">""" | |
| for outlier in outliers[:3]: # Show top 3 outliers | |
| title = outlier.get('title', 'Insight') | |
| insight = outlier.get('insight', '') | |
| impact = outlier.get('impact', '') | |
| outlier_section += f""" | |
| <li style="margin-bottom: 12px;"> | |
| <strong style="color: #fbbf24;">{title}</strong><br/> | |
| <span style="opacity: 0.9;">{insight}</span> | |
| {f'<br/><span style="color: #10b981; font-weight: 500;">💡 {impact}</span>' if impact else ''} | |
| </li>""" | |
| outlier_section += """ | |
| </ul> | |
| </div>""" | |
| # Count question sources | |
| outlier_count = sum(1 for q in questions_to_ask if q.get('source') == 'outlier') | |
| ai_count = sum(1 for q in questions_to_ask if q.get('source') == 'ai') | |
| mcp_count = sum(1 for q in questions_to_ask if q.get('source') == 'mcp') | |
| source_info = [] | |
| if outlier_count > 0: | |
| source_info.append(f"🎯 {outlier_count} outlier-driven") | |
| if ai_count > 0: | |
| source_info.append(f"🧠 {ai_count} AI-generated") | |
| if mcp_count > 0: | |
| source_info.append(f"⚡ {mcp_count} MCP-suggested") | |
| # Create branded note tile with logo and colors | |
| note_tile = create_branded_note_tile( | |
| company_data=company_data, | |
| use_case=use_case, | |
| answers=answers, | |
| source_info=source_info, | |
| outlier_section=outlier_section | |
| ) | |
| print(f"🎨 Creating liveboard: {final_liveboard_name}") | |
| print(f"📊 Preparing to send {len(answers)} answers to createLiveboard") | |
| # Log what we're sending to createLiveboard | |
| print(f"📋 [createLiveboard] Answer summary ({len(answers)} total):", flush=True) | |
| for idx, ans in enumerate(answers): | |
| sid = ans.get('session_identifier', 'MISSING') | |
| viz = ans.get('visualization_type', 'MISSING') | |
| q = ans.get('question', 'MISSING')[:60] | |
| has_tok = bool(ans.get('tokens')) | |
| print(f" [{idx+1}] session={sid} viz={viz} tokens={has_tok} q='{q}'", flush=True) | |
| try: | |
| print(f"📦 [createLiveboard] Sending {len(answers)} answers...", flush=True) | |
| print(f" Name: '{final_liveboard_name}'", flush=True) | |
| print(f" Note tile: {len(note_tile)} chars", flush=True) | |
| liveboard_result = await session.call_tool("createLiveboard", { | |
| "name": final_liveboard_name, | |
| "answers": answers, | |
| "noteTile": note_tile | |
| }) | |
| result_text = liveboard_result.content[0].text | |
| print(f" [createLiveboard] FULL response: {result_text}", flush=True) | |
| if not result_text or result_text.strip() == '': | |
| raise ValueError("createLiveboard returned empty response") | |
| if result_text.startswith('ERROR:'): | |
| error_msg = result_text.replace('ERROR:', '').strip() | |
| print(f"❌ [createLiveboard] Server error: {error_msg}", flush=True) | |
| return { | |
| 'success': False, | |
| 'error': f'MCP createLiveboard failed: {error_msg}', | |
| 'liveboard_name': final_liveboard_name | |
| } | |
| except Exception as create_error: | |
| error_str = str(create_error) | |
| print(f"❌ [createLiveboard] FAILED with {len(answers)} answers: {type(create_error).__name__}: {error_str[:200]}", flush=True) | |
| # Retry with fewer answers if we had multiple | |
| if len(answers) > 1: | |
| retry_count = max(1, len(answers) // 2) | |
| print(f"🔄 [createLiveboard] Retrying with {retry_count}/{len(answers)} answers...", flush=True) | |
| try: | |
| liveboard_result = await session.call_tool("createLiveboard", { | |
| "name": final_liveboard_name, | |
| "answers": answers[:retry_count], | |
| "noteTile": note_tile | |
| }) | |
| result_text = liveboard_result.content[0].text | |
| print(f" [createLiveboard] Retry OK: {result_text[:200]}", flush=True) | |
| if result_text.startswith('ERROR:'): | |
| raise ValueError(result_text) | |
| except Exception as retry_error: | |
| print(f"❌ [createLiveboard] Retry also failed: {type(retry_error).__name__}: {str(retry_error)[:200]}", flush=True) | |
| if retry_count > 1: | |
| print(f"🔄 [createLiveboard] Last resort: trying with 1 answer...", flush=True) | |
| try: | |
| liveboard_result = await session.call_tool("createLiveboard", { | |
| "name": final_liveboard_name, | |
| "answers": answers[:1], | |
| "noteTile": note_tile | |
| }) | |
| result_text = liveboard_result.content[0].text | |
| print(f" [createLiveboard] Minimal OK: {result_text[:200]}", flush=True) | |
| if result_text.startswith('ERROR:'): | |
| raise ValueError(result_text) | |
| except Exception as last_error: | |
| print(f"❌ [createLiveboard] All attempts failed. Last error: {str(last_error)[:200]}", flush=True) | |
| raise last_error | |
| else: | |
| raise retry_error | |
| else: | |
| raise create_error | |
| # Extract URL from the text response using regex | |
| import re | |
| url_match = re.search(r'https://[^\s]+', result_text) | |
| liveboard_url = url_match.group(0) if url_match else '' | |
| # Extract GUID from URL if present | |
| liveboard_guid = None | |
| if liveboard_url: | |
| # URL format: https://instance.thoughtspot.cloud/#/pinboard/GUID | |
| import re | |
| guid_match = re.search(r'/pinboard/([a-f0-9\-]+)', liveboard_url) | |
| if guid_match: | |
| liveboard_guid = guid_match.group(1) | |
| print(f"✅ Liveboard created successfully!", flush=True) | |
| print(f"🔗 URL: {liveboard_url}", flush=True) | |
| print(f"🆔 GUID: {liveboard_guid}", flush=True) | |
| # POST-MCP PROCESSING: Fix note tile layout | |
| # MCP creates note tiles with height: 8, but we want height: 2 like golden demo | |
| if liveboard_guid: | |
| print(f"🔧 Post-processing: Fixing note tile layout...", flush=True) | |
| try: | |
| # Use ts_client session (already authenticated) | |
| ts_base_url = ts_client.base_url | |
| # Export liveboard TML using authenticated session | |
| export_response = ts_client.session.post( | |
| f"{ts_base_url}/api/rest/2.0/metadata/tml/export", | |
| json={ | |
| 'metadata': [{'identifier': liveboard_guid}], | |
| 'export_associated': False | |
| } | |
| ) | |
| if export_response.status_code == 200: | |
| tml_data = export_response.json() | |
| if tml_data and len(tml_data) > 0: | |
| # Parse YAML TML | |
| import yaml | |
| tml_str = tml_data[0].get('edoc', '') | |
| liveboard_tml = yaml.safe_load(tml_str) | |
| # Fix Viz_1 layout (note tile) | |
| layout = liveboard_tml.get('liveboard', {}).get('layout', {}) | |
| tiles = layout.get('tiles', []) | |
| # Find and fix Viz_1 note tile dimensions (readable size) | |
| for tile in tiles: | |
| if tile.get('visualization_id') == 'Viz_1': | |
| # Make it readable: height 4, width 6 (half screen) | |
| tile['height'] = 4 | |
| tile['width'] = 6 | |
| print(f" ✓ Fixed Viz_1: height={tile['height']}, width={tile['width']}") | |
| break | |
| # Replace Viz_1 content with company name using golden demo styling | |
| company_name = company_data.get('name', 'Company') | |
| # Use the actual create_branded_note_tile function | |
| company_note = create_branded_note_tile( | |
| company_data=company_data, | |
| use_case=use_case or '', | |
| answers=answers, | |
| source_info=source_info, | |
| outlier_section='' | |
| ) | |
| # Find and update Viz_1 content in visualizations | |
| visualizations = liveboard_tml.get('liveboard', {}).get('visualizations', []) | |
| for viz in visualizations: | |
| if viz.get('id') == 'Viz_1': | |
| # Handle different TML structures | |
| if 'note_tile' in viz: | |
| # MCP creates note tiles with note_tile structure | |
| # note_tile is a DICT with html_parsed_string key | |
| if isinstance(viz['note_tile'], dict) and 'html_parsed_string' in viz['note_tile']: | |
| viz['note_tile']['html_parsed_string'] = company_note | |
| else: | |
| viz['note_tile'] = {'html_parsed_string': company_note} | |
| elif 'answer' in viz: | |
| viz['answer']['text_data'] = company_note | |
| viz['answer']['name'] = f'{company_name} Info' | |
| else: | |
| # Direct structure without wrappers | |
| if 'text_data' in viz: | |
| viz['text_data'] = company_note | |
| if 'name' in viz: | |
| viz['name'] = f'{company_name} Info' | |
| print(f" ✓ Replaced Viz_1 content with {company_name}") | |
| break | |
| # Add style_properties to make note tile dark themed (like golden demo) | |
| style = liveboard_tml.get('liveboard', {}).get('style', {}) | |
| if 'overrides' not in style: | |
| style['overrides'] = [] | |
| liveboard_tml['liveboard']['style'] = style | |
| # Check if Viz_1 already has style override | |
| viz_1_has_style = False | |
| for override in style['overrides']: | |
| if override.get('object_id') == 'Viz_1': | |
| viz_1_has_style = True | |
| # Ensure it has tile_brand_color for dark background | |
| if 'style_properties' not in override: | |
| override['style_properties'] = [] | |
| has_brand_color = any(prop.get('name') == 'tile_brand_color' for prop in override['style_properties']) | |
| if not has_brand_color: | |
| override['style_properties'].append({ | |
| 'name': 'tile_brand_color', | |
| 'value': 'TBC_I' | |
| }) | |
| print(f" ✓ Added dark theme to Viz_1") | |
| break | |
| if not viz_1_has_style: | |
| # Add new style override for Viz_1 with dark background | |
| style['overrides'].append({ | |
| 'object_id': 'Viz_1', | |
| 'style_properties': [{ | |
| 'name': 'tile_brand_color', | |
| 'value': 'TBC_I' | |
| }] | |
| }) | |
| print(f" ✓ Added dark theme style to Viz_1") | |
| # Convert time-series visualizations to KPIs with sparklines | |
| print(f" 🔄 Converting time-series charts to KPIs...") | |
| kpi_count = 0 | |
| for viz in visualizations: | |
| if viz.get('id') == 'Viz_1': | |
| continue # Skip note tile | |
| answer = viz.get('answer', {}) | |
| viz_name = answer.get('name', '').lower() | |
| search_query = answer.get('search_query', '').lower() | |
| # Check if this is a time-series viz (weekly, monthly, daily patterns) | |
| time_patterns = ['weekly', 'monthly', 'daily', 'quarterly', 'yearly', '.week', '.month', '.day', '.quarter', '.year'] | |
| is_time_series = any(p in viz_name or p in search_query for p in time_patterns) | |
| if is_time_series and 'chart' in answer: | |
| # Convert to KPI | |
| answer['chart']['type'] = 'KPI' | |
| # Add KPI-specific settings for sparkline and comparison | |
| kpi_settings = { | |
| "showLabel": True, | |
| "showComparison": True, | |
| "showSparkline": True, | |
| "showAnomalies": False, | |
| "showBounds": False, | |
| "customCompare": "PREV_AVAILABLE", | |
| "showOnlyLatestAnomaly": False | |
| } | |
| # Update client_state_v2 with KPI settings | |
| import json as json_module | |
| client_state = answer['chart'].get('client_state_v2', '{}') | |
| try: | |
| cs = json_module.loads(client_state) if client_state else {} | |
| if 'chartProperties' not in cs: | |
| cs['chartProperties'] = {} | |
| if 'chartSpecific' not in cs['chartProperties']: | |
| cs['chartProperties']['chartSpecific'] = {} | |
| cs['chartProperties']['chartSpecific']['customProps'] = json_module.dumps(kpi_settings) | |
| cs['chartProperties']['chartSpecific']['dataFieldArea'] = 'column' | |
| answer['chart']['client_state_v2'] = json_module.dumps(cs) | |
| except: | |
| pass # Keep existing if parsing fails | |
| kpi_count += 1 | |
| print(f" ✓ Converted '{answer.get('name', '?')}' to KPI") | |
| if kpi_count > 0: | |
| print(f" ✅ Converted {kpi_count} visualizations to KPIs with sparklines") | |
| # Re-import fixed TML using authenticated session | |
| import_response = ts_client.session.post( | |
| f"{ts_base_url}/api/rest/2.0/metadata/tml/import", | |
| json={ | |
| 'metadata_tmls': [yaml.dump(liveboard_tml, default_flow_style=False, sort_keys=False)], | |
| 'import_policy': 'PARTIAL', | |
| 'create_new': False | |
| } | |
| ) | |
| if import_response.status_code == 200: | |
| print(f" ✅ Layout fixed successfully!") | |
| else: | |
| print(f" ⚠️ Could not re-import TML: {import_response.status_code}") | |
| else: | |
| print(f" ⚠️ No TML data in export response") | |
| else: | |
| print(f" ⚠️ Could not export TML: {export_response.status_code}") | |
| except Exception as fix_error: | |
| print(f" ⚠️ Layout fix failed: {str(fix_error)}") | |
| # Don't fail the whole operation if post-processing fails | |
| return { | |
| 'success': True, | |
| 'liveboard_name': final_liveboard_name, | |
| 'liveboard_guid': liveboard_guid or 'unknown', | |
| 'liveboard_url': liveboard_url, | |
| 'message': 'Liveboard created successfully via MCP', | |
| 'visualizations_created': len(answers) | |
| } | |
| except ImportError as e: | |
| print(f"❌ ImportError in MCP liveboard creation: {str(e)}") | |
| import traceback | |
| print(f" Traceback: {traceback.format_exc()}") | |
| return { | |
| 'success': False, | |
| 'error': f'MCP dependencies not installed. Run: pip install "mcp>=1.26.0" for streamable HTTP support. Error: {str(e)}' | |
| } | |
| except Exception as e: | |
| print(f"❌ Exception in MCP liveboard creation: {str(e)}") | |
| print(f" Exception type: {type(e).__name__}") | |
| import traceback | |
| print(f" Traceback: {traceback.format_exc()}") | |
| # Extract real errors from ExceptionGroup/TaskGroup | |
| error_msg = str(e) | |
| if hasattr(e, 'exceptions'): | |
| real_errors = [] | |
| for sub_exc in e.exceptions: | |
| print(f" Sub-exception: {type(sub_exc).__name__}: {str(sub_exc)}") | |
| # Check for nested exception groups | |
| if hasattr(sub_exc, 'exceptions'): | |
| for nested_exc in sub_exc.exceptions: | |
| print(f" Nested sub-exception: {type(nested_exc).__name__}: {str(nested_exc)}") | |
| real_errors.append(f"{type(nested_exc).__name__}: {str(nested_exc)}") | |
| else: | |
| real_errors.append(f"{type(sub_exc).__name__}: {str(sub_exc)}") | |
| if real_errors: | |
| error_msg = "; ".join(real_errors) | |
| return { | |
| 'success': False, | |
| 'error': f'MCP liveboard creation failed: {error_msg}' | |
| } | |
| # Run the async function with a hard overall timeout (5 minutes) | |
| MCP_OVERALL_TIMEOUT = 300 # seconds | |
| async def _run_with_timeout(): | |
| return await asyncio.wait_for(_create_mcp_liveboard(), timeout=MCP_OVERALL_TIMEOUT) | |
| try: | |
| print(f"DEBUG: About to call asyncio.run(_create_mcp_liveboard()) [timeout={MCP_OVERALL_TIMEOUT}s]") | |
| result = asyncio.run(_run_with_timeout()) | |
| print(f"DEBUG: asyncio.run() completed, result: {result}") | |
| if _slog and _t_lb is not None: | |
| if result and result.get('success'): | |
| _slog.log_end("liveboard", _t_lb, liveboard_guid=result.get('liveboard_guid', '')) | |
| else: | |
| _slog.log_end("liveboard", _t_lb, error=result.get('error', 'unknown error') if result else 'no result') | |
| return result | |
| except asyncio.TimeoutError: | |
| err = f'MCP liveboard creation timed out after {MCP_OVERALL_TIMEOUT} seconds. ThoughtSpot MCP endpoint may be slow or unresponsive.' | |
| print(f"[ERROR] MCP liveboard creation timed out after {MCP_OVERALL_TIMEOUT}s") | |
| if _slog and _t_lb is not None: | |
| _slog.log_end("liveboard", _t_lb, error=err) | |
| return {'success': False, 'error': err} | |
| except Exception as e: | |
| print(f"[ERROR] Exception in asyncio.run(): {str(e)}") | |
| print(f" Exception type: {type(e).__name__}") | |
| import traceback | |
| print(f" Traceback: {traceback.format_exc()}") | |
| # Check if it's an ExceptionGroup (TaskGroup error) - extract real errors | |
| if hasattr(e, 'exceptions'): | |
| real_errors = [] | |
| for sub_exc in e.exceptions: | |
| print(f" Sub-exception: {type(sub_exc).__name__}: {str(sub_exc)}") | |
| real_errors.append(f"{type(sub_exc).__name__}: {str(sub_exc)}") | |
| err = f'MCP async errors: {"; ".join(real_errors)}' | |
| if _slog and _t_lb is not None: | |
| _slog.log_end("liveboard", _t_lb, error=err) | |
| return {'success': False, 'error': err} | |
| err = f'Failed to run MCP async workflow: {str(e)}' | |
| if _slog and _t_lb is not None: | |
| _slog.log_end("liveboard", _t_lb, error=err) | |
| return {'success': False, 'error': err} | |
def enhance_mcp_liveboard(
    liveboard_guid: str,
    company_data: Dict,
    ts_client,
    add_groups: bool = True,
    fix_kpis: bool = True,
    apply_brand_colors: bool = True,
    add_layout: bool = True
) -> Dict:
    """
    Enhance an MCP-created liveboard with TML post-processing (Golden Demo style).

    This is the key function for the HYBRID approach:
    - MCP creates the liveboard quickly with AI-driven questions
    - This function then polishes it to match the Golden Demo standard

    Enhancements applied:
    - Add Groups to organize visualizations by business function
    - Add Tabs layout for multi-page organization
    - Fix KPI visualizations for sparklines and comparisons
    - Apply brand colors (liveboard-level and group-level)
    - Set border style and hide unnecessary descriptions
    - Create proper grid layout for visualizations

    Args:
        liveboard_guid: GUID of the MCP-created liveboard
        company_data: Company research data (name, use_case, etc.)
        ts_client: Authenticated ThoughtSpotDeployer instance
        add_groups: Whether to add Groups organization
        fix_kpis: Whether to fix KPI visualizations for sparklines
        apply_brand_colors: Whether to apply brand color styling
        add_layout: Whether to add proper tab/tile layout

    Returns:
        Dict with success, message, enhancements list
    """
    print(f"Starting hybrid post-processing for liveboard {liveboard_guid}...", flush=True)
    enhancements_applied = []
    try:
        # Step 1: Export the liveboard TML
        print(f" Exporting liveboard TML...", flush=True)
        export_response = ts_client.session.post(
            f"{ts_client.base_url}/api/rest/2.0/metadata/tml/export",
            json={
                'metadata': [{'identifier': liveboard_guid}],
                'export_associated': False
            }
        )
        if export_response.status_code != 200:
            return {
                'success': False,
                'message': f'Failed to export liveboard TML: HTTP {export_response.status_code}',
                'enhancements': []
            }
        tml_data = export_response.json()
        if not tml_data or len(tml_data) == 0:
            return {
                'success': False,
                'message': 'Empty TML export response',
                'enhancements': []
            }
        # Parse YAML TML (the export payload carries the TML document in 'edoc')
        tml_str = tml_data[0].get('edoc', '')
        liveboard_tml = yaml.safe_load(tml_str)
        if not liveboard_tml or 'liveboard' not in liveboard_tml:
            return {
                'success': False,
                'message': 'Invalid TML structure',
                'enhancements': []
            }
        visualizations = liveboard_tml.get('liveboard', {}).get('visualizations', [])
        print(f" Found {len(visualizations)} visualizations", flush=True)
        # Debug dump of each visualization's id/type/name
        for _v in visualizations:
            _vid = _v.get('id', '?')
            _is_note = 'note_tile' in _v
            _chart_type = _v.get('answer', {}).get('chart', {}).get('type', 'N/A') if not _is_note else 'note'
            _name = _v.get('answer', {}).get('name', '') if not _is_note else '(note tile)'
            print(f" viz {_vid}: type={_chart_type} note={_is_note} name='{_name}'", flush=True)
        # Step 2: Classify visualizations by type and purpose
        kpi_vizs = []    # KPI charts (big numbers, sparklines)
        trend_vizs = []  # Line/area charts (time series)
        bar_vizs = []    # Column/bar charts
        table_vizs = []  # Tables and pivot tables
        geo_vizs = []    # Maps
        note_vizs = []   # Text tiles
        other_vizs = []  # Everything else
        viz_details = {}  # Store details for layout
        for viz in visualizations:
            viz_id = viz.get('id', '')
            # Check if it's a note tile
            if 'note_tile' in viz:
                note_vizs.append(viz_id)
                viz_details[viz_id] = {'type': 'note', 'name': 'Note'}
                continue
            # Get chart type from answer
            answer = viz.get('answer', {})
            chart = answer.get('chart', {})
            chart_type = chart.get('type', 'UNKNOWN')
            viz_name = answer.get('name', viz_id)
            viz_details[viz_id] = {'type': chart_type, 'name': viz_name}
            if chart_type == 'KPI':
                kpi_vizs.append(viz_id)
            elif chart_type in ['LINE', 'AREA']:
                trend_vizs.append(viz_id)
            elif chart_type in ['COLUMN', 'BAR', 'STACKED_COLUMN', 'STACKED_BAR']:
                bar_vizs.append(viz_id)
            elif chart_type in ['TABLE', 'PIVOT_TABLE']:
                table_vizs.append(viz_id)
            elif chart_type in ['GEO_AREA', 'GEO_BUBBLE', 'GEO_HEATMAP']:
                geo_vizs.append(viz_id)
            elif chart_type in ['PIE', 'DONUT', 'TREEMAP', 'SANKEY', 'FUNNEL']:
                other_vizs.append(viz_id)
            else:
                other_vizs.append(viz_id)
        print(f" Classification: {len(kpi_vizs)} KPIs, {len(trend_vizs)} trends, {len(bar_vizs)} bars, {len(table_vizs)} tables, {len(note_vizs)} notes", flush=True)
        # Step 2.5: Remove note tiles (MCP requires them but we don't want them in final liveboard)
        # Safety: only remove note tiles if there are other vizzes to show — never leave liveboard empty
        non_note_count = len(visualizations) - len(note_vizs)
        if note_vizs and non_note_count > 0:
            print(f" Removing {len(note_vizs)} note tile(s)...", flush=True)
            original_count = len(visualizations)
            liveboard_tml['liveboard']['visualizations'] = [
                v for v in visualizations if v.get('id') not in note_vizs
            ]
            # Rebind so all later steps iterate the filtered list
            visualizations = liveboard_tml['liveboard']['visualizations']
            print(f" [OK] Removed note tiles ({original_count} -> {len(visualizations)} visualizations)", flush=True)
            enhancements_applied.append(f"Removed {len(note_vizs)} note tile(s)")
        elif note_vizs and non_note_count == 0:
            print(f" ⚠️ Only note tiles found ({len(note_vizs)}) — MCP may have returned no chart answers. Keeping note tiles to preserve liveboard content.", flush=True)
        # Step 3: Add Groups - simplified: just KPI section, rest ungrouped
        if add_groups:
            print(f" Adding Groups (simplified)...", flush=True)
            groups = []
            # Create ONE group: Key Metrics with ALL KPIs (always at top)
            if kpi_vizs:
                groups.append({
                    'id': 'Group_1',
                    'name': 'Key Metrics',
                    'visualizations': list(kpi_vizs)  # ALL KPIs in the group
                })
                print(f" [OK] Added Key Metrics group with {len(kpi_vizs)} KPIs", flush=True)
            # Leave trends, bars, tables, etc. ungrouped (no Trends or Analysis groups)
            ungrouped_count = len(trend_vizs) + len(bar_vizs) + len(table_vizs) + len(other_vizs)
            if ungrouped_count > 0:
                print(f" [OK] {ungrouped_count} visualizations left ungrouped", flush=True)
            if groups:
                liveboard_tml['liveboard']['groups'] = groups
                enhancements_applied.append(f"Added {len(groups)} group with KPIs")
        # Step 3.5: Convert LINE charts to KPIs (MCP creates LINE instead of KPI)
        # ThoughtSpot's MCP doesn't create KPI chart types - convert the first 2 trend charts
        if fix_kpis and len(kpi_vizs) == 0 and len(trend_vizs) >= 1:
            print(f" Converting LINE charts to KPIs (MCP creates LINE instead of KPI)...", flush=True)
            converted_count = 0
            # Convert first 2 LINE/AREA charts to KPIs
            for viz_id in trend_vizs[:2]:
                for viz in visualizations:
                    if viz.get('id') != viz_id:
                        continue
                    answer = viz.get('answer', {})
                    chart = answer.get('chart', {})
                    # Change chart type from LINE/AREA to KPI
                    old_type = chart.get('type', 'LINE')
                    chart['type'] = 'KPI'
                    # Move from trend_vizs to kpi_vizs
                    kpi_vizs.append(viz_id)
                    converted_count += 1
                    viz_name = answer.get('name', viz_id)
                    print(f" Converted '{viz_name}' from {old_type} to KPI", flush=True)
            # Remove converted vizs from trend_vizs
            trend_vizs = [v for v in trend_vizs if v not in kpi_vizs]
            if converted_count > 0:
                enhancements_applied.append(f"Converted {converted_count} LINE charts to KPIs")
                print(f" [OK] Converted {converted_count} LINE→KPI", flush=True)
            # Update the groups if we just created KPIs.
            # BUG FIX: Step 3 only writes the 'groups' key when it created a
            # non-empty group list, so when ALL KPIs come from this conversion
            # the old check (`'groups' in liveboard_tml.get('liveboard', {})`)
            # never fired and the Key Metrics group was silently skipped.
            # setdefault handles both the missing-key and empty-list cases.
            if add_groups and kpi_vizs:
                groups = liveboard_tml['liveboard'].setdefault('groups', [])
                if not groups:  # No groups yet
                    groups.append({
                        'id': 'Group_1',
                        'name': 'Key Metrics',
                        'visualizations': list(kpi_vizs)
                    })
                    print(f" [OK] Added Key Metrics group with {len(kpi_vizs)} KPIs", flush=True)
        # Step 4: Fix KPI visualizations for sparklines
        if fix_kpis and kpi_vizs:
            print(f" Fixing KPI sparklines...", flush=True)
            kpis_fixed = 0
            for viz in visualizations:
                if viz.get('id') not in kpi_vizs:
                    continue
                answer = viz.get('answer', {})
                chart = answer.get('chart', {})
                # Parse and update client_state_v2 for KPI settings
                client_state_str = chart.get('client_state_v2', '{}')
                try:
                    client_state = json.loads(client_state_str)
                except (TypeError, ValueError):
                    # Malformed or non-string client state — start fresh
                    client_state = {}
                # Ensure structure exists
                if 'version' not in client_state:
                    client_state['version'] = 'V4DOT2'
                if 'chartProperties' not in client_state:
                    client_state['chartProperties'] = {}
                if 'chartSpecific' not in client_state['chartProperties']:
                    client_state['chartProperties']['chartSpecific'] = {}
                # Golden Demo KPI settings
                kpi_settings = {
                    'showLabel': True,
                    'showComparison': True,
                    'showSparkline': True,
                    'showAnomalies': False,  # Cleaner look
                    'showBounds': False,
                    'customCompare': 'PREV_AVAILABLE',
                    'showOnlyLatestAnomaly': False
                }
                # customProps is itself a JSON string nested inside client_state_v2
                client_state['chartProperties']['chartSpecific']['customProps'] = json.dumps(kpi_settings)
                client_state['chartProperties']['responsiveLayoutPreference'] = 'AUTO_ON'
                # Update the TML
                chart['client_state_v2'] = json.dumps(client_state)
                kpis_fixed += 1
            if kpis_fixed > 0:
                enhancements_applied.append(f"Fixed {kpis_fixed} KPI sparklines")
                print(f" [OK] Fixed {kpis_fixed} KPIs", flush=True)
        # Step 4b: Convert categorical breakdowns to donut charts with colors
        # Look for bar/pie charts with "by category/channel/segment" in the name
        print(f" Converting categorical charts to donuts...", flush=True)
        donuts_converted = 0
        # Define vibrant donut colors (from Golden Demo)
        DONUT_COLORS = [
            '#40C1C0',  # Teal
            '#9c6ade',  # Purple
            '#FF8142',  # Orange
            '#4CAF50',  # Green
            '#2196F3',  # Blue
            '#E91E63',  # Pink
            '#FFC107',  # Amber
            '#00BCD4',  # Cyan
        ]
        for viz in visualizations:
            viz_id = viz.get('id')
            answer = viz.get('answer', {})
            viz_name = answer.get('name', '').lower()
            chart = answer.get('chart', {})
            chart_type = chart.get('type', '').upper()
            # Check if this looks like a categorical breakdown that should be a donut
            is_breakdown = any(keyword in viz_name for keyword in
                             ['by channel', 'by category', 'by segment', 'breakdown', 'by audience',
                              'by campaign', 'distribution', 'mix', 'split'])
            is_convertible = chart_type in ['PIE', 'BAR', 'COLUMN', 'STACKED_COLUMN']
            if is_breakdown and is_convertible:
                # Convert to donut chart
                chart['type'] = 'PIE'
                # Set donut (ring) style with colors
                client_state_str = chart.get('client_state_v2', '{}')
                try:
                    client_state = json.loads(client_state_str)
                except (TypeError, ValueError):
                    # Malformed or non-string client state — start fresh
                    client_state = {}
                if 'version' not in client_state:
                    client_state['version'] = 'V4DOT2'
                if 'chartProperties' not in client_state:
                    client_state['chartProperties'] = {}
                # Donut settings
                client_state['chartProperties']['chartSpecific'] = {
                    'customProps': json.dumps({
                        'pieStyle': 'RING',  # Makes it a donut
                        'showLabel': True,
                        'showValue': True,
                        'showPercent': True
                    })
                }
                # Add custom colors
                client_state['chartProperties']['chartLevelColorConfig'] = {
                    'colorPalette': {
                        'colors': DONUT_COLORS
                    }
                }
                chart['client_state_v2'] = json.dumps(client_state)
                donuts_converted += 1
                print(f" Converted '{answer.get('name', viz_id)}' to donut chart", flush=True)
        if donuts_converted > 0:
            enhancements_applied.append(f"Converted {donuts_converted} charts to donuts")
            print(f" [OK] Converted {donuts_converted} charts to donuts", flush=True)
        # Step 5: Apply brand colors and styling (Golden Demo style)
        if apply_brand_colors:
            print(f" Applying brand colors...", flush=True)
            # Ensure style section exists
            if 'style' not in liveboard_tml['liveboard']:
                liveboard_tml['liveboard']['style'] = {}
            style = liveboard_tml['liveboard']['style']
            # Set liveboard-level style properties (Golden Demo uses these)
            if 'style_properties' not in style:
                style['style_properties'] = []
            # Add/update liveboard-level properties
            lb_props = {
                'lb_border_type': 'CURVED',  # Rounded corners
                'lb_brand_color': 'LBC_C',   # Green theme
                'hide_group_title': 'false',
                'hide_group_description': 'false',
                'hide_tile_description': 'false'
            }
            for prop_name, prop_value in lb_props.items():
                existing = next((p for p in style['style_properties'] if p.get('name') == prop_name), None)
                if existing:
                    existing['value'] = prop_value
                else:
                    style['style_properties'].append({'name': prop_name, 'value': prop_value})
            if 'overrides' not in style:
                style['overrides'] = []
            # Group colors: Teal, Green, Purple, Orange (matches Golden Demo)
            group_colors = ['GBC_B', 'GBC_C', 'GBC_D', 'GBC_E']
            # Apply colors to groups
            groups = liveboard_tml.get('liveboard', {}).get('groups', [])
            colors_applied = 0
            for i, group in enumerate(groups):
                grp_id = group.get('id')
                color = group_colors[i % len(group_colors)]
                existing = next((o for o in style['overrides'] if o.get('object_id') == grp_id), None)
                if not existing:
                    style['overrides'].append({
                        'object_id': grp_id,
                        'style_properties': [
                            {'name': 'group_brand_color', 'value': color},
                            {'name': 'hide_group_description', 'value': 'true'},
                            {'name': 'hide_group_tile_description', 'value': 'true'}
                        ]
                    })
                    colors_applied += 1
            # Style note tiles
            for note_id in note_vizs:
                existing = next((o for o in style['overrides'] if o.get('object_id') == note_id), None)
                if not existing:
                    style['overrides'].append({
                        'object_id': note_id,
                        'style_properties': [
                            {'name': 'tile_brand_color', 'value': 'TBC_I'}
                        ]
                    })
                    colors_applied += 1
            # Hide descriptions on analysis charts
            for viz_id in (bar_vizs + table_vizs + other_vizs):
                existing = next((o for o in style['overrides'] if o.get('object_id') == viz_id), None)
                if not existing:
                    style['overrides'].append({
                        'object_id': viz_id,
                        'style_properties': [
                            {'name': 'hide_tile_description', 'value': 'true'}
                        ]
                    })
            if colors_applied > 0:
                enhancements_applied.append(f"Applied styling ({colors_applied} colors)")
                print(f" [OK] Applied {colors_applied} colors + liveboard styling", flush=True)
        # Step 6: Add proper layout with tabs and group_layouts (Golden Demo style)
        if add_layout:
            print(f" Adding layout structure (tab-based with group_layouts)...", flush=True)
            groups = liveboard_tml.get('liveboard', {}).get('groups', [])
            all_viz_ids = kpi_vizs + trend_vizs + bar_vizs + geo_vizs + table_vizs + other_vizs
            # Collect grouped visualization IDs
            grouped_vizs = set()
            for group in groups:
                grouped_vizs.update(group.get('visualizations', []))
            # Ungrouped visualizations (everything not in a group)
            ungrouped = [v for v in all_viz_ids if v not in grouped_vizs]
            # === Build tab tiles: groups FIRST (KPIs at top), then ungrouped below ===
            tab_tiles = []
            row = 0
            # Add KPI group at the very top spanning full width
            for group in groups:
                grp_id = group.get('id')
                grp_vizs = group.get('visualizations', [])
                # KPI group spans full width at top
                group_height = max(3, (len(grp_vizs) + 1) // 2 * 3)  # 3 rows per pair
                tab_tiles.append({
                    'visualization_id': grp_id,
                    'x': 0,
                    'y': row,
                    'height': group_height,
                    'width': 12  # Full width for KPI group
                })
                row += group_height
            # Add ungrouped visualizations in 2-column grid below groups
            for i, viz_id in enumerate(ungrouped):
                tab_tiles.append({
                    'visualization_id': viz_id,
                    'x': 0 if i % 2 == 0 else 6,
                    'y': row + (i // 2) * 5,
                    'height': 5,
                    'width': 6
                })
            # === Build group_layouts: define tile positions INSIDE each group ===
            group_layouts = []
            for group in groups:
                grp_id = group.get('id')
                grp_vizs = group.get('visualizations', [])
                grp_tiles = []
                for j, gviz_id in enumerate(grp_vizs):
                    # KPIs: 2-column layout inside group, each 3 rows tall
                    grp_tiles.append({
                        'visualization_id': gviz_id,
                        'x': 0 if j % 2 == 0 else 6,
                        'y': (j // 2) * 3,
                        'height': 3,
                        'width': 6
                    })
                group_layouts.append({
                    'id': grp_id,
                    'tiles': grp_tiles
                })
            # === Assemble the layout with tabs structure ===
            if tab_tiles:
                tab = {
                    'name': 'Overview',
                    'description': '',
                    'tiles': tab_tiles
                }
                if group_layouts:
                    tab['group_layouts'] = group_layouts
                liveboard_tml['liveboard']['layout'] = {
                    'tabs': [tab]
                }
                total_tiles = len(tab_tiles) + sum(len(gl.get('tiles', [])) for gl in group_layouts)
                enhancements_applied.append(f"Added tab layout ({total_tiles} tiles, {len(group_layouts)} group layouts)")
                print(f" [OK] Added tab layout: {len(tab_tiles)} top-level tiles, {len(group_layouts)} group layouts", flush=True)
        # Step 7: Re-import the enhanced TML
        if enhancements_applied:
            print(f" Re-importing enhanced TML...", flush=True)
            import_response = ts_client.session.post(
                f"{ts_client.base_url}/api/rest/2.0/metadata/tml/import",
                json={
                    'metadata_tmls': [yaml.dump(liveboard_tml, default_flow_style=False, sort_keys=False)],
                    'import_policy': 'PARTIAL',
                    'create_new': False
                }
            )
            if import_response.status_code == 200:
                result = import_response.json()
                if result and len(result) > 0:
                    status = result[0].get('response', {}).get('status', {})
                    if status.get('status_code') == 'OK':
                        print(f" [OK] Enhancement complete!", flush=True)
                        return {
                            'success': True,
                            'message': f'Enhanced liveboard with {len(enhancements_applied)} improvements',
                            'enhancements': enhancements_applied
                        }
                    else:
                        # Import returned a non-OK status — treat as success-with-warnings
                        # since PARTIAL imports can apply most of the TML anyway.
                        error_msg = status.get('error_message', 'Unknown TML import error')
                        print(f" [WARN] TML import warning: {error_msg}", flush=True)
                        return {
                            'success': True,
                            'message': f'Enhancement applied with warnings: {error_msg}',
                            'enhancements': enhancements_applied
                        }
            else:
                print(f" [WARN] Re-import failed: HTTP {import_response.status_code}", flush=True)
                return {
                    'success': False,
                    'message': f'TML re-import failed: HTTP {import_response.status_code}',
                    'enhancements': enhancements_applied
                }
        else:
            print(f" No enhancements needed", flush=True)
            return {
                'success': True,
                'message': 'No enhancements needed',
                'enhancements': []
            }
        # Reached only when import returned 200 with an empty body
        # (original code fell through to this same generic success return).
        return {
            'success': True,
            'message': f'Successfully enhanced liveboard',
            'enhancements': enhancements_applied
        }
    except Exception as e:
        print(f" [ERROR] Enhancement error: {str(e)}", flush=True)
        import traceback
        print(f" Traceback: {traceback.format_exc()}", flush=True)
        return {
            'success': False,
            'message': f'Enhancement failed: {str(e)}',
            'enhancements': enhancements_applied
        }