# bayesian-network / bn_core.py
# (hosting-page header retained as a comment: Wen1201 — "Upload 3 files" — 0ee744a verified)
import pandas as pd
import numpy as np
from pgmpy.models import BayesianNetwork
from pgmpy.estimators import (
TreeSearch, HillClimbSearch, PC,
MaximumLikelihoodEstimator, BayesianEstimator,
BicScore, AICScore, K2Score, BDeuScore, BDsScore
)
from pgmpy.inference import VariableElimination
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
confusion_matrix, accuracy_score, precision_score,
recall_score, f1_score, roc_curve, roc_auc_score
)
from pgmpy.metrics import log_likelihood_score, structure_score
import threading
from datetime import datetime
from networkx import is_directed_acyclic_graph, DiGraph
class BayesianNetworkAnalyzer:
    """
    Bayesian network analyzer.

    Supports multiple concurrent users: each session is identified by a
    session_id and its results are kept in a class-level registry.
    """
    # Class-level lock for thread safety; run_analysis holds it for the
    # whole pipeline, so analyses from different sessions are serialized.
    _lock = threading.Lock()
    # Registry of analysis results keyed by session_id (shared across instances).
    _session_results = {}
    def __init__(self, session_id):
        """
        Initialize the analyzer.
        Args:
            session_id: unique session identifier
        """
        self.session_id = session_id
        self.model = None        # learned pgmpy BayesianNetwork (set by run_analysis)
        self.inference = None    # VariableElimination engine over self.model
        self.train_data = None   # training split (DataFrame)
        self.test_data = None    # test split (DataFrame)
        self.bins_dict = {}      # per-column bin edges for continuous features
