File size: 5,964 Bytes
1e5b98a
 
 
2127e22
1e5b98a
 
 
 
 
 
 
2127e22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e5b98a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
811858e
1e5b98a
811858e
 
 
1e5b98a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
811858e
 
 
 
 
1e5b98a
 
811858e
 
 
 
1e5b98a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# src/detect_drift.py
import json
import os
import sys
from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score, f1_score
import joblib

# Handle both module and direct execution
if __name__ == "__main__":
    # Add parent directory to path for direct execution
    sys.path.insert(0, str(Path(__file__).parent.parent))
    from src.config import (
        PROCESSED_DATA_DIR,
        PRODUCTION_DATA_PATH,
        BASELINE_METRICS_PATH,
        TARGET_COLUMN,
        LATEST_MODEL_PATH,
    )
    from src.model_utils import split_features_target
else:
    # Relative imports for module execution
    from .config import (
        PROCESSED_DATA_DIR,
        PRODUCTION_DATA_PATH,
        BASELINE_METRICS_PATH,
        TARGET_COLUMN,
        LATEST_MODEL_PATH,
    )
    from .model_utils import split_features_target


def load_train_reference() -> pd.DataFrame:
    """Load the processed training set that serves as the drift reference.

    Raises:
        FileNotFoundError: if the processed train CSV has not been generated yet.
    """
    path = PROCESSED_DATA_DIR / "train.csv"
    if not path.exists():
        raise FileNotFoundError(f"No se encontró {path}. Ejecuta primero src.data_prep.")
    return pd.read_csv(path, parse_dates=["application_date"])


def load_production_batch() -> pd.DataFrame:
    """Load the simulated production batch to be checked for drift.

    Raises:
        FileNotFoundError: if the production batch CSV is missing.
    """
    if PRODUCTION_DATA_PATH.exists():
        return pd.read_csv(PRODUCTION_DATA_PATH, parse_dates=["application_date"])
    raise FileNotFoundError(
        f"No se encontró batch de producción en {PRODUCTION_DATA_PATH}. Ejecuta src.simulate_production_data primero."
    )


def load_baseline_metrics() -> Optional[dict]:
    """Load baseline model metrics from disk.

    Returns:
        The parsed metrics dict, or ``None`` when the baseline file does not
        exist (in that case the model drift check is skipped downstream).
    """
    # Fix: the original annotation said `-> dict`, but this branch returns None
    # and callers explicitly handle that case.
    if not BASELINE_METRICS_PATH.exists():
        print(f"ADVERTENCIA: No se encontró {BASELINE_METRICS_PATH}.")
        print("El modelo drift check se omitirá. Solo se verificará data drift.")
        return None
    with open(BASELINE_METRICS_PATH, "r", encoding="utf-8") as f:
        return json.load(f)


def compute_data_drift(train_df: pd.DataFrame, prod_df: pd.DataFrame) -> bool:
    """Simple data-drift heuristic over 'channel' mix and 'monthly_income' level.

    Flags drift when either the channel distribution (measured by total
    variation distance) or the mean monthly income moves beyond a fixed
    threshold relative to the training reference.
    """
    categories = ["branch", "web", "partner", "call_center"]

    def _channel_share(frame: pd.DataFrame) -> pd.Series:
        # Normalized channel frequencies, over a fixed category order so the
        # two distributions are directly comparable.
        return frame["channel"].value_counts(normalize=True).reindex(categories, fill_value=0.0)

    ch_train = _channel_share(train_df)
    ch_prod = _channel_share(prod_df)

    # Total Variation Distance between the two categorical distributions.
    tvd_channel = np.abs(ch_train - ch_prod).sum() / 2.0

    # Relative shift of the mean income versus the training reference.
    income_ref = train_df["monthly_income"].mean()
    income_now = prod_df["monthly_income"].mean()
    rel_change_income = abs(income_now - income_ref) / income_ref

    print("=== DATA DRIFT CHECK ===")
    print(f"Distribución channel (train):\n{ch_train}")
    print(f"Distribución channel (prod):\n{ch_prod}")
    print(f"TVD channel: {tvd_channel:.3f}")
    print(f"Mean income train: {income_ref:.2f}")
    print(f"Mean income prod : {income_now:.2f}")
    print(f"Relative change income: {rel_change_income:.3f}")

    # Fixed thresholds: >25% channel-mix shift or >20% mean-income shift.
    tvd_threshold = 0.25
    income_threshold = 0.20

    drifted = tvd_channel > tvd_threshold or rel_change_income > income_threshold
    print(f"DATA_DRIFT = {drifted} (tvd>{tvd_threshold} or rel_change>{income_threshold})")

    return drifted


