# bayesian-network / utils.py
# Wen1201's picture
# Upload 8 files
# 7d4f1a2 verified
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
import numpy as np
import networkx as nx
from plotly.subplots import make_subplots
def plot_roc_curve(fpr, tpr, auc, title="ROC Curve"):
    """
    Build a Plotly figure with a ROC curve and the chance diagonal.

    Args:
        fpr: False positive rates (x values of the curve).
        tpr: True positive rates (y values of the curve).
        auc: Area under the curve; shown in the legend with four decimals.
        title: Figure title.

    Returns:
        plotly figure
    """
    # The classifier's ROC curve.
    roc_line = go.Scatter(
        x=fpr,
        y=tpr,
        mode='lines',
        name=f'ROC Curve (AUC = {auc:.4f})',
        line={'color': '#2d6ca2', 'width': 2},
    )

    # Diagonal reference line (random classifier).
    chance_line = go.Scatter(
        x=[0, 1],
        y=[0, 1],
        mode='lines',
        name='Random Classifier',
        line={'color': 'gray', 'width': 1, 'dash': 'dash'},
    )

    layout_options = {
        'title': title,
        'xaxis_title': 'False Positive Rate',
        'yaxis_title': 'True Positive Rate',
        'width': 600,
        'height': 500,
        'template': 'plotly_white',
        # Place the legend in the lower-right area of the plot.
        'legend': {'x': 0.6, 'y': 0.1},
    }

    figure = go.Figure(data=[roc_line, chance_line])
    figure.update_layout(**layout_options)
    return figure
def plot_confusion_matrix(cm, title="Confusion Matrix"):
    """
    Draw a confusion matrix as an annotated heatmap.

    Each cell shows the raw count and its share of the grand total
    (all cells summed), e.g. ``12<br>(30.0%)``.

    Args:
        cm: Confusion matrix as a 2-D list or array. Rows are plotted on
            the "Actual" axis and columns on the "Predicted" axis.
            Typically 2x2, but any 2-D shape is accepted.
        title: Figure title.

    Returns:
        plotly figure
    """
    cm_array = np.array(cm)
    n_rows, n_cols = cm_array.shape

    # Percentage of the grand total per cell. An all-zero matrix would
    # otherwise divide by zero and render "nan%" labels, so report 0.0%.
    total = cm_array.sum()
    if total != 0:
        cm_percent = cm_array / total * 100
    else:
        cm_percent = np.zeros(cm_array.shape, dtype=float)

    # Cell text: count on the first line, percentage on the second.
    labels = [
        [
            f'{cm_array[i][j]}<br>({cm_percent[i][j]:.1f}%)'
            for j in range(n_cols)
        ]
        for i in range(n_rows)
    ]

    fig = go.Figure(data=go.Heatmap(
        z=cm_array,
        # Tick labels follow the matrix shape instead of assuming 2x2.
        x=[f'Predicted: {j}' for j in range(n_cols)],
        y=[f'Actual: {i}' for i in range(n_rows)],
        text=labels,
        texttemplate='%{text}',
        textfont={"size": 14},
        colorscale='Blues',
        showscale=True
    ))
    fig.update_layout(
        title=title,
        width=500,
        height=450,
        template='plotly_white'
    )
    return fig
def plot_probability_distribution(probs, title="Probability Distribution"):
    """
    Plot a histogram of predicted probabilities.

    Args:
        probs: Sequence of predicted probabilities (labelled on the x axis
            as the probability for class 1).
        title: Figure title.

    Returns:
        plotly figure
    """
    bar_style = {
        'color': '#2d6ca2',
        # White outline separates adjacent bars.
        'line': {'color': 'white', 'width': 1},
    }
    histogram = go.Histogram(
        x=probs,
        nbinsx=20,
        name='Predicted Probabilities',
        marker=bar_style,
    )

    figure = go.Figure(data=[histogram])
    figure.update_layout(
        title=title,
        xaxis_title='Predicted Probability for Class 1',
        yaxis_title='Frequency',
        width=700,
        height=400,
        template='plotly_white',
        showlegend=False,
    )
    # Pin the x axis to the [0, 1] interval regardless of the data.
    figure.update_xaxes(range=[0, 1])
    return figure