def run_analysis(self, df, cat_features, con_features, target_variable,
test_fraction=0.25, algorithm='NB', estimator='ml',
equivalent_sample_size=3, score_method='BIC',
sig_level=0.05, n_bins=10):
"""
執行完整的貝葉斯網路分析 - 完全對齊 Django 版本的順序
Args:
df: 原始資料框
cat_features: 分類特徵列表
con_features: 連續特徵列表
target_variable: 目標變數名稱
test_fraction: 測試集比例
algorithm: 結構學習演算法
estimator: 參數估計方法
equivalent_sample_size: 等效樣本大小(用於貝葉斯估計)
score_method: 評分方法(用於 Hill Climbing)
sig_level: 顯著性水準(用於 PC 演算法)
n_bins: 連續變數分箱數量
Returns:
dict: 包含所有分析結果的字典
"""
with self._lock:
try:
# 1. 資料預處理 (只選擇欄位和處理缺失值)
processed_df = self._preprocess_data(
df, cat_features, con_features, target_variable
)
# 2. 分割訓練/測試集 (✅ random_state=526)
self.train_data, self.test_data = train_test_split(
processed_df,
test_size=test_fraction,
random_state=526,
stratify=processed_df[target_variable] if target_variable in processed_df.columns else None
)
# 3. ✅ 學習網路結構 (在分箱和編碼之前!)
self.model = self._learn_structure(
algorithm, score_method, sig_level, target_variable
)
# 4. ✅ 對分類變數編碼 (在學習結構之後,分箱之前)
self._encode_categorical_features(cat_features)
# 5. ✅ 對連續變數分箱 (在編碼之後)
self._bin_continuous_features(con_features, n_bins)
# 6. 參數估計
self._fit_parameters(estimator, equivalent_sample_size)
# 7. 初始化推論引擎
self.inference = VariableElimination(self.model)
# 8. 評估模型
train_metrics = self._evaluate_model(
self.train_data, target_variable, "train"
)
test_metrics = self._evaluate_model(
self.test_data, target_variable, "test"
)
# 9. 獲取 CPD
cpds = self._get_all_cpds()
# 10. 計算模型評分
scores = self._calculate_scores()
# 11. 整理結果
results = {
'model': self.model,
'inference': self.inference,
'train_metrics': train_metrics,
'test_metrics': test_metrics,
'cpds': cpds,
'scores': scores,
'parameters': {
'algorithm': algorithm,
'estimator': estimator,
'test_fraction': test_fraction,
'n_features': len(cat_features) + len(con_features),
'cat_features': cat_features,
'con_features': con_features,
'target_variable': target_variable,
'n_bins': n_bins,
'score_method': score_method,
'sig_level': sig_level,
'equivalent_sample_size': equivalent_sample_size
},
'timestamp': datetime.now().isoformat()
}
# 儲存到 session results
self._session_results[self.session_id] = results
return results
except Exception as e:
raise Exception(f"Analysis failed: {str(e)}")
def _preprocess_data(self, df, cat_features, con_features, target_variable):
"""資料預處理 - 只選擇欄位和刪除缺失值"""
# 選擇需要的欄位
selected_columns = cat_features + con_features + [target_variable]
processed_df = df[selected_columns].copy()
# 處理缺失值
processed_df = processed_df.dropna()
return processed_df
def _encode_categorical_features(self, cat_features):
"""
✅ 將分類變數轉為 category codes - 完全對齊 Django
注意:只對 cat_features 編碼,不對分箱後的連續變數編碼
Django 只對 train_data 編碼,但我們為了一致性也對 test_data 編碼
"""
for col in cat_features:
if col in self.train_data.columns:
if self.train_data[col].dtype == 'object':
self.train_data[col] = self.train_data[col].astype('category').cat.codes
# Django 沒有對 test_data 編碼,但為了預測時一致性,我們也編碼
if col in self.test_data.columns:
if self.test_data[col].dtype == 'object':
self.test_data[col] = self.test_data[col].astype('category').cat.codes
def _bin_continuous_features(self, con_features, n_bins):
"""
✅ 對連續變數分箱 - 完全對齊 Django 版本
先用訓練集計算邊界,再套用到測試集
"""
self.bins_dict = {}
for col in con_features:
if col in self.train_data.columns and self.train_data[col].notna().sum() > 0:
# 使用訓練集計算分箱邊界
bin_edges = pd.cut(
self.train_data[col],
bins=n_bins,
retbins=True,
duplicates='drop'
)[1]
self.bins_dict[col] = bin_edges
# 創建分箱標籤 (✅ 使用 – 而不是 -)
bin_labels = [
f"{round(bin_edges[i], 2)}{round(bin_edges[i+1], 2)}"
for i in range(len(bin_edges) - 1)
]
# 對訓練集分箱
self.train_data[col] = pd.cut(
self.train_data[col],
bins=bin_edges,
labels=bin_labels,
include_lowest=True
).astype(object).fillna("Missing")
# 對測試集使用相同邊界分箱
if col in self.test_data.columns:
self.test_data[col] = pd.cut(
self.test_data[col],
bins=bin_edges,
labels=bin_labels,
include_lowest=True
).astype(object).fillna("Missing")
else:
print(f"⚠️ Skipped binning column '{col}' – missing or all NaN")
def _learn_structure(self, algorithm, score_method, sig_level, target_variable):
"""學習網路結構 - 完全對齊 Django 版本"""
if algorithm == 'NB':
# Naive Bayes
edges = [
(target_variable, feature)
for feature in self.train_data.columns
if feature != target_variable
]
model = BayesianNetwork(edges)
elif algorithm == 'TAN':
# Tree-Augmented Naive Bayes
# ✅ 特殊情況處理: 如果同時存在'asia'和'either'列,特別指定'asia'作為根節點
if 'asia' in self.train_data.columns and 'either' in self.train_data.columns and target_variable == 'either':
tan_search = TreeSearch(self.train_data, root_node='asia')
else:
tan_search = TreeSearch(self.train_data)
structure = tan_search.estimate(
estimator_type='tan',
class_node=target_variable
)
model = BayesianNetwork(structure.edges())
elif algorithm == 'CL':
# Chow-Liu
tan_search = TreeSearch(self.train_data)
structure = tan_search.estimate(
estimator_type='chow-liu',
class_node=target_variable
)
model = BayesianNetwork(structure.edges())
elif algorithm == 'HC':
# Hill Climbing
hc = HillClimbSearch(self.train_data)
# 選擇評分方法
scoring_methods = {
'BIC': BicScore(self.train_data),
'AIC': AICScore(self.train_data),
'K2': K2Score(self.train_data),
'BDeu': BDeuScore(self.train_data),
'BDs': BDsScore(self.train_data)
}
structure = hc.estimate(
scoring_method=scoring_methods[score_method]
)
model = BayesianNetwork(structure.edges())
elif algorithm == 'PC':
# PC Algorithm - ✅ 與 Django 完全一致的降級策略
pc = PC(self.train_data)
# 嘗試不同的 max_cond_vars 直到成功
for max_cond in [5, 4, 3, 2, 1]:
try:
structure = pc.estimate(
significance_level=sig_level,
max_cond_vars=max_cond,
ci_test='chi_square',
variant='stable',
n_jobs=1 # ✅ Django 第一次用 1
)
# 檢查是否有效 (✅ 與 Django 一致)
edges = structure.edges()
if is_directed_acyclic_graph(DiGraph(edges)) and any(target_variable in edge for edge in edges):
model = BayesianNetwork(structure.edges())
break
except:
continue
else:
# 如果都失敗,使用 Naive Bayes (✅ 與 Django 一致)
edges = [
(target_variable, feature)
for feature in self.train_data.columns
if feature != target_variable
]
model = BayesianNetwork(edges)
else:
raise ValueError(f"Unknown algorithm: {algorithm}")
return model
def _fit_parameters(self, estimator, equivalent_sample_size):
"""參數估計"""
if estimator == 'bn':
self.model.fit(
self.train_data,
estimator=BayesianEstimator,
equivalent_sample_size=equivalent_sample_size
)
else:
self.model.fit(
self.train_data,
estimator=MaximumLikelihoodEstimator
)
def _predict_probabilities(self, data, target_variable):
"""
預測機率 - ✅ 與 Django 版本完全一致
"""
true_labels = []
predicted_probs = []
model_nodes = set(self.model.nodes())
for idx, row in data.iterrows():
# 準備 evidence (✅ 過濾只在模型中的變數)
raw_evidence = row.drop(target_variable).to_dict()
filtered_evidence = {k: v for k, v in raw_evidence.items() if k in model_nodes}
true_label = row[target_variable]
true_labels.append(true_label)
try:
result = self.inference.query(
variables=[target_variable],
evidence=filtered_evidence
)
probs = result.values
predicted_probs.append(probs)
except Exception as e:
print(f"⚠️ Inference failed at row {idx} | evidence keys: {list(filtered_evidence.keys())} | error: {e}")
predicted_probs.append(None)
# ✅ 過濾有效結果 (與 Django 一致)
valid_data = [
(label, prob)
for label, prob in zip(true_labels, predicted_probs)
if prob is not None and len(prob) > 1
]
if not valid_data:
return [], []
valid_labels, valid_probs = zip(*valid_data)
prob_array = np.round(np.array([prob[1] for prob in valid_probs]), 4)
return list(valid_labels), prob_array
def _evaluate_model(self, data, target_variable, dataset_name):
"""評估模型效能 - ✅ 與 Django 完全一致"""
# 預測
true_labels, pred_probs = self._predict_probabilities(
data, target_variable
)
if len(true_labels) == 0:
return {
'accuracy': 0,
'precision': 0,
'recall': 0,
'f1': 0,
'auc': 0,
'g_mean': 0,
'p_mean': 0,
'specificity': 0,
'confusion_matrix': [[0, 0], [0, 0]],
'fpr': [0],
'tpr': [0]
}
# 二元預測 (threshold = 0.1, ✅ 與 Django 一致)
threshold = 0.1
pred_labels = (pred_probs >= threshold).astype(int)
# 計算指標
accuracy = accuracy_score(true_labels, pred_labels) * 100
precision = precision_score(true_labels, pred_labels, zero_division=0) * 100
recall = recall_score(true_labels, pred_labels, zero_division=0) * 100
f1 = f1_score(true_labels, pred_labels, zero_division=0) * 100
# ROC 曲線
pred_probs_clean = np.nan_to_num(pred_probs, nan=0.0)
fpr, tpr, _ = roc_curve(true_labels, pred_probs_clean)
auc = roc_auc_score(true_labels, pred_probs_clean)
# 混淆矩陣
cm = confusion_matrix(true_labels, pred_labels).tolist()
# G-mean 和 P-mean (✅ 與 Django 計算方式一致)
tn, fp, fn, tp = confusion_matrix(true_labels, pred_labels).ravel()
sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0
specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
g_mean = np.sqrt(sensitivity * precision / 100) * 100
p_mean = np.sqrt(specificity * sensitivity) * 100
return {
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1': f1,
'auc': auc,
'g_mean': g_mean,
'p_mean': p_mean,
'specificity': specificity * 100,
'confusion_matrix': cm,
'fpr': fpr.tolist(),
'tpr': tpr.tolist(),
'predicted_probs': pred_probs.tolist()
}
def _get_all_cpds(self):
"""獲取所有條件機率表"""
cpds = {}
for node in self.model.nodes():
cpd = self.model.get_cpds(node)
cpds[node] = cpd
return cpds
def _calculate_scores(self):
"""計算模型評分"""
scores = {
'log_likelihood': log_likelihood_score(self.model, self.train_data),
'bic': structure_score(self.model, self.train_data, scoring_method='bic'),
'k2': structure_score(self.model, self.train_data, scoring_method='k2'),
'bdeu': structure_score(self.model, self.train_data, scoring_method='bdeu'),
'bds': structure_score(self.model, self.train_data, scoring_method='bds')
}
return scores
def save_model(self, filepath):
"""
儲存訓練好的模型
包含: model, bins_dict, train_data columns 等資訊
"""
import pickle
model_data = {
'model': self.model,
'bins_dict': self.bins_dict,
'train_columns': list(self.train_data.columns),
'timestamp': datetime.now().isoformat()
}
with open(filepath, 'wb') as f:
pickle.dump(model_data, f)
def load_model(self, filepath):
"""
載入已訓練的模型
"""
import pickle
with open(filepath, 'rb') as f:
model_data = pickle.load(f)
self.model = model_data['model']
self.bins_dict = model_data['bins_dict']
self.inference = VariableElimination(self.model)
return model_data
def predict_single_instance(self, evidence_dict, target_variable):
"""
對單一個案進行預測
"""
processed_evidence = {}
for key, value in evidence_dict.items():
if key in self.bins_dict:
# 連續變數需要分箱
bins = self.bins_dict[key]
# 🆕 處理超出範圍的值
if value < bins[0]:
# 小於最小值,使用第一個 bin
processed_evidence[key] = f"{round(bins[0], 2)}{round(bins[1], 2)}"
elif value > bins[-1]:
# 大於最大值,使用最後一個 bin
processed_evidence[key] = f"{round(bins[-2], 2)}{round(bins[-1], 2)}"
else:
# 正常範圍內,找到對應的 bin
for i in range(len(bins)-1):
if bins[i] <= value <= bins[i+1]:
processed_evidence[key] = f"{round(bins[i], 2)}{round(bins[i+1], 2)}"
break
else:
# 分類變數直接使用
processed_evidence[key] = value
# 2. 進行推論
result = self.inference.query(
variables=[target_variable],
evidence=processed_evidence
)
# 3. 整理結果
probs = result.values
death_prob = probs[1] if len(probs) > 1 else probs[0]
# 判斷風險等級
if death_prob >= 0.7:
risk_level = "High"
elif death_prob >= 0.3:
risk_level = "Moderate"
else:
risk_level = "Low"
return {
'probability': float(death_prob),
'risk_level': risk_level,
'all_probs': {i: float(p) for i, p in enumerate(probs)},
'processed_evidence': processed_evidence
}
@classmethod
def get_session_results(cls, session_id):
"""獲取特定 session 的結果"""
return cls._session_results.get(session_id)
@classmethod
def clear_session_results(cls, session_id):
"""清除特定 session 的結果"""
if session_id in cls._session_results:
del cls._session_results[session_id]