""" Supabase Authentication Integration Handles user authentication via Supabase Auth API """ from supabase import create_client, Client from app.config import settings from typing import Optional, Dict, Any import logging import httpx logger = logging.getLogger(__name__) # Singleton HTTP client for connection reuse (MAJOR PERFORMANCE BOOST) _http_client = None def get_http_client(): """Get singleton HTTP client with connection pooling""" global _http_client if _http_client is None: _http_client = httpx.AsyncClient( timeout=30.0, limits=httpx.Limits( max_keepalive_connections=20, max_connections=50, keepalive_expiry=300.0 ) ) return _http_client class SupabaseAuth: """Supabase Authentication Manager - Singleton with connection pooling""" _instance = None def __new__(cls): """Singleton pattern to reuse Supabase client""" if cls._instance is None: cls._instance = super(SupabaseAuth, cls).__new__(cls) cls._instance._initialize() return cls._instance def _initialize(self): """ Initialize Supabase client for backend API use Note: Backend APIs are stateless token validators, not session managers. The client is used only to validate tokens sent by clients. Session management and token refresh is handled by the frontend/mobile apps. """ self.client: Client = create_client( settings.SUPABASE_URL, settings.SUPABASE_KEY ) async def sign_up(self, email: str, password: str, user_metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Register a new user with Supabase Auth Args: email: User's email address password: User's password user_metadata: Additional user data (first_name, last_name, phone, etc.) Returns: Dict containing user data and session Raises: Exception: If registration fails """ try: response = self.client.auth.sign_up({ "email": email, "password": password, "options": { "data": user_metadata or {}, "email_redirect_to": None # Disable email confirmation redirect } }) if response.user: logger.info(f"User registered successfully: {email}") # Auto-confirm user if email confirmation is enabled # This prevents "Email not confirmed" errors if not response.session: logger.warning(f"No session returned for {email}, attempting to confirm email") try: # Use admin API to confirm user email from supabase import create_client admin_client = create_client( self.url, settings.SUPABASE_SERVICE_ROLE_KEY ) admin_client.auth.admin.update_user_by_id( response.user.id, {"email_confirm": True} ) logger.info(f"Email auto-confirmed for {email}") # Sign in to get session sign_in_response = await self.sign_in(email, password) return sign_in_response except Exception as confirm_error: logger.error(f"Failed to auto-confirm email: {str(confirm_error)}") # Continue anyway - user may need to confirm manually return { "user": response.user, "session": response.session } else: raise Exception("Failed to create user - no user returned from Supabase") except Exception as e: error_msg = str(e) logger.error(f"Sign up error for {email}: {error_msg}") # Provide more specific error messages if "556" in error_msg or "Server error" in error_msg: raise Exception(f"Supabase authentication service error. This may be due to email verification requirements or service limits. Please contact support. Technical details: {error_msg}") elif "email" in error_msg.lower() and "confirm" in error_msg.lower(): raise Exception("Email confirmation required. Please check your email inbox for the confirmation link.") elif "rate limit" in error_msg.lower(): raise Exception("Too many signup attempts. Please try again later.") else: raise async def sign_in(self, email: str, password: str) -> Dict[str, Any]: """ Sign in user with email and password Args: email: User's email address password: User's password Returns: Dict containing user data and session with access_token Raises: Exception: If login fails """ try: response = self.client.auth.sign_in_with_password({ "email": email, "password": password }) if response.user and response.session: logger.info(f"User signed in successfully: {email}") return { "user": response.user, "session": response.session, "access_token": response.session.access_token } else: raise Exception("Invalid credentials") except Exception as e: logger.error(f"Sign in error: {str(e)}") raise async def refresh_session(self, refresh_token: str) -> Dict[str, Any]: """ Refresh user session with refresh token Args: refresh_token: User's refresh token Returns: Dict containing new session with access_token and refresh_token (rotated) Raises: Exception: If refresh fails """ try: response = self.client.auth.refresh_session(refresh_token) if response.session and response.user: logger.info("Session refreshed successfully") return { "user": response.user, "session": response.session } else: raise Exception("Failed to refresh session") except Exception as e: logger.error(f"Session refresh error: {str(e)}") raise async def sign_out(self, access_token: str) -> bool: """ Sign out user Args: access_token: User's access token Returns: True if successful """ try: self.client.auth.sign_out() logger.info("User signed out successfully") return True except Exception as e: logger.error(f"Sign out error: {str(e)}") return False async def get_user(self, access_token: str) -> Optional[Dict[str, Any]]: """ Get user data from access token Args: access_token: User's access token Returns: User data if token is valid, None otherwise """ try: response = self.client.auth.get_user(access_token) if response.user: return response.user return None except Exception as e: logger.error(f"Get user error: {str(e)}") return None async def update_user(self, access_token: str, user_metadata: Dict[str, Any]) -> Dict[str, Any]: """ Update user metadata Args: access_token: User's access token user_metadata: Updated user data Returns: Updated user data """ try: # Set the session first self.client.auth.set_session(access_token, access_token) response = self.client.auth.update_user({ "data": user_metadata }) if response.user: logger.info("User updated successfully") return response.user else: raise Exception("Failed to update user") except Exception as e: logger.error(f"Update user error: {str(e)}") raise async def reset_password_email(self, email: str) -> bool: """ Send password reset email Args: email: User's email address Returns: True if email sent successfully """ try: self.client.auth.reset_password_email(email) logger.info(f"Password reset email sent to: {email}") return True except Exception as e: logger.error(f"Reset password error: {str(e)}") return False async def verify_token(self, access_token: str) -> Optional[str]: """ Verify access token and return user ID Args: access_token: JWT access token Returns: User ID if token is valid, None otherwise """ try: user = await self.get_user(access_token) if user: return user.id return None except Exception as e: logger.error(f"Token verification error: {str(e)}") return None # Global instance supabase_auth = SupabaseAuth()