"""FUTURISYS HR attrition-prediction API.

Loads a trained scikit-learn pipeline at startup, exposes two prediction
endpoints (by employee id stored in the database, or from a raw payload),
explains each prediction with SHAP, and persists every prediction to the
``PredictionLog`` table.
"""
import sys
import os
import logging

import pandas as pd
import joblib
import shap
from fastapi import FastAPI, HTTPException
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session

from app.schemas import EmployeeInput

# Make the project root importable so `database.create_db` resolves.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from database.create_db import PredictionLog

load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), '..', '.env'))

DATABASE_URL = os.getenv("DATABASE_URL")
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)

# When TESTING=true in the environment, predictions are NOT logged to the DB.
TESTING = os.getenv("TESTING", "false").lower() == "true"

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

app = FastAPI()

# Load the trained pipeline once at startup (preprocessing + classifier).
model = joblib.load(os.path.join(BASE_DIR, 'pipeline_rh.joblib'))

logger = logging.getLogger(__name__)

# Strategic decision threshold (was hard-coded in three places); the model's
# positive class is predicted at >= 0.37, not the default 0.50.
DECISION_THRESHOLD = 0.37

# Categorical values seen at training time; anything outside these lists
# triggers a warning because the prediction may be unreliable.
known_values = {
    "departement": ["Consulting", "Commercial", "Ressources Humaines"],
    "statut_marital": ["Marié(e)", "Célibataire", "Divorcé(e)"],
    "frequence_deplacement": ["Aucun", "Occasionnel", "Frequent"],
    "poste": ['Cadre Commercial', 'Assistant de Direction', 'Consultant',
              'Tech Lead', 'Manager', 'Senior Manager',
              'Représentant Commercial', 'Directeur Technique',
              'Ressources Humaines'],
    "domaine_etude": ['Infra & Cloud', 'Autre', 'Transformation Digitale',
                      'Marketing', 'Entrepreunariat', 'Ressources Humaines'],
}

# Study domains considered consistent with each department.
_EXPECTED_DOMAINS = {
    "Commercial": {"Marketing"},
    "Consulting": {"Infra & Cloud", "Transformation Digitale"},
    "Ressources Humaines": {"Ressources Humaines", "Entrepreunariat"},
}


def inconsistency(df):
    """Flag a mismatch between an employee's department and study domain.

    Returns 0 when the domain is expected for the department, 1 otherwise.
    Fix: an unknown department previously fell through and returned None
    (NaN in the engineered feature column); it now returns 0 because the
    consistency cannot be assessed.
    """
    expected = _EXPECTED_DOMAINS.get(df["departement"])
    if expected is None:
        return 0
    return 0 if df["domaine_etude"] in expected else 1


def promotion(df):
    """Flag managerial stagnation.

    1 when the employee has spent more than 4 years under the current
    manager AND more than 4 years without a promotion, else 0.
    """
    return int(
        df["annes_sous_responsable_actuel"] > 4
        and df["annees_depuis_la_derniere_promotion"] > 4
    )


def developpement(df):
    """Flag development stagnation.

    1 when the employee has at least 2 years of tenure but followed at
    most one training; new joiners (0 years) are never flagged.
    """
    if df["annees_dans_l_entreprise"] == 0:
        return 0
    if df["annees_dans_l_entreprise"] >= 2 and df["nb_formations_suivies"] <= 1:
        return 1
    return 0


def depart(x):
    """Translate the binary prediction into a human-readable statement.

    Fix: the original implicitly returned None for any value other than
    0 or 1; any non-1 value is now treated as the low-risk class.
    """
    if x == 1:
        return "The staff has a HIGH probability of resigning"
    return "The staff has a LOW probability of resigning"


def interpret_shap(rank: int, value: float) -> str:
    """Describe a top-5 SHAP factor by rank (0 = strongest) and direction."""
    intensity = {
        0: "Primary driver",
        1: "Strong factor",
        2: "Moderate factor",
        3: "Contributing factor",
        4: "Notable factor",
    }
    direction = (
        "increases resignation risk" if value > 0
        else "decreases resignation risk"
    )
    return f"{intensity[rank]} — {direction}"


