SignalMod / src /pipeline /run_pipeline.py
Mirae Kang
fix: modify evaluation pipeline, #7
975d796
"""
src/pipeline/run_pipeline.py
Pipeline end-to-end de entrenamiento y evaluación.
Ejecutar con: python -m src.pipeline.run_pipeline [--model lr|rf|xgboost]
Fases:
1. Carga de datos
2. Split train/test
3. Preprocesamiento (spaCy + NLTK)
4. Entrenamiento (modelo elegido desde config o argumento)
5. Cross-validation
6. Evaluación final en test
7. Guardado del modelo
8. Registro en MLflow
9. Informe JSON + CSV resumen
Todo se controla desde:
configs/pipeline.yaml → rutas, split, cv_folds
configs/features.yaml → preprocesamiento y vectorización
configs/models.yaml → hiperparámetros
configs/best_params.yaml → resultado de Optuna (si existe)
"""
import argparse
import sys
import yaml
import mlflow
import mlflow.sklearn
from pathlib import Path
from datetime import datetime
from sklearn.model_selection import train_test_split
# ── Setup path ────────────────────────────────────────────────────────────────
# This file lives at src/pipeline/run_pipeline.py, so parents[2] of its
# resolved path is the repository root (parents[0] = src/pipeline,
# parents[1] = src).
PROJECT_ROOT = Path(__file__).resolve().parents[2]
# Put the repository root first on sys.path so the `src.*` imports below can
# be resolved even when the root is not already importable.
sys.path.insert(0, str(PROJECT_ROOT))
# Project-local imports; kept after the sys.path change above on purpose.
from src.data.loader import load_raw_data
from src.features.text_preprocessor import TextPreprocessor
from src.models.baseline import build_model
from src.evaluation.evaluator import Evaluator
from src.utils.logger import get_logger
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
# ══════════════════════════════════════════════════════════════════════════════
# PIPELINE
# ══════════════════════════════════════════════════════════════════════════════
def _load_yaml(path):
    """Parse the YAML file at *path* and return its content, closing the file."""
    with open(path, encoding="utf-8") as handle:
        return yaml.safe_load(handle)


def run_pipeline(model_type: str = "lr") -> dict:
    """
    Run the full training and evaluation pipeline for one model.

    Phases: data loading, stratified train/test split, text preprocessing,
    training, cross-validation, evaluation on the test split, model
    persistence, MLflow logging and report generation.

    Args:
        model_type: "lr" | "rf" | "xgboost"

    Returns:
        The metrics dict returned by ``Evaluator.evaluate_and_report``,
        extended with ``run_id``, ``model_path``, ``model_type`` and
        ``model_family``.
    """
    run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
    logger.info("=" * 60)
    logger.info(f"πŸš€ PIPELINE β€” model={model_type} | run={run_id}")
    logger.info("=" * 60)

    # ── Load configuration ────────────────────────────────────────────────────
    configs_dir = PROJECT_ROOT / "configs"
    features_cfg_path = configs_dir / "features.yaml"
    cfg_pipe = _load_yaml(configs_dir / "pipeline.yaml")
    # The parsed features config is not used in this function (components get
    # the path instead); it is still parsed here so that a missing or
    # malformed file fails before any data is loaded.
    _load_yaml(features_cfg_path)

    target_col = cfg_pipe["data"]["target_binary"]
    random_state = cfg_pipe["pipeline"]["random_state"]
    test_size = cfg_pipe["pipeline"]["test_size"]
    cv_folds = cfg_pipe["pipeline"]["cv_folds"]
    raw_path = PROJECT_ROOT / cfg_pipe["data"]["raw_path"]

    # Each model type gets its own experiments folder under models/.
    experiments_dir = PROJECT_ROOT / "models" / "experiments" / model_type
    experiments_dir.mkdir(parents=True, exist_ok=True)

    # ── PHASE 1: Data loading ─────────────────────────────────────────────────
    logger.info("FASE 1 β€” Carga de datos")
    df = load_raw_data(raw_path)
    logger.info(f" {len(df)} comentarios cargados")

    # ── PHASE 2: Split ────────────────────────────────────────────────────────
    logger.info("FASE 2 β€” Split train/test")
    X = df["Text"]
    y = df[target_col]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state, stratify=y
    )
    logger.info(f" Train: {len(X_train)} | Test: {len(X_test)}")

    # ── PHASE 3: Preprocessing ────────────────────────────────────────────────
    logger.info("FASE 3 β€” Preprocesamiento NLP")
    preprocessor = TextPreprocessor(config_path=str(features_cfg_path))
    X_train_clean = preprocessor.transform(X_train)
    X_test_clean = preprocessor.transform(X_test)
    # Where preprocessing left an empty string, fall back to the original
    # text so that no sample is lost.
    X_train_clean = X_train_clean.where(X_train_clean != "", X_train)
    X_test_clean = X_test_clean.where(X_test_clean != "", X_test)
    logger.info(" Preprocesamiento completado")

    # ── PHASE 4: Training ─────────────────────────────────────────────────────
    logger.info(f"FASE 4 β€” Entrenamiento ({model_type.upper()})")
    model = build_model(
        model_type,
        config_path=str(configs_dir / "models.yaml"),
        feat_config_path=str(features_cfg_path),
        best_params_path=str(configs_dir / "best_params.yaml"),
    )
    model.fit(X_train_clean, y_train)

    # ── PHASE 5: Cross-validation ─────────────────────────────────────────────
    logger.info(f"FASE 5 β€” Cross-validation ({cv_folds} folds)")
    cv_results = model.cross_validate(
        X_train_clean, y_train, cv_folds=cv_folds, rand=random_state
    )

    # ── PHASE 6: Evaluation on the test split ─────────────────────────────────
    logger.info("FASE 6 β€” EvaluaciΓ³n en test")
    reports_root = PROJECT_ROOT / "reports"
    evaluator = Evaluator(output_dir=reports_root / "pipeline" / model_type)
    metrics = evaluator.evaluate_and_report(
        model,
        X_test_clean,
        y_test,
        model_name=model_type.upper(),
        X_train=X_train_clean,
        y_train=y_train,
        cv_results=cv_results,
        summary_path=reports_root / "summary.csv",
    )

    # ── PHASE 7: Model persistence ────────────────────────────────────────────
    logger.info("FASE 7 β€” Guardado del modelo")
    model_path = experiments_dir / f"{model_type}_pipeline_{run_id}.joblib"
    model.save(model_path)
    # NOTE: this function only writes the per-run experiment artefact; it does
    # not overwrite models/pipeline/final_model.joblib (that step is disabled).

    # ── PHASE 8: MLflow ───────────────────────────────────────────────────────
    logger.info("FASE 8 β€” Registro en MLflow")
    _log_mlflow(metrics, cv_results, model, model_path, run_id, model_type)

    # ── PHASE 9: Report ───────────────────────────────────────────────────────
    logger.info("FASE 9 β€” Generando informes")
    # Run metadata is attached after the MLflow call, right before the report
    # is written.
    metrics.update(
        {
            "run_id": run_id,
            "model_path": str(model_path),
            "model_type": model_type,
            "model_family": "sklearn_baseline",
        }
    )
    evaluator.save_report(metrics, f"exp_{run_id}_{model_type}")

    logger.info("=" * 60)
    logger.info(f"βœ… Pipeline completado β€” F1={metrics['f1_weighted']:.4f}")
    logger.info("=" * 60)
    return metrics
