# risk-control-sequence-models / fusion_model.py
# yonghao's picture
# Add fusion model and research report
# b2806bd verified
"""
Late Fusion: App序列向量 + 征信模型预测 → 最终风控决策
=========================================================
两个模型各自独立建模后,在决策层融合
方法: Late Fusion (拼接各自输出 → 简单分类器)
原因: App 序列和征信数据本质不同,early fusion 会相互干扰
"""
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from scipy.stats import ks_2samp
import logging
logger = logging.getLogger(__name__)
# Fusion strategies accepted by late_fusion().
_FUSION_METHODS = ('simple_avg', 'weighted_avg', 'stacking', 'lgbm')

# Share of rows (taken in input order, no shuffling) used to fit the stacker;
# the remaining rows form the hold-out that is scored and returned.
_STACK_TRAIN_FRACTION = 0.8


def _auc_and_ks(y, scores):
    """Return (ROC AUC, two-sample KS statistic of positive vs. negative scores)."""
    auc = roc_auc_score(y, scores)
    ks = ks_2samp(scores[y == 1], scores[y == 0]).statistic
    return auc, ks


def _stacking_inputs(app_embeddings, app_risk_prob, credit_risk_prob, y_true):
    """Return (stacked feature matrix, index separating train rows from hold-out rows)."""
    features = np.column_stack([
        app_risk_prob.reshape(-1, 1),
        credit_risk_prob.reshape(-1, 1),
        app_embeddings,
    ])
    split = int(len(y_true) * _STACK_TRAIN_FRACTION)
    return features, split


def late_fusion(
    app_embeddings: np.ndarray,    # (n_users, 256) CoLES user vectors
    app_risk_prob: np.ndarray,     # (n_users,) App-model predicted probability
    credit_risk_prob: np.ndarray,  # (n_users,) credit-bureau-model predicted probability
    credit_features: np.ndarray,   # (n_users, n_features) raw credit features (optional)
    y_true: np.ndarray,            # (n_users,) ground-truth labels
    method: str = 'stacking',      # 'simple_avg', 'weighted_avg', 'stacking', 'lgbm'
):
    """
    Fuse the App-sequence model and the credit-bureau model at the decision layer.

    Fusion strategies:
    1. simple_avg:   plain average of the two models' probabilities.
    2. weighted_avg: weighted average; the App weight is grid-searched over
                     np.arange(0.1, 1.0, 0.05) to maximise AUC against ``y_true``.
    3. stacking:     logistic regression on [app_risk_prob, credit_risk_prob,
                     app_embeddings].
    4. lgbm:         LightGBM on the same stacked features.

    Args:
        app_embeddings: user embedding matrix; only read by 'stacking' and 'lgbm'.
        app_risk_prob: per-user risk probability from the App model.
        credit_risk_prob: per-user risk probability from the credit-bureau model.
        credit_features: accepted for interface compatibility; not read by any
            strategy in this function.
        y_true: binary labels (rows with label 1 and label 0 are compared for KS).
        method: one of 'simple_avg', 'weighted_avg', 'stacking', 'lgbm'.

    Returns:
        Tuple ``(fusion_pred, auc, ks)``.
        * 'simple_avg' / 'weighted_avg': ``fusion_pred`` covers every input row and
          the metrics are computed on all rows.
        * 'stacking' / 'lgbm': the first 80% of rows (in input order) train the
          stacker; ``fusion_pred`` and the metrics cover only the last 20%.

    Raises:
        ValueError: if ``method`` is not a supported strategy, or if the probability
            arrays and ``y_true`` differ in length.

    Notes:
        * 'weighted_avg' selects the weight on the same rows it is scored on, so the
          reported AUC/KS is optimistic.
        * 'lgbm' uses the hold-out rows both for early stopping and for the reported
          metrics, so those metrics are optimistic as well.
    """
    # Fail fast with a clear message; previously an unknown method fell through to
    # the shared evaluation code with `fusion_pred` unbound.
    if method not in _FUSION_METHODS:
        raise ValueError(
            f"Unknown fusion method {method!r}; expected one of {_FUSION_METHODS}"
        )

    n_labels = len(y_true)
    if len(app_risk_prob) != n_labels or len(credit_risk_prob) != n_labels:
        raise ValueError(
            "app_risk_prob, credit_risk_prob and y_true must have the same length; "
            f"got {len(app_risk_prob)}, {len(credit_risk_prob)} and {n_labels}"
        )

    if method == 'stacking':
        X_stack, split = _stacking_inputs(
            app_embeddings, app_risk_prob, credit_risk_prob, y_true
        )
        lr = LogisticRegression(C=1.0, max_iter=1000, class_weight='balanced')
        lr.fit(X_stack[:split], y_true[:split])
        fusion_pred = lr.predict_proba(X_stack[split:])[:, 1]
        auc, ks = _auc_and_ks(y_true[split:], fusion_pred)
        logger.info("Stacking (LR): AUC=%.4f, KS=%.4f", auc, ks)
        return fusion_pred, auc, ks

    if method == 'lgbm':
        # Imported lazily so the other strategies work without LightGBM installed.
        import lightgbm as lgb

        X_stack, split = _stacking_inputs(
            app_embeddings, app_risk_prob, credit_risk_prob, y_true
        )
        train_data = lgb.Dataset(X_stack[:split], label=y_true[:split])
        val_data = lgb.Dataset(X_stack[split:], label=y_true[split:])
        params = {
            'objective': 'binary', 'metric': 'auc',
            'learning_rate': 0.05, 'num_leaves': 31,
            'verbose': -1, 'n_jobs': -1,
        }
        model = lgb.train(params, train_data, num_boost_round=200,
                          valid_sets=[val_data],
                          callbacks=[lgb.early_stopping(30)])
        fusion_pred = model.predict(X_stack[split:])
        auc, ks = _auc_and_ks(y_true[split:], fusion_pred)
        logger.info("Stacking (LightGBM): AUC=%.4f, KS=%.4f", auc, ks)
        return fusion_pred, auc, ks

    if method == 'simple_avg':
        fusion_pred = 0.5 * app_risk_prob + 0.5 * credit_risk_prob
    else:  # 'weighted_avg'
        best_auc = 0
        best_w = 0.5  # fallback if no candidate weight achieves AUC > 0
        for w in np.arange(0.1, 1.0, 0.05):
            pred = w * app_risk_prob + (1 - w) * credit_risk_prob
            auc = roc_auc_score(y_true, pred)
            if auc > best_auc:
                best_auc = auc
                best_w = w
        fusion_pred = best_w * app_risk_prob + (1 - best_w) * credit_risk_prob
        logger.info("Optimal weight: App=%.2f, Credit=%.2f", best_w, 1 - best_w)

    # Evaluate the averaged predictions on all rows.
    auc, ks = _auc_and_ks(y_true, fusion_pred)
    logger.info("Fusion (%s): AUC=%.4f, KS=%.4f", method, auc, ks)
    return fusion_pred, auc, ks
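# ============================================================
# Usage example
# ============================================================
"""
End-to-end workflow:
# 1. App sequence model
# NOTE: extract_user_embeddings and app_classifier are not defined or imported in
# this snippet; supply them from your own pipeline.
from app_sequence_model import pretrain_coles, preprocess_app_sequence
pretrained = pretrain_coles(user_sequences)
app_embeddings = extract_user_embeddings(pretrained, user_sequences) # (N, 256)
# NOTE: late_fusion expects app_risk_prob with shape (N,). If predict_proba returns a
# 2-D (N, n_classes) array, pass only the positive-class column.
app_risk_prob = app_classifier.predict_proba(app_embeddings)
# 2. Credit-bureau model
from credit_bureau_model import train_tabm, train_lightgbm
tabm_pred = tabm_model.predict(credit_features)
lgb_pred = lgb_model.predict(credit_features)
credit_risk_prob = 0.5 * tabm_pred + 0.5 * lgb_pred
# 3. Fusion
# NOTE: with method='stacking' or 'lgbm', fusion_pred covers only the last 20% of rows
# (the hold-out split), not all N users.
fusion_pred, auc, ks = late_fusion(
app_embeddings=app_embeddings,
app_risk_prob=app_risk_prob,
credit_risk_prob=credit_risk_prob,
credit_features=None,
y_true=labels,
method='lgbm' # recommended
)
# 4. Threshold decision
threshold = 0.15 # determined by KS calibration
decision = (fusion_pred >= threshold).astype(int)
# 0 = approve, 1 = reject
"""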