# auth_backend.py import json import uuid import secrets import string import sys # Nécessaire pour print(..., file=sys.stderr) from datetime import datetime from werkzeug.security import generate_password_hash, check_password_hash # --- NOUVEL IMPORT: Migration vers Baserow --- # Ces fonctions sont maintenant implémentées dans baserow_storage.py from baserow_storage import ( # Fonctions de lecture/écriture pour les utilisateurs principaux (simulant l'ancienne API) get_client_user_by_api_key, get_end_user_by_email, load_primary_user_data, save_primary_user_data, # Fonctions de recherche indexées (nouvelles et plus efficaces) get_user_by_email, get_client_user_by_api_key, # Fonctions de lecture/écriture pour les utilisateurs finaux (simulant l'ancienne API) load_end_user_data, save_end_user_data, # load_users_data est désormais obsolète et retiré. ) from flask import session from config import PLANS_CONFIG from typing import Optional, Dict # ---------------------------------------------------------------------- # --- Fonctions Utilitaires et de Configuration --- # ---------------------------------------------------------------------- def get_plan_limit(plan: str) -> float: """Retourne la limite de compte pour un plan donné.""" return PLANS_CONFIG.get(plan, {}).get("limit", PLANS_CONFIG["free"]["limit"]) def get_plan_details(plan_id: str) -> Optional[Dict]: """Retourne les détails complets d'un plan à partir de son ID.""" return PLANS_CONFIG.get(plan_id) def generate_api_key(length: int = 32) -> str: """Génère une clé API sécurisée.""" chars = string.ascii_letters + string.digits return ''.join(secrets.choice(chars) for _ in range(length)) # ---------------------------------------------------------------------- # --- Fonctions d'Authentification WEB (Primary Users) --- # ---------------------------------------------------------------------- def register_user(username: str, email: str, password: str, confirm_password: str, security_question: str, security_answer: str) -> tuple[Optional[str], str, Optional[Dict]]: """ Tente d'enregistrer un nouvel utilisateur principal. Retourne l'ID utilisateur (str), un message (str), et les données utilisateur complètes (Dict) ou None. """ email = email.lower().strip() # Validation du formulaire if not all([username, email, password, confirm_password, security_question, security_answer]): # Retourne (ID, message, Data) return None, "Tous les champs sont requis.", None if password != confirm_password: return None, "Les mots de passe ne correspondent pas.", None if len(password) < 8: return None, "Le mot de passe est trop court (min 8 caractères).", None # 1. Vérification de l'existence de l'utilisateur par email if get_user_by_email(email): return None, "Un compte avec cette adresse e-mail existe déjà.", None # 2. Hachage des données password_hash = generate_password_hash(password) security_answer_hash = generate_password_hash(security_answer.lower()) # 3. Création des données utilisateur user_id = str(uuid.uuid4()) # Ligne optimisée: on génère les 5 clés en une seule liste api_keys = [generate_api_key() for _ in range(5)] new_user = { 'user_id': user_id, 'username': username, 'email': email, 'password_hash': password_hash, # ASSIGNATION DES 5 CLÉS API AUX CHAMPS CORRESPONDANTS 'api_key': api_keys[0], 'api_key_2': api_keys[1], 'api_key_3': api_keys[2], 'api_key_4': api_keys[3], 'api_key_5': api_keys[4], 'security_question': security_question, 'security_answer_hash': security_answer_hash, 'plan_id': PLANS_CONFIG['free']['baserow_value'], 'stripe_subscription_id': None, # Pas d'abonnement Stripe au début 'date_creation': datetime.now().isoformat(), 'date_plan_start': datetime.now().isoformat(), 'api_calls_month': 0, 'status': 'Active', 'baserow_row_id': None, # Sera rempli par la fonction save_primary_user_data si c'est une création # Note: D'autres champs pourraient être nécessaires ici, selon les besoins non-vus } # 4. Sauvegarde dans Baserow # La fonction save_primary_user_data mettra à jour 'baserow_row_id' dans new_user si la création est réussie. success = save_primary_user_data(new_user, commit_msg=f"feat: Création de l'utilisateur {user_id} ({email})") if success: # Retourne : ID utilisateur, message de succès, Dictionnaire utilisateur complet return user_id, "Inscription réussie. Vous pouvez maintenant vous connecter.", new_user else: # Échec de l'écriture dans la BDD (Baserow) # Retourne : None, message d'erreur, None return None, "Erreur interne lors de l'enregistrement de l'utilisateur.", None def login_user(username_or_email: str, password: str) -> tuple[Optional[str], str, Optional[Dict]]: # <-- NOUVEAU: Ajout de Optional[Dict] dans le type hint """ Tente de connecter un utilisateur principal. Retourne l'ID utilisateur (str), un message (str) et les données utilisateur (Dict). """ username_or_email = username_or_email.lower().strip() # 1. Recherche de l'utilisateur par email (Utilisation de la nouvelle fonction rapide Baserow) user = get_user_by_email(username_or_email) if user: # 2. Vérification du mot de passe if check_password_hash(user['password_hash'], password): session['user_id'] = user['user_id'] # CORRECTION : Retourne 3 valeurs : ID, Message, Données Utilisateur return user['user_id'], "Connexion réussie.", user # CORRECTION : Retourne 3 valeurs : None ID, Message, None Data return None, "Email/Nom d'utilisateur ou mot de passe invalide.", None def get_user_by_id(user_id: str) -> Optional[Dict]: """ Récupère un utilisateur principal par son ID. Utilise la fonction load_primary_user_data (qui est get_user_by_id dans Baserow). """ return load_primary_user_data(user_id) def get_user_by_api_key(api_key: str) -> Optional[Dict]: """ Récupère un utilisateur principal par sa Clé API (utilisé par le décorateur). Utilise la nouvelle fonction indexée et rapide de Baserow. """ return get_client_user_by_api_key(api_key) def reset_password_via_security_question(username_or_email: str, question: str, answer: str, new_password: str) -> tuple[bool, str]: """Réinitialise le mot de passe via la question de sécurité.""" username_or_email = username_or_email.lower().strip() # Validation du mot de passe if len(new_password) < 8: return False, "Le nouveau mot de passe est trop court (min 8 caractères)." # 1. Recherche de l'utilisateur user = get_user_by_email(username_or_email) if not user: return False, "Utilisateur introuvable." # 2. Vérification de la question/réponse if user.get('security_question') != question: return False, "Question de sécurité incorrecte." if not check_password_hash(user.get('security_answer_hash', ''), answer.lower()): return False, "Réponse de sécurité incorrecte." # 3. Hachage du nouveau mot de passe new_hashed_password = generate_password_hash(new_password) # 4. Mise à jour et sauvegarde en BDD (Baserow) user['password_hash'] = new_hashed_password success = save_primary_user_data(user, commit_msg=f"feat: Réinitialisation MDP utilisateur {user['user_id']}") if success: return True, "Mot de passe réinitialisé avec succès." else: return False, "Erreur interne lors de la mise à jour du mot de passe." def update_user_plan(user_id: str, new_plan_id: str, stripe_subscription_id: Optional[str]) -> bool: """Met à jour le plan et l'ID d'abonnement Stripe pour un utilisateur.""" user = get_user_by_id(user_id) if not user: print(f"Erreur: Utilisateur {user_id} non trouvé pour la mise à jour du plan.", file=sys.stderr) return False user['plan_id'] = new_plan_id user['stripe_subscription_id'] = stripe_subscription_id user['date_plan_start'] = datetime.now().isoformat() success = save_primary_user_data(user, commit_msg=f"feat: Mise à jour du plan pour {user_id} vers {new_plan_id}") return success # ---------------------------------------------------------------------- # --- Fonctions API (End Users) --- # ---------------------------------------------------------------------- def register_end_user(client_user_id: str, email: str, username: str, password: str, security_question: Optional[str] = None, security_answer: Optional[str] = None, identifier: Optional[str] = None) -> tuple[Optional[str], str, Optional[Dict]]: """ Tente d'enregistrer un nouvel utilisateur final pour un client donné. Retourne l'ID utilisateur final (str), un message (str), et les données utilisateur complètes (Dict) ou None. """ email = email.lower().strip() # 1. Validation de base if not all([client_user_id, email, username, password]): return None, "Les champs Client ID, Email, Nom d'utilisateur et Mot de passe sont requis.", None if len(password) < 8: return None, "Le mot de passe est trop court (min 8 caractères).", None # 2. Vérification de l'unicité de l'utilisateur final par email (scopé par client) if get_end_user_by_email(client_user_id, email): return None, "Un utilisateur final avec cette adresse e-mail existe déjà pour ce client.", None # 3. Hachage des données password_hash = generate_password_hash(password) security_answer_hash = generate_password_hash(security_answer.lower()) if security_answer else None # 4. Création des données end_user_id = str(uuid.uuid4()) new_end_user = { 'end_user_id': end_user_id, 'client_user_id': client_user_id, # ID du client principal pour le lien 'identifier': identifier or email, 'email': email, 'username': username, 'password_hash': password_hash, 'security_question': security_question, 'security_answer_hash': security_answer_hash, 'status': 'Active', 'metadata': '{}', # Initialiser les métadonnées 'date_creation': datetime.now().isoformat(), 'baserow_row_id': None, } # 5. Sauvegarde dans Baserow success = save_end_user_data(new_end_user, commit_msg=f"feat: Création de l'utilisateur final {end_user_id}") if success: return end_user_id, "Inscription de l'utilisateur final réussie.", new_end_user else: return None, "Erreur interne lors de l'enregistrement de l'utilisateur final.", None def login_end_user(client_user_id: str, email: str, password: str) -> tuple[Optional[str], str, Optional[Dict]]: """ Tente de connecter un utilisateur final (End User) sous l'autorité d'un client principal (Primary User). Retourne l'ID utilisateur final (str), un message (str) et les données utilisateur (Dict). """ email = email.lower().strip() # 1. Recherche de l'utilisateur final par email et client ID end_user = get_end_user_by_email(client_user_id, email) if end_user: # 2. Vérification du mot de passe if check_password_hash(end_user['password_hash'], password): # Succès de la connexion. return end_user['end_user_id'], "Connexion utilisateur final réussie.", end_user # Échec de l'authentification return None, "Email ou mot de passe utilisateur final invalide pour ce client.", None def reset_end_user_password_by_client(client_user_id: str, end_user_id: str, new_password: str) -> tuple[bool, str]: """ Permet à un client principal (Primary User) de réinitialiser le mot de passe d'un de ses utilisateurs finaux (End User) par son ID (API client-side). """ # 1. Validation du mot de passe if len(new_password) < 8: return False, "Le nouveau mot de passe est trop court (min 8 caractères)." # 2. Charger les données de l'utilisateur final, en s'assurant qu'il appartient bien au client. end_user_data = load_end_user_data(client_user_id, end_user_id) if not end_user_data: # L'utilisateur final n'existe pas ou n'est pas lié à ce client_user_id return False, "Utilisateur final introuvable ou non autorisé (n'appartient pas à ce client)." # 3. Hachage du nouveau mot de passe new_password_hash = generate_password_hash(new_password) # 4. Mise à jour des données end_user_data['password_hash'] = new_password_hash # 5. Sauvegarde des données success = save_end_user_data(end_user_data, commit_msg=f"action: Réinitialisation du mot de passe de l'utilisateur final {end_user_id} par le client {client_user_id}") if success: return True, "Mot de passe de l'utilisateur final réinitialisé avec succès." else: return False, "Erreur lors de la sauvegarde du nouveau mot de passe de l'utilisateur final." def update_user_profile(user_id: str, username: str, email: str, new_password: Optional[str] = None) -> tuple[bool, str]: """ Met à jour le profil de l'utilisateur principal (client). user_id: L'UUID de l'utilisateur. username: Le nouveau nom d'utilisateur. email: Le nouvel email. new_password: Le nouveau mot de passe (si fourni et non vide). """ # 1. Chargement des données existantes user_data = load_primary_user_data(user_id) if not user_data: return False, "Erreur critique : Utilisateur introuvable pour la mise à jour." original_email = user_data.get('email') # 2. Validation et mise à jour de l'email if email and email != original_email: # Vérification si le nouvel email n'est pas déjà utilisé par un autre utilisateur # Note: get_user_by_email retourne l'objet utilisateur, pas juste l'ID. existing_user_by_email = get_user_by_email(email) # S'il trouve un utilisateur ET que son ID ne correspond pas à l'utilisateur actuel, # l'email est déjà pris. if existing_user_by_email and existing_user_by_email.get('user_id') != user_id: return False, "Cet email est déjà utilisé par un autre compte." # Mise à jour de l'email si la validation passe user_data['email'] = email # 3. Mise à jour du nom d'utilisateur (pas de validation d'unicité assumée ici) user_data['username'] = username # 4. Mise à jour du mot de passe (si un nouveau est fourni) if new_password: if len(new_password) < 8: # Assurez-vous que cette limite est cohérente avec la fonction register_user return False, "Le nouveau mot de passe est trop court (min 8 caractères)." user_data['password_hash'] = generate_password_hash(new_password) # 5. Sauvegarde des données # Le row_id est stocké dans l'objet utilisateur après la première sauvegarde. baserow_row_id = user_data.get('baserow_row_id') if baserow_row_id is None: return False, "Erreur de configuration : ID de ligne Baserow manquant." if save_primary_user_data(user_data, commit_msg=f"feat: Mise à jour du profil utilisateur {user_id}"): return True, "Votre profil a été mis à jour avec succès." else: return False, "Une erreur s'est produite lors de la sauvegarde du profil."