def generate_network_graph(model):
    """
    Render the structure of a Bayesian network as a Plotly figure.

    The graph is rebuilt from ``model.edges()`` only, so a node that takes
    part in no edge is not drawn.

    Args:
        model: Network model exposing ``edges()``, an iterable of
            ``(parent, child)`` pairs (presumably a pgmpy BayesianNetwork;
            verify against the caller).

    Returns:
        plotly figure
    """
    # Build a NetworkX directed graph from the model's edges.
    G = nx.DiGraph()
    G.add_edges_from(model.edges())

    # Force-directed (spring) layout with a fixed seed for reproducible
    # positions; fall back to a circular layout if it fails.
    try:
        pos = nx.spring_layout(G, k=2, iterations=50, seed=42)
    except Exception:
        pos = nx.circular_layout(G)

    # Edge line segments. A None entry after each pair breaks the line so
    # all edges can share a single Scatter trace.
    edge_x = []
    edge_y = []
    for source, target in G.edges():
        x0, y0 = pos[source]
        x1, y1 = pos[target]
        edge_x.extend([x0, x1, None])
        edge_y.extend([y0, y1, None])
    edge_trace = go.Scatter(
        x=edge_x, y=edge_y,
        line=dict(width=2, color='#888'),
        hoverinfo='none',
        mode='lines',
        showlegend=False
    )

    # Node markers, labelled with the node identifiers.
    node_x = []
    node_y = []
    node_text = []
    for node in G.nodes():
        x, y = pos[node]
        node_x.append(x)
        node_y.append(y)
        node_text.append(node)
    node_trace = go.Scatter(
        x=node_x, y=node_y,
        mode='markers+text',
        hoverinfo='text',
        text=node_text,
        textposition="top center",
        showlegend=False,
        marker=dict(
            size=30,
            color='#2d6ca2',
            line=dict(width=2, color='white')
        )
    )

    # Arrow annotations: the tail sits on the source node and the head on
    # the target node, both in data coordinates.
    annotations = []
    for source, target in G.edges():
        x0, y0 = pos[source]
        x1, y1 = pos[target]
        annotations.append(
            dict(
                ax=x0, ay=y0,
                axref='x', ayref='y',
                x=x1, y=y1,
                xref='x', yref='y',
                showarrow=True,
                arrowhead=2,
                arrowsize=1,
                arrowwidth=2,
                arrowcolor='#888'
            )
        )

    fig = go.Figure(data=[edge_trace, node_trace])
    fig.update_layout(
        # Title font is set through layout.title.font; the older
        # `titlefont` shorthand is deprecated.
        title=dict(text='Bayesian Network Structure', font=dict(size=16)),
        showlegend=False,
        hovermode='closest',
        margin=dict(b=20, l=5, r=5, t=40),
        annotations=annotations,
        xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        width=900,
        height=700,
        template='plotly_white'
    )
    return fig
def create_cpd_table(cpd):
    """
    Turn a conditional probability distribution into a DataFrame.

    The first entry of ``cpd.variables`` / ``cpd.cardinality`` is treated
    as the variable itself and the remaining entries as its evidence
    (parent) variables. Probabilities are rounded to four decimals.

    Args:
        cpd: CPD object exposing ``variable``, ``variables``, ``values``
            (a NumPy array) and ``cardinality``, or None.

    Returns:
        pandas DataFrame: empty when ``cpd`` is None; a single column for
        a variable without parents; otherwise one row per state of the
        variable and one MultiIndex column per combination of parent
        states.
    """
    from itertools import product

    if cpd is None:
        return pd.DataFrame()

    target = cpd.variable
    parents = cpd.variables[1:] if len(cpd.variables) > 1 else []

    # Root node: no parents, so the table is one column of probabilities.
    if not parents:
        probabilities = np.round(cpd.values.flatten(), 4)
        row_labels = [
            f"{target}({state})" for state in range(len(probabilities))
        ]
        return pd.DataFrame({target: probabilities}, index=row_labels)

    # One column per combination of parent states, in product order.
    parent_cards = cpd.cardinality[1:]
    header_tuples = []
    for states in product(*(range(card) for card in parent_cards)):
        header_tuples.append(
            tuple(f"{name}({state})" for name, state in zip(parents, states))
        )
    header = pd.MultiIndex.from_tuples(header_tuples, names=parents)

    # Collapse all parent axes into one so the values form a 2-D table.
    n_states = len(cpd.values)
    table = np.round(cpd.values.reshape(n_states, -1), 4)

    return pd.DataFrame(
        table,
        index=[f"{target}({state})" for state in range(n_states)],
        columns=header,
    )
