demoprep / supabase_client.py
mikeboone's picture
cleanup: revert debug supabase import exception to ImportError
0c2ea33
"""
Supabase Client for Settings Persistence
Handles saving and loading user settings and demo history to Supabase.
Implements the database schema for Sprint 2, item #6.
Required environment variables:
SUPABASE_URL: Your Supabase project URL
SUPABASE_ANON_KEY: Your Supabase anon key
Usage:
from supabase_client import SupabaseSettings
settings = SupabaseSettings()
settings.save_setting("user@example.com", "default_llm", "GPT-4", "ai_config")
user_settings = settings.load_all_settings("user@example.com")
"""
import os
import json
from typing import Dict, List, Optional, Any
from datetime import datetime
from dotenv import load_dotenv
from pathlib import Path
from llm_config import DEFAULT_LLM_MODEL
# Load environment variables - try multiple methods to ensure it works
# First try to load from current directory
load_dotenv()
# If that doesn't work, try explicit path
if not os.getenv("SUPABASE_URL"):
env_path = Path(__file__).parent / '.env'
if env_path.exists():
load_dotenv(env_path, override=True)
else:
# Try parent directory
env_path = Path(__file__).parent.parent / '.env'
if env_path.exists():
load_dotenv(env_path, override=True)
# Check if Supabase is available
try:
from supabase import create_client, Client
SUPABASE_AVAILABLE = True
except ImportError:
SUPABASE_AVAILABLE = False
class SupabaseSettings:
"""Manage user settings and demo history in Supabase"""
def __init__(self):
"""Initialize Supabase client"""
self.client = None
self.enabled = False
if not SUPABASE_AVAILABLE:
print("⚠️ Supabase module not available. Settings persistence disabled.")
return
url = os.getenv("SUPABASE_URL")
key = os.getenv("SUPABASE_ANON_KEY")
if not url or not key:
print("⚠️ SUPABASE_URL or SUPABASE_ANON_KEY not set. Settings persistence disabled.")
return
try:
self.client = create_client(url, key)
self.enabled = True
print("✅ Supabase client initialized for settings persistence")
except Exception as e:
print(f"⚠️ Failed to initialize Supabase client: {e}")
def is_enabled(self) -> bool:
"""Check if Supabase is enabled and configured"""
return self.enabled
def save_setting(self, user_email: str, key: str, value: Any, category: str = None) -> bool:
"""
Save a single user setting
Args:
user_email: User's email address
key: Setting key (e.g., 'default_llm', 'default_warehouse')
value: Setting value (will be JSON serialized if not string)
category: Optional category ('ai_config', 'defaults', 'thoughtspot', 'advanced')
Returns:
True if successful, False otherwise
"""
if not self.enabled:
return False
try:
# Convert value to JSON if it's not a string
if not isinstance(value, str):
value = json.dumps(value)
data = {
"user_email": user_email,
"setting_key": key,
"setting_value": {"value": value}, # Store as JSONB
"category": category,
"updated_at": datetime.utcnow().isoformat()
}
print(f"\n🔍 DEBUG: Attempting to save setting '{key}' for {user_email}")
print(f" Data being sent: {data}")
# Upsert (insert or update) - specify conflict columns
result = self.client.table("user_settings").upsert(
data,
on_conflict="user_email,setting_key"
).execute()
print(f" ✅ Successfully saved '{key}'")
return True
except Exception as e:
print(f"\n❌ ERROR saving setting '{key}' for {user_email}")
print(f" Data attempted: {data}")
print(f" Error type: {type(e).__name__}")
print(f" Error message: {e}")
print(f" Full error: {repr(e)}")
return False
def load_setting(self, user_email: str, key: str) -> Optional[Any]:
"""
Load a single user setting
Args:
user_email: User's email address
key: Setting key to retrieve
Returns:
Setting value or None if not found
"""
if not self.enabled:
return None
try:
result = self.client.table("user_settings") \
.select("setting_value") \
.eq("user_email", user_email) \
.eq("setting_key", key) \
.execute()
if result.data and len(result.data) > 0:
value_obj = result.data[0].get("setting_value", {})
return value_obj.get("value")
return None
except Exception as e:
print(f"❌ Error loading setting '{key}': {e}")
return None
def load_all_settings(self, user_email: str, category: str = None) -> Dict[str, Any]:
"""
Load all settings for a user
Args:
user_email: User's email address
category: Optional category filter
Returns:
Dictionary of settings {key: value}
"""
if not self.enabled:
return {}
try:
query = self.client.table("user_settings").select("*").eq("user_email", user_email)
if category:
query = query.eq("category", category)
result = query.execute()
settings = {}
for row in result.data:
key = row.get("setting_key")
value_obj = row.get("setting_value", {})
settings[key] = value_obj.get("value")
return settings
except Exception as e:
print(f"❌ Error loading settings: {e}")
return {}
def save_all_settings(self, user_email: str, settings: Dict[str, Any], category: str = None) -> bool:
"""
Save multiple settings at once
Args:
user_email: User's email address
settings: Dictionary of settings to save
category: Optional category for all settings
Returns:
True if all successful, False otherwise
"""
if not self.enabled:
return False
success = True
for key, value in settings.items():
if not self.save_setting(user_email, key, value, category):
success = False
return success
def save_demo_history(self, user_email: str, company_name: str, industry: str,
use_case: str, config: Dict, results: Dict = None,
status: str = "in_progress") -> Optional[str]:
"""
Save demo creation history
Args:
user_email: User's email address
company_name: Company name for the demo
industry: Industry vertical
use_case: Use case (e.g., 'Merchandising', 'Sales AI Analyst')
config: Full configuration used
results: Optional results/outputs
status: 'in_progress', 'completed', or 'failed'
Returns:
Demo history ID if successful, None otherwise
"""
if not self.enabled:
return None
try:
data = {
"user_email": user_email,
"company_name": company_name,
"industry": industry,
"use_case": use_case,
"demo_config": config,
"results": results or {},
"status": status,
"created_at": datetime.utcnow().isoformat()
}
result = self.client.table("demo_history").insert(data).execute()
if result.data and len(result.data) > 0:
return result.data[0].get("id")
return None
except Exception as e:
print(f"❌ Error saving demo history: {e}")
return None
def get_demo_history(self, user_email: str, limit: int = 10) -> List[Dict]:
"""
Get demo creation history for a user
Args:
user_email: User's email address
limit: Maximum number of records to return
Returns:
List of demo history records
"""
if not self.enabled:
return []
try:
result = self.client.table("demo_history") \
.select("*") \
.eq("user_email", user_email) \
.order("created_at", desc=True) \
.limit(limit) \
.execute()
return result.data
except Exception as e:
print(f"❌ Error loading demo history: {e}")
return []
def delete_setting(self, user_email: str, key: str) -> bool:
"""
Delete a user setting
Args:
user_email: User's email address
key: Setting key to delete
Returns:
True if successful, False otherwise
"""
if not self.enabled:
return False
try:
self.client.table("user_settings") \
.delete() \
.eq("user_email", user_email) \
.eq("setting_key", key) \
.execute()
return True
except Exception as e:
print(f"❌ Error deleting setting '{key}': {e}")
return False
# ==========================================================================
# User Authentication Management
# ==========================================================================
class UserManager:
"""Manage application users in Supabase demoprep_users table.
Table schema:
id UUID PRIMARY KEY
email TEXT UNIQUE NOT NULL
password_hash TEXT NOT NULL
display_name TEXT
is_admin BOOLEAN DEFAULT FALSE
is_active BOOLEAN DEFAULT TRUE
created_at TIMESTAMPTZ DEFAULT now()
last_login TIMESTAMPTZ
"""
def __init__(self):
self.client = None
self.enabled = False
if not SUPABASE_AVAILABLE:
return
url = os.getenv("SUPABASE_URL")
key = os.getenv("SUPABASE_ANON_KEY")
if not url or not key:
return
try:
self.client = create_client(url, key)
self.enabled = True
except Exception as e:
print(f"UserManager: Failed to connect to Supabase: {e}")
def _hash_password(self, password: str) -> str:
"""Hash a password using bcrypt."""
import hashlib
import secrets
# Use PBKDF2 with SHA-256 (stdlib, no extra dependency)
salt = secrets.token_hex(16)
pw_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
return f"pbkdf2:{salt}:{pw_hash.hex()}"
def _verify_password(self, password: str, stored_hash: str) -> bool:
"""Verify a password against a stored hash."""
import hashlib
if not stored_hash or ':' not in stored_hash:
return False
parts = stored_hash.split(':')
if len(parts) != 3 or parts[0] != 'pbkdf2':
return False
salt = parts[1]
expected_hash = parts[2]
pw_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
return pw_hash.hex() == expected_hash
def authenticate(self, email: str, password: str) -> Optional[Dict]:
"""
Authenticate a user by email and password.
Returns:
User dict if authenticated, None if failed.
Dict has: email, display_name, is_admin, is_active
"""
if not self.enabled:
return None
try:
result = self.client.table("demoprep_users") \
.select("*") \
.eq("email", email.lower().strip()) \
.execute()
if not result.data or len(result.data) == 0:
return None
user = result.data[0]
# Check if user is active
if not user.get("is_active", True):
print(f"UserManager: User {email} is deactivated")
return None
# Verify password
if not self._verify_password(password, user.get("password_hash", "")):
return None
# Update last_login
try:
self.client.table("demoprep_users") \
.update({"last_login": datetime.utcnow().isoformat()}) \
.eq("email", email.lower().strip()) \
.execute()
except Exception:
pass # Non-critical
return {
"email": user["email"],
"display_name": user.get("display_name", ""),
"is_admin": user.get("is_admin", False),
"is_active": user.get("is_active", True),
}
except Exception as e:
print(f"UserManager: Auth error: {e}")
return None
def add_user(self, email: str, password: str, display_name: str = "",
is_admin: bool = False) -> bool:
"""Add a new user. Returns True if successful."""
if not self.enabled:
return False
try:
data = {
"email": email.lower().strip(),
"password_hash": self._hash_password(password),
"display_name": display_name or email.split("@")[0],
"is_admin": is_admin,
"is_active": True,
"created_at": datetime.utcnow().isoformat(),
}
self.client.table("demoprep_users").insert(data).execute()
print(f"UserManager: Added user {email} (admin={is_admin})")
return True
except Exception as e:
print(f"UserManager: Error adding user {email}: {e}")
return False
def list_users(self) -> List[Dict]:
"""List all users (for admin panel)."""
if not self.enabled:
return []
try:
result = self.client.table("demoprep_users") \
.select("email, display_name, is_admin, is_active, created_at, last_login") \
.order("created_at") \
.execute()
return result.data or []
except Exception as e:
print(f"UserManager: Error listing users: {e}")
return []
def update_user(self, email: str, **kwargs) -> bool:
"""Update user fields (display_name, is_admin, is_active)."""
if not self.enabled:
return False
allowed_fields = {"display_name", "is_admin", "is_active"}
update_data = {k: v for k, v in kwargs.items() if k in allowed_fields}
if not update_data:
return False
try:
self.client.table("demoprep_users") \
.update(update_data) \
.eq("email", email.lower().strip()) \
.execute()
return True
except Exception as e:
print(f"UserManager: Error updating user {email}: {e}")
return False
def reset_password(self, email: str, new_password: str) -> bool:
"""Reset a user's password."""
if not self.enabled:
return False
try:
self.client.table("demoprep_users") \
.update({"password_hash": self._hash_password(new_password)}) \
.eq("email", email.lower().strip()) \
.execute()
return True
except Exception as e:
print(f"UserManager: Error resetting password for {email}: {e}")
return False
def deactivate_user(self, email: str) -> bool:
"""Deactivate a user (soft delete)."""
return self.update_user(email, is_active=False)
def activate_user(self, email: str) -> bool:
"""Reactivate a user."""
return self.update_user(email, is_active=True)
def is_admin(self, email: str) -> bool:
"""Check if a user is an admin."""
if not self.enabled:
return False
try:
result = self.client.table("demoprep_users") \
.select("is_admin") \
.eq("email", email.lower().strip()) \
.execute()
if result.data and len(result.data) > 0:
return result.data[0].get("is_admin", False)
return False
except Exception:
return False
# ==========================================================================
# Admin Settings (System-wide, stored under __admin__ key)
# ==========================================================================
ADMIN_USER_KEY = "__admin__"
# Admin settings keys with display labels
ADMIN_SETTINGS_KEYS = {
# ThoughtSpot
"THOUGHTSPOT_URL": "ThoughtSpot Instance URL",
"THOUGHTSPOT_TRUSTED_AUTH_KEY": "ThoughtSpot Trusted Auth Key",
"THOUGHTSPOT_ADMIN_USER": "ThoughtSpot Admin Username",
# LLM API Keys
"OPENAI_API_KEY": "OpenAI API Key",
"GOOGLE_API_KEY": "Google API Key",
# Snowflake
"SNOWFLAKE_ACCOUNT": "Snowflake Account",
"SNOWFLAKE_KP_USER": "Snowflake Key Pair User",
"SNOWFLAKE_KP_PK": "Snowflake Private Key (PEM)",
"SNOWFLAKE_KP_PASSPHRASE": "Snowflake Private Key Passphrase",
"SNOWFLAKE_ROLE": "Snowflake Role",
"SNOWFLAKE_WAREHOUSE": "Snowflake Warehouse",
"SNOWFLAKE_DATABASE": "Snowflake Database",
"SNOWFLAKE_SSO_USER": "Snowflake SSO User",
}
# Cached admin settings (loaded once per session, refreshed on demand)
_admin_settings_cache = None
def load_admin_settings(force_refresh: bool = False) -> Dict[str, str]:
"""
Load system-wide admin settings from Supabase.
Uses __admin__ as the user_email key.
Results are cached after first load.
Returns:
Dictionary of {setting_key: value} for all admin settings.
"""
global _admin_settings_cache
if _admin_settings_cache is not None and not force_refresh:
return _admin_settings_cache
settings_client = SupabaseSettings()
if not settings_client.is_enabled():
print("⚠️ Supabase not available - cannot load admin settings")
return {}
raw = settings_client.load_all_settings(ADMIN_USER_KEY)
# Return all settings, using empty string for any missing keys
result = {}
for key in ADMIN_SETTINGS_KEYS:
result[key] = raw.get(key, "")
_admin_settings_cache = result
return result
def save_admin_settings(settings: Dict[str, str]) -> bool:
"""
Save system-wide admin settings to Supabase under __admin__ key.
Also refreshes the cache.
Args:
settings: Dictionary of {key: value} to save.
Returns:
True if all saved successfully.
"""
global _admin_settings_cache
settings_client = SupabaseSettings()
if not settings_client.is_enabled():
print("⚠️ Supabase not available - cannot save admin settings")
return False
success = True
for key, value in settings.items():
if key in ADMIN_SETTINGS_KEYS:
if not settings_client.save_setting(ADMIN_USER_KEY, key, str(value), category="admin"):
success = False
# Refresh cache
_admin_settings_cache = None
load_admin_settings(force_refresh=True)
return success
def get_admin_setting(key: str, required: bool = True) -> str:
"""
Get a single admin setting value. Loads from cache if available.
Args:
key: The setting key (e.g. 'THOUGHTSPOT_URL')
required: If True, raises ValueError when setting is missing/empty
Returns:
Setting value as string.
Raises:
ValueError: If required=True and setting is missing or empty.
"""
settings = load_admin_settings()
value = settings.get(key, "")
if required and not value:
raise ValueError(f"Missing required admin setting: {key}. Configure in Admin Settings tab.")
return value
def inject_admin_settings_to_env() -> bool:
"""
Load admin settings from Supabase and inject them into os.environ.
Non-LLM settings remain sourced from Supabase for runtime compatibility.
LLM keys are intentionally excluded so OPENAI_API_KEY/GOOGLE_API_KEY
stay environment-only.
Called once after login, before any pipeline runs.
Also maps THOUGHTSPOT_ADMIN_USER -> THOUGHTSPOT_USERNAME for backwards compatibility.
Returns:
True if settings were loaded and injected successfully.
"""
settings = load_admin_settings(force_refresh=True)
if not settings:
print("⚠️ No admin settings found in Supabase")
return False
injected = 0
skipped = {"OPENAI_API_KEY", "GOOGLE_API_KEY"}
for key, value in settings.items():
if key in skipped:
continue
if value: # Only inject non-empty values
os.environ[key] = str(value)
injected += 1
# Map THOUGHTSPOT_ADMIN_USER -> THOUGHTSPOT_USERNAME for backwards compat
admin_user = settings.get("THOUGHTSPOT_ADMIN_USER", "")
if admin_user:
os.environ["THOUGHTSPOT_USERNAME"] = admin_user
print(
f"✅ Injected {injected} admin settings from Supabase into environment "
"(LLM keys remain environment-only)."
)
return injected > 0
# Convenience functions for Gradio integration
# Note: save_gradio_settings has been removed - use SupabaseSettings().save_all_settings() directly
def load_gradio_settings(email: str) -> Dict[str, Any]:
"""
Load all Gradio settings for a user
Returns:
Dictionary with all settings or defaults
"""
if not email or not str(email).strip():
raise ValueError("Authenticated user email is required to load settings.")
email = str(email).strip().lower()
settings_client = SupabaseSettings()
if not settings_client.is_enabled():
# Return defaults if Supabase not configured
return {
"default_llm": DEFAULT_LLM_MODEL,
"company_size": "Medium (100-1000)",
"default_warehouse": "COMPUTE_WH",
"default_database": "DEMO_DB",
"thoughtspot_url": "",
"thoughtspot_username": "",
"batch_size": 5000,
"thread_count": 4
}
# Load saved settings
saved_settings = settings_client.load_all_settings(email)
# If user has NO saved settings, auto-create defaults (first login)
if not saved_settings:
print(f"[Settings] New user {email} - creating default settings in Supabase")
try:
settings_client.save_all_settings(email, {
"default_llm": DEFAULT_LLM_MODEL,
"default_company_url": "Amazon.com",
"default_use_case": "Sales Analytics",
"use_existing_model": "false",
"fact_table_size": "1000",
"dim_table_size": "100",
"geo_scope": "USA Only",
"validation_mode": "Off",
"column_naming_style": "Regular Case",
"liveboard_name": "",
"tag_name": "",
"object_naming_prefix": "",
"existing_model_guid": "",
}, category="defaults")
except Exception as e:
print(f"[Settings] Could not create defaults for {email}: {e}")
# Merge with defaults - include ALL settings fields
defaults = {
# AI Configuration
"default_llm": DEFAULT_LLM_MODEL,
"temperature": 0.3,
"max_tokens": 4000,
# Default Values
"default_company_url": "Amazon.com",
"default_use_case": "Sales Analytics",
# Data Generation Settings
"fact_table_size": "1000",
"dim_table_size": "100",
# ThoughtSpot Connection (loaded from admin settings)
"thoughtspot_url": "",
"thoughtspot_username": "",
"liveboard_name": "",
# Snowflake Connection (loaded from admin settings)
"snowflake_account": "",
"snowflake_user": "",
"snowflake_role": "",
"default_warehouse": "",
"default_database": "",
"default_schema": "PUBLIC",
# Demo Configuration
"tag_name": "",
"object_naming_prefix": "",
"column_naming_style": "Regular Case",
# Liveboard Creation
"geo_scope": "USA Only",
"validation_mode": "Off",
# Existing Model Mode
"use_existing_model": False,
"existing_model_guid": "",
# Advanced Options
"batch_size": 5000,
"thread_count": 4
}
for key in defaults:
if key in saved_settings:
# Parse JSON values if needed
value = saved_settings[key]
try:
defaults[key] = json.loads(value) if isinstance(value, str) and value.startswith('{') else value
except:
defaults[key] = value
# Ensure batch_size and thread_count are integers
if "batch_size" in defaults:
try:
defaults["batch_size"] = int(defaults["batch_size"])
except (ValueError, TypeError):
defaults["batch_size"] = 5000
if "thread_count" in defaults:
try:
defaults["thread_count"] = int(defaults["thread_count"])
except (ValueError, TypeError):
defaults["thread_count"] = 4
return defaults
if __name__ == "__main__":
# Test the Supabase client
print("\n🧪 Testing Supabase Settings Client\n")
settings = SupabaseSettings()
if settings.is_enabled():
test_email = "test@example.com"
# Test saving settings
print("Testing save operations...")
settings.save_setting(test_email, "test_setting", "test_value", "test_category")
settings.save_setting(test_email, "default_llm", "GPT-4", "ai_config")
# Test loading settings
print("\nTesting load operations...")
value = settings.load_setting(test_email, "test_setting")
print(f"Loaded test_setting: {value}")
all_settings = settings.load_all_settings(test_email)
print(f"All settings: {all_settings}")
# Test demo history
print("\nTesting demo history...")
demo_id = settings.save_demo_history(
test_email,
"Acme Corp",
"Technology",
"Sales AI Analyst",
{"rows": 10000},
status="completed"
)
print(f"Saved demo with ID: {demo_id}")
history = settings.get_demo_history(test_email)
print(f"Demo history count: {len(history)}")
# Cleanup
settings.delete_setting(test_email, "test_setting")
print("\n✅ All tests completed!")
else:
print("❌ Supabase not configured. Add SUPABASE_URL and SUPABASE_ANON_KEY to .env file.")