Spaces:
Running
on
T4
Running
on
T4
File size: 8,631 Bytes
ed8368e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
import gradio as gr
from supabase import create_client, Client
import os
from config.settings import SUPABASE_URL, SUPABASE_KEY, REDIRECT_TO_URL
import traceback
from supabase.lib.client_options import ClientOptions
# --- Supabase Authentication Class ---
class SupabaseAuth:
"""A class to handle Supabase authentication logic."""
def __init__(self, url: str, key: str):
self.url = url
self.key = key
try:
self.client: Client = create_client(url, key)
except Exception as e:
print(f"Error creating Supabase client: {e}")
self.client = None
def login(self, email: str, password: str):
"""
Attempts to log in a user and returns a user-specific client.
"""
if not self.client:
return {'success': False, 'data': None, 'message': "Supabase client not initialized."}
try:
response = self.client.auth.sign_in_with_password({"email": email, "password": password})
user_session = response.session
# Create a new, authenticated client for this user
authenticated_client = create_client(
self.url,
self.key,
# options={"headers": {"Authorization": f"Bearer {user_session.access_token}"}}
options=ClientOptions(
headers={"Authorization": f"Bearer {user_session.access_token}"},
)
)
authenticated_client.auth.set_session(user_session.access_token, user_session.refresh_token)
session_data = {
"refresh_token": user_session.refresh_token,
"user_email": user_session.user.email,
"client": authenticated_client
}
return {'success': True, 'data': session_data, 'message': f"Welcome, {user_session.user.email}!"}
except Exception as e:
# print(f"Error logging in: {e}")
# traceback.print_exc()
# Handle specific error messages for better user feedback
return {'success': False, 'data': None, 'message': f"Login failed: {e}"}
def sign_up(self, email: str, password: str):
"""Signs up a new user."""
if not self.client:
return {'success': False, 'message': "Supabase client not initialized."}
try:
# Supabase sign_up returns a session if email confirmation is disabled,
# or just a user object if it's enabled. We'll just return a success message.
self.client.auth.sign_up({
"email": email,
"password": password,
})
return {'success': True, 'message': 'Sign up successful! You can login now.'}
except Exception as e:
return {'success': False, 'message': f"Sign up failed: {e}"}
def restore_session(self, refresh_token: str):
"""
Attempts to restore a session using a refresh token.
"""
if not self.client:
return {'success': False, 'data': None, 'message': "Supabase client not initialized."}
try:
response = self.client.auth.refresh_session(refresh_token)
user_session = response.session
authenticated_client = create_client(
self.url,
self.key,
options=ClientOptions(
headers={"Authorization": f"Bearer {user_session.access_token}"},
)
)
authenticated_client.auth.set_session(user_session.access_token, user_session.refresh_token)
session_data = {
"refresh_token": user_session.refresh_token,
"user_email": user_session.user.email,
"client": authenticated_client
}
print("Session restored successfully:", session_data)
return {'success': True, 'data': session_data, 'message': f"Welcome, {user_session.user.email}!"}
except Exception as e:
print("failed to restore session:", e)
return {'success': False, 'data': None, 'message': f"Failed to restore session: {e}"}
def logout(self, user_client: Client):
"""Signs out the user from Supabase, invalidating the token."""
if not user_client:
return {'success': False, 'message': 'No user client provided to log out.'}
try:
user_client.auth.sign_out()
return {'success': True, 'message': 'Successfully signed out from Supabase.'}
except Exception as e:
# It's often safe to ignore errors here (e.g., if token already expired)
# but we'll log it for debugging.
print(f"Error signing out from Supabase: {e}")
return {'success': False, 'message': f'Error signing out: {e}'}
def change_password(self, user_client: Client, new_password: str):
"""Changes the user's password."""
if not user_client:
return {'success': False, 'message': 'No user client provided to change password.'}
try:
user_client.auth.update_user({"password": new_password})
return {'success': True, 'message': 'Password changed successfully.'}
except Exception as e:
return {'success': False, 'message': f'Error changing password: {e}'}
def is_logged_in(self, user_client: Client):
"""Checks if a user is currently authenticated and returns their email."""
print("Checking if user is logged in...", user_client)
if not user_client:
return {'success': False, 'email': None, 'message': 'No user client provided.'}
try:
user_response = user_client.auth.get_user()
user = user_response.user
if user:
return {'success': True, 'email': user.email, 'message': f'Logged in as: {user.email}'}
else:
return {'success': False, 'email': None, 'message': 'User is not logged in.'}
except Exception as e:
# This might happen if the token has expired and can't be refreshed.
return {'success': False, 'email': None, 'message': f'Authentication check failed: {e}'}
def reset_password_for_email(self, email: str):
"""
Sends a password reset email to the specified address.
"""
if not self.client:
return {'success': False, 'message': "Supabase client not initialized."}
try:
self.client.auth.reset_password_for_email(
email,
{
"redirect_to": str(REDIRECT_TO_URL),
}
)
return {'success': True, 'message': "Password reset email sent. Check your inbox!"}
except Exception as e:
return {'success': False, 'message': f"Failed to send reset email: {e}"}
def retrieve_session_from_tokens(self, access_token: str, refresh_token: str):
"""
Retrieves a session from an access token and refresh token.
This is typically used after a password recovery link is clicked.
"""
if not self.client:
return {'success': False, 'data': None, 'message': "Supabase client not initialized."}
try:
# Set the session on the main client to verify tokens and get user info
self.client.auth.set_session(access_token, refresh_token)
user_response = self.client.auth.get_user()
user = user_response.user
if not user:
return {'success': False, 'data': None, 'message': "Could not retrieve user from tokens."}
# Create a new, authenticated client for this user, similar to login
authenticated_client = create_client(
self.url,
self.key,
options=ClientOptions(
headers={"Authorization": f"Bearer {access_token}"},
)
)
authenticated_client.auth.set_session(access_token, refresh_token)
session_data = {
"refresh_token": refresh_token,
"user_email": user.email,
"client": authenticated_client
}
return {'success': True, 'data': session_data, 'message': f"Welcome, {user.email}!"}
except Exception as e:
return {'success': False, 'data': None, 'message': f"Failed to retrieve session from tokens: {e}"}
auth_handler = SupabaseAuth(SUPABASE_URL, SUPABASE_KEY)
|