Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from supabase import create_client, Client | |
| import os | |
| from config.settings import SUPABASE_URL, SUPABASE_KEY, REDIRECT_TO_URL | |
| import traceback | |
| from supabase.lib.client_options import ClientOptions | |
| # --- Supabase Authentication Class --- | |
class SupabaseAuth:
    """Wrap Supabase authentication calls behind methods that return result dicts.

    Public methods never raise for Supabase errors: they catch the exception
    and return ``{'success': False, 'message': ...}`` (plus ``'data': None`` or
    ``'email': None`` where the success shape has that key).

    Successful ``login``, ``restore_session`` and ``retrieve_session_from_tokens``
    calls return ``data`` with three keys: ``refresh_token``, ``user_email`` and
    ``client`` (a separate client carrying the user's bearer token).

    NOTE(review): sign-in, refresh and ``set_session`` are all invoked on the
    shared ``self.client``; presumably the library keeps the resulting session
    on that client. Confirm this is acceptable when one instance serves
    several users.
    """

    def __init__(self, url: str, key: str):
        """Store the project URL/key and try to create the shared client.

        A failure to create the client is printed, not raised; ``self.client``
        is then ``None`` and methods that need it return a failure result.
        """
        # Kept so per-user clients can be created later with the same project.
        self.url = url
        self.key = key
        try:
            self.client: "Client" = create_client(url, key)
        except Exception as e:
            print(f"Error creating Supabase client: {e}")
            self.client = None

    def _build_session_data(self, access_token: str, refresh_token: str, user_email: str) -> dict:
        """Create a per-user client for the given tokens and bundle the session data."""
        authenticated_client = create_client(
            self.url,
            self.key,
            options=ClientOptions(
                headers={"Authorization": f"Bearer {access_token}"},
            ),
        )
        authenticated_client.auth.set_session(access_token, refresh_token)
        return {
            "refresh_token": refresh_token,
            "user_email": user_email,
            "client": authenticated_client,
        }

    def _session_data_from(self, user_session) -> dict:
        """Build session data from an auth response's session object.

        Raises:
            ValueError: if ``user_session`` is ``None``, so callers report a
                readable message instead of an attribute error.
        """
        if user_session is None:
            raise ValueError("no session was returned by Supabase.")
        return self._build_session_data(
            user_session.access_token,
            user_session.refresh_token,
            user_session.user.email,
        )

    def login(self, email: str, password: str) -> dict:
        """Sign in with email and password and return a user-specific client.

        Returns:
            ``{'success', 'data', 'message'}``; ``data`` is the session dict on
            success and ``None`` on failure.
        """
        if not self.client:
            return {'success': False, 'data': None, 'message': "Supabase client not initialized."}
        try:
            response = self.client.auth.sign_in_with_password({"email": email, "password": password})
            session_data = self._session_data_from(response.session)
            return {'success': True, 'data': session_data, 'message': f"Welcome, {session_data['user_email']}!"}
        except Exception as e:
            return {'success': False, 'data': None, 'message': f"Login failed: {e}"}

    def sign_up(self, email: str, password: str) -> dict:
        """Register a new user; the sign-up response itself is not inspected.

        Returns:
            ``{'success', 'message'}``.
        """
        if not self.client:
            return {'success': False, 'message': "Supabase client not initialized."}
        try:
            # Supabase sign_up returns a session if email confirmation is disabled,
            # or just a user object if it's enabled, so only a message is returned.
            self.client.auth.sign_up({
                "email": email,
                "password": password,
            })
            return {'success': True, 'message': 'Sign up successful! You can login now.'}
        except Exception as e:
            return {'success': False, 'message': f"Sign up failed: {e}"}

    def restore_session(self, refresh_token: str) -> dict:
        """Restore a session from a refresh token and return a user-specific client.

        An empty or ``None`` token is rejected up front rather than forwarded.

        Returns:
            ``{'success', 'data', 'message'}``; ``data`` is the session dict on
            success and ``None`` on failure.
        """
        if not self.client:
            return {'success': False, 'data': None, 'message': "Supabase client not initialized."}
        # NOTE(review): the auth library is understood to fall back to the
        # session already held by the client when no token is passed; on the
        # shared client that could be another user's session. Never forward
        # an empty token.
        if not refresh_token:
            return {'success': False, 'data': None, 'message': "No refresh token provided."}
        try:
            response = self.client.auth.refresh_session(refresh_token)
            session_data = self._session_data_from(response.session)
            # Log only the email: the session data contains the refresh token.
            print("Session restored successfully:", session_data["user_email"])
            return {'success': True, 'data': session_data, 'message': f"Welcome, {session_data['user_email']}!"}
        except Exception as e:
            print("failed to restore session:", e)
            return {'success': False, 'data': None, 'message': f"Failed to restore session: {e}"}

    def logout(self, user_client: "Client") -> dict:
        """Sign the given user client out of Supabase.

        Returns:
            ``{'success', 'message'}``.
        """
        if not user_client:
            return {'success': False, 'message': 'No user client provided to log out.'}
        try:
            user_client.auth.sign_out()
            return {'success': True, 'message': 'Successfully signed out from Supabase.'}
        except Exception as e:
            # It's often safe to ignore errors here (e.g., if token already expired)
            # but we log it for debugging.
            print(f"Error signing out from Supabase: {e}")
            return {'success': False, 'message': f'Error signing out: {e}'}

    def change_password(self, user_client: "Client", new_password: str) -> dict:
        """Update the password of the user behind ``user_client``.

        Returns:
            ``{'success', 'message'}``.
        """
        if not user_client:
            return {'success': False, 'message': 'No user client provided to change password.'}
        try:
            user_client.auth.update_user({"password": new_password})
            return {'success': True, 'message': 'Password changed successfully.'}
        except Exception as e:
            return {'success': False, 'message': f'Error changing password: {e}'}

    def is_logged_in(self, user_client: "Client") -> dict:
        """Check whether ``user_client`` resolves to a user and report their email.

        Returns:
            ``{'success', 'email', 'message'}``; ``email`` is ``None`` unless a
            user was returned.
        """
        print("Checking if user is logged in...", user_client)
        if not user_client:
            return {'success': False, 'email': None, 'message': 'No user client provided.'}
        try:
            user = user_client.auth.get_user().user
            if user:
                return {'success': True, 'email': user.email, 'message': f'Logged in as: {user.email}'}
            return {'success': False, 'email': None, 'message': 'User is not logged in.'}
        except Exception as e:
            # This might happen if the token has expired and can't be refreshed.
            return {'success': False, 'email': None, 'message': f'Authentication check failed: {e}'}

    def reset_password_for_email(self, email: str) -> dict:
        """Request a password reset email that redirects to ``REDIRECT_TO_URL``.

        Returns:
            ``{'success', 'message'}``.
        """
        if not self.client:
            return {'success': False, 'message': "Supabase client not initialized."}
        try:
            self.client.auth.reset_password_for_email(
                email,
                {
                    "redirect_to": str(REDIRECT_TO_URL),
                },
            )
            return {'success': True, 'message': "Password reset email sent. Check your inbox!"}
        except Exception as e:
            return {'success': False, 'message': f"Failed to send reset email: {e}"}

    def retrieve_session_from_tokens(self, access_token: str, refresh_token: str) -> dict:
        """Build a user session from an access token and a refresh token.

        This is typically used after a password recovery link is clicked.

        Returns:
            ``{'success', 'data', 'message'}``; ``data`` is the session dict on
            success and ``None`` on failure.
        """
        if not self.client:
            return {'success': False, 'data': None, 'message': "Supabase client not initialized."}
        try:
            # Applying the tokens to the main client and fetching the user
            # verifies them before a per-user client is created.
            self.client.auth.set_session(access_token, refresh_token)
            user = self.client.auth.get_user().user
            if not user:
                return {'success': False, 'data': None, 'message': "Could not retrieve user from tokens."}
            session_data = self._build_session_data(access_token, refresh_token, user.email)
            return {'success': True, 'data': session_data, 'message': f"Welcome, {user.email}!"}
        except Exception as e:
            return {'success': False, 'data': None, 'message': f"Failed to retrieve session from tokens: {e}"}
# Module-level SupabaseAuth instance, created at import time from the URL and
# key imported from config.settings.
auth_handler = SupabaseAuth(url=SUPABASE_URL, key=SUPABASE_KEY)