# baserow_storage.py (Version Modifiée) import os import requests import json import sys from datetime import datetime from typing import Optional, Dict import logging # Configuration du logger (ajoutez ceci en haut du fichier) logger = logging.getLogger(__name__) # --- Configuration Baserow (Doit être défini dans les secrets) --- HEALTH_CHECK_URL = "https://api.baserow.io/api/database/rows/table/" # 2. URL de BASE CORRECTE pour la construction des requêtes de données (connexion, inscription, etc.) DATA_BASE_URL = "https://api.baserow.io/api/database/rows/" API_TOKEN = os.environ.get("BASEROW_API_TOKEN") # Les IDs de table seront récupérés depuis les variables d'environnement PRIMARY_USERS_TABLE_ID = os.environ.get("PRIMARY_USERS_TABLE_ID") END_USERS_TABLE_ID = os.environ.get("END_USERS_TABLE_ID") # NOUVEL ID DE TABLE POUR LES DÉPLOIEMENTS (Projets) DEPLOYMENTS_TABLE_ID = os.environ.get("DEPLOYMENTS_TABLE_ID") # Headers pour l'authentification HEADERS = { "Authorization": f"Token {API_TOKEN}", "Content-Type": "application/json" } # ---------------------------------------------------------------------- # --- Noms de Colonnes pour les Déploiements (à adapter à votre structure Baserow) --- # ---------------------------------------------------------------------- FIELD_DEPLOY_ID = 'Deployment ID' # Nom de la colonne pour l'ID de déploiement (le dossier HF) FIELD_USER_REF = 'User Link' # Nom de la colonne pour la référence à l'utilisateur principal FIELD_HF_REPO_PATH = 'HF Repo Path' # Chemin complet vers le dossier dans le repo FIELD_REPOS_DATA = "Repositories Data" # Nom du champ dans Baserow pour stocker la liste des dépôts FIELD_ID = 'ID' # Correspond à 'user_id' dans le code FIELD_EMAIL = 'Email' # Correspond à 'email' FIELD_USERNAME = 'Nom d\'utilisateur' # Correspond à 'username' FIELD_PASSWORD_HASH = 'Hachage Mot de Passe' # Correspond à 'password_hash' FIELD_API_KEY = 'Clé API' # Correspond à 'api_key' FIELD_API_KEY_2 = 'Clé API 2' FIELD_API_KEY_3 = 'Clé API 3' FIELD_API_KEY_4 = 'Clé API 4' FIELD_API_KEY_5 = 'Clé API 5' FIELD_SECURITY_Q = 'Question de Sécurité' FIELD_SECURITY_A_HASH = 'Hachage Réponse Secrète' FIELD_PLAN_ID = 'Plan ID' FIELD_STRIPE_SUB_ID = 'ID Abonnement Stripe' FIELD_DATE_CREATION = 'Date Création' FIELD_DATE_PLAN_START = 'Date Plan Start' FIELD_API_CALLS_MONTH = 'API Calls Month' # À vérifier avec votre nom exact dans Baserow! FIELD_STATUS = 'Status' FIELD_END_USER_ID = 'ID Utilisateur Final' # Correspond à 'end_user_id' FIELD_END_USER_IDENTIFIER = 'Identifiant' # Correspond à 'identifier' FIELD_END_USER_METADATA = 'Métadonnées' # Correspond à 'metadata' FIELD_CLIENT_ID_LINK = 'ID Client Principal' # Lien vers Primary_Users FIELD_USER_TYPE = 'UserType' # Ex: 'Primary' pour les clients, 'End' pour leurs utilisateurs FIELD_CLIENT_ID = 'ClientID' # L'ID de l'utilisateur 'Primary' qui possède cet utilisateur 'End' # ---------------------------------------------------------------------- # --- Noms de Colonnes pour la Table des Utilisateurs Finaux (End Users) --- # ---------------------------------------------------------------------- # Ces champs sont spécifiques à la table END_USERS FIELD_END_USER_ID = 'ID Utilisateur Final' FIELD_END_USER_IDENTIFIER = 'Identifiant' # Champ pour compatibilité ou recherche FIELD_END_USER_EMAIL = 'Email' # NOUVEAU FIELD_END_USER_USERNAME = 'Nom d\'utilisateur' # NOUVEAU FIELD_END_USER_SECURITY_Q = 'Question de Sécurité' # NOUVEAU (Peut être différent de Primary) FIELD_END_USER_SECURITY_A_HASH = 'Hachage Réponse Secrète' # NOUVEAU FIELD_END_USER_STATUS = 'Statut' # NOUVEAU FIELD_END_USER_METADATA = 'Métadonnées' FIELD_PASSWORD_HASH_END_USER = 'Hachage Mot de Passe End User' # Renommer pour éviter le conflit si possible FIELD_CLIENT_ID_LINK = 'ID Client Principal' def _get_table_url(table_id: str) -> str: """Construit l'URL d'API pour une table donnée (avec le bon endpoint).""" return f"{DATA_BASE_URL}table/{table_id}/" def _baserow_record_to_user(record: Dict, is_end_user: bool) -> Dict: """ Convertit un enregistrement Baserow (avec noms de champs utilisateur) en format de dictionnaire Python attendu par le backend. """ # LOGIQUE POUR L'UTILISATEUR PRINCIPAL (PRIMARY USER) # 1. Récupération des champs individuels user_data = { # Champs communs / Primary Users 'baserow_row_id': record['id'], # ID interne de la ligne Baserow (pour les mises à jour) 'date_creation': record.get(FIELD_DATE_CREATION), # Primary User specific fields 'user_id': record.get(FIELD_ID), 'email': record.get(FIELD_EMAIL), 'username': record.get(FIELD_USERNAME), 'password_hash': record.get(FIELD_PASSWORD_HASH), # Récupération des 5 clés individuelles (pour l'authentification par clé) 'api_key': record.get(FIELD_API_KEY), 'api_key_2': record.get(FIELD_API_KEY_2), 'api_key_3': record.get(FIELD_API_KEY_3), 'api_key_4': record.get(FIELD_API_KEY_4), 'api_key_5': record.get(FIELD_API_KEY_5), 'security_question': record.get(FIELD_SECURITY_Q), 'security_answer_hash': record.get(FIELD_SECURITY_A_HASH), 'plan_id': record.get(FIELD_PLAN_ID), 'stripe_subscription_id': record.get(FIELD_STRIPE_SUB_ID), 'date_plan_start': record.get(FIELD_DATE_PLAN_START), } # 2. ÉTAPE CRUCIALE AJOUTÉE : Création de la liste 'api_keys' pour l'affichage # Cette liste est nécessaire pour que la boucle dans api_key.html fonctionne correctement. user_data['api_keys'] = [ user_data['api_key'], user_data['api_key_2'], user_data['api_key_3'], user_data['api_key_4'], user_data['api_key_5'], ] # Nettoyage des clés None ou non-pertinentes return {k: v for k, v in user_data.items() if v is not None} def _user_to_baserow_data(user_data: Dict, is_end_user: bool) -> Dict: """ Convertit le format de dictionnaire Python du backend en format JSON attendu par l'API Baserow (avec noms de champs utilisateur). """ if is_end_user: # End User fields (Ajout des NOUVEAUX champs) baserow_data = { FIELD_END_USER_ID: user_data.get('end_user_id'), FIELD_END_USER_IDENTIFIER: user_data.get('identifier'), FIELD_END_USER_EMAIL: user_data.get('email'), # NOUVEAU FIELD_END_USER_USERNAME: user_data.get('username'), # NOUVEAU FIELD_END_USER_SECURITY_Q: user_data.get('security_question'), # NOUVEAU FIELD_END_USER_SECURITY_A_HASH: user_data.get('security_answer_hash'), # NOUVEAU FIELD_END_USER_STATUS: user_data.get('status'), # NOUVEAU # CORRECTION CRUCIALE : Utilisation du nom de champ correct pour l'End User FIELD_PASSWORD_HASH_END_USER: user_data.get('password_hash'), FIELD_END_USER_METADATA: user_data.get('metadata'), FIELD_DATE_CREATION: user_data.get('date_creation'), # Le lien vers Primary_Users est géré dans save_end_user_data } else: # Primary User fields baserow_data = { FIELD_ID: user_data.get('user_id'), FIELD_EMAIL: user_data.get('email'), FIELD_USERNAME: user_data.get('username'), FIELD_PASSWORD_HASH: user_data.get('password_hash'), FIELD_API_KEY: user_data.get('api_key'), FIELD_API_KEY_2: user_data.get('api_key_2'), FIELD_API_KEY_3: user_data.get('api_key_3'), FIELD_API_KEY_4: user_data.get('api_key_4'), FIELD_API_KEY_5: user_data.get('api_key_5'), FIELD_SECURITY_Q: user_data.get('security_question'), FIELD_SECURITY_A_HASH: user_data.get('security_answer_hash'), FIELD_PLAN_ID: user_data.get('plan_id'), FIELD_STRIPE_SUB_ID: user_data.get('stripe_subscription_id'), FIELD_DATE_CREATION: user_data.get('date_creation'), FIELD_DATE_PLAN_START: user_data.get('date_plan_start'), FIELD_API_CALLS_MONTH: user_data.get('api_calls_month', 0), FIELD_STATUS: user_data.get('status', 'Active') # Assurez-vous que 'Active' est une option valide dans Baserow } # Suppression des clés non-valorisées (None) return {k: v for k, v in baserow_data.items() if v is not None} def _get_single_user_record(table_id: str, field_name: str, value: str, is_end_user: bool) -> Optional[Dict]: """Fonction générique pour rechercher un seul enregistrement par un champ (filtrage Baserow).""" url = _get_table_url(table_id) # Utilisation du paramètre de filtre de Baserow pour une recherche indexée (plus rapide) filter_param = f"filter__{field_name}__equal={value}" try: response = requests.get( f"{url}?user_field_names=true&{filter_param}", headers=HEADERS ) response.raise_for_status() data = response.json() if data and data.get('results'): # On ne prend que le premier résultat (car ID/Email/API Key sont uniques) return _baserow_record_to_user(data['results'][0], is_end_user) return None except requests.exceptions.RequestException as e: print(f"Erreur de Baserow lors de la recherche par filtre {field_name}: {e}", file=sys.stderr) return None # ---------------------------------------------------------------------- # --- Fonctions CRUD Primary_Users (Nouveau et Remplacement) --- # ---------------------------------------------------------------------- def get_user_by_email(email: str) -> Optional[Dict]: """Recherche un utilisateur principal par son adresse Email.""" return _get_single_user_record(PRIMARY_USERS_TABLE_ID, FIELD_EMAIL, email, is_end_user=False) API_KEY_FIELDS = [ FIELD_API_KEY, # Clé API FIELD_API_KEY_2, FIELD_API_KEY_3, FIELD_API_KEY_4, FIELD_API_KEY_5, ] # baserow_storage.py - à partir de la ligne ~180 (ou autour de la fonction get_client_user_by_api_key) def get_client_user_by_api_key(api_key: str) -> Optional[Dict]: """ Recherche un utilisateur principal dans Baserow en vérifiant la clé API dans chacun des cinq champs de clé API de la table. """ if not api_key: logger.debug("get_client_user_by_api_key: Clé API est None ou vide.") return None # Construction de l'URL de base pour la table url = f"{DATA_BASE_URL}table/{PRIMARY_USERS_TABLE_ID}/" # Recherche large dans la table (jusqu'à 100 lignes) # NOTE: L'API Baserow utilise `search` pour une recherche FULL TEXT dans tous les champs. # C'est la source potentielle de l'inexactitude qui nécessite un filtrage manuel. params = {'search': api_key, 'size': 100} try: # CORRECTION 1: Utilisation de la fonction _make_baserow_request # (Cette fonction est maintenant définie à la fin du fichier) # On utilise log_response=True ici pour avoir un log détaillé si la requête échoue response = _make_baserow_request("GET", url, params=params, log_response=True) if response.status_code == 200: data = response.json() # NOUVEAU LOG : Afficher combien de résultats ont été renvoyés par la recherche Baserow num_results = len(data.get('results', [])) logger.info(f"BASEROW SEARCH: Clé API: {api_key[:8]}... | URL: {url} | Résultats Baserow: {num_results}") if data and 'results' in data and num_results > 0: # Filtrage manuel des résultats pour vérifier la clé exacte for row in data['results']: # Vérifier si la clé API correspond à l'un des 5 champs if any(row.get(field) == api_key for field in API_KEY_FIELDS): # NOUVEAU LOG : SUCCÈS - L'utilisateur a été trouvé logger.info(f"BASEROW SUCCESS: Utilisateur trouvé avec la clé API {api_key[:8]}... dans un des 5 champs.") try: # On suppose que _baserow_record_to_user existe et prend un booléen return _baserow_record_to_user(row, is_end_user=False) except NameError: logger.error("_baserow_record_to_user non définie. Erreur critique.") return None # NOUVEAU LOG : ÉCHEC DU FILTRAGE logger.warning(f"BASEROW FILTER FAIL: {num_results} ligne(s) trouvée(s) par 'search' Baserow, mais aucune ne correspondait exactement à la clé API: {api_key[:8]}...") return None # Aucune ligne trouvée dans Baserow (num_results est 0) logger.debug(f"BASEROW NO RESULT: Aucune ligne trouvée par la recherche Baserow pour la clé API: {api_key[:8]}...") return None # Traiter les erreurs HTTP de Baserow (log déjà dans _make_baserow_request) return None except Exception as e: # Erreur inattendue pendant le traitement logger.error(f"Erreur CRITIQUE lors du traitement de la recherche par clé API: {e}", exc_info=True) return None # Remplacement de l'ancien load_primary_user_data(user_id) def load_primary_user_data(user_id: str) -> Optional[Dict]: """Recherche un utilisateur principal par son ID (user_id).""" return _get_single_user_record(PRIMARY_USERS_TABLE_ID, FIELD_ID, user_id, is_end_user=False) def save_primary_user_data(user_data: Dict, commit_msg: str = "") -> bool: """Crée ou met à jour un utilisateur principal, avec détection d'erreur ultra-précise.""" row_id = user_data.get('baserow_row_id') # Définition de l'URL de base pour la table des utilisateurs principaux url = _get_table_url(PRIMARY_USERS_TABLE_ID) # 1. Conversion des données baserow_data = _user_to_baserow_data(user_data, is_end_user=False) # 2. Suppression des champs en lecture seule (comme dans la correction précédente) if baserow_data.pop(FIELD_ID, None): print(f"DEBUG: Suppression du champ '{FIELD_ID}' (UUID auto) avant l'envoi { 'POST' if not row_id else 'PATCH'}.", file=sys.stderr) try: # Détermination de l'action (PATCH ou POST) if row_id: action = "PATCH" # ⬅️ CORRECTION: Définition de 'action' # MISE À JOUR (PATCH) response = requests.patch( f"{url}{row_id}/?user_field_names=true", headers=HEADERS, json=baserow_data ) else: action = "POST" # ⬅️ CORRECTION: Définition de 'action' # CRÉATION (POST) response = requests.post( f"{url}?user_field_names=true", # ⬅️ CORRECTION: Utilise l'URL de table 'url' headers=HEADERS, json=baserow_data ) # Déclenche une exception requests.exceptions.HTTPError pour les statuts 4xx/5xx response.raise_for_status() # Succès if not row_id: new_record = response.json() # 1. Mettre à jour l'ID de ligne Baserow user_data['baserow_row_id'] = new_record.get('id') # 2. Mettre à jour l'UUID de l'utilisateur (généré par Baserow) user_data['user_id'] = new_record.get(FIELD_ID) print(f"DEBUG: UUID de l'utilisateur généré par Baserow et enregistré: {user_data['user_id']}", file=sys.stderr) print(f"DEBUG: Baserow Primary User action '{action}' réussie. Row ID: {user_data.get('baserow_row_id')}. Message: {commit_msg}", file=sys.stderr) return True except requests.exceptions.RequestException as e: # --- BLOC DE DÉTECTION D'ERREUR PRÉCISE (Ultra-Complet) --- # Note: 'action' est définie dans le bloc try/except, mais si l'erreur survient # AVANT la définition de 'action', nous devons la gérer. # Pour être sûr, nous allons la définir ici par défaut si elle n'existe pas. if 'action' not in locals(): action = "INCONNU" error_message = f"🚨 ÉCHEC: Erreur lors de la sauvegarde/mise à jour du Primary User dans Baserow. Requête: {action}" error_details = "" if hasattr(e, 'response') and e.response is not None: # 1. Statut HTTP et URL error_details += f"\n -> STATUT HTTP: {e.response.status_code} ({e.response.reason})" error_details += f"\n -> URL de la requête: {e.response.url}" # 2. Tenter de décoder le corps de la réponse en JSON (contient les erreurs Baserow) try: response_json = e.response.json() error_details += f"\n\n -> ERREUR BASEROW DÉTAILLÉE (JSON):\n{json.dumps(response_json, indent=4)}" # Optionnel: Synthèse des erreurs de validation de champ if isinstance(response_json, dict): validation_errors = {k: v for k, v in response_json.items() if isinstance(v, list) and k != 'detail'} if validation_errors: error_details += "\n -> SYNTHÈSE DES CHAMPS INVALIDES (Vérifiez les noms de colonnes/IDs de table!):" for field_name, errors in validation_errors.items(): error_details += f"\n - Champ '{field_name}': {', '.join([err.get('error', 'Erreur inconnue') for err in errors])}" except json.JSONDecodeError: # 3. Si le corps de la réponse n'est pas du JSON error_details += f"\n\n -> ERREUR BRUTE (Réponse non-JSON):\n{e.response.text[:500]}..." # 4. Afficher les données que nous avons tenté d'envoyer (après la suppression de l'ID si c'était une création) error_details += f"\n\n -> DONNÉES ENVOYÉES À BASEROW:\n{json.dumps(baserow_data, indent=4)}" # Log complet de l'erreur print(error_message + error_details, file=sys.stderr) return False # ---------------------------------------------------------------------- # --- Fonctions CRUD End_Users (Remplacement) --- # ---------------------------------------------------------------------- # baserow_storage.py : Dans la section CRUD End_Users def _get_client_baserow_row_id(client_user_id: str) -> Optional[int]: """Récupère l'ID de ligne interne Baserow du client principal pour le lien.""" client_user = load_primary_user_data(client_user_id) # utilise la fonction déjà créée return client_user.get('baserow_row_id') if client_user else None def check_baserow_connection() -> str: """ Vérifie l'état de connexion de la base de données Baserow. Retourne 'operational' ou 'outage'. """ # Liste des IDs de tables critiques à vérifier CRITICAL_TABLE_IDS = [ PRIMARY_USERS_TABLE_ID, END_USERS_TABLE_ID ] if not API_TOKEN: # Si le token API n'est pas défini, échec immédiat print("DEBUG: BASEROW_API_TOKEN manquant.", file=sys.stderr) return "outage" for table_id in CRITICAL_TABLE_IDS: if not table_id: # Si un des IDs de table critiques n'est pas défini, échec print(f"DEBUG: Un ID de table critique Baserow est manquant (ID: {table_id}).", file=sys.stderr) return "outage" # Tenter de faire un appel très léger (récupérer la première ligne) # On utilise page_size=1 pour minimiser la charge url = f"{DATA_BASE_URL}table/{table_id}/?page_size=1" try: response = requests.get(url, headers=HEADERS, timeout=5) if response.status_code != 200: # Si un 404, 403, ou autre erreur est retournée par Baserow pour CETTE table print(f"DEBUG: Baserow check failed for table {table_id} with status code {response.status_code}", file=sys.stderr) return "outage" except requests.exceptions.RequestException as e: # Erreur de réseau (timeout, DNS, etc.) print(f"DEBUG: Baserow connection error for table {table_id}: {e}", file=sys.stderr) return "outage" # Si toutes les tables critiques ont été vérifiées avec succès return "operational" def get_health_status() -> Dict: """ Collecte l'état de santé de tous les services pour la page /statut. """ db_status = check_baserow_connection() # L'état de l'authentification et de l'API principale sont # généralement liés à l'état de la DB pour une application simple. # Si la DB est HS, l'auth est HS. Sinon, ils sont OK. auth_status = db_status # Lié à la DB (pour charger les utilisateurs) api_endpoint_status = "operational" # L'endpoint Flask lui-même est considéré comme OK s'il tourne # Version du service (pour information) service_version = os.environ.get("SERVICE_VERSION", "1.0.0 (Baserow)") return { # Ces valeurs correspondent aux attributs 'data-status' dans statut.html "auth": auth_status, "data_storage": db_status, "api_endpoint": api_endpoint_status, "version": service_version, "last_update": datetime.now().strftime("%Y-%m-%d %H:%M:%S UTC") } def is_baserow_up() -> bool: """ Vérifie l'état de Baserow en utilisant l'URL qui garantit un statut 100% fonctionnel sur Hugging Face, SANS utiliser la fonction de construction d'URL de table. """ try: # Envoie une requête GET à l'URL qui répond positivement pour le health check. response = requests.get( HEALTH_CHECK_URL, headers=HEADERS, timeout=5 ) # On vérifie si la réponse est un succès (code 200). return response.status_code == 200 except requests.exceptions.RequestException as e: print(f"DEBUG: Baserow health check failed: {e}") return False def _make_baserow_request(method: str, url: str, params: Optional[Dict] = None, data: Optional[Dict] = None, log_response: bool = True): """ Fonction utilitaire pour effectuer des appels API à Baserow et journaliser les requêtes et les réponses dans les logs du Space Hugging Face. """ # 1. Journalisation de la requête logger.info(f"BASEROW REQUEST: {method} {url}") # ATTENTION: Ne pas logger le token API complet! logged_headers = {k: v.replace(API_TOKEN, '[TOKEN_MASKED]') if k == 'Authorization' else v for k, v in HEADERS.items()} logger.debug(f"BASEROW REQUEST Headers: {logged_headers}") if data: # Pour les requêtes POST/PUT, logger les données (sans le hash du mot de passe si possible) logged_data = data.copy() if isinstance(data, dict) else data # 'Hachage Mot de Passe' est le nom du champ dans _user_to_baserow_data if isinstance(logged_data, dict) and FIELD_PASSWORD_HASH in logged_data: logged_data[FIELD_PASSWORD_HASH] = '[PASSWORD_HASH_MASKED]' # Le nom du champ n'était pas le même dans l'ancienne version, on utilise la constante if isinstance(logged_data, dict) and 'Hachage du mot de passe' in logged_data: logged_data['Hachage du mot de passe'] = '[PASSWORD_HASH_MASKED]' logger.debug(f"BASEROW REQUEST Body: {logged_data}") # 2. Exécution de la requête try: # NOTE IMPORTANTE: Utilisation des HEADERS globaux et des 'params' pour GET if method == "GET": response = requests.get(url, headers=HEADERS, params=params) elif method == "POST": response = requests.post(url, headers=HEADERS, json=data) elif method == "PUT": response = requests.put(url, headers=HEADERS, json=data) elif method == "DELETE": response = requests.delete(url, headers=HEADERS) elif method == "PATCH": # Ajout de la méthode PATCH pour la complétude response = requests.patch(url, headers=HEADERS, json=data) else: raise ValueError(f"Méthode HTTP non supportée: {method}") # 3. Journalisation de la réponse if log_response: logger.info(f"BASEROW RESPONSE: Status {response.status_code}") # Journaliser le contenu pour les erreurs if response.status_code >= 400: logger.error(f"BASEROW ERROR RESPONSE Body: {response.text}") # 4. Retourner la réponse return response except requests.exceptions.RequestException as e: logger.error(f"BASEROW CONNECTION ERROR: {e}") # Retourne un objet Response factice pour l'utiliser dans la fonction appelante return requests.Response() # --- NOUVELLES FONCTIONS POUR LES DÉPLOIEMENTS (Projets) --- def update_user_deployment_data( user_baserow_row_id: int, deploy_id: str, user_link: str, # Utilisera l'ID interne de l'utilisateur hf_repo_path: str ) -> tuple[bool, str]: """ Met à jour la ligne de l'utilisateur principal (PRIMARY_USERS_TABLE_ID) avec les données de déploiement du projet (ID, Repo, etc.). """ if not PRIMARY_USERS_TABLE_ID: return False, "Erreur de configuration: ID de la table utilisateur principal manquant." url = f"{DATA_BASE_URL}table/{PRIMARY_USERS_TABLE_ID}/{user_baserow_row_id}/" # Les données à mettre à jour data = { FIELD_DEPLOY_ID: deploy_id, FIELD_USER_REF: user_link, FIELD_HF_REPO_PATH: hf_repo_path } # Utiliser PATCH pour mettre à jour seulement les champs spécifiés response = _baserow_request(url, method="PATCH", data=data) if response.status_code == 200: logger.info(f"Déploiement ID {deploy_id} enregistré sur l'utilisateur Row ID {user_baserow_row_id}.") return True, "Données de déploiement enregistrées avec succès sur le compte utilisateur." else: logger.error(f"Échec de la mise à jour du déploiement Baserow pour l'utilisateur {user_baserow_row_id}. Status: {response.status_code}, Body: {response.text}") return False, f"Échec de la mise à jour Baserow: {response.status_code}" # ---------------------------------------------------------------------- # --- MODIFICATION DE FONCTION : Récupérer le déploiement par ID --- # La recherche doit maintenant se faire dans la table utilisateur. # ---------------------------------------------------------------------- def get_deployment_by_id(deploy_id: str) -> Optional[Dict]: """ Récupère les détails du déploiement (la ligne de l'utilisateur) par son ID unique (le nom du dossier HF). """ if not PRIMARY_USERS_TABLE_ID: return None # Utiliser un filtre Baserow pour chercher par l'ID de déploiement dans la table utilisateur url = f"{DATA_BASE_URL}table/{PRIMARY_USERS_TABLE_ID}/" # Construction du filtre Baserow: filter__Deployment ID__equal={deploy_id} filter_param = f"filter__{FIELD_DEPLOY_ID}__equal" params = { filter_param: deploy_id } response = _baserow_request(url, method="GET", params=params) if response.status_code == 200: results = response.json().get('results', []) if results: # Retourne les données de l'utilisateur contenant les données de déploiement # Notez que cela renvoie la ligne utilisateur complète. return results[0] else: return None else: logger.error(f"Échec de la récupération du déploiement Baserow. Status: {response.status_code}, Body: {response.text}") return None def save_new_repository( user_id: int, repo_data: Dict ) -> tuple[bool, str]: """ Ajoute un nouveau dépôt à la liste 'FIELD_REPOS_DATA' de l'utilisateur. Ceci remplace l'ancienne approche 'save_new_deployment' qui supposait un seul déploiement par utilisateur. """ user_record = load_primary_user_data(filters={FIELD_ID: user_id}, single_record=True) if not user_record: logger.error(f"Utilisateur ID '{user_id}' non trouvé pour l'enregistrement du dépôt.") return False, "Utilisateur non trouvé." row_id = user_record.get('id') if not row_id: return False, f"Erreur critique: ID de ligne Baserow manquant pour l'utilisateur '{user_id}'." # 1. Récupérer et désérialiser la liste des dépôts existants repos_json = user_record.get(FIELD_REPOS_DATA) try: # Baserow renvoie 'None' ou une chaîne vide si le champ est vide existing_repos = json.loads(repos_json) if repos_json and isinstance(repos_json, str) else [] except json.JSONDecodeError: logger.warning(f"Champs Repositories Data corrompu pour l'utilisateur {user_id}. Réinitialisation.") existing_repos = [] # 2. Ajouter le nouveau dépôt existing_repos.append(repo_data) # 3. Sérialiser la liste mise à jour updated_repos_json = json.dumps(existing_repos) # 4. Mettre à jour Baserow url = f"{DATA_BASE_URL}table/{PRIMARY_USERS_TABLE_ID}/{row_id}/" data = { FIELD_REPOS_DATA: updated_repos_json } response = _make_baserow_request( method="PATCH", url=url, data=data, log_response=True ) if response.status_code == 200: logger.info(f"Nouveau dépôt '{repo_data['repo_name']}' enregistré pour l'utilisateur ID {user_id}.") return True, "Dépôt enregistré avec succès." else: # Gérer l'échec de la mise à jour (ex: données trop volumineuses ou erreur API) error_message = response.text logger.error(f"Échec de l'enregistrement du dépôt pour l'utilisateur {user_id}: {error_message}") return False, f"Échec de la mise à jour Baserow: {error_message}" # Nouvelle fonction pour récupérer la liste de tous les dépôts d'un utilisateur def get_all_user_repositories(user_id: int) -> list: """Récupère tous les dépôts d'un utilisateur depuis le champ JSON.""" user_record = load_primary_user_data(filters={FIELD_ID: user_id}, single_record=True) if not user_record: return [] repos_json = user_record.get(FIELD_REPOS_DATA) try: return json.loads(repos_json) if repos_json and isinstance(repos_json, str) else [] except json.JSONDecodeError: logger.warning(f"Champs Repositories Data corrompu pour l'utilisateur {user_id}.") return [] def delete_deployment_by_id(deploy_id: str) -> tuple[bool, str]: """ Supprime l'enregistrement d'un déploiement (la ligne utilisateur) dans la table PRIMARY_USERS_TABLE_ID par le Deployment ID. Note: Cela supprime la ligne utilisateur complète. Si la structure change (1 utilisateur -> plusieurs déploiements), cela devra être adapté. Pour l'instant, on suppose 1 utilisateur = 1 ligne = 1 déploiement. Version modifiée pour mettre à NULL les champs de déploiement au lieu de supprimer la ligne utilisateur. """ # 1. Récupérer l'enregistrement de l'utilisateur par l'ID de déploiement (le dossier HF) user_record = get_deployment_by_id(deploy_id) if not user_record: logger.warning(f"Tentative de suppression de déploiement échouée: ID '{deploy_id}' non trouvé dans Baserow.") return True, "Déploiement non trouvé dans Baserow (considéré comme déjà supprimé)." # 2. Récupérer l'ID de ligne Baserow pour la mise à jour row_id = user_record.get('id') if not row_id: return False, f"Erreur critique: ID de ligne Baserow manquant pour le déploiement '{deploy_id}'." # 3. Mettre à jour les champs de déploiement à NULL (ou vide) au lieu de supprimer la ligne utilisateur # Note: On suppose que la suppression de déploiement réinitialise ces champs. url = f"{DATA_BASE_URL}table/{PRIMARY_USERS_TABLE_ID}/{row_id}/" data = { FIELD_DEPLOY_ID: None, FIELD_USER_REF: None, FIELD_HF_REPO_PATH: None } response = _make_baserow_request( method="PATCH", url=url, data=data, log_response=True ) if response.status_code == 200: logger.info(f"Déploiement ID {deploy_id} supprimé (champs mis à NULL) sur l'utilisateur Row ID {row_id}.") return True, "Déploiement supprimé de la base de données avec succès." else: logger.error(f"Échec de la suppression du déploiement Baserow (PATCH à NULL) pour l'utilisateur Row ID {row_id}. Status: {response.status_code}, Body: {response.text}") return False, f"Échec de la suppression Baserow: {response.status_code}"