File size: 4,106 Bytes
114e6ac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from __future__ import annotations

from typing import Iterable

import numpy as np
import pandas as pd


def safe_divide(numer: pd.Series, denom: pd.Series) -> tuple[pd.Series, pd.Series]:
    numer = pd.to_numeric(numer, errors="coerce")
    denom = pd.to_numeric(denom, errors="coerce")
    denom_zero = denom.isna() | (denom == 0)
    return numer / denom.replace(0, np.nan), denom_zero


def new_features_creation(
    df: pd.DataFrame,
    *,
    days_employed_sentinel: int = 365243,
    engineered_sources: Iterable[str] | None = None,
) -> pd.DataFrame:
    df_features = df.copy()
    if engineered_sources is not None:
        for col in engineered_sources:
            if col not in df_features.columns:
                df_features[col] = np.nan

    if "DAYS_EMPLOYED" in df_features.columns:
        if "DAYS_EMPLOYED_ANOM" not in df_features.columns:
            sentinel_mask = df_features["DAYS_EMPLOYED"] == days_employed_sentinel
            df_features["DAYS_EMPLOYED_ANOM"] = sentinel_mask.astype(int)
            df_features.loc[sentinel_mask, "DAYS_EMPLOYED"] = np.nan

    def _add_ratio(numer_col: str, denom_col: str, ratio_name: str) -> None:
        if numer_col not in df_features.columns or denom_col not in df_features.columns:
            df_features[ratio_name] = np.nan
            df_features[f"DENOM_ZERO_{ratio_name}"] = 1
            return
        ratio, denom_zero = safe_divide(df_features[numer_col], df_features[denom_col])
        df_features[ratio_name] = ratio
        df_features[f"DENOM_ZERO_{ratio_name}"] = denom_zero.astype(int)

    _add_ratio("DAYS_EMPLOYED", "DAYS_BIRTH", "DAYS_EMPLOYED_PERC")
    _add_ratio("AMT_INCOME_TOTAL", "AMT_CREDIT", "INCOME_CREDIT_PERC")
    _add_ratio("AMT_INCOME_TOTAL", "CNT_FAM_MEMBERS", "INCOME_PER_PERSON")
    _add_ratio("AMT_ANNUITY", "AMT_INCOME_TOTAL", "ANNUITY_INCOME_PERC")
    _add_ratio("AMT_ANNUITY", "AMT_CREDIT", "PAYMENT_RATE")

    df_features.replace([np.inf, -np.inf], np.nan, inplace=True)
    return df_features


def select_missing_indicator_columns(
    df: pd.DataFrame,
    *,
    exclude_cols: set[str] | None = None,
    min_missing_rate: float = 0.0,
) -> list[str]:
    exclude = exclude_cols or set()
    numeric_cols = df.select_dtypes(include=["number"]).columns
    missing_rate = df[numeric_cols].isna().mean()
    cols: list[str] = []
    for col in numeric_cols:
        if col in exclude:
            continue
        if col.startswith(("DENOM_ZERO_", "is_missing_", "is_outlier_")):
            continue
        if col.endswith("_ANOM"):
            continue
        if missing_rate.get(col, 0.0) <= min_missing_rate:
            continue
        cols.append(col)
    return cols


def add_missingness_indicators(
    df: pd.DataFrame, indicator_cols: list[str]
) -> pd.DataFrame:
    for col in indicator_cols:
        if col not in df.columns:
            df[col] = np.nan
        df[f"is_missing_{col}"] = pd.to_numeric(df[col], errors="coerce").isna().astype(int)
    return df


def compute_outlier_bounds(
    df: pd.DataFrame,
    outlier_columns: list[str],
    *,
    lower_q: float = 0.01,
    upper_q: float = 0.99,
) -> dict[str, tuple[float, float]]:
    bounds: dict[str, tuple[float, float]] = {}
    for col in outlier_columns:
        if col not in df.columns:
            continue
        values = pd.to_numeric(df[col], errors="coerce")
        if values.dropna().empty:
            continue
        bounds[col] = (
            float(values.quantile(lower_q)),
            float(values.quantile(upper_q)),
        )
    return bounds


def apply_outlier_clipping(
    df: pd.DataFrame, outlier_bounds: dict[str, tuple[float, float]]
) -> pd.DataFrame:
    if not outlier_bounds:
        return df
    df = df.copy()
    for col, (low, high) in outlier_bounds.items():
        if col not in df.columns:
            continue
        values = pd.to_numeric(df[col], errors="coerce")
        mask = values.notna() & ((values < low) | (values > high))
        df[f"is_outlier_{col}"] = mask.astype(int)
        df[col] = values.clip(lower=low, upper=high)
    return df