# demoprep/liveboard_creator.py
# Last commit: d4c98c6 — "feat: comprehensive logging + pipeline status cleanup" (mikeboone)
"""
Liveboard Creator Module
Creates ThoughtSpot Liveboards from company research data and existing models.
Uses reference TML files as templates and AI to generate appropriate visualizations.
PHASE 1 ENHANCEMENTS:
- Outlier-driven visualization generation
- Natural language to ThoughtSpot search query translation
- Intelligent chart type inference
- Support for diverse chart types (12+ types)
"""
import json
import yaml
import os
from supabase_client import get_admin_setting
import re
import requests
from typing import Dict, List, Optional
from llm_config import DEFAULT_LLM_MODEL, map_llm_display_to_provider
from dotenv import load_dotenv
load_dotenv()
# Cache for direct API auth token
# NOTE(review): neither cache below is read or written anywhere in this
# module's visible code — candidates for removal if confirmed unused file-wide.
_direct_api_token = None
_direct_api_session = None
def _clean_viz_title(title: str) -> str:
"""
Clean up visualization titles to be more readable.
Examples:
'shipping_cost by month last 18 months' → 'Shipping Cost by Month'
'Top 15 product_name by quantity_shipped' → 'Top 15 Products by Quantity Shipped'
'total_revenue weekly' → 'Total Revenue Weekly'
"""
if not title:
return title
# Remove date filter suffixes
date_filters = [
' last 18 months', ' last 12 months', ' last 2 years', ' last year',
' last 6 months', ' last 3 months', ' last 30 days', ' last 90 days'
]
for filter_str in date_filters:
if title.lower().endswith(filter_str):
title = title[:-len(filter_str)]
# Replace underscores with spaces
title = title.replace('_', ' ')
# Clean up common column name patterns
replacements = {
'product name': 'Products',
'supplier name': 'Suppliers',
'warehouse name': 'Warehouses',
'customer name': 'Customers',
'brand name': 'Brands',
'store name': 'Stores',
'category name': 'Categories',
'region name': 'Regions',
}
title_lower = title.lower()
for old, new in replacements.items():
if old in title_lower:
# Case-insensitive replace
import re
title = re.sub(re.escape(old), new, title, flags=re.IGNORECASE)
# Title case the result, but preserve words like 'by', 'vs', 'and'
words = title.split()
result = []
small_words = {'by', 'vs', 'and', 'or', 'the', 'a', 'an', 'of', 'in', 'on', 'to'}
for i, word in enumerate(words):
if i == 0 or word.lower() not in small_words:
result.append(word.capitalize())
else:
result.append(word.lower())
return ' '.join(result)
def _get_direct_api_session(username: str, secret_key: str):
    """
    Create an authenticated session for direct ThoughtSpot API calls.

    Uses trusted auth — authenticates as the given username via the
    /api/rest/2.0/auth/token/full endpoint.

    Args:
        username: ThoughtSpot username to authenticate as.
        secret_key: Cluster secret key for trusted authentication.

    Returns:
        (session, token) tuple, or (None, None) on any failure.
    """
    # get_admin_setting may return None; guard before .rstrip to avoid
    # an AttributeError masking the "missing settings" message below.
    raw_url = get_admin_setting('THOUGHTSPOT_URL')
    base_url = raw_url.rstrip('/') if raw_url else None
    if not all([base_url, username, secret_key]):
        print(f"⚠️ Direct API: Missing settings (URL={bool(base_url)}, USER={bool(username)}, KEY={bool(secret_key)})")
        return None, None
    # Get auth token
    auth_url = f"{base_url}/api/rest/2.0/auth/token/full"
    try:
        resp = requests.post(auth_url, json={
            "username": username,
            "secret_key": secret_key,
            "validity_time_in_sec": 3600
        }, timeout=30)  # bounded wait: an unreachable cluster should not hang the pipeline
    except requests.RequestException as e:
        # Network-level failures now degrade to (None, None) like other failures
        print(f"⚠️ Direct API auth request error: {e}")
        return None, None
    if resp.status_code != 200:
        print(f"⚠️ Direct API auth failed: {resp.status_code}")
        return None, None
    token = resp.json().get('token')
    if not token:
        print(f"⚠️ Direct API: No token in response")
        return None, None
    session = requests.Session()
    session.headers['Authorization'] = f'Bearer {token}'
    session.headers['Content-Type'] = 'application/json'
    session.headers['Accept'] = 'application/json'
    print(f"✅ Direct API: Authenticated as {username} to {base_url}")
    return session, token
def _get_answer_direct(question: str, model_id: str, username: str, secret_key: str) -> dict:
    """
    Get an answer from ThoughtSpot using direct API call instead of MCP.

    This bypasses MCP's agent.thoughtspot.app proxy which may have issues
    with certain clusters (e.g., Spotter Agent v3 clusters).

    Args:
        question: Natural language question
        model_id: ThoughtSpot model GUID
        username: ThoughtSpot username to authenticate as
        secret_key: Cluster secret key used for trusted authentication

    Returns:
        dict with answer data including session_identifier, tokens, question, etc.
        Returns None on failure.
    """
    session, _ = _get_direct_api_session(username, secret_key)
    if not session:
        return None
    # NOTE(review): re-reads THOUGHTSPOT_URL; assumes it is still set since the
    # session helper above validated it — a None here would raise. Confirm.
    base_url = get_admin_setting('THOUGHTSPOT_URL').rstrip('/')
    url = f"{base_url}/api/rest/2.0/ai/answer/create"
    payload = {
        "query": question,
        "metadata_identifier": model_id
    }
    try:
        resp = session.post(url, json=payload, timeout=60)
        if resp.status_code == 200:
            data = resp.json()
            # Verbose response-shape logging (flush=True so it interleaves
            # correctly with other pipeline output).
            print(f" [direct API] HTTP 200 — keys: {list(data.keys())}", flush=True)
            print(f" [direct API] session_identifier: {data.get('session_identifier')}", flush=True)
            print(f" [direct API] visualization_type: {data.get('visualization_type')}", flush=True)
            print(f" [direct API] message_type: {data.get('message_type')}", flush=True)
            has_tokens = bool(data.get('tokens'))
            print(f" [direct API] tokens present: {has_tokens}", flush=True)
            # Normalize just the fields downstream consumers need, with defaults.
            return {
                'question': question,
                'session_identifier': data.get('session_identifier'),
                'tokens': data.get('tokens'),
                'display_tokens': data.get('display_tokens'),
                'visualization_type': data.get('visualization_type', 'Chart'),
                'generation_number': data.get('generation_number', 1),
                'message_type': data.get('message_type', 'TSAnswer')
            }
        else:
            error_msg = resp.text[:500] if resp.text else f"Status {resp.status_code}"
            print(f" ⚠️ Direct API answer FAILED — HTTP {resp.status_code}: {error_msg}", flush=True)
            return None
    except Exception as e:
        print(f" ⚠️ Direct API exception: {str(e)}")
        return None
def _strip_chart_type_hints(text: str) -> str:
"""
Strip chart type hints like (line chart), (bar chart), (KPI) from question text.
These hints help AI infer chart type but shouldn't appear in visualization titles.
Examples:
"Monthly sales - last 12 months (line chart)" → "Monthly sales - last 12 months"
"Top 10 products (bar chart)" → "Top 10 products"
"Total revenue (KPI)" → "Total revenue"
"Sales by region - bar chart" → "Sales by region"
"""
# Pattern matches common chart type hints - both (in parens) and - dash prefix
patterns = [
# Parentheses format
r'\s*\(line chart\)\s*$',
r'\s*\(bar chart\)\s*$',
r'\s*\(horizontal bar chart\)\s*$',
r'\s*\(stacked column chart\)\s*$',
r'\s*\(stacked column\)\s*$',
r'\s*\(stacked bar chart\)\s*$',
r'\s*\(stacked bar\)\s*$',
r'\s*\(column chart\)\s*$',
r'\s*\(pie chart\)\s*$',
r'\s*\(donut chart\)\s*$',
r'\s*\(area chart\)\s*$',
r'\s*\(scatter chart\)\s*$',
r'\s*\(geo map\)\s*$',
r'\s*\(table\)\s*$',
r'\s*\(KPI\)\s*$',
r'\s*\(single number\)\s*$',
# Dash format (no parens)
r'\s*-\s*line chart\s*$',
r'\s*-\s*bar chart\s*$',
r'\s*-\s*horizontal bar chart\s*$',
r'\s*-\s*stacked column chart\s*$',
r'\s*-\s*stacked column\s*$',
r'\s*-\s*stacked bar chart\s*$',
r'\s*-\s*stacked bar\s*$',
r'\s*-\s*column chart\s*$',
r'\s*-\s*pie chart\s*$',
r'\s*-\s*donut chart\s*$',
r'\s*-\s*area chart\s*$',
r'\s*-\s*KPI\s*$',
]
result = text
for pattern in patterns:
result = re.sub(pattern, '', result, flags=re.IGNORECASE)
return result.strip()
# Chart type definitions with use cases.
# Keys are the names the LLM chooses between (see QueryTranslator.infer_chart_type,
# which validates its answer against this dict); 'ts_type' is the identifier
# written into ThoughtSpot TML, which occasionally differs from the key
# (e.g. TABLE -> GRID_TABLE).
CHART_TYPES = {
    'KPI': {
        'best_for': 'single metric with trend, sparkline, and comparison',
        'requires': 'one measure, optional time dimension',
        'ts_type': 'KPI'
    },
    'LINE': {
        'best_for': 'trends over time, showing patterns',
        'requires': 'measure + time dimension',
        'ts_type': 'LINE'
    },
    'COLUMN': {
        'best_for': 'comparing categories, discrete values',
        'requires': 'measure + 1-2 dimensions',
        'ts_type': 'COLUMN'
    },
    'BAR': {
        'best_for': 'ranking, top N lists, horizontal comparison',
        'requires': 'measure + dimension',
        'ts_type': 'BAR'
    },
    'STACKED_COLUMN': {
        'best_for': 'part-to-whole over categories',
        'requires': 'measure + 2 dimensions (x-axis + color)',
        'ts_type': 'STACKED_COLUMN'
    },
    'AREA': {
        'best_for': 'cumulative trends over time',
        'requires': 'measure + time dimension',
        'ts_type': 'AREA'
    },
    'PIE': {
        'best_for': 'simple part-to-whole for few categories',
        'requires': 'measure + dimension (max 7 values)',
        'ts_type': 'PIE'
    },
    'SCATTER': {
        'best_for': 'correlation between two measures',
        'requires': '2 measures + optional category dimension',
        'ts_type': 'SCATTER'
    },
    'TABLE': {
        'best_for': 'detailed data with multiple columns',
        'requires': 'any combination of measures and dimensions',
        'ts_type': 'GRID_TABLE'  # ThoughtSpot uses GRID_TABLE, not TABLE
    },
    'GEO_AREA': {
        'best_for': 'geographic regions on a map',
        'requires': 'measure + geographic dimension (region, country, state)',
        'ts_type': 'GEO_AREA'
    },
    'GEO_BUBBLE': {
        'best_for': 'geographic points with size',
        'requires': 'measure + lat/long or city',
        'ts_type': 'GEO_BUBBLE'
    },
    'PIVOT_TABLE': {
        'best_for': 'multi-dimensional data breakdown with drill-down',
        'requires': 'multiple dimensions + measures',
        'ts_type': 'PIVOT_TABLE'
    },
    'SANKEY': {
        'best_for': 'flow visualization between categories',
        'requires': '2+ dimensions showing hierarchy + measure',
        'ts_type': 'SANKEY'
    },
    'BUBBLE': {
        'best_for': 'three-variable comparison (x, y, size)',
        'requires': '2 measures + size measure + optional dimension',
        'ts_type': 'BUBBLE'
    },
    'TREEMAP': {
        'best_for': 'hierarchical data with size comparison',
        'requires': 'dimension hierarchy + measure',
        'ts_type': 'TREEMAP'
    },
    'TEXT': {
        'best_for': 'text descriptions, titles, annotations with colored backgrounds',
        'requires': 'no data, just static text content',
        'ts_type': 'TEXT'
    },
    'HEATMAP': {
        'best_for': 'matrix visualization with color intensity',
        'requires': '2 dimensions + measure for color',
        'ts_type': 'HEATMAP'
    }
}
class OutlierParser:
    """Parse structured DEMO_OUTLIER comment blocks out of population scripts."""

    @staticmethod
    def parse_demo_outliers(population_script: str) -> List[Dict]:
        """
        Extract structured outlier comments from a population script.

        Expected comment format:
            # DEMO_OUTLIER: High-Value Customers at Risk
            # INSIGHT: Top 5 customers (>$50K LTV) showing declining satisfaction
            # SHOW_ME: "Show customers where lifetime_value > 50000 and satisfaction < 3"
            # IMPACT: $250K annual revenue at risk
            # TALKING_POINT: Notice how ThoughtSpot instantly surfaces...

        Returns:
            List of dicts with keys: title, insight, show_me_query,
            impact, talking_point.
        """
        # Multi-line regex to capture structured comments
        pattern = r'# DEMO_OUTLIER: (.*?)\n# INSIGHT: (.*?)\n# SHOW_ME: ["\']?(.*?)["\']?\n# IMPACT: (.*?)\n# TALKING_POINT: (.*?)(?:\n|$)'
        field_names = ('title', 'insight', 'show_me_query', 'impact', 'talking_point')
        # Pair each captured group with its field name, trimming whitespace.
        return [
            {name: value.strip() for name, value in zip(field_names, groups)}
            for groups in re.findall(pattern, population_script, re.MULTILINE | re.DOTALL)
        ]
def clean_viz_title(question: str) -> str:
    """
    Derive a short display title from a verbose natural-language question.

    Drops everything after the first '?' or '.', strips common question
    prefixes, capitalizes the first letter, and caps the result at 80
    characters.

    Examples:
        "What is the total revenue? Show only..." -> "Total revenue"
        "Show the top 10 products by revenue" -> "Top 10 products by revenue"
    """
    # Keep only the leading question, dropping trailing instructions
    trimmed = question.split('?')[0].strip()
    trimmed = trimmed.split('.')[0].strip()
    # Prefix/suffix patterns to remove, applied in order, case-insensitively
    patterns = [
        (r'^What is the ', ''),
        (r'^What are the ', ''),
        (r'^Show me the ', ''),
        (r'^Show me ', ''),
        (r'^Show the ', ''),
        (r'^Show ', ''),
        (r'^Create a detailed table showing ', ''),
        (r'^How (?:does|do) ', ''),
        (r' for Amazon\.com', ''),
        (r' for the company', ''),
    ]
    clean = trimmed
    for pattern, replacement in patterns:
        clean = re.sub(pattern, replacement, clean, flags=re.IGNORECASE)
    # Capitalize only the first character, leaving the rest untouched
    if clean:
        clean = clean[0].upper() + clean[1:]
    # Enforce the 80-character cap with an ellipsis
    if len(clean) > 80:
        clean = clean[:77] + "..."
    return clean if clean else trimmed[:80]
def extract_brand_colors_from_css(css_url: str) -> List[str]:
    """Extract likely brand colors from a CSS file.

    Downloads the stylesheet, collects hex color codes, drops very dark or
    very light colors (usually backgrounds), and returns up to the 5 most
    vibrant unique colors (sorted by saturation, then brightness).

    Args:
        css_url: URL of the stylesheet to scan.

    Returns:
        List of '#rrggbb' strings, or None when nothing usable was found
        or the download/parse failed.
    """
    try:
        response = requests.get(css_url, timeout=5)
        if response.status_code == 200:
            css_content = response.text
            # Find all hex colors (3- or 6-digit)
            hex_colors = re.findall(r'#([0-9A-Fa-f]{6}|[0-9A-Fa-f]{3})\b', css_content)
            color_data = []
            for color in hex_colors:
                # Normalize 3-digit shorthand to 6 digits
                if len(color) == 3:
                    color = ''.join([c*2 for c in color])
                r, g, b = int(color[0:2], 16), int(color[2:4], 16), int(color[4:6], 16)
                # Perceived brightness (0-255, luma-style channel weights)
                brightness = (r * 299 + g * 587 + b * 114) / 1000
                # Filter out too dark (< 30) or too light (> 240) colors —
                # these are usually backgrounds, not brand colors
                if 30 < brightness < 240:
                    max_c = max(r, g, b)
                    min_c = min(r, g, b)
                    saturation = (max_c - min_c) / max_c if max_c > 0 else 0
                    color_data.append({
                        'color': f'#{color}',
                        'brightness': brightness,
                        'saturation': saturation
                    })
            # Deduplicate, keeping first occurrence
            seen = set()
            unique_colors = []
            for c in color_data:
                if c['color'] not in seen:
                    seen.add(c['color'])
                    unique_colors.append(c)
            # Sort by saturation (vibrant colors first) then brightness
            unique_colors.sort(key=lambda x: (x['saturation'], x['brightness']), reverse=True)
            # Take top 5 most vibrant colors
            brand_colors = [c['color'] for c in unique_colors[:5]]
            return brand_colors if brand_colors else None
    except Exception as e:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still
        # propagate; still best-effort — failures just mean "no brand colors".
        print(f"⚠️ Could not extract brand colors from {css_url}: {e}")
    return None
def create_branded_note_tile(company_data: Dict, use_case: str, answers: List[Dict],
                             source_info: List[str], outlier_section: str = "") -> str:
    """
    Build the HTML for a branded Liveboard note tile (golden-demo format).

    Relies on ThoughtSpot's native theme classes so the tile renders
    correctly inside the Liveboard.

    Args:
        company_data: Company info; reads 'name' and (as fallback) 'use_case'.
        use_case: Use case name; when falsy, company_data['use_case'] is used.
        answers: Visualization answers; only the count appears in the tile.
        source_info: Source info strings (not rendered in this tile layout).
        outlier_section: Outlier-highlights HTML (not rendered in this tile layout).

    Returns:
        HTML string for the note tile.
    """
    company_name = company_data.get('name', 'Company')
    use_case_display = use_case or company_data.get('use_case', 'Analytics')
    viz_count = len(answers)
    # Single-line HTML template mirroring the golden demo's note tile
    return f"""<h2 class="theme-module__editor-h2" dir="ltr"><span style="color: rgb(255, 255, 255); white-space: pre-wrap;">{company_name} {use_case_display}</span></h2><hr><p class="theme-module__editor-paragraph" dir="ltr"><span style="color: rgb(255, 255, 255); white-space: pre-wrap;">Featuring </span><b><strong class="theme-module__editor-text-bold" style="color: rgb(64, 193, 192); white-space: pre-wrap;">{viz_count} visualizations</strong></b><span style="color: rgb(255, 255, 255); white-space: pre-wrap;"> powered by ThoughtSpot AI</span></p><p class="theme-module__editor-paragraph"><br></p><p class="theme-module__editor-paragraph" dir="ltr"><span style="color: rgb(255, 255, 255); white-space: pre-wrap;">Built with </span><b><strong class="theme-module__editor-text-bold" style="color: rgb(64, 193, 192); white-space: pre-wrap;">Demo Wire</strong></b></p>"""
