""" Supabase Client for Settings Persistence Handles saving and loading user settings and demo history to Supabase. Implements the database schema for Sprint 2, item #6. Required environment variables: SUPABASE_URL: Your Supabase project URL SUPABASE_ANON_KEY: Your Supabase anon key Usage: from supabase_client import SupabaseSettings settings = SupabaseSettings() settings.save_setting("user@example.com", "default_llm", "GPT-4", "ai_config") user_settings = settings.load_all_settings("user@example.com") """ import os import json from typing import Dict, List, Optional, Any from datetime import datetime from dotenv import load_dotenv from pathlib import Path from llm_config import DEFAULT_LLM_MODEL # Load environment variables - try multiple methods to ensure it works # First try to load from current directory load_dotenv() # If that doesn't work, try explicit path if not os.getenv("SUPABASE_URL"): env_path = Path(__file__).parent / '.env' if env_path.exists(): load_dotenv(env_path, override=True) else: # Try parent directory env_path = Path(__file__).parent.parent / '.env' if env_path.exists(): load_dotenv(env_path, override=True) # Check if Supabase is available try: from supabase import create_client, Client SUPABASE_AVAILABLE = True except ImportError: SUPABASE_AVAILABLE = False class SupabaseSettings: """Manage user settings and demo history in Supabase""" def __init__(self): """Initialize Supabase client""" self.client = None self.enabled = False if not SUPABASE_AVAILABLE: print("⚠️ Supabase module not available. Settings persistence disabled.") return url = os.getenv("SUPABASE_URL") key = os.getenv("SUPABASE_ANON_KEY") if not url or not key: print("⚠️ SUPABASE_URL or SUPABASE_ANON_KEY not set. Settings persistence disabled.") return try: self.client = create_client(url, key) self.enabled = True print("✅ Supabase client initialized for settings persistence") except Exception as e: print(f"⚠️ Failed to initialize Supabase client: {e}") def is_enabled(self) -> bool: """Check if Supabase is enabled and configured""" return self.enabled def save_setting(self, user_email: str, key: str, value: Any, category: str = None) -> bool: """ Save a single user setting Args: user_email: User's email address key: Setting key (e.g., 'default_llm', 'default_warehouse') value: Setting value (will be JSON serialized if not string) category: Optional category ('ai_config', 'defaults', 'thoughtspot', 'advanced') Returns: True if successful, False otherwise """ if not self.enabled: return False try: # Convert value to JSON if it's not a string if not isinstance(value, str): value = json.dumps(value) data = { "user_email": user_email, "setting_key": key, "setting_value": {"value": value}, # Store as JSONB "category": category, "updated_at": datetime.utcnow().isoformat() } print(f"\n🔍 DEBUG: Attempting to save setting '{key}' for {user_email}") print(f" Data being sent: {data}") # Upsert (insert or update) - specify conflict columns result = self.client.table("user_settings").upsert( data, on_conflict="user_email,setting_key" ).execute() print(f" ✅ Successfully saved '{key}'") return True except Exception as e: print(f"\n❌ ERROR saving setting '{key}' for {user_email}") print(f" Data attempted: {data}") print(f" Error type: {type(e).__name__}") print(f" Error message: {e}") print(f" Full error: {repr(e)}") return False def load_setting(self, user_email: str, key: str) -> Optional[Any]: """ Load a single user setting Args: user_email: User's email address key: Setting key to retrieve Returns: Setting value or None if not found """ if not self.enabled: return None try: result = self.client.table("user_settings") \ .select("setting_value") \ .eq("user_email", user_email) \ .eq("setting_key", key) \ .execute() if result.data and len(result.data) > 0: value_obj = result.data[0].get("setting_value", {}) return value_obj.get("value") return None except Exception as e: print(f"❌ Error loading setting '{key}': {e}") return None def load_all_settings(self, user_email: str, category: str = None) -> Dict[str, Any]: """ Load all settings for a user Args: user_email: User's email address category: Optional category filter Returns: Dictionary of settings {key: value} """ if not self.enabled: return {} try: query = self.client.table("user_settings").select("*").eq("user_email", user_email) if category: query = query.eq("category", category) result = query.execute() settings = {} for row in result.data: key = row.get("setting_key") value_obj = row.get("setting_value", {}) settings[key] = value_obj.get("value") return settings except Exception as e: print(f"❌ Error loading settings: {e}") return {} def save_all_settings(self, user_email: str, settings: Dict[str, Any], category: str = None) -> bool: """ Save multiple settings at once Args: user_email: User's email address settings: Dictionary of settings to save category: Optional category for all settings Returns: True if all successful, False otherwise """ if not self.enabled: return False success = True for key, value in settings.items(): if not self.save_setting(user_email, key, value, category): success = False return success def save_demo_history(self, user_email: str, company_name: str, industry: str, use_case: str, config: Dict, results: Dict = None, status: str = "in_progress") -> Optional[str]: """ Save demo creation history Args: user_email: User's email address company_name: Company name for the demo industry: Industry vertical use_case: Use case (e.g., 'Merchandising', 'Sales AI Analyst') config: Full configuration used results: Optional results/outputs status: 'in_progress', 'completed', or 'failed' Returns: Demo history ID if successful, None otherwise """ if not self.enabled: return None try: data = { "user_email": user_email, "company_name": company_name, "industry": industry, "use_case": use_case, "demo_config": config, "results": results or {}, "status": status, "created_at": datetime.utcnow().isoformat() } result = self.client.table("demo_history").insert(data).execute() if result.data and len(result.data) > 0: return result.data[0].get("id") return None except Exception as e: print(f"❌ Error saving demo history: {e}") return None def get_demo_history(self, user_email: str, limit: int = 10) -> List[Dict]: """ Get demo creation history for a user Args: user_email: User's email address limit: Maximum number of records to return Returns: List of demo history records """ if not self.enabled: return [] try: result = self.client.table("demo_history") \ .select("*") \ .eq("user_email", user_email) \ .order("created_at", desc=True) \ .limit(limit) \ .execute() return result.data except Exception as e: print(f"❌ Error loading demo history: {e}") return [] def delete_setting(self, user_email: str, key: str) -> bool: """ Delete a user setting Args: user_email: User's email address key: Setting key to delete Returns: True if successful, False otherwise """ if not self.enabled: return False try: self.client.table("user_settings") \ .delete() \ .eq("user_email", user_email) \ .eq("setting_key", key) \ .execute() return True except Exception as e: print(f"❌ Error deleting setting '{key}': {e}") return False # ========================================================================== # User Authentication Management # ========================================================================== class UserManager: """Manage application users in Supabase demoprep_users table. Table schema: id UUID PRIMARY KEY email TEXT UNIQUE NOT NULL password_hash TEXT NOT NULL display_name TEXT is_admin BOOLEAN DEFAULT FALSE is_active BOOLEAN DEFAULT TRUE created_at TIMESTAMPTZ DEFAULT now() last_login TIMESTAMPTZ """ def __init__(self): self.client = None self.enabled = False if not SUPABASE_AVAILABLE: return url = os.getenv("SUPABASE_URL") key = os.getenv("SUPABASE_ANON_KEY") if not url or not key: return try: self.client = create_client(url, key) self.enabled = True except Exception as e: print(f"UserManager: Failed to connect to Supabase: {e}") def _hash_password(self, password: str) -> str: """Hash a password using bcrypt.""" import hashlib import secrets # Use PBKDF2 with SHA-256 (stdlib, no extra dependency) salt = secrets.token_hex(16) pw_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000) return f"pbkdf2:{salt}:{pw_hash.hex()}" def _verify_password(self, password: str, stored_hash: str) -> bool: """Verify a password against a stored hash.""" import hashlib if not stored_hash or ':' not in stored_hash: return False parts = stored_hash.split(':') if len(parts) != 3 or parts[0] != 'pbkdf2': return False salt = parts[1] expected_hash = parts[2] pw_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000) return pw_hash.hex() == expected_hash def authenticate(self, email: str, password: str) -> Optional[Dict]: """ Authenticate a user by email and password. Returns: User dict if authenticated, None if failed. Dict has: email, display_name, is_admin, is_active """ if not self.enabled: return None try: result = self.client.table("demoprep_users") \ .select("*") \ .eq("email", email.lower().strip()) \ .execute() if not result.data or len(result.data) == 0: return None user = result.data[0] # Check if user is active if not user.get("is_active", True): print(f"UserManager: User {email} is deactivated") return None # Verify password if not self._verify_password(password, user.get("password_hash", "")): return None # Update last_login try: self.client.table("demoprep_users") \ .update({"last_login": datetime.utcnow().isoformat()}) \ .eq("email", email.lower().strip()) \ .execute() except Exception: pass # Non-critical return { "email": user["email"], "display_name": user.get("display_name", ""), "is_admin": user.get("is_admin", False), "is_active": user.get("is_active", True), } except Exception as e: print(f"UserManager: Auth error: {e}") return None def add_user(self, email: str, password: str, display_name: str = "", is_admin: bool = False) -> bool: """Add a new user. Returns True if successful.""" if not self.enabled: return False try: data = { "email": email.lower().strip(), "password_hash": self._hash_password(password), "display_name": display_name or email.split("@")[0], "is_admin": is_admin, "is_active": True, "created_at": datetime.utcnow().isoformat(), } self.client.table("demoprep_users").insert(data).execute() print(f"UserManager: Added user {email} (admin={is_admin})") return True except Exception as e: print(f"UserManager: Error adding user {email}: {e}") return False def list_users(self) -> List[Dict]: """List all users (for admin panel).""" if not self.enabled: return [] try: result = self.client.table("demoprep_users") \ .select("email, display_name, is_admin, is_active, created_at, last_login") \ .order("created_at") \ .execute() return result.data or [] except Exception as e: print(f"UserManager: Error listing users: {e}") return [] def update_user(self, email: str, **kwargs) -> bool: """Update user fields (display_name, is_admin, is_active).""" if not self.enabled: return False allowed_fields = {"display_name", "is_admin", "is_active"} update_data = {k: v for k, v in kwargs.items() if k in allowed_fields} if not update_data: return False try: self.client.table("demoprep_users") \ .update(update_data) \ .eq("email", email.lower().strip()) \ .execute() return True except Exception as e: print(f"UserManager: Error updating user {email}: {e}") return False def reset_password(self, email: str, new_password: str) -> bool: """Reset a user's password.""" if not self.enabled: return False try: self.client.table("demoprep_users") \ .update({"password_hash": self._hash_password(new_password)}) \ .eq("email", email.lower().strip()) \ .execute() return True except Exception as e: print(f"UserManager: Error resetting password for {email}: {e}") return False def deactivate_user(self, email: str) -> bool: """Deactivate a user (soft delete).""" return self.update_user(email, is_active=False) def activate_user(self, email: str) -> bool: """Reactivate a user.""" return self.update_user(email, is_active=True) def is_admin(self, email: str) -> bool: """Check if a user is an admin.""" if not self.enabled: return False try: result = self.client.table("demoprep_users") \ .select("is_admin") \ .eq("email", email.lower().strip()) \ .execute() if result.data and len(result.data) > 0: return result.data[0].get("is_admin", False) return False except Exception: return False # ========================================================================== # Admin Settings (System-wide, stored under __admin__ key) # ========================================================================== ADMIN_USER_KEY = "__admin__" # Admin settings keys with display labels ADMIN_SETTINGS_KEYS = { # ThoughtSpot "THOUGHTSPOT_URL": "ThoughtSpot Instance URL", "THOUGHTSPOT_TRUSTED_AUTH_KEY": "ThoughtSpot Trusted Auth Key", "THOUGHTSPOT_ADMIN_USER": "ThoughtSpot Admin Username", # LLM API Keys "OPENAI_API_KEY": "OpenAI API Key", "GOOGLE_API_KEY": "Google API Key", # Snowflake "SNOWFLAKE_ACCOUNT": "Snowflake Account", "SNOWFLAKE_KP_USER": "Snowflake Key Pair User", "SNOWFLAKE_KP_PK": "Snowflake Private Key (PEM)", "SNOWFLAKE_KP_PASSPHRASE": "Snowflake Private Key Passphrase", "SNOWFLAKE_ROLE": "Snowflake Role", "SNOWFLAKE_WAREHOUSE": "Snowflake Warehouse", "SNOWFLAKE_DATABASE": "Snowflake Database", "SNOWFLAKE_SSO_USER": "Snowflake SSO User", } # Cached admin settings (loaded once per session, refreshed on demand) _admin_settings_cache = None def load_admin_settings(force_refresh: bool = False) -> Dict[str, str]: """ Load system-wide admin settings from Supabase. Uses __admin__ as the user_email key. Results are cached after first load. Returns: Dictionary of {setting_key: value} for all admin settings. """ global _admin_settings_cache if _admin_settings_cache is not None and not force_refresh: return _admin_settings_cache settings_client = SupabaseSettings() if not settings_client.is_enabled(): print("⚠️ Supabase not available - cannot load admin settings") return {} raw = settings_client.load_all_settings(ADMIN_USER_KEY) # Return all settings, using empty string for any missing keys result = {} for key in ADMIN_SETTINGS_KEYS: result[key] = raw.get(key, "") _admin_settings_cache = result return result def save_admin_settings(settings: Dict[str, str]) -> bool: """ Save system-wide admin settings to Supabase under __admin__ key. Also refreshes the cache. Args: settings: Dictionary of {key: value} to save. Returns: True if all saved successfully. """ global _admin_settings_cache settings_client = SupabaseSettings() if not settings_client.is_enabled(): print("⚠️ Supabase not available - cannot save admin settings") return False success = True for key, value in settings.items(): if key in ADMIN_SETTINGS_KEYS: if not settings_client.save_setting(ADMIN_USER_KEY, key, str(value), category="admin"): success = False # Refresh cache _admin_settings_cache = None load_admin_settings(force_refresh=True) return success def get_admin_setting(key: str, required: bool = True) -> str: """ Get a single admin setting value. Loads from cache if available. Args: key: The setting key (e.g. 'THOUGHTSPOT_URL') required: If True, raises ValueError when setting is missing/empty Returns: Setting value as string. Raises: ValueError: If required=True and setting is missing or empty. """ settings = load_admin_settings() value = settings.get(key, "") if required and not value: raise ValueError(f"Missing required admin setting: {key}. Configure in Admin Settings tab.") return value def inject_admin_settings_to_env() -> bool: """ Load admin settings from Supabase and inject them into os.environ. Non-LLM settings remain sourced from Supabase for runtime compatibility. LLM keys are intentionally excluded so OPENAI_API_KEY/GOOGLE_API_KEY stay environment-only. Called once after login, before any pipeline runs. Also maps THOUGHTSPOT_ADMIN_USER -> THOUGHTSPOT_USERNAME for backwards compatibility. Returns: True if settings were loaded and injected successfully. """ settings = load_admin_settings(force_refresh=True) if not settings: print("⚠️ No admin settings found in Supabase") return False injected = 0 skipped = {"OPENAI_API_KEY", "GOOGLE_API_KEY"} for key, value in settings.items(): if key in skipped: continue if value: # Only inject non-empty values os.environ[key] = str(value) injected += 1 # Map THOUGHTSPOT_ADMIN_USER -> THOUGHTSPOT_USERNAME for backwards compat admin_user = settings.get("THOUGHTSPOT_ADMIN_USER", "") if admin_user: os.environ["THOUGHTSPOT_USERNAME"] = admin_user print( f"✅ Injected {injected} admin settings from Supabase into environment " "(LLM keys remain environment-only)." ) return injected > 0 # Convenience functions for Gradio integration # Note: save_gradio_settings has been removed - use SupabaseSettings().save_all_settings() directly def load_gradio_settings(email: str) -> Dict[str, Any]: """ Load all Gradio settings for a user Returns: Dictionary with all settings or defaults """ if not email or not str(email).strip(): raise ValueError("Authenticated user email is required to load settings.") email = str(email).strip().lower() settings_client = SupabaseSettings() if not settings_client.is_enabled(): # Return defaults if Supabase not configured return { "default_llm": DEFAULT_LLM_MODEL, "company_size": "Medium (100-1000)", "default_warehouse": "COMPUTE_WH", "default_database": "DEMO_DB", "thoughtspot_url": "", "thoughtspot_username": "", "batch_size": 5000, "thread_count": 4 } # Load saved settings saved_settings = settings_client.load_all_settings(email) # If user has NO saved settings, auto-create defaults (first login) if not saved_settings: print(f"[Settings] New user {email} - creating default settings in Supabase") try: settings_client.save_all_settings(email, { "default_llm": DEFAULT_LLM_MODEL, "default_company_url": "Amazon.com", "default_use_case": "Sales Analytics", "use_existing_model": "false", "fact_table_size": "1000", "dim_table_size": "100", "geo_scope": "USA Only", "validation_mode": "Off", "column_naming_style": "Regular Case", "liveboard_name": "", "tag_name": "", "object_naming_prefix": "", "existing_model_guid": "", }, category="defaults") except Exception as e: print(f"[Settings] Could not create defaults for {email}: {e}") # Merge with defaults - include ALL settings fields defaults = { # AI Configuration "default_llm": DEFAULT_LLM_MODEL, "temperature": 0.3, "max_tokens": 4000, # Default Values "default_company_url": "Amazon.com", "default_use_case": "Sales Analytics", # Data Generation Settings "fact_table_size": "1000", "dim_table_size": "100", # ThoughtSpot Connection (loaded from admin settings) "thoughtspot_url": "", "thoughtspot_username": "", "liveboard_name": "", # Snowflake Connection (loaded from admin settings) "snowflake_account": "", "snowflake_user": "", "snowflake_role": "", "default_warehouse": "", "default_database": "", "default_schema": "PUBLIC", # Demo Configuration "tag_name": "", "object_naming_prefix": "", "column_naming_style": "Regular Case", # Liveboard Creation "geo_scope": "USA Only", "validation_mode": "Off", # Existing Model Mode "use_existing_model": False, "existing_model_guid": "", # Advanced Options "batch_size": 5000, "thread_count": 4 } for key in defaults: if key in saved_settings: # Parse JSON values if needed value = saved_settings[key] try: defaults[key] = json.loads(value) if isinstance(value, str) and value.startswith('{') else value except: defaults[key] = value # Ensure batch_size and thread_count are integers if "batch_size" in defaults: try: defaults["batch_size"] = int(defaults["batch_size"]) except (ValueError, TypeError): defaults["batch_size"] = 5000 if "thread_count" in defaults: try: defaults["thread_count"] = int(defaults["thread_count"]) except (ValueError, TypeError): defaults["thread_count"] = 4 return defaults if __name__ == "__main__": # Test the Supabase client print("\n🧪 Testing Supabase Settings Client\n") settings = SupabaseSettings() if settings.is_enabled(): test_email = "test@example.com" # Test saving settings print("Testing save operations...") settings.save_setting(test_email, "test_setting", "test_value", "test_category") settings.save_setting(test_email, "default_llm", "GPT-4", "ai_config") # Test loading settings print("\nTesting load operations...") value = settings.load_setting(test_email, "test_setting") print(f"Loaded test_setting: {value}") all_settings = settings.load_all_settings(test_email) print(f"All settings: {all_settings}") # Test demo history print("\nTesting demo history...") demo_id = settings.save_demo_history( test_email, "Acme Corp", "Technology", "Sales AI Analyst", {"rows": 10000}, status="completed" ) print(f"Saved demo with ID: {demo_id}") history = settings.get_demo_history(test_email) print(f"Demo history count: {len(history)}") # Cleanup settings.delete_setting(test_email, "test_setting") print("\n✅ All tests completed!") else: print("❌ Supabase not configured. Add SUPABASE_URL and SUPABASE_ANON_KEY to .env file.")