"""
Late Fusion: app-sequence embeddings + credit-bureau model predictions -> final risk decision
=============================================================================================
The two models are built independently and are fused at the decision layer.

Method: late fusion (concatenate each model's outputs -> simple classifier).
Rationale: app-sequence data and credit-bureau data are fundamentally
different in nature, so early fusion would make them interfere with each other.
"""

import logging
from typing import Optional, Tuple

import numpy as np
import pandas as pd
from scipy.stats import ks_2samp
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score

logger = logging.getLogger(__name__)

# Fusion strategies accepted by late_fusion().
_SUPPORTED_METHODS = ('simple_avg', 'weighted_avg', 'stacking', 'lgbm')

# Fraction of rows (taken from the front, in order) used to fit the stacker;
# the remaining rows form the hold-out that is scored and returned.
_TRAIN_FRACTION = 0.8


def _build_stack_features(app_risk_prob, credit_risk_prob, app_embeddings):
    """Return the stacking design matrix [app prob | credit prob | app embedding]."""
    return np.column_stack([
        app_risk_prob.reshape(-1, 1),
        credit_risk_prob.reshape(-1, 1),
        np.asarray(app_embeddings),
    ])


def _auc_and_ks(y, pred):
    """Return (ROC AUC, two-sample KS statistic between scores of y==1 and y==0)."""
    auc = roc_auc_score(y, pred)
    ks = ks_2samp(pred[y == 1], pred[y == 0]).statistic
    return auc, ks