class QueryTranslator:
    """Translate natural language queries to ThoughtSpot search syntax.

    Uses the injected LLM researcher for the translation; falls back to
    simple substring matching against model column names when the LLM
    call fails.
    """

    def __init__(self, llm_researcher):
        # LLM wrapper exposing make_request(messages, temperature=..., max_tokens=..., stream=...)
        self.llm_researcher = llm_researcher

    def convert_natural_to_search(
        self,
        natural_query: str,
        model_columns: List[Dict]
    ) -> str:
        """
        Convert natural language to ThoughtSpot search query

        Args:
            natural_query: Natural language query (e.g., "Show customers where lifetime_value > 50000")
            model_columns: Available columns from model (dicts with a 'name' key)

        Returns:
            ThoughtSpot search query string (falls back to substring matching
            on LLM failure — see _fallback_conversion)
        """
        # Extract column names (exact names required)
        column_names = [col.get('name', '') for col in model_columns if col.get('name')]
        columns_text = ", ".join(sorted(column_names))
        prompt = f"""Convert this natural language query to ThoughtSpot search syntax.
Natural Query: {natural_query}
AVAILABLE COLUMNS (use EXACT names):
{columns_text}
CRITICAL RULES:
1. You MUST use EXACT column names from the list above - no fuzzy matching or abbreviations
2. If query mentions "product", find the EXACT column name (e.g., "Productname" or "prodProductid")
3. If query mentions "customer", find the EXACT column name (e.g., "Name", "custCustomerid")
4. If query mentions "location", find the EXACT column name (e.g., "Storename", "locaLocationid")
5. ThoughtSpot auto-aggregates - DO NOT use sum(), count(), avg() functions
6. Use [column name] syntax with exact column name: [Productname] not [product]
7. TIME GRANULARITY: NEVER use bare "weekly", "monthly", "daily" as standalone tokens.
Always attach granularity to the date column: [Date Column].weekly, [Date Column].monthly
Example: "sales weekly" → "[Sales Amount] [Order Date].weekly"
Example: "revenue by month" → "[Revenue] [Transaction Date].monthly"
8. If no date column is needed, just use the measure and dimension columns.
Examples:
- "Show products where stock > 900" → "[Productname] [Stocklevel] [Stocklevel] > 900"
- "Show customers where lifetime value > 50000" → "[Name] [Lifetimevalue] [Lifetimevalue] > 50000"
- "Show sales by region" → "[Salesamount] [Region]"
- "Show ASP weekly trend" → "[Asp Amount] [Order Date].weekly"
- "Revenue by month last 12 months" → "[Revenue] [Date].monthly [Date].'last 12 months'"
Convert the natural query above to ThoughtSpot search syntax using EXACT column names.
Return ONLY the search query string, nothing else."""
        try:
            messages = [{"role": "user", "content": prompt}]
            # Low temperature: we want deterministic, syntax-only output
            search_query = self.llm_researcher.make_request(messages, temperature=0.1, max_tokens=200, stream=False).strip()
            # Remove any quotes or extra formatting
            search_query = search_query.strip('"\'`')
            return search_query
        except Exception as e:
            print(f"⚠️ Error converting query: {e}")
            # Fallback: try simple pattern matching
            return self._fallback_conversion(natural_query, model_columns)

    def _fallback_conversion(self, natural_query: str, model_columns: List[Dict]) -> str:
        """Simple fallback conversion if AI fails"""
        # Extract column names that appear in the query
        query_lower = natural_query.lower()
        found_columns = []
        for col in model_columns:
            col_name = col.get('name', '')
            # Compare with underscores treated as spaces to match natural phrasing
            if col_name.lower().replace('_', ' ') in query_lower:
                found_columns.append(f"[{col_name}]")
        if found_columns:
            return " ".join(found_columns)
        return "[data]"  # Minimal fallback

    def infer_chart_type(
        self,
        search_query: str,
        model_columns: List[Dict],
        outlier_context: Optional[Dict] = None
    ) -> str:
        """
        Infer best chart type for a search query

        Args:
            search_query: ThoughtSpot search query
            model_columns: Available model columns
                (NOTE(review): currently unused here; kept for interface stability)
            outlier_context: Optional outlier context for better inference

        Returns:
            Chart type name (e.g., 'KPI', 'LINE', 'COLUMN'); defaults to
            'COLUMN' when the LLM answer is invalid or the call fails
        """
        # Build context for AI from the module-level CHART_TYPES catalogue
        chart_types_desc = "\n".join([
            f"- {name}: {info['best_for']} (requires: {info['requires']})"
            for name, info in CHART_TYPES.items()
        ])
        context = f"Search Query: {search_query}"
        if outlier_context:
            context += f"\nOutlier Context: {outlier_context.get('title')} - {outlier_context.get('insight')}"
        prompt = f"""{context}
Available Chart Types:
{chart_types_desc}
What is the BEST chart type for this query?
Guidelines:
- If showing a single number or metric over time → KPI
- If showing trend over time (date/month/year) → LINE or AREA
- If comparing categories or groups → COLUMN or BAR
- If showing part-to-whole → PIE or STACKED_COLUMN
- If showing correlation between two numbers → SCATTER
- If showing detailed data → TABLE
Return ONLY the chart type name (e.g., "LINE" or "COLUMN"), nothing else."""
        try:
            messages = [{"role": "user", "content": prompt}]
            chart_type = self.llm_researcher.make_request(messages, temperature=0.1, max_tokens=20, stream=False).strip().upper()
            # Validate it's a known type
            if chart_type in CHART_TYPES:
                return chart_type
            # If not valid, return default
            return 'COLUMN'
        except Exception as e:
            print(f"⚠️ Error inferring chart type: {e}")
            return 'COLUMN'  # Safe default
class LiveboardCreator:
"""Create Liveboards from company research and model metadata"""
def __init__(self, ts_client, model_id: str, model_name: str, llm_model: str = None):
"""
Initialize LiveboardCreator
Args:
ts_client: ThoughtSpotDeployer instance (already authenticated)
model_id: GUID of the ThoughtSpot model to build Liveboard on
model_name: Display name of the model
llm_model: LLM model to use (e.g., 'gpt-5.4', 'gpt-5.2')
"""
self.ts_client = ts_client
self.model_id = model_id
self.model_name = model_name
self.model_columns = self._fetch_model_columns()
# Use selected LLM model instead of hardcoded OpenAI
from main_research import MultiLLMResearcher
model_to_use = llm_model or DEFAULT_LLM_MODEL
provider_name, model_name_llm = map_llm_display_to_provider(model_to_use)
self.llm_researcher = MultiLLMResearcher(provider=provider_name, model=model_name_llm)
self.query_translator = QueryTranslator(self.llm_researcher)
def _fetch_model_columns(self) -> List[Dict]:
"""Get available columns from the model via TML export"""
try:
import yaml
response = self.ts_client.session.post(
f"{self.ts_client.base_url}/api/rest/2.0/metadata/tml/export",
headers=self.ts_client.headers,
json={"metadata": [{"identifier": self.model_id}]}
)
if response.status_code == 200:
result = response.json()
if isinstance(result, list) and len(result) > 0:
edoc = result[0].get('edoc')
if edoc:
tml = yaml.safe_load(edoc)
# Get columns from model TML
if 'model' in tml:
columns = tml['model'].get('columns', [])
# Convert to simpler format
return [{
'name': col.get('name'),
'type': col.get('properties', {}).get('column_type', 'ATTRIBUTE'),
'column_id': col.get('column_id')
} for col in columns]
print(f"Warning: Could not parse model columns")
return []
else:
print(f"Warning: Could not fetch model TML: {response.status_code}")
return []
except Exception as e:
print(f"Error fetching model columns: {e}")
import traceback
traceback.print_exc()
return []
def load_reference_tml(self, reference_type: str = "simple") -> Dict:
"""
Load reference TML from project folders
Args:
reference_type: 'simple' or 'demogold'
- simple: dev_notes/tml_examples/liveboard_simple/Global Retail Apparel Sales.liveboard.tml
- demogold: dev_notes/tml_examples/liveboard_demogold/rl test.liveboard.tml
Returns:
Parsed TML dictionary
"""
if reference_type == "simple":
tml_path = "dev_notes/tml_examples/liveboard_simple/Global Retail Apparel Sales.liveboard.tml"
else:
tml_path = "dev_notes/tml_examples/liveboard_demogold/rl test.liveboard.tml"
try:
with open(tml_path, 'r') as f:
return yaml.safe_load(f)
except Exception as e:
print(f"Error loading reference TML: {e}")
return None
    def generate_search_query(self, viz_config: Dict) -> str:
        """
        Generate ThoughtSpot search query from visualization intent

        Examples from golden demo TML:
        - KPI: "[sales] [date].weekly [date].'last 8 quarters'" (note: date appears TWICE!)
        - KPI: "[sales] [date].monthly [date].'last 8 quarters'"
        - Top N: "[sales] top 1 [product] [date].'last quarter'"
        - Compare: "[sales] by [city] [date].2023 vs [date].2024"

        IMPORTANT FOR KPIs WITH SPARKLINES:
        - Must include time_column with granularity (e.g., [date].monthly)
        - Should include a time filter (e.g., [date].'last 8 quarters')
        - The time column appears twice: once for granularity, once for filter

        Args:
            viz_config: Dictionary with keys:
                - chart_type: Chart type name (defaults to 'COLUMN')
                - measure: Measure column name
                - time_column: Optional time dimension
                - granularity: Optional time granularity (daily, weekly, monthly)
                - dimensions: List of grouping dimensions
                - filters: List of filter expressions
                - top_n: Optional top N limit
                - comparison: Optional comparison (e.g., "2023 vs 2024")

        Returns:
            ThoughtSpot search query string (tokens joined by spaces, in the
            order: measure, top N, time part, dimensions, filters)
        """
        query_parts = []
        chart_type = viz_config.get('chart_type', 'COLUMN')
        # Add measure
        if 'measure' in viz_config:
            query_parts.append(f"[{viz_config['measure']}]")
        # Add top N if specified (goes right after measure)
        if 'top_n' in viz_config:
            query_parts.append(f"top {viz_config['top_n']}")
        # Add time dimension with granularity
        time_column = viz_config.get('time_column')
        granularity = viz_config.get('granularity')
        if time_column:
            time_part = f"[{time_column}]"
            if granularity:
                time_part += f".{granularity}"  # .daily, .weekly, .monthly
            # Add comparison if specified
            if 'comparison' in viz_config:
                time_part += f" {viz_config['comparison']}"  # "2023 vs 2024"
            query_parts.append(time_part)
        # Add grouping dimensions
        if 'dimensions' in viz_config:
            for dim in viz_config['dimensions']:
                query_parts.append(f"[{dim}]")
        # Add filters
        filters = viz_config.get('filters', [])
        # For KPIs with time_column but no time filter, add a default time filter
        # This is CRITICAL for sparklines and comparisons to work!
        if chart_type == 'KPI' and time_column and granularity:
            # A filter mentioning the time column (case-insensitively) counts
            has_time_filter = any(time_column.lower() in f.lower() for f in filters)
            if not has_time_filter:
                # Add default time filter based on granularity
                default_time_filter = {
                    'daily': f"[{time_column}].'last 30 days'",
                    'weekly': f"[{time_column}].'last 8 quarters'",
                    'monthly': f"[{time_column}].'last 8 quarters'",
                    'quarterly': f"[{time_column}].'last 2 years'",
                    'yearly': f"[{time_column}].'last 5 years'"
                }
                # Copy (don't mutate the caller's list) and append the default
                filters = list(filters) + [default_time_filter.get(granularity, f"[{time_column}].'last 12 months'")]
        for filter_spec in filters:
            # Filters should already be in format: "[date].'last quarter'"
            query_parts.append(filter_spec)
        return " ".join(query_parts)
    def build_advanced_query(self, measures: List[str], dimensions: List[str],
                             chart_type: str, time_config: Dict = None,
                             filters: List[str] = None) -> str:
        """
        Build advanced ThoughtSpot search query with time comparisons and filters

        Args:
            measures: List of measure columns
            dimensions: List of dimension columns
            chart_type: Type of chart for context (drives the query structure)
            time_config: Optional time configuration:
                - granularity: 'weekly', 'monthly', 'quarterly', 'yearly'
                - comparison: 'vs_previous', 'vs_year_ago'
                - range: 'last 12 months', 'this year', 'last quarter'
                - growth: truthy to request a growth calculation per measure
            filters: Optional list of filter conditions (appended verbatim)

        Returns:
            ThoughtSpot search query string
        """
        query_parts = []
        # Handle special chart types that need different query structures
        if chart_type == 'PIVOT_TABLE' and len(dimensions) > 1:
            # For pivot tables, structure as: [measures] [dim1] [dim2] [time]
            for measure in measures:
                query_parts.append(f"sum [{measure}]")
            # Add time granularity if specified
            if time_config and 'granularity' in time_config:
                date_col = self.map_semantic_to_column('date') or 'date'
                query_parts.append(f"[{date_col}].{time_config['granularity']}")
            # Add dimensions
            for dim in dimensions:
                query_parts.append(f"[{dim}]")
        elif chart_type == 'SANKEY' and len(dimensions) >= 2:
            # For Sankey, need hierarchical dimensions
            for measure in measures:
                query_parts.append(f"sum [{measure}]")
            # Add dimensions in hierarchical order
            for dim in dimensions[:3]:  # Limit to 3 levels
                query_parts.append(f"[{dim}]")
        elif chart_type in ['BUBBLE', 'SCATTER'] and len(measures) >= 2:
            # For bubble/scatter, need multiple measures
            for measure in measures[:3]:  # x, y, and optionally size
                query_parts.append(f"sum [{measure}]")
            # Add category dimension if present
            if dimensions:
                query_parts.append("by")
                query_parts.append(f"[{dimensions[0]}]")
        else:
            # Standard query building with aggregation
            for measure in measures:
                # Check if growth calculation is needed
                if time_config and time_config.get('growth'):
                    query_parts.append(f"growth of [{measure}]")
                else:
                    query_parts.append(f"sum [{measure}]")
            # Add dimensions with "by" keyword
            if dimensions and chart_type != 'KPI':
                query_parts.append("by")
                for dim in dimensions:
                    query_parts.append(f"[{dim}]")
        # Add time comparisons (e.g., [date].2023 vs [date].2022)
        if time_config:
            date_col = self.map_semantic_to_column('date') or 'date'
            if 'comparison' in time_config:
                if time_config['comparison'] == 'vs_year_ago':
                    current_year = 2024  # Could make this dynamic
                    query_parts.append(f"[{date_col}].{current_year} vs [{date_col}].{current_year-1}")
                elif time_config['comparison'] == 'vs_previous':
                    query_parts.append(f"[{date_col}].'this quarter' vs [{date_col}].'last quarter'")
            # Add time range filter
            if 'range' in time_config:
                query_parts.append(f"[{date_col}].'{time_config['range']}'")
        # Add any additional filters
        if filters:
            for filter_condition in filters:
                query_parts.append(filter_condition)
        # Add top N for non-geographic dimensions
        if dimensions and chart_type in ['COLUMN', 'BAR']:
            # Geo dimensions should show all regions, not a truncated top 10
            is_geo = any(geo_term in dim.lower() for dim in dimensions
                         for geo_term in ['region', 'country', 'state', 'city', 'location'])
            if not is_geo:
                query_parts.append("top 10")
        return ' '.join(query_parts)
    def map_semantic_to_column(self, semantic_type: str) -> Optional[str]:
        """
        Map semantic type to actual column name using AI

        Args:
            semantic_type: Semantic description like 'sales_amount', 'product_name', 'customer_name'

        Returns:
            Actual column name from model (validated against the model's
            column list), or None if no good match

        Examples:
            'sales_amount' → 'Salesamount'
            'product_name' → 'Productname'
            'customer_name' → 'Name'
            'geographic_region' → 'Region'
        """
        # Get available column names
        column_names = [col.get('name', '') for col in self.model_columns if col.get('name')]
        columns_text = ", ".join(sorted(column_names))
        prompt = f"""Find the best matching column name for this semantic type.
Semantic Type: {semantic_type}
Available Columns: {columns_text}
MATCHING RULES:
1. Look for exact or partial matches (e.g., 'sales_amount' matches 'Salesamount' or 'Sales')
2. Consider common abbreviations (e.g., 'customer_name' matches 'Name' if in customer context)
3. Geographic terms: 'geographic_region' matches 'Region', 'State', 'Territory', etc.
4. Time terms: 'time_period' matches 'Date', 'Month', 'Quarter', 'Year'
5. If no good match exists, return "NONE"
Return ONLY the exact column name from the available list, or "NONE" if no match.
Examples:
- Input: "sales_amount" → Output: "Salesamount"
- Input: "product_name" → Output: "Productname"
- Input: "customer_name" → Output: "Name"
- Input: "foo_bar_baz" (no match) → Output: "NONE"
"""
        try:
            messages = [{"role": "user", "content": prompt}]
            # Deterministic, short completion: we expect a bare column name
            column_name = self.llm_researcher.make_request(messages, temperature=0.1, max_tokens=30, stream=False).strip()
            # Parse out just the column name if AI included "Output: " prefix
            if column_name.startswith('Output:'):
                column_name = column_name.replace('Output:', '').strip().strip('"').strip("'")
            # Debug: print what AI returned
            if column_name not in column_names and column_name != "NONE":
                print(f" ⚠️ AI returned '{column_name}' but it's not in column list")
                print(f" Available: {', '.join(column_names[:10])}...")
            # Validate it's actually in our column list
            if column_name != "NONE" and column_name in column_names:
                return column_name
            return None
        except Exception as e:
            print(f" ⚠️ Error mapping semantic type '{semantic_type}': {e}")
            return None
