"""
src/pipeline/run_pipeline.py

Pipeline end-to-end de entrenamiento y evaluación.
Ejecutar con: python -m src.pipeline.run_pipeline [--model lr|rf|xgboost]

Fases:
  1. Carga de datos
  2. Split train/test
  3. Preprocesamiento (spaCy + NLTK)
  4. Entrenamiento (modelo elegido desde config o argumento)
  5. Cross-validation
  6. Evaluación final en test
  7. Guardado del modelo
  8. Registro en MLflow
  9. Informe JSON + CSV resumen

Todo se controla desde:
  configs/pipeline.yaml → rutas, split, cv_folds
  configs/features.yaml → preprocesamiento y vectorización
  configs/models.yaml → hiperparámetros
  configs/best_params.yaml → resultado de Optuna (si existe)
"""
|
|
| import argparse |
| import sys |
| import yaml |
| import mlflow |
| import mlflow.sklearn |
| from pathlib import Path |
| from datetime import datetime |
| from sklearn.model_selection import train_test_split |
|
|
| |
# Third ancestor of this file's resolved path. With the location given in the
# module docstring (src/pipeline/run_pipeline.py) that is the repository root.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
# Put the project root first on sys.path so the ``src.*`` imports below
# resolve. This is why those imports come after this statement.
sys.path.insert(0, str(PROJECT_ROOT))


from src.data.loader import load_raw_data
from src.features.text_preprocessor import TextPreprocessor
from src.models.baseline import build_model
from src.evaluation.evaluator import Evaluator
from src.utils.logger import get_logger


# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
|
|
|
|
| |
| |
| |
def _load_yaml(path):
    """Return the parsed contents of the YAML file at ``path``.

    The file is opened in a ``with`` block so the handle is closed as soon
    as parsing finishes, and it is read as UTF-8.
    """
    with open(path, encoding="utf-8") as handle:
        return yaml.safe_load(handle)


def run_pipeline(model_type: str = "lr") -> dict:
    """Run the full training and evaluation pipeline for one model type.

    Phases, in order: load the raw data, stratified train/test split, text
    preprocessing, training, cross-validation on the training split,
    evaluation on the test split, model persistence, MLflow logging and
    report generation.

    Args:
        model_type: Model identifier forwarded to ``build_model``; the CLI
            offers "lr", "rf" and "xgboost".

    Returns:
        The metrics dict returned by ``Evaluator.evaluate_and_report``,
        extended with the keys ``run_id``, ``model_path``, ``model_type``
        and ``model_family``.
    """
    # NOTE(review): several log strings below contain glyphs that look
    # mis-encoded; they are kept byte-for-byte -- confirm the intended
    # characters before changing them.
    run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
    banner = "=" * 60
    logger.info(banner)
    logger.info(f"π PIPELINE β model={model_type} | run={run_id}")
    logger.info(banner)

    # --- Configuration -----------------------------------------------------
    configs_dir = PROJECT_ROOT / "configs"
    features_cfg_path = str(configs_dir / "features.yaml")
    cfg_pipe = _load_yaml(configs_dir / "pipeline.yaml")

    data_cfg = cfg_pipe["data"]
    pipeline_cfg = cfg_pipe["pipeline"]
    target = data_cfg["target_binary"]
    rand = pipeline_cfg["random_state"]
    test_size = pipeline_cfg["test_size"]
    cv_folds = pipeline_cfg["cv_folds"]
    raw_path = PROJECT_ROOT / data_cfg["raw_path"]

    # Each run's artefact is stored under models/experiments/<model_type>/.
    models_dir = PROJECT_ROOT / "models"
    experiments_dir = models_dir / "experiments" / model_type
    experiments_dir.mkdir(parents=True, exist_ok=True)

    # --- Phase 1: data loading ---------------------------------------------
    logger.info("FASE 1 β Carga de datos")
    df = load_raw_data(raw_path)
    logger.info(f" {len(df)} comentarios cargados")

    # --- Phase 2: train/test split -----------------------------------------
    logger.info("FASE 2 β Split train/test")
    X = df["Text"]
    y = df[target]
    # stratify=y keeps the class proportions equal in both splits.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=rand, stratify=y
    )
    logger.info(f" Train: {len(X_train)} | Test: {len(X_test)}")

    # --- Phase 3: text preprocessing ---------------------------------------
    logger.info("FASE 3 β Preprocesamiento NLP")
    preprocessor = TextPreprocessor(config_path=features_cfg_path)
    X_train_clean = preprocessor.transform(X_train)
    X_test_clean = preprocessor.transform(X_test)

    # Where preprocessing left an empty string, fall back to the raw text.
    # Assumes transform() returns a pandas Series aligned with its input --
    # TODO confirm.
    X_train_clean = X_train_clean.where(X_train_clean != "", X_train)
    X_test_clean = X_test_clean.where(X_test_clean != "", X_test)
    logger.info(" Preprocesamiento completado")

    # --- Phase 4: training -------------------------------------------------
    logger.info(f"FASE 4 β Entrenamiento ({model_type.upper()})")
    model = build_model(
        model_type,
        config_path=str(configs_dir / "models.yaml"),
        feat_config_path=features_cfg_path,
        best_params_path=str(configs_dir / "best_params.yaml"),
    )
    model.fit(X_train_clean, y_train)

    # --- Phase 5: cross-validation on the training split -------------------
    logger.info(f"FASE 5 β Cross-validation ({cv_folds} folds)")
    cv_results = model.cross_validate(
        X_train_clean, y_train, cv_folds=cv_folds, rand=rand
    )

    # --- Phase 6: evaluation on the held-out test split --------------------
    logger.info("FASE 6 β EvaluaciΓ³n en test")
    reports_dir = PROJECT_ROOT / "reports"
    evaluator = Evaluator(output_dir=reports_dir / "pipeline" / model_type)
    metrics = evaluator.evaluate_and_report(
        model,
        X_test_clean,
        y_test,
        model_name=model_type.upper(),
        X_train=X_train_clean,
        y_train=y_train,
        cv_results=cv_results,
        summary_path=reports_dir / "summary.csv",
    )

    # --- Phase 7: model persistence ----------------------------------------
    logger.info("FASE 7 β Guardado del modelo")
    model_path = experiments_dir / f"{model_type}_pipeline_{run_id}.joblib"
    model.save(model_path)
    # A step that would also save the model to
    # models/pipeline/final_model.joblib existed here only as an inert string
    # literal; that promotion step remains disabled.

    # --- Phase 8: MLflow logging -------------------------------------------
    logger.info("FASE 8 β Registro en MLflow")
    _log_mlflow(metrics, cv_results, model, model_path, run_id, model_type)

    # --- Phase 9: reports --------------------------------------------------
    logger.info("FASE 9 β Generando informes")
    # Run metadata is attached after the MLflow call, so it appears only in
    # the saved report and in the returned dict.
    metrics["run_id"] = run_id
    metrics["model_path"] = str(model_path)
    metrics["model_type"] = model_type
    metrics["model_family"] = "sklearn_baseline"
    evaluator.save_report(metrics, f"exp_{run_id}_{model_type}")

    logger.info(banner)
    logger.info(f"β Pipeline completado β F1={metrics['f1_weighted']:.4f}")
    logger.info(banner)

    return metrics