def create_metrics_comparison_table(train_metrics, test_metrics):
    """
    Build a side-by-side table of training-set and test-set metrics.

    Every metric is rendered as a string: AUC with four decimals, all
    other metrics with two decimals followed by a percent sign.

    Args:
        train_metrics: Dict of training-set metrics with the keys
            accuracy, precision, recall, f1, auc, g_mean, p_mean and
            specificity.
        test_metrics: Dict of test-set metrics with the same keys.

    Returns:
        pandas DataFrame with the columns 'Metric', 'Training Set' and
        'Test Set'.
    """
    # (display label, dict key, rendered as a percentage?)
    rows = (
        ('Accuracy', 'accuracy', True),
        ('Precision', 'precision', True),
        ('Recall', 'recall', True),
        ('F1-Score', 'f1', True),
        ('AUC', 'auc', False),
        ('G-mean', 'g_mean', True),
        ('P-mean', 'p_mean', True),
        ('Specificity', 'specificity', True),
    )

    def render(metrics):
        # Format one column of the table from a metrics dict.
        return [
            f"{metrics[key]:.2f}%" if as_percent else f"{metrics[key]:.4f}"
            for _, key, as_percent in rows
        ]

    return pd.DataFrame({
        'Metric': [label for label, _, _ in rows],
        'Training Set': render(train_metrics),
        'Test Set': render(test_metrics),
    })
def export_results_to_json(results, filename="analysis_results.json"):
    """
    Serialize the analysis results to a JSON string.

    The 'fpr', 'tpr' and 'predicted_probs' entries are dropped from both
    metric dicts, and the model is reduced to its list of edges. NumPy
    scalars and arrays that remain anywhere in the payload are converted
    to plain Python values so they do not make serialization fail.

    Args:
        results: Dict with the keys 'parameters', 'train_metrics',
            'test_metrics', 'scores', 'model' (an object exposing
            ``edges()``) and 'timestamp'.
        filename: Unused by this function; nothing is written to disk.
            Kept so existing callers that pass it keep working.

    Returns:
        JSON string (indented by two spaces).

    Raises:
        KeyError: if a required key is missing from ``results``.
        TypeError: if a value is neither JSON-serializable nor a NumPy
            scalar/array.
    """
    import json

    excluded_keys = ('fpr', 'tpr', 'predicted_probs')

    def to_builtin(obj):
        # json.dumps calls this only for objects it cannot encode itself.
        if isinstance(obj, np.generic):
            return obj.item()
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        raise TypeError(
            f"Object of type {type(obj).__name__} is not JSON serializable"
        )

    def without_excluded(metrics):
        # Drop the bulky curve/probability entries from a metrics dict.
        return {k: v for k, v in metrics.items() if k not in excluded_keys}

    exportable_results = {
        'parameters': results['parameters'],
        'train_metrics': without_excluded(results['train_metrics']),
        'test_metrics': without_excluded(results['test_metrics']),
        'scores': results['scores'],
        # The model object itself is not serializable; keep its edges.
        'network_edges': list(results['model'].edges()),
        'timestamp': results['timestamp']
    }
    return json.dumps(exportable_results, indent=2, default=to_builtin)
def calculate_performance_gap(train_metrics, test_metrics):
    """
    Measure how far training-set metrics exceed test-set metrics.

    For accuracy, precision, recall, f1 and auc the gap is
    ``train - test``. The mean of the absolute gaps is then mapped to an
    overfitting risk: "High" above 10, "Moderate" above 5, otherwise
    "Low". The mean is taken over the raw differences as given; no
    rescaling between metrics is applied.

    Args:
        train_metrics: Training-set metrics dict.
        test_metrics: Test-set metrics dict.

    Returns:
        dict: the five ``<metric>_gap`` entries, plus 'average_gap' and
        'overfitting_risk'.
    """
    compared = ('accuracy', 'precision', 'recall', 'f1', 'auc')
    gaps = {
        f'{metric}_gap': train_metrics[metric] - test_metrics[metric]
        for metric in compared
    }

    # Classify overfitting risk from the mean absolute gap.
    mean_abs_gap = np.mean([abs(delta) for delta in gaps.values()])
    if mean_abs_gap > 10:
        risk = "High"
    elif mean_abs_gap > 5:
        risk = "Moderate"
    else:
        risk = "Low"

    gaps['average_gap'] = mean_abs_gap
    gaps['overfitting_risk'] = risk
    return gaps