def create_outlier_visualizations(self, outliers: List[Dict]) -> List[Dict]:
    """
    Create visualization configs from demo outliers.

    Args:
        outliers: List of outlier dictionaries with either:
            OLD FORMAT:
            - title, insight, show_me_query, impact, talking_point
            NEW FORMAT (semantic):
            - title, insight, viz_type, viz_measure_types, viz_dimension_types,
              show_me_query, kpi_metric, impact, talking_point

    Returns:
        List of visualization config dictionaries. Outliers whose columns
        cannot be mapped to the model (or that raise during translation) are
        skipped rather than emitted as broken visualizations.
    """
    viz_configs = []
    # Counter is shared across main vizzes AND companion KPIs so IDs stay unique.
    viz_counter = 1
    print(f"\n🎨 Generating outlier-driven visualizations...")
    for i, outlier in enumerate(outliers):
        print(f" {i+1}. {outlier['title']}...")
        try:
            # Check if this uses NEW semantic format (presence of either key)
            if 'viz_measure_types' in outlier or 'viz_dimension_types' in outlier:
                # NEW FORMAT: Use semantic mapping
                print(f" Using semantic mapping...")
                # Map semantic types to actual columns; accept either a list
                # or a comma-separated string from the LLM.
                measure_types = outlier.get('viz_measure_types', [])
                dimension_types = outlier.get('viz_dimension_types', [])
                if isinstance(measure_types, str):
                    measure_types = [m.strip() for m in measure_types.split(',')]
                if isinstance(dimension_types, str):
                    dimension_types = [d.strip() for d in dimension_types.split(',')]
                # Unmappable semantic types are silently dropped (col is None).
                measure_cols = []
                for semantic in measure_types:
                    col = self.map_semantic_to_column(semantic)
                    if col:
                        measure_cols.append(col)
                        print(f" {semantic}{col}")
                dimension_cols = []
                for semantic in dimension_types:
                    col = self.map_semantic_to_column(semantic)
                    if col:
                        dimension_cols.append(col)
                        print(f" {semantic}{col}")
                # Build search query from mapped columns
                # Format: sum [measure] by [dimension] by [dimension2] ...
                # CRITICAL: Must use explicit aggregation function (sum, avg, etc.) for proper aggregation!
                query_parts = []
                chart_type = outlier.get('viz_type', 'COLUMN')
                # Add measures with explicit aggregation function
                for measure in measure_cols:
                    # Use 'sum' as default aggregation for all measures
                    # KPIs also need explicit aggregation when there are multiple rows
                    query_parts.append(f"sum [{measure}]")
                # Add dimensions with "by" keyword for aggregation (except for KPI without dimensions)
                if dimension_cols and chart_type != 'KPI':
                    query_parts.append("by")
                    for dim in dimension_cols:
                        query_parts.append(f"[{dim}]")
                    # Add top N filter for non-geographic dimensions to avoid overwhelming charts
                    is_geo = any(geo_term in dim.lower() for dim in dimension_cols
                                 for geo_term in ['region', 'country', 'state', 'city', 'location'])
                    # For STACKED_COLUMN, limit to top 5-7 for readability
                    if chart_type == 'STACKED_COLUMN':
                        query_parts.append("top 5")
                    elif not is_geo and chart_type in ['COLUMN', 'BAR', 'LINE']:
                        query_parts.append("top 10")
                # Extract filters from show_me query if present
                # NOTE(review): naive string split — assumes the threshold is the
                # first whitespace-delimited token after the operator.
                show_me = outlier.get('show_me_query', '')
                if '>' in show_me or '<' in show_me or '=' in show_me:
                    # Try to extract filter condition
                    for measure in measure_cols:
                        if measure.lower() in show_me.lower():
                            if '>' in show_me:
                                threshold = show_me.split('>')[1].split()[0].strip()
                                query_parts.append(f"[{measure}] > {threshold}")
                            elif '<' in show_me:
                                threshold = show_me.split('<')[1].split()[0].strip()
                                query_parts.append(f"[{measure}] < {threshold}")
                search_query = ' '.join(query_parts)
                # Auto-select GEO_AREA for geographic dimensions
                # (recomputed here because the earlier is_geo is scoped to the
                # dimension branch above)
                is_geo = any(geo_term in dim.lower() for dim in dimension_cols
                             for geo_term in ['region', 'country', 'state', 'city', 'location'])
                if is_geo and chart_type in ['COLUMN', 'BAR']:
                    chart_type = 'GEO_AREA'
                    print(f" 🗺️ Auto-selected GEO_AREA chart type for geographic dimension")
                print(f" Search: {search_query}")
                print(f" Chart: {chart_type}")
                # Create main visualization
                viz_configs.append({
                    'id': f'Viz_{viz_counter}',
                    'name': outlier['title'],
                    'description': outlier['insight'],
                    'chart_type': chart_type,
                    'search_query_direct': search_query,
                    'outlier_context': outlier
                })
                viz_counter += 1
                # Optionally create companion KPI summarizing the first measure
                if outlier.get('kpi_companion') and measure_cols:
                    kpi_query = f"[{measure_cols[0]}]"
                    if query_parts and ('>' in query_parts[-1] or '<' in query_parts[-1]):
                        kpi_query += ' ' + query_parts[-1].split('[')[0]  # Add filter
                    viz_configs.append({
                        'id': f'Viz_{viz_counter}',
                        'name': f"{outlier['title']} - Total",
                        'description': outlier.get('kpi_metric', 'Summary metric'),
                        'chart_type': 'KPI',
                        'search_query_direct': kpi_query,
                        'outlier_context': outlier
                    })
                    viz_counter += 1
                    print(f" + KPI: {kpi_query}")
            else:
                # OLD FORMAT: Use natural language translation
                print(f" Using NL translation...")
                search_query = self.query_translator.convert_natural_to_search(
                    outlier['show_me_query'],
                    self.model_columns
                )
                print(f" Search: {search_query}")
                # Validate all [Column] tokens exist in the model — skip if any are missing
                import re as _re
                col_names_lower = {col.get('name', '').lower() for col in self.model_columns if col.get('name')}
                # Extract tokens (strip granularity suffix like .weekly)
                unknown_tokens = [
                    t for t in _re.findall(r'\[([^\]]+)\]', search_query)
                    if t.split('.')[0].strip().lower() not in col_names_lower
                ]
                if unknown_tokens:
                    print(f" ⚠️ Skipping '{outlier['title']}' — columns not in model: {unknown_tokens}")
                    continue
                chart_type = self.query_translator.infer_chart_type(
                    search_query,
                    self.model_columns,
                    outlier_context=outlier
                )
                print(f" Chart: {chart_type}")
                viz_configs.append({
                    'id': f'Viz_{viz_counter}',
                    'name': outlier['title'],
                    'description': outlier['insight'],
                    'chart_type': chart_type,
                    'search_query_direct': search_query,
                    'outlier_context': outlier
                })
                viz_counter += 1
        except Exception as e:
            print(f" ⚠️ Skipping '{outlier.get('title', 'unknown')}' due to error: {e}")
            # Don't add a broken viz — skip and continue
    print(f" Generated {len(viz_configs)} visualizations total")
    return viz_configs
def _classify_visualizations_for_groups(self, viz_configs: List[Dict]) -> Dict[str, List[str]]:
"""
Classify visualizations into logical groups based on chart type and content.
Groups:
- KPI Overview: All KPI charts (for sparklines, big numbers)
- Trends: Line, Area charts (time-based)
- Comparisons: Bar, Column, Stacked charts
- Details: Tables, Pivot tables
- Geo: Maps
Returns:
Dict mapping group names to lists of viz IDs
"""
groups = {
'KPI Overview': [],
'Trends': [],
'Comparisons': [],
'Details': [],
'Geo': []
}
for config in viz_configs:
viz_id = config.get('id', '')
chart_type = config.get('chart_type', 'COLUMN')
if chart_type == 'KPI':
groups['KPI Overview'].append(viz_id)
elif chart_type in ['LINE', 'AREA']:
groups['Trends'].append(viz_id)
elif chart_type in ['GEO_AREA', 'GEO_BUBBLE', 'GEO_HEATMAP']:
groups['Geo'].append(viz_id)
elif chart_type in ['TABLE', 'PIVOT_TABLE']:
groups['Details'].append(viz_id)
else:
# Bar, Column, Stacked, Pie, etc.
groups['Comparisons'].append(viz_id)
# Remove empty groups
return {k: v for k, v in groups.items() if v}
def _create_groups_structure(self, viz_configs: List[Dict]) -> List[Dict]:
"""
Create Groups (containers) for visualizations - like tabs within a tab.
Golden demo structure:
groups:
- id: Group_1
name: Sales Overview
visualizations:
- Viz_3
- Viz_10
group_guid: null
Returns:
List of group TML structures
"""
classified = self._classify_visualizations_for_groups(viz_configs)
groups = []
# Create a group for KPIs if we have 2+ KPIs
kpis = classified.get('KPI Overview', [])
if len(kpis) >= 2:
groups.append({
'id': 'Group_1',
'name': '📊 KPI Overview',
'visualizations': kpis,
'group_guid': None
})
# Create a group for trends if we have 2+ trend charts
trends = classified.get('Trends', [])
if len(trends) >= 2:
groups.append({
'id': f'Group_{len(groups) + 1}',
'name': '📈 Trends',
'visualizations': trends,
'group_guid': None
})
return groups
def _create_group_layout(self, group_id: str, viz_ids: List[str], viz_configs: List[Dict]) -> Dict:
"""
Create layout for tiles inside a group.
Groups have their own internal 12-column grid.
"""
tiles = []
current_y = 0
current_x = 0
for viz_id in viz_ids:
# Find the config for this viz
config = next((c for c in viz_configs if c.get('id') == viz_id), {})
chart_type = config.get('chart_type', 'COLUMN')
# KPIs in groups are typically 6 cols wide, 5 tall
if chart_type == 'KPI':
width = 6
height = 5
else:
width = 6
height = 5
# Check if fits in row
if current_x + width > 12:
current_y += height
current_x = 0
tiles.append({
'visualization_id': viz_id,
'x': current_x,
'y': current_y,
'height': height,
'width': width
})
current_x += width
return {
'id': group_id,
'tiles': tiles
}
def _create_tabbed_layout(self, visualizations: List[Dict], viz_configs: List[Dict], groups: List[Dict]) -> Dict:
    """
    Create a tabbed layout structure like the golden demo.

    Structure:
    layout:
      tabs:
      - name: 📈 Sales
        tiles: [...]
        group_layouts: [...]
      - name: 💵 Finance
        tiles: [...]

    Currently emits a single "📊 Overview" tab: group tiles are placed
    first (top of the tab), then all ungrouped visualizations flow below
    on a 12-column grid.
    """
    import uuid
    # Get viz IDs that are in groups (these are skipped in the
    # ungrouped pass below — they render inside their group tile).
    grouped_viz_ids = set()
    for group in groups:
        grouped_viz_ids.update(group.get('visualizations', []))
    # Main tab tiles - groups and ungrouped visualizations
    main_tiles = []
    group_layouts = []
    current_y = 0
    current_x = 0
    # Add groups as tiles first (they take priority at top)
    for group in groups:
        group_id = group['id']
        group_viz_ids = group.get('visualizations', [])
        # Group tiles are typically wide: 8 cols when holding 3+ vizzes
        width = 8 if len(group_viz_ids) > 2 else 6
        height = 6
        if current_x + width > 12:
            current_y += height
            current_x = 0
        main_tiles.append({
            'visualization_id': group_id,
            'x': current_x,
            'y': current_y,
            'height': height,
            'width': width
        })
        current_x += width
        # NOTE(review): row is advanced both here (when the row filled
        # exactly) AND via the pre-placement overflow check above —
        # looks intentional but creates two wrap points; confirm.
        if current_x >= 12:
            current_y += height
            current_x = 0
        # Create internal layout for this group
        group_layouts.append(
            self._create_group_layout(group_id, group_viz_ids, viz_configs)
        )
    # Move to next row for ungrouped vizzes
    if current_x > 0:
        current_y += 6
        current_x = 0
    # Add ungrouped visualizations
    for config in viz_configs:
        viz_id = config.get('id', '')
        if viz_id in grouped_viz_ids:
            continue  # Skip grouped viz
        chart_type = config.get('chart_type', 'COLUMN')
        # Determine size by chart type: KPIs compact, maps full-width,
        # tables wide, everything else a standard half-width tile.
        if chart_type == 'KPI':
            width = 3
            height = 3
        elif chart_type in ['GEO_AREA', 'GEO_BUBBLE']:
            width = 12
            height = 7
        elif chart_type in ['TABLE', 'PIVOT_TABLE']:
            width = 8
            height = 5
        else:
            width = 6
            height = 5
        if current_x + width > 12:
            current_y += height
            current_x = 0
        main_tiles.append({
            'visualization_id': viz_id,
            'x': current_x,
            'y': current_y,
            'height': height,
            'width': width
        })
        current_x += width
    # Single tab for now (can expand later)
    layout = {
        'tabs': [{
            'name': '📊 Overview',
            'description': '',
            'tiles': main_tiles,
            'id': str(uuid.uuid4()),  # fresh GUID per generated layout
            'group_layouts': group_layouts if group_layouts else None
        }]
    }
    # Remove the key entirely (rather than emit null) when no groups exist
    if not layout['tabs'][0].get('group_layouts'):
        del layout['tabs'][0]['group_layouts']
    return layout
def _create_style_properties(self, groups: List[Dict], viz_configs: List[Dict]) -> Dict:
"""
Create style properties for brand colors and appearance.
From golden demo:
style:
style_properties:
- name: lb_border_type
value: CURVED
- name: lb_brand_color
value: LBC_C
overrides:
- object_id: Group_1
style_properties:
- name: group_brand_color
value: GBC_B
"""
style = {
'style_properties': [
{'name': 'lb_border_type', 'value': 'CURVED'},
{'name': 'lb_brand_color', 'value': 'LBC_C'},
{'name': 'hide_group_title', 'value': 'false'},
{'name': 'hide_group_description', 'value': 'true'},
{'name': 'hide_tile_description', 'value': 'false'}
],
'overrides': []
}
# Brand colors for groups (GBC_A through GBC_J)
group_colors = ['GBC_B', 'GBC_C', 'GBC_D', 'GBC_E', 'GBC_F']
for i, group in enumerate(groups):
style['overrides'].append({
'object_id': group['id'],
'style_properties': [
{'name': 'group_brand_color', 'value': group_colors[i % len(group_colors)]},
{'name': 'hide_group_description', 'value': 'true'}
]
})
# KPI tiles get special colors (TBC_I is cyan)
for config in viz_configs:
if config.get('chart_type') == 'KPI':
style['overrides'].append({
'object_id': config.get('id'),
'style_properties': [
{'name': 'tile_brand_color', 'value': 'TBC_I'}
]
})
return style
def _create_smart_layout(self, visualizations: List[Dict], viz_configs: List[Dict]) -> List[Dict]:
"""
Create intelligent layout based on visualization types
Uses a 12-column grid system with variable heights based on chart type:
- KPIs: 2-3 units height, can be narrow (3-4 cols)
- Maps: Full width (12 cols), 5-8 units height
- Scatter/Bubble: 6 cols width, 6-8 units height
- Tables/Pivots: 6-12 cols, 4-6 units height
- Standard charts: 6 cols, 5 units height
"""
tiles = []
current_y = 0
current_row_x = 0
max_row_height = 0
for i, (viz, config) in enumerate(zip(visualizations, viz_configs)):
chart_type = config.get('chart_type', 'COLUMN')
# Determine size based on chart type
if chart_type == 'TEXT':
# Text tiles are small - like in reference
width = 2
height = 2 if 'Global' in config.get('name', '') else 4
elif chart_type == 'KPI':
# KPIs are compact - can fit 3-4 across
width = 3
height = 3
elif chart_type in ['GEO_AREA', 'GEO_BUBBLE']:
# Maps need full width
width = 12
height = 7
elif chart_type in ['SCATTER', 'BUBBLE']:
# Scatter/bubble need more space for detail
width = 6
height = 7
elif chart_type in ['PIVOT_TABLE', 'TABLE']:
# Tables can be wider
width = 8
height = 5
elif chart_type == 'SANKEY':
# Sankey needs width for flow
width = 6
height = 8
elif chart_type in ['STACKED_COLUMN', 'TREEMAP']:
# Complex charts need more height
width = 6
height = 6
else:
# Default for standard charts
width = 6
height = 5
# Check if this fits in the current row
if current_row_x + width > 12:
# Move to next row
current_y += max_row_height
current_row_x = 0
max_row_height = 0
tiles.append({
'visualization_id': viz['id'],
'x': current_row_x,
'y': current_y,
'height': height,
'width': width
})
current_row_x += width
max_row_height = max(max_row_height, height)
# Add layout configuration to the liveboard
layout = {
'tabs': [{
'name': '📊 Dashboard',
'tiles': tiles
}]
}
return tiles
def _build_axis_configs(self, x_axis_cols: List[str], y_axis_cols: List[str], chart_type: str) -> List[Dict]:
"""Build axis configuration based on chart type"""
if chart_type == 'KPI':
# KPIs need both x (time) and y (measure) for sparklines
if x_axis_cols and y_axis_cols:
return [{'x': x_axis_cols, 'y': y_axis_cols}]
elif y_axis_cols:
return [{'y': y_axis_cols}]
else:
return []
elif chart_type == 'BUBBLE' and len(y_axis_cols) >= 2:
# Bubble charts need x, y, and optionally size
config = {'x': [y_axis_cols[0]], 'y': [y_axis_cols[1]]}
if len(y_axis_cols) > 2:
config['size'] = y_axis_cols[2]
if x_axis_cols:
config['category'] = x_axis_cols
return [config]
elif chart_type == 'SANKEY' and x_axis_cols:
# Sankey needs hierarchical x-axis configuration
return [{'x': x_axis_cols[:3], 'y': y_axis_cols}]
elif x_axis_cols and y_axis_cols:
# Charts with both dimensions and measures
return [{'x': x_axis_cols, 'y': y_axis_cols}]
elif y_axis_cols:
# Only measures (no dimensions)
return [{'y': y_axis_cols}]
else:
return []
def _build_client_state_v2(self, chart_type: str, viz_config: Dict) -> str:
    """
    Build the client_state_v2 JSON string with advanced chart
    properties and series colors for a visualization.

    Args:
        chart_type: Internal chart type (KPI, SCATTER, STACKED_COLUMN,
            GEO_AREA, COLUMN, ...).
        viz_config: Viz config; reads 'name', optional 'show_anomalies',
            'show_bounds', and an optional 'colors' override.

    Returns:
        JSON-encoded client state string (ThoughtSpot V4DOT2 format).
    """
    import json
    base_state = {
        "version": "V4DOT2",
        "chartProperties": {}
    }
    # Define vibrant color palette (like reference dashboard)
    COLOR_PALETTE = {
        'purple': '#9c6ade',
        'teal': '#40c1c0',
        'yellow': '#fed109',
        'blue': '#009eec',
        'pink': '#85016b',
        'green': '#06BF7F',
        'orange': '#FCC838',
        'light_blue': '#48D1E0',
        'violet': '#8C62F5',
        'coral': '#FF8142'
    }
    # Add KPI-specific settings for sparklines, comparisons, anomalies
    if chart_type == 'KPI':
        base_state["chartProperties"]["responsiveLayoutPreference"] = "AUTO_ON"
        # customProps is itself a JSON string nested inside the outer JSON —
        # ThoughtSpot expects this double encoding.
        base_state["chartProperties"]["chartSpecific"] = {
            "customProps": json.dumps({
                "showLabel": True,
                "showComparison": True,
                "showSparkline": True,
                "showAnomalies": viz_config.get('show_anomalies', False),
                "showBounds": viz_config.get('show_bounds', True),
                "customCompare": "PREV_AVAILABLE",
                "showOnlyLatestAnomaly": False
            })
        }
        # Color the KPI by cadence inferred from its display name.
        # NOTE(review): serieName values assume the model produces series
        # named "Total Salesamount"/"Total sales" — verify per model.
        if 'Monthly' in viz_config.get('name', ''):
            base_state["seriesColors"] = [
                {"serieName": "Total Salesamount", "color": COLOR_PALETTE['teal']},
                {"serieName": "Total sales", "color": COLOR_PALETTE['teal']}
            ]
            base_state["customColorSelectorArray"] = [COLOR_PALETTE['green'], COLOR_PALETTE['coral'], COLOR_PALETTE['coral'], COLOR_PALETTE['teal']]
        elif 'Weekly' in viz_config.get('name', ''):
            base_state["seriesColors"] = [
                {"serieName": "Total Salesamount", "color": COLOR_PALETTE['purple']},
                {"serieName": "Total sales", "color": COLOR_PALETTE['purple']}
            ]
            base_state["customColorSelectorArray"] = [COLOR_PALETTE['green'], COLOR_PALETTE['coral'], COLOR_PALETTE['coral'], COLOR_PALETTE['purple']]
        else:
            # Default KPI colors
            base_state["seriesColors"] = [
                {"serieName": "Total Salesamount", "color": COLOR_PALETTE['blue']},
                {"serieName": "Total sales", "color": COLOR_PALETTE['blue']}
            ]
    # Add scatter/bubble color configurations
    elif chart_type in ['SCATTER', 'BUBBLE']:
        base_state["chartProperties"]["gridLines"] = {
            "xGridlineEnabled": True,
            "yGridlineEnabled": False
        }
        base_state["chartProperties"]["responsiveLayoutPreference"] = "AUTO_ON"
    # Add stacked column settings with vibrant colors
    elif chart_type == 'STACKED_COLUMN':
        base_state["chartProperties"]["showStackedLabels"] = True
        base_state["chartProperties"]["allLabels"] = True
        # Pre-assigned colors for common US region series names.
        base_state["seriesColors"] = [
            {"serieName": "East", "color": COLOR_PALETTE['pink']},
            {"serieName": "West", "color": COLOR_PALETTE['purple']},
            {"serieName": "Midwest", "color": COLOR_PALETTE['teal']},
            {"serieName": "South", "color": COLOR_PALETTE['yellow']},
            {"serieName": "Southwest", "color": COLOR_PALETTE['blue']},
            {"serieName": "North", "color": COLOR_PALETTE['green']},
            {"serieName": "Northeast", "color": COLOR_PALETTE['orange']},
            {"serieName": "Northwest", "color": COLOR_PALETTE['coral']},
            {"serieName": "Southeast", "color": COLOR_PALETTE['violet']},
            {"serieName": "Central", "color": COLOR_PALETTE['light_blue']}
        ]
        base_state["systemSeriesColors"] = base_state["seriesColors"]
    # Add geo map settings with gradient colors
    elif chart_type == 'GEO_AREA':
        # Hard-coded viewport (web-mercator coords) centered on the US.
        base_state["chartProperties"]["mapviewport"] = {
            "center": [-1.076e7, 5.192e6],
            "zoomLevel": 3.88
        }
        # Yellow-to-dark-red heat gradient
        base_state["systemMultiColorSeriesColors"] = [{
            "serieName": "Total sales",
            "colorMap": [{
                "serieName": "state",
                "color": ["#ffffb2", "#fddd87", "#fba35d", "#f75534", "#f9140a", "#d70315", "#b10026"]
            }]
        }]
    # Add column chart colors
    elif chart_type == 'COLUMN':
        base_state["seriesColors"] = [
            {"serieName": "Total sales", "color": COLOR_PALETTE['blue']},
            {"serieName": "Total Salesamount", "color": COLOR_PALETTE['green']}
        ]
    # Caller-supplied colors override everything chosen above.
    if 'colors' in viz_config:
        base_state["seriesColors"] = viz_config['colors']
    return json.dumps(base_state)
