import pandas as pd
import numpy as np
from pgmpy.models import BayesianNetwork
from pgmpy.estimators import (
    TreeSearch, HillClimbSearch, PC, MaximumLikelihoodEstimator,
    BayesianEstimator, BicScore, AICScore, K2Score, BDeuScore, BDsScore
)
from pgmpy.inference import VariableElimination
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    confusion_matrix, accuracy_score, precision_score, recall_score,
    f1_score, roc_curve, roc_auc_score
)
from pgmpy.metrics import log_likelihood_score, structure_score
import threading
from datetime import datetime
from networkx import is_directed_acyclic_graph, DiGraph


class BayesianNetworkAnalyzer:
    """Bayesian-network analyzer.

    Supports multiple users at once: each session is identified by a
    ``session_id`` and its results are stored in a class-level registry.
    A class-level lock serializes whole analyses, so concurrent calls to
    :meth:`run_analysis` run one at a time.

    The pipeline order deliberately mirrors a Django reference
    implementation: structure learning runs on the *raw* training data,
    and only afterwards are categorical columns encoded and continuous
    columns binned.
    """

    # Class-level lock for thread safety (held for an entire analysis).
    _lock = threading.Lock()

    # Per-session analysis results, keyed by session_id (shared across instances).
    _session_results = {}

    def __init__(self, session_id):
        """Initialize the analyzer.

        Args:
            session_id: Unique session identifier used to store/retrieve results.
        """
        self.session_id = session_id
        self.model = None          # learned BayesianNetwork
        self.inference = None      # VariableElimination engine over self.model
        self.train_data = None     # training split (mutated in place by encoding/binning)
        self.test_data = None      # test split (mutated in place by encoding/binning)
        self.bins_dict = {}        # per-column bin edges learned from the training split

    def run_analysis(self, df, cat_features, con_features, target_variable,
                     test_fraction=0.25, algorithm='NB', estimator='ml',
                     equivalent_sample_size=3, score_method='BIC',
                     sig_level=0.05, n_bins=10):
        """Run the complete Bayesian-network analysis pipeline.

        Step order is aligned with the Django reference version:
        preprocess -> split -> learn structure -> encode categoricals ->
        bin continuous -> fit parameters -> evaluate -> collect CPDs/scores.

        Args:
            df: Raw input DataFrame.
            cat_features: List of categorical feature column names.
            con_features: List of continuous feature column names.
            target_variable: Name of the target column.
            test_fraction: Test-set proportion for the train/test split.
            algorithm: Structure-learning algorithm ('NB', 'TAN', 'CL', 'HC', 'PC').
            estimator: Parameter-estimation method ('ml' or 'bn').
            equivalent_sample_size: Equivalent sample size for Bayesian estimation.
            score_method: Scoring method for Hill Climbing ('BIC', 'AIC', 'K2', 'BDeu', 'BDs').
            sig_level: Significance level for the PC algorithm.
            n_bins: Number of bins for continuous variables.

        Returns:
            dict: All analysis results (model, inference engine, metrics,
            CPDs, scores, parameters, timestamp).

        Raises:
            Exception: Wrapping any failure in the pipeline (original
                exception chained as the cause).
        """
        with self._lock:
            try:
                # 1. Preprocessing (column selection + dropping missing values only).
                processed_df = self._preprocess_data(
                    df, cat_features, con_features, target_variable
                )

                # 2. Train/test split (fixed random_state=526; stratified on
                #    the target when it is present in the frame).
                self.train_data, self.test_data = train_test_split(
                    processed_df,
                    test_size=test_fraction,
                    random_state=526,
                    stratify=processed_df[target_variable]
                    if target_variable in processed_df.columns else None
                )

                # 3. Learn the network structure (BEFORE binning and encoding —
                #    intentional, to match the Django version).
                self.model = self._learn_structure(
                    algorithm, score_method, sig_level, target_variable
                )

                # 4. Encode categorical variables (after structure learning,
                #    before binning).
                self._encode_categorical_features(cat_features)

                # 5. Bin continuous variables (after encoding).
                self._bin_continuous_features(con_features, n_bins)

                # 6. Parameter estimation.
                self._fit_parameters(estimator, equivalent_sample_size)

                # 7. Initialize the inference engine.
                self.inference = VariableElimination(self.model)

                # 8. Evaluate the model on both splits.
                train_metrics = self._evaluate_model(
                    self.train_data, target_variable, "train"
                )
                test_metrics = self._evaluate_model(
                    self.test_data, target_variable, "test"
                )

                # 9. Collect all CPDs.
                cpds = self._get_all_cpds()

                # 10. Compute model scores.
                scores = self._calculate_scores()

                # 11. Assemble the result payload.
                results = {
                    'model': self.model,
                    'inference': self.inference,
                    'train_metrics': train_metrics,
                    'test_metrics': test_metrics,
                    'cpds': cpds,
                    'scores': scores,
                    'parameters': {
                        'algorithm': algorithm,
                        'estimator': estimator,
                        'test_fraction': test_fraction,
                        'n_features': len(cat_features) + len(con_features),
                        'cat_features': cat_features,
                        'con_features': con_features,
                        'target_variable': target_variable,
                        'n_bins': n_bins,
                        'score_method': score_method,
                        'sig_level': sig_level,
                        'equivalent_sample_size': equivalent_sample_size
                    },
                    'timestamp': datetime.now().isoformat()
                }

                # Store into the session registry for later retrieval.
                self._session_results[self.session_id] = results

                return results

            except Exception as e:
                # Chain the original exception so the traceback is preserved.
                raise Exception(f"Analysis failed: {str(e)}") from e

    def _preprocess_data(self, df, cat_features, con_features, target_variable):
        """Select the required columns and drop rows with any missing value."""
        selected_columns = cat_features + con_features + [target_variable]
        processed_df = df[selected_columns].copy()
        processed_df = processed_df.dropna()
        return processed_df

    def _encode_categorical_features(self, cat_features):
        """Convert object-dtype categorical columns to category codes.

        Only ``cat_features`` are encoded — binned continuous columns are
        left untouched. The Django reference encodes only the training set;
        the test set is encoded here too for prediction-time consistency.

        NOTE(review): train and test are encoded independently, so the
        integer codes may disagree between splits if their category sets
        differ — kept as-is for Django parity; verify against the caller.
        """
        for col in cat_features:
            if col in self.train_data.columns:
                if self.train_data[col].dtype == 'object':
                    self.train_data[col] = self.train_data[col].astype('category').cat.codes
            if col in self.test_data.columns:
                if self.test_data[col].dtype == 'object':
                    self.test_data[col] = self.test_data[col].astype('category').cat.codes

    def _bin_continuous_features(self, con_features, n_bins):
        """Bin continuous variables, aligned with the Django version.

        Bin edges are computed from the training split only, then the same
        edges are applied to the test split. Labels use an en dash (–),
        not a hyphen, to match the Django label format.
        """
        self.bins_dict = {}
        for col in con_features:
            if col in self.train_data.columns and self.train_data[col].notna().sum() > 0:
                # Compute bin edges from the training split.
                bin_edges = pd.cut(
                    self.train_data[col],
                    bins=n_bins,
                    retbins=True,
                    duplicates='drop'
                )[1]
                self.bins_dict[col] = bin_edges

                # Build bin labels (en dash, rounded to 2 decimals).
                bin_labels = [
                    f"{round(bin_edges[i], 2)}–{round(bin_edges[i + 1], 2)}"
                    for i in range(len(bin_edges) - 1)
                ]

                # Bin the training split; out-of-bin rows become "Missing".
                self.train_data[col] = pd.cut(
                    self.train_data[col],
                    bins=bin_edges,
                    labels=bin_labels,
                    include_lowest=True
                ).astype(object).fillna("Missing")

                # Apply the same edges to the test split.
                if col in self.test_data.columns:
                    self.test_data[col] = pd.cut(
                        self.test_data[col],
                        bins=bin_edges,
                        labels=bin_labels,
                        include_lowest=True
                    ).astype(object).fillna("Missing")
            else:
                print(f"⚠️ Skipped binning column '{col}' – missing or all NaN")

    def _learn_structure(self, algorithm, score_method, sig_level, target_variable):
        """Learn the network structure, aligned with the Django version.

        Raises:
            ValueError: If ``algorithm`` is not one of the supported codes.
        """
        if algorithm == 'NB':
            # Naive Bayes: target is the parent of every feature.
            edges = [
                (target_variable, feature)
                for feature in self.train_data.columns
                if feature != target_variable
            ]
            model = BayesianNetwork(edges)

        elif algorithm == 'TAN':
            # Tree-Augmented Naive Bayes.
            # Special case: with both 'asia' and 'either' columns and target
            # 'either', force 'asia' as the tree root (Django parity).
            if ('asia' in self.train_data.columns
                    and 'either' in self.train_data.columns
                    and target_variable == 'either'):
                tan_search = TreeSearch(self.train_data, root_node='asia')
            else:
                tan_search = TreeSearch(self.train_data)
            structure = tan_search.estimate(
                estimator_type='tan',
                class_node=target_variable
            )
            model = BayesianNetwork(structure.edges())

        elif algorithm == 'CL':
            # Chow-Liu tree.
            tan_search = TreeSearch(self.train_data)
            structure = tan_search.estimate(
                estimator_type='chow-liu',
                class_node=target_variable
            )
            model = BayesianNetwork(structure.edges())

        elif algorithm == 'HC':
            # Hill Climbing with the selected scoring method.
            hc = HillClimbSearch(self.train_data)
            scoring_methods = {
                'BIC': BicScore(self.train_data),
                'AIC': AICScore(self.train_data),
                'K2': K2Score(self.train_data),
                'BDeu': BDeuScore(self.train_data),
                'BDs': BDsScore(self.train_data)
            }
            structure = hc.estimate(
                scoring_method=scoring_methods[score_method]
            )
            model = BayesianNetwork(structure.edges())

        elif algorithm == 'PC':
            # PC algorithm with the same degradation strategy as Django:
            # try decreasing max_cond_vars until a usable DAG that touches
            # the target is found; otherwise fall back to Naive Bayes.
            pc = PC(self.train_data)
            for max_cond in [5, 4, 3, 2, 1]:
                try:
                    structure = pc.estimate(
                        significance_level=sig_level,
                        max_cond_vars=max_cond,
                        ci_test='chi_square',
                        variant='stable',
                        n_jobs=1  # Django uses 1 on the first attempt
                    )
                    # Accept only an acyclic graph that involves the target.
                    edges = structure.edges()
                    if (is_directed_acyclic_graph(DiGraph(edges))
                            and any(target_variable in edge for edge in edges)):
                        model = BayesianNetwork(structure.edges())
                        break
                except Exception:
                    # Deliberate best-effort: try the next max_cond value.
                    continue
            else:
                # All attempts failed: fall back to Naive Bayes (Django parity).
                edges = [
                    (target_variable, feature)
                    for feature in self.train_data.columns
                    if feature != target_variable
                ]
                model = BayesianNetwork(edges)

        else:
            raise ValueError(f"Unknown algorithm: {algorithm}")

        return model

    def _fit_parameters(self, estimator, equivalent_sample_size):
        """Estimate CPD parameters ('bn' = Bayesian, anything else = ML)."""
        if estimator == 'bn':
            self.model.fit(
                self.train_data,
                estimator=BayesianEstimator,
                equivalent_sample_size=equivalent_sample_size
            )
        else:
            self.model.fit(
                self.train_data,
                estimator=MaximumLikelihoodEstimator
            )

    def _predict_probabilities(self, data, target_variable):
        """Predict per-row target probabilities, aligned with Django.

        Evidence is filtered to variables present in the model; rows where
        inference fails are recorded as None and filtered out afterwards.

        Returns:
            (labels, probs): list of true labels and an array of the
            positive-class (index 1) probabilities rounded to 4 decimals.
            Both empty if no row produced a valid binary distribution.
        """
        true_labels = []
        predicted_probs = []
        model_nodes = set(self.model.nodes())

        for idx, row in data.iterrows():
            # Keep only evidence variables that exist in the model.
            raw_evidence = row.drop(target_variable).to_dict()
            filtered_evidence = {
                k: v for k, v in raw_evidence.items() if k in model_nodes
            }
            true_label = row[target_variable]
            true_labels.append(true_label)

            try:
                result = self.inference.query(
                    variables=[target_variable],
                    evidence=filtered_evidence
                )
                probs = result.values
                predicted_probs.append(probs)
            except Exception as e:
                print(f"⚠️ Inference failed at row {idx} | evidence keys: {list(filtered_evidence.keys())} | error: {e}")
                predicted_probs.append(None)

        # Keep only rows with a valid multi-class distribution (Django parity).
        valid_data = [
            (label, prob)
            for label, prob in zip(true_labels, predicted_probs)
            if prob is not None and len(prob) > 1
        ]
        if not valid_data:
            return [], []

        valid_labels, valid_probs = zip(*valid_data)
        prob_array = np.round(np.array([prob[1] for prob in valid_probs]), 4)
        return list(valid_labels), prob_array

    def _evaluate_model(self, data, target_variable, dataset_name):
        """Compute evaluation metrics, aligned with the Django version.

        ``dataset_name`` is accepted for interface parity but not used here.
        """
        true_labels, pred_probs = self._predict_probabilities(
            data, target_variable
        )

        if len(true_labels) == 0:
            # No valid predictions: return an all-zero metrics payload.
            return {
                'accuracy': 0, 'precision': 0, 'recall': 0, 'f1': 0,
                'auc': 0, 'g_mean': 0, 'p_mean': 0, 'specificity': 0,
                'confusion_matrix': [[0, 0], [0, 0]],
                'fpr': [0], 'tpr': [0]
            }

        # Binarize at threshold 0.1 (intentional, matches Django).
        threshold = 0.1
        pred_labels = (pred_probs >= threshold).astype(int)

        # Percentage-scaled classification metrics.
        accuracy = accuracy_score(true_labels, pred_labels) * 100
        precision = precision_score(true_labels, pred_labels, zero_division=0) * 100
        recall = recall_score(true_labels, pred_labels, zero_division=0) * 100
        f1 = f1_score(true_labels, pred_labels, zero_division=0) * 100

        # ROC curve / AUC on NaN-sanitized probabilities.
        pred_probs_clean = np.nan_to_num(pred_probs, nan=0.0)
        fpr, tpr, _ = roc_curve(true_labels, pred_probs_clean)
        auc = roc_auc_score(true_labels, pred_probs_clean)

        cm = confusion_matrix(true_labels, pred_labels).tolist()

        # G-mean and P-mean, computed the same way as Django.
        # NOTE(review): this G-mean uses precision rather than the
        # conventional specificity — kept for Django parity; confirm intent.
        tn, fp, fn, tp = confusion_matrix(true_labels, pred_labels).ravel()
        sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
        g_mean = np.sqrt(sensitivity * precision / 100) * 100
        p_mean = np.sqrt(specificity * sensitivity) * 100

        return {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'auc': auc,
            'g_mean': g_mean,
            'p_mean': p_mean,
            'specificity': specificity * 100,
            'confusion_matrix': cm,
            'fpr': fpr.tolist(),
            'tpr': tpr.tolist(),
            'predicted_probs': pred_probs.tolist()
        }

    def _get_all_cpds(self):
        """Return every node's conditional probability table, keyed by node."""
        return {node: self.model.get_cpds(node) for node in self.model.nodes()}

    def _calculate_scores(self):
        """Compute model scores on the training split."""
        scores = {
            'log_likelihood': log_likelihood_score(self.model, self.train_data),
            'bic': structure_score(self.model, self.train_data, scoring_method='bic'),
            'k2': structure_score(self.model, self.train_data, scoring_method='k2'),
            'bdeu': structure_score(self.model, self.train_data, scoring_method='bdeu'),
            'bds': structure_score(self.model, self.train_data, scoring_method='bds')
        }
        return scores

    def save_model(self, filepath):
        """Persist the trained model.

        Includes the model, bin edges, and the training-column list.
        """
        import pickle
        model_data = {
            'model': self.model,
            'bins_dict': self.bins_dict,
            'train_columns': list(self.train_data.columns),
            'timestamp': datetime.now().isoformat()
        }
        with open(filepath, 'wb') as f:
            pickle.dump(model_data, f)

    def load_model(self, filepath):
        """Load a previously saved model and rebuild the inference engine.

        NOTE(review): pickle.load executes arbitrary code — only load files
        produced by save_model from a trusted location.
        """
        import pickle
        with open(filepath, 'rb') as f:
            model_data = pickle.load(f)
        self.model = model_data['model']
        self.bins_dict = model_data['bins_dict']
        self.inference = VariableElimination(self.model)
        return model_data

    def predict_single_instance(self, evidence_dict, target_variable):
        """Predict the target distribution for a single case.

        Continuous evidence values are mapped into the stored bin labels
        (values outside the training range are clamped to the first/last
        bin); categorical values are passed through unchanged.
        """
        # 1. Map evidence values to model states.
        processed_evidence = {}
        for key, value in evidence_dict.items():
            if key in self.bins_dict:
                # Continuous variable: translate the raw value to a bin label.
                bins = self.bins_dict[key]
                if value < bins[0]:
                    # Below the minimum: clamp to the first bin.
                    processed_evidence[key] = f"{round(bins[0], 2)}–{round(bins[1], 2)}"
                elif value > bins[-1]:
                    # Above the maximum: clamp to the last bin.
                    processed_evidence[key] = f"{round(bins[-2], 2)}–{round(bins[-1], 2)}"
                else:
                    # In range: find the enclosing bin.
                    for i in range(len(bins) - 1):
                        if bins[i] <= value <= bins[i + 1]:
                            processed_evidence[key] = f"{round(bins[i], 2)}–{round(bins[i + 1], 2)}"
                            break
            else:
                # Categorical variable: use as-is.
                processed_evidence[key] = value

        # 2. Run inference.
        result = self.inference.query(
            variables=[target_variable],
            evidence=processed_evidence
        )

        # 3. Assemble the result (index 1 is treated as the event of interest).
        probs = result.values
        death_prob = probs[1] if len(probs) > 1 else probs[0]

        # Risk-level thresholds: >=0.7 High, >=0.3 Moderate, else Low.
        if death_prob >= 0.7:
            risk_level = "High"
        elif death_prob >= 0.3:
            risk_level = "Moderate"
        else:
            risk_level = "Low"

        return {
            'probability': float(death_prob),
            'risk_level': risk_level,
            'all_probs': {i: float(p) for i, p in enumerate(probs)},
            'processed_evidence': processed_evidence
        }

    @classmethod
    def get_session_results(cls, session_id):
        """Return the stored results for a session, or None if absent."""
        return cls._session_results.get(session_id)

    @classmethod
    def clear_session_results(cls, session_id):
        """Remove a session's stored results, if any."""
        if session_id in cls._session_results:
            del cls._session_results[session_id]