Spaces:
Build error
Build error
| # auth_backend.py | |
| import json | |
| import uuid | |
| import secrets | |
| import string | |
| import sys # Nécessaire pour print(..., file=sys.stderr) | |
| from datetime import datetime | |
| from werkzeug.security import generate_password_hash, check_password_hash | |
| # --- NOUVEL IMPORT: Migration vers Baserow --- | |
| # Ces fonctions sont maintenant implémentées dans baserow_storage.py | |
| from baserow_storage import ( | |
| # Fonctions de lecture/écriture pour les utilisateurs principaux (simulant l'ancienne API) | |
| get_client_user_by_api_key, | |
| load_primary_user_data, | |
| save_primary_user_data, | |
| # Fonctions de recherche indexées (nouvelles et plus efficaces) | |
| get_user_by_email, | |
| get_client_user_by_api_key, | |
| FIELD_USER_TYPE, | |
| FIELD_CLIENT_ID, | |
| ) | |
| from flask import session | |
| from config import PLANS_CONFIG | |
| from typing import Optional, Dict | |
| # ---------------------------------------------------------------------- | |
| # --- Fonctions Utilitaires et de Configuration --- | |
| # ---------------------------------------------------------------------- | |
| def get_plan_limit(plan: str) -> float: | |
| """Retourne la limite de compte pour un plan donné.""" | |
| return PLANS_CONFIG.get(plan, {}).get("limit", PLANS_CONFIG["free"]["limit"]) | |
| def get_plan_details(plan_id: str) -> Optional[Dict]: | |
| """Retourne les détails complets d'un plan à partir de son ID.""" | |
| return PLANS_CONFIG.get(plan_id) | |
| def generate_api_key(length: int = 32) -> str: | |
| """Génère une clé API sécurisée.""" | |
| chars = string.ascii_letters + string.digits | |
| return ''.join(secrets.choice(chars) for _ in range(length)) | |
| # ---------------------------------------------------------------------- | |
| # --- Fonctions d'Authentification WEB (Primary Users) --- | |
| # ---------------------------------------------------------------------- | |
| def register_user(username: str, email: str, password: str, confirm_password: str, security_question: str, security_answer: str) -> tuple[Optional[str], str, Optional[Dict]]: | |
| """ | |
| Tente d'enregistrer un nouvel utilisateur principal. | |
| Retourne l'ID utilisateur (str), un message (str), et les données utilisateur complètes (Dict) ou None. | |
| """ | |
| email = email.lower().strip() | |
| # Validation du formulaire | |
| if not all([username, email, password, confirm_password, security_question, security_answer]): | |
| # Retourne (ID, message, Data) | |
| return None, "Tous les champs sont requis.", None | |
| if password != confirm_password: | |
| return None, "Les mots de passe ne correspondent pas.", None | |
| if len(password) < 8: | |
| return None, "Le mot de passe est trop court (min 8 caractères).", None | |
| # 1. Vérification de l'existence de l'utilisateur par email | |
| if get_user_by_email(email): | |
| return None, "Un compte avec cette adresse e-mail existe déjà.", None | |
| # 2. Hachage des données | |
| password_hash = generate_password_hash(password) | |
| security_answer_hash = generate_password_hash(security_answer.lower()) | |
| # 3. Création des données utilisateur | |
| user_id = str(uuid.uuid4()) | |
| # Ligne optimisée: on génère les 5 clés en une seule liste | |
| api_keys = [generate_api_key() for _ in range(5)] | |
| new_user = { | |
| 'user_id': user_id, | |
| 'username': username, | |
| 'email': email, | |
| 'password_hash': password_hash, | |
| # ASSIGNATION DES 5 CLÉS API AUX CHAMPS CORRESPONDANTS | |
| 'api_key': api_keys[0], | |
| 'api_key_2': api_keys[1], | |
| 'api_key_3': api_keys[2], | |
| 'api_key_4': api_keys[3], | |
| 'api_key_5': api_keys[4], | |
| 'security_question': security_question, | |
| 'security_answer_hash': security_answer_hash, | |
| 'plan_id': PLANS_CONFIG['free']['baserow_value'], | |
| 'stripe_subscription_id': None, # Pas d'abonnement Stripe au début | |
| 'date_creation': datetime.now().isoformat(), | |
| 'date_plan_start': datetime.now().isoformat(), | |
| 'api_calls_month': 0, | |
| 'status': 'Active', | |
| 'baserow_row_id': None, # Sera rempli par la fonction save_primary_user_data si c'est une création | |
| # Note: D'autres champs pourraient être nécessaires ici, selon les besoins non-vus | |
| } | |
| # 4. Sauvegarde dans Baserow | |
| # La fonction save_primary_user_data mettra à jour 'baserow_row_id' dans new_user si la création est réussie. | |
| success = save_primary_user_data(new_user, commit_msg=f"feat: Création de l'utilisateur {user_id} ({email})") | |
| if success: | |
| # Retourne : ID utilisateur, message de succès, Dictionnaire utilisateur complet | |
| return user_id, "Inscription réussie. Vous pouvez maintenant vous connecter.", new_user | |
| else: | |
| # Échec de l'écriture dans la BDD (Baserow) | |
| # Retourne : None, message d'erreur, None | |
| return None, "Erreur interne lors de l'enregistrement de l'utilisateur.", None | |
| def login_user(username_or_email: str, password: str) -> tuple[Optional[str], str, Optional[Dict]]: # <-- NOUVEAU: Ajout de Optional[Dict] dans le type hint | |
| """ | |
| Tente de connecter un utilisateur principal. | |
| Retourne l'ID utilisateur (str), un message (str) et les données utilisateur (Dict). | |
| """ | |
| username_or_email = username_or_email.lower().strip() | |
| # 1. Recherche de l'utilisateur par email (Utilisation de la nouvelle fonction rapide Baserow) | |
| user = get_user_by_email(username_or_email) | |
| if user: | |
| # 2. Vérification du mot de passe | |
| if check_password_hash(user['password_hash'], password): | |
| session['user_id'] = user['user_id'] | |
| # CORRECTION : Retourne 3 valeurs : ID, Message, Données Utilisateur | |
| return user['user_id'], "Connexion réussie.", user | |
| # CORRECTION : Retourne 3 valeurs : None ID, Message, None Data | |
| return None, "Email/Nom d'utilisateur ou mot de passe invalide.", None | |
| def get_user_by_id(user_id: str) -> Optional[Dict]: | |
| """ | |
| Récupère un utilisateur principal par son ID. | |
| Utilise la fonction load_primary_user_data (qui est get_user_by_id dans Baserow). | |
| """ | |
| return load_primary_user_data(user_id) | |
| def get_user_by_api_key(api_key: str) -> Optional[Dict]: | |
| """ | |
| Récupère un utilisateur principal par sa Clé API (utilisé par le décorateur). | |
| Utilise la nouvelle fonction indexée et rapide de Baserow. | |
| """ | |
| return get_client_user_by_api_key(api_key) | |
| def reset_password_via_security_question(username_or_email: str, question: str, answer: str, new_password: str) -> tuple[bool, str]: | |
| """Réinitialise le mot de passe via la question de sécurité.""" | |
| username_or_email = username_or_email.lower().strip() | |
| # Validation du mot de passe | |
| if len(new_password) < 8: | |
| return False, "Le nouveau mot de passe est trop court (min 8 caractères)." | |
| # 1. Recherche de l'utilisateur | |
| user = get_user_by_email(username_or_email) | |
| if not user: | |
| return False, "Utilisateur introuvable." | |
| # 2. Vérification de la question/réponse | |
| if user.get('security_question') != question: | |
| return False, "Question de sécurité incorrecte." | |
| if not check_password_hash(user.get('security_answer_hash', ''), answer.lower()): | |
| return False, "Réponse de sécurité incorrecte." | |
| # 3. Hachage du nouveau mot de passe | |
| new_hashed_password = generate_password_hash(new_password) | |
| # 4. Mise à jour et sauvegarde en BDD (Baserow) | |
| user['password_hash'] = new_hashed_password | |
| success = save_primary_user_data(user, commit_msg=f"feat: Réinitialisation MDP utilisateur {user['user_id']}") | |
| if success: | |
| return True, "Mot de passe réinitialisé avec succès." | |
| else: | |
| return False, "Erreur interne lors de la mise à jour du mot de passe." | |
| def update_user_plan(user_id: str, new_plan_id: str, stripe_subscription_id: Optional[str]) -> bool: | |
| """Met à jour le plan et l'ID d'abonnement Stripe pour un utilisateur.""" | |
| user = get_user_by_id(user_id) | |
| if not user: | |
| print(f"Erreur: Utilisateur {user_id} non trouvé pour la mise à jour du plan.", file=sys.stderr) | |
| return False | |
| user['plan_id'] = new_plan_id | |
| user['stripe_subscription_id'] = stripe_subscription_id | |
| user['date_plan_start'] = datetime.now().isoformat() | |
| success = save_primary_user_data(user, commit_msg=f"feat: Mise à jour du plan pour {user_id} vers {new_plan_id}") | |
| return success | |
| def update_user_profile(user_id: str, username: str, email: str, new_password: Optional[str] = None) -> tuple[bool, str]: | |
| """ | |
| Met à jour le profil de l'utilisateur principal (client). | |
| user_id: L'UUID de l'utilisateur. | |
| username: Le nouveau nom d'utilisateur. | |
| email: Le nouvel email. | |
| new_password: Le nouveau mot de passe (si fourni et non vide). | |
| """ | |
| # 1. Chargement des données existantes | |
| user_data = load_primary_user_data(user_id) | |
| if not user_data: | |
| return False, "Erreur critique : Utilisateur introuvable pour la mise à jour." | |
| original_email = user_data.get('email') | |
| # 2. Validation et mise à jour de l'email | |
| if email and email != original_email: | |
| # Vérification si le nouvel email n'est pas déjà utilisé par un autre utilisateur | |
| # Note: get_user_by_email retourne l'objet utilisateur, pas juste l'ID. | |
| existing_user_by_email = get_user_by_email(email) | |
| # S'il trouve un utilisateur ET que son ID ne correspond pas à l'utilisateur actuel, | |
| # l'email est déjà pris. | |
| if existing_user_by_email and existing_user_by_email.get('user_id') != user_id: | |
| return False, "Cet email est déjà utilisé par un autre compte." | |
| # Mise à jour de l'email si la validation passe | |
| user_data['email'] = email | |
| # 3. Mise à jour du nom d'utilisateur (pas de validation d'unicité assumée ici) | |
| user_data['username'] = username | |
| # 4. Mise à jour du mot de passe (si un nouveau est fourni) | |
| if new_password: | |
| if len(new_password) < 8: | |
| # Assurez-vous que cette limite est cohérente avec la fonction register_user | |
| return False, "Le nouveau mot de passe est trop court (min 8 caractères)." | |
| user_data['password_hash'] = generate_password_hash(new_password) | |
| # 5. Sauvegarde des données | |
| # Le row_id est stocké dans l'objet utilisateur après la première sauvegarde. | |
| baserow_row_id = user_data.get('baserow_row_id') | |
| if baserow_row_id is None: | |
| return False, "Erreur de configuration : ID de ligne Baserow manquant." | |
| if save_primary_user_data(user_data, commit_msg=f"feat: Mise à jour du profil utilisateur {user_id}"): | |
| return True, "Votre profil a été mis à jour avec succès." | |
| else: | |
| return False, "Une erreur s'est produite lors de la sauvegarde du profil." | |
| def register_end_user(client_id: str, username: str, email: str, password: str, confirm_password: str) -> tuple[Optional[str], str, Optional[Dict]]: | |
| """ | |
| Enregistre un nouvel utilisateur final (End User) lié à un utilisateur client (Primary User). | |
| """ | |
| if password != confirm_password: | |
| return None, "Les mots de passe ne correspondent pas.", None | |
| if len(password) < 8: | |
| return None, "Le mot de passe doit contenir au moins 8 caractères.", None | |
| # Vérification de l'unicité de l'email UNIQUEMENT pour ce client et pour le type 'End' | |
| existing_user_by_email = load_primary_user_data( | |
| filters={ | |
| FIELD_EMAIL: email, | |
| FIELD_USER_TYPE: 'End', | |
| FIELD_CLIENT_ID: client_id | |
| } | |
| ) | |
| if existing_user_by_email: | |
| return None, "Cet email est déjà utilisé pour votre service.", None | |
| # 1. Création des données de l'utilisateur final | |
| user_id = str(uuid.uuid4()) | |
| password_hash = generate_password_hash(password) | |
| new_user_data = { | |
| FIELD_ID: user_id, | |
| FIELD_EMAIL: email, | |
| FIELD_USERNAME: username, | |
| FIELD_PASSWORD_HASH: password_hash, | |
| FIELD_DATE_CREATION: datetime.now().isoformat(), | |
| FIELD_USER_TYPE: 'End', # <-- IMPORTANT: Marquer comme utilisateur final | |
| FIELD_CLIENT_ID: client_id, # <-- IMPORTANT: Lier au client principal | |
| FIELD_PLAN: PLANS_CONFIG['free']['baserow_value'], # Plan par défaut | |
| } | |
| # 2. Sauvegarde dans Baserow (dans la même table) | |
| success, row_id = save_primary_user_data(new_user_data, is_new=True) | |
| if success: | |
| new_user_data['baserow_row_id'] = row_id | |
| return user_id, "Inscription réussie !", new_user_data | |
| else: | |
| return None, f"Erreur lors de l'enregistrement dans la base de données. ID: {user_id}", None | |
| def login_end_user(client_id: str, email: str, password: str) -> tuple[Optional[str], str, Optional[Dict]]: | |
| """ | |
| Connecte un utilisateur final (End User) lié à un client donné. | |
| """ | |
| user_data_list = load_primary_user_data( | |
| filters={ | |
| FIELD_EMAIL: email, | |
| FIELD_USER_TYPE: 'End', # Filtrer uniquement les utilisateurs finaux | |
| FIELD_CLIENT_ID: client_id # Filtrer par l'ID du client | |
| } | |
| ) | |
| if not user_data_list: | |
| return None, "Email ou mot de passe invalide.", None | |
| user_data = user_data_list[0] | |
| if check_password_hash(user_data.get(FIELD_PASSWORD_HASH), password): | |
| return user_data.get(FIELD_ID), "Connexion réussie.", user_data | |
| else: | |
| return None, "Email ou mot de passe invalide.", None | |
| def reset_end_user_password(client_id: str, email: str, new_password: str, confirm_password: str) -> tuple[bool, str]: | |
| """ | |
| Réinitialise le mot de passe d'un utilisateur final (End User) lié à un client donné. | |
| """ | |
| if new_password != confirm_password: | |
| return False, "Les mots de passe ne correspondent pas." | |
| if len(new_password) < 8: | |
| return False, "Le nouveau mot de passe est trop court (min 8 caractères)." | |
| user_data_list = load_primary_user_data( | |
| filters={ | |
| FIELD_EMAIL: email, | |
| FIELD_USER_TYPE: 'End', | |
| FIELD_CLIENT_ID: client_id | |
| } | |
| ) | |
| if not user_data_list: | |
| return False, "Utilisateur non trouvé." | |
| user_data = user_data_list[0] | |
| # Mise à jour du mot de passe | |
| user_data[FIELD_PASSWORD_HASH] = generate_password_hash(new_password) | |
| baserow_row_id = user_data.get('baserow_row_id') | |
| if baserow_row_id is None: | |
| return False, "Erreur de configuration: ID de ligne Baserow manquant." | |
| success, _ = save_primary_user_data(user_data, commit_to_baserow=True) | |
| if success: | |
| return True, "Votre mot de passe a été réinitialisé avec succès." | |
| else: | |
| return False, "Une erreur s'est produite lors de la réinitialisation du mot de passe." |