def _create_text_tile(self, viz_config: Dict) -> Dict:
"""
Create a TEXT tile visualization (colored text box with markdown content)
Args:
viz_config: Dictionary with:
- id: Viz ID
- name: Title
- text_content: Markdown text to display
- background_color: Hex color (e.g., '#85016b')
Returns:
Text tile TML dictionary
"""
text_content = viz_config.get('text_content', viz_config.get('name', ''))
bg_color = viz_config.get('background_color', '#2E3D4D') # Default dark background
# TEXT tiles in ThoughtSpot need tables field even though they don't query data
text_tml = {
'id': viz_config['id'],
'answer': {
'name': viz_config.get('name', 'Text'),
'description': viz_config.get('description', ''),
'tables': [{
'id': self.model_name, # Use model NAME for TML (not GUID!)
'name': self.model_name
}],
'text_tile': {
'text': text_content,
'background_color': bg_color
}
},
'viz_guid': None
}
return text_tml
def create_visualization_tml(self, viz_config: Dict) -> Dict:
    """
    Create a single visualization TML structure.

    Uses MINIMAL TML — only specifies essentials and lets ThoughtSpot
    derive column names, axis configs, etc. This is more reliable than
    trying to predict what column names ThoughtSpot will generate
    (e.g., "Total sales" vs "sales").

    Args:
        viz_config: Dictionary with visualization specifications
            - id: Viz_1, Viz_2, etc.
            - name: Display name
            - chart_type: LINE, COLUMN, KPI, BAR, STACKED_COLUMN, TEXT, etc.
            - search_query_direct: Direct search query (optional, takes precedence)
            - text_content: For TEXT charts, the markdown content to display
            - background_color: For TEXT charts, hex color for background
            - measure, time_column, dimensions, filters (for search query generation)

    Returns:
        Visualization TML dictionary. Side effect: for KPI charts,
        sets viz_config['_needs_kpi_conversion'] = True so
        deploy_liveboard() can post-process them.
    """
    # Fetch once (the original looked this up twice).
    chart_type = viz_config.get('chart_type', 'COLUMN')
    # TEXT tiles don't use search queries — delegate entirely.
    if chart_type == 'TEXT':
        return self._create_text_tile(viz_config)
    # Use direct search query if provided, otherwise generate from config
    if 'search_query_direct' in viz_config:
        search_query = viz_config['search_query_direct']
    else:
        search_query = self.generate_search_query(viz_config)
    # Map internal chart type to ThoughtSpot's type name (falls back to
    # the internal name when no mapping exists).
    ts_chart_type = CHART_TYPES.get(chart_type, {}).get('ts_type', chart_type)
    # Create MINIMAL visualization TML - let ThoughtSpot fill in everything
    viz_tml = {
        'id': viz_config['id'],
        'answer': {
            'name': viz_config['name'],
            'description': viz_config.get('description', ''),
            'tables': [{
                'id': self.model_name,  # Use model NAME for TML (not GUID!)
                'name': self.model_name
            }],
            'search_query': search_query
        }
        # NOTE: Do NOT include viz_guid for new visualizations
        # NOTE: Do NOT include answer_columns, table_columns - let ThoughtSpot derive them
    }
    # Add chart type hint for non-KPI charts.
    # KPIs need special post-processing, so we skip them here and handle
    # them in deploy_liveboard().
    if ts_chart_type != 'KPI':
        viz_tml['answer']['chart'] = {'type': ts_chart_type}
    else:
        # Mark this viz as needing KPI conversion (stored in config, used later)
        viz_config['_needs_kpi_conversion'] = True
    return viz_tml
def generate_visualizations_from_research(
    self,
    company_data: Dict,
    use_case: str,
    num_visualizations: int = 6
) -> List[Dict]:
    """
    Generate visualization configs based on company research and use case.
    Uses AI to determine what visualizations would be valuable.

    Args:
        company_data: Dictionary with company research data (name, description, etc.)
        use_case: Use case name (Merchandising, Sales AI Analyst, etc.)
        num_visualizations: Number of visualizations to generate (default: 6)

    Returns:
        List of visualization config dictionaries (sanitized against the
        model's actual columns). Falls back to a deterministic set when
        the AI response is empty, unparseable, or unusable.
    """
    # --- Column-shape helpers (support both metadata and TML-export formats) ---
    def _col_name(col: Dict) -> str:
        return col.get('header', {}).get('name', col.get('name', ''))

    def _is_measure(col: Dict) -> bool:
        col_type = str(col.get('type', '')).upper()
        ts_type = str(col.get('ts_type', '')).upper()
        return (
            col_type in ['MEASURE', 'INT_MEASURE', 'FLOAT_MEASURE', 'NUMBER', 'INT64', 'INT32', 'DOUBLE', 'FLOAT', 'DECIMAL', 'INTEGER', 'BIGINT']
            or ts_type == 'MEASURE'
        )

    def _is_date(col: Dict) -> bool:
        col_type = str(col.get('type', '')).upper()
        ts_type = str(col.get('ts_type', '')).upper()
        name_lower = _col_name(col).lower()
        if col_type in ['DATE', 'DATE_TIME', 'TIMESTAMP', 'TIME'] or ts_type in ['DATE', 'DATE_TIME', 'TIMESTAMP', 'TIME']:
            return True
        # TML export doesn't include native date types; infer from attribute names.
        if ts_type != 'MEASURE' and any(tok in name_lower for tok in ['date', 'timestamp', 'time']) and not name_lower.endswith(' key'):
            return True
        return False

    available_measures = [_col_name(col) for col in self.model_columns if _is_measure(col)]
    available_dimensions = [_col_name(col) for col in self.model_columns if not _is_measure(col)]
    date_columns = [_col_name(col) for col in self.model_columns if _is_date(col)]
    # Find numeric columns that LOOK like dates but are NOT (to warn the AI)
    fake_date_columns = [
        _col_name(col)
        for col in self.model_columns
        if _is_measure(col) and any(d in _col_name(col).lower() for d in ['month', 'year', 'quarter', 'week', 'day', 'date'])
    ]
    prompt = f"""Based on this company: {company_data.get('name', 'Unknown Company')}
Use case: {use_case}
Available measures (numeric columns): {available_measures}
Available dimensions (text/date columns): {available_dimensions}
**ACTUAL DATE columns** (USE THESE for time_column and granularity): {date_columns}
**DO NOT USE these for granularity** (they are integers, not dates): {fake_date_columns}
Generate {num_visualizations} visualization configurations for a compelling demo dashboard.
For each visualization, create a JSON object with:
- name: Clear, business-friendly title (e.g., "Revenue Trend Last 12 Months")
- description: Optional 1-sentence description
- chart_type: Choose from: KPI, LINE, COLUMN, BAR, STACKED_COLUMN, AREA, PIE
- measure: Which measure to visualize (from available measures)
- time_column: MUST be an actual DATE column from the list above (NOT month/year/quarter integers!)
- granularity: If time_column used, specify: daily, weekly, monthly, quarterly, yearly
- dimensions: Array of dimensions to group by (from available dimensions, can be empty)
- filters: Array of filter expressions like "[date].'last quarter'", "[date].'last 12 months'", "[date].'this year'" (optional)
- top_n: If showing top N items, specify number (e.g., 5, 10) (optional)
Guidelines:
- Mix chart types (don't use all the same type)
- Include at least 1-2 KPI charts for key metrics
* IMPORTANT: For KPIs, ALWAYS include time_column and granularity (monthly/quarterly/yearly)
* CRITICAL: time_column MUST be an actual DATE type column like 'full_date', NOT 'month' or 'year'!
* This enables sparklines and percent change comparisons (MoM/QoQ/YoY)
* Example KPI: measure="total_sales", time_column="full_date", granularity="monthly"
- Include trend analysis with LINE or AREA charts
- Include comparisons with COLUMN or BAR charts
- Use appropriate time filters for business context
- Make visualizations relevant to the use case
Return ONLY a valid JSON object with structure:
{{
"visualizations": [
{{
"name": "...",
"chart_type": "...",
...
}}
]
}}"""
    # BUGFIX: initialize before the try so the except handler below can
    # always reference it (previously a NameError masked the real error
    # when make_request itself raised).
    response_text = None
    try:
        messages = [{"role": "user", "content": prompt}]
        response_text = self.llm_researcher.make_request(messages, temperature=0.7, max_tokens=4000, stream=False)
        # Debug: Check what we got back
        print(f"🔍 DEBUG: AI response type: {type(response_text)}")
        print(f"🔍 DEBUG: AI response length: {len(response_text) if response_text else 0}")
        if response_text:
            print(f"🔍 DEBUG: AI response first 200 chars: {response_text[:200]}")
        else:
            print(f"❌ ERROR: AI returned empty response!")
            return self._generate_fallback_visualizations(available_measures, date_columns)
        # Strip markdown code fences if present
        response_text = response_text.strip()
        if response_text.startswith('```'):
            # Remove opening fence (```json or ```)
            lines = response_text.split('\n')
            response_text = '\n'.join(lines[1:])
            # Remove closing fence
            if response_text.endswith('```'):
                response_text = response_text[:-3].strip()
        result = json.loads(response_text)
        viz_results = result.get('visualizations', [])
        # Strictly sanitize to actual model columns to prevent invalid TML imports.
        viz_results = self._sanitize_visualization_configs(
            viz_results,
            available_measures,
            available_dimensions,
            date_columns
        )
        if not viz_results:
            print(" ⚠️ AI returned unusable visualizations, using fallback set")
            return self._generate_fallback_visualizations(available_measures, date_columns)
        # Enforce variety: at least 2 KPIs, no duplicate chart types
        dimensions = [col.get('header', {}).get('name', col.get('name', '')) for col in self.model_columns if col.get('type') in ['ATTRIBUTE', 'STRING']]
        viz_results = self._enforce_visualization_variety(viz_results, available_measures, date_columns, dimensions)
        # Re-sanitize: variety enforcement may have added/converted configs.
        return self._sanitize_visualization_configs(
            viz_results,
            available_measures,
            available_dimensions,
            date_columns
        )
    except Exception as e:
        print(f"Error generating visualizations: {e}")
        print(f" Response text was: {response_text[:500] if response_text else 'None'}")
        # Return fallback simple visualizations
        return self._generate_fallback_visualizations(available_measures, date_columns)
@staticmethod
def _normalize_col_name(name: str) -> str:
"""Normalize column names for fuzzy matching."""
if not name:
return ""
return re.sub(r'[^a-z0-9]', '', name.lower())
def _match_column_name(self, requested: str, allowed: List[str]) -> Optional[str]:
"""Map AI-provided column names to exact model column names."""
if not requested or not allowed:
return None
if requested in allowed:
return requested
req_norm = self._normalize_col_name(requested)
if not req_norm:
return None
for col in allowed:
if self._normalize_col_name(col) == req_norm:
return col
for col in allowed:
col_norm = self._normalize_col_name(col)
if req_norm in col_norm or col_norm in req_norm:
return col
return None
def _sanitize_visualization_configs(
    self,
    viz_configs: List[Dict],
    measures: List[str],
    dimensions: List[str],
    date_columns: List[str]
) -> List[Dict]:
    """
    Sanitize AI-generated configs so every referenced column exists in
    the model and every field holds a valid value.

    Invalid chart types fall back to COLUMN; unmatched measures fall
    back to the first available measure; KPI/LINE/AREA charts without a
    valid time column get the first date column. Returns [] when there
    are no configs or no measures at all (caller then uses fallbacks).
    """
    if not viz_configs or not measures:
        return []
    valid_chart_types = set(CHART_TYPES.keys())
    fallback_measure = measures[0]
    fallback_time = date_columns[0] if date_columns else None
    # Prefer a non-date dimension as the fallback grouping column.
    fallback_dimension = next((d for d in dimensions if d not in date_columns), dimensions[0] if dimensions else None)
    sanitized = []
    for idx, raw in enumerate(viz_configs, 1):
        if not isinstance(raw, dict):
            continue
        cfg = dict(raw)  # shallow copy — never mutate the caller's dicts
        cfg['id'] = cfg.get('id') or f'Viz_{idx}'
        chart_type = str(cfg.get('chart_type', 'COLUMN')).upper()
        if chart_type not in valid_chart_types:
            chart_type = 'COLUMN'
        cfg['chart_type'] = chart_type
        measure = self._match_column_name(cfg.get('measure'), measures) or fallback_measure
        if not measure:
            continue
        cfg['measure'] = measure
        # Keep only dimensions that resolve to real columns, de-duped,
        # and never equal to the measure itself.
        dims = []
        raw_dims = cfg.get('dimensions') or []
        if isinstance(raw_dims, list):
            for dim in raw_dims:
                matched = self._match_column_name(dim, dimensions)
                if matched and matched not in dims and matched != measure:
                    dims.append(matched)
        # Categorical charts need at least one grouping dimension.
        if chart_type in ['BAR', 'COLUMN', 'STACKED_COLUMN', 'PIE'] and not dims and fallback_dimension:
            dims = [fallback_dimension]
        if dims:
            cfg['dimensions'] = dims
        else:
            cfg.pop('dimensions', None)
        matched_time = self._match_column_name(cfg.get('time_column'), date_columns) if cfg.get('time_column') else None
        # Time-based charts require a real date column (for sparklines/trends).
        if chart_type in ['KPI', 'LINE', 'AREA'] and not matched_time:
            matched_time = fallback_time
        if matched_time:
            cfg['time_column'] = matched_time
            granularity = str(cfg.get('granularity', 'monthly')).lower()
            if granularity not in {'daily', 'weekly', 'monthly', 'quarterly', 'yearly'}:
                granularity = 'monthly'
            cfg['granularity'] = granularity
        else:
            cfg.pop('time_column', None)
            cfg.pop('granularity', None)
        # Coerce top_n to a positive int; drop it if unparseable.
        top_n = cfg.get('top_n')
        if top_n is not None:
            try:
                cfg['top_n'] = max(1, int(top_n))
            except Exception:
                cfg.pop('top_n', None)
        sanitized.append(cfg)
    return sanitized
def _enforce_visualization_variety(
    self,
    viz_configs: List[Dict],
    measures: List[str],
    date_columns: List[str],
    dimensions: List[str]
) -> List[Dict]:
    """
    Post-process AI-generated visualizations to ensure variety.
    Enforces: at least 2 KPIs, no more than 2 of the same chart type.

    Mutates the viz_configs list (and its dicts) in place and also
    returns it. `dimensions` is currently unused but kept for interface
    stability.
    """
    if not viz_configs:
        return viz_configs
    # Count chart types
    chart_type_counts = {}
    for viz in viz_configs:
        ct = viz.get('chart_type', 'LINE')
        chart_type_counts[ct] = chart_type_counts.get(ct, 0) + 1
    kpi_count = chart_type_counts.get('KPI', 0)
    # If we have fewer than 2 KPIs and have measures/dates, add them
    if kpi_count < 2 and measures and date_columns:
        print(f" ⚠️ Only {kpi_count} KPIs found, adding to ensure 2 minimum")
        # First pass: convert existing dimension-less trend charts to KPIs
        # (cheaper than inventing new configs).
        converted = 0
        for viz in viz_configs:
            if converted >= (2 - kpi_count):
                break
            if viz.get('chart_type') in ['LINE', 'AREA'] and not viz.get('dimensions'):
                old_type = viz['chart_type']
                viz['chart_type'] = 'KPI'
                # KPIs need a time column + granularity for sparklines.
                if not viz.get('time_column') and date_columns:
                    viz['time_column'] = date_columns[0]
                if not viz.get('granularity'):
                    viz['granularity'] = 'monthly'
                print(f" Converted '{viz.get('name')}' from {old_type} to KPI")
                converted += 1
        # Second pass: if conversions weren't enough, synthesize new KPIs
        # at the front of the list, cycling through available measures.
        kpi_count_now = sum(1 for v in viz_configs if v.get('chart_type') == 'KPI')
        while kpi_count_now < 2 and measures:
            measure_idx = kpi_count_now % len(measures)
            new_kpi = {
                'id': f'Viz_KPI_{kpi_count_now + 1}',
                'name': f'Total {measures[measure_idx]}',
                'chart_type': 'KPI',
                'measure': measures[measure_idx],
                'time_column': date_columns[0] if date_columns else None,
                'granularity': 'monthly'
            }
            viz_configs.insert(0, new_kpi)
            print(f" Added KPI: {new_kpi['name']}")
            kpi_count_now += 1
    # Check for duplicate chart types (more than 2 of same non-KPI type)
    # Recount because the KPI pass above may have changed types.
    chart_type_counts = {}
    for viz in viz_configs:
        ct = viz.get('chart_type', 'LINE')
        chart_type_counts[ct] = chart_type_counts.get(ct, 0) + 1
    for chart_type, count in chart_type_counts.items():
        if chart_type != 'KPI' and count > 2:
            print(f" ⚠️ {count} {chart_type} charts found, diversifying...")
            converted = 0
            # Rotate through alternatives (excluding the over-used type).
            alternative_types = ['BAR', 'COLUMN', 'STACKED_COLUMN', 'AREA']
            alternative_types = [t for t in alternative_types if t != chart_type]
            for viz in viz_configs:
                if converted >= (count - 2):
                    break
                if viz.get('chart_type') == chart_type:
                    new_type = alternative_types[converted % len(alternative_types)]
                    viz['chart_type'] = new_type
                    print(f" Converted '{viz.get('name')}' from {chart_type} to {new_type}")
                    converted += 1
    return viz_configs