def compute_model_drift(prod_df: pd.DataFrame, baseline_metrics: Optional[dict]) -> bool:
    """Model-drift check: compare production AUC/F1 against stored baseline.

    Args:
        prod_df: production batch including the target column.
        baseline_metrics: dict with 'auc_valid' and 'f1_valid', or ``None``
            when no baseline exists (the check is then skipped).

    Returns:
        True when either AUC or F1 drops more than 0.05 below baseline;
        False otherwise, including when the check is skipped.
    """
    # Fix: the original annotation said `dict`, but None is an accepted,
    # explicitly-handled value (see load_baseline_metrics).
    if baseline_metrics is None:
        print("=== MODEL DRIFT CHECK ===")
        print("Omitido: No hay métricas baseline disponibles.")
        return False

    if not LATEST_MODEL_PATH.exists():
        print("=== MODEL DRIFT CHECK ===")
        print(f"ADVERTENCIA: No se encontró modelo en {LATEST_MODEL_PATH}.")
        print("El model drift check se omitirá.")
        return False

    model = joblib.load(LATEST_MODEL_PATH)
    X_prod, y_prod = split_features_target(prod_df, TARGET_COLUMN)

    # Score the production batch with the latest model; 0.5 decision threshold.
    y_proba = model.predict_proba(X_prod)[:, 1]
    y_pred = (y_proba >= 0.5).astype(int)

    auc_prod = roc_auc_score(y_prod, y_proba)
    f1_prod = f1_score(y_prod, y_pred)

    print("=== MODEL DRIFT CHECK ===")
    print(f"Baseline AUC: {baseline_metrics['auc_valid']:.4f}")
    print(f"Prod AUC    : {auc_prod:.4f}")
    print(f"Baseline F1 : {baseline_metrics['f1_valid']:.4f}")
    print(f"Prod F1     : {f1_prod:.4f}")

    # Flag drift on a drop of more than 0.05 in either metric.
    auc_drop = baseline_metrics["auc_valid"] - auc_prod
    f1_drop = baseline_metrics["f1_valid"] - f1_prod

    print(f"AUC drop: {auc_drop:.4f}")
    print(f"F1  drop: {f1_drop:.4f}")

    drift_auc = auc_drop > 0.05
    drift_f1 = f1_drop > 0.05

    model_drift = drift_auc or drift_f1
    print(f"MODEL_DRIFT = {model_drift} (auc_drop>0.05 or f1_drop>0.05)")

    return model_drift


def main() -> None:
    """Run both drift checks and export the outcome for GitHub Actions."""
    print("=== DETECCIÓN DE DRIFT (DATA + MODEL) ===")
    reference = load_train_reference()
    batch = load_production_batch()
    baseline = load_baseline_metrics()

    data_drift = compute_data_drift(reference, batch)
    model_drift = compute_model_drift(batch, baseline)
    retrain = data_drift or model_drift

    print("=========================================")
    print(f"Resultado final → data_drift={data_drift}, model_drift={model_drift}, retrain={retrain}")

    # Export step outputs so the GitHub Actions workflow can branch on them.
    output_file = os.environ.get("GITHUB_OUTPUT")
    if output_file:
        flags = {"retrain": retrain, "data_drift": data_drift, "model_drift": model_drift}
        with open(output_file, "a", encoding="utf-8") as fh:
            for name, value in flags.items():
                fh.write(f"{name}={str(value).lower()}\n")


if __name__ == "__main__":
    main()