|
|
|
|
| |
def _log_mlflow(metrics, cv_results, model, model_path, run_id, model_type):
    """Record the run's parameters, metrics and fitted pipeline in MLflow.

    Logging is best-effort: any exception raised here (including a missing
    metric key) is caught and reported as a warning, so a tracking failure
    never aborts the pipeline.

    Args:
        metrics: Test metrics; the keys "f1_weighted", "roc_auc", "fp" and
            "fn" are read directly, "train_test_gap_pp" defaults to 0 and
            "cv_test_gap_pp" is logged only when present.
        cv_results: Cross-validation results; the keys "cv_f1_mean" and
            "cv_f1_std" are read.
        model: Object whose ``pipeline`` attribute is logged with
            ``mlflow.sklearn.log_model``.
        model_path: Accepted for interface compatibility; not used here.
        run_id: Run identifier, logged as a parameter and used in the run
            name.
        model_type: Model identifier, logged as a parameter and used in the
            run and artifact names.
    """
    try:
        tracking_dir = PROJECT_ROOT / "mlruns"
        # as_uri() builds a well-formed file URI on every platform; plain
        # "file://" + path puts a Windows drive letter in the authority part.
        mlflow.set_tracking_uri(tracking_dir.as_uri())
        mlflow.set_experiment("Youtube_project_experiment_pipeline")

        run_name = f"{model_type}_{run_id}"
        with mlflow.start_run(run_name=run_name):
            # Parameters identifying the run.
            mlflow.log_param("model_type", model_type)
            mlflow.log_param("run_id", run_id)

            # Test-split metrics: (MLflow metric name, key in ``metrics``).
            test_metric_keys = (
                ("test_f1", "f1_weighted"),
                ("test_roc_auc", "roc_auc"),
                ("test_fp", "fp"),
                ("test_fn", "fn"),
            )
            for metric_name, source_key in test_metric_keys:
                mlflow.log_metric(metric_name, metrics[source_key])

            # Cross-validation metrics keep the same name in both places.
            for cv_key in ("cv_f1_mean", "cv_f1_std"):
                mlflow.log_metric(cv_key, cv_results[cv_key])

            mlflow.log_metric(
                "train_test_gap_pp", metrics.get("train_test_gap_pp", 0)
            )
            if "cv_test_gap_pp" in metrics:
                mlflow.log_metric("cv_test_gap_pp", metrics["cv_test_gap_pp"])

            # Store the fitted pipeline as an MLflow model artifact.
            mlflow.sklearn.log_model(model.pipeline, f"{model_type}_pipeline")

        logger.info(f" MLflow run registrado: {run_name}")

    except Exception as e:
        # Deliberately broad: tracking is optional for the pipeline.
        logger.warning(f"MLflow no disponible: {e}")
|
|
|
|
| |
| |
| |
def _parse_args(argv=None):
    """Parse the pipeline's command-line options.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``, which is what
            calling this function with no arguments has always done.

    Returns:
        An ``argparse.Namespace`` whose ``model`` attribute is one of
        "lr", "rf" or "xgboost" ("lr" when the option is omitted).
    """
    parser = argparse.ArgumentParser(
        description="Pipeline de entrenamiento β YouTube Hate Speech Detection"
    )
    parser.add_argument(
        "--model",
        type=str,
        default="lr",
        choices=["lr", "rf", "xgboost"],
        help="Tipo de modelo a entrenar (default: lr)",
    )
    return parser.parse_args(argv)
|
|
|
|
def main():
    """CLI entry point: parse the arguments, run the pipeline, return its metrics."""
    cli_options = _parse_args()
    return run_pipeline(model_type=cli_options.model)
|
|
|
|
# Run the pipeline only when this file is executed as a script or with
# ``python -m``; importing the module has no such effect.
if __name__ == "__main__":
    main()
|
|