def _generate_fallback_visualizations(
self,
measures: List[str],
date_columns: List[str]
) -> List[Dict]:
"""Generate fallback visualizations if AI generation fails - now creates 6 visualizations"""
fallback_viz = []
# Get available dimensions (non-date attributes)
dimensions = [
col.get('header', {}).get('name', col.get('name', ''))
for col in self.model_columns
if col.get('type') in ['ATTRIBUTE', 'STRING']
]
if measures and date_columns:
# 1. KPI of first measure (weekly for sparkline)
fallback_viz.append({
'id': 'Viz_1',
'name': f'Total {measures[0]}',
'chart_type': 'KPI',
'measure': measures[0],
'time_column': date_columns[0],
'granularity': 'weekly'
})
# 2. Line trend of first measure
fallback_viz.append({
'id': 'Viz_2',
'name': f'{measures[0]} Trend - Last 12 Months',
'chart_type': 'LINE',
'measure': measures[0],
'time_column': date_columns[0],
'granularity': 'monthly',
'filters': [f"[{date_columns[0]}].'last 12 months'"]
})
if len(measures) > 1:
# 3. Second measure KPI (monthly for sparkline)
fallback_viz.append({
'id': 'Viz_3',
'name': f'Total {measures[1]}',
'chart_type': 'KPI',
'measure': measures[1],
'time_column': date_columns[0],
'granularity': 'monthly'
})
# 4. Line trend of second measure
fallback_viz.append({
'id': 'Viz_4',
'name': f'{measures[1]} Trend - Last 12 Months',
'chart_type': 'LINE',
'measure': measures[1],
'time_column': date_columns[0],
'granularity': 'monthly',
'filters': [f"[{date_columns[0]}].'last 12 months'"]
})
# 5. Bar chart by dimension (if we have dimensions)
if dimensions:
fallback_viz.append({
'id': f'Viz_{len(fallback_viz) + 1}',
'name': f'{measures[0]} by {dimensions[0]}',
'chart_type': 'BAR',
'measure': measures[0],
'dimensions': [dimensions[0]],
'top_n': 10
})
# 6. Column chart by month
fallback_viz.append({
'id': f'Viz_{len(fallback_viz) + 1}',
'name': f'Monthly {measures[0]}',
'chart_type': 'COLUMN',
'measure': measures[0],
'time_column': date_columns[0],
'granularity': 'monthly',
'filters': [f"[{date_columns[0]}].'last 6 months'"]
})
print(f" ⚠️ AI generation failed, using {len(fallback_viz)} fallback visualizations")
return fallback_viz
def create_liveboard_tml(
    self,
    company_data: Dict,
    use_case: str,
    num_visualizations: int = 6,
    liveboard_name: str = None,
    outliers: Optional[List[Dict]] = None
) -> str:
    """
    Create complete Liveboard TML YAML.

    Visualization configs come from (in priority order):
      1. Outlier-driven generation when `outliers` is provided, topped up
         with AI-generated configs if fewer than `num_visualizations`
      2. AI-driven generation from the company research data

    Side effect: populates `self._kpi_viz_ids` with IDs of visualizations
    that `deploy_liveboard` must post-process into KPIs.

    Args:
        company_data: Company research data
        use_case: Use case name
        num_visualizations: Number of visualizations to create
        liveboard_name: Optional custom name for the liveboard
        outliers: Optional list of demo outliers to create targeted visualizations

    Returns:
        Complete Liveboard TML as YAML string
    """
    # Generate visualization configs
    visualizations = []
    tiles = []
    # Priority 1: Create visualizations from outliers if provided
    if outliers and len(outliers) > 0:
        print("🎯 Using outlier-driven visualization generation")
        viz_configs = self.create_outlier_visualizations(outliers)
        # If we need more visualizations, generate additional ones with AI
        remaining_viz = num_visualizations - len(viz_configs)
        if remaining_viz > 0:
            print(f" + Generating {remaining_viz} additional visualizations...")
            additional_viz = self.generate_visualizations_from_research(
                company_data,
                use_case,
                remaining_viz
            )
            # Adjust IDs for additional viz so numbering continues after
            # the outlier-driven configs
            for i, viz in enumerate(additional_viz):
                viz['id'] = f'Viz_{len(viz_configs) + i + 1}'
            viz_configs.extend(additional_viz)
    # Fallback: Generate visualizations using AI if no outliers
    elif num_visualizations > 0:
        print("🤖 Using AI-driven visualization generation")
        viz_configs = self.generate_visualizations_from_research(
            company_data,
            use_case,
            num_visualizations
        )
        # Create visualization TML objects
        for i, viz_config in enumerate(viz_configs):
            viz_config['id'] = f'Viz_{i+1}'
    else:
        viz_configs = []
    # TEMPORARILY DISABLED - Text tiles causing TML import errors
    # Add text tiles for context (like in sample liveboard)
    text_tiles = []
    # text_tiles = [
    # {
    # 'id': 'Text_1',
    # 'name': '📊 Dashboard Overview',
    # 'chart_type': 'TEXT',
    # 'text_content': f"## {company_data.get('name', 'Company')} Analytics\n\n{use_case} insights and metrics",
    # 'background_color': '#2E3D4D' # Dark blue-gray
    # },
    # {
    # 'id': 'Text_2',
    # 'name': 'Key Insights',
    # 'chart_type': 'TEXT',
    # 'text_content': "💡 **Key Performance Indicators**\n\nTrack trends and identify opportunities",
    # 'background_color': '#85016b' # Pink (from sample)
    # }
    # ]
    # Create text tile visualizations
    # for text_config in text_tiles:
    # viz_tml = self.create_visualization_tml(text_config)
    # visualizations.append(viz_tml)
    # Create visualization TML objects
    # Track which visualizations need KPI conversion (done in post-processing,
    # because direct KPI creation in TML import fails — see _convert_to_kpis)
    self._kpi_viz_ids = []
    if viz_configs:
        for viz_config in viz_configs:
            # Ensure ID is set
            if 'id' not in viz_config:
                viz_config['id'] = f'Viz_{len(visualizations) + 1}'
            viz_tml = self.create_visualization_tml(viz_config)
            visualizations.append(viz_tml)
            # Track KPIs for post-processing
            if viz_config.get('_needs_kpi_conversion') or viz_config.get('chart_type') == 'KPI':
                self._kpi_viz_ids.append(viz_config['id'])
    # Use custom name if provided, otherwise generate default
    if not liveboard_name:
        liveboard_name = f"{company_data.get('name', 'Demo')} - {use_case} Liveboard"
    # Check if we should use advanced TML features (Groups, Tabs, Style)
    # Set USE_ADVANCED_TML=true in .env to enable Groups/Tabs structure
    use_advanced_tml = os.getenv('USE_ADVANCED_TML', 'true').lower() == 'true'
    if use_advanced_tml:
        # === ADVANCED: Create Groups structure (like golden demo) ===
        # Groups are containers that hold related visualizations
        groups = self._create_groups_structure(viz_configs) if viz_configs else []
        # === ADVANCED: Create tabbed layout with groups ===
        layout = self._create_tabbed_layout(visualizations, viz_configs, groups) if viz_configs else None
        # === ADVANCED: Create style properties (brand colors, borders) ===
        style = self._create_style_properties(groups, viz_configs) if viz_configs else None
    else:
        groups = []
        layout = None
        style = None
        print(" ℹ️ Using basic TML structure (set USE_ADVANCED_TML=true to enable Groups)")
    # Assemble complete Liveboard TML
    liveboard_tml = {
        'guid': None,  # Will be generated by ThoughtSpot
        'liveboard': {
            'name': liveboard_name,
            'visualizations': visualizations
        }
    }
    # Add groups if we created any
    if groups:
        liveboard_tml['liveboard']['groups'] = groups
        print(f" 📦 Created {len(groups)} groups: {[g['name'] for g in groups]}")
    # Add layout with tabs (only if using advanced TML)
    if layout:
        liveboard_tml['liveboard']['layout'] = layout
        print(f" 📐 Created tabbed layout with {len(layout.get('tabs', []))} tab(s)")
    # Add style properties (only if using advanced TML)
    if style:
        liveboard_tml['liveboard']['style'] = style
        print(f" 🎨 Added brand colors and style properties")
    # Convert to YAML format (TML is YAML, not JSON)
    return yaml.dump(liveboard_tml, default_flow_style=False, sort_keys=False)
def _convert_to_kpis(self, liveboard_id: str, kpi_viz_ids: List[str]) -> bool:
    """
    Convert specified visualizations to KPIs with sparkline settings.

    This is done via an export/modify/import cycle because ThoughtSpot's TML
    import fails with an "Index: 0" error when creating new KPIs directly.

    Args:
        liveboard_id: GUID of the liveboard to modify
        kpi_viz_ids: List of visualization IDs to convert (e.g., ['Viz_1', 'Viz_2'])

    Returns:
        True if successful, False otherwise
    """
    import json
    try:
        # Export the liveboard
        export_response = self.ts_client.session.post(
            f"{self.ts_client.base_url}/api/rest/2.0/metadata/tml/export",
            headers=self.ts_client.headers,
            json={'metadata': [{'type': 'LIVEBOARD', 'identifier': liveboard_id}], 'export_fqn': False}
        )
        if export_response.status_code != 200:
            print(f" Export failed: {export_response.status_code}")
            return False
        export_data = export_response.json()
        # The exported TML may be keyed 'edoc' or 'object' depending on the
        # API response shape — handle both
        tml_key = 'edoc' if 'edoc' in export_data[0] else 'object'
        lb_tml = yaml.safe_load(export_data[0][tml_key])
        # KPI sparkline settings - enables sparkline, comparison, and period-over-period.
        # NOTE: client_state_v2 is a JSON string, and customProps is itself a
        # JSON string nested inside it — hence the double json.dumps
        kpi_client_state_v2 = json.dumps({
            'version': 'V4DOT2',
            'chartProperties': {
                'responsiveLayoutPreference': 'AUTO_ON',
                'chartSpecific': {
                    'customProps': json.dumps({
                        'showLabel': True,
                        'showComparison': True,
                        'showSparkline': True,
                        'showAnomalies': False,
                        'showBounds': True,
                        'customCompare': 'PREV_AVAILABLE',
                        'showOnlyLatestAnomaly': False
                    })
                }
            }
        })
        # Convert specified visualizations to KPIs
        converted_count = 0
        for viz in lb_tml['liveboard']['visualizations']:
            viz_id = viz.get('id', '')
            if viz_id in kpi_viz_ids:
                if 'chart' not in viz.get('answer', {}):
                    viz['answer']['chart'] = {}
                viz['answer']['chart']['type'] = 'KPI'
                viz['answer']['chart']['client_state_v2'] = kpi_client_state_v2
                converted_count += 1
                print(f" Converting {viz_id} ({viz.get('answer', {}).get('name', '?')}) to KPI")
        if converted_count == 0:
            print(f" No matching visualizations found to convert")
            return False
        # Re-import the modified TML (update-in-place: create_new=False)
        modified_tml = yaml.dump(lb_tml, default_flow_style=False, sort_keys=False)
        import_response = self.ts_client.session.post(
            f"{self.ts_client.base_url}/api/rest/2.0/metadata/tml/import",
            headers=self.ts_client.headers,
            json={'metadata_tmls': [modified_tml], 'import_policy': 'ALL_OR_NONE', 'create_new': False}
        )
        if import_response.status_code != 200:
            print(f" Import failed: {import_response.status_code}")
            return False
        result = import_response.json()
        status = result[0].get('response', {}).get('status', {}).get('status_code', '')
        if status == 'OK':
            return True
        else:
            error = result[0].get('response', {}).get('status', {}).get('error_message', 'Unknown')
            print(f" Import error: {error[:100]}")
            return False
    except Exception as e:
        print(f" KPI conversion error: {str(e)}")
        return False
def _check_liveboard_errors(self, liveboard_id: str) -> Dict:
    """
    Inspect a liveboard's rendered data to detect visualization errors.

    Args:
        liveboard_id: GUID of the liveboard to check

    Returns:
        Dict with 'has_errors' boolean and 'errors' list of
        {'viz_name': ..., 'error': ...} entries
    """
    try:
        # Ask ThoughtSpot to render the liveboard data — per-viz errors
        # surface in the response payload
        resp = self.ts_client.session.post(
            f"{self.ts_client.base_url}/api/rest/2.0/metadata/liveboard/data",
            headers=self.ts_client.headers,
            json={'metadata_identifier': liveboard_id}
        )
        if resp.status_code != 200:
            return {'has_errors': True, 'errors': [{'viz_name': 'Unknown', 'error': f'HTTP {resp.status_code}'}]}
        problems = []
        for content in resp.json().get('contents', []):
            name = content.get('visualization_name', 'Unknown')
            if 'error' in content or 'error_message' in content:
                problems.append({
                    'viz_name': name,
                    'error': content.get('error_message', content.get('error', 'Unknown error'))
                })
            # Zero returned/available rows might indicate a query issue,
            # but can be legitimate for some visualizations — not treated
            # as an error.
        return {'has_errors': bool(problems), 'errors': problems}
    except Exception as e:
        return {'has_errors': True, 'errors': [{'viz_name': 'Check failed', 'error': str(e)}]}
def export_liveboard_tml(self, liveboard_id: str) -> Optional[Dict]:
    """
    Export a liveboard's TML from ThoughtSpot to verify what was created.

    Args:
        liveboard_id: GUID of the liveboard to export

    Returns:
        Parsed TML dict when the edoc is JSON, {'raw': <text>} when it is
        not JSON (e.g. YAML), or None on any failure.
    """
    try:
        response = self.ts_client.session.post(
            f"{self.ts_client.base_url}/api/rest/2.0/metadata/tml/export",
            headers=self.ts_client.headers,
            json={
                'metadata': [{'identifier': liveboard_id}],
                'export_associated': False,
                'export_fqn': False
            }
        )
        if response.status_code == 200:
            result = response.json()
            if result and len(result) > 0:
                # Parse the YAML/JSON TML
                import json
                tml_content = result[0].get('edoc', '')
                if tml_content:
                    try:
                        return json.loads(tml_content)
                    # BUG FIX: was a bare `except:` which also swallowed
                    # KeyboardInterrupt/SystemExit. Only a parse failure
                    # means "this is YAML, return it raw" (JSONDecodeError
                    # is a subclass of ValueError).
                    except ValueError:
                        return {'raw': tml_content}
            return None
        else:
            print(f" ⚠️ Export failed: {response.status_code}")
            return None
    except Exception as e:
        print(f" ⚠️ Export error: {str(e)}")
        return None
def deploy_liveboard(self, liveboard_tml_json: str) -> Dict:
    """
    Deploy Liveboard TML to ThoughtSpot via the v2 TML import API.

    Args:
        liveboard_tml_json: Complete Liveboard TML as a YAML string
            (produced by create_liveboard_tml; the parameter name is
            historical — the import endpoint accepts TML text either way)

    Returns:
        Dictionary with deployment result:
        - success: Boolean
        - liveboard_id / liveboard_guid: GUID of created liveboard
        - liveboard_url / url: Direct URL to liveboard in ThoughtSpot
        - error: Error message if failed
        - response: Raw API response payload
    """
    try:
        # Debug: Log the TML being sent
        print(f"🔍 DEBUG: Sending TML to ThoughtSpot")
        print(f"🔍 DEBUG: TML length: {len(liveboard_tml_json)}")
        print(f"🔍 DEBUG: TML first 500 chars:\n{liveboard_tml_json[:500]}")
        response = self.ts_client.session.post(
            f"{self.ts_client.base_url}/api/rest/2.0/metadata/tml/import",
            headers=self.ts_client.headers,
            json={
                "metadata_tmls": [liveboard_tml_json],
                "import_policy": "PARTIAL",
                "create_new": True
            }
        )
        if response.status_code == 200:
            result = response.json()
            # Extract liveboard ID from response
            liveboard_id = None
            # Debug: print response structure
            print(f" API Response type: {type(result)}")
            if isinstance(result, list) and len(result) > 0:
                print(f" First item keys: {list(result[0].keys())}")
                response_obj = result[0].get('response', {})
                print(f" Response obj type: {type(response_obj)}, keys: {list(response_obj.keys()) if isinstance(response_obj, dict) else 'not a dict'}")
                status_obj = response_obj.get('status', {})
                print(f" Status: {status_obj}")
                # If error, check if there's more info in the full result
                if status_obj.get('status_code') == 'ERROR':
                    print(f" Full error details:")
                    import json
                    print(json.dumps(result[0], indent=2)[:2000])
                # Navigate response structure — the GUID location varies
                # across ThoughtSpot versions, so try several paths
                liveboard_id = result[0].get('response', {}).get('header', {}).get('id_guid')
                if not liveboard_id:
                    # Try alternate paths
                    liveboard_id = result[0].get('response', {}).get('header', {}).get('id')
                # Check for visualization errors after creation
                if liveboard_id and status_obj.get('status_code') == 'OK':
                    print(f"\n ✅ Liveboard created, verifying TML and checking errors...")
                    # Export the TML to see what was actually created
                    exported_tml = self.export_liveboard_tml(liveboard_id)
                    if exported_tml and 'raw' not in exported_tml:
                        print(f"\n 📋 Exported TML verification:")
                        visualizations = exported_tml.get('liveboard', {}).get('visualizations', [])
                        for i, viz in enumerate(visualizations[:3]):  # Show first 3
                            viz_name = viz.get('answer', {}).get('name', 'Unknown')
                            search_query = viz.get('answer', {}).get('search_query', 'No query')
                            print(f" Viz {i+1}: {viz_name}")
                            print(f" Query: {search_query}")
                    error_check = self._check_liveboard_errors(liveboard_id)
                    if error_check['has_errors']:
                        print(f"\n ⚠️ Found {len(error_check['errors'])} visualization errors:")
                        for err in error_check['errors']:
                            print(f" - {err['viz_name']}: {err['error']}")
                    else:
                        print(f" ✅ All visualizations rendering successfully!")
                    # Post-process: Convert KPI visualizations with sparkline settings
                    # (_kpi_viz_ids is populated by create_liveboard_tml)
                    if hasattr(self, '_kpi_viz_ids') and self._kpi_viz_ids:
                        print(f"\n 🔄 Converting {len(self._kpi_viz_ids)} visualizations to KPIs...")
                        kpi_result = self._convert_to_kpis(liveboard_id, self._kpi_viz_ids)
                        if kpi_result:
                            print(f" ✅ KPI conversion successful!")
                        else:
                            print(f" ⚠️ KPI conversion failed - visualizations will display as auto-selected chart types")
                if not liveboard_id:
                    liveboard_id = result[0].get('response', {}).get('id')
                if not liveboard_id:
                    # Try edoc_id
                    liveboard_id = result[0].get('response', {}).get('edoc_header', {}).get('id_guid')
            elif isinstance(result, dict):
                print(f" Dict keys: {list(result.keys())}")
                if result.get('object') and len(result['object']) > 0:
                    liveboard_id = result['object'][0].get('response', {}).get('header', {}).get('id')
            print(f" Extracted liveboard ID: {liveboard_id}")
            # Build liveboard name from model or TML
            lb_name = f"{self.model_name} Liveboard"
            # Check if TML import actually succeeded
            # BUG FIX: Don't return success=True if liveboard_id is None or status is ERROR
            if isinstance(result, list) and len(result) > 0:
                status_code = result[0].get('response', {}).get('status', {}).get('status_code', '')
                error_message = result[0].get('response', {}).get('status', {}).get('error_message', '')
                if status_code == 'ERROR' or not liveboard_id:
                    print(f" ❌ TML import failed: {error_message or 'No liveboard ID returned'}")
                    return {
                        'success': False,
                        'liveboard_id': None,
                        'liveboard_guid': None,
                        'liveboard_name': lb_name,
                        'liveboard_url': None,
                        'error': error_message or 'TML import failed - no liveboard ID returned',
                        'response': result
                    }
            return {
                'success': True,
                'liveboard_id': liveboard_id,
                'liveboard_guid': liveboard_id,  # Alias for compatibility with deployer
                'liveboard_name': lb_name,
                'liveboard_url': f"{self.ts_client.base_url}/#/pinboard/{liveboard_id}" if liveboard_id else None,
                'url': f"{self.ts_client.base_url}/#/pinboard/{liveboard_id}" if liveboard_id else None,
                'response': result
            }
        else:
            return {
                'success': False,
                'error': f"API returned status {response.status_code}: {response.text}"
            }
    except Exception as e:
        return {
            'success': False,
            'error': str(e)
        }
def create_liveboard_from_model(
    ts_client,
    model_id: str,
    model_name: str,
    company_data: Dict,
    use_case: str,
    num_visualizations: int = 6,
    liveboard_name: str = None,
    llm_model: str = None,
    outliers: Optional[List[Dict]] = None,
    model_columns: Optional[List[Dict]] = None,
    prompt_logger=None,
) -> Dict:
    """
    Create and deploy a Liveboard via TML (Spotter Viz path).

    Builds the complete Liveboard TML with a LiveboardCreator and imports
    it through the direct ThoughtSpot API.

    Args:
        ts_client: Authenticated ThoughtSpotDeployer instance
        model_id: GUID of ThoughtSpot model
        model_name: Display name of model
        company_data: Company research data
        use_case: Use case name
        num_visualizations: Number of visualizations to create
        liveboard_name: Optional custom name for the liveboard
        llm_model: LLM model to use for AI-driven viz generation
        outliers: Optional list of outlier pattern dicts for targeted viz generation
        model_columns: Optional list of column metadata from TS model
        prompt_logger: Optional logger (accepted for interface parity; unused here)

    Returns:
        Deployment result dictionary with:
        - success: bool
        - liveboard_guid: str
        - liveboard_name: str
        - liveboard_url: str
    """
    print(f"🚀 Starting Spotter Viz liveboard creation...", flush=True)
    print(f" - Model: {model_name}", flush=True)
    print(f" - Use case: {use_case}", flush=True)
    print(f" - Visualizations: {num_visualizations}", flush=True)
    if outliers:
        print(f" - Outlier patterns: {len(outliers)}", flush=True)
    if model_columns:
        print(f" - Model columns: {len(model_columns)}", flush=True)

    builder = LiveboardCreator(ts_client, model_id, model_name, llm_model)
    # Caller-supplied column metadata takes precedence over whatever the
    # creator fetched on its own
    if model_columns:
        builder.model_columns = model_columns

    tml_text = builder.create_liveboard_tml(
        company_data, use_case, num_visualizations, liveboard_name, outliers
    )
    outcome = builder.deploy_liveboard(tml_text)

    # Backfill a display name on success when deploy didn't set one
    if outcome.get('success') and not outcome.get('liveboard_name'):
        outcome['liveboard_name'] = (
            liveboard_name
            or f"{company_data.get('name', 'Demo')} - {use_case} Liveboard"
        )
    return outcome
