# app/main.py — FUTURISYS HR attrition prediction API (project OC_P5)
import sys
import os
from fastapi import FastAPI, HTTPException
import pandas as pd
import joblib
from app.schemas import EmployeeInput
import shap
import logging
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import Session

# Make the project root importable so `database.create_db` resolves when the
# app is started from inside the app/ package.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from database.create_db import PredictionLog

# Load environment variables from the .env file at the project root.
load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), '..', '.env'))
DATABASE_URL = os.getenv("DATABASE_URL")
# NOTE(review): create_engine fails if DATABASE_URL is unset — confirm every
# deployment provides it via .env or the environment.
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
# When TESTING=true, log_prediction skips writing to the database.
TESTING = os.getenv("TESTING", "false").lower() == "true"
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
app = FastAPI()  # Create the API application
# At startup, load the trained prediction pipeline
model = joblib.load(os.path.join(BASE_DIR, 'pipeline_rh.joblib'))
logger = logging.getLogger(__name__)
# Categorical values seen at training time; run_prediction warns (but still
# predicts) when an input value falls outside these sets.
known_values = {
"departement": ["Consulting", "Commercial", "Ressources Humaines"],
"statut_marital": ["Marié(e)", "Célibataire", "Divorcé(e)"],
"frequence_deplacement": ["Aucun", "Occasionnel", "Frequent"],
"poste": ['Cadre Commercial', 'Assistant de Direction', 'Consultant',
'Tech Lead', 'Manager', 'Senior Manager',
'Représentant Commercial', 'Directeur Technique',
'Ressources Humaines'],
"domaine_etude": ['Infra & Cloud', 'Autre', 'Transformation Digitale', 'Marketing',
'Entrepreunariat', 'Ressources Humaines'],
}
# Study domains considered consistent with each department; any other
# (departement, domaine_etude) pair counts as an expertise inconsistency.
_CONSISTENT_DOMAINS = {
    "Commercial": {"Marketing"},
    "Consulting": {"Infra & Cloud", "Transformation Digitale"},
    "Ressources Humaines": {"Ressources Humaines", "Entrepreunariat"},
}


def inconsistency(df):
    """Flag a mismatch between an employee's department and field of study.

    Args:
        df: a row-like mapping with "departement" and "domaine_etude" keys
            (used via ``DataFrame.apply(..., axis=1)``).

    Returns:
        int: 1 when the field of study is inconsistent with the department,
        0 when it is consistent — or when the department is unknown.

    Bug fix: the original if/elif chain had no final else, so any department
    outside the three handled ones fell through and returned None (becoming
    NaN after df.apply). Unknown departments now explicitly return 0.
    """
    allowed = _CONSISTENT_DOMAINS.get(df["departement"])
    if allowed is None:
        # Unknown department: consistency cannot be assessed, treat as consistent.
        return 0
    return 0 if df["domaine_etude"] in allowed else 1
def promotion(df):
    """Flag managerial stagnation for one employee row.

    An employee is flagged (returns 1) when they have spent more than 4 years
    under the same manager AND more than 4 years without a promotion;
    otherwise returns 0.
    """
    stagnating = (
        df["annes_sous_responsable_actuel"] > 4
        and df["annees_depuis_la_derniere_promotion"] > 4
    )
    return int(stagnating)
def developpement(df):
    """Flag development stagnation for one employee row.

    New hires (0 years of tenure) are never flagged. Employees with at least
    2 years of tenure who attended at most one training are flagged (1);
    everyone else returns 0.
    """
    tenure = df["annees_dans_l_entreprise"]
    if tenure == 0:
        return 0
    if tenure >= 2 and df["nb_formations_suivies"] <= 1:
        return 1
    return 0
# Human-readable message for each binary prediction outcome.
_DEPART_MESSAGES = {
    0: "The staff has a LOW probability of resigning",
    1: "The staff has a HIGH probability of resigning",
}


def depart(x):
    """Translate the binary prediction (0/1) into a user-facing message.

    Returns None for any other value, mirroring the original fall-through.
    """
    return _DEPART_MESSAGES.get(x)
def interpret_shap(rank: int, value: float) -> str:
    """Build a short English interpretation of one SHAP contribution.

    Args:
        rank: 0-based importance rank of the feature (0 = most important;
            only ranks 0-4 have a label).
        value: the SHAP value; positive pushes toward resignation.

    Returns:
        e.g. "Primary driver: increases resignation risk".

    Raises:
        KeyError: if rank is outside 0-4.
    """
    intensity = {
        0: "Primary driver",
        1: "Strong factor",
        2: "Moderate factor",
        3: "Contributing factor",
        4: "Notable factor"
    }
    # A SHAP value of exactly 0 is reported as "decreases" (same as original).
    direction = "increases resignation risk" if value > 0 else "decreases resignation risk"
    # Bug fix: the original f-string had no separator between the two parts,
    # producing run-together text like "Primary driverincreases resignation risk".
    return f"{intensity[rank]}: {direction}"
