Spaces:
Running
Running
| """ | |
| Supabase Client for Settings Persistence | |
| Handles saving and loading user settings and demo history to Supabase. | |
| Implements the database schema for Sprint 2, item #6. | |
| Required environment variables: | |
| SUPABASE_URL: Your Supabase project URL | |
| SUPABASE_ANON_KEY: Your Supabase anon key | |
| Usage: | |
| from supabase_client import SupabaseSettings | |
| settings = SupabaseSettings() | |
| settings.save_setting("user@example.com", "default_llm", "GPT-4", "ai_config") | |
| user_settings = settings.load_all_settings("user@example.com") | |
| """ | |
| import os | |
| import json | |
| from typing import Dict, List, Optional, Any | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| from pathlib import Path | |
| from llm_config import DEFAULT_LLM_MODEL | |
| # Load environment variables - try multiple methods to ensure it works | |
| # First try to load from current directory | |
| load_dotenv() | |
| # If that doesn't work, try explicit path | |
| if not os.getenv("SUPABASE_URL"): | |
| env_path = Path(__file__).parent / '.env' | |
| if env_path.exists(): | |
| load_dotenv(env_path, override=True) | |
| else: | |
| # Try parent directory | |
| env_path = Path(__file__).parent.parent / '.env' | |
| if env_path.exists(): | |
| load_dotenv(env_path, override=True) | |
| # Check if Supabase is available | |
| try: | |
| from supabase import create_client, Client | |
| SUPABASE_AVAILABLE = True | |
| except ImportError: | |
| SUPABASE_AVAILABLE = False | |
| class SupabaseSettings: | |
| """Manage user settings and demo history in Supabase""" | |
| def __init__(self): | |
| """Initialize Supabase client""" | |
| self.client = None | |
| self.enabled = False | |
| if not SUPABASE_AVAILABLE: | |
| print("⚠️ Supabase module not available. Settings persistence disabled.") | |
| return | |
| url = os.getenv("SUPABASE_URL") | |
| key = os.getenv("SUPABASE_ANON_KEY") | |
| if not url or not key: | |
| print("⚠️ SUPABASE_URL or SUPABASE_ANON_KEY not set. Settings persistence disabled.") | |
| return | |
| try: | |
| self.client = create_client(url, key) | |
| self.enabled = True | |
| print("✅ Supabase client initialized for settings persistence") | |
| except Exception as e: | |
| print(f"⚠️ Failed to initialize Supabase client: {e}") | |
| def is_enabled(self) -> bool: | |
| """Check if Supabase is enabled and configured""" | |
| return self.enabled | |
| def save_setting(self, user_email: str, key: str, value: Any, category: str = None) -> bool: | |
| """ | |
| Save a single user setting | |
| Args: | |
| user_email: User's email address | |
| key: Setting key (e.g., 'default_llm', 'default_warehouse') | |
| value: Setting value (will be JSON serialized if not string) | |
| category: Optional category ('ai_config', 'defaults', 'thoughtspot', 'advanced') | |
| Returns: | |
| True if successful, False otherwise | |
| """ | |
| if not self.enabled: | |
| return False | |
| try: | |
| # Convert value to JSON if it's not a string | |
| if not isinstance(value, str): | |
| value = json.dumps(value) | |
| data = { | |
| "user_email": user_email, | |
| "setting_key": key, | |
| "setting_value": {"value": value}, # Store as JSONB | |
| "category": category, | |
| "updated_at": datetime.utcnow().isoformat() | |
| } | |
| print(f"\n🔍 DEBUG: Attempting to save setting '{key}' for {user_email}") | |
| print(f" Data being sent: {data}") | |
| # Upsert (insert or update) - specify conflict columns | |
| result = self.client.table("user_settings").upsert( | |
| data, | |
| on_conflict="user_email,setting_key" | |
| ).execute() | |
| print(f" ✅ Successfully saved '{key}'") | |
| return True | |
| except Exception as e: | |
| print(f"\n❌ ERROR saving setting '{key}' for {user_email}") | |
| print(f" Data attempted: {data}") | |
| print(f" Error type: {type(e).__name__}") | |
| print(f" Error message: {e}") | |
| print(f" Full error: {repr(e)}") | |
| return False | |
| def load_setting(self, user_email: str, key: str) -> Optional[Any]: | |
| """ | |
| Load a single user setting | |
| Args: | |
| user_email: User's email address | |
| key: Setting key to retrieve | |
| Returns: | |
| Setting value or None if not found | |
| """ | |
| if not self.enabled: | |
| return None | |
| try: | |
| result = self.client.table("user_settings") \ | |
| .select("setting_value") \ | |
| .eq("user_email", user_email) \ | |
| .eq("setting_key", key) \ | |
| .execute() | |
| if result.data and len(result.data) > 0: | |
| value_obj = result.data[0].get("setting_value", {}) | |
| return value_obj.get("value") | |
| return None | |
| except Exception as e: | |
| print(f"❌ Error loading setting '{key}': {e}") | |
| return None | |
| def load_all_settings(self, user_email: str, category: str = None) -> Dict[str, Any]: | |
| """ | |
| Load all settings for a user | |
| Args: | |
| user_email: User's email address | |
| category: Optional category filter | |
| Returns: | |
| Dictionary of settings {key: value} | |
| """ | |
| if not self.enabled: | |
| return {} | |
| try: | |
| query = self.client.table("user_settings").select("*").eq("user_email", user_email) | |
| if category: | |
| query = query.eq("category", category) | |
| result = query.execute() | |
| settings = {} | |
| for row in result.data: | |
| key = row.get("setting_key") | |
| value_obj = row.get("setting_value", {}) | |
| settings[key] = value_obj.get("value") | |
| return settings | |
| except Exception as e: | |
| print(f"❌ Error loading settings: {e}") | |
| return {} | |
| def save_all_settings(self, user_email: str, settings: Dict[str, Any], category: str = None) -> bool: | |
| """ | |
| Save multiple settings at once | |
| Args: | |
| user_email: User's email address | |
| settings: Dictionary of settings to save | |
| category: Optional category for all settings | |
| Returns: | |
| True if all successful, False otherwise | |
| """ | |
| if not self.enabled: | |
| return False | |
| success = True | |
| for key, value in settings.items(): | |
| if not self.save_setting(user_email, key, value, category): | |
| success = False | |
| return success | |
| def save_demo_history(self, user_email: str, company_name: str, industry: str, | |
| use_case: str, config: Dict, results: Dict = None, | |
| status: str = "in_progress") -> Optional[str]: | |
| """ | |
| Save demo creation history | |
| Args: | |
| user_email: User's email address | |
| company_name: Company name for the demo | |
| industry: Industry vertical | |
| use_case: Use case (e.g., 'Merchandising', 'Sales AI Analyst') | |
| config: Full configuration used | |
| results: Optional results/outputs | |
| status: 'in_progress', 'completed', or 'failed' | |
| Returns: | |
| Demo history ID if successful, None otherwise | |
| """ | |
| if not self.enabled: | |
| return None | |
| try: | |
| data = { | |
| "user_email": user_email, | |
| "company_name": company_name, | |
| "industry": industry, | |
| "use_case": use_case, | |
| "demo_config": config, | |
| "results": results or {}, | |
| "status": status, | |
| "created_at": datetime.utcnow().isoformat() | |
| } | |
| result = self.client.table("demo_history").insert(data).execute() | |
| if result.data and len(result.data) > 0: | |
| return result.data[0].get("id") | |
| return None | |
| except Exception as e: | |
| print(f"❌ Error saving demo history: {e}") | |
| return None | |
| def get_demo_history(self, user_email: str, limit: int = 10) -> List[Dict]: | |
| """ | |
| Get demo creation history for a user | |
| Args: | |
| user_email: User's email address | |
| limit: Maximum number of records to return | |
| Returns: | |
| List of demo history records | |
| """ | |
| if not self.enabled: | |
| return [] | |
| try: | |
| result = self.client.table("demo_history") \ | |
| .select("*") \ | |
| .eq("user_email", user_email) \ | |
| .order("created_at", desc=True) \ | |
| .limit(limit) \ | |
| .execute() | |
| return result.data | |
| except Exception as e: | |
| print(f"❌ Error loading demo history: {e}") | |
| return [] | |
| def delete_setting(self, user_email: str, key: str) -> bool: | |
| """ | |
| Delete a user setting | |
| Args: | |
| user_email: User's email address | |
| key: Setting key to delete | |
| Returns: | |
| True if successful, False otherwise | |
| """ | |
| if not self.enabled: | |
| return False | |
| try: | |
| self.client.table("user_settings") \ | |
| .delete() \ | |
| .eq("user_email", user_email) \ | |
| .eq("setting_key", key) \ | |
| .execute() | |
| return True | |
| except Exception as e: | |
| print(f"❌ Error deleting setting '{key}': {e}") | |
| return False | |
| # ========================================================================== | |
| # User Authentication Management | |
| # ========================================================================== | |
| class UserManager: | |
| """Manage application users in Supabase demoprep_users table. | |
| Table schema: | |
| id UUID PRIMARY KEY | |
| email TEXT UNIQUE NOT NULL | |
| password_hash TEXT NOT NULL | |
| display_name TEXT | |
| is_admin BOOLEAN DEFAULT FALSE | |
| is_active BOOLEAN DEFAULT TRUE | |
| created_at TIMESTAMPTZ DEFAULT now() | |
| last_login TIMESTAMPTZ | |
| """ | |
| def __init__(self): | |
| self.client = None | |
| self.enabled = False | |
| if not SUPABASE_AVAILABLE: | |
| return | |
| url = os.getenv("SUPABASE_URL") | |
| key = os.getenv("SUPABASE_ANON_KEY") | |
| if not url or not key: | |
| return | |
| try: | |
| self.client = create_client(url, key) | |
| self.enabled = True | |
| except Exception as e: | |
| print(f"UserManager: Failed to connect to Supabase: {e}") | |
| def _hash_password(self, password: str) -> str: | |
| """Hash a password using bcrypt.""" | |
| import hashlib | |
| import secrets | |
| # Use PBKDF2 with SHA-256 (stdlib, no extra dependency) | |
| salt = secrets.token_hex(16) | |
| pw_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000) | |
| return f"pbkdf2:{salt}:{pw_hash.hex()}" | |
| def _verify_password(self, password: str, stored_hash: str) -> bool: | |
| """Verify a password against a stored hash.""" | |
| import hashlib | |
| if not stored_hash or ':' not in stored_hash: | |
| return False | |
| parts = stored_hash.split(':') | |
| if len(parts) != 3 or parts[0] != 'pbkdf2': | |
| return False | |
| salt = parts[1] | |
| expected_hash = parts[2] | |
| pw_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000) | |
| return pw_hash.hex() == expected_hash | |
| def authenticate(self, email: str, password: str) -> Optional[Dict]: | |
| """ | |
| Authenticate a user by email and password. | |
| Returns: | |
| User dict if authenticated, None if failed. | |
| Dict has: email, display_name, is_admin, is_active | |
| """ | |
| if not self.enabled: | |
| return None | |
| try: | |
| result = self.client.table("demoprep_users") \ | |
| .select("*") \ | |
| .eq("email", email.lower().strip()) \ | |
| .execute() | |
| if not result.data or len(result.data) == 0: | |
| return None | |
| user = result.data[0] | |
| # Check if user is active | |
| if not user.get("is_active", True): | |
| print(f"UserManager: User {email} is deactivated") | |
| return None | |
| # Verify password | |
| if not self._verify_password(password, user.get("password_hash", "")): | |
| return None | |
| # Update last_login | |
| try: | |
| self.client.table("demoprep_users") \ | |
| .update({"last_login": datetime.utcnow().isoformat()}) \ | |
| .eq("email", email.lower().strip()) \ | |
| .execute() | |
| except Exception: | |
| pass # Non-critical | |
| return { | |
| "email": user["email"], | |
| "display_name": user.get("display_name", ""), | |
| "is_admin": user.get("is_admin", False), | |
| "is_active": user.get("is_active", True), | |
| } | |
| except Exception as e: | |
| print(f"UserManager: Auth error: {e}") | |
| return None | |
| def add_user(self, email: str, password: str, display_name: str = "", | |
| is_admin: bool = False) -> bool: | |
| """Add a new user. Returns True if successful.""" | |
| if not self.enabled: | |
| return False | |
| try: | |
| data = { | |
| "email": email.lower().strip(), | |
| "password_hash": self._hash_password(password), | |
| "display_name": display_name or email.split("@")[0], | |
| "is_admin": is_admin, | |
| "is_active": True, | |
| "created_at": datetime.utcnow().isoformat(), | |
| } | |
| self.client.table("demoprep_users").insert(data).execute() | |
| print(f"UserManager: Added user {email} (admin={is_admin})") | |
| return True | |
| except Exception as e: | |
| print(f"UserManager: Error adding user {email}: {e}") | |
| return False | |
| def list_users(self) -> List[Dict]: | |
| """List all users (for admin panel).""" | |
| if not self.enabled: | |
| return [] | |
| try: | |
| result = self.client.table("demoprep_users") \ | |
| .select("email, display_name, is_admin, is_active, created_at, last_login") \ | |
| .order("created_at") \ | |
| .execute() | |
| return result.data or [] | |
| except Exception as e: | |
| print(f"UserManager: Error listing users: {e}") | |
| return [] | |
| def update_user(self, email: str, **kwargs) -> bool: | |
| """Update user fields (display_name, is_admin, is_active).""" | |
| if not self.enabled: | |
| return False | |
| allowed_fields = {"display_name", "is_admin", "is_active"} | |
| update_data = {k: v for k, v in kwargs.items() if k in allowed_fields} | |
| if not update_data: | |
| return False | |
| try: | |
| self.client.table("demoprep_users") \ | |
| .update(update_data) \ | |
| .eq("email", email.lower().strip()) \ | |
| .execute() | |
| return True | |
| except Exception as e: | |
| print(f"UserManager: Error updating user {email}: {e}") | |
| return False | |
| def reset_password(self, email: str, new_password: str) -> bool: | |
| """Reset a user's password.""" | |
| if not self.enabled: | |
| return False | |
| try: | |
| self.client.table("demoprep_users") \ | |
| .update({"password_hash": self._hash_password(new_password)}) \ | |
| .eq("email", email.lower().strip()) \ | |
| .execute() | |
| return True | |
| except Exception as e: | |
| print(f"UserManager: Error resetting password for {email}: {e}") | |
| return False | |
| def deactivate_user(self, email: str) -> bool: | |
| """Deactivate a user (soft delete).""" | |
| return self.update_user(email, is_active=False) | |
| def activate_user(self, email: str) -> bool: | |
| """Reactivate a user.""" | |
| return self.update_user(email, is_active=True) | |
| def is_admin(self, email: str) -> bool: | |
| """Check if a user is an admin.""" | |
| if not self.enabled: | |
| return False | |
| try: | |
| result = self.client.table("demoprep_users") \ | |
| .select("is_admin") \ | |
| .eq("email", email.lower().strip()) \ | |
| .execute() | |
| if result.data and len(result.data) > 0: | |
| return result.data[0].get("is_admin", False) | |
| return False | |
| except Exception: | |
| return False | |
| # ========================================================================== | |
| # Admin Settings (System-wide, stored under __admin__ key) | |
| # ========================================================================== | |
| ADMIN_USER_KEY = "__admin__" | |
| # Admin settings keys with display labels | |
| ADMIN_SETTINGS_KEYS = { | |
| # ThoughtSpot | |
| "THOUGHTSPOT_URL": "ThoughtSpot Instance URL", | |
| "THOUGHTSPOT_TRUSTED_AUTH_KEY": "ThoughtSpot Trusted Auth Key", | |
| "THOUGHTSPOT_ADMIN_USER": "ThoughtSpot Admin Username", | |
| # LLM API Keys | |
| "OPENAI_API_KEY": "OpenAI API Key", | |
| "GOOGLE_API_KEY": "Google API Key", | |
| # Snowflake | |
| "SNOWFLAKE_ACCOUNT": "Snowflake Account", | |
| "SNOWFLAKE_KP_USER": "Snowflake Key Pair User", | |
| "SNOWFLAKE_KP_PK": "Snowflake Private Key (PEM)", | |
| "SNOWFLAKE_KP_PASSPHRASE": "Snowflake Private Key Passphrase", | |
| "SNOWFLAKE_ROLE": "Snowflake Role", | |
| "SNOWFLAKE_WAREHOUSE": "Snowflake Warehouse", | |
| "SNOWFLAKE_DATABASE": "Snowflake Database", | |
| "SNOWFLAKE_SSO_USER": "Snowflake SSO User", | |
| } | |
| # Cached admin settings (loaded once per session, refreshed on demand) | |
| _admin_settings_cache = None | |
| def load_admin_settings(force_refresh: bool = False) -> Dict[str, str]: | |
| """ | |
| Load system-wide admin settings from Supabase. | |
| Uses __admin__ as the user_email key. | |
| Results are cached after first load. | |
| Returns: | |
| Dictionary of {setting_key: value} for all admin settings. | |
| """ | |
| global _admin_settings_cache | |
| if _admin_settings_cache is not None and not force_refresh: | |
| return _admin_settings_cache | |
| settings_client = SupabaseSettings() | |
| if not settings_client.is_enabled(): | |
| print("⚠️ Supabase not available - cannot load admin settings") | |
| return {} | |
| raw = settings_client.load_all_settings(ADMIN_USER_KEY) | |
| # Return all settings, using empty string for any missing keys | |
| result = {} | |
| for key in ADMIN_SETTINGS_KEYS: | |
| result[key] = raw.get(key, "") | |
| _admin_settings_cache = result | |
| return result | |
| def save_admin_settings(settings: Dict[str, str]) -> bool: | |
| """ | |
| Save system-wide admin settings to Supabase under __admin__ key. | |
| Also refreshes the cache. | |
| Args: | |
| settings: Dictionary of {key: value} to save. | |
| Returns: | |
| True if all saved successfully. | |
| """ | |
| global _admin_settings_cache | |
| settings_client = SupabaseSettings() | |
| if not settings_client.is_enabled(): | |
| print("⚠️ Supabase not available - cannot save admin settings") | |
| return False | |
| success = True | |
| for key, value in settings.items(): | |
| if key in ADMIN_SETTINGS_KEYS: | |
| if not settings_client.save_setting(ADMIN_USER_KEY, key, str(value), category="admin"): | |
| success = False | |
| # Refresh cache | |
| _admin_settings_cache = None | |
| load_admin_settings(force_refresh=True) | |
| return success | |
| def get_admin_setting(key: str, required: bool = True) -> str: | |
| """ | |
| Get a single admin setting value. Loads from cache if available. | |
| Args: | |
| key: The setting key (e.g. 'THOUGHTSPOT_URL') | |
| required: If True, raises ValueError when setting is missing/empty | |
| Returns: | |
| Setting value as string. | |
| Raises: | |
| ValueError: If required=True and setting is missing or empty. | |
| """ | |
| settings = load_admin_settings() | |
| value = settings.get(key, "") | |
| if required and not value: | |
| raise ValueError(f"Missing required admin setting: {key}. Configure in Admin Settings tab.") | |
| return value | |
| def inject_admin_settings_to_env() -> bool: | |
| """ | |
| Load admin settings from Supabase and inject them into os.environ. | |
| Non-LLM settings remain sourced from Supabase for runtime compatibility. | |
| LLM keys are intentionally excluded so OPENAI_API_KEY/GOOGLE_API_KEY | |
| stay environment-only. | |
| Called once after login, before any pipeline runs. | |
| Also maps THOUGHTSPOT_ADMIN_USER -> THOUGHTSPOT_USERNAME for backwards compatibility. | |
| Returns: | |
| True if settings were loaded and injected successfully. | |
| """ | |
| settings = load_admin_settings(force_refresh=True) | |
| if not settings: | |
| print("⚠️ No admin settings found in Supabase") | |
| return False | |
| injected = 0 | |
| skipped = {"OPENAI_API_KEY", "GOOGLE_API_KEY"} | |
| for key, value in settings.items(): | |
| if key in skipped: | |
| continue | |
| if value: # Only inject non-empty values | |
| os.environ[key] = str(value) | |
| injected += 1 | |
| # Map THOUGHTSPOT_ADMIN_USER -> THOUGHTSPOT_USERNAME for backwards compat | |
| admin_user = settings.get("THOUGHTSPOT_ADMIN_USER", "") | |
| if admin_user: | |
| os.environ["THOUGHTSPOT_USERNAME"] = admin_user | |
| print( | |
| f"✅ Injected {injected} admin settings from Supabase into environment " | |
| "(LLM keys remain environment-only)." | |
| ) | |
| return injected > 0 | |
| # Convenience functions for Gradio integration | |
| # Note: save_gradio_settings has been removed - use SupabaseSettings().save_all_settings() directly | |
| def load_gradio_settings(email: str) -> Dict[str, Any]: | |
| """ | |
| Load all Gradio settings for a user | |
| Returns: | |
| Dictionary with all settings or defaults | |
| """ | |
| if not email or not str(email).strip(): | |
| raise ValueError("Authenticated user email is required to load settings.") | |
| email = str(email).strip().lower() | |
| settings_client = SupabaseSettings() | |
| if not settings_client.is_enabled(): | |
| # Return defaults if Supabase not configured | |
| return { | |
| "default_llm": DEFAULT_LLM_MODEL, | |
| "company_size": "Medium (100-1000)", | |
| "default_warehouse": "COMPUTE_WH", | |
| "default_database": "DEMO_DB", | |
| "thoughtspot_url": "", | |
| "thoughtspot_username": "", | |
| "batch_size": 5000, | |
| "thread_count": 4 | |
| } | |
| # Load saved settings | |
| saved_settings = settings_client.load_all_settings(email) | |
| # If user has NO saved settings, auto-create defaults (first login) | |
| if not saved_settings: | |
| print(f"[Settings] New user {email} - creating default settings in Supabase") | |
| try: | |
| settings_client.save_all_settings(email, { | |
| "default_llm": DEFAULT_LLM_MODEL, | |
| "default_company_url": "Amazon.com", | |
| "default_use_case": "Sales Analytics", | |
| "use_existing_model": "false", | |
| "fact_table_size": "1000", | |
| "dim_table_size": "100", | |
| "geo_scope": "USA Only", | |
| "validation_mode": "Off", | |
| "column_naming_style": "Regular Case", | |
| "liveboard_name": "", | |
| "tag_name": "", | |
| "object_naming_prefix": "", | |
| "existing_model_guid": "", | |
| }, category="defaults") | |
| except Exception as e: | |
| print(f"[Settings] Could not create defaults for {email}: {e}") | |
| # Merge with defaults - include ALL settings fields | |
| defaults = { | |
| # AI Configuration | |
| "default_llm": DEFAULT_LLM_MODEL, | |
| "temperature": 0.3, | |
| "max_tokens": 4000, | |
| # Default Values | |
| "default_company_url": "Amazon.com", | |
| "default_use_case": "Sales Analytics", | |
| # Data Generation Settings | |
| "fact_table_size": "1000", | |
| "dim_table_size": "100", | |
| # ThoughtSpot Connection (loaded from admin settings) | |
| "thoughtspot_url": "", | |
| "thoughtspot_username": "", | |
| "liveboard_name": "", | |
| # Snowflake Connection (loaded from admin settings) | |
| "snowflake_account": "", | |
| "snowflake_user": "", | |
| "snowflake_role": "", | |
| "default_warehouse": "", | |
| "default_database": "", | |
| "default_schema": "PUBLIC", | |
| # Demo Configuration | |
| "tag_name": "", | |
| "object_naming_prefix": "", | |
| "column_naming_style": "Regular Case", | |
| # Liveboard Creation | |
| "geo_scope": "USA Only", | |
| "validation_mode": "Off", | |
| # Existing Model Mode | |
| "use_existing_model": False, | |
| "existing_model_guid": "", | |
| # Advanced Options | |
| "batch_size": 5000, | |
| "thread_count": 4 | |
| } | |
| for key in defaults: | |
| if key in saved_settings: | |
| # Parse JSON values if needed | |
| value = saved_settings[key] | |
| try: | |
| defaults[key] = json.loads(value) if isinstance(value, str) and value.startswith('{') else value | |
| except: | |
| defaults[key] = value | |
| # Ensure batch_size and thread_count are integers | |
| if "batch_size" in defaults: | |
| try: | |
| defaults["batch_size"] = int(defaults["batch_size"]) | |
| except (ValueError, TypeError): | |
| defaults["batch_size"] = 5000 | |
| if "thread_count" in defaults: | |
| try: | |
| defaults["thread_count"] = int(defaults["thread_count"]) | |
| except (ValueError, TypeError): | |
| defaults["thread_count"] = 4 | |
| return defaults | |
| if __name__ == "__main__": | |
| # Test the Supabase client | |
| print("\n🧪 Testing Supabase Settings Client\n") | |
| settings = SupabaseSettings() | |
| if settings.is_enabled(): | |
| test_email = "test@example.com" | |
| # Test saving settings | |
| print("Testing save operations...") | |
| settings.save_setting(test_email, "test_setting", "test_value", "test_category") | |
| settings.save_setting(test_email, "default_llm", "GPT-4", "ai_config") | |
| # Test loading settings | |
| print("\nTesting load operations...") | |
| value = settings.load_setting(test_email, "test_setting") | |
| print(f"Loaded test_setting: {value}") | |
| all_settings = settings.load_all_settings(test_email) | |
| print(f"All settings: {all_settings}") | |
| # Test demo history | |
| print("\nTesting demo history...") | |
| demo_id = settings.save_demo_history( | |
| test_email, | |
| "Acme Corp", | |
| "Technology", | |
| "Sales AI Analyst", | |
| {"rows": 10000}, | |
| status="completed" | |
| ) | |
| print(f"Saved demo with ID: {demo_id}") | |
| history = settings.get_demo_history(test_email) | |
| print(f"Demo history count: {len(history)}") | |
| # Cleanup | |
| settings.delete_setting(test_email, "test_setting") | |
| print("\n✅ All tests completed!") | |
| else: | |
| print("❌ Supabase not configured. Add SUPABASE_URL and SUPABASE_ANON_KEY to .env file.") |