"""Run the training pipeline by executing each stage as a `python -m` subprocess.

The stages listed in ``PIPELINE_STEPS`` are run one after another; the first
stage that exits with a non-zero status aborts the run.
"""

from __future__ import annotations

import subprocess
import sys
from datetime import datetime
from pathlib import Path

from loguru import logger

from projet_05.settings import load_settings

# (human-readable label, dotted module path) pairs, executed in this order by
# main(). Each module is launched with `python -m <module path>`.
PIPELINE_STEPS = [
    ("Initialisation base PostgreSQL", "scripts.init_db"),
    ("Préparation des données brutes", "projet_05.dataset"),
    ("Feature engineering", "projet_05.features"),
    ("Entraînement du modèle", "projet_05.modeling.train"),
]


def run_step(label: str, module_path: str) -> None:
    """Execute one stage of the training pipeline in a child interpreter.

    Args:
        label: Human-readable step name, used only in log messages and in the
            error raised on failure.
        module_path: Dotted path of the module to execute (passed to
            ``python -m``).

    Raises:
        RuntimeError: If the subprocess exits with a non-zero status.
    """
    logger.info("➡️ Étape '{}' en cours...", label)

    # Run with the same interpreter as this script; output is captured as text
    # so it can be forwarded to the logger instead of going to the console.
    completed = subprocess.run(
        [sys.executable, "-m", module_path],
        capture_output=True,
        text=True,
    )

    if completed.returncode != 0:
        logger.error("Échec pour '{}'.", label)
        # Log both captured streams (when non-empty) before raising.
        if completed.stdout:
            logger.error("STDOUT:\n{}", completed.stdout)
        if completed.stderr:
            logger.error("STDERR:\n{}", completed.stderr)
        raise RuntimeError(
            f"L'étape '{label}' a échoué (code {completed.returncode})."
        )

    # On success the child's stdout is only logged at DEBUG level.
    if completed.stdout:
        logger.debug(completed.stdout.strip())
    logger.success("Étape '{}' terminée.", label)


def main() -> None:
    """Run all pipeline stages sequentially.

    Creates ``logs/pipeline_logs/<timestamp>.log`` and registers it as a
    loguru sink. If the raw data files are missing, a warning is logged and
    the function returns without running any stage.

    Raises:
        RuntimeError: Propagated from ``run_step`` when a stage fails; the
            remaining stages are not run.
    """
    log_root = Path("logs")
    pipeline_log_dir = log_root / "pipeline_logs"
    pipeline_log_dir.mkdir(parents=True, exist_ok=True)

    # One log file per run, named after the local start time.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = pipeline_log_dir / f"{timestamp}.log"
    logger.add(log_file, level="INFO", enqueue=True)
    logger.info("Début d'exécution du pipeline (log: {})", log_file)

    # Missing inputs skip the pipeline instead of failing it: plain return,
    # no exception.
    if not _raw_data_available():
        logger.warning("Données brutes introuvables. Pipeline ignoré.")
        return

    for label, module in PIPELINE_STEPS:
        run_step(label, module)

    # Single-line message: the literal previously contained a raw line break.
    logger.success(
        "Pipeline exécuté avec succès. Logs disponibles dans {}", log_file
    )


def _raw_data_available() -> bool:
    """Verify that the required raw CSV files exist on disk.

    The paths are read from the ``path_sirh``, ``path_eval`` and
    ``path_sondage`` attributes of the object returned by ``load_settings()``.

    Returns:
        True when all three paths exist; False otherwise, after logging the
        missing paths as a warning.
    """
    settings = load_settings()
    required = [
        Path(settings.path_sirh),
        Path(settings.path_eval),
        Path(settings.path_sondage),
    ]
    missing = [path for path in required if not path.exists()]
    if missing:
        logger.warning("Fichiers absents: {}", ", ".join(map(str, missing)))
        return False
    return True


if __name__ == "__main__":
    try:
        main()
    except Exception as exc:  # pragma: no cover - orchestration script
        # Top-level boundary: log the failure and exit with a non-zero status.
        logger.error("Pipeline interrompu : {}", exc)
        sys.exit(1)