"""Scripts utilitaires pour créer et remplir la base PostgreSQL.""" from __future__ import annotations from pathlib import Path import pandas as pd from loguru import logger from sqlalchemy import ( Column, DateTime, Float, Integer, MetaData, String, Table, Text, create_engine, text, ) import typer from projet_05 import dataset as ds from projet_05.settings import Settings, load_settings app = typer.Typer(help="Initialisation complète de la base PostgreSQL.") def _build_metadata(settings: Settings) -> MetaData: """Définir le schéma SQLAlchemy (tables et colonnes) aligné sur nos CSV.""" metadata = MetaData(schema=settings.db_schema) Table( "sirh", metadata, Column("id_employee", Integer, primary_key=True), Column("age", Float), Column("genre", String(16)), Column("revenu_mensuel", Float), Column("statut_marital", String(32)), Column("departement", String(64)), Column("poste", String(64)), Column("nombre_experiences_precedentes", Float), Column("nombre_heures_travailless", Float), Column("annee_experience_totale", Float), Column("annees_dans_l_entreprise", Float), Column("annees_dans_le_poste_actuel", Float), ) Table( "evaluation", metadata, Column("id_employee", Integer, primary_key=True), Column("satisfaction_employee_environnement", Float), Column("note_evaluation_precedente", Float), Column("niveau_hierarchique_poste", Float), Column("satisfaction_employee_nature_travail", Float), Column("satisfaction_employee_equipe", Float), Column("satisfaction_employee_equilibre_pro_perso", Float), Column("eval_number", String(64)), Column("note_evaluation_actuelle", Float), Column("heure_supplementaires", String(8)), Column("augementation_salaire_precedente", String(32)), ) Table( "sond", metadata, Column("id_employee", Integer, primary_key=True), Column("a_quitte_l_entreprise", String(8)), Column("nombre_participation_pee", Float), Column("nb_formations_suivies", Float), Column("nombre_employee_sous_responsabilite", Float), Column("code_sondage", String(64)), Column("distance_domicile_travail", Float), Column("niveau_education", Float), Column("domaine_etude", String(64)), Column("ayant_enfants", String(8)), Column("frequence_deplacement", String(32)), Column("annees_depuis_la_derniere_promotion", Float), Column("annes_sous_responsable_actuel", Float), ) Table( "prediction_logs", metadata, Column("log_id", Integer, primary_key=True, autoincrement=True), Column("created_at", DateTime(timezone=True), server_default=text("CURRENT_TIMESTAMP")), Column("id_employee", Integer), Column("source", String(32)), Column("probability", Float), Column("decision", Integer), Column("threshold", Float), Column("payload", Text), ) return metadata def _load_frames(settings: Settings) -> dict[str, pd.DataFrame]: """Charger les trois CSV bruts (sirh, évaluation, sondage) déjà nettoyés.""" sirh = ds.clean_text_values( ds.safe_read_csv(settings.path_sirh).pipe(ds._harmonize_id_column, settings.col_id, digits_only=True) ) evaluation = ds.clean_text_values( ds.safe_read_csv(settings.path_eval) .pipe(ds._rename_column, "eval_number", settings.col_id) .pipe(ds._harmonize_id_column, settings.col_id, digits_only=True) ) sond = ds.clean_text_values( ds.safe_read_csv(settings.path_sondage) .pipe(ds._rename_column, "code_sondage", settings.col_id) .pipe(ds._harmonize_id_column, settings.col_id, digits_only=True) ) return {"sirh": sirh, "evaluation": evaluation, "sond": sond} @app.command() def main( settings_path: Path | None = typer.Option( None, "--settings", "-s", help="Chemin vers un fichier settings.yml personnalisé.", ) ): """Créer les tables PostgreSQL et charger les données d'exemple.""" Path("logs").mkdir(parents=True, exist_ok=True) settings = load_settings(settings_path) if settings_path else load_settings() if not settings.db_url: raise typer.BadParameter( "Aucune URL de base de données fournie. Configurez `database.url` dans settings.yml." ) engine = create_engine(settings.db_url, future=True) metadata = _build_metadata(settings) with engine.begin() as conn: if settings.db_schema: conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {settings.db_schema}")) metadata.drop_all(conn, checkfirst=True) metadata.create_all(conn, checkfirst=True) frames = _load_frames(settings) with engine.begin() as conn: for table_name, frame in frames.items(): logger.info("Insertion de {} lignes dans la table {}", len(frame), table_name) frame.to_sql( table_name, conn, schema=settings.db_schema, index=False, if_exists="append", method="multi", ) logger.success("Initialisation PostgreSQL terminée.") if __name__ == "__main__": app()