Spaces:
Build error
Build error
File size: 14,939 Bytes
bc3c8fb 91d162f bc3c8fb 91d162f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 |
# auth_backend.py
import json
import uuid
import secrets
import string
import sys # Nécessaire pour print(..., file=sys.stderr)
from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash
# --- NOUVEL IMPORT: Migration vers Baserow ---
# Ces fonctions sont maintenant implémentées dans baserow_storage.py
from baserow_storage import (
# Fonctions de lecture/écriture pour les utilisateurs principaux (simulant l'ancienne API)
get_client_user_by_api_key,
load_primary_user_data,
save_primary_user_data,
# Fonctions de recherche indexées (nouvelles et plus efficaces)
get_user_by_email,
get_client_user_by_api_key,
FIELD_USER_TYPE,
FIELD_CLIENT_ID,
)
from flask import session
from config import PLANS_CONFIG
from typing import Optional, Dict
# ----------------------------------------------------------------------
# --- Fonctions Utilitaires et de Configuration ---
# ----------------------------------------------------------------------
def get_plan_limit(plan: str) -> float:
"""Retourne la limite de compte pour un plan donné."""
return PLANS_CONFIG.get(plan, {}).get("limit", PLANS_CONFIG["free"]["limit"])
def get_plan_details(plan_id: str) -> Optional[Dict]:
"""Retourne les détails complets d'un plan à partir de son ID."""
return PLANS_CONFIG.get(plan_id)
def generate_api_key(length: int = 32) -> str:
"""Génère une clé API sécurisée."""
chars = string.ascii_letters + string.digits
return ''.join(secrets.choice(chars) for _ in range(length))
# ----------------------------------------------------------------------
# --- Fonctions d'Authentification WEB (Primary Users) ---
# ----------------------------------------------------------------------
def register_user(username: str, email: str, password: str, confirm_password: str, security_question: str, security_answer: str) -> tuple[Optional[str], str, Optional[Dict]]:
"""
Tente d'enregistrer un nouvel utilisateur principal.
Retourne l'ID utilisateur (str), un message (str), et les données utilisateur complètes (Dict) ou None.
"""
email = email.lower().strip()
# Validation du formulaire
if not all([username, email, password, confirm_password, security_question, security_answer]):
# Retourne (ID, message, Data)
return None, "Tous les champs sont requis.", None
if password != confirm_password:
return None, "Les mots de passe ne correspondent pas.", None
if len(password) < 8:
return None, "Le mot de passe est trop court (min 8 caractères).", None
# 1. Vérification de l'existence de l'utilisateur par email
if get_user_by_email(email):
return None, "Un compte avec cette adresse e-mail existe déjà.", None
# 2. Hachage des données
password_hash = generate_password_hash(password)
security_answer_hash = generate_password_hash(security_answer.lower())
# 3. Création des données utilisateur
user_id = str(uuid.uuid4())
# Ligne optimisée: on génère les 5 clés en une seule liste
api_keys = [generate_api_key() for _ in range(5)]
new_user = {
'user_id': user_id,
'username': username,
'email': email,
'password_hash': password_hash,
# ASSIGNATION DES 5 CLÉS API AUX CHAMPS CORRESPONDANTS
'api_key': api_keys[0],
'api_key_2': api_keys[1],
'api_key_3': api_keys[2],
'api_key_4': api_keys[3],
'api_key_5': api_keys[4],
'security_question': security_question,
'security_answer_hash': security_answer_hash,
'plan_id': PLANS_CONFIG['free']['baserow_value'],
'stripe_subscription_id': None, # Pas d'abonnement Stripe au début
'date_creation': datetime.now().isoformat(),
'date_plan_start': datetime.now().isoformat(),
'api_calls_month': 0,
'status': 'Active',
'baserow_row_id': None, # Sera rempli par la fonction save_primary_user_data si c'est une création
# Note: D'autres champs pourraient être nécessaires ici, selon les besoins non-vus
}
# 4. Sauvegarde dans Baserow
# La fonction save_primary_user_data mettra à jour 'baserow_row_id' dans new_user si la création est réussie.
success = save_primary_user_data(new_user, commit_msg=f"feat: Création de l'utilisateur {user_id} ({email})")
if success:
# Retourne : ID utilisateur, message de succès, Dictionnaire utilisateur complet
return user_id, "Inscription réussie. Vous pouvez maintenant vous connecter.", new_user
else:
# Échec de l'écriture dans la BDD (Baserow)
# Retourne : None, message d'erreur, None
return None, "Erreur interne lors de l'enregistrement de l'utilisateur.", None
def login_user(username_or_email: str, password: str) -> tuple[Optional[str], str, Optional[Dict]]: # <-- NOUVEAU: Ajout de Optional[Dict] dans le type hint
"""
Tente de connecter un utilisateur principal.
Retourne l'ID utilisateur (str), un message (str) et les données utilisateur (Dict).
"""
username_or_email = username_or_email.lower().strip()
# 1. Recherche de l'utilisateur par email (Utilisation de la nouvelle fonction rapide Baserow)
user = get_user_by_email(username_or_email)
if user:
# 2. Vérification du mot de passe
if check_password_hash(user['password_hash'], password):
session['user_id'] = user['user_id']
# CORRECTION : Retourne 3 valeurs : ID, Message, Données Utilisateur
return user['user_id'], "Connexion réussie.", user
# CORRECTION : Retourne 3 valeurs : None ID, Message, None Data
return None, "Email/Nom d'utilisateur ou mot de passe invalide.", None
def get_user_by_id(user_id: str) -> Optional[Dict]:
"""
Récupère un utilisateur principal par son ID.
Utilise la fonction load_primary_user_data (qui est get_user_by_id dans Baserow).
"""
return load_primary_user_data(user_id)
def get_user_by_api_key(api_key: str) -> Optional[Dict]:
"""
Récupère un utilisateur principal par sa Clé API (utilisé par le décorateur).
Utilise la nouvelle fonction indexée et rapide de Baserow.
"""
return get_client_user_by_api_key(api_key)
def reset_password_via_security_question(username_or_email: str, question: str, answer: str, new_password: str) -> tuple[bool, str]:
"""Réinitialise le mot de passe via la question de sécurité."""
username_or_email = username_or_email.lower().strip()
# Validation du mot de passe
if len(new_password) < 8:
return False, "Le nouveau mot de passe est trop court (min 8 caractères)."
# 1. Recherche de l'utilisateur
user = get_user_by_email(username_or_email)
if not user:
return False, "Utilisateur introuvable."
# 2. Vérification de la question/réponse
if user.get('security_question') != question:
return False, "Question de sécurité incorrecte."
if not check_password_hash(user.get('security_answer_hash', ''), answer.lower()):
return False, "Réponse de sécurité incorrecte."
# 3. Hachage du nouveau mot de passe
new_hashed_password = generate_password_hash(new_password)
# 4. Mise à jour et sauvegarde en BDD (Baserow)
user['password_hash'] = new_hashed_password
success = save_primary_user_data(user, commit_msg=f"feat: Réinitialisation MDP utilisateur {user['user_id']}")
if success:
return True, "Mot de passe réinitialisé avec succès."
else:
return False, "Erreur interne lors de la mise à jour du mot de passe."
def update_user_plan(user_id: str, new_plan_id: str, stripe_subscription_id: Optional[str]) -> bool:
"""Met à jour le plan et l'ID d'abonnement Stripe pour un utilisateur."""
user = get_user_by_id(user_id)
if not user:
print(f"Erreur: Utilisateur {user_id} non trouvé pour la mise à jour du plan.", file=sys.stderr)
return False
user['plan_id'] = new_plan_id
user['stripe_subscription_id'] = stripe_subscription_id
user['date_plan_start'] = datetime.now().isoformat()
success = save_primary_user_data(user, commit_msg=f"feat: Mise à jour du plan pour {user_id} vers {new_plan_id}")
return success
def update_user_profile(user_id: str, username: str, email: str, new_password: Optional[str] = None) -> tuple[bool, str]:
"""
Met à jour le profil de l'utilisateur principal (client).
user_id: L'UUID de l'utilisateur.
username: Le nouveau nom d'utilisateur.
email: Le nouvel email.
new_password: Le nouveau mot de passe (si fourni et non vide).
"""
# 1. Chargement des données existantes
user_data = load_primary_user_data(user_id)
if not user_data:
return False, "Erreur critique : Utilisateur introuvable pour la mise à jour."
original_email = user_data.get('email')
# 2. Validation et mise à jour de l'email
if email and email != original_email:
# Vérification si le nouvel email n'est pas déjà utilisé par un autre utilisateur
# Note: get_user_by_email retourne l'objet utilisateur, pas juste l'ID.
existing_user_by_email = get_user_by_email(email)
# S'il trouve un utilisateur ET que son ID ne correspond pas à l'utilisateur actuel,
# l'email est déjà pris.
if existing_user_by_email and existing_user_by_email.get('user_id') != user_id:
return False, "Cet email est déjà utilisé par un autre compte."
# Mise à jour de l'email si la validation passe
user_data['email'] = email
# 3. Mise à jour du nom d'utilisateur (pas de validation d'unicité assumée ici)
user_data['username'] = username
# 4. Mise à jour du mot de passe (si un nouveau est fourni)
if new_password:
if len(new_password) < 8:
# Assurez-vous que cette limite est cohérente avec la fonction register_user
return False, "Le nouveau mot de passe est trop court (min 8 caractères)."
user_data['password_hash'] = generate_password_hash(new_password)
# 5. Sauvegarde des données
# Le row_id est stocké dans l'objet utilisateur après la première sauvegarde.
baserow_row_id = user_data.get('baserow_row_id')
if baserow_row_id is None:
return False, "Erreur de configuration : ID de ligne Baserow manquant."
if save_primary_user_data(user_data, commit_msg=f"feat: Mise à jour du profil utilisateur {user_id}"):
return True, "Votre profil a été mis à jour avec succès."
else:
return False, "Une erreur s'est produite lors de la sauvegarde du profil."
def register_end_user(client_id: str, username: str, email: str, password: str, confirm_password: str) -> tuple[Optional[str], str, Optional[Dict]]:
"""
Enregistre un nouvel utilisateur final (End User) lié à un utilisateur client (Primary User).
"""
if password != confirm_password:
return None, "Les mots de passe ne correspondent pas.", None
if len(password) < 8:
return None, "Le mot de passe doit contenir au moins 8 caractères.", None
# Vérification de l'unicité de l'email UNIQUEMENT pour ce client et pour le type 'End'
existing_user_by_email = load_primary_user_data(
filters={
FIELD_EMAIL: email,
FIELD_USER_TYPE: 'End',
FIELD_CLIENT_ID: client_id
}
)
if existing_user_by_email:
return None, "Cet email est déjà utilisé pour votre service.", None
# 1. Création des données de l'utilisateur final
user_id = str(uuid.uuid4())
password_hash = generate_password_hash(password)
new_user_data = {
FIELD_ID: user_id,
FIELD_EMAIL: email,
FIELD_USERNAME: username,
FIELD_PASSWORD_HASH: password_hash,
FIELD_DATE_CREATION: datetime.now().isoformat(),
FIELD_USER_TYPE: 'End', # <-- IMPORTANT: Marquer comme utilisateur final
FIELD_CLIENT_ID: client_id, # <-- IMPORTANT: Lier au client principal
FIELD_PLAN: PLANS_CONFIG['free']['baserow_value'], # Plan par défaut
}
# 2. Sauvegarde dans Baserow (dans la même table)
success, row_id = save_primary_user_data(new_user_data, is_new=True)
if success:
new_user_data['baserow_row_id'] = row_id
return user_id, "Inscription réussie !", new_user_data
else:
return None, f"Erreur lors de l'enregistrement dans la base de données. ID: {user_id}", None
def login_end_user(client_id: str, email: str, password: str) -> tuple[Optional[str], str, Optional[Dict]]:
"""
Connecte un utilisateur final (End User) lié à un client donné.
"""
user_data_list = load_primary_user_data(
filters={
FIELD_EMAIL: email,
FIELD_USER_TYPE: 'End', # Filtrer uniquement les utilisateurs finaux
FIELD_CLIENT_ID: client_id # Filtrer par l'ID du client
}
)
if not user_data_list:
return None, "Email ou mot de passe invalide.", None
user_data = user_data_list[0]
if check_password_hash(user_data.get(FIELD_PASSWORD_HASH), password):
return user_data.get(FIELD_ID), "Connexion réussie.", user_data
else:
return None, "Email ou mot de passe invalide.", None
def reset_end_user_password(client_id: str, email: str, new_password: str, confirm_password: str) -> tuple[bool, str]:
"""
Réinitialise le mot de passe d'un utilisateur final (End User) lié à un client donné.
"""
if new_password != confirm_password:
return False, "Les mots de passe ne correspondent pas."
if len(new_password) < 8:
return False, "Le nouveau mot de passe est trop court (min 8 caractères)."
user_data_list = load_primary_user_data(
filters={
FIELD_EMAIL: email,
FIELD_USER_TYPE: 'End',
FIELD_CLIENT_ID: client_id
}
)
if not user_data_list:
return False, "Utilisateur non trouvé."
user_data = user_data_list[0]
# Mise à jour du mot de passe
user_data[FIELD_PASSWORD_HASH] = generate_password_hash(new_password)
baserow_row_id = user_data.get('baserow_row_id')
if baserow_row_id is None:
return False, "Erreur de configuration: ID de ligne Baserow manquant."
success, _ = save_primary_user_data(user_data, commit_to_baserow=True)
if success:
return True, "Votre mot de passe a été réinitialisé avec succès."
else:
return False, "Une erreur s'est produite lors de la réinitialisation du mot de passe." |