| | from __future__ import annotations |
| |
|
| | import json |
| | from pathlib import Path |
| | from typing import Any |
| |
|
| | import gradio as gr |
| | import pandas as pd |
| | from loguru import logger |
| |
|
| | from projet_05.branding import apply_brand_theme |
| | from projet_05.modeling.predict import load_metadata, load_pipeline, run_inference |
| |
|
| | MODEL_PATH = Path("models/best_model.joblib") |
| | METADATA_PATH = Path("models/best_model_meta.json") |
| | SCHEMA_PATH = Path("data/processed/schema.json") |
| |
|
| |
|
| | def _load_schema(path: Path) -> dict[str, Any]: |
| | if not path.exists(): |
| | return {} |
| | return json.loads(path.read_text(encoding="utf-8")) |
| |
|
| |
|
| | def _infer_features(metadata: dict, schema: dict, pipeline) -> list[str]: |
| | if schema: |
| | candidates = schema.get("numerical_features", []) + schema.get("categorical_features", []) |
| | if candidates: |
| | return candidates |
| | features = metadata.get("features", {}) |
| | explicit = (features.get("numerical") or []) + (features.get("categorical") or []) |
| | if explicit: |
| | return explicit |
| | if pipeline is not None and hasattr(pipeline, "feature_names_in_"): |
| | return list(pipeline.feature_names_in_) |
| | return [] |
| |
|
| |
|
| | def _convert_input(payload: Any, headers: list[str]) -> pd.DataFrame: |
| | if isinstance(payload, pd.DataFrame): |
| | df = payload.copy() |
| | elif payload is None: |
| | df = pd.DataFrame(columns=headers) |
| | else: |
| | df = pd.DataFrame(payload, columns=headers if headers else None) |
| | df = df.dropna(how="all") |
| | if df.empty: |
| | raise gr.Error("Merci de saisir au moins une ligne complète.") |
| | return df |
| |
|
| |
|
def _ensure_model():
    """Abort with a user-facing error when no trained pipeline was loaded at startup."""
    if PIPELINE is not None:
        return
    raise gr.Error(
        "Aucun modèle entrainé n'a été trouvé. Lancez `python projet_05/modeling/train.py` puis relancez l'application."
    )
| |
|
| |
|
def score_table(table):
    """Score the rows entered in the interactive Dataframe widget.

    Returns the DataFrame produced by ``run_inference`` (predictions
    appended to the cleaned input rows).
    """
    _ensure_model()
    frame = _convert_input(table, FEATURE_ORDER)
    # Drop the label column if the user pasted already-labelled data.
    columns_to_drop = [TARGET_COLUMN] if TARGET_COLUMN else None
    return run_inference(
        frame,
        PIPELINE,
        THRESHOLD,
        drop_columns=columns_to_drop,
        required_features=FEATURE_ORDER or None,
    )
| |
|
| |
|
def score_csv(upload):
    """Score every row of a CSV file uploaded through the File widget.

    Raises a user-facing ``gr.Error`` when no file was provided.
    """
    _ensure_model()
    if upload is None:
        raise gr.Error("Veuillez déposer un fichier CSV.")
    # NOTE(review): assumes the uploaded value exposes `.name` as a readable
    # path — confirm against the installed gradio version's File API.
    frame = pd.read_csv(upload.name)
    # Drop the label column if the uploaded file still contains it.
    columns_to_drop = [TARGET_COLUMN] if TARGET_COLUMN else None
    return run_inference(
        frame,
        PIPELINE,
        THRESHOLD,
        drop_columns=columns_to_drop,
        required_features=FEATURE_ORDER or None,
    )
| |
|
| |
|
def predict_from_form(*values):
    """Score a single observation assembled from the per-feature form fields.

    ``values`` arrive in the same order as ``FEATURE_ORDER`` (one Textbox
    per feature). Returns a JSON-friendly dict with the probability,
    the human-readable decision, and the threshold applied.
    """
    _ensure_model()
    if not FEATURE_ORDER:
        raise gr.Error("Impossible de générer le formulaire sans configuration des features.")
    record = dict(zip(FEATURE_ORDER, values))
    scored = run_inference(
        pd.DataFrame([record]),
        PIPELINE,
        THRESHOLD,
        required_features=FEATURE_ORDER or None,
    )
    first = scored.iloc[0]
    if int(first["prediction"]) == 1:
        label = "Risque de départ"
    else:
        label = "Reste probable"
    return {
        "probability": round(float(first["proba_depart"]), 4),
        "decision": label,
        "threshold": THRESHOLD,
    }
