Mailix / baserow_storage.py
ernestmindres's picture
Update baserow_storage.py
9820e37 verified
# baserow_storage.py (Version Modifiée)
import os
import requests
import json
import sys
from datetime import datetime
from typing import Optional, Dict
import logging
# Configuration du logger (ajoutez ceci en haut du fichier)
logger = logging.getLogger(__name__)
# --- Configuration Baserow (Doit être défini dans les secrets) ---
HEALTH_CHECK_URL = "https://api.baserow.io/api/database/rows/table/"
# 2. URL de BASE CORRECTE pour la construction des requêtes de données (connexion, inscription, etc.)
DATA_BASE_URL = "https://api.baserow.io/api/database/rows/"
API_TOKEN = os.environ.get("BASEROW_API_TOKEN")
# Les IDs de table seront récupérés depuis les variables d'environnement
PRIMARY_USERS_TABLE_ID = os.environ.get("PRIMARY_USERS_TABLE_ID")
END_USERS_TABLE_ID = os.environ.get("END_USERS_TABLE_ID")
# NOUVEL ID DE TABLE POUR LES DÉPLOIEMENTS (Projets)
DEPLOYMENTS_TABLE_ID = os.environ.get("DEPLOYMENTS_TABLE_ID")
# Headers pour l'authentification
HEADERS = {
"Authorization": f"Token {API_TOKEN}",
"Content-Type": "application/json"
}
# ----------------------------------------------------------------------
# --- Noms de Colonnes pour les Déploiements (à adapter à votre structure Baserow) ---
# ----------------------------------------------------------------------
FIELD_DEPLOY_ID = 'Deployment ID' # Nom de la colonne pour l'ID de déploiement (le dossier HF)
FIELD_USER_REF = 'User Link' # Nom de la colonne pour la référence à l'utilisateur principal
FIELD_HF_REPO_PATH = 'HF Repo Path' # Chemin complet vers le dossier dans le repo
FIELD_REPOS_DATA = "Repositories Data" # Nom du champ dans Baserow pour stocker la liste des dépôts
FIELD_ID = 'ID' # Correspond à 'user_id' dans le code
FIELD_EMAIL = 'Email' # Correspond à 'email'
FIELD_USERNAME = 'Nom d\'utilisateur' # Correspond à 'username'
FIELD_PASSWORD_HASH = 'Hachage Mot de Passe' # Correspond à 'password_hash'
FIELD_API_KEY = 'Clé API' # Correspond à 'api_key'
FIELD_API_KEY_2 = 'Clé API 2'
FIELD_API_KEY_3 = 'Clé API 3'
FIELD_API_KEY_4 = 'Clé API 4'
FIELD_API_KEY_5 = 'Clé API 5'
FIELD_SECURITY_Q = 'Question de Sécurité'
FIELD_SECURITY_A_HASH = 'Hachage Réponse Secrète'
FIELD_PLAN_ID = 'Plan ID'
FIELD_STRIPE_SUB_ID = 'ID Abonnement Stripe'
FIELD_DATE_CREATION = 'Date Création'
FIELD_DATE_PLAN_START = 'Date Plan Start'
FIELD_API_CALLS_MONTH = 'API Calls Month' # À vérifier avec votre nom exact dans Baserow!
FIELD_STATUS = 'Status'
FIELD_END_USER_ID = 'ID Utilisateur Final' # Correspond à 'end_user_id'
FIELD_END_USER_IDENTIFIER = 'Identifiant' # Correspond à 'identifier'
FIELD_END_USER_METADATA = 'Métadonnées' # Correspond à 'metadata'
FIELD_CLIENT_ID_LINK = 'ID Client Principal' # Lien vers Primary_Users
FIELD_USER_TYPE = 'UserType' # Ex: 'Primary' pour les clients, 'End' pour leurs utilisateurs
FIELD_CLIENT_ID = 'ClientID' # L'ID de l'utilisateur 'Primary' qui possède cet utilisateur 'End'
# ----------------------------------------------------------------------
# --- Noms de Colonnes pour la Table des Utilisateurs Finaux (End Users) ---
# ----------------------------------------------------------------------
# Ces champs sont spécifiques à la table END_USERS
FIELD_END_USER_ID = 'ID Utilisateur Final'
FIELD_END_USER_IDENTIFIER = 'Identifiant' # Champ pour compatibilité ou recherche
FIELD_END_USER_EMAIL = 'Email' # NOUVEAU
FIELD_END_USER_USERNAME = 'Nom d\'utilisateur' # NOUVEAU
FIELD_END_USER_SECURITY_Q = 'Question de Sécurité' # NOUVEAU (Peut être différent de Primary)
FIELD_END_USER_SECURITY_A_HASH = 'Hachage Réponse Secrète' # NOUVEAU
FIELD_END_USER_STATUS = 'Statut' # NOUVEAU
FIELD_END_USER_METADATA = 'Métadonnées'
FIELD_PASSWORD_HASH_END_USER = 'Hachage Mot de Passe End User' # Renommer pour éviter le conflit si possible
FIELD_CLIENT_ID_LINK = 'ID Client Principal'
def _get_table_url(table_id: str) -> str:
"""Construit l'URL d'API pour une table donnée (avec le bon endpoint)."""
return f"{DATA_BASE_URL}table/{table_id}/"
def _baserow_record_to_user(record: Dict, is_end_user: bool) -> Dict:
"""
Convertit un enregistrement Baserow (avec noms de champs utilisateur)
en format de dictionnaire Python attendu par le backend.
"""
# LOGIQUE POUR L'UTILISATEUR PRINCIPAL (PRIMARY USER)
# 1. Récupération des champs individuels
user_data = {
# Champs communs / Primary Users
'baserow_row_id': record['id'], # ID interne de la ligne Baserow (pour les mises à jour)
'date_creation': record.get(FIELD_DATE_CREATION),
# Primary User specific fields
'user_id': record.get(FIELD_ID),
'email': record.get(FIELD_EMAIL),
'username': record.get(FIELD_USERNAME),
'password_hash': record.get(FIELD_PASSWORD_HASH),
# Récupération des 5 clés individuelles (pour l'authentification par clé)
'api_key': record.get(FIELD_API_KEY),
'api_key_2': record.get(FIELD_API_KEY_2),
'api_key_3': record.get(FIELD_API_KEY_3),
'api_key_4': record.get(FIELD_API_KEY_4),
'api_key_5': record.get(FIELD_API_KEY_5),
'security_question': record.get(FIELD_SECURITY_Q),
'security_answer_hash': record.get(FIELD_SECURITY_A_HASH),
'plan_id': record.get(FIELD_PLAN_ID),
'stripe_subscription_id': record.get(FIELD_STRIPE_SUB_ID),
'date_plan_start': record.get(FIELD_DATE_PLAN_START),
}
# 2. ÉTAPE CRUCIALE AJOUTÉE : Création de la liste 'api_keys' pour l'affichage
# Cette liste est nécessaire pour que la boucle dans api_key.html fonctionne correctement.
user_data['api_keys'] = [
user_data['api_key'],
user_data['api_key_2'],
user_data['api_key_3'],
user_data['api_key_4'],
user_data['api_key_5'],
]
# Nettoyage des clés None ou non-pertinentes
return {k: v for k, v in user_data.items() if v is not None}
def _user_to_baserow_data(user_data: Dict, is_end_user: bool) -> Dict:
"""
Convertit le format de dictionnaire Python du backend en format
JSON attendu par l'API Baserow (avec noms de champs utilisateur).
"""
if is_end_user:
# End User fields (Ajout des NOUVEAUX champs)
baserow_data = {
FIELD_END_USER_ID: user_data.get('end_user_id'),
FIELD_END_USER_IDENTIFIER: user_data.get('identifier'),
FIELD_END_USER_EMAIL: user_data.get('email'), # NOUVEAU
FIELD_END_USER_USERNAME: user_data.get('username'), # NOUVEAU
FIELD_END_USER_SECURITY_Q: user_data.get('security_question'), # NOUVEAU
FIELD_END_USER_SECURITY_A_HASH: user_data.get('security_answer_hash'), # NOUVEAU
FIELD_END_USER_STATUS: user_data.get('status'), # NOUVEAU
# CORRECTION CRUCIALE : Utilisation du nom de champ correct pour l'End User
FIELD_PASSWORD_HASH_END_USER: user_data.get('password_hash'),
FIELD_END_USER_METADATA: user_data.get('metadata'),
FIELD_DATE_CREATION: user_data.get('date_creation'),
# Le lien vers Primary_Users est géré dans save_end_user_data
}
else:
# Primary User fields
baserow_data = {
FIELD_ID: user_data.get('user_id'),
FIELD_EMAIL: user_data.get('email'),
FIELD_USERNAME: user_data.get('username'),
FIELD_PASSWORD_HASH: user_data.get('password_hash'),
FIELD_API_KEY: user_data.get('api_key'),
FIELD_API_KEY_2: user_data.get('api_key_2'),
FIELD_API_KEY_3: user_data.get('api_key_3'),
FIELD_API_KEY_4: user_data.get('api_key_4'),
FIELD_API_KEY_5: user_data.get('api_key_5'),
FIELD_SECURITY_Q: user_data.get('security_question'),
FIELD_SECURITY_A_HASH: user_data.get('security_answer_hash'),
FIELD_PLAN_ID: user_data.get('plan_id'),
FIELD_STRIPE_SUB_ID: user_data.get('stripe_subscription_id'),
FIELD_DATE_CREATION: user_data.get('date_creation'),
FIELD_DATE_PLAN_START: user_data.get('date_plan_start'),
FIELD_API_CALLS_MONTH: user_data.get('api_calls_month', 0),
FIELD_STATUS: user_data.get('status', 'Active') # Assurez-vous que 'Active' est une option valide dans Baserow
}
# Suppression des clés non-valorisées (None)
return {k: v for k, v in baserow_data.items() if v is not None}
def _get_single_user_record(table_id: str, field_name: str, value: str, is_end_user: bool) -> Optional[Dict]:
"""Fonction générique pour rechercher un seul enregistrement par un champ (filtrage Baserow)."""
url = _get_table_url(table_id)
# Utilisation du paramètre de filtre de Baserow pour une recherche indexée (plus rapide)
filter_param = f"filter__{field_name}__equal={value}"
try:
response = requests.get(
f"{url}?user_field_names=true&{filter_param}",
headers=HEADERS
)
response.raise_for_status()
data = response.json()
if data and data.get('results'):
# On ne prend que le premier résultat (car ID/Email/API Key sont uniques)
return _baserow_record_to_user(data['results'][0], is_end_user)
return None
except requests.exceptions.RequestException as e:
print(f"Erreur de Baserow lors de la recherche par filtre {field_name}: {e}", file=sys.stderr)
return None
# ----------------------------------------------------------------------
# --- Fonctions CRUD Primary_Users (Nouveau et Remplacement) ---
# ----------------------------------------------------------------------
def get_user_by_email(email: str) -> Optional[Dict]:
"""Recherche un utilisateur principal par son adresse Email."""
return _get_single_user_record(PRIMARY_USERS_TABLE_ID, FIELD_EMAIL, email, is_end_user=False)
API_KEY_FIELDS = [
FIELD_API_KEY, # Clé API
FIELD_API_KEY_2,
FIELD_API_KEY_3,
FIELD_API_KEY_4,
FIELD_API_KEY_5,
]
# baserow_storage.py - à partir de la ligne ~180 (ou autour de la fonction get_client_user_by_api_key)
def get_client_user_by_api_key(api_key: str) -> Optional[Dict]:
"""
Recherche un utilisateur principal dans Baserow en vérifiant la clé API
dans chacun des cinq champs de clé API de la table.
"""
if not api_key:
logger.debug("get_client_user_by_api_key: Clé API est None ou vide.")
return None
# Construction de l'URL de base pour la table
url = f"{DATA_BASE_URL}table/{PRIMARY_USERS_TABLE_ID}/"
# Recherche large dans la table (jusqu'à 100 lignes)
# NOTE: L'API Baserow utilise `search` pour une recherche FULL TEXT dans tous les champs.
# C'est la source potentielle de l'inexactitude qui nécessite un filtrage manuel.
params = {'search': api_key, 'size': 100}
try:
# CORRECTION 1: Utilisation de la fonction _make_baserow_request
# (Cette fonction est maintenant définie à la fin du fichier)
# On utilise log_response=True ici pour avoir un log détaillé si la requête échoue
response = _make_baserow_request("GET", url, params=params, log_response=True)
if response.status_code == 200:
data = response.json()
# NOUVEAU LOG : Afficher combien de résultats ont été renvoyés par la recherche Baserow
num_results = len(data.get('results', []))
logger.info(f"BASEROW SEARCH: Clé API: {api_key[:8]}... | URL: {url} | Résultats Baserow: {num_results}")
if data and 'results' in data and num_results > 0:
# Filtrage manuel des résultats pour vérifier la clé exacte
for row in data['results']:
# Vérifier si la clé API correspond à l'un des 5 champs
if any(row.get(field) == api_key for field in API_KEY_FIELDS):
# NOUVEAU LOG : SUCCÈS - L'utilisateur a été trouvé
logger.info(f"BASEROW SUCCESS: Utilisateur trouvé avec la clé API {api_key[:8]}... dans un des 5 champs.")
try:
# On suppose que _baserow_record_to_user existe et prend un booléen
return _baserow_record_to_user(row, is_end_user=False)
except NameError:
logger.error("_baserow_record_to_user non définie. Erreur critique.")
return None
# NOUVEAU LOG : ÉCHEC DU FILTRAGE
logger.warning(f"BASEROW FILTER FAIL: {num_results} ligne(s) trouvée(s) par 'search' Baserow, mais aucune ne correspondait exactement à la clé API: {api_key[:8]}...")
return None
# Aucune ligne trouvée dans Baserow (num_results est 0)
logger.debug(f"BASEROW NO RESULT: Aucune ligne trouvée par la recherche Baserow pour la clé API: {api_key[:8]}...")
return None
# Traiter les erreurs HTTP de Baserow (log déjà dans _make_baserow_request)
return None
except Exception as e:
# Erreur inattendue pendant le traitement
logger.error(f"Erreur CRITIQUE lors du traitement de la recherche par clé API: {e}", exc_info=True)
return None
# Remplacement de l'ancien load_primary_user_data(user_id)
def load_primary_user_data(user_id: str) -> Optional[Dict]:
"""Recherche un utilisateur principal par son ID (user_id)."""
return _get_single_user_record(PRIMARY_USERS_TABLE_ID, FIELD_ID, user_id, is_end_user=False)
def save_primary_user_data(user_data: Dict, commit_msg: str = "") -> bool:
"""Crée ou met à jour un utilisateur principal, avec détection d'erreur ultra-précise."""
row_id = user_data.get('baserow_row_id')
# Définition de l'URL de base pour la table des utilisateurs principaux
url = _get_table_url(PRIMARY_USERS_TABLE_ID)
# 1. Conversion des données
baserow_data = _user_to_baserow_data(user_data, is_end_user=False)
# 2. Suppression des champs en lecture seule (comme dans la correction précédente)
if baserow_data.pop(FIELD_ID, None):
print(f"DEBUG: Suppression du champ '{FIELD_ID}' (UUID auto) avant l'envoi { 'POST' if not row_id else 'PATCH'}.", file=sys.stderr)
try:
# Détermination de l'action (PATCH ou POST)
if row_id:
action = "PATCH" # ⬅️ CORRECTION: Définition de 'action'
# MISE À JOUR (PATCH)
response = requests.patch(
f"{url}{row_id}/?user_field_names=true",
headers=HEADERS,
json=baserow_data
)
else:
action = "POST" # ⬅️ CORRECTION: Définition de 'action'
# CRÉATION (POST)
response = requests.post(
f"{url}?user_field_names=true", # ⬅️ CORRECTION: Utilise l'URL de table 'url'
headers=HEADERS,
json=baserow_data
)
# Déclenche une exception requests.exceptions.HTTPError pour les statuts 4xx/5xx
response.raise_for_status()
# Succès
if not row_id:
new_record = response.json()
# 1. Mettre à jour l'ID de ligne Baserow
user_data['baserow_row_id'] = new_record.get('id')
# 2. Mettre à jour l'UUID de l'utilisateur (généré par Baserow)
user_data['user_id'] = new_record.get(FIELD_ID)
print(f"DEBUG: UUID de l'utilisateur généré par Baserow et enregistré: {user_data['user_id']}", file=sys.stderr)
print(f"DEBUG: Baserow Primary User action '{action}' réussie. Row ID: {user_data.get('baserow_row_id')}. Message: {commit_msg}", file=sys.stderr)
return True
except requests.exceptions.RequestException as e:
# --- BLOC DE DÉTECTION D'ERREUR PRÉCISE (Ultra-Complet) ---
# Note: 'action' est définie dans le bloc try/except, mais si l'erreur survient
# AVANT la définition de 'action', nous devons la gérer.
# Pour être sûr, nous allons la définir ici par défaut si elle n'existe pas.
if 'action' not in locals():
action = "INCONNU"
error_message = f"🚨 ÉCHEC: Erreur lors de la sauvegarde/mise à jour du Primary User dans Baserow. Requête: {action}"
error_details = ""
if hasattr(e, 'response') and e.response is not None:
# 1. Statut HTTP et URL
error_details += f"\n -> STATUT HTTP: {e.response.status_code} ({e.response.reason})"
error_details += f"\n -> URL de la requête: {e.response.url}"
# 2. Tenter de décoder le corps de la réponse en JSON (contient les erreurs Baserow)
try:
response_json = e.response.json()
error_details += f"\n\n -> ERREUR BASEROW DÉTAILLÉE (JSON):\n{json.dumps(response_json, indent=4)}"
# Optionnel: Synthèse des erreurs de validation de champ
if isinstance(response_json, dict):
validation_errors = {k: v for k, v in response_json.items() if isinstance(v, list) and k != 'detail'}
if validation_errors:
error_details += "\n -> SYNTHÈSE DES CHAMPS INVALIDES (Vérifiez les noms de colonnes/IDs de table!):"
for field_name, errors in validation_errors.items():
error_details += f"\n - Champ '{field_name}': {', '.join([err.get('error', 'Erreur inconnue') for err in errors])}"
except json.JSONDecodeError:
# 3. Si le corps de la réponse n'est pas du JSON
error_details += f"\n\n -> ERREUR BRUTE (Réponse non-JSON):\n{e.response.text[:500]}..."
# 4. Afficher les données que nous avons tenté d'envoyer (après la suppression de l'ID si c'était une création)
error_details += f"\n\n -> DONNÉES ENVOYÉES À BASEROW:\n{json.dumps(baserow_data, indent=4)}"
# Log complet de l'erreur
print(error_message + error_details, file=sys.stderr)
return False
# ----------------------------------------------------------------------
# --- Fonctions CRUD End_Users (Remplacement) ---
# ----------------------------------------------------------------------
# baserow_storage.py : Dans la section CRUD End_Users
def _get_client_baserow_row_id(client_user_id: str) -> Optional[int]:
"""Récupère l'ID de ligne interne Baserow du client principal pour le lien."""
client_user = load_primary_user_data(client_user_id) # utilise la fonction déjà créée
return client_user.get('baserow_row_id') if client_user else None
def check_baserow_connection() -> str:
"""
Vérifie l'état de connexion de la base de données Baserow.
Retourne 'operational' ou 'outage'.
"""
# Liste des IDs de tables critiques à vérifier
CRITICAL_TABLE_IDS = [
PRIMARY_USERS_TABLE_ID,
END_USERS_TABLE_ID
]
if not API_TOKEN:
# Si le token API n'est pas défini, échec immédiat
print("DEBUG: BASEROW_API_TOKEN manquant.", file=sys.stderr)
return "outage"
for table_id in CRITICAL_TABLE_IDS:
if not table_id:
# Si un des IDs de table critiques n'est pas défini, échec
print(f"DEBUG: Un ID de table critique Baserow est manquant (ID: {table_id}).", file=sys.stderr)
return "outage"
# Tenter de faire un appel très léger (récupérer la première ligne)
# On utilise page_size=1 pour minimiser la charge
url = f"{DATA_BASE_URL}table/{table_id}/?page_size=1"
try:
response = requests.get(url, headers=HEADERS, timeout=5)
if response.status_code != 200:
# Si un 404, 403, ou autre erreur est retournée par Baserow pour CETTE table
print(f"DEBUG: Baserow check failed for table {table_id} with status code {response.status_code}", file=sys.stderr)
return "outage"
except requests.exceptions.RequestException as e:
# Erreur de réseau (timeout, DNS, etc.)
print(f"DEBUG: Baserow connection error for table {table_id}: {e}", file=sys.stderr)
return "outage"
# Si toutes les tables critiques ont été vérifiées avec succès
return "operational"
def get_health_status() -> Dict:
"""
Collecte l'état de santé de tous les services pour la page /statut.
"""
db_status = check_baserow_connection()
# L'état de l'authentification et de l'API principale sont
# généralement liés à l'état de la DB pour une application simple.
# Si la DB est HS, l'auth est HS. Sinon, ils sont OK.
auth_status = db_status # Lié à la DB (pour charger les utilisateurs)
api_endpoint_status = "operational" # L'endpoint Flask lui-même est considéré comme OK s'il tourne
# Version du service (pour information)
service_version = os.environ.get("SERVICE_VERSION", "1.0.0 (Baserow)")
return {
# Ces valeurs correspondent aux attributs 'data-status' dans statut.html
"auth": auth_status,
"data_storage": db_status,
"api_endpoint": api_endpoint_status,
"version": service_version,
"last_update": datetime.now().strftime("%Y-%m-%d %H:%M:%S UTC")
}
def is_baserow_up() -> bool:
"""
Vérifie l'état de Baserow en utilisant l'URL qui garantit un statut 100% fonctionnel
sur Hugging Face, SANS utiliser la fonction de construction d'URL de table.
"""
try:
# Envoie une requête GET à l'URL qui répond positivement pour le health check.
response = requests.get(
HEALTH_CHECK_URL,
headers=HEADERS,
timeout=5
)
# On vérifie si la réponse est un succès (code 200).
return response.status_code == 200
except requests.exceptions.RequestException as e:
print(f"DEBUG: Baserow health check failed: {e}")
return False
def _make_baserow_request(method: str, url: str, params: Optional[Dict] = None, data: Optional[Dict] = None, log_response: bool = True):
"""
Fonction utilitaire pour effectuer des appels API à Baserow et journaliser
les requêtes et les réponses dans les logs du Space Hugging Face.
"""
# 1. Journalisation de la requête
logger.info(f"BASEROW REQUEST: {method} {url}")
# ATTENTION: Ne pas logger le token API complet!
logged_headers = {k: v.replace(API_TOKEN, '[TOKEN_MASKED]') if k == 'Authorization' else v for k, v in HEADERS.items()}
logger.debug(f"BASEROW REQUEST Headers: {logged_headers}")
if data:
# Pour les requêtes POST/PUT, logger les données (sans le hash du mot de passe si possible)
logged_data = data.copy() if isinstance(data, dict) else data
# 'Hachage Mot de Passe' est le nom du champ dans _user_to_baserow_data
if isinstance(logged_data, dict) and FIELD_PASSWORD_HASH in logged_data:
logged_data[FIELD_PASSWORD_HASH] = '[PASSWORD_HASH_MASKED]'
# Le nom du champ n'était pas le même dans l'ancienne version, on utilise la constante
if isinstance(logged_data, dict) and 'Hachage du mot de passe' in logged_data:
logged_data['Hachage du mot de passe'] = '[PASSWORD_HASH_MASKED]'
logger.debug(f"BASEROW REQUEST Body: {logged_data}")
# 2. Exécution de la requête
try:
# NOTE IMPORTANTE: Utilisation des HEADERS globaux et des 'params' pour GET
if method == "GET":
response = requests.get(url, headers=HEADERS, params=params)
elif method == "POST":
response = requests.post(url, headers=HEADERS, json=data)
elif method == "PUT":
response = requests.put(url, headers=HEADERS, json=data)
elif method == "DELETE":
response = requests.delete(url, headers=HEADERS)
elif method == "PATCH": # Ajout de la méthode PATCH pour la complétude
response = requests.patch(url, headers=HEADERS, json=data)
else:
raise ValueError(f"Méthode HTTP non supportée: {method}")
# 3. Journalisation de la réponse
if log_response:
logger.info(f"BASEROW RESPONSE: Status {response.status_code}")
# Journaliser le contenu pour les erreurs
if response.status_code >= 400:
logger.error(f"BASEROW ERROR RESPONSE Body: {response.text}")
# 4. Retourner la réponse
return response
except requests.exceptions.RequestException as e:
logger.error(f"BASEROW CONNECTION ERROR: {e}")
# Retourne un objet Response factice pour l'utiliser dans la fonction appelante
return requests.Response()
# --- NOUVELLES FONCTIONS POUR LES DÉPLOIEMENTS (Projets) ---
def update_user_deployment_data(
user_baserow_row_id: int,
deploy_id: str,
user_link: str, # Utilisera l'ID interne de l'utilisateur
hf_repo_path: str
) -> tuple[bool, str]:
"""
Met à jour la ligne de l'utilisateur principal (PRIMARY_USERS_TABLE_ID)
avec les données de déploiement du projet (ID, Repo, etc.).
"""
if not PRIMARY_USERS_TABLE_ID:
return False, "Erreur de configuration: ID de la table utilisateur principal manquant."
url = f"{DATA_BASE_URL}table/{PRIMARY_USERS_TABLE_ID}/{user_baserow_row_id}/"
# Les données à mettre à jour
data = {
FIELD_DEPLOY_ID: deploy_id,
FIELD_USER_REF: user_link,
FIELD_HF_REPO_PATH: hf_repo_path
}
# Utiliser PATCH pour mettre à jour seulement les champs spécifiés
response = _baserow_request(url, method="PATCH", data=data)
if response.status_code == 200:
logger.info(f"Déploiement ID {deploy_id} enregistré sur l'utilisateur Row ID {user_baserow_row_id}.")
return True, "Données de déploiement enregistrées avec succès sur le compte utilisateur."
else:
logger.error(f"Échec de la mise à jour du déploiement Baserow pour l'utilisateur {user_baserow_row_id}. Status: {response.status_code}, Body: {response.text}")
return False, f"Échec de la mise à jour Baserow: {response.status_code}"
# ----------------------------------------------------------------------
# --- MODIFICATION DE FONCTION : Récupérer le déploiement par ID ---
# La recherche doit maintenant se faire dans la table utilisateur.
# ----------------------------------------------------------------------
def get_deployment_by_id(deploy_id: str) -> Optional[Dict]:
"""
Récupère les détails du déploiement (la ligne de l'utilisateur) par son ID unique (le nom du dossier HF).
"""
if not PRIMARY_USERS_TABLE_ID:
return None
# Utiliser un filtre Baserow pour chercher par l'ID de déploiement dans la table utilisateur
url = f"{DATA_BASE_URL}table/{PRIMARY_USERS_TABLE_ID}/"
# Construction du filtre Baserow: filter__Deployment ID__equal={deploy_id}
filter_param = f"filter__{FIELD_DEPLOY_ID}__equal"
params = {
filter_param: deploy_id
}
response = _baserow_request(url, method="GET", params=params)
if response.status_code == 200:
results = response.json().get('results', [])
if results:
# Retourne les données de l'utilisateur contenant les données de déploiement
# Notez que cela renvoie la ligne utilisateur complète.
return results[0]
else:
return None
else:
logger.error(f"Échec de la récupération du déploiement Baserow. Status: {response.status_code}, Body: {response.text}")
return None
def save_new_repository(
user_id: int,
repo_data: Dict
) -> tuple[bool, str]:
"""
Ajoute un nouveau dépôt à la liste 'FIELD_REPOS_DATA' de l'utilisateur.
Ceci remplace l'ancienne approche 'save_new_deployment' qui supposait un seul déploiement par utilisateur.
"""
user_record = load_primary_user_data(filters={FIELD_ID: user_id}, single_record=True)
if not user_record:
logger.error(f"Utilisateur ID '{user_id}' non trouvé pour l'enregistrement du dépôt.")
return False, "Utilisateur non trouvé."
row_id = user_record.get('id')
if not row_id:
return False, f"Erreur critique: ID de ligne Baserow manquant pour l'utilisateur '{user_id}'."
# 1. Récupérer et désérialiser la liste des dépôts existants
repos_json = user_record.get(FIELD_REPOS_DATA)
try:
# Baserow renvoie 'None' ou une chaîne vide si le champ est vide
existing_repos = json.loads(repos_json) if repos_json and isinstance(repos_json, str) else []
except json.JSONDecodeError:
logger.warning(f"Champs Repositories Data corrompu pour l'utilisateur {user_id}. Réinitialisation.")
existing_repos = []
# 2. Ajouter le nouveau dépôt
existing_repos.append(repo_data)
# 3. Sérialiser la liste mise à jour
updated_repos_json = json.dumps(existing_repos)
# 4. Mettre à jour Baserow
url = f"{DATA_BASE_URL}table/{PRIMARY_USERS_TABLE_ID}/{row_id}/"
data = {
FIELD_REPOS_DATA: updated_repos_json
}
response = _make_baserow_request(
method="PATCH",
url=url,
data=data,
log_response=True
)
if response.status_code == 200:
logger.info(f"Nouveau dépôt '{repo_data['repo_name']}' enregistré pour l'utilisateur ID {user_id}.")
return True, "Dépôt enregistré avec succès."
else:
# Gérer l'échec de la mise à jour (ex: données trop volumineuses ou erreur API)
error_message = response.text
logger.error(f"Échec de l'enregistrement du dépôt pour l'utilisateur {user_id}: {error_message}")
return False, f"Échec de la mise à jour Baserow: {error_message}"
# Nouvelle fonction pour récupérer la liste de tous les dépôts d'un utilisateur
def get_all_user_repositories(user_id: int) -> list:
"""Récupère tous les dépôts d'un utilisateur depuis le champ JSON."""
user_record = load_primary_user_data(filters={FIELD_ID: user_id}, single_record=True)
if not user_record:
return []
repos_json = user_record.get(FIELD_REPOS_DATA)
try:
return json.loads(repos_json) if repos_json and isinstance(repos_json, str) else []
except json.JSONDecodeError:
logger.warning(f"Champs Repositories Data corrompu pour l'utilisateur {user_id}.")
return []
def delete_deployment_by_id(deploy_id: str) -> tuple[bool, str]:
"""
Supprime l'enregistrement d'un déploiement (la ligne utilisateur)
dans la table PRIMARY_USERS_TABLE_ID par le Deployment ID.
Note: Cela supprime la ligne utilisateur complète. Si la structure
change (1 utilisateur -> plusieurs déploiements), cela devra être adapté.
Pour l'instant, on suppose 1 utilisateur = 1 ligne = 1 déploiement.
Version modifiée pour mettre à NULL les champs de déploiement au lieu de supprimer la ligne utilisateur.
"""
# 1. Récupérer l'enregistrement de l'utilisateur par l'ID de déploiement (le dossier HF)
user_record = get_deployment_by_id(deploy_id)
if not user_record:
logger.warning(f"Tentative de suppression de déploiement échouée: ID '{deploy_id}' non trouvé dans Baserow.")
return True, "Déploiement non trouvé dans Baserow (considéré comme déjà supprimé)."
# 2. Récupérer l'ID de ligne Baserow pour la mise à jour
row_id = user_record.get('id')
if not row_id:
return False, f"Erreur critique: ID de ligne Baserow manquant pour le déploiement '{deploy_id}'."
# 3. Mettre à jour les champs de déploiement à NULL (ou vide) au lieu de supprimer la ligne utilisateur
# Note: On suppose que la suppression de déploiement réinitialise ces champs.
url = f"{DATA_BASE_URL}table/{PRIMARY_USERS_TABLE_ID}/{row_id}/"
data = {
FIELD_DEPLOY_ID: None,
FIELD_USER_REF: None,
FIELD_HF_REPO_PATH: None
}
response = _make_baserow_request(
method="PATCH",
url=url,
data=data,
log_response=True
)
if response.status_code == 200:
logger.info(f"Déploiement ID {deploy_id} supprimé (champs mis à NULL) sur l'utilisateur Row ID {row_id}.")
return True, "Déploiement supprimé de la base de données avec succès."
else:
logger.error(f"Échec de la suppression du déploiement Baserow (PATCH à NULL) pour l'utilisateur Row ID {row_id}. Status: {response.status_code}, Body: {response.text}")
return False, f"Échec de la suppression Baserow: {response.status_code}"