def run_prediction(df):
    """Engineer features on *df* IN PLACE, predict attrition, and build the response.

    Args:
        df: single-row DataFrame with the raw employee columns (categoricals as
            strings, "augementation_salaire_precedente" as a "NN%" string).

    Returns:
        dict with the verdict message, probability, threshold note and the top-5
        SHAP factors with their interpretation and raw value.

    NOTE: df is mutated in place (binary encodings, engineered columns); the
    caller passes the same df to log_prediction, which reads those added columns.
    """
    # Warn (but do not reject) when a categorical value was never seen in training.
    for col, known in known_values.items():
        val = df[col].values[0]
        if val not in known:
            logger.warning(f"Unknown value '{val}' for column '{col}' — prediction may be unreliable")
    # Binary encodings not included in the pipeline:
    df['genre']= df["genre"].map({"M": 1, "F": 0})
    df['heure_supplementaires']= df["heure_supplementaires"].map({"Oui": 1, "Non": 0})
    # Type change for the previous salary raise: "12%" string -> 0.12 float (not in pipeline)
    df["augementation_salaire_precedente"] = df["augementation_salaire_precedente"].apply(lambda x: float(x[:-1]) / 100)
    # Average every satisfaction_* column into one engineered feature.
    dft = df[[item for item in df.columns if item.startswith("satisfaction")]].copy()
    dft.loc[:, "overall_satisfaction"] = dft.mean(
        axis=1
    )
    df["overall_satisfaction"] = dft["overall_satisfaction"].copy()
    # Engineered binary flags. Column names keep the historical typos
    # ("expertise_inconcistency", "managarial_stagnation") that the trained
    # pipeline expects — do not rename without retraining.
    df["expertise_inconcistency"] = df.apply(inconsistency, axis=1)
    df["managarial_stagnation"] = df.apply(promotion, axis=1)
    df["developpement_stagnation"] = df.apply(developpement, axis=1)
    # 2. Use the pipeline to make the prediction
    proba = model.predict_proba(df)[0][1]
    # Strategic decision threshold of 0.37 instead of the default 0.50.
    prediction = 1 if proba >= 0.37 else 0
    # 3. SHAP explanation: split the pipeline into preprocessor + final estimator
    preprocessor_step = model[:-1]
    gb_step = model[-1]
    X_preprocessed = preprocessor_step.transform(df)
    explainer = shap.TreeExplainer(gb_step)
    shap_values_obj = explainer(X_preprocessed)
    # Strip the "step__" prefix sklearn prepends to output feature names.
    feature_names = [name.split("__")[-1] for name in preprocessor_step.get_feature_names_out()]
    shap_series = pd.Series(shap_values_obj.values[0], index=feature_names)
    # Five largest contributions by absolute magnitude.
    top_factors = shap_series.abs().nlargest(5)
    # 3. Return the result as a JSON-serializable dict
    return {
        "statut_employe": depart(prediction),
        "probability_score": round(proba, 2),
        "model_threshold": 0.37,
        "note": "Decision based on a strategic threshold of 0.37, not 0.50",
        "top_5_factors": {
            factor: {
                "interpretation": interpret_shap(rank, shap_series[factor]),
                # Raw input value when the factor maps to an input column
                # (string kept as-is, numbers cast to float); "encoded" for
                # one-hot / derived features that are not columns of df.
                "feature_value": str(df[factor].values[0]) if factor in df.columns and isinstance(df[factor].values[0], str) else float(df[factor].values[0]) if factor in df.columns else "encoded" }
            for rank, factor in enumerate(top_factors.index)
        }
    }
def log_prediction(df: pd.DataFrame, result: dict, id_employee: int = None):
    """Persist one prediction (inputs + outcome + top SHAP factors) to the database.

    Skipped entirely when the TESTING environment flag is set.
    """
    if TESTING:
        return  # no logging during tests

    top_factors = list(result["top_5_factors"].keys())

    def _plain(value):
        # Unwrap numpy scalars so SQLAlchemy receives native Python types.
        return value.item() if hasattr(value, "item") else value

    def _factor(i):
        return top_factors[i] if len(top_factors) > i else None

    # Keep only the df columns that exist on the PredictionLog table.
    loggable = PredictionLog.__table__.columns.keys()
    payload = {
        col: _plain(df[col].values[0])
        for col in df.columns
        if col in loggable and col != 'id_employee'
    }

    with Session(engine) as session:
        entry = PredictionLog(
            id_employee=id_employee,
            **payload,
            prediction=result["statut_employe"],
            probability_score=_plain(result["probability_score"]),
            primary_driver=_factor(0),
            strong_factor=_factor(1),
            moderate_factor=_factor(2),
            contributing_factor=_factor(3),
            notable_factor=_factor(4),
        )
        session.add(entry)
        session.commit()
@app.get("/")  # API landing page
def read_root():
    """Welcome endpoint; also serves as a cheap liveness check."""
    greeting = "Welcome to the FUTURISYS HR predictor API"
    return {"message": greeting}
@app.get("/predict/{id_employee}")
def predict_by_id(id_employee: int):
    """Predict attrition for an employee already stored in employees_full.

    Raises:
        HTTPException: 404 when the id is absent from the database.
    """
    query = text("SELECT * FROM employees_full WHERE id_employee = :id")
    with engine.connect() as conn:
        row = conn.execute(query, {"id": id_employee}).fetchone()
    if row is None:
        raise HTTPException(status_code=404, detail="Employee ID not found in database")
    employee_df = pd.DataFrame([row._mapping])
    prediction = run_prediction(employee_df)
    log_prediction(employee_df, prediction, id_employee)
    return prediction
@app.post("/predict")
def predict(data: EmployeeInput):
    """Predict attrition from an employee payload supplied in the request body."""
    employee_df = pd.DataFrame([data.model_dump()])
    prediction = run_prediction(employee_df)
    log_prediction(employee_df, prediction)
    return prediction