# ── MLflow logging ────────────────────────────────────────────────────────────
def _log_mlflow(metrics, cv_results, model, model_path, run_id, model_type):
    """
    Record the run in the local MLflow file store (best effort).

    Any exception raised while logging is caught and reported as a warning,
    so a failure here never aborts the pipeline.

    Args:
        metrics: Test metrics; must contain "f1_weighted", "roc_auc", "fp"
            and "fn". "train_test_gap_pp" is logged as 0 when absent and
            "cv_test_gap_pp" is logged only when present.
        cv_results: Cross-validation results; must contain "cv_f1_mean" and
            "cv_f1_std".
        model: Trained model wrapper; its ``pipeline`` attribute is logged
            as the model artefact.
        model_path: Accepted for call compatibility; not used here.
        run_id: Run identifier, logged as a parameter and used in the run
            name.
        model_type: Model key, logged as a parameter and used in the run
            and artefact names.
    """
    run_name = f"{model_type}_{run_id}"
    try:
        # Path.as_uri() builds a well-formed file URI on every platform
        # (drive letters, backslashes and characters that need
        # percent-encoding), which plain "file://" + path does not.
        mlflow.set_tracking_uri((PROJECT_ROOT / "mlruns").as_uri())
        mlflow.set_experiment("Youtube_project_experiment_pipeline")
        with mlflow.start_run(run_name=run_name):
            # Parameters
            mlflow.log_param("model_type", model_type)
            mlflow.log_param("run_id", run_id)
            # Pipeline metrics
            mlflow.log_metric("test_f1", metrics["f1_weighted"])
            mlflow.log_metric("test_roc_auc", metrics["roc_auc"])
            mlflow.log_metric("test_fp", metrics["fp"])
            mlflow.log_metric("test_fn", metrics["fn"])
            mlflow.log_metric("cv_f1_mean", cv_results["cv_f1_mean"])
            mlflow.log_metric("cv_f1_std", cv_results["cv_f1_std"])
            mlflow.log_metric(
                "train_test_gap_pp", metrics.get("train_test_gap_pp", 0)
            )
            if "cv_test_gap_pp" in metrics:
                mlflow.log_metric("cv_test_gap_pp", metrics["cv_test_gap_pp"])
            # Model as an artefact
            mlflow.sklearn.log_model(model.pipeline, f"{model_type}_pipeline")
        logger.info(f" MLflow run registrado: {run_name}")
    except Exception as e:
        # Deliberately broad: MLflow logging is optional for the pipeline.
        logger.warning(f"MLflow no disponible: {e}")
# ══════════════════════════════════════════════════════════════════════════════
# ENTRY POINT
# ══════════════════════════════════════════════════════════════════════════════
def _parse_args():
    """
    Parse the command-line options of the pipeline script.

    Returns:
        The argparse namespace; ``model`` is one of "lr", "rf" or "xgboost"
        and defaults to "lr".
    """
    cli = argparse.ArgumentParser(
        description="Pipeline de entrenamiento β€” YouTube Hate Speech Detection"
    )
    model_option = {
        "type": str,
        "default": "lr",
        "choices": ["lr", "rf", "xgboost"],
        "help": "Tipo de modelo a entrenar (default: lr)",
    }
    cli.add_argument("--model", **model_option)
    return cli.parse_args()
def main():
    """
    Command-line entry point.

    Reads the model choice from the command line, runs the pipeline for it
    and returns the resulting metrics dict.
    """
    cli_args = _parse_args()
    return run_pipeline(model_type=cli_args.model)


# Run the pipeline only when this module is executed as a script.
if __name__ == "__main__":
    main()