File size: 5,477 Bytes
f84949e 01490af f84949e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
"""Scripts utilitaires pour créer et remplir la base PostgreSQL."""
from __future__ import annotations
from pathlib import Path
import pandas as pd
from loguru import logger
from sqlalchemy import (
Column,
DateTime,
Float,
Integer,
MetaData,
String,
Table,
Text,
create_engine,
text,
)
import typer
from projet_05 import dataset as ds
from projet_05.settings import Settings, load_settings
app = typer.Typer(help="Initialisation complète de la base PostgreSQL.")
def _build_metadata(settings: Settings) -> MetaData:
"""Définir le schéma SQLAlchemy (tables et colonnes) aligné sur nos CSV."""
metadata = MetaData(schema=settings.db_schema)
Table(
"sirh",
metadata,
Column("id_employee", Integer, primary_key=True),
Column("age", Float),
Column("genre", String(16)),
Column("revenu_mensuel", Float),
Column("statut_marital", String(32)),
Column("departement", String(64)),
Column("poste", String(64)),
Column("nombre_experiences_precedentes", Float),
Column("nombre_heures_travailless", Float),
Column("annee_experience_totale", Float),
Column("annees_dans_l_entreprise", Float),
Column("annees_dans_le_poste_actuel", Float),
)
Table(
"evaluation",
metadata,
Column("id_employee", Integer, primary_key=True),
Column("satisfaction_employee_environnement", Float),
Column("note_evaluation_precedente", Float),
Column("niveau_hierarchique_poste", Float),
Column("satisfaction_employee_nature_travail", Float),
Column("satisfaction_employee_equipe", Float),
Column("satisfaction_employee_equilibre_pro_perso", Float),
Column("eval_number", String(64)),
Column("note_evaluation_actuelle", Float),
Column("heure_supplementaires", String(8)),
Column("augementation_salaire_precedente", String(32)),
)
Table(
"sond",
metadata,
Column("id_employee", Integer, primary_key=True),
Column("a_quitte_l_entreprise", String(8)),
Column("nombre_participation_pee", Float),
Column("nb_formations_suivies", Float),
Column("nombre_employee_sous_responsabilite", Float),
Column("code_sondage", String(64)),
Column("distance_domicile_travail", Float),
Column("niveau_education", Float),
Column("domaine_etude", String(64)),
Column("ayant_enfants", String(8)),
Column("frequence_deplacement", String(32)),
Column("annees_depuis_la_derniere_promotion", Float),
Column("annes_sous_responsable_actuel", Float),
)
Table(
"prediction_logs",
metadata,
Column("log_id", Integer, primary_key=True, autoincrement=True),
Column("created_at", DateTime(timezone=True), server_default=text("CURRENT_TIMESTAMP")),
Column("id_employee", Integer),
Column("source", String(32)),
Column("probability", Float),
Column("decision", Integer),
Column("threshold", Float),
Column("payload", Text),
)
return metadata
def _load_frames(settings: Settings) -> dict[str, pd.DataFrame]:
"""Charger les trois CSV bruts (sirh, évaluation, sondage) déjà nettoyés."""
sirh = ds.clean_text_values(
ds.safe_read_csv(settings.path_sirh).pipe(ds._harmonize_id_column, settings.col_id, digits_only=True)
)
evaluation = ds.clean_text_values(
ds.safe_read_csv(settings.path_eval)
.pipe(ds._rename_column, "eval_number", settings.col_id)
.pipe(ds._harmonize_id_column, settings.col_id, digits_only=True)
)
sond = ds.clean_text_values(
ds.safe_read_csv(settings.path_sondage)
.pipe(ds._rename_column, "code_sondage", settings.col_id)
.pipe(ds._harmonize_id_column, settings.col_id, digits_only=True)
)
return {"sirh": sirh, "evaluation": evaluation, "sond": sond}
@app.command()
def main(
settings_path: Path | None = typer.Option(
None,
"--settings",
"-s",
help="Chemin vers un fichier settings.yml personnalisé.",
)
):
"""Créer les tables PostgreSQL et charger les données d'exemple."""
Path("logs").mkdir(parents=True, exist_ok=True)
settings = load_settings(settings_path) if settings_path else load_settings()
if not settings.db_url:
raise typer.BadParameter(
"Aucune URL de base de données fournie. Configurez `database.url` dans settings.yml."
)
engine = create_engine(settings.db_url, future=True)
metadata = _build_metadata(settings)
with engine.begin() as conn:
if settings.db_schema:
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {settings.db_schema}"))
metadata.drop_all(conn, checkfirst=True)
metadata.create_all(conn, checkfirst=True)
frames = _load_frames(settings)
with engine.begin() as conn:
for table_name, frame in frames.items():
logger.info("Insertion de {} lignes dans la table {}", len(frame), table_name)
frame.to_sql(
table_name,
conn,
schema=settings.db_schema,
index=False,
if_exists="append",
method="multi",
)
logger.success("Initialisation PostgreSQL terminée.")
if __name__ == "__main__":
app()
|