Mailix / auth_backend.py
ernestmindres's picture
Update auth_backend.py
91d162f verified
# auth_backend.py
import json
import uuid
import secrets
import string
import sys # Nécessaire pour print(..., file=sys.stderr)
from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash
# --- NOUVEL IMPORT: Migration vers Baserow ---
# Ces fonctions sont maintenant implémentées dans baserow_storage.py
from baserow_storage import (
# Fonctions de lecture/écriture pour les utilisateurs principaux (simulant l'ancienne API)
get_client_user_by_api_key,
load_primary_user_data,
save_primary_user_data,
# Fonctions de recherche indexées (nouvelles et plus efficaces)
get_user_by_email,
get_client_user_by_api_key,
FIELD_USER_TYPE,
FIELD_CLIENT_ID,
)
from flask import session
from config import PLANS_CONFIG
from typing import Optional, Dict
# ----------------------------------------------------------------------
# --- Fonctions Utilitaires et de Configuration ---
# ----------------------------------------------------------------------
def get_plan_limit(plan: str) -> float:
"""Retourne la limite de compte pour un plan donné."""
return PLANS_CONFIG.get(plan, {}).get("limit", PLANS_CONFIG["free"]["limit"])
def get_plan_details(plan_id: str) -> Optional[Dict]:
"""Retourne les détails complets d'un plan à partir de son ID."""
return PLANS_CONFIG.get(plan_id)
def generate_api_key(length: int = 32) -> str:
"""Génère une clé API sécurisée."""
chars = string.ascii_letters + string.digits
return ''.join(secrets.choice(chars) for _ in range(length))
# ----------------------------------------------------------------------
# --- Fonctions d'Authentification WEB (Primary Users) ---
# ----------------------------------------------------------------------
def register_user(username: str, email: str, password: str, confirm_password: str, security_question: str, security_answer: str) -> tuple[Optional[str], str, Optional[Dict]]:
"""
Tente d'enregistrer un nouvel utilisateur principal.
Retourne l'ID utilisateur (str), un message (str), et les données utilisateur complètes (Dict) ou None.
"""
email = email.lower().strip()
# Validation du formulaire
if not all([username, email, password, confirm_password, security_question, security_answer]):
# Retourne (ID, message, Data)
return None, "Tous les champs sont requis.", None
if password != confirm_password:
return None, "Les mots de passe ne correspondent pas.", None
if len(password) < 8:
return None, "Le mot de passe est trop court (min 8 caractères).", None
# 1. Vérification de l'existence de l'utilisateur par email
if get_user_by_email(email):
return None, "Un compte avec cette adresse e-mail existe déjà.", None
# 2. Hachage des données
password_hash = generate_password_hash(password)
security_answer_hash = generate_password_hash(security_answer.lower())
# 3. Création des données utilisateur
user_id = str(uuid.uuid4())
# Ligne optimisée: on génère les 5 clés en une seule liste
api_keys = [generate_api_key() for _ in range(5)]
new_user = {
'user_id': user_id,
'username': username,
'email': email,
'password_hash': password_hash,
# ASSIGNATION DES 5 CLÉS API AUX CHAMPS CORRESPONDANTS
'api_key': api_keys[0],
'api_key_2': api_keys[1],
'api_key_3': api_keys[2],
'api_key_4': api_keys[3],
'api_key_5': api_keys[4],
'security_question': security_question,
'security_answer_hash': security_answer_hash,
'plan_id': PLANS_CONFIG['free']['baserow_value'],
'stripe_subscription_id': None, # Pas d'abonnement Stripe au début
'date_creation': datetime.now().isoformat(),
'date_plan_start': datetime.now().isoformat(),
'api_calls_month': 0,
'status': 'Active',
'baserow_row_id': None, # Sera rempli par la fonction save_primary_user_data si c'est une création
# Note: D'autres champs pourraient être nécessaires ici, selon les besoins non-vus
}
# 4. Sauvegarde dans Baserow
# La fonction save_primary_user_data mettra à jour 'baserow_row_id' dans new_user si la création est réussie.
success = save_primary_user_data(new_user, commit_msg=f"feat: Création de l'utilisateur {user_id} ({email})")
if success:
# Retourne : ID utilisateur, message de succès, Dictionnaire utilisateur complet
return user_id, "Inscription réussie. Vous pouvez maintenant vous connecter.", new_user
else:
# Échec de l'écriture dans la BDD (Baserow)
# Retourne : None, message d'erreur, None
return None, "Erreur interne lors de l'enregistrement de l'utilisateur.", None
def login_user(username_or_email: str, password: str) -> tuple[Optional[str], str, Optional[Dict]]: # <-- NOUVEAU: Ajout de Optional[Dict] dans le type hint
"""
Tente de connecter un utilisateur principal.
Retourne l'ID utilisateur (str), un message (str) et les données utilisateur (Dict).
"""
username_or_email = username_or_email.lower().strip()
# 1. Recherche de l'utilisateur par email (Utilisation de la nouvelle fonction rapide Baserow)
user = get_user_by_email(username_or_email)
if user:
# 2. Vérification du mot de passe
if check_password_hash(user['password_hash'], password):
session['user_id'] = user['user_id']
# CORRECTION : Retourne 3 valeurs : ID, Message, Données Utilisateur
return user['user_id'], "Connexion réussie.", user
# CORRECTION : Retourne 3 valeurs : None ID, Message, None Data
return None, "Email/Nom d'utilisateur ou mot de passe invalide.", None
def get_user_by_id(user_id: str) -> Optional[Dict]:
"""
Récupère un utilisateur principal par son ID.
Utilise la fonction load_primary_user_data (qui est get_user_by_id dans Baserow).
"""
return load_primary_user_data(user_id)
def get_user_by_api_key(api_key: str) -> Optional[Dict]:
"""
Récupère un utilisateur principal par sa Clé API (utilisé par le décorateur).
Utilise la nouvelle fonction indexée et rapide de Baserow.
"""
return get_client_user_by_api_key(api_key)
def reset_password_via_security_question(username_or_email: str, question: str, answer: str, new_password: str) -> tuple[bool, str]:
"""Réinitialise le mot de passe via la question de sécurité."""
username_or_email = username_or_email.lower().strip()
# Validation du mot de passe
if len(new_password) < 8:
return False, "Le nouveau mot de passe est trop court (min 8 caractères)."
# 1. Recherche de l'utilisateur
user = get_user_by_email(username_or_email)
if not user:
return False, "Utilisateur introuvable."
# 2. Vérification de la question/réponse
if user.get('security_question') != question:
return False, "Question de sécurité incorrecte."
if not check_password_hash(user.get('security_answer_hash', ''), answer.lower()):
return False, "Réponse de sécurité incorrecte."
# 3. Hachage du nouveau mot de passe
new_hashed_password = generate_password_hash(new_password)
# 4. Mise à jour et sauvegarde en BDD (Baserow)
user['password_hash'] = new_hashed_password
success = save_primary_user_data(user, commit_msg=f"feat: Réinitialisation MDP utilisateur {user['user_id']}")
if success:
return True, "Mot de passe réinitialisé avec succès."
else:
return False, "Erreur interne lors de la mise à jour du mot de passe."
def update_user_plan(user_id: str, new_plan_id: str, stripe_subscription_id: Optional[str]) -> bool:
"""Met à jour le plan et l'ID d'abonnement Stripe pour un utilisateur."""
user = get_user_by_id(user_id)
if not user:
print(f"Erreur: Utilisateur {user_id} non trouvé pour la mise à jour du plan.", file=sys.stderr)
return False
user['plan_id'] = new_plan_id
user['stripe_subscription_id'] = stripe_subscription_id
user['date_plan_start'] = datetime.now().isoformat()
success = save_primary_user_data(user, commit_msg=f"feat: Mise à jour du plan pour {user_id} vers {new_plan_id}")
return success
def update_user_profile(user_id: str, username: str, email: str, new_password: Optional[str] = None) -> tuple[bool, str]:
"""
Met à jour le profil de l'utilisateur principal (client).
user_id: L'UUID de l'utilisateur.
username: Le nouveau nom d'utilisateur.
email: Le nouvel email.
new_password: Le nouveau mot de passe (si fourni et non vide).
"""
# 1. Chargement des données existantes
user_data = load_primary_user_data(user_id)
if not user_data:
return False, "Erreur critique : Utilisateur introuvable pour la mise à jour."
original_email = user_data.get('email')
# 2. Validation et mise à jour de l'email
if email and email != original_email:
# Vérification si le nouvel email n'est pas déjà utilisé par un autre utilisateur
# Note: get_user_by_email retourne l'objet utilisateur, pas juste l'ID.
existing_user_by_email = get_user_by_email(email)
# S'il trouve un utilisateur ET que son ID ne correspond pas à l'utilisateur actuel,
# l'email est déjà pris.
if existing_user_by_email and existing_user_by_email.get('user_id') != user_id:
return False, "Cet email est déjà utilisé par un autre compte."
# Mise à jour de l'email si la validation passe
user_data['email'] = email
# 3. Mise à jour du nom d'utilisateur (pas de validation d'unicité assumée ici)
user_data['username'] = username
# 4. Mise à jour du mot de passe (si un nouveau est fourni)
if new_password:
if len(new_password) < 8:
# Assurez-vous que cette limite est cohérente avec la fonction register_user
return False, "Le nouveau mot de passe est trop court (min 8 caractères)."
user_data['password_hash'] = generate_password_hash(new_password)
# 5. Sauvegarde des données
# Le row_id est stocké dans l'objet utilisateur après la première sauvegarde.
baserow_row_id = user_data.get('baserow_row_id')
if baserow_row_id is None:
return False, "Erreur de configuration : ID de ligne Baserow manquant."
if save_primary_user_data(user_data, commit_msg=f"feat: Mise à jour du profil utilisateur {user_id}"):
return True, "Votre profil a été mis à jour avec succès."
else:
return False, "Une erreur s'est produite lors de la sauvegarde du profil."
def register_end_user(client_id: str, username: str, email: str, password: str, confirm_password: str) -> tuple[Optional[str], str, Optional[Dict]]:
"""
Enregistre un nouvel utilisateur final (End User) lié à un utilisateur client (Primary User).
"""
if password != confirm_password:
return None, "Les mots de passe ne correspondent pas.", None
if len(password) < 8:
return None, "Le mot de passe doit contenir au moins 8 caractères.", None
# Vérification de l'unicité de l'email UNIQUEMENT pour ce client et pour le type 'End'
existing_user_by_email = load_primary_user_data(
filters={
FIELD_EMAIL: email,
FIELD_USER_TYPE: 'End',
FIELD_CLIENT_ID: client_id
}
)
if existing_user_by_email:
return None, "Cet email est déjà utilisé pour votre service.", None
# 1. Création des données de l'utilisateur final
user_id = str(uuid.uuid4())
password_hash = generate_password_hash(password)
new_user_data = {
FIELD_ID: user_id,
FIELD_EMAIL: email,
FIELD_USERNAME: username,
FIELD_PASSWORD_HASH: password_hash,
FIELD_DATE_CREATION: datetime.now().isoformat(),
FIELD_USER_TYPE: 'End', # <-- IMPORTANT: Marquer comme utilisateur final
FIELD_CLIENT_ID: client_id, # <-- IMPORTANT: Lier au client principal
FIELD_PLAN: PLANS_CONFIG['free']['baserow_value'], # Plan par défaut
}
# 2. Sauvegarde dans Baserow (dans la même table)
success, row_id = save_primary_user_data(new_user_data, is_new=True)
if success:
new_user_data['baserow_row_id'] = row_id
return user_id, "Inscription réussie !", new_user_data
else:
return None, f"Erreur lors de l'enregistrement dans la base de données. ID: {user_id}", None
def login_end_user(client_id: str, email: str, password: str) -> tuple[Optional[str], str, Optional[Dict]]:
"""
Connecte un utilisateur final (End User) lié à un client donné.
"""
user_data_list = load_primary_user_data(
filters={
FIELD_EMAIL: email,
FIELD_USER_TYPE: 'End', # Filtrer uniquement les utilisateurs finaux
FIELD_CLIENT_ID: client_id # Filtrer par l'ID du client
}
)
if not user_data_list:
return None, "Email ou mot de passe invalide.", None
user_data = user_data_list[0]
if check_password_hash(user_data.get(FIELD_PASSWORD_HASH), password):
return user_data.get(FIELD_ID), "Connexion réussie.", user_data
else:
return None, "Email ou mot de passe invalide.", None
def reset_end_user_password(client_id: str, email: str, new_password: str, confirm_password: str) -> tuple[bool, str]:
"""
Réinitialise le mot de passe d'un utilisateur final (End User) lié à un client donné.
"""
if new_password != confirm_password:
return False, "Les mots de passe ne correspondent pas."
if len(new_password) < 8:
return False, "Le nouveau mot de passe est trop court (min 8 caractères)."
user_data_list = load_primary_user_data(
filters={
FIELD_EMAIL: email,
FIELD_USER_TYPE: 'End',
FIELD_CLIENT_ID: client_id
}
)
if not user_data_list:
return False, "Utilisateur non trouvé."
user_data = user_data_list[0]
# Mise à jour du mot de passe
user_data[FIELD_PASSWORD_HASH] = generate_password_hash(new_password)
baserow_row_id = user_data.get('baserow_row_id')
if baserow_row_id is None:
return False, "Erreur de configuration: ID de ligne Baserow manquant."
success, _ = save_primary_user_data(user_data, commit_to_baserow=True)
if success:
return True, "Votre mot de passe a été réinitialisé avec succès."
else:
return False, "Une erreur s'est produite lors de la réinitialisation du mot de passe."