""" API REST FastAPI pour les prédictions de modèle ML. Cette API charge un modèle pickle au démarrage et expose des endpoints pour effectuer des prédictions à partir de variables d'entrée. """ import logging #Module de logging pour enregistrer les activités de l'API et faciliter le debugging import time import os import json #Module pour manipuler les données au format JSON, notamment pour formater les logs en JSON import threading import requests from datetime import datetime, timezone from fastapi import FastAPI, HTTPException, UploadFile, File import io #Module pour manipuler les flux de données en mémoire, notamment pour lire les fichiers CSV uploadés sans les enregistrer sur le disque from pydantic import BaseModel #Module pour définir des modèles de données avec validation, utilisé pour les entrées et sorties de l'API import pickle from typing import Dict, Any, List #Module pour les annotations de type, notamment pour les dictionnaires d'entrée et les listes de colonnes import numpy as np import pandas as pd from functions import most_important_features_min_max, most_important_features_types from huggingface_hub import HfApi # ======================================================================================================= # Configuration du logging pour l'API # ======================================================================================================= class JsonFormatter(logging.Formatter): #définit une classe de formateur de logs qui hérite de logging.Formatter """ Formateur JSON pour les logs et faciliter leur interprétation """ def format(self, record: logging.LogRecord) -> str: # Construire un dictionnaire lisible contenant les métadonnées du log log_record = { "timestamp": datetime.now(timezone.utc).isoformat(), # horodatage UTC "level": record.levelname, "message": record.getMessage(), "module": record.module, "function": record.funcName, "line": record.lineno } # Si une exception est attachée, la formatter aussi if record.exc_info: log_record["exception"] = self.formatException(record.exc_info) # Retourner la chaîne JSON du log return json.dumps(log_record, ensure_ascii=False) # Configuration du logging logger = logging.getLogger(__name__) #Création d'un logger pour l'API, qui permettra d'enregistrer les activités et les erreurs de manière structurée logger.setLevel(logging.INFO) #Niveau de logging : INFO pour les événements normaux, DEBUG pour les détails, ERROR pour les erreurs #Instanciation du formatter JSON pour les logs json_formatter = JsonFormatter() #Utilisation du formateur JSON pour structurer les logs de manière lisible #Chemin provisoire pour le fichier de log de l'API avant upload vers HF Dataset # Note: On utilise l'extension .jsonl car le fichier contient du JSON Lines (un objet JSON par ligne) LOG_FILE_PATH = os.environ.get("LOG_FILE_PATH", "/tmp/api_log.jsonl" if os.path.exists("/tmp") else "./api_log.jsonl") # Taille maximale des fichiers de log (en octets) avant suppression et recréation : 30 Mo LOG_MAX_BYTES = 30 * 1024 * 1024 class SizeLimitedFileHandler(logging.FileHandler): """Handler de fichier personnalisé qui supprime le fichier et en crée un nouveau quand la taille dépasse LOG_MAX_BYTES""" def __init__(self, filename, mode='a', encoding=None, delay=False, max_bytes=LOG_MAX_BYTES): super().__init__(filename, mode=mode, encoding=encoding, delay=delay) self.max_bytes = max_bytes def emit(self, record: logging.LogRecord) -> None: try: if os.path.exists(self.baseFilename) and os.path.getsize(self.baseFilename) >= self.max_bytes: # Fermer le flux courant try: self.stream.close() except Exception: pass # Supprimer l'ancien fichier try: os.remove(self.baseFilename) except Exception: pass # Rouvrir un nouveau fichier vide self.stream = self._open() super().emit(record) except Exception: self.handleError(record) #Définition des handlers pour les logs try: #Handler de fichier avec limite de taille : supprime l'ancien fichier et en crée un nouveau à 10 Mo file_handler = SizeLimitedFileHandler(LOG_FILE_PATH) file_handler.setFormatter(json_formatter) logger.addHandler(file_handler) except PermissionError: pass # Ignorer si on ne peut pas écrire le fichier de log # Handler flux (stdout) pour permettre l'affichage dans la console/terminal et rattachement au logger stream_handler = logging.StreamHandler() stream_handler.setFormatter(json_formatter) logger.addHandler(stream_handler) # ======================================================================================================= # Configuration de l'enregistrement des données de logs et des données d'entrée/sortie dans un dataset HF # ======================================================================================================= #Configuration Hugging Face pour la persistance HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "CedM/oc_mlops_projet_2") # Mettre le repo HF HF_TOKEN = os.environ.get("HF_TOKEN") # URL du dashboard Streamlit (pour le keep-alive croisé) DASHBOARD_URL = os.environ.get("DASHBOARD_URL", "https://cedm-oc-mlops-projet-2-dashboard.hf.space") def upload_to_hf(local_path: str, repo_path: str): """Upload un fichier vers le dataset HF (silencieux en cas d'erreur).""" if not HF_TOKEN or not os.path.exists(local_path): return try: api = HfApi(token=HF_TOKEN) api.upload_file( path_or_fileobj=local_path, path_in_repo=repo_path, repo_id=HF_DATASET_REPO, repo_type="dataset" ) logger.info(f"Fichier {repo_path} uploadé vers HF") except Exception as e: logger.warning(f"Upload HF échoué: {e}") def save_and_upload_csv_logs(input_df: pd.DataFrame, predictions: list): """ Enregistre les données d'entrée/sortie et les logs, puis les uploade vers HF. Args: input_df: DataFrame contenant les features d'entrée. predictions: Liste des prédictions effectuées. """ try: # Copier le DataFrame d'entrée pour ne pas modifier l'original log_df = input_df.copy() # Ajouter un timestamp et la prédiction associée log_df['_timestamp'] = datetime.now().isoformat() log_df['_prediction'] = predictions # Vérifier si le fichier existe pour ajouter ou créer # Si le fichier dépasse LOG_MAX_BYTES, le supprimer pour repartir sur un fichier neuf if os.path.exists(DRIFT_LOG_PATH): try: if os.path.getsize(DRIFT_LOG_PATH) >= LOG_MAX_BYTES: logger.info(f"Fichier drift dépassant {LOG_MAX_BYTES} octets, suppression et recréation: {DRIFT_LOG_PATH}") os.remove(DRIFT_LOG_PATH) except Exception as e: logger.warning(f"Impossible de vérifier/supprimer le fichier drift: {e}") file_exists = os.path.exists(DRIFT_LOG_PATH) # Écrire dans le fichier CSV (mode append) avec point-virgule comme séparateur log_df.to_csv( DRIFT_LOG_PATH, mode='a', header=not file_exists, index=False, sep=';' ) # Log d'information sur le nombre de lignes écrites logger.info(f"Données enregistrées pour drift detection: {len(log_df)} lignes") # Upload vers HF pour persistance permanente upload_to_hf(DRIFT_LOG_PATH, "data_io.csv") except Exception as e: # En cas d'erreur, on loggue un warning mais on ne remonte pas d'exception logger.warning(f"Impossible d'enregistrer les données pour drift: {str(e)}") # ======================================================================================================= # Initialisation de l'API et définition des variables globales # ======================================================================================================= # ----------------------------------------------------------------------- # Keep-alive : maintien des containers HF Spaces en état "running" # ----------------------------------------------------------------------- KEEP_ALIVE_INTERVAL = 12 * 3600 # 12 heures en secondes def _keep_alive_loop(): """ Boucle infinie exécutée dans un thread démon. Envoie une requête GET vers le dashboard Streamlit toutes les 12 h afin de maintenir les deux containers HF Spaces actifs. """ logger.info(f"Thread keep-alive démarré – ping du dashboard toutes les {KEEP_ALIVE_INTERVAL // 3600}h") while True: time.sleep(KEEP_ALIVE_INTERVAL) if DASHBOARD_URL: try: response = requests.get(DASHBOARD_URL, timeout=30) logger.info(f"Keep-alive dashboard → HTTP {response.status_code} ({DASHBOARD_URL})") except Exception as e: logger.warning(f"Keep-alive dashboard échoué: {e}") else: logger.warning("Keep-alive: DASHBOARD_URL non défini, ping ignoré") def start_keep_alive(): """Démarre le thread keep-alive en arrière-plan (daemon).""" thread = threading.Thread(target=_keep_alive_loop, daemon=True, name="keep-alive-dashboard") thread.start() # Initialisation de l'application FastAPI app = FastAPI( title="API de Classification binaire du Risque de Crédit", description="API pour classifier les demandes de crédit en fonction du risque de défaut, avec un seuil métier optimisé pour le métier", version="1.0.0" ) # Répertoire de base de l'application (pour compatibilité HF Spaces) BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # Chemin vers le fichier du modèle pickle (modèle entraîné) MODEL_PATH = os.path.join(BASE_DIR, "3_Results/best_gradient_boosting_model.pkl") # Chemin vers le fichier CSV pour récupérer l'ordre de toutes les colonnes nécessaires au modèle COLS_PATH = os.path.join(BASE_DIR, "2_Data_transformed/train_data_sp2_subsample_1.csv") #Chemin vers le fichier CSV pour récupérer les variables obligatoires pour faire une prédiction assez fiable FEAT_PATH = os.path.join(BASE_DIR, "2_Data_transformed/shap_feature_importance.csv") # Chemin vers le fichier CSV pour enregistrer les données (détection de data drift) # Sur HF Spaces, utiliser /tmp pour les fichiers temporaires (écriture autorisée) DRIFT_LOG_PATH = os.environ.get("DRIFT_LOG_PATH", "/tmp/data_io.csv" if os.path.exists("/tmp") else os.path.join(BASE_DIR, "data_io.csv")) # Seuil de décision pour la classification (optimisé pour le métier) THRESHOLD = 0.474 # ======================================================================================================= # Fonctions utilitaires pour le chargement du modèle, l'ordre des colonnes et la détection de data drift # ======================================================================================================= def load_model(): """ Charge le modèle ML depuis un fichier pickle. Returns: Le modèle chargé ou None si le fichier n'existe pas. """ try: # Ouvrir le fichier pickle en binaire et charger l'objet Python (le modèle) with open(MODEL_PATH, "rb") as f: model = pickle.load(f) logger.info(f"Modèle chargé avec succès depuis {MODEL_PATH}") return model except FileNotFoundError: # Si le fichier n'existe pas, on logue une erreur et on retourne None logger.error(f"Fichier modèle non trouvé: {MODEL_PATH}") return None def load_column_order() -> List[str]: """ Charge le fichier CSV et extrait l'ordre des colonnes. Returns: Liste des noms de colonnes dans l'ordre du fichier CSV, ou liste vide si le fichier n'existe pas. """ try: # Charger uniquement les en-têtes du CSV (nrows=0) pour récupérer les colonnes df = pd.read_csv(COLS_PATH, nrows=0, sep=';') # Charger uniquement les en-têtes logger.info(f"Nombre et ordre des colonnes chargé depuis {COLS_PATH}") except FileNotFoundError: # Fichier absent -> on ne peut pas connaitre l'ordre des colonnes logger.error(f"Fichier CSV non trouvé: {COLS_PATH}") return [] try: # Supprimer des colonnes id or target si elles existent avant de retourner l'ordre df.drop(columns=['SK_ID_CURR', 'TARGET'], inplace=True) except KeyError: # Si les colonnes n'existent pas, on ignore l'erreur pass # Si 'SK_ID_CURR', 'TARGET' ne sont pas présents, ignorer l'erreur logger.info(f"Nombre de colonnes chargées: {len(df.columns)}") # Retourner la liste des noms de colonnes return df.columns.tolist() def load_important_features() -> List[str]: """ Charge le fichier CSV des importances de features et retourne la liste des features importantes. Returns: Liste des noms de features importantes. """ try: df = pd.read_csv(FEAT_PATH, sep=';') logger.info(f"Fichier d'importance des features chargé depuis {FEAT_PATH}") important_features = df['Feature'].tolist() logger.info(f"Nombre de features importantes chargées: {len(important_features)}") return important_features except FileNotFoundError: logger.error(f"Fichier d'importance des features non trouvé: {FEAT_PATH}") return [] # Chargement du modèle au démarrage de l'application model = load_model() # Chargement de l'ordre des colonnes au démarrage column_order = load_column_order() # Chargement de la liste des features importantes important_features = load_important_features() logger.info("API initialisée et prête") # Démarrage du thread keep-alive au lancement de l'application start_keep_alive() # ======================================================================================================= # Définition des modèles de données Pydantic pour les entrées et sorties de l'API, et implémentation des endpoints # ======================================================================================================= class PredictionInput(BaseModel): """ Modèle Pydantic pour les données d'entrée de prédiction. Attributes: features: Dictionnaire contenant les noms des variables et leurs valeurs. """ features: Dict[str, Any] class PredictionOutput(BaseModel): """ Modèle Pydantic pour la réponse de prédiction. Attributes: prediction: Résultat de la prédiction du modèle (0=accepté, 1=rejeté). probability: Probabilité de défaut (classe 1). threshold: Seuil de décision utilisé. status: Statut de la requête. """ prediction: int probability: float threshold: float status: str @app.post("/predict", response_model=PredictionOutput) #Décorateur de la méthode post() de l'object app instance de la classe FastAPI def predict(input_data: PredictionInput): """ Endpoint pour effectuer une prédiction. Args: input_data: Dictionnaire des features à utiliser pour la prédiction. Returns: PredictionOutput contenant la prédiction et le statut. """ # Mesurer le temps d'exécution pour monitoring start_time = time.time() logger.info("Requête de prédiction reçue") # Vérifier que le modèle est chargé if model is None: logger.error("Tentative de prédiction sans modèle chargé") raise HTTPException(status_code=500, detail="Modèle non chargé") # Vérifier que l'ordre des colonnes est disponible if not column_order: logger.error("Ordre des colonnes non disponible") raise HTTPException(status_code=500, detail="Ordre des colonnes non chargé") if not important_features: logger.error("Liste des features importantes non disponible") raise HTTPException(status_code=500, detail="Liste des features importantes non chargée") #Vérifier que les features importantes sont présentes dans les données d'entrée sinon lever une exception missing_important_features = [feat for feat in important_features if feat not in input_data.features] if len(missing_important_features) > 0: logger.error("Features importantes manquantes: " + ", ".join(missing_important_features)) raise HTTPException( status_code=400, detail=f"Features importantes manquantes: {missing_important_features}" ) else: #Vérifier que les features importantes ne sont pas vides (np.nan ou None) sinon lever une exception empty_important_features = [feat for feat in important_features if input_data.features.get(feat) in [None, np.nan]] if len(empty_important_features) > 0: logger.error("Features importantes vides: " + ", ".join(empty_important_features)) raise HTTPException( status_code=400, detail=f"Features importantes vides: {empty_important_features}" ) #Vérifier que les features importantes ont le bon type (ex: numérique pour les valeurs) sinon lever une exception feature_types = most_important_features_types() invalid_type_features = [] for feat, expected_type in feature_types.items(): if feat in input_data.features and input_data.features.get(feat) is not None: value = input_data.features.get(feat) if expected_type == "numeric": if not isinstance(value, (int, float)): invalid_type_features.append(f"{feat}={value} (type reçu: {type(value).__name__}, attendu: numérique)") if len(invalid_type_features) > 0: logger.error("Features importantes avec type invalide: " + ", ".join(invalid_type_features)) raise HTTPException( status_code=400, detail=f"Features importantes avec type invalide: {invalid_type_features}" ) #Vérifier que les features importantes se situent dans les bornes attendues sinon lever une exception feature_bounds = most_important_features_min_max() out_of_bounds_features = [] for feat, bounds in feature_bounds.items(): if feat in input_data.features and input_data.features.get(feat) is not None: value = input_data.features.get(feat) # Vérifier seulement si les bornes min/max sont définies if "min" in bounds and "max" in bounds: if value < bounds["min"] or value > bounds["max"]: out_of_bounds_features.append(f"{feat}={value} (attendu: [{bounds['min']}, {bounds['max']}])") if len(out_of_bounds_features) > 0: logger.error("Features importantes hors bornes: " + ", ".join(out_of_bounds_features)) raise HTTPException( status_code=400, detail=f"Features importantes hors bornes: {out_of_bounds_features}" ) try: # Réordonner les features selon l'ordre des colonnes du CSV # Si une feature manque, on met np.nan pour conserver la structure feature_values = {col: [input_data.features.get(col, np.nan)] for col in column_order} X = pd.DataFrame(feature_values) # Exécuter la prédiction avec le modèle et récupérer les probabilités probabilities = model.predict_proba(X) proba_default = probabilities[0][1] # Probabilité de la classe 1 (défaut) # Appliquer le seuil métier pour déterminer la classe prédite prediction = 1 if proba_default >= THRESHOLD else 0 # Réintégrer l'index SK_ID_CURR avant l'enregistrement pour la détection de drift X = X.copy() X.insert(0, 'SK_ID_CURR', 0) # Enregistrer les données pour la détection de drift (asynchrone côté storage) save_and_upload_csv_logs(X, [prediction]) execution_time = time.time() - start_time logger.info(f"Prédiction effectuée avec succès: {prediction} (proba={proba_default:.4f}, seuil={THRESHOLD}, temps={execution_time:.4f}s)") # Upload des logs vers HF pour persistance permanente upload_to_hf(LOG_FILE_PATH, "api_log.jsonl") # Retourner une réponse conforme au modèle de sortie Pydantic return PredictionOutput( prediction=prediction, probability=round(proba_default, 4), threshold=THRESHOLD, status="success" ) except Exception as e: # En cas d'erreur lors du traitement, log et remonter une HTTPException execution_time = time.time() - start_time logger.error(f"Erreur lors de la prédiction: {str(e)} (temps d'exécution: {execution_time:.4f}s)") # Upload des logs vers HF même en cas d'erreur pour garder une trace de l'incident upload_to_hf(LOG_FILE_PATH, "api_log.jsonl") raise HTTPException(status_code=400, detail=str(e)) @app.post("/predict/file") async def predict_from_file(file: UploadFile = File(...)): """ Endpoint pour effectuer des prédictions à partir d'un fichier CSV uploadé. Args: file: Fichier CSV contenant les features (une ou plusieurs lignes). Returns: Dictionnaire avec les prédictions pour chaque ligne. """ # Démarrer le timer pour monitoring start_time = time.time() logger.info(f"Fichier reçu pour prédiction: {file.filename}") # Vérifier que le modèle est chargé if model is None: logger.error("Tentative de prédiction sans modèle chargé") raise HTTPException(status_code=500, detail="Modèle non chargé") # Vérifier que l'ordre des colonnes est disponible if not column_order: logger.error("Ordre des colonnes non disponible") raise HTTPException(status_code=500, detail="Ordre des colonnes non chargé") # Vérifier l'extension du fichier pour éviter les formats invalides if not file.filename.endswith('.csv'): logger.warning(f"Format de fichier invalide: {file.filename}") raise HTTPException(status_code=400, detail="Le fichier doit être au format CSV") try: # Lire le contenu du fichier uploadé (en mémoire) contents = await file.read() # Convertir les octets en chaîne et lire le CSV avec séparateur ';' df = pd.read_csv(io.StringIO(contents.decode('utf-8')), sep=';', index_col='SK_ID_CURR') logger.info(f"Fichier CSV lu avec succès: {len(df)} lignes") # Vérifier si colonne manquante missing_cols = set(column_order) - set(df.columns) if missing_cols: # Si des colonnes sont manquantes, on remonte une erreur claire logger.error(f"Colonnes manquantes dans le fichier: {list(missing_cols)}") raise HTTPException( status_code=400, detail=f"Colonnes manquantes: {list(missing_cols)}" ) # Sélectionner uniquement les colonnes nécessaires dans le bon ordre X = df[column_order] # Exécuter les prédictions avec le seuil personnalisé probabilities = model.predict_proba(X) proba_defaults = [p[1] for p in probabilities] # Probabilité de la classe 1 (défaut) predictions = [1 if p >= THRESHOLD else 0 for p in proba_defaults] # Réintégrer l'index SK_ID_CURR avant l'enregistrement pour la détection de drift X = X.copy() X.insert(0, 'SK_ID_CURR', df.index) # Enregistrer les données pour la détection de drift save_and_upload_csv_logs(X, predictions) execution_time = time.time() - start_time logger.info(f"Prédictions effectuées avec succès: {len(predictions)} résultats (temps d'exécution: {execution_time:.4f}s)") # Upload des logs vers HF pour persistance permanente upload_to_hf(LOG_FILE_PATH, "api_log.jsonl") # Retourner un dictionnaire simple contenant les résultats return { "predictions": predictions, "probabilities": [round(p, 4) for p in proba_defaults], "threshold": THRESHOLD, "count": len(predictions), "status": "success" } except HTTPException: # Remonter les HTTPException telles quelles (déjà explicites) raise except Exception as e: # Tout autre erreur -> log et lever une HTTPException générique execution_time = time.time() - start_time logger.error(f"Erreur lors du traitement du fichier: {str(e)} (temps d'exécution: {execution_time:.4f}s)") # Upload des logs vers HF même en cas d'erreur pour garder une trace de l'incident upload_to_hf(LOG_FILE_PATH, "api_log.jsonl") raise HTTPException(status_code=400, detail=str(e)) @app.get("/health") def health_check(): """ Endpoint de vérification de l'état de santé de l'API. """ # Retourner des métriques simples sur l'état de l'API logger.debug("Vérification de santé de l'API") # Upload des logs vers HF même en cas d'erreur pour garder une trace de l'incident upload_to_hf(LOG_FILE_PATH, "api_log.jsonl") return { "status": "ok", "model_loaded": model is not None, "columns_loaded": len(column_order) > 0, "num_features": len(column_order) } @app.get("/columns") def get_columns(): """ Endpoint pour récupérer la liste des colonnes attendues. Returns: Liste des colonnes dans l'ordre attendu par le modèle. """ logger.debug("Liste des colonnes demandée") # Upload des logs vers HF même en cas d'erreur pour garder une trace de l'incident upload_to_hf(LOG_FILE_PATH, "api_log.jsonl") # Renvoyer simplement la liste et le nombre de colonnes return {"columns": column_order, "count": len(column_order)}