def _convert_outlier_to_mcp_question(outlier: Dict) -> str:
"""
Convert outlier metadata to natural language question for MCP.
Args:
outlier: Dictionary with outlier metadata including 'show_me_query'
Returns:
Natural language question suitable for MCP getAnswer
"""
# Get the SHOW_ME query (already in natural language)
show_me = outlier.get('show_me_query', '')
if show_me:
# Clean up the query - remove quotes, extra formatting
question = show_me.replace('"', '').replace("'", '').strip()
# Capitalize first letter if needed
question = question[0].upper() + question[1:] if question else question
return question
# Fallback: use title directly
return outlier.get('title', 'data')
def _create_kpi_question_from_outlier(outlier: Dict) -> Optional[str]:
"""
Create companion KPI question from outlier if KPI metric is specified.
For sparklines, KPI questions need time dimension!
Example: "What is the total revenue by week over the last 8 quarters?"
Args:
outlier: Dictionary with outlier metadata
Returns:
KPI question string or None if no KPI specified
"""
if not outlier.get('kpi_companion'):
return None
kpi_metric = outlier.get('kpi_metric', '')
if kpi_metric:
# Include time dimension for sparkline visualization
return f"What is the total {kpi_metric} by week over the last 8 quarters?"
# Fallback: extract first measure from viz_measure_types
measure_types = outlier.get('viz_measure_types', '')
if measure_types:
first_measure = measure_types.split(',')[0].strip()
return f"What is the total {first_measure} by week over the last 8 quarters?"
return None
def _generate_smart_questions_with_ai(
    company_data: Dict,
    use_case: str,
    num_questions: int,
    llm_model: str = None,
    model_columns: List[Dict] = None,
    prompt_logger=None,
) -> List[str]:
    """
    Use AI to generate smart, targeted questions for the use case.

    On any failure (LLM error, empty/unparseable response) falls back to
    template questions built from model_columns, or to generic questions
    as a last resort — this function never raises.

    Args:
        company_data: Company research data
        use_case: Use case name
        num_questions: Number of questions to generate
        llm_model: LLM display name to use (resolved via map_llm_display_to_provider)
        model_columns: List of column dicts with 'name' and 'type' from DDL
        prompt_logger: Optional logger forwarded to log_researcher_call

    Returns:
        List of natural language questions (at most num_questions)
    """
    try:
        # Use the selected LLM model
        from main_research import MultiLLMResearcher
        model_to_use = llm_model or DEFAULT_LLM_MODEL
        provider_name, model_name = map_llm_display_to_provider(model_to_use)
        print(f" 🤖 Using {provider_name}/{model_name} for AI question generation")
        researcher = MultiLLMResearcher(provider=provider_name, model=model_name)
        # Build column context for the AI
        column_context = ""
        if model_columns:
            measures = []
            dimensions = []
            dates = []
            for col in model_columns:
                col_name = col.get('name', '')
                col_type = col.get('type', '').upper()
                # Classify columns by DDL type substring
                if any(t in col_type for t in ['DATE', 'TIMESTAMP', 'TIME']):
                    dates.append(col_name)
                elif any(t in col_type for t in ['INT', 'DECIMAL', 'NUMBER', 'FLOAT', 'DOUBLE', 'BIGINT']):
                    # Numeric - likely measure unless it ends with _ID
                    if col_name.endswith('_ID') or col_name.endswith('ID'):
                        dimensions.append(col_name)
                    else:
                        measures.append(col_name)
                else:
                    dimensions.append(col_name)
            column_context = f"""
AVAILABLE COLUMNS IN THE DATA MODEL (USE THESE EXACT NAMES!):
- Measures (numeric): {', '.join(measures) if measures else 'None found'}
- Dimensions (categorical): {', '.join(dimensions) if dimensions else 'None found'}
- Date columns: {', '.join(dates) if dates else 'None found'}
CRITICAL: You MUST use the actual column names above in your questions!
For example, if measures are [IMPRESSIONS, CLICKS, CONVERSIONS], use those - NOT generic "revenue" or "sales"!
"""
        prompt = f"""Generate {num_questions} business intelligence questions for a demo dashboard.
Company: {company_data.get('name', 'Unknown Company')}
Use Case: {use_case}
{column_context}
The liveboard already has KPI cards showing single metrics. These fill questions should surface INTERESTING BUSINESS INSIGHTS that a senior executive would find compelling — comparisons, rankings, and breakdowns that tell a story.
WHAT MAKES A GREAT FILL QUESTION:
- Reveals which dimension is driving performance: "Top 10 [DIMENSION] by [MEASURE] last 12 months"
- Compares performance across a meaningful grouping: "[MEASURE] by [DIMENSION] last 12 months"
- Shows how a breakdown has shifted: "[MEASURE] by [DIMENSION1] and [DIMENSION2] last 18 months"
- Surfaces a ranking with business stakes: "Top 5 [DIMENSION] by [MEASURE] vs prior year"
DO NOT generate simple single-metric questions like "[MEASURE] by month" — those become redundant KPI cards and the liveboard already has enough of those. Every question here should involve at least one dimension to compare against.
TIME FILTER RULES:
- Use "last 12 months" or "last 18 months" — never "this year" or "this quarter" (too restrictive with generated data)
- Use "last 2 years" for broad comparisons
CRITICAL:
- Use the EXACT column names from the data model above
- Do NOT mention chart types — ThoughtSpot picks automatically
- Each question should make a business leader say "that's interesting" — not just confirm a number
Return ONLY a JSON object with this exact structure (no other text):
{{
"questions": [
"question 1 using actual column names",
"question 2 using actual column names",
"question 3 using actual column names"
]
}}"""
        messages = [{"role": "user", "content": prompt}]
        import time as _time
        _q_start = _time.time()
        response_text = researcher.make_request(messages, temperature=0.7, max_tokens=2000, stream=False)
        _q_ms = int((_time.time() - _q_start) * 1000)
        # Log the prompt/response
        from prompt_logger import log_researcher_call
        log_researcher_call("liveboard_questions", researcher, messages, response_text or "", duration_ms=_q_ms, logger=prompt_logger)
        # Parse the response
        content = response_text
        if not content or content.strip() == '':
            print(f" ⚠️ AI returned empty content")
            raise ValueError("Empty AI response")
        # Try to extract JSON if AI added extra text
        content = content.strip()
        if not content.startswith('{') and not content.startswith('['):
            # Look for JSON in the response
            json_match = re.search(r'(\{[\s\S]*\})', content)
            if json_match:
                content = json_match.group(1)
            else:
                print(f" ⚠️ No JSON found in AI response")
                raise ValueError("No JSON in response")
        result = json.loads(content)
        # Handle different response formats
        if 'questions' in result and isinstance(result['questions'], list):
            questions = result['questions']
        elif isinstance(result, list):
            questions = result
        else:
            print(f" ⚠️ Unexpected AI response format: {list(result.keys())}")
            questions = []
        # Ensure at least 2 questions are in KPI format
        # KPI format: "[measure] weekly" or "[measure] monthly" (very short, 2-4 words)
        kpi_count = 0
        for q in questions[:2]:  # Check first 2
            words = q.lower().split()
            if len(words) <= 4 and any(kw in words for kw in ['weekly', 'monthly', 'quarterly', 'yearly']):
                kpi_count += 1
        if kpi_count < 2 and model_columns:
            # Find measures to create KPI questions
            measures = [col['name'] for col in model_columns
                        if any(t in col.get('type', '').upper() for t in ['INT', 'DECIMAL', 'NUMBER', 'FLOAT'])
                        and not col['name'].endswith('ID') and not col['name'].endswith('_ID')]
            if len(measures) >= 2:
                # Insert proper KPI questions at the start, dropping AI questions
                # that duplicate the periodic-KPI shape
                kpi_questions = [f"{measures[0].lower().replace('_', ' ')} weekly",
                                 f"{measures[1].lower().replace('_', ' ')} monthly"]
                print(f" 🔧 Inserting KPI questions: {kpi_questions}")
                questions = kpi_questions + [q for q in questions if not any(kw in q.lower() for kw in ['weekly', 'monthly', 'quarterly', 'yearly'])]
        return questions[:num_questions]
    except Exception as e:
        print(f" ⚠️ AI question generation failed: {e}")
        # response_text may not exist if the failure happened before the LLM call
        print(f" 🔍 DEBUG: AI response was: {response_text[:500] if 'response_text' in locals() else 'N/A'}")
        # Fallback: try to use actual column names from model_columns
        if model_columns:
            measures = [col['name'] for col in model_columns
                        if any(t in col.get('type', '').upper() for t in ['INT', 'DECIMAL', 'NUMBER', 'FLOAT'])
                        and not col['name'].endswith('ID') and not col['name'].endswith('_ID')]
            dimensions = [col['name'] for col in model_columns
                          if col.get('type', '').upper() == 'VARCHAR' and not col['name'].endswith('ID')]
            if measures and dimensions:
                m1 = measures[0].lower().replace('_', ' ')
                m2 = measures[1].lower().replace('_', ' ') if len(measures) > 1 else m1
                d1 = dimensions[0].lower().replace('_', ' ') if dimensions else 'category'
                return [
                    f"{m1} by week",
                    f"{m2} by month",
                    f"top 10 {d1} by {m1}",
                    f"{m1} by {d1}",
                    f"{m1} by month last 12 months",
                    f"{m2} by {d1}"
                ][:num_questions]
        # Ultimate fallback: generic questions with no column grounding
        return [
            "revenue by week",
            "sales by month",
            "top 10 products by revenue",
            "revenue by category",
            "revenue by month last 12 months",
            "sales by region"
        ][:num_questions]
