# src/detect_drift.py
"""Detect data drift and model drift for a production batch.

Data drift compares the production batch against the training reference
(distribution of ``channel`` and mean of ``monthly_income``).  Model drift
compares AUC/F1 of the latest model on the production batch against the
stored baseline metrics.  When the ``GITHUB_OUTPUT`` environment variable is
set, the resulting flags are appended to that file.
"""
import json
import os
import sys
from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score, f1_score
import joblib

# Handle both module and direct execution
if __name__ == "__main__":
    # Add parent directory to path for direct execution
    sys.path.insert(0, str(Path(__file__).parent.parent))
    from src.config import (
        PROCESSED_DATA_DIR,
        PRODUCTION_DATA_PATH,
        BASELINE_METRICS_PATH,
        TARGET_COLUMN,
        LATEST_MODEL_PATH,
    )
    from src.model_utils import split_features_target
else:
    # Relative imports for module execution
    from .config import (
        PROCESSED_DATA_DIR,
        PRODUCTION_DATA_PATH,
        BASELINE_METRICS_PATH,
        TARGET_COLUMN,
        LATEST_MODEL_PATH,
    )
    from .model_utils import split_features_target

# Channel values the drift report always lists, in this order.
KNOWN_CHANNELS = ("branch", "web", "partner", "call_center")


def load_train_reference() -> pd.DataFrame:
    """Load the training reference set from ``PROCESSED_DATA_DIR / "train.csv"``.

    Returns:
        The training data, with ``application_date`` parsed as dates.

    Raises:
        FileNotFoundError: If the training CSV does not exist.
    """
    train_path = PROCESSED_DATA_DIR / "train.csv"
    if not train_path.exists():
        raise FileNotFoundError(f"No se encontró {train_path}. Ejecuta primero src.data_prep.")
    df_train = pd.read_csv(train_path, parse_dates=["application_date"])
    return df_train


def load_production_batch() -> pd.DataFrame:
    """Load the production batch from ``PRODUCTION_DATA_PATH``.

    Returns:
        The production data, with ``application_date`` parsed as dates.

    Raises:
        FileNotFoundError: If the production CSV does not exist.
    """
    if not PRODUCTION_DATA_PATH.exists():
        raise FileNotFoundError(
            f"No se encontró batch de producción en {PRODUCTION_DATA_PATH}. Ejecuta src.simulate_production_data primero."
        )
    df_prod = pd.read_csv(PRODUCTION_DATA_PATH, parse_dates=["application_date"])
    return df_prod


def load_baseline_metrics() -> Optional[dict]:
    """Load the baseline metrics JSON from ``BASELINE_METRICS_PATH``.

    Returns:
        The parsed JSON content, or ``None`` (after printing a warning) when
        the file does not exist.
    """
    if not BASELINE_METRICS_PATH.exists():
        print(f"ADVERTENCIA: No se encontró {BASELINE_METRICS_PATH}.")
        print("El modelo drift check se omitirá. Solo se verificará data drift.")
        return None
    with open(BASELINE_METRICS_PATH, "r", encoding="utf-8") as f:
        return json.load(f)


def _channel_distribution(df: pd.DataFrame, categories: list) -> pd.Series:
    """Return the normalized frequency of ``channel`` over ``categories``."""
    return df["channel"].value_counts(normalize=True).reindex(categories, fill_value=0.0)


def compute_data_drift(
    train_df: pd.DataFrame,
    prod_df: pd.DataFrame,
    *,
    tvd_threshold: float = 0.25,
    income_threshold: float = 0.20,
) -> bool:
    """Simple data-drift heuristic based on ``channel`` and ``monthly_income``.

    Drift is flagged when the Total Variation Distance between the channel
    distributions exceeds ``tvd_threshold`` or when the relative change of the
    mean ``monthly_income`` exceeds ``income_threshold``.

    Args:
        train_df: Reference data; must contain ``channel`` and ``monthly_income``.
        prod_df: Production batch with the same two columns.
        tvd_threshold: Maximum tolerated TVD between channel distributions.
        income_threshold: Maximum tolerated relative change of the mean income.

    Returns:
        ``True`` when either check exceeds its threshold.
    """
    # Include channel values outside KNOWN_CHANNELS: a category that only
    # shows up in one of the frames is itself drift and must count in the TVD.
    observed = pd.concat([train_df["channel"], prod_df["channel"]]).dropna().unique()
    extra_channels = sorted(set(observed) - set(KNOWN_CHANNELS), key=str)
    categories = list(KNOWN_CHANNELS) + extra_channels

    ch_train = _channel_distribution(train_df, categories)
    ch_prod = _channel_distribution(prod_df, categories)
    tvd_channel = 0.5 * np.abs(ch_train - ch_prod).sum()  # Total Variation Distance

    # Mean income
    mean_income_train = train_df["monthly_income"].mean()
    mean_income_prod = prod_df["monthly_income"].mean()
    income_gap = abs(mean_income_prod - mean_income_train)
    if mean_income_train == 0:
        # Avoid dividing by zero: no gap means no change, any gap over a zero
        # baseline is an unbounded relative change.
        rel_change_income = 0.0 if income_gap == 0 else float("inf")
    else:
        # Divide by the magnitude so a negative baseline cannot flip the sign.
        rel_change_income = income_gap / abs(mean_income_train)

    print("=== DATA DRIFT CHECK ===")
    print(f"Distribución channel (train):\n{ch_train}")
    print(f"Distribución channel (prod):\n{ch_prod}")
    print(f"TVD channel: {tvd_channel:.3f}")
    print(f"Mean income train: {mean_income_train:.2f}")
    print(f"Mean income prod : {mean_income_prod:.2f}")
    print(f"Relative change income: {rel_change_income:.3f}")

    # NOTE: a NaN mean (e.g. an all-missing income column) compares as False.
    data_drift = bool((tvd_channel > tvd_threshold) or (rel_change_income > income_threshold))
    print(f"DATA_DRIFT = {data_drift} (tvd>{tvd_threshold} or rel_change>{income_threshold})")
    return data_drift


