# auth_backend.py import json import uuid import secrets import string import sys # Nécessaire pour print(..., file=sys.stderr) from datetime import datetime from werkzeug.security import generate_password_hash, check_password_hash # --- NOUVEL IMPORT: Migration vers Baserow --- # Ces fonctions sont maintenant implémentées dans baserow_storage.py from baserow_storage import ( # Fonctions de lecture/écriture pour les utilisateurs principaux (simulant l'ancienne API) get_client_user_by_api_key, load_primary_user_data, save_primary_user_data, # Fonctions de recherche indexées (nouvelles et plus efficaces) get_user_by_email, get_client_user_by_api_key, FIELD_USER_TYPE, FIELD_CLIENT_ID, ) from flask import session from config import PLANS_CONFIG from typing import Optional, Dict # ---------------------------------------------------------------------- # --- Fonctions Utilitaires et de Configuration --- # ---------------------------------------------------------------------- def get_plan_limit(plan: str) -> float: """Retourne la limite de compte pour un plan donné.""" return PLANS_CONFIG.get(plan, {}).get("limit", PLANS_CONFIG["free"]["limit"]) def get_plan_details(plan_id: str) -> Optional[Dict]: """Retourne les détails complets d'un plan à partir de son ID.""" return PLANS_CONFIG.get(plan_id) def generate_api_key(length: int = 32) -> str: """Génère une clé API sécurisée.""" chars = string.ascii_letters + string.digits return ''.join(secrets.choice(chars) for _ in range(length)) # ---------------------------------------------------------------------- # --- Fonctions d'Authentification WEB (Primary Users) --- # ---------------------------------------------------------------------- def register_user(username: str, email: str, password: str, confirm_password: str, security_question: str, security_answer: str) -> tuple[Optional[str], str, Optional[Dict]]: """ Tente d'enregistrer un nouvel utilisateur principal. Retourne l'ID utilisateur (str), un message (str), et les données utilisateur complètes (Dict) ou None. """ email = email.lower().strip() # Validation du formulaire if not all([username, email, password, confirm_password, security_question, security_answer]): # Retourne (ID, message, Data) return None, "Tous les champs sont requis.", None if password != confirm_password: return None, "Les mots de passe ne correspondent pas.", None if len(password) < 8: return None, "Le mot de passe est trop court (min 8 caractères).", None # 1. Vérification de l'existence de l'utilisateur par email if get_user_by_email(email): return None, "Un compte avec cette adresse e-mail existe déjà.", None # 2. Hachage des données password_hash = generate_password_hash(password) security_answer_hash = generate_password_hash(security_answer.lower()) # 3. Création des données utilisateur user_id = str(uuid.uuid4()) # Ligne optimisée: on génère les 5 clés en une seule liste api_keys = [generate_api_key() for _ in range(5)] new_user = { 'user_id': user_id, 'username': username, 'email': email, 'password_hash': password_hash, # ASSIGNATION DES 5 CLÉS API AUX CHAMPS CORRESPONDANTS 'api_key': api_keys[0], 'api_key_2': api_keys[1], 'api_key_3': api_keys[2], 'api_key_4': api_keys[3], 'api_key_5': api_keys[4], 'security_question': security_question, 'security_answer_hash': security_answer_hash, 'plan_id': PLANS_CONFIG['free']['baserow_value'], 'stripe_subscription_id': None, # Pas d'abonnement Stripe au début 'date_creation': datetime.now().isoformat(), 'date_plan_start': datetime.now().isoformat(), 'api_calls_month': 0, 'status': 'Active', 'baserow_row_id': None, # Sera rempli par la fonction save_primary_user_data si c'est une création # Note: D'autres champs pourraient être nécessaires ici, selon les besoins non-vus } # 4. Sauvegarde dans Baserow # La fonction save_primary_user_data mettra à jour 'baserow_row_id' dans new_user si la création est réussie. success = save_primary_user_data(new_user, commit_msg=f"feat: Création de l'utilisateur {user_id} ({email})") if success: # Retourne : ID utilisateur, message de succès, Dictionnaire utilisateur complet return user_id, "Inscription réussie. Vous pouvez maintenant vous connecter.", new_user else: # Échec de l'écriture dans la BDD (Baserow) # Retourne : None, message d'erreur, None return None, "Erreur interne lors de l'enregistrement de l'utilisateur.", None def login_user(username_or_email: str, password: str) -> tuple[Optional[str], str, Optional[Dict]]: # <-- NOUVEAU: Ajout de Optional[Dict] dans le type hint """ Tente de connecter un utilisateur principal. Retourne l'ID utilisateur (str), un message (str) et les données utilisateur (Dict). """ username_or_email = username_or_email.lower().strip() # 1. Recherche de l'utilisateur par email (Utilisation de la nouvelle fonction rapide Baserow) user = get_user_by_email(username_or_email) if user: # 2. Vérification du mot de passe if check_password_hash(user['password_hash'], password): session['user_id'] = user['user_id'] # CORRECTION : Retourne 3 valeurs : ID, Message, Données Utilisateur return user['user_id'], "Connexion réussie.", user # CORRECTION : Retourne 3 valeurs : None ID, Message, None Data return None, "Email/Nom d'utilisateur ou mot de passe invalide.", None def get_user_by_id(user_id: str) -> Optional[Dict]: """ Récupère un utilisateur principal par son ID. Utilise la fonction load_primary_user_data (qui est get_user_by_id dans Baserow). """ return load_primary_user_data(user_id) def get_user_by_api_key(api_key: str) -> Optional[Dict]: """ Récupère un utilisateur principal par sa Clé API (utilisé par le décorateur). Utilise la nouvelle fonction indexée et rapide de Baserow. """ return get_client_user_by_api_key(api_key) def reset_password_via_security_question(username_or_email: str, question: str, answer: str, new_password: str) -> tuple[bool, str]: """Réinitialise le mot de passe via la question de sécurité.""" username_or_email = username_or_email.lower().strip() # Validation du mot de passe if len(new_password) < 8: return False, "Le nouveau mot de passe est trop court (min 8 caractères)." # 1. Recherche de l'utilisateur user = get_user_by_email(username_or_email) if not user: return False, "Utilisateur introuvable." # 2. Vérification de la question/réponse if user.get('security_question') != question: return False, "Question de sécurité incorrecte." if not check_password_hash(user.get('security_answer_hash', ''), answer.lower()): return False, "Réponse de sécurité incorrecte." # 3. Hachage du nouveau mot de passe new_hashed_password = generate_password_hash(new_password) # 4. Mise à jour et sauvegarde en BDD (Baserow) user['password_hash'] = new_hashed_password success = save_primary_user_data(user, commit_msg=f"feat: Réinitialisation MDP utilisateur {user['user_id']}") if success: return True, "Mot de passe réinitialisé avec succès." else: return False, "Erreur interne lors de la mise à jour du mot de passe." def update_user_plan(user_id: str, new_plan_id: str, stripe_subscription_id: Optional[str]) -> bool: """Met à jour le plan et l'ID d'abonnement Stripe pour un utilisateur.""" user = get_user_by_id(user_id) if not user: print(f"Erreur: Utilisateur {user_id} non trouvé pour la mise à jour du plan.", file=sys.stderr) return False user['plan_id'] = new_plan_id user['stripe_subscription_id'] = stripe_subscription_id user['date_plan_start'] = datetime.now().isoformat() success = save_primary_user_data(user, commit_msg=f"feat: Mise à jour du plan pour {user_id} vers {new_plan_id}") return success def update_user_profile(user_id: str, username: str, email: str, new_password: Optional[str] = None) -> tuple[bool, str]: """ Met à jour le profil de l'utilisateur principal (client). user_id: L'UUID de l'utilisateur. username: Le nouveau nom d'utilisateur. email: Le nouvel email. new_password: Le nouveau mot de passe (si fourni et non vide). """ # 1. Chargement des données existantes user_data = load_primary_user_data(user_id) if not user_data: return False, "Erreur critique : Utilisateur introuvable pour la mise à jour." original_email = user_data.get('email') # 2. Validation et mise à jour de l'email if email and email != original_email: # Vérification si le nouvel email n'est pas déjà utilisé par un autre utilisateur # Note: get_user_by_email retourne l'objet utilisateur, pas juste l'ID. existing_user_by_email = get_user_by_email(email) # S'il trouve un utilisateur ET que son ID ne correspond pas à l'utilisateur actuel, # l'email est déjà pris. if existing_user_by_email and existing_user_by_email.get('user_id') != user_id: return False, "Cet email est déjà utilisé par un autre compte." # Mise à jour de l'email si la validation passe user_data['email'] = email # 3. Mise à jour du nom d'utilisateur (pas de validation d'unicité assumée ici) user_data['username'] = username # 4. Mise à jour du mot de passe (si un nouveau est fourni) if new_password: if len(new_password) < 8: # Assurez-vous que cette limite est cohérente avec la fonction register_user return False, "Le nouveau mot de passe est trop court (min 8 caractères)." user_data['password_hash'] = generate_password_hash(new_password) # 5. Sauvegarde des données # Le row_id est stocké dans l'objet utilisateur après la première sauvegarde. baserow_row_id = user_data.get('baserow_row_id') if baserow_row_id is None: return False, "Erreur de configuration : ID de ligne Baserow manquant." if save_primary_user_data(user_data, commit_msg=f"feat: Mise à jour du profil utilisateur {user_id}"): return True, "Votre profil a été mis à jour avec succès." else: return False, "Une erreur s'est produite lors de la sauvegarde du profil." def register_end_user(client_id: str, username: str, email: str, password: str, confirm_password: str) -> tuple[Optional[str], str, Optional[Dict]]: """ Enregistre un nouvel utilisateur final (End User) lié à un utilisateur client (Primary User). """ if password != confirm_password: return None, "Les mots de passe ne correspondent pas.", None if len(password) < 8: return None, "Le mot de passe doit contenir au moins 8 caractères.", None # Vérification de l'unicité de l'email UNIQUEMENT pour ce client et pour le type 'End' existing_user_by_email = load_primary_user_data( filters={ FIELD_EMAIL: email, FIELD_USER_TYPE: 'End', FIELD_CLIENT_ID: client_id } ) if existing_user_by_email: return None, "Cet email est déjà utilisé pour votre service.", None # 1. Création des données de l'utilisateur final user_id = str(uuid.uuid4()) password_hash = generate_password_hash(password) new_user_data = { FIELD_ID: user_id, FIELD_EMAIL: email, FIELD_USERNAME: username, FIELD_PASSWORD_HASH: password_hash, FIELD_DATE_CREATION: datetime.now().isoformat(), FIELD_USER_TYPE: 'End', # <-- IMPORTANT: Marquer comme utilisateur final FIELD_CLIENT_ID: client_id, # <-- IMPORTANT: Lier au client principal FIELD_PLAN: PLANS_CONFIG['free']['baserow_value'], # Plan par défaut } # 2. Sauvegarde dans Baserow (dans la même table) success, row_id = save_primary_user_data(new_user_data, is_new=True) if success: new_user_data['baserow_row_id'] = row_id return user_id, "Inscription réussie !", new_user_data else: return None, f"Erreur lors de l'enregistrement dans la base de données. ID: {user_id}", None def login_end_user(client_id: str, email: str, password: str) -> tuple[Optional[str], str, Optional[Dict]]: """ Connecte un utilisateur final (End User) lié à un client donné. """ user_data_list = load_primary_user_data( filters={ FIELD_EMAIL: email, FIELD_USER_TYPE: 'End', # Filtrer uniquement les utilisateurs finaux FIELD_CLIENT_ID: client_id # Filtrer par l'ID du client } ) if not user_data_list: return None, "Email ou mot de passe invalide.", None user_data = user_data_list[0] if check_password_hash(user_data.get(FIELD_PASSWORD_HASH), password): return user_data.get(FIELD_ID), "Connexion réussie.", user_data else: return None, "Email ou mot de passe invalide.", None def reset_end_user_password(client_id: str, email: str, new_password: str, confirm_password: str) -> tuple[bool, str]: """ Réinitialise le mot de passe d'un utilisateur final (End User) lié à un client donné. """ if new_password != confirm_password: return False, "Les mots de passe ne correspondent pas." if len(new_password) < 8: return False, "Le nouveau mot de passe est trop court (min 8 caractères)." user_data_list = load_primary_user_data( filters={ FIELD_EMAIL: email, FIELD_USER_TYPE: 'End', FIELD_CLIENT_ID: client_id } ) if not user_data_list: return False, "Utilisateur non trouvé." user_data = user_data_list[0] # Mise à jour du mot de passe user_data[FIELD_PASSWORD_HASH] = generate_password_hash(new_password) baserow_row_id = user_data.get('baserow_row_id') if baserow_row_id is None: return False, "Erreur de configuration: ID de ligne Baserow manquant." success, _ = save_primary_user_data(user_data, commit_to_baserow=True) if success: return True, "Votre mot de passe a été réinitialisé avec succès." else: return False, "Une erreur s'est produite lors de la réinitialisation du mot de passe."