# Hugging Face Spaces status banner captured with the source: Sleeping
import pickle
import threading
from datetime import datetime

import numpy as np
import pandas as pd
from networkx import DiGraph, is_directed_acyclic_graph
from pgmpy.estimators import (
    PC,
    AICScore,
    BayesianEstimator,
    BDeuScore,
    BDsScore,
    BicScore,
    HillClimbSearch,
    K2Score,
    MaximumLikelihoodEstimator,
    TreeSearch,
)
from pgmpy.inference import VariableElimination
from pgmpy.metrics import log_likelihood_score, structure_score
from pgmpy.models import BayesianNetwork
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
    roc_curve,
)
from sklearn.model_selection import train_test_split

class BayesianNetworkAnalyzer:
    """Train, evaluate and query a discrete Bayesian-network classifier.

    One analyzer is created per session. ``run_analysis`` executes the whole
    pipeline (preprocess, split, structure learning, encoding, binning,
    parameter fitting, evaluation) and stores its result in a class-level
    registry keyed by ``session_id``.
    """

    # Class-level lock. ``run_analysis`` holds it for the whole run, so
    # analyses started from different sessions execute one at a time.
    _lock = threading.Lock()
    # Latest ``run_analysis`` result per session id, shared by all instances.
    _session_results = {}
    # State assigned to values that fall outside every learned bin.
    _MISSING_LABEL = "Missing"

    def __init__(self, session_id):
        """Create an empty analyzer.

        Args:
            session_id: Unique identifier used as the key in the shared
                session-result registry.
        """
        self.session_id = session_id
        self.model = None
        self.inference = None
        self.train_data = None
        self.test_data = None
        # column -> array of bin edges learned from the training split
        self.bins_dict = {}
        # column -> ordered list of categories; list position == integer code
        self.categories_dict = {}

    def run_analysis(self, df, cat_features, con_features, target_variable,
                     test_fraction=0.25, algorithm='NB', estimator='ml',
                     equivalent_sample_size=3, score_method='BIC',
                     sig_level=0.05, n_bins=10):
        """Run the complete Bayesian-network analysis pipeline.

        The step order is deliberate (it mirrors the Django implementation
        this class was ported from): the structure is learned on the raw
        training split, and only afterwards are categorical features encoded
        and continuous features binned.

        Args:
            df: Source data frame.
            cat_features: List of categorical feature column names.
            con_features: List of continuous feature column names.
            target_variable: Name of the target column.
            test_fraction: Fraction of rows held out for testing.
            algorithm: Structure learner: 'NB', 'TAN', 'CL', 'HC' or 'PC'.
            estimator: 'bn' for the Bayesian estimator; anything else uses
                maximum likelihood.
            equivalent_sample_size: Prior strength for the Bayesian estimator.
            score_method: Scoring method used by Hill Climbing.
            sig_level: Significance level used by the PC algorithm.
            n_bins: Number of bins for continuous features.

        Returns:
            dict: model, inference engine, train/test metrics, CPDs, scores,
            the parameters used and an ISO timestamp.

        Raises:
            Exception: Any failure, re-raised as "Analysis failed: ..." with
                the original exception chained as ``__cause__``.
        """
        with self._lock:
            try:
                # 1. Select columns and drop rows with missing values.
                processed_df = self._preprocess_data(
                    df, cat_features, con_features, target_variable
                )

                # 2. Stratified split with a fixed seed for reproducibility.
                train_part, test_part = train_test_split(
                    processed_df,
                    test_size=test_fraction,
                    random_state=526,
                    stratify=processed_df[target_variable],
                )
                # Copy so the in-place column rewrites below never write into
                # (or warn about) slices of ``processed_df``.
                self.train_data = train_part.copy()
                self.test_data = test_part.copy()

                # 3. Learn the structure BEFORE encoding and binning.
                self.model = self._learn_structure(
                    algorithm, score_method, sig_level, target_variable
                )

                # 4. Encode categorical features, 5. bin continuous features.
                self._encode_categorical_features(cat_features)
                self._bin_continuous_features(con_features, n_bins)

                # 6. Fit the CPDs, 7. build the inference engine.
                self._fit_parameters(estimator, equivalent_sample_size)
                self.inference = VariableElimination(self.model)

                # 8. Evaluate on both splits.
                train_metrics = self._evaluate_model(
                    self.train_data, target_variable, "train"
                )
                test_metrics = self._evaluate_model(
                    self.test_data, target_variable, "test"
                )

                # 9-11. Collect CPDs, scores and assemble the result.
                results = {
                    'model': self.model,
                    'inference': self.inference,
                    'train_metrics': train_metrics,
                    'test_metrics': test_metrics,
                    'cpds': self._get_all_cpds(),
                    'scores': self._calculate_scores(),
                    'parameters': {
                        'algorithm': algorithm,
                        'estimator': estimator,
                        'test_fraction': test_fraction,
                        'n_features': len(cat_features) + len(con_features),
                        'cat_features': cat_features,
                        'con_features': con_features,
                        'target_variable': target_variable,
                        'n_bins': n_bins,
                        'score_method': score_method,
                        'sig_level': sig_level,
                        'equivalent_sample_size': equivalent_sample_size
                    },
                    'timestamp': datetime.now().isoformat()
                }

                self._session_results[self.session_id] = results
                return results
            except Exception as e:
                # Keep the generic type callers already catch, but chain the
                # cause so the original traceback is not lost.
                raise Exception(f"Analysis failed: {str(e)}") from e

    def _preprocess_data(self, df, cat_features, con_features, target_variable):
        """Return the selected columns of ``df`` without rows containing NaN.

        Column names are de-duplicated (order preserved), so a target that is
        also listed as a feature does not produce a duplicated column.
        """
        selected_columns = list(dict.fromkeys(
            [*cat_features, *con_features, target_variable]
        ))
        return df[selected_columns].dropna()

    @staticmethod
    def _is_text_column(series):
        """Return True for object- or string-typed columns."""
        return (
            pd.api.types.is_object_dtype(series)
            or pd.api.types.is_string_dtype(series)
        )

    def _encode_categorical_features(self, cat_features):
        """Replace text-valued categorical columns with integer codes.

        The category -> code mapping is learned from the training split and
        then applied to the test split, so a given category always receives
        the same code. Test values never seen in training become ``-1``.
        The mapping is kept in ``self.categories_dict``.
        """
        self.categories_dict = {}
        for col in cat_features:
            if col not in self.train_data.columns:
                continue
            train_col = self.train_data[col]
            if not self._is_text_column(train_col):
                continue

            categories = train_col.astype('category').cat.categories
            cat_dtype = pd.CategoricalDtype(categories=categories)
            self.categories_dict[col] = list(categories)

            self.train_data[col] = train_col.astype(cat_dtype).cat.codes
            if col in self.test_data.columns:
                self.test_data[col] = (
                    self.test_data[col].astype(cat_dtype).cat.codes
                )

    @staticmethod
    def _bin_label(lower, upper):
        """Format the label of the bin spanning ``lower`` to ``upper``.

        Edges are rounded to 2 decimals and joined with an en dash. Training
        and single-instance prediction must produce identical labels, so this
        is the only place the format is defined.
        """
        return f"{round(lower, 2)}–{round(upper, 2)}"

    @classmethod
    def _cut_with_labels(cls, values, bin_edges, bin_labels):
        """Bin ``values`` with fixed edges; unbinnable values become "Missing"."""
        binned = pd.cut(
            values,
            bins=bin_edges,
            labels=bin_labels,
            include_lowest=True,
            # Rounding can make two labels equal; pd.cut only accepts
            # duplicate labels when the result is unordered.
            ordered=len(set(bin_labels)) == len(bin_labels),
        )
        return binned.astype(object).fillna(cls._MISSING_LABEL)

    def _bin_continuous_features(self, con_features, n_bins):
        """Discretise continuous columns into ``n_bins`` equal-width bins.

        Bin edges are computed from the training split only and reused for
        the test split. Test values outside the training range are labelled
        "Missing". Columns that are absent or entirely NaN are skipped with a
        printed warning.
        """
        self.bins_dict = {}
        for col in con_features:
            usable = (
                col in self.train_data.columns
                and self.train_data[col].notna().sum() > 0
            )
            if not usable:
                print(f"⚠️ Skipped binning column '{col}' – missing or all NaN")
                continue

            _, bin_edges = pd.cut(
                self.train_data[col],
                bins=n_bins,
                retbins=True,
                duplicates='drop'
            )
            self.bins_dict[col] = bin_edges

            bin_labels = [
                self._bin_label(lower, upper)
                for lower, upper in zip(bin_edges[:-1], bin_edges[1:])
            ]

            self.train_data[col] = self._cut_with_labels(
                self.train_data[col], bin_edges, bin_labels
            )
            if col in self.test_data.columns:
                self.test_data[col] = self._cut_with_labels(
                    self.test_data[col], bin_edges, bin_labels
                )

    def _naive_bayes_model(self, target_variable):
        """Build a Naive Bayes network: the target points at every feature."""
        edges = [
            (target_variable, feature)
            for feature in self.train_data.columns
            if feature != target_variable
        ]
        return BayesianNetwork(edges)

    def _learn_structure(self, algorithm, score_method, sig_level, target_variable):
        """Learn the network structure from ``self.train_data``.

        Args:
            algorithm: 'NB' (Naive Bayes), 'TAN' (tree-augmented NB),
                'CL' (Chow-Liu), 'HC' (Hill Climbing) or 'PC'.
            score_method: 'BIC', 'AIC', 'K2', 'BDeu' or 'BDs' (HC only).
            sig_level: Significance level of the independence tests (PC only).
            target_variable: Name of the class node.

        Returns:
            BayesianNetwork: the learned (still unfitted) model.

        Raises:
            ValueError: For an unknown algorithm or score method.
        """
        if algorithm == 'NB':
            return self._naive_bayes_model(target_variable)

        if algorithm == 'TAN':
            columns = self.train_data.columns
            # Special case: when both 'asia' and 'either' are present and
            # 'either' is the target, 'asia' is forced as the tree root.
            if ('asia' in columns and 'either' in columns
                    and target_variable == 'either'):
                search = TreeSearch(self.train_data, root_node='asia')
            else:
                search = TreeSearch(self.train_data)
            structure = search.estimate(
                estimator_type='tan',
                class_node=target_variable
            )
            return BayesianNetwork(structure.edges())

        if algorithm == 'CL':
            structure = TreeSearch(self.train_data).estimate(
                estimator_type='chow-liu',
                class_node=target_variable
            )
            return BayesianNetwork(structure.edges())

        if algorithm == 'HC':
            # Map names to classes so only the requested score is built.
            score_classes = {
                'BIC': BicScore,
                'AIC': AICScore,
                'K2': K2Score,
                'BDeu': BDeuScore,
                'BDs': BDsScore
            }
            if score_method not in score_classes:
                raise ValueError(f"Unknown score method: {score_method}")
            structure = HillClimbSearch(self.train_data).estimate(
                scoring_method=score_classes[score_method](self.train_data)
            )
            return BayesianNetwork(structure.edges())

        if algorithm == 'PC':
            pc = PC(self.train_data)
            # Degrade max_cond_vars step by step until a usable DAG that
            # contains the target is found.
            for max_cond in (5, 4, 3, 2, 1):
                try:
                    structure = pc.estimate(
                        significance_level=sig_level,
                        max_cond_vars=max_cond,
                        ci_test='chi_square',
                        variant='stable',
                        n_jobs=1
                    )
                    edges = list(structure.edges())
                    if (is_directed_acyclic_graph(DiGraph(edges))
                            and any(target_variable in edge for edge in edges)):
                        return BayesianNetwork(edges)
                except Exception:
                    # Best effort: a failed attempt just moves on to the next
                    # (smaller) conditioning-set size.
                    continue
            # Every attempt failed or was unusable: fall back to Naive Bayes.
            return self._naive_bayes_model(target_variable)

        raise ValueError(f"Unknown algorithm: {algorithm}")

    def _fit_parameters(self, estimator, equivalent_sample_size):
        """Fit the CPDs of ``self.model`` on the training split.

        ``estimator == 'bn'`` uses the Bayesian estimator with the given
        equivalent sample size; any other value uses maximum likelihood.
        """
        if estimator == 'bn':
            self.model.fit(
                self.train_data,
                estimator=BayesianEstimator,
                equivalent_sample_size=equivalent_sample_size
            )
        else:
            self.model.fit(
                self.train_data,
                estimator=MaximumLikelihoodEstimator
            )

    def _predict_probabilities(self, data, target_variable):
        """Query the target's posterior for every row of ``data``.

        Evidence is restricted to variables that are nodes of the model. Rows
        whose inference fails (for example an unseen state) are reported via
        ``print`` and left out of the result.

        Returns:
            tuple: ``(true_labels, probs)`` where ``probs`` is an array with
            the probability of the target's state at index 1, rounded to 4
            decimals. Both are empty when no row could be evaluated.
        """
        model_nodes = set(self.model.nodes())
        scored_rows = []

        for idx, row in data.iterrows():
            evidence = {
                name: value
                for name, value in row.drop(target_variable).to_dict().items()
                if name in model_nodes
            }
            try:
                result = self.inference.query(
                    variables=[target_variable],
                    evidence=evidence
                )
            except Exception as e:
                print(f"⚠️ Inference failed at row {idx} | evidence keys: {list(evidence.keys())} | error: {e}")
                continue
            probs = result.values
            # Index 1 is read below, so a single-state target is unusable.
            if len(probs) > 1:
                scored_rows.append((row[target_variable], probs[1]))

        if not scored_rows:
            return [], np.array([])

        labels, positive_probs = zip(*scored_rows)
        return list(labels), np.round(np.array(positive_probs), 4)

    @staticmethod
    def _empty_metrics():
        """Return all-zero metrics for a split with no evaluable rows."""
        return {
            'accuracy': 0,
            'precision': 0,
            'recall': 0,
            'f1': 0,
            'auc': 0,
            'g_mean': 0,
            'p_mean': 0,
            'specificity': 0,
            'confusion_matrix': [[0, 0], [0, 0]],
            'fpr': [0],
            'tpr': [0],
            'predicted_probs': []
        }

    def _evaluate_model(self, data, target_variable, dataset_name, threshold=0.1):
        """Compute binary-classification metrics for one data split.

        Args:
            data: Preprocessed (encoded and binned) data frame.
            target_variable: Name of the target column; its values must be
                0/1 for the metrics to be meaningful.
            dataset_name: Split label ("train"/"test"); currently unused.
            threshold: Probability at or above which a row is predicted
                positive.

        Returns:
            dict: accuracy, precision, recall, f1, g_mean, p_mean and
            specificity as percentages, plus auc, confusion_matrix, ROC
            points (fpr/tpr) and the predicted probabilities.
        """
        true_labels, pred_probs = self._predict_probabilities(
            data, target_variable
        )
        if len(true_labels) == 0:
            return self._empty_metrics()

        pred_probs = np.asarray(pred_probs, dtype=float)
        pred_labels = (pred_probs >= threshold).astype(int)

        accuracy = accuracy_score(true_labels, pred_labels) * 100
        precision = precision_score(true_labels, pred_labels, zero_division=0) * 100
        recall = recall_score(true_labels, pred_labels, zero_division=0) * 100
        f1 = f1_score(true_labels, pred_labels, zero_division=0) * 100

        pred_probs_clean = np.nan_to_num(pred_probs, nan=0.0)
        if len(set(true_labels)) > 1:
            fpr, tpr, _ = roc_curve(true_labels, pred_probs_clean)
            auc = roc_auc_score(true_labels, pred_probs_clean)
            fpr, tpr = fpr.tolist(), tpr.tolist()
        else:
            # ROC/AUC are undefined when only one class is present; report
            # the same placeholders as the empty-split case.
            fpr, tpr, auc = [0], [0], 0

        # labels=[0, 1] keeps the matrix 2x2 even if a class never occurs,
        # so the four-way unpacking below cannot fail.
        cm = confusion_matrix(true_labels, pred_labels, labels=[0, 1])
        tn, fp, fn, tp = cm.ravel()
        sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0

        # NOTE(review): 'g_mean' combines sensitivity with precision and
        # 'p_mean' combines specificity with sensitivity. The usual G-mean is
        # sqrt(sensitivity * specificity), so the names look swapped. Kept
        # as-is to match the Django implementation - confirm before changing.
        g_mean = np.sqrt(sensitivity * precision / 100) * 100
        p_mean = np.sqrt(specificity * sensitivity) * 100

        return {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'auc': auc,
            'g_mean': g_mean,
            'p_mean': p_mean,
            'specificity': specificity * 100,
            'confusion_matrix': cm.tolist(),
            'fpr': fpr,
            'tpr': tpr,
            'predicted_probs': pred_probs.tolist()
        }

    def _get_all_cpds(self):
        """Return a ``{node: CPD}`` mapping for every node of the model."""
        return {node: self.model.get_cpds(node) for node in self.model.nodes()}

    def _calculate_scores(self):
        """Score the fitted model against the training split.

        Returns:
            dict: 'log_likelihood' plus the 'bic', 'k2', 'bdeu' and 'bds'
            structure scores.
        """
        scores = {
            'log_likelihood': log_likelihood_score(self.model, self.train_data)
        }
        for method in ('bic', 'k2', 'bdeu', 'bds'):
            scores[method] = structure_score(
                self.model, self.train_data, scoring_method=method
            )
        return scores

    def save_model(self, filepath):
        """Pickle the model, bin edges, category maps and training columns.

        Args:
            filepath: Destination path of the pickle file.
        """
        # train_data is None on an analyzer that only loaded a model.
        train_columns = (
            list(self.train_data.columns) if self.train_data is not None else []
        )
        model_data = {
            'model': self.model,
            'bins_dict': self.bins_dict,
            'categories_dict': self.categories_dict,
            'train_columns': train_columns,
            'timestamp': datetime.now().isoformat()
        }
        with open(filepath, 'wb') as f:
            pickle.dump(model_data, f)

    def load_model(self, filepath):
        """Load a model written by ``save_model`` and rebuild inference.

        SECURITY: ``pickle.load`` can execute arbitrary code. Only load files
        this application wrote itself, never user-supplied uploads.

        Args:
            filepath: Path of the pickle file.

        Returns:
            dict: The raw dictionary stored in the file.
        """
        with open(filepath, 'rb') as f:
            model_data = pickle.load(f)
        self.model = model_data['model']
        self.bins_dict = model_data['bins_dict']
        # Absent in files written before category maps were stored.
        self.categories_dict = model_data.get('categories_dict', {})
        self.inference = VariableElimination(self.model)
        return model_data

    def _discretize_value(self, column, value):
        """Return the bin label of ``value`` for a binned ``column``.

        Values below/above the learned range are clamped to the first/last
        bin. Intervals are right-closed (a value equal to an interior edge
        belongs to the lower bin), matching how the training data was cut.
        """
        bins = self.bins_dict[column]
        # side='left' yields the first edge >= value, i.e. the right edge of
        # the right-closed interval that contains the value.
        right_edge = int(np.searchsorted(bins, value, side='left'))
        right_edge = min(max(right_edge, 1), len(bins) - 1)
        return self._bin_label(bins[right_edge - 1], bins[right_edge])

    def predict_single_instance(self, evidence_dict, target_variable):
        """Predict the target distribution for a single case.

        Continuous features are mapped to their bin label. Raw category
        strings seen in training are mapped to their integer code; any other
        value is passed through unchanged. Missing values (None/NaN),
        variables that are not nodes of the model, and the target itself are
        left out of the evidence.

        Args:
            evidence_dict: ``{feature name: raw value}``.
            target_variable: Name of the variable to predict.

        Returns:
            dict: 'probability' (state at index 1, or the only state),
            'risk_level' ("High" >= 0.7, "Moderate" >= 0.3, else "Low"),
            'all_probs' and the 'processed_evidence' actually used.
        """
        model_nodes = (
            set(self.model.nodes()) if self.model is not None else None
        )

        processed_evidence = {}
        for key, value in evidence_dict.items():
            if key == target_variable:
                continue
            if model_nodes is not None and key not in model_nodes:
                continue
            if value is None or (np.isscalar(value) and pd.isna(value)):
                continue  # unobserved: leave it out of the evidence

            if key in self.bins_dict:
                processed_evidence[key] = self._discretize_value(key, value)
            elif (isinstance(value, str)
                    and value in self.categories_dict.get(key, ())):
                # List position in categories_dict is the training code.
                processed_evidence[key] = self.categories_dict[key].index(value)
            else:
                processed_evidence[key] = value

        result = self.inference.query(
            variables=[target_variable],
            evidence=processed_evidence
        )

        probs = result.values
        positive_prob = probs[1] if len(probs) > 1 else probs[0]

        if positive_prob >= 0.7:
            risk_level = "High"
        elif positive_prob >= 0.3:
            risk_level = "Moderate"
        else:
            risk_level = "Low"

        return {
            'probability': float(positive_prob),
            'risk_level': risk_level,
            'all_probs': {i: float(p) for i, p in enumerate(probs)},
            'processed_evidence': processed_evidence
        }

    @classmethod
    def get_session_results(cls, session_id):
        """Return the stored result of ``session_id``, or None if absent."""
        return cls._session_results.get(session_id)

    @classmethod
    def clear_session_results(cls, session_id):
        """Remove the stored result of ``session_id`` if there is one."""
        cls._session_results.pop(session_id, None)