def run_prediction(df):
    """Run the full prediction for a single-row DataFrame.

    Performs the manual encodings not included in the pipeline, builds the
    engineered features, predicts with the strategic threshold, explains
    the result with SHAP, and returns a JSON-serializable dict.
    """
    # Warn on categorical values never seen at training time.
    for col, known in known_values.items():
        val = df[col].values[0]
        if val not in known:
            logger.warning(f"Unknown value '{val}' for column '{col}' — prediction may be unreliable")

    # Binary encodings not included in the pipeline.
    df['genre'] = df["genre"].map({"M": 1, "F": 0})
    df['heure_supplementaires'] = df["heure_supplementaires"].map({"Oui": 1, "Non": 0})

    # Previous raise comes in as a percent string ("12%") — convert to a float ratio.
    df["augementation_salaire_precedente"] = df["augementation_salaire_precedente"].apply(
        lambda x: float(x[:-1]) / 100
    )

    # Overall satisfaction = mean of all satisfaction_* columns.
    dft = df[[item for item in df.columns if item.startswith("satisfaction")]].copy()
    dft.loc[:, "overall_satisfaction"] = dft.mean(axis=1)
    df["overall_satisfaction"] = dft["overall_satisfaction"].copy()

    # Engineered risk flags.
    df["expertise_inconcistency"] = df.apply(inconsistency, axis=1)
    df["managarial_stagnation"] = df.apply(promotion, axis=1)
    df["developpement_stagnation"] = df.apply(developpement, axis=1)

    # Predict with the strategic threshold instead of the default 0.50.
    proba = model.predict_proba(df)[0][1]
    prediction = 1 if proba >= DECISION_THRESHOLD else 0

    # SHAP explanation: transform with the preprocessing steps, explain the
    # final tree model, and map values back to readable feature names.
    preprocessor_step = model[:-1]
    gb_step = model[-1]
    X_preprocessed = preprocessor_step.transform(df)
    explainer = shap.TreeExplainer(gb_step)
    shap_values_obj = explainer(X_preprocessed)
    feature_names = [name.split("__")[-1] for name in preprocessor_step.get_feature_names_out()]
    shap_series = pd.Series(shap_values_obj.values[0], index=feature_names)
    top_factors = shap_series.abs().nlargest(5)

    return {
        "statut_employe": depart(prediction),
        "probability_score": round(proba, 2),
        "model_threshold": DECISION_THRESHOLD,
        "note": "Decision based on a strategic threshold of 0.37, not 0.50",
        "top_5_factors": {
            factor: {
                "interpretation": interpret_shap(rank, shap_series[factor]),
                "feature_value": str(df[factor].values[0])
                if factor in df.columns and isinstance(df[factor].values[0], str)
                else float(df[factor].values[0]) if factor in df.columns
                else "encoded",
            }
            for rank, factor in enumerate(top_factors.index)
        },
    }


def log_prediction(df: pd.DataFrame, result: dict, id_employee: int = None):
    """Persist one prediction (inputs, score, top factors) to PredictionLog.

    Numpy scalars are converted to native Python values via ``.item()``
    before being handed to SQLAlchemy.
    """
    if TESTING:
        return  # do not log during tests

    with Session(engine) as session:
        factors = list(result["top_5_factors"].keys())
        log = PredictionLog(
            id_employee=id_employee,
            **{
                col: df[col].values[0].item()
                if hasattr(df[col].values[0], 'item') else df[col].values[0]
                for col in df.columns
                if col in PredictionLog.__table__.columns.keys() and col != 'id_employee'
            },
            prediction=result["statut_employe"],
            probability_score=result["probability_score"].item()
            if hasattr(result["probability_score"], 'item')
            else result["probability_score"],
            primary_driver=factors[0] if len(factors) > 0 else None,
            strong_factor=factors[1] if len(factors) > 1 else None,
            moderate_factor=factors[2] if len(factors) > 2 else None,
            contributing_factor=factors[3] if len(factors) > 3 else None,
            notable_factor=factors[4] if len(factors) > 4 else None,
        )
        session.add(log)
        session.commit()


@app.get("/")
def read_root():
    """API landing page."""
    return {"message": "Welcome to the FUTURISYS HR predictor API"}


@app.get("/predict/{id_employee}")
def predict_by_id(id_employee: int):
    """Predict for an employee already stored in the database.

    Raises 404 when the id is not found in ``employees_full``.
    """
    with engine.connect() as conn:
        # Renamed from `result` to avoid shadowing the prediction dict below.
        query = conn.execute(
            text("SELECT * FROM employees_full WHERE id_employee = :id"),
            {"id": id_employee},
        )
        row = query.fetchone()

    if row is None:
        raise HTTPException(status_code=404, detail="Employee ID not found in database")

    df = pd.DataFrame([row._mapping])
    result = run_prediction(df)
    log_prediction(df, result, id_employee)
    return result


@app.post("/predict")
def predict(data: EmployeeInput):
    """Predict from a raw employee payload (validated by EmployeeInput)."""
    df = pd.DataFrame([data.model_dump()])
    result = run_prediction(df)
    log_prediction(df, result)
    return result