def late_fusion(
    app_embeddings: Optional[np.ndarray],   # (n_users, 256) CoLES user vectors
    app_risk_prob: np.ndarray,              # (n_users,) app-model predicted probability
    credit_risk_prob: np.ndarray,           # (n_users,) credit-bureau-model predicted probability
    credit_features: Optional[np.ndarray],  # (n_users, n_features) raw credit features (optional)
    y_true: np.ndarray,                     # (n_users,) ground-truth labels
    method: str = 'stacking'                # 'simple_avg', 'weighted_avg', 'stacking', 'lgbm'
) -> Tuple[np.ndarray, float, float]:
    """
    Fuse the app-model and credit-model outputs and report AUC / KS.

    Fusion strategies:
    1. simple_avg:   plain average of the two models' probabilities.
    2. weighted_avg: weighted average; the app weight is grid-searched over
                     np.arange(0.1, 1.0, 0.05) to maximise AUC.
    3. stacking:     LogisticRegression trained on the two probabilities plus
                     the app embedding.
    4. lgbm:         LightGBM trained on the same stacked features.

    Args:
        app_embeddings: Per-user embedding matrix. Only read by 'stacking'
            and 'lgbm'; ignored by the averaging methods.
        app_risk_prob: App-model probability per user (flattened to 1-D).
        credit_risk_prob: Credit-model probability per user (flattened to 1-D).
        credit_features: Currently unused by every strategy; may be None.
        y_true: Labels per user. KS compares the scores of rows where the
            label equals 1 against rows where it equals 0.
        method: One of 'simple_avg', 'weighted_avg', 'stacking', 'lgbm'.

    Returns:
        (fusion_pred, auc, ks).
        * 'simple_avg' / 'weighted_avg': fusion_pred has one score per input
          row and the metrics are computed on all rows.
        * 'stacking' / 'lgbm': the model is fitted on the first 80% of rows
          (positional split, no shuffling) and fusion_pred / auc / ks cover
          ONLY the remaining last 20% of rows.

    Raises:
        ValueError: if ``method`` is not supported, or if the probability
            vectors and ``y_true`` differ in length.

    Notes:
        * 'weighted_avg' selects its weight on the same ``y_true`` it is then
          scored on, so the reported AUC/KS are in-sample (optimistic).
        * 'lgbm' early-stops on the same hold-out rows it reports metrics on.
        * Because the split is positional, rows should already be in an order
          for which "first 80% / last 20%" is a sensible train/eval split.
    """
    # Fail fast with a clear message; previously an unknown method fell through
    # every branch and crashed later on an unbound ``fusion_pred``.
    if method not in _SUPPORTED_METHODS:
        raise ValueError(
            f"Unknown fusion method {method!r}; expected one of {_SUPPORTED_METHODS}"
        )

    # Accept lists / Series / column vectors: the boolean masks used for KS
    # need NumPy semantics, and (n, 1) inputs must not broadcast to (n, n).
    app_risk_prob = np.asarray(app_risk_prob).ravel()
    credit_risk_prob = np.asarray(credit_risk_prob).ravel()
    y_true = np.asarray(y_true).ravel()
    if not (len(app_risk_prob) == len(credit_risk_prob) == len(y_true)):
        raise ValueError(
            "app_risk_prob, credit_risk_prob and y_true must have the same length, "
            f"got {len(app_risk_prob)}, {len(credit_risk_prob)}, {len(y_true)}"
        )

    if method in ('stacking', 'lgbm'):
        X_stack = _build_stack_features(app_risk_prob, credit_risk_prob, app_embeddings)
        split = int(len(y_true) * _TRAIN_FRACTION)
        X_train, X_eval = X_stack[:split], X_stack[split:]
        y_train, y_eval = y_true[:split], y_true[split:]

        if method == 'stacking':
            lr = LogisticRegression(C=1.0, max_iter=1000, class_weight='balanced')
            lr.fit(X_train, y_train)
            fusion_pred = lr.predict_proba(X_eval)[:, 1]
            label = 'Stacking (LR)'
        else:
            # Imported lazily so LightGBM is only required for this strategy.
            import lightgbm as lgb

            train_data = lgb.Dataset(X_train, label=y_train)
            val_data = lgb.Dataset(X_eval, label=y_eval)
            params = {
                'objective': 'binary',
                'metric': 'auc',
                'learning_rate': 0.05,
                'num_leaves': 31,
                'verbose': -1,
                'n_jobs': -1,
            }
            model = lgb.train(
                params,
                train_data,
                num_boost_round=200,
                valid_sets=[val_data],
                callbacks=[lgb.early_stopping(30)],
            )
            fusion_pred = model.predict(X_eval)
            label = 'Stacking (LightGBM)'

        auc, ks = _auc_and_ks(y_eval, fusion_pred)
        logger.info(f"{label}: AUC={auc:.4f}, KS={ks:.4f}")
        return fusion_pred, auc, ks

    if method == 'simple_avg':
        fusion_pred = 0.5 * app_risk_prob + 0.5 * credit_risk_prob
    else:  # 'weighted_avg'
        best_auc = 0
        best_w = 0.5
        for w in np.arange(0.1, 1.0, 0.05):
            pred = w * app_risk_prob + (1 - w) * credit_risk_prob
            auc = roc_auc_score(y_true, pred)
            if auc > best_auc:
                best_auc = auc
                best_w = w
        fusion_pred = best_w * app_risk_prob + (1 - best_w) * credit_risk_prob
        logger.info(f"Optimal weight: App={best_w:.2f}, Credit={1-best_w:.2f}")

    # Evaluate the averaged scores on all rows.
    auc, ks = _auc_and_ks(y_true, fusion_pred)
    logger.info(f"Fusion ({method}): AUC={auc:.4f}, KS={ks:.4f}")
    return fusion_pred, auc, ks


# ============================================================
# Usage example
# ============================================================
"""
Full workflow:

# 1. App sequence model
from app_sequence_model import pretrain_coles, preprocess_app_sequence
pretrained = pretrain_coles(user_sequences)
app_embeddings = extract_user_embeddings(pretrained, user_sequences)  # (N, 256)
app_risk_prob = app_classifier.predict_proba(app_embeddings)
# NOTE(review): late_fusion expects one probability per user; if predict_proba
# returns one column per class, pass only the positive-class column - confirm.

# 2. Credit-bureau model
from credit_bureau_model import train_tabm, train_lightgbm
tabm_pred = tabm_model.predict(credit_features)
lgb_pred = lgb_model.predict(credit_features)
credit_risk_prob = 0.5 * tabm_pred + 0.5 * lgb_pred

# 3. Fusion
fusion_pred, auc, ks = late_fusion(
    app_embeddings=app_embeddings,
    app_risk_prob=app_risk_prob,
    credit_risk_prob=credit_risk_prob,
    credit_features=None,
    y_true=labels,
    method='lgbm'  # recommended
)
# With 'stacking' / 'lgbm', fusion_pred covers only the last 20% of rows.

# 4. Threshold decision
threshold = 0.15  # determined by KS calibration
decision = (fusion_pred >= threshold).astype(int)  # 0 = approve, 1 = reject
"""