ai-lead-generation / database /supabase_manager.py
togitoon's picture
Fix supabase
f6f06c6
raw
history blame
9.76 kB
#!/usr/bin/env python3
"""
Supabase Database Manager
Handles all database operations for user preferences and scheduling
"""
import json
import logging
import os
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
# supabase-py is treated as an optional dependency at import time: when the
# import fails, the module still loads and records why, so SupabaseManager can
# report the problem instead of crashing the importer.
try:
    from supabase import create_client, Client
except ImportError as import_exc:
    for hint in (
        f"Warning: supabase-py import failed: {import_exc}",
        "This may be due to:",
        " - Package not installed: pip install supabase==2.9.1",
        " - Version conflicts with other packages",
        " - Missing system dependencies",
    ):
        print(hint)
    # Placeholder so type annotations that mention Client still resolve.
    Client = None
    SUPABASE_AVAILABLE = False
    # Keep the message as text; the exception name is unbound after this block.
    IMPORT_ERROR = str(import_exc)
else:
    SUPABASE_AVAILABLE = True
    IMPORT_ERROR = None
from config.settings import settings
logger = logging.getLogger(__name__)
class SupabaseManager:
    """Manage Supabase persistence for user preferences and scheduling.

    Every public method returns a dict with a boolean ``"success"`` key. On
    success the dict also carries ``"data"`` or ``"message"``; on failure it
    carries an ``"error"`` string. Exceptions raised while talking to
    Supabase are caught, logged and reported through that dict rather than
    propagated to the caller.
    """

    # Table that holds the per-user preference rows queried by every method.
    _TABLE = "user_preferences"

    def __init__(self):
        """Create the manager and attempt to connect to Supabase.

        ``self.client`` stays ``None`` when configuration is missing, the
        ``supabase`` package could not be imported, or client creation fails.
        """
        self.client: Optional[Client] = None
        self._init_client()

    def _init_client(self):
        """Build the Supabase client from ``settings`` and probe the connection.

        Never raises: each failure mode is logged with remediation hints and
        the method returns, leaving ``self.client`` as it was.
        """
        logger.info("Initializing Supabase database client...")

        # Check for missing environment variables
        missing_vars = []
        if not settings.supabase_url:
            missing_vars.append("SUPABASE_URL")
        if not settings.supabase_key:
            missing_vars.append("SUPABASE_ANON_KEY")
        if missing_vars:
            logger.error(
                "Missing required environment variables for Supabase: %s",
                ", ".join(missing_vars),
            )
            logger.error("Database operations will fail. Please set these environment variables:")
            for var in missing_vars:
                logger.error(" export %s=your_value_here", var)
            return

        # Check if supabase-py is installed
        if not SUPABASE_AVAILABLE:
            logger.error("supabase-py package not available.")
            logger.error("Import error: %s", IMPORT_ERROR)
            logger.error("This indicates the package is not properly installed or has dependency conflicts.")
            logger.error("To fix this on Hugging Face Spaces:")
            logger.error(" 1. Ensure 'supabase==2.9.1' is in requirements.txt")
            logger.error(" 2. Restart the Space to reinstall dependencies")
            logger.error(" 3. Check for conflicting package versions")
            return

        # Log configuration. The key is masked except for its last four
        # characters; it is guaranteed non-empty here because a missing key
        # returned above.
        logger.info("Supabase URL: %s", settings.supabase_url)
        logger.info("Supabase Key: %s...%s", "*" * 10, settings.supabase_key[-4:])

        try:
            self.client = create_client(settings.supabase_url, settings.supabase_key)
            logger.info("✅ Supabase client initialized successfully")

            # Test the connection. A failed probe is only a warning: the
            # client object is kept so later calls can still be attempted.
            logger.info("Testing database connection...")
            test_result = self.health_check()
            if test_result["success"]:
                logger.info("✅ Database connection test passed")
            else:
                logger.warning("⚠️ Database connection test failed: %s", test_result["error"])
        except Exception as e:
            logger.error("❌ Failed to initialize Supabase client: %s: %s", type(e).__name__, e)
            logger.error("This could be due to:")
            logger.error(" - Invalid SUPABASE_URL format")
            logger.error(" - Invalid SUPABASE_ANON_KEY")
            logger.error(" - Network connectivity issues")
            logger.error(" - Supabase service unavailability")

    def _unavailable(self, action: str) -> Dict[str, Any]:
        """Log that *action* cannot run without a client and return the failure result."""
        logger.error(
            "Cannot %s: Database client not initialized. Check Supabase configuration.",
            action,
        )
        return {"success": False, "error": "Database not available"}

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a timezone-aware ISO 8601 string.

        Uses ``datetime.now(timezone.utc)`` instead of the deprecated
        ``datetime.utcnow()``, so the value carries an explicit ``+00:00``
        offset rather than being a naive timestamp.
        """
        return datetime.now(timezone.utc).isoformat()

    def save_user_preferences(self, email: str, search_criteria: str,
                              system_prompt: str, preferences: str) -> Dict[str, Any]:
        """Insert or update the preferences row for ``email``.

        The row is upserted with ``on_conflict="email"`` and stamped with the
        current UTC time in ``updated_at``.

        Returns:
            ``{"success": True, "data": <first returned row>}`` on success,
            otherwise ``{"success": False, "error": <message>}``.
        """
        if not self.client:
            return self._unavailable(f"save preferences for {email}")
        try:
            # Prepare data for upsert
            preference_data = {
                "email": email,
                "search_criteria": search_criteria,
                "system_prompt": system_prompt,
                "preferences": preferences,
                "updated_at": self._utc_timestamp(),
            }
            # Upsert (insert or update) the preferences
            result = self.client.table(self._TABLE).upsert(
                preference_data,
                on_conflict="email",
            ).execute()
            if result.data:
                logger.info("Successfully saved preferences for %s", email)
                return {"success": True, "data": result.data[0]}
            logger.error("No data returned when saving preferences for %s", email)
            return {"success": False, "error": "No data returned from database"}
        except Exception as e:
            logger.error("Error saving preferences for %s: %s", email, e)
            return {"success": False, "error": str(e)}

    def get_user_preferences(self, email: str) -> Dict[str, Any]:
        """Fetch the preferences row whose ``email`` column equals ``email``.

        Returns:
            ``{"success": True, "data": <first matching row>}`` when a row
            exists, otherwise ``{"success": False, "error": <message>}``
            (including the "No preferences found" case).
        """
        if not self.client:
            return self._unavailable(f"get preferences for {email}")
        try:
            result = self.client.table(self._TABLE).select("*").eq("email", email).execute()
            if result.data:
                logger.info("Retrieved preferences for %s", email)
                return {"success": True, "data": result.data[0]}
            logger.info("No preferences found for %s", email)
            return {"success": False, "error": "No preferences found"}
        except Exception as e:
            logger.error("Error retrieving preferences for %s: %s", email, e)
            return {"success": False, "error": str(e)}

    def update_schedule_status(self, email: str, enabled: bool) -> Dict[str, Any]:
        """Set ``schedule_enabled`` for ``email`` and refresh ``updated_at``.

        Returns:
            ``{"success": True, "data": <first updated row>}`` when the update
            returned a row, otherwise ``{"success": False, "error": <message>}``.
        """
        if not self.client:
            return self._unavailable(f"update schedule status for {email}")
        try:
            result = self.client.table(self._TABLE).update({
                "schedule_enabled": enabled,
                "updated_at": self._utc_timestamp(),
            }).eq("email", email).execute()
            if result.data:
                logger.info("Updated schedule status for %s to %s", email, enabled)
                return {"success": True, "data": result.data[0]}
            logger.error("No data returned when updating schedule status for %s", email)
            return {"success": False, "error": "User not found or no update performed"}
        except Exception as e:
            logger.error("Error updating schedule status for %s: %s", email, e)
            return {"success": False, "error": str(e)}

    def get_scheduled_users(self) -> Dict[str, Any]:
        """Fetch every row whose ``schedule_enabled`` column is true.

        Returns:
            ``{"success": True, "data": <rows>}`` on success, otherwise
            ``{"success": False, "error": <message>}``.
        """
        if not self.client:
            return self._unavailable("get scheduled users")
        try:
            result = self.client.table(self._TABLE).select("*").eq("schedule_enabled", True).execute()
            logger.info("Retrieved %s users with scheduling enabled", len(result.data))
            return {"success": True, "data": result.data}
        except Exception as e:
            logger.error("Error retrieving scheduled users: %s", e)
            return {"success": False, "error": str(e)}

    def delete_user_preferences(self, email: str) -> Dict[str, Any]:
        """Delete the preferences row for ``email``.

        Returns:
            ``{"success": True, "message": ...}`` when the delete returned at
            least one row, otherwise ``{"success": False, "error": <message>}``.
        """
        if not self.client:
            return self._unavailable(f"delete preferences for {email}")
        try:
            result = self.client.table(self._TABLE).delete().eq("email", email).execute()
            if result.data:
                logger.info("Deleted preferences for %s", email)
                return {"success": True, "message": f"Preferences deleted for {email}"}
            return {"success": False, "error": "User not found"}
        except Exception as e:
            logger.error("Error deleting preferences for %s: %s", email, e)
            return {"success": False, "error": str(e)}

    def health_check(self) -> Dict[str, Any]:
        """Report whether a trivial query against the preferences table succeeds.

        Returns:
            ``{"success": True, "message": "Database connection healthy"}``
            when the query completes without raising, otherwise
            ``{"success": False, "error": <message>}``.
        """
        if not self.client:
            return {"success": False, "error": "Database client not initialized"}
        try:
            # Only whether the call raises matters; the response is discarded.
            self.client.table(self._TABLE).select("count", count="exact").limit(0).execute()
            return {"success": True, "message": "Database connection healthy"}
        except Exception as e:
            logger.error("Database health check failed: %s", e)
            return {"success": False, "error": str(e)}
# Global database manager instance shared by importers of this module.
# NOTE: it is constructed at import time, so SupabaseManager.__init__ runs
# here; when configuration is present that creates the Supabase client and
# issues a health-check query.
db_manager = SupabaseManager()