# Source: corner-forecast/src/models/train_model.py
# Uploaded by daniel-saed ("Upload 21 files", commit c2aaace, verified)
import json
import os
from datetime import datetime

import joblib
# MLflow
import mlflow
import mlflow.sklearn
import mlflow.xgboost
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, make_scorer
from sklearn.model_selection import (
    GridSearchCV,
    KFold,
    cross_val_score,
    cross_validate,
    train_test_split,
)
from sklearn.preprocessing import StandardScaler
from xgboost import XGBRegressor
class TRAIN_MODEL():
    """Train an XGBoost regressor for corner counts and track it in MLflow.

    Instantiating the class runs the whole pipeline (load data, split,
    optional grid search, fit, evaluate, persist) inside one MLflow run.
    Intermediate objects (data splits, scaler, fitted model, best
    hyperparameters) are kept as instance attributes.
    """

    def __init__(self, nombre, use_grid_search=False, config_path="config/model_config.json"):
        """Run the full training pipeline inside a single MLflow run.

        Args:
            nombre: Model identifier (e.g. "v3_production"); used in the run
                name, the MLflow tags and the output file names.
            use_grid_search: True searches for hyperparameters and saves the
                winner to ``config_path``; False loads the hyperparameters
                previously saved there.
            config_path: Path of the JSON file holding the hyperparameters.

        Raises:
            Exception: Any error raised by a pipeline step is re-raised after
                the run has been tagged ``status=FAILED``.
        """
        # MLflow configuration: local file store and a fixed experiment name.
        mlflow.set_tracking_uri("file:./mlruns")
        mlflow.set_experiment("corners_prediction")

        self.nombre = nombre
        self.use_grid_search = use_grid_search
        self.config_path = config_path
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Everything below happens inside one MLflow run.
        with mlflow.start_run(run_name=f"{nombre}_{self.timestamp}") as run:
            self.run_id = run.info.run_id

            print(f"\n{'='*80}")
            print("🚀 Entrenamiento iniciado con MLflow")
            print(f" Run ID: {self.run_id}")
            print(f" Nombre: {nombre}")
            print(f" GridSearch: {'SÍ' if use_grid_search else 'NO (usando config)'}")
            print(f"{'='*80}\n")

            # Basic tags
            mlflow.set_tags({
                "model_name": nombre,
                "timestamp": self.timestamp,
                "grid_search_used": str(use_grid_search),
                "framework": "XGBoost",
                "task": "regression"
            })

            # Training pipeline
            try:
                self.init_variables()
                self.load_dataset()
                self.split_train_test(0.15)
                self.define_model()

                if use_grid_search:
                    print("🔍 Ejecutando GridSearch (puede tardar)...")
                    self.train_grid_search()
                    # Persist the winner so later retrains can skip the search.
                    self.save_best_params()
                else:
                    print("⚡ Usando hiperparámetros guardados (rápido)")
                    self.load_best_params()

                self.train_model()
                self.test_and_eval()
                self.top_features()
                self.save_models(nombre)

                mlflow.set_tag("status", "SUCCESS")
                print("\n✅ Entrenamiento completado")
                print("📊 Ver en MLflow UI: mlflow ui")
            except Exception as e:
                # Tag the run so failures are visible in the MLflow UI, then
                # let the caller see the original error.
                mlflow.set_tag("status", "FAILED")
                print(f"\n❌ Error: {e}")
                raise

    def init_variables(self):
        """Define the hyperparameter search space used by the grid search.

        Sets ``self.param_grid``. When ``self.use_grid_search`` is true, each
        grid entry is also logged to MLflow as ``grid_<param>``.
        """
        self.param_grid = {
            'n_estimators': [200],            # 1 value
            'max_depth': [3, 4, 5],           # 3 values
            'learning_rate': [0.02, 0.03],    # 2 values
            'reg_alpha': [3.0, 5.0],          # 2 values
            'reg_lambda': [5.0, 8.0],         # 2 values
            'gamma': [0.5, 1.0],              # 2 values
            'subsample': [0.7],               # 1 value
            'colsample_bytree': [0.7],        # 1 value
            'colsample_bylevel': [0.6],       # 1 value
            'min_child_weight': [5, 7]        # 2 values
        }
        # Combinations: 1 x 3 x 2 x 2 x 2 x 2 x 1 x 1 x 1 x 2 = 96
        # (x 5 CV folds = 480 fits when the grid search runs).

        # Record the grid only when it is actually going to be searched.
        if self.use_grid_search:
            for param, values in self.param_grid.items():
                mlflow.log_param(f"grid_{param}", str(values))

        print("✅ Variables inicializadas")

    def load_dataset(self):
        """Load the processed CSV, separate the target and clean the rows.

        Sets ``self.df_data`` (features) and ``self.y_array`` (target), keeps
        only rows whose target lies in [3, 17], replaces missing feature
        values with 0 and logs dataset statistics to MLflow.
        """
        self.df_data = pd.read_csv("dataset/processed/dataset_processed.csv")

        # NOTE: self.y keeps the raw, unfiltered target column; the filtered
        # target that matches self.df_data row-for-row is self.y_array.
        self.y = self.df_data["y"]
        self.df_data = self.df_data.drop(["y"], axis=1)
        self.y_array = np.array(self.y).flatten()

        # Drop outliers: keep targets between 3 and 17 corners (inclusive).
        mask = (self.y_array >= 3) & (self.y_array <= 17)
        self.df_data = self.df_data[mask].copy()
        self.y_array = self.y_array[mask]

        # Replace missing feature values with 0.
        if self.df_data.isnull().any().any():
            self.df_data = self.df_data.fillna(0)

        # Log dataset info
        mlflow.log_params({
            "dataset_samples": len(self.df_data),
            "dataset_features": self.df_data.shape[1],
            "target_min": float(self.y_array.min()),
            "target_max": float(self.y_array.max()),
            "target_mean": float(self.y_array.mean()),
            "target_std": float(self.y_array.std())
        })

        print(f"✅ Dataset cargado: {self.df_data.shape}")

    def split_train_test(self, test_size_):
        """Split the data into train / validation / test and scale features.

        Args:
            test_size_: Fraction of the dataset held out as the test set.

        Sets ``X_train``/``y_train`` (train + validation, scaled),
        ``X_test``/``y_test``, ``X_train_fit``/``y_train_fit`` and
        ``X_val``/``y_val`` (15% of the training portion), plus the fitted
        ``self.scaler``.
        """
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            self.df_data, self.y_array,
            test_size=test_size_,
            random_state=42,
            shuffle=True
        )

        # Scale: statistics come from the training portion only and are then
        # applied to the test set.
        # NOTE(review): the scaler is fitted before the validation split, so
        # validation rows contribute to its statistics.
        self.scaler = StandardScaler()
        self.X_train = pd.DataFrame(
            self.scaler.fit_transform(self.X_train),
            columns=self.X_train.columns
        )
        self.X_test = pd.DataFrame(
            self.scaler.transform(self.X_test),
            columns=self.X_test.columns
        )

        # Validation split carved out of the training portion.
        self.X_train_fit, self.X_val, self.y_train_fit, self.y_val = train_test_split(
            self.X_train, self.y_train,
            test_size=0.15,
            random_state=43
        )

        # Log split sizes
        mlflow.log_params({
            "train_samples": len(self.X_train_fit),
            "val_samples": len(self.X_val),
            "test_samples": len(self.X_test),
            "test_size": test_size_
        })

        print(f"✅ Train: {len(self.X_train_fit)} | Val: {len(self.X_val)} | Test: {len(self.X_test)}")

    def define_model(self):
        """Create the base estimator and, if requested, the grid search.

        Sets ``self.xgb_base``; when ``self.use_grid_search`` is true also
        sets ``self.kfold``, ``self.mae_scorer`` and ``self.grid_search``.
        """
        # When the grid search already fans out over all cores, keep each
        # XGBoost fit single-threaded: parallelising both layers creates
        # thread contention (see the XGBoost docs for ``n_jobs``).
        base_jobs = 1 if self.use_grid_search else -1

        self.xgb_base = XGBRegressor(
            objective="reg:squarederror",
            tree_method="hist",
            random_state=42,
            n_jobs=base_jobs,
            verbosity=0
        )

        if self.use_grid_search:
            self.kfold = KFold(n_splits=5, shuffle=True, random_state=42)
            # greater_is_better=False makes the scorer return negative MAE,
            # so the search's "best score" is the smallest error.
            self.mae_scorer = make_scorer(mean_absolute_error, greater_is_better=False)
            self.grid_search = GridSearchCV(
                estimator=self.xgb_base,
                param_grid=self.param_grid,
                cv=self.kfold,
                scoring=self.mae_scorer,
                n_jobs=-1,
                verbose=2,
                return_train_score=True
            )

    def train_grid_search(self):
        """Run the grid search and record the best hyperparameters.

        Sets ``self.best_params`` and logs each value (``best_<param>``) and
        the best cross-validated MAE (``cv_best_mae``) to MLflow.
        """
        print("\n🔍 Buscando mejores hiperparámetros...")
        self.grid_search.fit(self.X_train_fit, self.y_train_fit)

        # Best parameters
        self.best_params = self.grid_search.best_params_
        # The scorer is negated MAE; flip the sign back for reporting.
        best_mae = -self.grid_search.best_score_

        # Log to MLflow
        for param, value in self.best_params.items():
            mlflow.log_param(f"best_{param}", value)
        mlflow.log_metric("cv_best_mae", best_mae)

        print("\n✅ Mejores hiperparámetros encontrados:")
        for param, value in self.best_params.items():
            print(f" {param}: {value}")
        print(f" CV MAE: {best_mae:.4f}")

    def save_best_params(self):
        """Write the best hyperparameters to ``self.config_path`` as JSON.

        The file is also logged as an MLflow artifact.
        """
        # Create the directory that will actually hold the file, not a
        # hard-coded one, so custom config paths work too.
        config_dir = os.path.dirname(self.config_path)
        if config_dir:
            os.makedirs(config_dir, exist_ok=True)

        config = {
            "model_name": self.nombre,
            "timestamp": self.timestamp,
            "best_params": self.best_params,
            "cv_mae": float(-self.grid_search.best_score_),
            "run_id": self.run_id
        }

        with open(self.config_path, 'w', encoding="utf-8") as f:
            json.dump(config, f, indent=4)

        # Log the file to MLflow
        mlflow.log_artifact(self.config_path)

        print(f"💾 Hiperparámetros guardados en: {self.config_path}")

    def load_best_params(self):
        """Load hyperparameters from the JSON file at ``self.config_path``.

        Sets ``self.best_params`` and logs the loaded values to MLflow.

        Raises:
            FileNotFoundError: If ``self.config_path`` does not exist.
        """
        if not os.path.exists(self.config_path):
            raise FileNotFoundError(
                f"No se encontró {self.config_path}. "
                "Ejecuta primero con use_grid_search=True"
            )

        with open(self.config_path, 'r', encoding="utf-8") as f:
            config = json.load(f)

        self.best_params = config["best_params"]

        # Log the loaded parameters to MLflow
        for param, value in self.best_params.items():
            mlflow.log_param(f"loaded_{param}", value)
        mlflow.log_param("config_source", self.config_path)
        mlflow.log_param("previous_cv_mae", config.get("cv_mae", "N/A"))

        print(f"✅ Hiperparámetros cargados desde: {self.config_path}")
        print(f" Origen: {config.get('model_name', 'unknown')} ({config.get('timestamp', 'unknown')})")

    def train_model(self):
        """Fit the final model on the training split with ``self.best_params``.

        Sets ``self.xgb_model``. The validation split is passed as
        ``eval_set`` so XGBoost evaluates on it during training.
        """
        self.xgb_model = XGBRegressor(
            **self.best_params,
            objective="reg:squarederror",
            tree_method="hist",
            random_state=42,
            n_jobs=-1,
            verbosity=0
        )

        self.xgb_model.fit(
            self.X_train_fit,
            self.y_train_fit,
            eval_set=[(self.X_val, self.y_val)],
            verbose=False
        )

        print("✅ Modelo entrenado")

    @staticmethod
    def _regression_metrics(y_true, y_pred):
        """Return MAE, RMSE and R² for one set of predictions."""
        return {
            'mae': mean_absolute_error(y_true, y_pred),
            'rmse': np.sqrt(mean_squared_error(y_true, y_pred)),
            'r2': r2_score(y_true, y_pred)
        }

    def test_and_eval(self):
        """Evaluate the fitted model and log every metric to MLflow.

        Logs MAE / RMSE / R² for the train, validation and test splits,
        5-fold cross-validated MAE and R² on the full training portion,
        test-error summary statistics and the train-vs-test R² gap.
        """
        # Predictions
        y_train_pred = self.xgb_model.predict(self.X_train_fit)
        y_val_pred = self.xgb_model.predict(self.X_val)
        y_test_pred = self.xgb_model.predict(self.X_test)

        # Compute metrics
        metrics = {
            'train': self._regression_metrics(self.y_train_fit, y_train_pred),
            'val': self._regression_metrics(self.y_val, y_val_pred),
            'test': self._regression_metrics(self.y_test, y_test_pred),
        }

        # Log all split metrics to MLflow
        for set_name, set_metrics in metrics.items():
            for metric_name, value in set_metrics.items():
                mlflow.log_metric(f"{set_name}_{metric_name}", value)

        # Cross-validation: one multi-metric pass scores both MAE and R² on
        # the same five fits instead of refitting every fold per metric.
        cv_scores = cross_validate(
            self.xgb_model, self.X_train, self.y_train,
            cv=5, scoring=['neg_mean_absolute_error', 'r2']
        )
        cv_mae = cv_scores['test_neg_mean_absolute_error']
        cv_r2 = cv_scores['test_r2']

        mlflow.log_metric("cv_mae_mean", -cv_mae.mean())
        mlflow.log_metric("cv_mae_std", cv_mae.std())
        mlflow.log_metric("cv_r2_mean", cv_r2.mean())
        mlflow.log_metric("cv_r2_std", cv_r2.std())

        # Error analysis on the test set
        test_errors = np.abs(self.y_test - y_test_pred)
        mlflow.log_metric("test_error_median", float(np.median(test_errors)))
        mlflow.log_metric("test_error_p90", float(np.percentile(test_errors, 90)))
        mlflow.log_metric("test_pct_error_lt_2", float((test_errors < 2.0).sum() / len(test_errors) * 100))

        # Overfitting gap: how much R² drops from train to test.
        gap = metrics['train']['r2'] - metrics['test']['r2']
        mlflow.log_metric("overfitting_gap", gap)

        print("\n📊 MÉTRICAS:")
        print(f" Train MAE: {metrics['train']['mae']:.4f} | R²: {metrics['train']['r2']:.4f}")
        print(f" Val MAE: {metrics['val']['mae']:.4f} | R²: {metrics['val']['r2']:.4f}")
        print(f" Test MAE: {metrics['test']['mae']:.4f} | R²: {metrics['test']['r2']:.4f}")
        print(f" CV MAE: {-cv_mae.mean():.4f} ± {cv_mae.std():.4f}")
        print(f" Overfitting Gap: {gap:.4f}")

    def top_features(self):
        """Save the feature-importance table and log the top entries.

        Writes ``models/feature_importance_<nombre>.csv``, logs it as an
        MLflow artifact, logs the ten most important features as metrics and
        prints the top five.
        """
        feature_importance = pd.DataFrame({
            'feature': self.df_data.columns,
            'importance': self.xgb_model.feature_importances_
        }).sort_values('importance', ascending=False)

        # This step runs before save_models(), so the output directory may
        # not exist yet on a fresh checkout.
        os.makedirs("models", exist_ok=True)

        # Save CSV
        csv_path = f"models/feature_importance_{self.nombre}.csv"
        feature_importance.to_csv(csv_path, index=False)
        mlflow.log_artifact(csv_path)

        # Log top 10
        # NOTE(review): feature names are used verbatim in metric names;
        # confirm they only contain characters MLflow accepts.
        for _, row in feature_importance.head(10).iterrows():
            mlflow.log_metric(f"feat_imp_{row['feature']}", float(row['importance']))

        print("\n🔍 Top 5 features:")
        for _, row in feature_importance.head(5).iterrows():
            print(f" {row['feature']}: {row['importance']:.4f}")

    def save_models(self, nombre):
        """Persist the model and scaler locally and in MLflow.

        Args:
            nombre: Model identifier used in the output file names.
        """
        os.makedirs("models", exist_ok=True)

        # Paths
        model_path = f'models/xgboost_corners_{nombre}.pkl'
        scaler_path = f'models/scaler_corners_{nombre}.pkl'

        # Save files
        joblib.dump(self.xgb_model, model_path)
        joblib.dump(self.scaler, scaler_path)

        # Log to MLflow (also registers the model under a fixed name).
        mlflow.xgboost.log_model(
            self.xgb_model,
            artifact_path="model",
            registered_model_name="corners_predictor"
        )
        mlflow.log_artifact(scaler_path, artifact_path="preprocessing")

        print("\n💾 Modelos guardados:")
        print(f" {model_path}")
        print(f" {scaler_path}")
        print(" MLflow Model Registry ✓")
# ===========================
# USAGE
# ===========================
if __name__ == "__main__":
    # ========================================
    # OPTION 1: first run, or every 3-6 months.
    # Run the grid search (slow; author's estimate: 30-60 min).
    # ========================================
    # model = TRAIN_MODEL(
    #     nombre="v4_grid_search",
    #     use_grid_search=True  # searches for the best hyperparameters
    # )

    # ========================================
    # OPTION 2: regular retraining.
    # Reuse the saved hyperparameters (fast; author's estimate: 2-5 min).
    # ========================================
    # use_grid_search must be False here: True would rerun the full grid
    # search and overwrite the saved config instead of reading it.
    model = TRAIN_MODEL(
        nombre="v4_retrain",
        use_grid_search=False  # reads config/model_config.json
    )