# src/detect_drift.py
import json
import os
import sys
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score, f1_score
import joblib
# Handle both module and direct execution.
# When run as a script (python src/detect_drift.py), the package root is not on
# sys.path, so we add it and use absolute `src.*` imports; when imported as a
# module (python -m src.detect_drift), relative imports are used instead.
if __name__ == "__main__":
    # Add parent directory to path for direct execution so `src.*` resolves.
    sys.path.insert(0, str(Path(__file__).parent.parent))
    from src.config import (
        PROCESSED_DATA_DIR,
        PRODUCTION_DATA_PATH,
        BASELINE_METRICS_PATH,
        TARGET_COLUMN,
        LATEST_MODEL_PATH,
    )
    from src.model_utils import split_features_target
else:
    # Relative imports for module execution.
    from .config import (
        PROCESSED_DATA_DIR,
        PRODUCTION_DATA_PATH,
        BASELINE_METRICS_PATH,
        TARGET_COLUMN,
        LATEST_MODEL_PATH,
    )
    from .model_utils import split_features_target
def load_train_reference() -> pd.DataFrame:
    """Load the processed training split that serves as the drift reference.

    Returns the train.csv dataframe with `application_date` parsed as datetime.
    Raises FileNotFoundError when data prep has not been run yet.
    """
    reference_csv = PROCESSED_DATA_DIR / "train.csv"
    if not reference_csv.exists():
        raise FileNotFoundError(f"No se encontró {reference_csv}. Ejecuta primero src.data_prep.")
    return pd.read_csv(reference_csv, parse_dates=["application_date"])
def load_production_batch() -> pd.DataFrame:
    """Load the simulated production batch to compare against the reference.

    Returns the production dataframe with `application_date` parsed as datetime.
    Raises FileNotFoundError when the simulation step has not produced a batch.
    """
    if not PRODUCTION_DATA_PATH.exists():
        raise FileNotFoundError(
            f"No se encontró batch de producción en {PRODUCTION_DATA_PATH}. Ejecuta src.simulate_production_data primero."
        )
    return pd.read_csv(PRODUCTION_DATA_PATH, parse_dates=["application_date"])
def load_baseline_metrics() -> dict:
    """Read the baseline metrics JSON; return None when the file is absent.

    A missing file is not fatal: the caller skips the model-drift check and
    relies on data drift alone, so we only warn on stdout.
    """
    if BASELINE_METRICS_PATH.exists():
        with open(BASELINE_METRICS_PATH, "r", encoding="utf-8") as fh:
            return json.load(fh)
    print(f"ADVERTENCIA: No se encontró {BASELINE_METRICS_PATH}.")
    print("El modelo drift check se omitirá. Solo se verificará data drift.")
    return None
def compute_data_drift(
    train_df: pd.DataFrame,
    prod_df: pd.DataFrame,
    tvd_threshold: float = 0.25,
    income_threshold: float = 0.20,
) -> bool:
    """Simple data-drift heuristic over 'channel' and 'monthly_income'.

    Flags drift when either the channel distribution shifts by more than
    `tvd_threshold` (Total Variation Distance) or the mean monthly income
    changes by more than `income_threshold` relative to the training mean.
    Thresholds are parameters (defaults preserve the original 0.25 / 0.20)
    so callers can tune sensitivity without editing this function.

    Returns True when drift is detected.
    """
    channels = ["branch", "web", "partner", "call_center"]

    def channel_dist(df: pd.DataFrame) -> pd.Series:
        # Normalized share per channel; channels absent from df count as 0.
        return df["channel"].value_counts(normalize=True).reindex(channels, fill_value=0.0)

    ch_train = channel_dist(train_df)
    ch_prod = channel_dist(prod_df)
    # Total Variation Distance between the two categorical distributions.
    tvd_channel = 0.5 * np.abs(ch_train - ch_prod).sum()

    # Relative change of the mean income vs the training reference.
    # NOTE: with a numpy mean, a zero training mean yields inf (flags drift)
    # rather than raising ZeroDivisionError.
    mean_income_train = train_df["monthly_income"].mean()
    mean_income_prod = prod_df["monthly_income"].mean()
    rel_change_income = abs(mean_income_prod - mean_income_train) / mean_income_train

    print("=== DATA DRIFT CHECK ===")
    print(f"Distribución channel (train):\n{ch_train}")
    print(f"Distribución channel (prod):\n{ch_prod}")
    print(f"TVD channel: {tvd_channel:.3f}")
    print(f"Mean income train: {mean_income_train:.2f}")
    print(f"Mean income prod : {mean_income_prod:.2f}")
    print(f"Relative change income: {rel_change_income:.3f}")

    data_drift = (tvd_channel > tvd_threshold) or (rel_change_income > income_threshold)
    print(f"DATA_DRIFT = {data_drift} (tvd>{tvd_threshold} or rel_change>{income_threshold})")
    return data_drift
