# MLOps-risk-model / src/detect_drift.py
# Synced from GitHub main (commit 811858e) by github-actions[bot].
# src/detect_drift.py
import json
import os
import sys
from pathlib import Path
from typing import Optional

import joblib
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score, roc_auc_score
# Handle both package-module execution (python -m src.detect_drift) and
# direct script execution (python src/detect_drift.py).
if __name__ == "__main__":
    # Direct execution: the package root is not on sys.path yet, so add the
    # repository root (two levels up from this file) to make "src.*" importable.
    sys.path.insert(0, str(Path(__file__).parent.parent))
    from src.config import (
        PROCESSED_DATA_DIR,
        PRODUCTION_DATA_PATH,
        BASELINE_METRICS_PATH,
        TARGET_COLUMN,
        LATEST_MODEL_PATH,
    )
    from src.model_utils import split_features_target
else:
    # Imported as part of the "src" package: use relative imports.
    from .config import (
        PROCESSED_DATA_DIR,
        PRODUCTION_DATA_PATH,
        BASELINE_METRICS_PATH,
        TARGET_COLUMN,
        LATEST_MODEL_PATH,
    )
    from .model_utils import split_features_target
def load_train_reference() -> pd.DataFrame:
    """Load the processed training set used as the drift reference.

    Returns:
        The training DataFrame with ``application_date`` parsed as datetime.

    Raises:
        FileNotFoundError: if ``train.csv`` has not been produced yet by
            ``src.data_prep``.
    """
    train_path = PROCESSED_DATA_DIR / "train.csv"
    if train_path.exists():
        return pd.read_csv(train_path, parse_dates=["application_date"])
    raise FileNotFoundError(f"No se encontró {train_path}. Ejecuta primero src.data_prep.")
def load_production_batch() -> pd.DataFrame:
    """Load the simulated production batch to compare against the reference.

    Returns:
        The production DataFrame with ``application_date`` parsed as datetime.

    Raises:
        FileNotFoundError: if the batch has not been generated yet by
            ``src.simulate_production_data``.
    """
    if PRODUCTION_DATA_PATH.exists():
        return pd.read_csv(PRODUCTION_DATA_PATH, parse_dates=["application_date"])
    raise FileNotFoundError(
        f"No se encontró batch de producción en {PRODUCTION_DATA_PATH}. Ejecuta src.simulate_production_data primero."
    )
def load_baseline_metrics() -> Optional[dict]:
    """Load the stored baseline model metrics.

    Fix: the original annotation claimed ``-> dict`` while the function
    intentionally returns ``None`` when no baseline file exists (as its own
    docstring stated); the annotation now reflects that.

    Returns:
        The parsed metrics dict, or ``None`` when no baseline file exists —
        in that case the model drift check is skipped downstream.
    """
    if not BASELINE_METRICS_PATH.exists():
        print(f"ADVERTENCIA: No se encontró {BASELINE_METRICS_PATH}.")
        print("El modelo drift check se omitirá. Solo se verificará data drift.")
        return None
    with open(BASELINE_METRICS_PATH, "r", encoding="utf-8") as f:
        return json.load(f)
def compute_data_drift(
    train_df: pd.DataFrame,
    prod_df: pd.DataFrame,
    *,
    tvd_threshold: float = 0.25,
    income_threshold: float = 0.20,
) -> bool:
    """Simple data-drift heuristic over ``channel`` and ``monthly_income``.

    Improvement: the two decision thresholds, previously hard-coded locals,
    are now keyword-only parameters with the same defaults — backward
    compatible for existing callers, tunable for new ones.

    Args:
        train_df: Reference (training) data; must contain ``channel`` and
            ``monthly_income`` columns.
        prod_df: Production batch with the same columns.
        tvd_threshold: Flag drift when the total variation distance between
            the channel distributions exceeds this (default 0.25, i.e. >25%).
        income_threshold: Flag drift when the relative change of the mean
            monthly income exceeds this (default 0.20, i.e. >20%).

    Returns:
        True when either signal exceeds its threshold.
    """

    def channel_dist(df: pd.DataFrame) -> pd.Series:
        # Normalized channel frequencies over a fixed category order so both
        # distributions align index-wise (missing categories count as 0).
        return df["channel"].value_counts(normalize=True).reindex(
            ["branch", "web", "partner", "call_center"], fill_value=0.0
        )

    ch_train = channel_dist(train_df)
    ch_prod = channel_dist(prod_df)
    # Total Variation Distance: half the L1 distance between distributions.
    tvd_channel = 0.5 * np.abs(ch_train - ch_prod).sum()

    # Relative shift of the mean income. NOTE(review): numpy float division
    # yields inf (not an exception) if the reference mean is 0, which would
    # still trip the threshold — confirm incomes are always positive upstream.
    mean_income_train = train_df["monthly_income"].mean()
    mean_income_prod = prod_df["monthly_income"].mean()
    rel_change_income = abs(mean_income_prod - mean_income_train) / mean_income_train

    print("=== DATA DRIFT CHECK ===")
    print(f"Distribución channel (train):\n{ch_train}")
    print(f"Distribución channel (prod):\n{ch_prod}")
    print(f"TVD channel: {tvd_channel:.3f}")
    print(f"Mean income train: {mean_income_train:.2f}")
    print(f"Mean income prod : {mean_income_prod:.2f}")
    print(f"Relative change income: {rel_change_income:.3f}")

    data_drift = (tvd_channel > tvd_threshold) or (rel_change_income > income_threshold)
    print(f"DATA_DRIFT = {data_drift} (tvd>{tvd_threshold} or rel_change>{income_threshold})")
    return data_drift
