# swiftops-backend/src/app/core/supabase_auth.py
# kamau1's picture
# fix: Improve Supabase signup error handling and auto-confirm emails
# 7e93600
"""
Supabase Authentication Integration
Handles user authentication via Supabase Auth API
"""
from supabase import create_client, Client
from app.config import settings
from typing import Optional, Dict, Any
import logging
import httpx
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Singleton HTTP client for connection reuse (MAJOR PERFORMANCE BOOST).
# Starts as None; get_http_client() builds the httpx.AsyncClient on first use
# and returns the same object afterwards.
_http_client = None
def get_http_client():
    """Return the module-wide ``httpx.AsyncClient``, building it on first call.

    The client is stored in the module-level ``_http_client`` variable so that
    every caller shares one connection pool.

    Returns:
        The shared ``httpx.AsyncClient`` instance.
    """
    global _http_client

    # Fast path: the client has already been built.
    if _http_client is not None:
        return _http_client

    pool_limits = httpx.Limits(
        max_keepalive_connections=20,
        max_connections=50,
        keepalive_expiry=300.0,
    )
    _http_client = httpx.AsyncClient(timeout=30.0, limits=pool_limits)
    return _http_client
class SupabaseAuth:
    """Supabase Authentication Manager - singleton wrapping one Supabase client.

    Every ``SupabaseAuth()`` call returns the same instance, so the underlying
    client created in ``_initialize`` is shared by all callers.

    NOTE(review): the Supabase client methods are invoked without ``await``
    inside ``async`` methods; they appear to be synchronous calls - confirm
    whether they should be moved off the event loop.
    """

    _instance = None

    def __new__(cls):
        """Return the shared instance, creating and initializing it on first use."""
        if cls._instance is None:
            instance = super().__new__(cls)
            # Publish the instance only after initialization succeeded, so a
            # failing create_client() cannot leave a half-built singleton that
            # later calls would silently reuse.
            instance._initialize()
            cls._instance = instance
        return cls._instance

    def _initialize(self):
        """
        Initialize Supabase client for backend API use

        Note: Backend APIs are stateless token validators, not session managers.
        The client is used only to validate tokens sent by clients.
        Session management and token refresh is handled by the frontend/mobile apps.
        """
        # Kept on the instance because sign_up() builds a service-role client
        # against the same project URL.
        self.url = settings.SUPABASE_URL
        self.client: Client = create_client(self.url, settings.SUPABASE_KEY)

    @staticmethod
    def _signup_error_message(error_msg: str) -> Optional[str]:
        """Map a raw sign-up error string to a user-facing message, or None if unrecognized."""
        lowered = error_msg.lower()
        if "556" in error_msg or "Server error" in error_msg:
            return f"Supabase authentication service error. This may be due to email verification requirements or service limits. Please contact support. Technical details: {error_msg}"
        if "email" in lowered and "confirm" in lowered:
            return "Email confirmation required. Please check your email inbox for the confirmation link."
        if "rate limit" in lowered:
            return "Too many signup attempts. Please try again later."
        return None

    async def _confirm_email_and_sign_in(self, user_id: Any, email: str, password: str) -> Dict[str, Any]:
        """Mark the user's email as confirmed via the admin API, then sign in to obtain a session."""
        admin_client = create_client(self.url, settings.SUPABASE_SERVICE_ROLE_KEY)
        admin_client.auth.admin.update_user_by_id(user_id, {"email_confirm": True})
        logger.info("Email auto-confirmed for %s", email)
        return await self.sign_in(email, password)

    async def sign_up(self, email: str, password: str, user_metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Register a new user with Supabase Auth

        If Supabase returns a user but no session, the email is auto-confirmed
        through the admin API and the user is signed in; when that fails the
        user is still returned, with ``session`` set to None.

        Args:
            email: User's email address
            password: User's password
            user_metadata: Additional user data (first_name, last_name, phone, etc.)

        Returns:
            Dict containing user data and session (the sign_in() result, which
            also carries ``access_token``, when auto-confirmation ran)

        Raises:
            Exception: If registration fails. Recognized failures are re-raised
                with a friendlier message chained to the original error.
        """
        try:
            response = self.client.auth.sign_up({
                "email": email,
                "password": password,
                "options": {
                    "data": user_metadata or {},
                    "email_redirect_to": None,  # Disable email confirmation redirect
                },
            })
            if not response.user:
                raise Exception("Failed to create user - no user returned from Supabase")

            logger.info("User registered successfully: %s", email)

            # Auto-confirm user if email confirmation is enabled.
            # This prevents "Email not confirmed" errors.
            if not response.session:
                logger.warning("No session returned for %s, attempting to confirm email", email)
                try:
                    return await self._confirm_email_and_sign_in(response.user.id, email, password)
                except Exception as confirm_error:
                    # Best effort: continue anyway - user may need to confirm manually.
                    logger.error("Failed to auto-confirm email: %s", confirm_error)

            return {
                "user": response.user,
                "session": response.session,
            }
        except Exception as e:
            error_msg = str(e)
            logger.error("Sign up error for %s: %s", email, error_msg)
            # Provide more specific error messages where the failure is recognized.
            friendly_message = self._signup_error_message(error_msg)
            if friendly_message is None:
                raise
            raise Exception(friendly_message) from e

    async def sign_in(self, email: str, password: str) -> Dict[str, Any]:
        """
        Sign in user with email and password

        Args:
            email: User's email address
            password: User's password

        Returns:
            Dict containing user data and session with access_token

        Raises:
            Exception: If login fails
        """
        try:
            response = self.client.auth.sign_in_with_password({
                "email": email,
                "password": password,
            })
            if not (response.user and response.session):
                raise Exception("Invalid credentials")

            logger.info("User signed in successfully: %s", email)
            return {
                "user": response.user,
                "session": response.session,
                "access_token": response.session.access_token,
            }
        except Exception as e:
            logger.error("Sign in error: %s", e)
            raise

    async def refresh_session(self, refresh_token: str) -> Dict[str, Any]:
        """
        Refresh user session with refresh token

        Args:
            refresh_token: User's refresh token

        Returns:
            Dict containing new session with access_token and refresh_token (rotated)

        Raises:
            Exception: If refresh fails
        """
        try:
            response = self.client.auth.refresh_session(refresh_token)
            if not (response.session and response.user):
                raise Exception("Failed to refresh session")

            logger.info("Session refreshed successfully")
            return {
                "user": response.user,
                "session": response.session,
            }
        except Exception as e:
            logger.error("Session refresh error: %s", e)
            raise

    async def sign_out(self, access_token: str) -> bool:
        """
        Sign out user

        Args:
            access_token: User's access token

        Returns:
            True if successful, False if the sign-out call raised
        """
        try:
            # Revoke the session that belongs to the supplied token. Calling
            # client.auth.sign_out() instead would act on whatever session the
            # shared singleton client last stored, which may be another user's.
            self.client.auth.admin.sign_out(access_token)
            logger.info("User signed out successfully")
            return True
        except Exception as e:
            logger.error("Sign out error: %s", e)
            return False

    async def get_user(self, access_token: str) -> Optional[Any]:
        """
        Get user data from access token

        Args:
            access_token: User's access token

        Returns:
            The user object from Supabase if the token is valid, None otherwise
            (including when the lookup raises)
        """
        try:
            response = self.client.auth.get_user(access_token)
            if response and response.user:
                return response.user
            return None
        except Exception as e:
            logger.error("Get user error: %s", e)
            return None

    async def update_user(self, access_token: str, user_metadata: Dict[str, Any], refresh_token: Optional[str] = None) -> Any:
        """
        Update user metadata

        Args:
            access_token: User's access token
            user_metadata: Updated user data
            refresh_token: User's refresh token, if available. When omitted the
                access token is passed in its place, as before.

        Returns:
            Updated user data

        Raises:
            Exception: If the update fails or no user is returned
        """
        try:
            # Set the session first so update_user() acts on behalf of this user.
            # NOTE(review): this stores the session on the shared singleton
            # client - confirm this is safe when requests run concurrently.
            self.client.auth.set_session(access_token, refresh_token or access_token)
            response = self.client.auth.update_user({
                "data": user_metadata,
            })
            if not response.user:
                raise Exception("Failed to update user")

            logger.info("User updated successfully")
            return response.user
        except Exception as e:
            logger.error("Update user error: %s", e)
            raise

    async def reset_password_email(self, email: str) -> bool:
        """
        Send password reset email

        Args:
            email: User's email address

        Returns:
            True if email sent successfully, False if the request raised
        """
        try:
            self.client.auth.reset_password_email(email)
            logger.info("Password reset email sent to: %s", email)
            return True
        except Exception as e:
            logger.error("Reset password error: %s", e)
            return False

    async def verify_token(self, access_token: str) -> Optional[str]:
        """
        Verify access token and return user ID

        Args:
            access_token: JWT access token

        Returns:
            User ID if token is valid, None otherwise
        """
        try:
            user = await self.get_user(access_token)
            if user:
                return user.id
            return None
        except Exception as e:
            logger.error("Token verification error: %s", e)
            return None
# Global instance: constructed at import time, so importing this module runs
# SupabaseAuth._initialize() and creates the Supabase client.
supabase_auth = SupabaseAuth()