| |
|
| |
|
# --- Application startup: branding, model artefacts, feature configuration ---
apply_brand_theme()

# Defaults used when no trained artefacts are found on disk; the UI then
# degrades to a warning banner instead of crashing.
PIPELINE = None
METADATA: dict[str, Any] = {}
THRESHOLD = 0.5  # fallback decision threshold; replaced by the tuned value below
TARGET_COLUMN: str | None = None
SCHEMA = _load_schema(SCHEMA_PATH)

try:
    PIPELINE = load_pipeline(MODEL_PATH)
    METADATA = load_metadata(METADATA_PATH)
    # Prefer the threshold selected during training over the 0.5 fallback.
    THRESHOLD = float(METADATA.get("best_threshold", THRESHOLD))
    TARGET_COLUMN = METADATA.get("target")
except FileNotFoundError as exc:
    # Missing artefacts are tolerated: keep the defaults and warn.
    logger.warning("Artéfact manquant: {}", exc)

# Drives the form layout and the expected column set for table/CSV scoring.
FEATURE_ORDER = _infer_features(METADATA, SCHEMA, PIPELINE)
| |
|
# UI definition. Statement order inside the Blocks context determines the
# on-screen order of components, so creation is kept strictly sequential.
with gr.Blocks(title="Prédicteur d'attrition") as demo:
    gr.Markdown("# API Gradio – Prédiction de départ employé")
    gr.Markdown(
        "Le modèle applique le pipeline entraîné hors-notebook pour fournir une probabilité de départ ainsi qu'une décision binaire."
    )

    # Degrade gracefully when startup found no trained model.
    if PIPELINE is None:
        gr.Markdown(
            "⚠️ **Aucun modèle disponible.** Lancez les scripts `dataset.py`, `features.py` puis `modeling/train.py`."
        )
    else:
        gr.Markdown(f"Seuil de décision actuel : **{THRESHOLD:.2f}**")

    # Tab 1: one Textbox per configured feature; single prediction shown as JSON.
    with gr.Tab("Formulaire unitaire"):
        if not FEATURE_ORDER:
            gr.Markdown("Aucune configuration de features détectée. Utilisez l'onglet CSV pour scorer vos données.")
        else:
            form_inputs: list[gr.components.Component] = []
            for feature in FEATURE_ORDER:
                form_inputs.append(
                    gr.Textbox(label=feature, placeholder=f"Saisir {feature.replace('_', ' ')}")
                )
            form_output = gr.JSON(label="Résultat")
            gr.Button("Prédire").click(
                fn=predict_from_form,
                inputs=form_inputs,
                outputs=form_output,
            )

    # Tab 2: editable grid scored row by row.
    with gr.Tab("Tableau interactif"):
        table_input = gr.Dataframe(
            headers=FEATURE_ORDER if FEATURE_ORDER else None,
            row_count=(1, "dynamic"),
            # Without a feature list, fall back to a 5-column editable grid.
            col_count=(len(FEATURE_ORDER), "dynamic") if FEATURE_ORDER else (5, "dynamic"),
            type="pandas",
        )
        table_output = gr.Dataframe(label="Prédictions", type="pandas")
        gr.Button("Scorer les lignes").click(
            fn=score_table,
            inputs=table_input,
            outputs=table_output,
        )

    # Tab 3: bulk scoring of an uploaded CSV file.
    with gr.Tab("Fichier CSV"):
        file_input = gr.File(file_types=[".csv"], label="Déposez votre fichier CSV")
        file_output = gr.Dataframe(label="Résultats CSV", type="pandas")
        gr.Button("Scorer le fichier").click(
            fn=score_csv,
            inputs=file_input,
            outputs=file_output,
        )
| |
|
| |
|
# Start the Gradio server only when the module is executed directly.
if __name__ == "__main__":
    demo.launch()
| |
|