def compute_model_drift(prod_df: pd.DataFrame, baseline_metrics: dict) -> bool:
    """Detect model drift as metric degradation versus the stored baseline.

    Scores the latest model on the production batch and compares AUC and F1
    against the baseline; a drop above 0.05 in either metric flags drift.
    The check is skipped (returns False) when the baseline metrics or the
    model artifact are unavailable.
    """
    print("=== MODEL DRIFT CHECK ===")
    if baseline_metrics is None:
        print("Omitido: No hay métricas baseline disponibles.")
        return False
    if not LATEST_MODEL_PATH.exists():
        print(f"ADVERTENCIA: No se encontró modelo en {LATEST_MODEL_PATH}.")
        print("El model drift check se omitirá.")
        return False

    model = joblib.load(LATEST_MODEL_PATH)
    X_prod, y_prod = split_features_target(prod_df, TARGET_COLUMN)
    proba = model.predict_proba(X_prod)[:, 1]
    # Hard labels at the conventional 0.5 probability cut-off.
    preds = (proba >= 0.5).astype(int)
    auc_prod = roc_auc_score(y_prod, proba)
    f1_prod = f1_score(y_prod, preds)

    baseline_auc = baseline_metrics["auc_valid"]
    baseline_f1 = baseline_metrics["f1_valid"]
    print(f"Baseline AUC: {baseline_auc:.4f}")
    print(f"Prod AUC : {auc_prod:.4f}")
    print(f"Baseline F1 : {baseline_f1:.4f}")
    print(f"Prod F1 : {f1_prod:.4f}")

    # Drift rule: a drop of more than 0.05 in either metric.
    auc_drop = baseline_auc - auc_prod
    f1_drop = baseline_f1 - f1_prod
    print(f"AUC drop: {auc_drop:.4f}")
    print(f"F1 drop: {f1_drop:.4f}")

    model_drift = auc_drop > 0.05 or f1_drop > 0.05
    print(f"MODEL_DRIFT = {model_drift} (auc_drop>0.05 or f1_drop>0.05)")
    return model_drift
def main() -> None:
    """Run both drift checks and export the verdict for GitHub Actions."""
    print("=== DETECCIÓN DE DRIFT (DATA + MODEL) ===")

    reference_df = load_train_reference()
    batch_df = load_production_batch()
    baseline = load_baseline_metrics()

    data_drift = compute_data_drift(reference_df, batch_df)
    model_drift = compute_model_drift(batch_df, baseline)
    retrain = data_drift or model_drift

    print("=========================================")
    print(f"Resultado final → data_drift={data_drift}, model_drift={model_drift}, retrain={retrain}")

    # GitHub Actions integration: append step outputs so the workflow can
    # condition downstream jobs (e.g. retraining) on the drift verdict.
    output_file = os.environ.get("GITHUB_OUTPUT")
    if output_file:
        with open(output_file, "a", encoding="utf-8") as fh:
            for key, flag in (
                ("retrain", retrain),
                ("data_drift", data_drift),
                ("model_drift", model_drift),
            ):
                fh.write(f"{key}={str(flag).lower()}\n")
# Allow running this module directly as a script.
if __name__ == "__main__":
    main()