def compute_model_drift(prod_df: pd.DataFrame, baseline_metrics: dict) -> bool:
    """Detect model drift as metric degradation versus the stored baseline.

    Scores the latest model on the production batch and compares AUC/F1
    against the baseline; a drop greater than 0.05 in either metric counts
    as drift. Returns False (skipping the check) when the baseline metrics
    or the model artifact are unavailable.
    """
    if baseline_metrics is None:
        print("=== MODEL DRIFT CHECK ===")
        print("Omitido: No hay métricas baseline disponibles.")
        return False
    if not LATEST_MODEL_PATH.exists():
        print("=== MODEL DRIFT CHECK ===")
        print(f"ADVERTENCIA: No se encontró modelo en {LATEST_MODEL_PATH}.")
        print("El model drift check se omitirá.")
        return False

    estimator = joblib.load(LATEST_MODEL_PATH)
    features, labels = split_features_target(prod_df, TARGET_COLUMN)
    scores = estimator.predict_proba(features)[:, 1]
    predictions = (scores >= 0.5).astype(int)
    auc_prod = roc_auc_score(labels, scores)
    f1_prod = f1_score(labels, predictions)

    baseline_auc = baseline_metrics["auc_valid"]
    baseline_f1 = baseline_metrics["f1_valid"]
    print("=== MODEL DRIFT CHECK ===")
    print(f"Baseline AUC: {baseline_auc:.4f}")
    print(f"Prod AUC : {auc_prod:.4f}")
    print(f"Baseline F1 : {baseline_f1:.4f}")
    print(f"Prod F1 : {f1_prod:.4f}")

    # Threshold: a drop of 0.05 in AUC or F1 signals drift.
    auc_drop = baseline_auc - auc_prod
    f1_drop = baseline_f1 - f1_prod
    print(f"AUC drop: {auc_drop:.4f}")
    print(f"F1 drop: {f1_drop:.4f}")

    model_drift = (auc_drop > 0.05) or (f1_drop > 0.05)
    print(f"MODEL_DRIFT = {model_drift} (auc_drop>0.05 or f1_drop>0.05)")
    return model_drift
def main() -> None:
    """Run the data and model drift checks and report the retrain decision.

    Loads the training reference, the production batch and (optionally) the
    baseline metrics, evaluates both drift heuristics, and — when running
    under GitHub Actions — appends the boolean outcomes to $GITHUB_OUTPUT so
    the workflow can condition a retraining job on them.
    """
    print("=== DETECCIÓN DE DRIFT (DATA + MODEL) ===")
    reference_df = load_train_reference()
    batch_df = load_production_batch()
    baseline = load_baseline_metrics()

    data_drift = compute_data_drift(reference_df, batch_df)
    model_drift = compute_model_drift(batch_df, baseline)
    retrain = data_drift or model_drift

    print("=========================================")
    print(f"Resultado final → data_drift={data_drift}, model_drift={model_drift}, retrain={retrain}")

    # GitHub Actions integration: export outputs so the workflow can branch on them.
    github_output = os.environ.get("GITHUB_OUTPUT")
    if github_output:
        payload = (
            f"retrain={str(retrain).lower()}\n"
            f"data_drift={str(data_drift).lower()}\n"
            f"model_drift={str(model_drift).lower()}\n"
        )
        with open(github_output, "a", encoding="utf-8") as sink:
            sink.write(payload)
if __name__ == "__main__":
    # Script entry point: run the full drift-detection pipeline.
    main()