def create_liveboard_from_model_mcp(
ts_client,
model_id: str,
model_name: str,
company_data: Dict,
use_case: str,
num_visualizations: int = 8,
liveboard_name: str = None,
outliers: Optional[List[Dict]] = None,
llm_model: str = None,
model_columns: List[Dict] = None,
prompt_logger=None,
session_logger=None,
) -> Dict:
"""
Create and deploy a Liveboard using ThoughtSpot's Model Context Protocol (MCP)
This is an alternative to the TML-based approach that uses MCP's AI-driven
question generation and answer retrieval to build liveboards.
PHASE 1 ENHANCEMENT: Now supports outlier-driven question generation for
more targeted, demo-ready visualizations.
Args:
ts_client: Authenticated ThoughtSpotDeployer instance (used for base_url)
model_id: GUID of ThoughtSpot model/datasource
model_name: Display name of model
company_data: Company research data
use_case: Use case name
num_visualizations: Number of visualizations to create (default: 8)
liveboard_name: Optional custom name for the liveboard
outliers: Optional list of demo outliers to create targeted visualizations
llm_model: LLM model for AI question generation
model_columns: List of column dicts with 'name' and 'type' from DDL
Returns:
Deployment result dictionary with structure:
{
'success': bool,
'liveboard_name': str,
'liveboard_guid': str,
'liveboard_url': str,
'error': str (if failed)
}
"""
_slog = session_logger
import time as _time
_t_lb = _slog.log_start("liveboard") if _slog else None
print(f"🚀 Starting MCP liveboard creation...", flush=True)
print(f" [MCP] Model: {model_name}", flush=True)
print(f" [MCP] Model ID: {model_id}", flush=True)
print(f" [MCP] Use case: {use_case}", flush=True)
print(f" [MCP] Visualizations: {num_visualizations}", flush=True)
print(f" [MCP] Endpoint: https://agent.thoughtspot.app/bearer/mcp (bearer auth)", flush=True)
import asyncio
import sys
async def _create_mcp_liveboard() -> Dict:
    """
    Inner async function to handle MCP workflow.

    End-to-end flow:
      1. Obtain a trusted-auth token and open a streamable-HTTP MCP session
         against agent.thoughtspot.app using bearer auth ({token}@{host}).
      2. Build the question list: outlier-driven questions first, then
         AI-generated ones, then MCP getRelevantQuestions as a fallback.
      3. Fetch an answer per question (direct ThoughtSpot API by default,
         MCP getAnswer as fallback/alternative).
      4. Create the liveboard via the MCP createLiveboard tool, retrying
         with progressively fewer answers on failure.
      5. Post-process the liveboard TML (note tile size/content, dark
         theme, time-series -> KPI conversion) and re-import it.

    Reads closure variables from the enclosing function: model_id,
    ts_client, company_data, use_case, num_visualizations, outliers,
    liveboard_name, llm_model, model_columns, prompt_logger.

    Returns:
        Dict with 'success': True plus liveboard name/guid/url on success,
        or 'success': False plus 'error' on failure.
    """
    print(f"[MCP] Starting async MCP liveboard creation...")
    try:
        print(f"[MCP] Importing MCP modules...")
        # Imported lazily so the rest of the app works without the mcp package.
        from mcp import ClientSession
        from mcp.client.streamable_http import streamablehttp_client
        print(f"[MCP] MCP modules imported successfully (streamable HTTP transport)")
        # Get trusted auth token for bearer auth
        print(f"[MCP] Getting trusted auth token for {ts_client.username}...")
        _, mcp_token = _get_direct_api_session(ts_client.username, ts_client.secret_key)
        if not mcp_token:
            return {
                'success': False,
                'error': 'Failed to get trusted auth token. Check THOUGHTSPOT_TRUSTED_AUTH_KEY in .env'
            }
        # Build bearer auth header: {token}@{host}
        ts_url = get_admin_setting('THOUGHTSPOT_URL').rstrip('/')
        ts_host = ts_url.replace('https://', '').replace('http://', '')
        mcp_endpoint = "https://agent.thoughtspot.app/bearer/mcp"
        headers = {
            "Authorization": f"Bearer {mcp_token}@{ts_host}"
        }
        print(f"[MCP] Connecting to {mcp_endpoint} (bearer auth, host={ts_host})")
        async with streamablehttp_client(mcp_endpoint, headers=headers) as (read, write, _):
            async with ClientSession(read, write) as session:
                await session.initialize()
                # Verify connection with ping
                print(f"Pinging MCP server...")
                ping_result = await session.call_tool("ping", {})
                print(f"✅ MCP connection established: {ping_result.content[0].text}")
                # Step 2: Generate questions intelligently
                print(f"🤔 Generating questions for: {use_case}")
                questions_to_ask = []
                # Priority 1: Use outlier-driven questions if available
                if outliers and len(outliers) > 0:
                    print(f"🎯 Using {len(outliers)} outlier-driven questions", flush=True)
                    for outlier in outliers:
                        # Convert outlier to MCP question
                        question = _convert_outlier_to_mcp_question(outlier)
                        questions_to_ask.append({
                            'question': question,
                            'source': 'outlier',
                            'outlier_title': outlier.get('title', '')
                        })
                        print(f" 📌 {outlier.get('title', 'Outlier')}: {question}", flush=True)
                        # Add companion KPI if specified
                        kpi_question = _create_kpi_question_from_outlier(outlier)
                        if kpi_question:
                            questions_to_ask.append({
                                'question': kpi_question,
                                'source': 'kpi',
                                'outlier_title': outlier.get('title', '') + ' - KPI'
                            })
                            print(f" 📊 KPI: {kpi_question}", flush=True)
                # Priority 2: If we need more questions, use AI to generate them
                remaining_viz = num_visualizations - len(questions_to_ask)
                if remaining_viz > 0:
                    print(f"🤖 Generating {remaining_viz} additional AI-driven questions...", flush=True)
                    if model_columns:
                        print(f" 📋 Using {len(model_columns)} columns from DDL for question generation", flush=True)
                    ai_questions = _generate_smart_questions_with_ai(
                        company_data,
                        use_case,
                        remaining_viz,
                        llm_model=llm_model,
                        model_columns=model_columns,
                        prompt_logger=prompt_logger,
                    )
                    for q in ai_questions:
                        questions_to_ask.append({
                            'question': q,
                            'source': 'ai'
                        })
                        print(f" 🧠 AI: {q}", flush=True)
                # Fallback: If still no questions, use MCP's getRelevantQuestions
                if not questions_to_ask:
                    print(f"⚠️ No outliers or AI questions, falling back to MCP getRelevantQuestions", flush=True)
                    simple_query = use_case.lower()
                    questions_result = await session.call_tool("getRelevantQuestions", {
                        "query": simple_query,
                        "datasourceIds": [model_id]
                    })
                    questions_data = json.loads(questions_result.content[0].text)
                    mcp_questions = questions_data.get('questions', [])
                    for q in mcp_questions[:num_visualizations]:
                        questions_to_ask.append({
                            'question': q['question'],
                            'source': 'mcp'
                        })
                if not questions_to_ask:
                    print(f"❌ No questions generated!", flush=True)
                    return {
                        'success': False,
                        'error': 'No questions generated. Model may be empty or incompatible.'
                    }
                print(f"💡 Total questions to answer: {len(questions_to_ask)}", flush=True)
                # Step 3: Get answers for each question
                # Use direct API instead of MCP's getAnswer (bypasses agent.thoughtspot.app proxy)
                use_direct_api = os.getenv('MCP_USE_DIRECT_API', 'true').lower() == 'true'
                print(f"📊 Getting answers for {len(questions_to_ask)} questions...", flush=True)
                print(f" Using: {'Direct ThoughtSpot API' if use_direct_api else 'MCP getAnswer'}", flush=True)
                answers = []
                for i, q_obj in enumerate(questions_to_ask, 1):
                    try:
                        # Strip chart type hints from question - they're for AI inference, not viz titles
                        question_text = _strip_chart_type_hints(q_obj['question'])
                        source = q_obj.get('source', 'unknown')
                        print(f" {i}/{len(questions_to_ask)}: [{source}] {question_text}", flush=True)
                        if use_direct_api:
                            # Use direct ThoughtSpot API (bypasses MCP proxy issues)
                            answer_data = _get_answer_direct(question_text, model_id, ts_client.username, ts_client.secret_key)
                            if answer_data:
                                # Clean up the viz title
                                if 'question' in answer_data:
                                    answer_data['question'] = _clean_viz_title(answer_data['question'])
                                print(f" 🔍 DEBUG: Direct API answer keys: {list(answer_data.keys())}")
                                answers.append(answer_data)
                                print(f" ✅ Answer retrieved (direct API)", flush=True)
                            else:
                                print(f" ⚠️ Direct API returned no answer, trying MCP fallback...", flush=True)
                                # Fallback to MCP if direct API fails
                                answer_result = await session.call_tool("getAnswer", {
                                    "question": question_text,
                                    "datasourceId": model_id
                                })
                                answer_data = json.loads(answer_result.content[0].text)
                                # Clean up the viz title
                                if 'question' in answer_data:
                                    answer_data['question'] = _clean_viz_title(answer_data['question'])
                                answers.append(answer_data)
                                print(f" ✅ Answer retrieved (MCP fallback)", flush=True)
                        else:
                            # Use MCP's getAnswer (original method)
                            answer_result = await session.call_tool("getAnswer", {
                                "question": question_text,
                                "datasourceId": model_id
                            })
                            print(f" 🔍 DEBUG: answer_result type: {type(answer_result)}")
                            print(f" 🔍 DEBUG: answer_result.content: {answer_result.content}")
                            if answer_result.content and len(answer_result.content) > 0:
                                print(f" 🔍 DEBUG: answer_result.content[0].text: '{answer_result.content[0].text}'")
                            # Parse answer data
                            answer_data = json.loads(answer_result.content[0].text)
                            # Clean up the viz title
                            if 'question' in answer_data:
                                answer_data['question'] = _clean_viz_title(answer_data['question'])
                            print(f" 🔍 DEBUG: Answer keys: {list(answer_data.keys())}")
                            answers.append(answer_data)
                            print(f" ✅ Answer retrieved", flush=True)
                    except Exception as e:
                        # Best-effort per question: log and move on so one bad
                        # question doesn't sink the whole liveboard.
                        print(f" ⚠️ Failed to get answer: {str(e)}", flush=True)
                        print(f" 🔍 DEBUG: Exception type: {type(e).__name__}")
                        import traceback
                        print(f" 🔍 DEBUG: Traceback: {traceback.format_exc()}")
                        # Continue with other questions
                        continue
                if not answers:
                    print(f"❌ DEBUG: No answers retrieved!")
                    print(f" Total questions attempted: {len(questions_to_ask)}")
                    return {
                        'success': False,
                        'error': 'Failed to get answers for any questions. Model may not contain data.'
                    }
                print(f"✅ Successfully retrieved {len(answers)} answers")
                # Step 4: Create liveboard with rich note tile
                # Use custom name if provided, otherwise generate default
                final_liveboard_name = liveboard_name
                if not final_liveboard_name:
                    final_liveboard_name = f"📊 {company_data.get('name', 'Company')} - {use_case}"
                print(f"🔍 DEBUG: Liveboard name to use: '{final_liveboard_name}'")
                print(f"🔍 DEBUG: Company data: {company_data}")
                # Generate rich HTML note tile with outlier highlights
                outlier_section = ""
                if outliers and len(outliers) > 0:
                    outlier_section = """
<div style="margin-top: 25px; padding: 25px; background: rgba(255,255,255,0.1);
border-radius: 12px; border-left: 4px solid #fbbf24;">
<h3 style="margin: 0 0 15px 0; font-size: 18px;">🎯 Strategic Insights</h3>
<ul style="margin: 0; padding-left: 20px; font-size: 14px; line-height: 1.8;">"""
                    for outlier in outliers[:3]:  # Show top 3 outliers
                        title = outlier.get('title', 'Insight')
                        insight = outlier.get('insight', '')
                        impact = outlier.get('impact', '')
                        outlier_section += f"""
<li style="margin-bottom: 12px;">
<strong style="color: #fbbf24;">{title}</strong><br/>
<span style="opacity: 0.9;">{insight}</span>
{f'<br/><span style="color: #10b981; font-weight: 500;">💡 {impact}</span>' if impact else ''}
</li>"""
                    outlier_section += """
</ul>
</div>"""
                # Count question sources
                outlier_count = sum(1 for q in questions_to_ask if q.get('source') == 'outlier')
                ai_count = sum(1 for q in questions_to_ask if q.get('source') == 'ai')
                mcp_count = sum(1 for q in questions_to_ask if q.get('source') == 'mcp')
                source_info = []
                if outlier_count > 0:
                    source_info.append(f"🎯 {outlier_count} outlier-driven")
                if ai_count > 0:
                    source_info.append(f"🧠 {ai_count} AI-generated")
                if mcp_count > 0:
                    source_info.append(f"⚡ {mcp_count} MCP-suggested")
                # Create branded note tile with logo and colors
                note_tile = create_branded_note_tile(
                    company_data=company_data,
                    use_case=use_case,
                    answers=answers,
                    source_info=source_info,
                    outlier_section=outlier_section
                )
                print(f"🎨 Creating liveboard: {final_liveboard_name}")
                print(f"📊 Preparing to send {len(answers)} answers to createLiveboard")
                # Log what we're sending to createLiveboard
                print(f"📋 [createLiveboard] Answer summary ({len(answers)} total):", flush=True)
                for idx, ans in enumerate(answers):
                    sid = ans.get('session_identifier', 'MISSING')
                    viz = ans.get('visualization_type', 'MISSING')
                    q = ans.get('question', 'MISSING')[:60]
                    has_tok = bool(ans.get('tokens'))
                    print(f" [{idx+1}] session={sid} viz={viz} tokens={has_tok} q='{q}'", flush=True)
                try:
                    print(f"📦 [createLiveboard] Sending {len(answers)} answers...", flush=True)
                    print(f" Name: '{final_liveboard_name}'", flush=True)
                    print(f" Note tile: {len(note_tile)} chars", flush=True)
                    liveboard_result = await session.call_tool("createLiveboard", {
                        "name": final_liveboard_name,
                        "answers": answers,
                        "noteTile": note_tile
                    })
                    result_text = liveboard_result.content[0].text
                    print(f" [createLiveboard] FULL response: {result_text}", flush=True)
                    if not result_text or result_text.strip() == '':
                        raise ValueError("createLiveboard returned empty response")
                    if result_text.startswith('ERROR:'):
                        error_msg = result_text.replace('ERROR:', '').strip()
                        print(f"❌ [createLiveboard] Server error: {error_msg}", flush=True)
                        return {
                            'success': False,
                            'error': f'MCP createLiveboard failed: {error_msg}',
                            'liveboard_name': final_liveboard_name
                        }
                except Exception as create_error:
                    error_str = str(create_error)
                    print(f"❌ [createLiveboard] FAILED with {len(answers)} answers: {type(create_error).__name__}: {error_str[:200]}", flush=True)
                    # Retry with fewer answers if we had multiple
                    if len(answers) > 1:
                        # Halve the payload on retry, then fall back to a single answer.
                        retry_count = max(1, len(answers) // 2)
                        print(f"🔄 [createLiveboard] Retrying with {retry_count}/{len(answers)} answers...", flush=True)
                        try:
                            liveboard_result = await session.call_tool("createLiveboard", {
                                "name": final_liveboard_name,
                                "answers": answers[:retry_count],
                                "noteTile": note_tile
                            })
                            result_text = liveboard_result.content[0].text
                            print(f" [createLiveboard] Retry OK: {result_text[:200]}", flush=True)
                            if result_text.startswith('ERROR:'):
                                raise ValueError(result_text)
                        except Exception as retry_error:
                            print(f"❌ [createLiveboard] Retry also failed: {type(retry_error).__name__}: {str(retry_error)[:200]}", flush=True)
                            if retry_count > 1:
                                print(f"🔄 [createLiveboard] Last resort: trying with 1 answer...", flush=True)
                                try:
                                    liveboard_result = await session.call_tool("createLiveboard", {
                                        "name": final_liveboard_name,
                                        "answers": answers[:1],
                                        "noteTile": note_tile
                                    })
                                    result_text = liveboard_result.content[0].text
                                    print(f" [createLiveboard] Minimal OK: {result_text[:200]}", flush=True)
                                    if result_text.startswith('ERROR:'):
                                        raise ValueError(result_text)
                                except Exception as last_error:
                                    print(f"❌ [createLiveboard] All attempts failed. Last error: {str(last_error)[:200]}", flush=True)
                                    raise last_error
                            else:
                                raise retry_error
                    else:
                        raise create_error
                # Extract URL from the text response using regex
                import re
                url_match = re.search(r'https://[^\s]+', result_text)
                liveboard_url = url_match.group(0) if url_match else ''
                # Extract GUID from URL if present
                liveboard_guid = None
                if liveboard_url:
                    # URL format: https://instance.thoughtspot.cloud/#/pinboard/GUID
                    import re
                    guid_match = re.search(r'/pinboard/([a-f0-9\-]+)', liveboard_url)
                    if guid_match:
                        liveboard_guid = guid_match.group(1)
                print(f"✅ Liveboard created successfully!", flush=True)
                print(f"🔗 URL: {liveboard_url}", flush=True)
                print(f"🆔 GUID: {liveboard_guid}", flush=True)
                # POST-MCP PROCESSING: Fix note tile layout
                # MCP creates note tiles with height: 8, but we want height: 2 like golden demo
                if liveboard_guid:
                    print(f"🔧 Post-processing: Fixing note tile layout...", flush=True)
                    try:
                        # Use ts_client session (already authenticated)
                        ts_base_url = ts_client.base_url
                        # Export liveboard TML using authenticated session
                        export_response = ts_client.session.post(
                            f"{ts_base_url}/api/rest/2.0/metadata/tml/export",
                            json={
                                'metadata': [{'identifier': liveboard_guid}],
                                'export_associated': False
                            }
                        )
                        if export_response.status_code == 200:
                            tml_data = export_response.json()
                            if tml_data and len(tml_data) > 0:
                                # Parse YAML TML
                                import yaml
                                tml_str = tml_data[0].get('edoc', '')
                                liveboard_tml = yaml.safe_load(tml_str)
                                # Fix Viz_1 layout (note tile)
                                layout = liveboard_tml.get('liveboard', {}).get('layout', {})
                                tiles = layout.get('tiles', [])
                                # Find and fix Viz_1 note tile dimensions (readable size)
                                for tile in tiles:
                                    if tile.get('visualization_id') == 'Viz_1':
                                        # Make it readable: height 4, width 6 (half screen)
                                        tile['height'] = 4
                                        tile['width'] = 6
                                        print(f" ✓ Fixed Viz_1: height={tile['height']}, width={tile['width']}")
                                        break
                                # Replace Viz_1 content with company name using golden demo styling
                                company_name = company_data.get('name', 'Company')
                                # Use the actual create_branded_note_tile function
                                company_note = create_branded_note_tile(
                                    company_data=company_data,
                                    use_case=use_case or '',
                                    answers=answers,
                                    source_info=source_info,
                                    outlier_section=''
                                )
                                # Find and update Viz_1 content in visualizations
                                visualizations = liveboard_tml.get('liveboard', {}).get('visualizations', [])
                                for viz in visualizations:
                                    if viz.get('id') == 'Viz_1':
                                        # Handle different TML structures
                                        if 'note_tile' in viz:
                                            # MCP creates note tiles with note_tile structure
                                            # note_tile is a DICT with html_parsed_string key
                                            if isinstance(viz['note_tile'], dict) and 'html_parsed_string' in viz['note_tile']:
                                                viz['note_tile']['html_parsed_string'] = company_note
                                            else:
                                                viz['note_tile'] = {'html_parsed_string': company_note}
                                        elif 'answer' in viz:
                                            viz['answer']['text_data'] = company_note
                                            viz['answer']['name'] = f'{company_name} Info'
                                        else:
                                            # Direct structure without wrappers
                                            if 'text_data' in viz:
                                                viz['text_data'] = company_note
                                            if 'name' in viz:
                                                viz['name'] = f'{company_name} Info'
                                        print(f" ✓ Replaced Viz_1 content with {company_name}")
                                        break
                                # Add style_properties to make note tile dark themed (like golden demo)
                                style = liveboard_tml.get('liveboard', {}).get('style', {})
                                if 'overrides' not in style:
                                    style['overrides'] = []
                                liveboard_tml['liveboard']['style'] = style
                                # Check if Viz_1 already has style override
                                viz_1_has_style = False
                                for override in style['overrides']:
                                    if override.get('object_id') == 'Viz_1':
                                        viz_1_has_style = True
                                        # Ensure it has tile_brand_color for dark background
                                        if 'style_properties' not in override:
                                            override['style_properties'] = []
                                        has_brand_color = any(prop.get('name') == 'tile_brand_color' for prop in override['style_properties'])
                                        if not has_brand_color:
                                            override['style_properties'].append({
                                                'name': 'tile_brand_color',
                                                'value': 'TBC_I'
                                            })
                                            print(f" ✓ Added dark theme to Viz_1")
                                        break
                                if not viz_1_has_style:
                                    # Add new style override for Viz_1 with dark background
                                    style['overrides'].append({
                                        'object_id': 'Viz_1',
                                        'style_properties': [{
                                            'name': 'tile_brand_color',
                                            'value': 'TBC_I'
                                        }]
                                    })
                                    print(f" ✓ Added dark theme style to Viz_1")
                                # Convert time-series visualizations to KPIs with sparklines
                                print(f" 🔄 Converting time-series charts to KPIs...")
                                kpi_count = 0
                                for viz in visualizations:
                                    if viz.get('id') == 'Viz_1':
                                        continue  # Skip note tile
                                    answer = viz.get('answer', {})
                                    viz_name = answer.get('name', '').lower()
                                    search_query = answer.get('search_query', '').lower()
                                    # Check if this is a time-series viz (weekly, monthly, daily patterns)
                                    time_patterns = ['weekly', 'monthly', 'daily', 'quarterly', 'yearly', '.week', '.month', '.day', '.quarter', '.year']
                                    is_time_series = any(p in viz_name or p in search_query for p in time_patterns)
                                    if is_time_series and 'chart' in answer:
                                        # Convert to KPI
                                        answer['chart']['type'] = 'KPI'
                                        # Add KPI-specific settings for sparkline and comparison
                                        kpi_settings = {
                                            "showLabel": True,
                                            "showComparison": True,
                                            "showSparkline": True,
                                            "showAnomalies": False,
                                            "showBounds": False,
                                            "customCompare": "PREV_AVAILABLE",
                                            "showOnlyLatestAnomaly": False
                                        }
                                        # Update client_state_v2 with KPI settings
                                        # (client_state_v2 is a JSON string embedded in the TML)
                                        import json as json_module
                                        client_state = answer['chart'].get('client_state_v2', '{}')
                                        try:
                                            cs = json_module.loads(client_state) if client_state else {}
                                            if 'chartProperties' not in cs:
                                                cs['chartProperties'] = {}
                                            if 'chartSpecific' not in cs['chartProperties']:
                                                cs['chartProperties']['chartSpecific'] = {}
                                            cs['chartProperties']['chartSpecific']['customProps'] = json_module.dumps(kpi_settings)
                                            cs['chartProperties']['chartSpecific']['dataFieldArea'] = 'column'
                                            answer['chart']['client_state_v2'] = json_module.dumps(cs)
                                        except:
                                            pass  # Keep existing if parsing fails
                                        kpi_count += 1
                                        print(f" ✓ Converted '{answer.get('name', '?')}' to KPI")
                                if kpi_count > 0:
                                    print(f" ✅ Converted {kpi_count} visualizations to KPIs with sparklines")
                                # Re-import fixed TML using authenticated session
                                import_response = ts_client.session.post(
                                    f"{ts_base_url}/api/rest/2.0/metadata/tml/import",
                                    json={
                                        'metadata_tmls': [yaml.dump(liveboard_tml, default_flow_style=False, sort_keys=False)],
                                        'import_policy': 'PARTIAL',
                                        'create_new': False
                                    }
                                )
                                if import_response.status_code == 200:
                                    print(f" ✅ Layout fixed successfully!")
                                else:
                                    print(f" ⚠️ Could not re-import TML: {import_response.status_code}")
                            else:
                                print(f" ⚠️ No TML data in export response")
                        else:
                            print(f" ⚠️ Could not export TML: {export_response.status_code}")
                    except Exception as fix_error:
                        print(f" ⚠️ Layout fix failed: {str(fix_error)}")
                        # Don't fail the whole operation if post-processing fails
                return {
                    'success': True,
                    'liveboard_name': final_liveboard_name,
                    'liveboard_guid': liveboard_guid or 'unknown',
                    'liveboard_url': liveboard_url,
                    'message': 'Liveboard created successfully via MCP',
                    'visualizations_created': len(answers)
                }
    except ImportError as e:
        # The mcp package (or its streamable HTTP client) is missing.
        print(f"❌ ImportError in MCP liveboard creation: {str(e)}")
        import traceback
        print(f" Traceback: {traceback.format_exc()}")
        return {
            'success': False,
            'error': f'MCP dependencies not installed. Run: pip install "mcp>=1.26.0" for streamable HTTP support. Error: {str(e)}'
        }
    except Exception as e:
        print(f"❌ Exception in MCP liveboard creation: {str(e)}")
        print(f" Exception type: {type(e).__name__}")
        import traceback
        print(f" Traceback: {traceback.format_exc()}")
        # Extract real errors from ExceptionGroup/TaskGroup
        # (anyio/asyncio task groups wrap the underlying failures)
        error_msg = str(e)
        if hasattr(e, 'exceptions'):
            real_errors = []
            for sub_exc in e.exceptions:
                print(f" Sub-exception: {type(sub_exc).__name__}: {str(sub_exc)}")
                # Check for nested exception groups
                if hasattr(sub_exc, 'exceptions'):
                    for nested_exc in sub_exc.exceptions:
                        print(f" Nested sub-exception: {type(nested_exc).__name__}: {str(nested_exc)}")
                        real_errors.append(f"{type(nested_exc).__name__}: {str(nested_exc)}")
                else:
                    real_errors.append(f"{type(sub_exc).__name__}: {str(sub_exc)}")
            if real_errors:
                error_msg = "; ".join(real_errors)
        return {
            'success': False,
            'error': f'MCP liveboard creation failed: {error_msg}'
        }
# Run the async function with a hard overall timeout (5 minutes)
MCP_OVERALL_TIMEOUT = 300 # seconds
async def _run_with_timeout():
    """Await the MCP workflow, bounded by the hard overall timeout.

    Raises asyncio.TimeoutError if the workflow exceeds
    MCP_OVERALL_TIMEOUT seconds; the caller converts that into an
    error result dict.
    """
    workflow_coro = _create_mcp_liveboard()
    return await asyncio.wait_for(workflow_coro, timeout=MCP_OVERALL_TIMEOUT)
try:
print(f"DEBUG: About to call asyncio.run(_create_mcp_liveboard()) [timeout={MCP_OVERALL_TIMEOUT}s]")
result = asyncio.run(_run_with_timeout())
print(f"DEBUG: asyncio.run() completed, result: {result}")
if _slog and _t_lb is not None:
if result and result.get('success'):
_slog.log_end("liveboard", _t_lb, liveboard_guid=result.get('liveboard_guid', ''))
else:
_slog.log_end("liveboard", _t_lb, error=result.get('error', 'unknown error') if result else 'no result')
return result
except asyncio.TimeoutError:
err = f'MCP liveboard creation timed out after {MCP_OVERALL_TIMEOUT} seconds. ThoughtSpot MCP endpoint may be slow or unresponsive.'
print(f"[ERROR] MCP liveboard creation timed out after {MCP_OVERALL_TIMEOUT}s")
if _slog and _t_lb is not None:
_slog.log_end("liveboard", _t_lb, error=err)
return {'success': False, 'error': err}
except Exception as e:
print(f"[ERROR] Exception in asyncio.run(): {str(e)}")
print(f" Exception type: {type(e).__name__}")
import traceback
print(f" Traceback: {traceback.format_exc()}")
# Check if it's an ExceptionGroup (TaskGroup error) - extract real errors
if hasattr(e, 'exceptions'):
real_errors = []
for sub_exc in e.exceptions:
print(f" Sub-exception: {type(sub_exc).__name__}: {str(sub_exc)}")
real_errors.append(f"{type(sub_exc).__name__}: {str(sub_exc)}")
err = f'MCP async errors: {"; ".join(real_errors)}'
if _slog and _t_lb is not None:
_slog.log_end("liveboard", _t_lb, error=err)
return {'success': False, 'error': err}
err = f'Failed to run MCP async workflow: {str(e)}'
if _slog and _t_lb is not None:
_slog.log_end("liveboard", _t_lb, error=err)
return {'success': False, 'error': err}
def enhance_mcp_liveboard(
liveboard_guid: str,
company_data: Dict,
ts_client,
add_groups: bool = True,
fix_kpis: bool = True,
apply_brand_colors: bool = True,
add_layout: bool = True
) -> Dict:
"""
Enhance an MCP-created liveboard with TML post-processing (Golden Demo style).
This is the key function for the HYBRID approach:
- MCP creates the liveboard quickly with AI-driven questions
- This function then polishes it to match the Golden Demo standard
Enhancements applied:
- Add Groups to organize visualizations by business function
- Add Tabs layout for multi-page organization
- Fix KPI visualizations for sparklines and comparisons
- Apply brand colors (liveboard-level and group-level)
- Set border style and hide unnecessary descriptions
- Create proper grid layout for visualizations
Args:
liveboard_guid: GUID of the MCP-created liveboard
company_data: Company research data (name, use_case, etc.)
ts_client: Authenticated ThoughtSpotDeployer instance
add_groups: Whether to add Groups organization
fix_kpis: Whether to fix KPI visualizations for sparklines
apply_brand_colors: Whether to apply brand color styling
add_layout: Whether to add proper tab/tile layout
Returns:
Dict with success, message, enhancements list
"""
print(f"Starting hybrid post-processing for liveboard {liveboard_guid}...", flush=True)
enhancements_applied = []
try:
# Step 1: Export the liveboard TML
print(f" Exporting liveboard TML...", flush=True)
export_response = ts_client.session.post(
f"{ts_client.base_url}/api/rest/2.0/metadata/tml/export",
json={
'metadata': [{'identifier': liveboard_guid}],
'export_associated': False
}
)
if export_response.status_code != 200:
return {
'success': False,
'message': f'Failed to export liveboard TML: HTTP {export_response.status_code}',
'enhancements': []
}
tml_data = export_response.json()
if not tml_data or len(tml_data) == 0:
return {
'success': False,
'message': 'Empty TML export response',
'enhancements': []
}
# Parse YAML TML
tml_str = tml_data[0].get('edoc', '')
liveboard_tml = yaml.safe_load(tml_str)
if not liveboard_tml or 'liveboard' not in liveboard_tml:
return {
'success': False,
'message': 'Invalid TML structure',
'enhancements': []
}
visualizations = liveboard_tml.get('liveboard', {}).get('visualizations', [])
print(f" Found {len(visualizations)} visualizations", flush=True)
for _v in visualizations:
_vid = _v.get('id', '?')
_is_note = 'note_tile' in _v
_chart_type = _v.get('answer', {}).get('chart', {}).get('type', 'N/A') if not _is_note else 'note'
_name = _v.get('answer', {}).get('name', '') if not _is_note else '(note tile)'
print(f" viz {_vid}: type={_chart_type} note={_is_note} name='{_name}'", flush=True)
# Step 2: Classify visualizations by type and purpose
kpi_vizs = [] # KPI charts (big numbers, sparklines)
trend_vizs = [] # Line/area charts (time series)
bar_vizs = [] # Column/bar charts
table_vizs = [] # Tables and pivot tables
geo_vizs = [] # Maps
note_vizs = [] # Text tiles
other_vizs = [] # Everything else
viz_details = {} # Store details for layout
for viz in visualizations:
viz_id = viz.get('id', '')
# Check if it's a note tile
if 'note_tile' in viz:
note_vizs.append(viz_id)
viz_details[viz_id] = {'type': 'note', 'name': 'Note'}
continue
# Get chart type from answer
answer = viz.get('answer', {})
chart = answer.get('chart', {})
chart_type = chart.get('type', 'UNKNOWN')
viz_name = answer.get('name', viz_id)
viz_details[viz_id] = {'type': chart_type, 'name': viz_name}
if chart_type == 'KPI':
kpi_vizs.append(viz_id)
elif chart_type in ['LINE', 'AREA']:
trend_vizs.append(viz_id)
elif chart_type in ['COLUMN', 'BAR', 'STACKED_COLUMN', 'STACKED_BAR']:
bar_vizs.append(viz_id)
elif chart_type in ['TABLE', 'PIVOT_TABLE']:
table_vizs.append(viz_id)
elif chart_type in ['GEO_AREA', 'GEO_BUBBLE', 'GEO_HEATMAP']:
geo_vizs.append(viz_id)
elif chart_type in ['PIE', 'DONUT', 'TREEMAP', 'SANKEY', 'FUNNEL']:
other_vizs.append(viz_id)
else:
other_vizs.append(viz_id)
print(f" Classification: {len(kpi_vizs)} KPIs, {len(trend_vizs)} trends, {len(bar_vizs)} bars, {len(table_vizs)} tables, {len(note_vizs)} notes", flush=True)
# Step 2.5: Remove note tiles (MCP requires them but we don't want them in final liveboard)
# Safety: only remove note tiles if there are other vizzes to show — never leave liveboard empty
non_note_count = len(visualizations) - len(note_vizs)
if note_vizs and non_note_count > 0:
print(f" Removing {len(note_vizs)} note tile(s)...", flush=True)
original_count = len(visualizations)
liveboard_tml['liveboard']['visualizations'] = [
v for v in visualizations if v.get('id') not in note_vizs
]
visualizations = liveboard_tml['liveboard']['visualizations']
print(f" [OK] Removed note tiles ({original_count} -> {len(visualizations)} visualizations)", flush=True)
enhancements_applied.append(f"Removed {len(note_vizs)} note tile(s)")
elif note_vizs and non_note_count == 0:
print(f" ⚠️ Only note tiles found ({len(note_vizs)}) — MCP may have returned no chart answers. Keeping note tiles to preserve liveboard content.", flush=True)
# Step 3: Add Groups - simplified: just KPI section, rest ungrouped
if add_groups:
print(f" Adding Groups (simplified)...", flush=True)
groups = []
# Create ONE group: Key Metrics with ALL KPIs (always at top)
if kpi_vizs:
groups.append({
'id': 'Group_1',
'name': 'Key Metrics',
'visualizations': list(kpi_vizs) # ALL KPIs in the group
})
print(f" [OK] Added Key Metrics group with {len(kpi_vizs)} KPIs", flush=True)
# Leave trends, bars, tables, etc. ungrouped (no Trends or Analysis groups)
ungrouped_count = len(trend_vizs) + len(bar_vizs) + len(table_vizs) + len(other_vizs)
if ungrouped_count > 0:
print(f" [OK] {ungrouped_count} visualizations left ungrouped", flush=True)
if groups:
liveboard_tml['liveboard']['groups'] = groups
enhancements_applied.append(f"Added {len(groups)} group with KPIs")
# Step 3.5: Convert LINE charts to KPIs (MCP creates LINE instead of KPI)
# ThoughtSpot's MCP doesn't create KPI chart types - convert the first 2 trend charts
if fix_kpis and len(kpi_vizs) == 0 and len(trend_vizs) >= 1:
print(f" Converting LINE charts to KPIs (MCP creates LINE instead of KPI)...", flush=True)
converted_count = 0
# Convert first 2 LINE/AREA charts to KPIs
for viz_id in trend_vizs[:2]:
for viz in visualizations:
if viz.get('id') != viz_id:
continue
answer = viz.get('answer', {})
chart = answer.get('chart', {})
# Change chart type from LINE/AREA to KPI
old_type = chart.get('type', 'LINE')
chart['type'] = 'KPI'
# Move from trend_vizs to kpi_vizs
kpi_vizs.append(viz_id)
converted_count += 1
viz_name = answer.get('name', viz_id)
print(f" Converted '{viz_name}' from {old_type} to KPI", flush=True)
# Remove converted vizs from trend_vizs
trend_vizs = [v for v in trend_vizs if v not in kpi_vizs]
if converted_count > 0:
enhancements_applied.append(f"Converted {converted_count} LINE charts to KPIs")
print(f" [OK] Converted {converted_count} LINE→KPI", flush=True)
# Update the groups if we just created KPIs
if add_groups and 'groups' in liveboard_tml.get('liveboard', {}):
groups = liveboard_tml['liveboard']['groups']
if not groups: # No groups yet
groups.append({
'id': 'Group_1',
'name': 'Key Metrics',
'visualizations': list(kpi_vizs)
})
liveboard_tml['liveboard']['groups'] = groups
print(f" [OK] Added Key Metrics group with {len(kpi_vizs)} KPIs", flush=True)
# Step 4: Fix KPI visualizations for sparklines
if fix_kpis and kpi_vizs:
print(f" Fixing KPI sparklines...", flush=True)
kpis_fixed = 0
for viz in visualizations:
if viz.get('id') not in kpi_vizs:
continue
answer = viz.get('answer', {})
chart = answer.get('chart', {})
# Parse and update client_state_v2 for KPI settings
client_state_str = chart.get('client_state_v2', '{}')
try:
client_state = json.loads(client_state_str)
except:
client_state = {}
# Ensure structure exists
if 'version' not in client_state:
client_state['version'] = 'V4DOT2'
if 'chartProperties' not in client_state:
client_state['chartProperties'] = {}
if 'chartSpecific' not in client_state['chartProperties']:
client_state['chartProperties']['chartSpecific'] = {}
# Golden Demo KPI settings
kpi_settings = {
'showLabel': True,
'showComparison': True,
'showSparkline': True,
'showAnomalies': False, # Cleaner look
'showBounds': False,
'customCompare': 'PREV_AVAILABLE',
'showOnlyLatestAnomaly': False
}
client_state['chartProperties']['chartSpecific']['customProps'] = json.dumps(kpi_settings)
client_state['chartProperties']['responsiveLayoutPreference'] = 'AUTO_ON'
# Update the TML
chart['client_state_v2'] = json.dumps(client_state)
kpis_fixed += 1
if kpis_fixed > 0:
enhancements_applied.append(f"Fixed {kpis_fixed} KPI sparklines")
print(f" [OK] Fixed {kpis_fixed} KPIs", flush=True)
# Step 4b: Convert categorical breakdowns to donut charts with colors
# Look for bar/pie charts with "by category/channel/segment" in the name
print(f" Converting categorical charts to donuts...", flush=True)
donuts_converted = 0
# Define vibrant donut colors (from Golden Demo)
DONUT_COLORS = [
'#40C1C0', # Teal
'#9c6ade', # Purple
'#FF8142', # Orange
'#4CAF50', # Green
'#2196F3', # Blue
'#E91E63', # Pink
'#FFC107', # Amber
'#00BCD4', # Cyan
]
for viz in visualizations:
viz_id = viz.get('id')
answer = viz.get('answer', {})
viz_name = answer.get('name', '').lower()
chart = answer.get('chart', {})
chart_type = chart.get('type', '').upper()
# Check if this looks like a categorical breakdown that should be a donut
is_breakdown = any(keyword in viz_name for keyword in
['by channel', 'by category', 'by segment', 'breakdown', 'by audience',
'by campaign', 'distribution', 'mix', 'split'])
is_convertible = chart_type in ['PIE', 'BAR', 'COLUMN', 'STACKED_COLUMN']
if is_breakdown and is_convertible:
# Convert to donut chart
chart['type'] = 'PIE'
# Set donut (ring) style with colors
client_state_str = chart.get('client_state_v2', '{}')
try:
client_state = json.loads(client_state_str)
except:
client_state = {}
if 'version' not in client_state:
client_state['version'] = 'V4DOT2'
if 'chartProperties' not in client_state:
# (Continuation of Step 4, inside the per-chart loop started above this excerpt.)
# Reset chartProperties so the donut-specific settings below fully define them.
client_state['chartProperties'] = {}
# Donut settings
# pieStyle 'RING' is what turns a pie into a donut; labels/values/percents
# are all enabled on the ring.
client_state['chartProperties']['chartSpecific'] = {
'customProps': json.dumps({
'pieStyle': 'RING', # Makes it a donut
'showLabel': True,
'showValue': True,
'showPercent': True
})
}
# Add custom colors
# DONUT_COLORS is defined elsewhere in this module (not visible in this span).
client_state['chartProperties']['chartLevelColorConfig'] = {
'colorPalette': {
'colors': DONUT_COLORS
}
}
# client_state_v2 is persisted as a JSON string on the TML answer.
chart['client_state_v2'] = json.dumps(client_state)
donuts_converted += 1
print(f" Converted '{answer.get('name', viz_id)}' to donut chart", flush=True)
if donuts_converted > 0:
enhancements_applied.append(f"Converted {donuts_converted} charts to donuts")
print(f" [OK] Converted {donuts_converted} charts to donuts", flush=True)
# Step 5: Apply brand colors and styling (Golden Demo style)
if apply_brand_colors:
print(f" Applying brand colors...", flush=True)
# Ensure style section exists
if 'style' not in liveboard_tml['liveboard']:
liveboard_tml['liveboard']['style'] = {}
style = liveboard_tml['liveboard']['style']
# Set liveboard-level style properties (Golden Demo uses these)
if 'style_properties' not in style:
style['style_properties'] = []
# Add/update liveboard-level properties
# NOTE: TML expects these values as strings ('false'), not booleans.
lb_props = {
'lb_border_type': 'CURVED', # Rounded corners
'lb_brand_color': 'LBC_C', # Green theme
'hide_group_title': 'false',
'hide_group_description': 'false',
'hide_tile_description': 'false'
}
# Upsert: update the value if a property with the same name already
# exists, otherwise append a new {name, value} entry.
for prop_name, prop_value in lb_props.items():
existing = next((p for p in style['style_properties'] if p.get('name') == prop_name), None)
if existing:
existing['value'] = prop_value
else:
style['style_properties'].append({'name': prop_name, 'value': prop_value})
if 'overrides' not in style:
style['overrides'] = []
# Group colors: Teal, Green, Purple, Orange (matches Golden Demo)
group_colors = ['GBC_B', 'GBC_C', 'GBC_D', 'GBC_E']
# Apply colors to groups
groups = liveboard_tml.get('liveboard', {}).get('groups', [])
colors_applied = 0
# Cycle through the 4-color palette; existing overrides are left untouched
# (only missing ones are created).
for i, group in enumerate(groups):
grp_id = group.get('id')
color = group_colors[i % len(group_colors)]
existing = next((o for o in style['overrides'] if o.get('object_id') == grp_id), None)
if not existing:
style['overrides'].append({
'object_id': grp_id,
'style_properties': [
{'name': 'group_brand_color', 'value': color},
{'name': 'hide_group_description', 'value': 'true'},
{'name': 'hide_group_tile_description', 'value': 'true'}
]
})
colors_applied += 1
# Style note tiles
# note_vizs / bar_vizs / table_vizs / other_vizs / kpi_vizs / trend_vizs /
# geo_vizs are classification lists built earlier in this routine
# (before this excerpt) — presumably viz-id lists per chart type; verify
# against the full function.
for note_id in note_vizs:
existing = next((o for o in style['overrides'] if o.get('object_id') == note_id), None)
if not existing:
style['overrides'].append({
'object_id': note_id,
'style_properties': [
{'name': 'tile_brand_color', 'value': 'TBC_I'}
]
})
colors_applied += 1
# Hide descriptions on analysis charts
for viz_id in (bar_vizs + table_vizs + other_vizs):
existing = next((o for o in style['overrides'] if o.get('object_id') == viz_id), None)
if not existing:
style['overrides'].append({
'object_id': viz_id,
'style_properties': [
{'name': 'hide_tile_description', 'value': 'true'}
]
})
if colors_applied > 0:
enhancements_applied.append(f"Applied styling ({colors_applied} colors)")
print(f" [OK] Applied {colors_applied} colors + liveboard styling", flush=True)
# Step 6: Add proper layout with tabs and group_layouts (Golden Demo style)
if add_layout:
print(f" Adding layout structure (tab-based with group_layouts)...", flush=True)
groups = liveboard_tml.get('liveboard', {}).get('groups', [])
all_viz_ids = kpi_vizs + trend_vizs + bar_vizs + geo_vizs + table_vizs + other_vizs
# Collect grouped visualization IDs
grouped_vizs = set()
for group in groups:
grouped_vizs.update(group.get('visualizations', []))
# Ungrouped visualizations (everything not in a group)
ungrouped = [v for v in all_viz_ids if v not in grouped_vizs]
# === Build tab tiles: groups FIRST (KPIs at top), then ungrouped below ===
tab_tiles = []
row = 0
# Add KPI group at the very top spanning full width
for group in groups:
grp_id = group.get('id')
grp_vizs = group.get('visualizations', [])
# KPI group spans full width at top
# Height: 3 rows per pair of vizs ((n + 1) // 2 rounds up to whole
# pairs), with a floor of 3 so an empty/small group still renders.
group_height = max(3, (len(grp_vizs) + 1) // 2 * 3) # 3 rows per pair
tab_tiles.append({
'visualization_id': grp_id,
'x': 0,
'y': row,
'height': group_height,
'width': 12 # Full width for KPI group
})
row += group_height
# Add ungrouped visualizations in 2-column grid below groups
# Alternate columns (x=0 / x=6) on a 12-unit grid; each tile is 6 wide,
# 5 tall, and rows advance every two tiles.
for i, viz_id in enumerate(ungrouped):
tab_tiles.append({
'visualization_id': viz_id,
'x': 0 if i % 2 == 0 else 6,
'y': row + (i // 2) * 5,
'height': 5,
'width': 6
})
# === Build group_layouts: define tile positions INSIDE each group ===
group_layouts = []
for group in groups:
grp_id = group.get('id')
grp_vizs = group.get('visualizations', [])
grp_tiles = []
for j, gviz_id in enumerate(grp_vizs):
# KPIs: 2-column layout inside group, each 3 rows tall
grp_tiles.append({
'visualization_id': gviz_id,
'x': 0 if j % 2 == 0 else 6,
'y': (j // 2) * 3,
'height': 3,
'width': 6
})
group_layouts.append({
'id': grp_id,
'tiles': grp_tiles
})
# === Assemble the layout with tabs structure ===
# A single 'Overview' tab holds everything; group_layouts are attached
# only when at least one group exists.
if tab_tiles:
tab = {
'name': 'Overview',
'description': '',
'tiles': tab_tiles
}
if group_layouts:
tab['group_layouts'] = group_layouts
liveboard_tml['liveboard']['layout'] = {
'tabs': [tab]
}
total_tiles = len(tab_tiles) + sum(len(gl.get('tiles', [])) for gl in group_layouts)
enhancements_applied.append(f"Added tab layout ({total_tiles} tiles, {len(group_layouts)} group layouts)")
print(f" [OK] Added tab layout: {len(tab_tiles)} top-level tiles, {len(group_layouts)} group layouts", flush=True)
# Step 7: Re-import the enhanced TML
# Only re-import when at least one enhancement was actually made.
# 'PARTIAL' import policy + create_new=False updates the existing liveboard
# in place rather than creating a duplicate.
if enhancements_applied:
print(f" Re-importing enhanced TML...", flush=True)
import_response = ts_client.session.post(
f"{ts_client.base_url}/api/rest/2.0/metadata/tml/import",
json={
'metadata_tmls': [yaml.dump(liveboard_tml, default_flow_style=False, sort_keys=False)],
'import_policy': 'PARTIAL',
'create_new': False
}
)
if import_response.status_code == 200:
# The import endpoint returns a list of per-TML results; inspect the
# first entry's nested response.status for the real outcome (HTTP 200
# does not by itself mean the TML imported cleanly).
result = import_response.json()
if result and len(result) > 0:
status = result[0].get('response', {}).get('status', {})
if status.get('status_code') == 'OK':
print(f" [OK] Enhancement complete!", flush=True)
return {
'success': True,
'message': f'Enhanced liveboard with {len(enhancements_applied)} improvements',
'enhancements': enhancements_applied
}
else:
# Import completed with warnings/errors at the TML level —
# still reported as success=True, with the message attached.
error_msg = status.get('error_message', 'Unknown TML import error')
print(f" [WARN] TML import warning: {error_msg}", flush=True)
return {
'success': True,
'message': f'Enhancement applied with warnings: {error_msg}',
'enhancements': enhancements_applied
}
else:
# Non-200 HTTP response: report failure but still surface which
# enhancements had been staged.
print(f" [WARN] Re-import failed: HTTP {import_response.status_code}", flush=True)
return {
'success': False,
'message': f'TML re-import failed: HTTP {import_response.status_code}',
'enhancements': enhancements_applied
}
else:
print(f" No enhancements needed", flush=True)
return {
'success': True,
'message': 'No enhancements needed',
'enhancements': []
}
except Exception as e:
# Catch-all boundary for the whole enhancement flow: log with traceback
# and return a failure dict instead of propagating.
# NOTE(review): assumes `enhancements_applied` was initialized before the
# enclosing try (outside this excerpt) — otherwise this would NameError.
print(f" [ERROR] Enhancement error: {str(e)}", flush=True)
import traceback
print(f" Traceback: {traceback.format_exc()}", flush=True)
return {
'success': False,
'message': f'Enhancement failed: {str(e)}',
'enhancements': enhancements_applied
}
# NOTE(review): this final return looks unreachable — every visible branch of
# the try/except above returns. Confirm against the full function and consider
# removing if so.
return {
'success': True,
'message': f'Successfully enhanced liveboard',
'enhancements': enhancements_applied
}