Mailix / app.py
ernestmindres's picture
Update app.py
bf1ee12 verified
# app.py
import requests
import json
import os
import sys
import io
import uuid
from functools import wraps
from datetime import datetime
from flask import Flask, request, jsonify, Response, session
from flask_cors import CORS
import baserow_storage # Assurez-vous que ceci est présent
import shutil
from flask_socketio import SocketIO
# NOUVEL IMPORT POUR LE STOCKAGE PERSISTANT HUGGING FACE
import huggingface_storage # <-- NOUVEL IMPORT
# NOUVEL IMPORT POUR LA GESTION DU TERMINAL
from terminal_manager import setup_terminal_events
# Importation des modules backend
from auth_backend import (
get_user_by_id,
login_user,
register_user,
generate_api_key,
get_plan_limit,
reset_password_via_security_question,
generate_password_hash,
)
from decorators import api_key_required # <-- NOUVEL IMPORT
# Importation des Blueprints
from web_routes import web_bp
from user_routes import user_bp
from billing_routes import billing_bp
from embed_routes import embed_bp
# Ajout d'une constante pour la taille maximale du contenu (16 Mo par défaut)
# Ceci corrige un potentiel NameError sur DEFAULT_MAX_CONTENT_LENGTH
DEFAULT_MAX_CONTENT_LENGTH = 16 * 1024 * 1024
# Dossier pour le stockage temporaire des fichiers HTML uploadés
# Nous utilisons un dossier 'temp_uploads' à la racine de l'application
UPLOAD_FOLDER = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'temp_uploads')
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER) # Créer le dossier s'il n'existe pas
# Extensions de fichiers autorisées (très important pour la sécurité)
ALLOWED_EXTENSIONS = {'html', 'htm'}
# --- Initialisation de l'Application Flask (DÉPLACÉ ICI pour corriger le NameError) ---
app = Flask(__name__)
app.config['HF_DATASET_REPO_ID'] = os.environ.get('HF_DATASET_REPO_ID')
# Maintenant, 'app' est défini et on peut lui appliquer des configurations
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['ALLOWED_EXTENSIONS'] = ALLOWED_EXTENSIONS
from werkzeug.middleware.proxy_fix import ProxyFix
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1, x_prefix=1)
# ------------------------------------------------------------------
# car Hugging Face utilise souvent Gunicorn avec ces workers.
socketio = SocketIO(app, cors_allowed_origins="*")
# --- Configuration des événements SocketIO pour le Terminal ---
setup_terminal_events(socketio)
# Configuration
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "super_secret_dev_key")
app.config['MAX_CONTENT_LENGTH'] = DEFAULT_MAX_CONTENT_LENGTH
# Permettre les requêtes cross-origin (CORS)
CORS(app, supports_credentials=True, origins="*", allow_headers=["Content-Type", "X-User-API-Key"])
# Permettre les requêtes cross-origin pour l'API
CORS(app)
# --- Enregistrement des Blueprints (Nouveau) ---
app.register_blueprint(web_bp)
app.register_blueprint(user_bp)
app.register_blueprint(billing_bp)
app.register_blueprint(embed_bp) # <-- NOUVEL ENREGISTREMENT
# --- Décorateurs d'Authentification (Conservés) ---
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
# Redirection HTTP 302 vers la page de connexion pour les requêtes non-API
if not request.path.startswith('/api/'):
from flask import redirect, url_for
return redirect(url_for('user_bp.connexion'))
# Réponse JSON pour les API
return jsonify({"status": "Error", "message": "Accès non autorisé. Veuillez vous connecter.", "code": "AUTH_REQUIRED"}), 401
return f(*args, **kwargs)
return decorated_function
# --- Routes d'Authentification (API - Conservées) ---
@app.route("/api/register", methods=["POST"])
def register():
data = request.get_json()
username = data.get("username")
email = data.get("email")
password = data.get("password")
confirm_password = data.get("confirm_password")
security_question = data.get("security_question")
security_answer = data.get("security_answer")
# CORRECTION ICI: Déballage des 3 valeurs retournées par register_user
user_id, message, new_user_data = register_user(username, email, password, confirm_password, security_question, security_answer)
if user_id and new_user_data: # Vérifier l'ID et les données pour s'assurer du succès
session['user_id'] = user_id
# Réponse JSON pour l'API, incluant la clé API
return jsonify({
"message": message,
"status": "Success",
"user_id": user_id,
# On récupère la clé API directement des données utilisateur
"api_key": new_user_data.get("api_key")
}), 201
else:
# Échec de l'inscription (message d'erreur de register_user)
return jsonify({"message": message, "status": "Error"}), 400
@app.route("/api/login", methods=["POST"])
def login():
"""
Route de connexion de l'utilisateur principal.
Prend le nom d'utilisateur/email et le mot de passe.
"""
data = request.get_json()
username = data.get("username")
password = data.get("password")
# CORRECTION DE L'ERREUR :
# Nous déballons maintenant 3 valeurs (ID, Message, Données Utilisateur)
# car la fonction login_user() dans auth_backend.py a été modifiée pour
# retourner les 3 valeurs.
user_id, message, user_data = login_user(username, password)
# Note: user_data est la 3ème valeur (Dict des données utilisateur ou None)
if user_id and user_data:
# La connexion est réussie
session['user_id'] = user_id
# Réponse API avec la clé API de l'utilisateur pour les futures requêtes
return jsonify({
"message": message,
"status": "Success",
"user_id": user_id,
# On utilise les données utilisateur (user_data) que nous avons déjà récupérées
"api_key": user_data.get("api_key")
}), 200
else:
# La connexion a échoué (identifiants invalides ou autre erreur)
return jsonify({"message": message, "status": "Error"}), 401
@app.route("/api/logout", methods=["POST"])
def logout():
session.pop('user_id', None)
return jsonify({"message": "Déconnexion réussie.", "status": "Success"}), 200
@app.route("/api/forgot-password", methods=["POST"])
def forgot_password_api(): # Renommée pour éviter conflit avec la route HTML
data = request.get_json()
username_or_email = data.get("username_or_email")
security_answer = data.get("security_answer")
new_password = data.get("new_password")
if not username_or_email or not security_answer or not new_password:
return jsonify({"message": "Champs manquants.", "status": "Error"}), 400
success, message = reset_password_via_security_question(username_or_email, security_answer, new_password)
if success:
return jsonify({
"message": message,
"status": "Success"
}), 200
else:
return jsonify({
"message": message,
"status": "Error"
}), 400
# --- Routes de Gestion de Compte (API - Conservées) ---
@app.route("/api/user/generate-key", methods=["POST"])
@login_required
def generate_user_api_key():
user_id = session.get('user_id')
new_api_key = create_dynamic_api_key()
success, message = update_user_data(user_id, {"api_key": new_api_key})
if success:
return jsonify({
"message": "Clé API utilisateur générée et sauvegardée. Conservez-la en lieu sûr. **Utilisez-la via URL simple ('api_key=...') ou en-tête 'X-User-API-Key'.**",
"status": "Success",
"api_key": new_api_key
}), 200
else:
return jsonify({
"message": f"Erreur lors de la génération de la clé : {message}",
"status": "Error"
}), 500
@app.route("/api/user/update-info", methods=["POST"])
@login_required
def update_user_info():
user_id = session.get('user_id')
data = request.get_json()
updates = {}
if 'username' in data:
updates['username'] = data['username']
if 'email' in data:
updates['email'] = data['email']
if 'plan' in data:
updates['plan'] = data['plan']
if not updates:
return jsonify({
"message": "Aucune information à mettre à jour fournie.",
"status": "Error"
}), 400
success, message = update_user_data(user_id, updates)
if success:
return jsonify({
"message": message,
"status": "Success"
}), 200
else:
return jsonify({
"message": f"Échec de la mise à jour : {message}",
"status": "Error"
}), 400
@app.route("/api/user-info", methods=["GET"])
@api_key_required
def api_user_info(client_user):
"""
Route API pour récupérer les informations de l'utilisateur principal (client)
à partir de la clé API fournie.
Le 'client_user' est injecté par le décorateur api_key_required.
"""
# client_user est l'objet utilisateur complet injecté par le décorateur
# Sécurité : créer une copie et supprimer les données sensibles avant l'envoi
user_info_safe = client_user.copy()
user_info_safe.pop('password_hash', None)
user_info_safe.pop('security_answer_hash', None)
return jsonify({
"message": "Informations utilisateur récupérées avec succès.",
"status": "Success",
"user": user_info_safe
}), 200
@app.route("/api/health", methods=["GET"])
def health_check():
"""Vérifie l'état du service en utilisant le statut Baserow."""
# 1. Tenter d'obtenir le statut Baserow réel
try:
# Appelle la fonction de baserow_storage pour vérifier l'état
health_status = baserow_storage.get_health_status()
# Le statut 'data_storage' est la clé pour le frontend
db_status_message = health_status.get('data_storage', 'Unknown')
# Si la DB est 'operational', on envoie 'Ready'
if db_status_message == 'operational':
data_status = "Ready"
else:
data_status = f"Failed (Baserow: {db_status_message})"
except Exception as e:
# Erreur générale, Baserow inaccessible ou problème de configuration critique
data_status = f"Failed (Exception: {str(e)})"
return jsonify({
"status": "Online",
"data_storage": data_status
}), 200
@app.route("/", methods=["GET"])
def read_root():
"""Endpoint racine pour le Health Check (Flask version)."""
if baserow_storage.is_baserow_up():
# Statut OK (200) avec le message
return jsonify({"status": "ok", "message": "Backend and Baserow API are reachable."}), 200
else:
# Statut de service non disponible (503) avec le message d'erreur
return jsonify({"detail": "Baserow service unavailable (Health Check failed)."}), 503
# Fonction de nettoyage des fichiers temporaires (utilisée à l'arrêt/redémarrage, etc.)
@app.teardown_appcontext
def shutdown_session(exception=None):
"""Supprime les dossiers de déploiement temporaires après la fin de la session, ou lors du nettoyage."""
# NOTE: Cette approche est simplifiée. Pour la production, vous voudriez
# une tâche en arrière-plan (cron job) pour nettoyer périodiquement les
# vieux dossiers pour éviter les fuites de mémoire.
# Pour l'instant, on se contente de vider le dossier 'temp_uploads'
# Cela devrait être exécuté uniquement si on est sûr que l'application s'arrête.
# Dans un environnement de développement, on pourrait le faire au démarrage,
# mais ce n'est pas fiable en production (Gunicorn/multi-worker).
pass # On laisse la gestion du nettoyage à une tâche externe ou un redémarrage.