def compute_model_drift(
    prod_df: pd.DataFrame,
    baseline_metrics: Optional[dict],
    *,
    drop_threshold: float = 0.05,
) -> bool:
    """Model drift: metric degradation on the production batch vs. baseline.

    Loads the model at ``LATEST_MODEL_PATH``, scores ``prod_df`` and compares
    AUC and F1 (at a 0.5 probability cut-off) with ``auc_valid`` / ``f1_valid``
    from ``baseline_metrics``.

    Args:
        prod_df: Production batch including ``TARGET_COLUMN``.
        baseline_metrics: Baseline metrics, or ``None`` to skip the check.
        drop_threshold: Maximum tolerated drop in AUC or F1.

    Returns:
        ``True`` when AUC or F1 dropped by more than ``drop_threshold``;
        ``False`` also when the check is skipped (no baseline or no model).
    """
    if baseline_metrics is None:
        print("=== MODEL DRIFT CHECK ===")
        print("Omitido: No hay métricas baseline disponibles.")
        return False

    if not LATEST_MODEL_PATH.exists():
        print("=== MODEL DRIFT CHECK ===")
        print(f"ADVERTENCIA: No se encontró modelo en {LATEST_MODEL_PATH}.")
        print("El model drift check se omitirá.")
        return False

    model = joblib.load(LATEST_MODEL_PATH)
    X_prod, y_prod = split_features_target(prod_df, TARGET_COLUMN)

    y_proba = model.predict_proba(X_prod)[:, 1]
    y_pred = (y_proba >= 0.5).astype(int)

    print("=== MODEL DRIFT CHECK ===")
    if len(np.unique(np.asarray(y_prod))) < 2:
        # ROC AUC is undefined with a single class in y_true; skip only that
        # check instead of aborting the whole drift detection.
        print("ADVERTENCIA: y_prod contiene una sola clase; el AUC no está definido y se omite su verificación.")
        auc_prod = float("nan")
    else:
        auc_prod = roc_auc_score(y_prod, y_proba)
    f1_prod = f1_score(y_prod, y_pred)

    print(f"Baseline AUC: {baseline_metrics['auc_valid']:.4f}")
    print(f"Prod AUC : {auc_prod:.4f}")
    print(f"Baseline F1 : {baseline_metrics['f1_valid']:.4f}")
    print(f"Prod F1 : {f1_prod:.4f}")

    # A NaN AUC yields a NaN drop, which compares as "no drift" below.
    auc_drop = baseline_metrics["auc_valid"] - auc_prod
    f1_drop = baseline_metrics["f1_valid"] - f1_prod

    print(f"AUC drop: {auc_drop:.4f}")
    print(f"F1 drop: {f1_drop:.4f}")

    drift_auc = auc_drop > drop_threshold
    drift_f1 = f1_drop > drop_threshold

    model_drift = bool(drift_auc or drift_f1)
    print(f"MODEL_DRIFT = {model_drift} (auc_drop>{drop_threshold} or f1_drop>{drop_threshold})")
    return model_drift


def main() -> None:
    """Run both drift checks and export the flags for the CI workflow."""
    print("=== DETECCIÓN DE DRIFT (DATA + MODEL) ===")
    train_df = load_train_reference()
    prod_df = load_production_batch()
    baseline_metrics = load_baseline_metrics()

    data_drift = compute_data_drift(train_df, prod_df)
    model_drift = compute_model_drift(prod_df, baseline_metrics)

    retrain = data_drift or model_drift

    print("=========================================")
    print(f"Resultado final → data_drift={data_drift}, model_drift={model_drift}, retrain={retrain}")

    # GitHub Actions integration: export outputs so the workflow can branch on them
    github_output = os.environ.get("GITHUB_OUTPUT")
    if github_output:
        with open(github_output, "a", encoding="utf-8") as f:
            f.write(f"retrain={str(retrain).lower()}\n")
            f.write(f"data_drift={str(data_drift).lower()}\n")
            f.write(f"model_drift={str(model_drift).lower()}\n")


if __name__ == "__main__":
    main()