# AIO2025M03_Demo_AdaBoost / src / adaboost_core.py
# wjnwjn59's picture
# fix depth error
# 863c992
import pandas as pd
import numpy as np
from sklearn.ensemble import AdaBoostClassifier, AdaBoostRegressor
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.preprocessing import LabelEncoder
from sklearn.datasets import (
load_iris, load_wine, load_diabetes, load_breast_cancer
)
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, mean_squared_error
import plotly.graph_objects as go
import plotly.express as px
# Module-level slot holding the model stored by the latest training run;
# create_manual_tree_plot reads it back through _get_current_model().
_current_model = None


def _get_current_model():
    """Return whatever `_set_current_model` stored last (None initially)."""
    return _current_model


def _set_current_model(model):
    """Remember `model` in the module-level `_current_model` slot."""
    global _current_model
    _current_model = model
def load_data(file_obj=None, dataset_choice="Iris"):
    """Load a dataset from an uploaded file or from a named built-in source.

    Args:
        file_obj: Optional object exposing a ``name`` attribute holding a file
            path. When given, it takes precedence over ``dataset_choice``.
        dataset_choice: Key of a built-in dataset ("Iris", "Wine",
            "Breast Cancer", "Diabetes" or "Titanic").

    Returns:
        A pandas DataFrame with the loaded data.

    Raises:
        ValueError: If the file extension is not CSV/Excel, or if
            ``dataset_choice`` is not one of the known keys.
    """
    if file_obj is not None:
        path = file_obj.name
        # Compare extensions case-insensitively so "DATA.CSV" is accepted too.
        lowered = path.lower()
        if lowered.endswith(".csv"):
            # Try progressively more permissive encodings.
            for encoding in ("utf-8", "latin-1", "iso-8859-1", "cp1252"):
                try:
                    return pd.read_csv(path, encoding=encoding)
                except UnicodeDecodeError:
                    continue
            # Last resort: substitute undecodable bytes. The pandas keyword is
            # `encoding_errors`; `errors` is not accepted by read_csv.
            return pd.read_csv(path, encoding="utf-8", encoding_errors="replace")
        if lowered.endswith((".xlsx", ".xls")):
            return pd.read_excel(path)
        raise ValueError("Unsupported format. Upload CSV or Excel files.")

    # Lambdas keep loading lazy: only the requested dataset is materialised.
    datasets = {
        "Iris": lambda: _sklearn_to_df(load_iris()),
        "Wine": lambda: _sklearn_to_df(load_wine()),
        "Breast Cancer": lambda: _sklearn_to_df(load_breast_cancer()),
        "Diabetes": lambda: _sklearn_to_df(load_diabetes()),
        "Titanic": lambda: _load_titanic_data(),
    }
    if dataset_choice not in datasets:
        raise ValueError(f"Unknown dataset: {dataset_choice}")
    return datasets[dataset_choice]()
def _sklearn_to_df(data):
    """Convert a dataset object with ``data``/``target`` attributes to a DataFrame.

    Feature columns are named from ``data.feature_names`` when present;
    otherwise (or when any name is null) they are named ``f0``, ``f1``, ...
    The labels are appended as a ``target`` column.
    """
    names = getattr(data, "feature_names", None)
    df = pd.DataFrame(data.data, columns=names)
    # Without explicit names pandas assigns 0..n-1, which is never null, so the
    # missing-names case has to be detected on `names` itself.
    if names is None or df.columns.isnull().any():
        df.columns = [f"f{i}" for i in range(df.shape[1])]
    df["target"] = data.target
    return df
def _load_titanic_data():
    """Read ``data/titanic_dataset.csv``, drop incomplete rows and encode text columns.

    ``sex`` is mapped to 0/1 and ``embarked`` to 0/1/2.

    Raises:
        ValueError: If the CSV file does not exist.
    """
    try:
        passengers = pd.read_csv("data/titanic_dataset.csv")
    except FileNotFoundError:
        raise ValueError("Titanic dataset not found. Please ensure 'data/titanic_dataset.csv' exists.")

    passengers = passengers.dropna()
    sex_codes = {'male': 0, 'female': 1}
    port_codes = {'S': 0, 'C': 1, 'Q': 2}
    passengers['sex'] = passengers['sex'].map(sex_codes)
    passengers['embarked'] = passengers['embarked'].map(port_codes)
    return passengers
def determine_problem_type(df, target_col):
    """Guess whether ``target_col`` describes a classification or regression task.

    Returns "classification" when the column is missing, has object dtype, or
    has at most ``min(20, 10% of rows)`` distinct values; otherwise
    "regression".
    """
    if target_col not in df.columns:
        return "classification"

    column = df[target_col]
    distinct = column.nunique()
    level_cap = min(20, len(column) * 0.1)
    looks_categorical = column.dtype == "object" or distinct <= level_cap
    return "classification" if looks_categorical else "regression"
def create_input_components(df, target_col):
    """Describe one input widget per feature column (every column but the target).

    Object-dtype columns become a "dropdown" spec whose choices are the sorted
    string forms of the non-null unique values (``["N/A"]`` if there are none).
    All other columns become a "number" spec defaulting to the column mean
    rounded to three decimals (0.0 when no numeric value is available).

    Returns:
        A list of dicts, in column order.
    """
    specs = []
    for name in df.columns:
        if name == target_col:
            continue
        series = df[name]
        if series.dtype == "object":
            options = sorted(str(v) for v in series.dropna().unique())
            if not options:
                options = ["N/A"]
            spec = {"name": name, "type": "dropdown", "choices": options, "value": options[0]}
        else:
            mean_value = pd.to_numeric(series, errors="coerce").dropna().mean()
            default = float(mean_value) if not pd.isna(mean_value) else 0.0
            spec = {
                "name": name,
                "type": "number",
                "value": round(default, 3),
                "minimum": None,
                "maximum": None,
            }
        specs.append(spec)
    return specs
def preprocess_data(df, target_col, new_point_dict):
    """Turn a raw frame plus one user-supplied point into numeric model inputs.

    Feature handling, per column: object dtype is label-encoded (encoder kept),
    bool becomes int, everything else is coerced to numeric with failures
    filled by 0.0. An object target becomes categorical codes; a bool target
    becomes int.

    The new point is assembled in feature order. Missing, unparsable or
    unseen-category values fall back to 0 / 0.0.

    Returns:
        ``(X, y, new_point, feature_cols, encoders)`` where ``X`` is a
        DataFrame, ``y`` a numpy array, ``new_point`` a ``(1, n_features)``
        float array, and ``encoders`` maps column name to its LabelEncoder.
    """
    feature_cols = [c for c in df.columns if c != target_col]
    X = df[feature_cols].copy()
    y = df[target_col].copy()

    encoders = {}
    for col in feature_cols:
        col_dtype = X[col].dtype
        if col_dtype == "object":
            encoder = LabelEncoder()
            X[col] = encoder.fit_transform(X[col].astype(str))
            encoders[col] = encoder
        elif col_dtype == "bool":
            X[col] = X[col].astype(int)
        else:
            X[col] = pd.to_numeric(X[col], errors="coerce").fillna(0.0)

    if y.dtype == "object":
        y = pd.Categorical(y).codes
    elif y.dtype == "bool":
        y = y.astype(int)

    def _point_value(col):
        # Resolve one coordinate of the new point, with 0 / 0.0 as fallback.
        categorical = col in encoders
        if col not in new_point_dict:
            return 0 if categorical else 0.0
        raw = new_point_dict[col]
        if categorical:
            text = str(raw)
            try:
                return encoders[col].transform([text])[0]
            except ValueError:
                # Category not seen while fitting the encoder.
                return 0
        try:
            return float(raw)
        except Exception:
            return 0.0

    coordinates = [_point_value(col) for col in feature_cols]
    new_point = np.array(coordinates, dtype=float).reshape(1, -1)
    return X, np.array(y), new_point, feature_cols, encoders
def _new_adaboost(ensemble_cls, base_estimator, n_estimators, learning_rate, extra_kwargs=None):
    """Instantiate ``ensemble_cls`` with whichever keyword names this scikit-learn accepts.

    Tries ``estimator`` (newer releases) before ``base_estimator`` (older
    ones), each first with and then without ``extra_kwargs``, so an optional
    keyword rejected by the installed release does not prevent construction.
    """
    common = {
        "n_estimators": n_estimators,
        "learning_rate": float(learning_rate),
        "random_state": 42,
    }
    extra = dict(extra_kwargs or {})
    attempts = []
    for estimator_key in ("estimator", "base_estimator"):
        if extra:
            attempts.append({estimator_key: base_estimator, **extra})
        attempts.append({estimator_key: base_estimator})

    last_error = None
    for attempt in attempts:
        try:
            return ensemble_cls(**common, **attempt)
        except TypeError as err:  # unknown keyword for this release
            last_error = err
    raise last_error


def run_adaboost_and_visualize(df, target_col, new_point_dict,
                               n_estimators, max_depth, learning_rate,
                               train_test_split_ratio=0.8, problem_type=None):
    """Train an AdaBoost model on ``df`` and build every display artefact.

    Args:
        df: Source DataFrame containing features and ``target_col``.
        target_col: Name of the target column.
        new_point_dict: Mapping of feature name to value for the point to predict.
        n_estimators: Requested number of boosting rounds (capped at 1000).
        max_depth: Depth of each base tree; ``None`` is treated as depth 1.
        learning_rate: Boosting learning rate, must lie in (0, 2].
        train_test_split_ratio: Fraction of rows used for training, in (0, 1).
        problem_type: "classification" or "regression"; inferred from the
            target column when ``None``.

    Returns:
        A 7-tuple ``(None, loss_chart_fig, importance_fig, prediction,
        prediction_details, summary, aggregation_display)``. On invalid
        hyper-parameters the tuple has the same length, with the error message
        in the ``prediction_details`` slot and ``None`` everywhere else.
    """
    def _failure(message):
        # Same arity as the success tuple so callers can unpack uniformly.
        return None, None, None, None, message, None, None

    # Validate before doing any work on the data.
    if n_estimators < 1:
        return _failure("Number of estimators must be ≥ 1.")
    if max_depth is not None and max_depth < 1:
        return _failure("Max depth must be ≥ 1.")
    if learning_rate <= 0 or learning_rate > 2:
        return _failure("Learning rate must be between 0 and 2.")
    if not 0 < train_test_split_ratio < 1:
        return _failure("Train/test split ratio must be between 0 and 1 (exclusive).")

    X, y, new_point, feature_cols, _ = preprocess_data(df, target_col, new_point_dict)
    if problem_type is None:
        problem_type = determine_problem_type(df, target_col)

    n_estimators = min(int(n_estimators), 1000)  # Limit to 1000 estimators

    # Hold out part of the data so training and validation curves can be compared.
    test_size = 1.0 - train_test_split_ratio
    X_train, X_val, y_train, y_val = train_test_split(
        X.values, y, test_size=test_size, random_state=42
    )

    # UI depth maps directly to tree depth; a missing depth means a stump.
    actual_depth = 1 if max_depth is None else max(1, int(max_depth))

    if problem_type == "classification":
        model = _new_adaboost(
            AdaBoostClassifier,
            DecisionTreeClassifier(max_depth=actual_depth),
            n_estimators,
            learning_rate,
            # SAMME is requested explicitly to avoid the deprecation warning on
            # releases that still accept the `algorithm` keyword.
            extra_kwargs={"algorithm": "SAMME"},
        )
    else:
        model = _new_adaboost(
            AdaBoostRegressor,
            DecisionTreeRegressor(max_depth=actual_depth),
            n_estimators,
            learning_rate,
        )

    model.fit(X_train, y_train)
    prediction = model.predict(new_point)[0]
    _set_current_model(model)

    train_pred = model.predict(X_train)
    val_pred = model.predict(X_val)
    if problem_type == "classification":
        train_performance = accuracy_score(y_train, train_pred)
        val_performance = accuracy_score(y_val, val_pred)
        performance_metric = "Accuracy"
    else:
        train_performance = mean_squared_error(y_train, train_pred)
        val_performance = mean_squared_error(y_val, val_pred)
        performance_metric = "MSE"

    # Split and score details shown in the aggregation panel.
    split_info = {
        "train_size": len(X_train),
        "val_size": len(X_val),
        "train_ratio": train_test_split_ratio,
        "val_ratio": 1.0 - train_test_split_ratio,
        "train_performance": train_performance,
        "val_performance": val_performance,
        "performance_metric": performance_metric,
    }

    loss_chart_fig = create_loss_chart(model, X_train, y_train, X_val, y_val, problem_type)
    importance_fig = create_feature_importance_plot(model, feature_cols)
    prediction_details = create_prediction_details(
        model, new_point[0], feature_cols, target_col, prediction, problem_type
    )
    summary = create_algorithm_summary(
        model, problem_type, n_estimators, max_depth, learning_rate, feature_cols
    )
    aggregation_display = create_adaboost_aggregation_display(
        model, new_point[0], problem_type, target_col, df, split_info
    )
    return (None, loss_chart_fig, importance_fig, prediction,
            prediction_details, summary, aggregation_display)
def create_loss_chart(model, X_train, y_train, X_val, y_val, problem_type):
    """Plot training and validation error after each boosting round.

    The error is ``1 - accuracy`` for classification and mean squared error
    otherwise, computed from ``model.staged_predict``. If anything fails, a
    placeholder figure carrying the error text is returned instead.
    """
    chart_title = "AdaBoost Training Progress - Loss Evolution"
    try:
        classification = problem_type == "classification"

        def _round_loss(y_true, y_pred):
            if classification:
                return 1.0 - accuracy_score(y_true, y_pred)
            return mean_squared_error(y_true, y_pred)

        # Pair up the per-round predictions of both sets.
        stages = list(zip(list(model.staged_predict(X_train)),
                          list(model.staged_predict(X_val))))
        train_losses = [_round_loss(y_train, train_pred) for train_pred, _ in stages]
        val_losses = [_round_loss(y_val, val_pred) for _, val_pred in stages]
        rounds = list(range(1, len(train_losses) + 1))

        fig = go.Figure()
        curves = (
            ('Training Error', train_losses, '#FF6B6B'),
            ('Validation Error', val_losses, '#4ECDC4'),
        )
        for curve_name, values, colour in curves:
            fig.add_trace(go.Scatter(
                x=rounds,
                y=values,
                mode='lines+markers',
                name=curve_name,
                line=dict(color=colour, width=2),
                marker=dict(size=6),
            ))

        if classification:
            loss_type = "Error Rate"
        else:
            loss_type = "MSE"
        fig.update_layout(
            title=chart_title,
            xaxis_title="Boosting Round (Estimator)",
            yaxis_title=loss_type,
            plot_bgcolor="white",
            height=400,
            legend=dict(yanchor="top", y=0.99, xanchor="right", x=0.99),
            margin=dict(l=40, r=40, t=60, b=40),
        )
        fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='lightgray')
        fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='lightgray')
        return fig
    except Exception as e:
        # Best-effort fallback: show the failure inside an empty chart.
        placeholder = go.Figure()
        placeholder.add_annotation(
            text=f"Loss tracking not available<br>Error: {str(e)}<br>Run training to see loss evolution",
            xref="paper", yref="paper",
            x=0.5, y=0.5, xanchor='center', yanchor='middle',
            showarrow=False,
            font=dict(size=14),
        )
        placeholder.update_layout(title=chart_title, height=400, plot_bgcolor="white")
        return placeholder
def create_individual_tree_visualization(model, tree_index, feature_cols, problem_type):
    """Return a figure for the base estimator at ``tree_index``.

    Looks up the estimator and its boosting weight (1.0 when the model has no
    ``estimator_weights_``) and delegates to ``create_adaboost_tree_plot``.
    Any failure, including an out-of-range index, yields a placeholder figure
    that shows the error text.
    """
    try:
        if not tree_index < len(model.estimators_):
            raise IndexError(f"Tree index {tree_index} out of range")
        chosen = model.estimators_[tree_index]
        if hasattr(model, 'estimator_weights_'):
            chosen_weight = model.estimator_weights_[tree_index]
        else:
            chosen_weight = 1.0
        return create_adaboost_tree_plot(chosen, tree_index, feature_cols, problem_type, chosen_weight)
    except Exception as e:
        # Fallback visualization
        placeholder = go.Figure()
        placeholder.add_annotation(
            text=f"AdaBoost Estimator {tree_index + 1} Visualization<br>Unable to extract tree structure<br>Error: {str(e)}",
            xref="paper", yref="paper",
            x=0.5, y=0.5, xanchor='center', yanchor='middle',
            showarrow=False,
            font=dict(size=14),
        )
        placeholder.update_layout(
            title=f"AdaBoost Estimator {tree_index + 1} Structure",
            height=500,
            plot_bgcolor="white",
        )
        return placeholder
def create_adaboost_tree_plot(base_estimator, tree_index, feature_cols, problem_type, weight):
    """Draw one base estimator via ``create_manual_tree_plot``.

    The estimator's ``tree_`` attribute is only touched, never read further;
    if that access (or the plot call) raises, the plot is produced again with
    a weight of 1.0.
    """
    model_label = "AdaBoost"
    try:
        # Attribute access only: raises if the estimator exposes no fitted tree.
        _ = base_estimator.tree_
        return create_manual_tree_plot(tree_index, feature_cols, problem_type, model_label, weight)
    except Exception:
        # Fallback to manual tree creation
        return create_manual_tree_plot(tree_index, feature_cols, problem_type, model_label, 1.0)
def create_manual_tree_plot(tree_index, feature_cols, problem_type, model_type, weight=1.0):
    """Draw a schematic tree for estimator ``tree_index``.

    The figure is illustrative: split features, thresholds, leaf values and
    sample counts are pseudo-random (seeded by ``tree_index`` so the same
    index always yields the same picture), not read from the fitted tree.
    Only the depth comes from the stored model: a depth of 1 draws a stump,
    anything else draws a fixed two-level tree.

    Args:
        tree_index: Zero-based estimator index; also the random seed.
        feature_cols: Feature names to pick split labels from (may be empty).
        problem_type: Used in the title only.
        model_type: Label prefix for the title, e.g. "AdaBoost".
        weight: Estimator weight shown in the root node.

    Returns:
        A plotly Figure.
    """
    import random

    # A private generator produces the same sequence as seeding the module-level
    # one, without disturbing global random state used elsewhere.
    rng = random.Random(tree_index)

    def _feature(fallback):
        return rng.choice(feature_cols) if feature_cols else fallback

    def _leaf_value():
        return round(rng.uniform(-1, 1), 3)

    # Depth of the matching estimator in the stored model; stump if unavailable.
    actual_depth = 1
    current_model = _get_current_model()
    # `is not None` rather than truthiness: ensembles define __len__, which
    # may raise or be zero before fitting.
    if (current_model is not None
            and hasattr(current_model, 'estimators_')
            and len(current_model.estimators_) > tree_index):
        try:
            actual_depth = current_model.estimators_[tree_index].max_depth
        except Exception:
            actual_depth = 1  # fallback to stump

    # NOTE: the order of rng calls below determines the picture; keep it stable.
    root_feature = _feature("feature_0")
    root_threshold = round(rng.uniform(0.1, 5.0), 2)

    split_color = '#81C784'  # Green for split nodes
    leaf_color = '#FFB74D'   # Orange for leaves

    if actual_depth == 1:
        # Decision stump: root plus two leaves.
        positions = {
            'root': (0, 1),
            'left': (-1, 0),
            'right': (1, 0),
        }
        labels = {
            'root': f"{root_feature}<br>≤ {root_threshold}<br>Weight: {weight:.3f}<br>Decision Stump",
            'left': f"Leaf (≤)<br>Value: {_leaf_value()}<br>Samples: {rng.randint(20, 80)}",
            'right': f"Leaf (>)<br>Value: {_leaf_value()}<br>Samples: {rng.randint(20, 80)}",
        }
        colors = {'root': split_color, 'left': leaf_color, 'right': leaf_color}
        edges = [('root', 'left'), ('root', 'right')]
        title_suffix = "Decision Stump"
        x_range, y_range, height = [-1.5, 1.5], [-0.5, 1.5], 400
    else:
        # Deeper tree: always drawn as two levels regardless of actual depth.
        positions = {
            'root': (0, 2),
            'left': (-1.5, 1),
            'right': (1.5, 1),
            'left_left': (-2.5, 0),
            'left_right': (-0.5, 0),
            'right_left': (0.5, 0),
            'right_right': (2.5, 0),
        }
        labels = {
            'root': f"{root_feature}<br>≤ {root_threshold}<br>Weight: {weight:.3f}<br>Depth: {actual_depth}",
            'left': f"{_feature('feature_1')}<br>≤ {round(rng.uniform(0.1, 3.0), 2)}<br>Samples: 75",
            'right': f"{_feature('feature_2')}<br>≤ {round(rng.uniform(0.1, 3.0), 2)}<br>Samples: 75",
            'left_left': f"Leaf<br>Value: {_leaf_value()}<br>Samples: 25",
            'left_right': f"Leaf<br>Value: {_leaf_value()}<br>Samples: 50",
            'right_left': f"Leaf<br>Value: {_leaf_value()}<br>Samples: 30",
            'right_right': f"Leaf<br>Value: {_leaf_value()}<br>Samples: 45",
        }
        colors = {
            'root': split_color, 'left': split_color, 'right': split_color,
            'left_left': leaf_color, 'left_right': leaf_color,
            'right_left': leaf_color, 'right_right': leaf_color,
        }
        edges = [
            ('root', 'left'), ('root', 'right'),
            ('left', 'left_left'), ('left', 'left_right'),
            ('right', 'right_left'), ('right', 'right_right'),
        ]
        title_suffix = f"Depth {actual_depth} Tree"
        x_range, y_range, height = [-3, 3], [-0.5, 2.5], 600

    fig = go.Figure()

    # All edges go into one trace; None separates the individual segments.
    edge_x, edge_y = [], []
    for parent, child in edges:
        (px_, py_), (cx_, cy_) = positions[parent], positions[child]
        edge_x.extend([px_, cx_, None])
        edge_y.extend([py_, cy_, None])
    fig.add_trace(go.Scatter(
        x=edge_x, y=edge_y,
        mode='lines',
        line=dict(color='gray', width=2),
        showlegend=False,
        hoverinfo='none',
    ))

    # Draw nodes
    for node_id, (x, y) in positions.items():
        fig.add_trace(go.Scatter(
            x=[x], y=[y],
            mode='markers+text',
            marker=dict(
                size=35,
                color=colors[node_id],
                line=dict(width=2, color='darkblue'),
                symbol='circle',
            ),
            text=labels[node_id],
            textposition='middle center',
            textfont=dict(size=9, color='black'),
            showlegend=False,
            hoverinfo='text',
            hovertext=labels[node_id],
        ))

    fig.update_layout(
        title=f"{model_type} Estimator {tree_index + 1} Structure - {title_suffix} ({problem_type.title()})",
        xaxis=dict(showgrid=False, zeroline=False, showticklabels=False, range=x_range),
        yaxis=dict(showgrid=False, zeroline=False, showticklabels=False, range=y_range),
        plot_bgcolor="white",
        height=height,
        margin=dict(l=40, r=40, t=60, b=40),
        showlegend=False,
    )
    return fig
def get_individual_tree_visualization(model, tree_index, feature_cols, problem_type):
    """Alias that forwards all arguments to ``create_individual_tree_visualization``."""
    figure = create_individual_tree_visualization(
        model,
        tree_index,
        feature_cols,
        problem_type,
    )
    return figure
def create_feature_importance_plot(model, feature_cols):
    """Bar chart of ``model.feature_importances_``, most important first.

    Args:
        model: Fitted estimator exposing ``feature_importances_``.
        feature_cols: Feature names, indexed like the importances.

    Returns:
        A plotly Figure; a placeholder figure with a notice if the
        importances cannot be read or plotted.
    """
    try:
        importances = model.feature_importances_
        order = np.argsort(importances)[::-1]  # descending importance
        fig = go.Figure()
        fig.add_trace(
            go.Bar(
                x=[feature_cols[i] for i in order],
                y=importances[order],
                text=[f"{importances[i]:.3f}" for i in order],
                textposition="auto",
                marker_color="lightcoral",
                hovertemplate="<b>%{x}</b><br>Importance: %{y:.3f}<extra></extra>",
            )
        )
        fig.update_layout(
            title="AdaBoost Feature Importance",
            xaxis_title="Features",
            yaxis_title="Importance",
            plot_bgcolor="white",
            height=400,
            margin=dict(l=40, r=40, t=60, b=40),
        )
        fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor="lightgray")
        fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor="lightgray")
        return fig
    except Exception:
        # `Exception` rather than a bare except, so KeyboardInterrupt and
        # SystemExit still propagate; the chart itself stays best-effort.
        fig = go.Figure()
        fig.add_annotation(
            text="Feature importance not available",
            xref="paper", yref="paper",
            x=0.5, y=0.5, xanchor='center', yanchor='middle',
            showarrow=False,
            font=dict(size=14),
        )
        fig.update_layout(
            title="AdaBoost Feature Importance",
            height=400,
            plot_bgcolor="white",
        )
        return fig
def create_prediction_details(model, new_point, feature_cols, target_col, prediction, problem_type):
    """Format the prediction for ``new_point`` as a one-line text summary.

    For classification the predicted class is shown, followed by per-class
    probabilities (rounded to three decimals, keyed by ``model.classes_``)
    when the model can provide them. For any other problem type the predicted
    value is shown with three decimals.

    ``feature_cols`` and ``target_col`` are accepted for interface
    compatibility and are not used.
    """
    if problem_type != "classification":
        return f"Predicted Value: {prediction:.3f}"

    headline = f"Predicted Class: {int(prediction)}"
    try:
        probabilities = model.predict_proba(new_point.reshape(1, -1))[0]
        # Convert numpy scalars to plain Python values so the rendered dict does
        # not show reprs such as `np.int64(0)` / `np.float64(0.25)`.
        by_class = {
            (label.item() if hasattr(label, "item") else label): round(float(prob), 3)
            for label, prob in zip(model.classes_, probabilities)
        }
    except Exception:
        # Probabilities are optional; fall back to the class alone.
        return headline
    return f"{headline} | Probabilities: {by_class}"
def create_algorithm_summary(model, problem_type, n_estimators, max_depth, learning_rate, feature_cols):
    """Build the Markdown summary of the hyper-parameters used for training.

    ``model`` is accepted for interface compatibility and is not used. The
    returned text starts and ends with a newline.
    """
    summary_lines = [
        "",
        f"**AdaBoost {problem_type.title()} Model Summary:**",
        f"- Estimators: {n_estimators}",
        f"- Base Estimator Max Depth: {max_depth}",
        f"- Learning Rate: {learning_rate}",
        f"- Features: {len(feature_cols)}",
        "- Algorithm: Adaptive Boosting",
        "",
    ]
    return "\n".join(summary_lines)
def create_adaboost_aggregation_display(model, new_point, problem_type, target_col=None, df=None, split_info=None):
    """Create HTML display showing AdaBoost ensemble aggregation process.

    Args:
        model: Fitted model exposing ``predict``, ``n_estimators`` and
            ``learning_rate`` (and optionally ``predict_proba``/``classes_``).
        new_point: 1-D array of feature values to predict for.
        problem_type: "classification" selects the class/probability layout;
            anything else selects the regression layout.
        target_col: Unused; kept for interface compatibility.
        df: Unused; kept for interface compatibility.
        split_info: Optional dict with keys ``train_size``, ``val_size``,
            ``train_ratio``, ``val_ratio``, ``train_performance``,
            ``val_performance`` and ``performance_metric``; when truthy, the
            split and performance sections are included.

    Returns:
        An HTML string; an error panel with the exception text if anything
        fails while building it.
    """
    try:
        classification = problem_type == "classification"
        prediction = model.predict(new_point.reshape(1, -1))[0]

        # Section shared by both problem types.
        parts = [f"""
<div style='background:#F0F8FF;border-left:6px solid #4ECDC4;padding:14px 16px;border-radius:10px;'>
<strong>🚀 AdaBoost Ensemble Process</strong><br><br>
<div style='margin:8px 0;'>
<strong>📊 Model Configuration:</strong><br>
• {model.n_estimators} weak learners in ensemble<br>
• Base Estimator: Decision Tree<br>
• Learning rate: {model.learning_rate}<br>
</div>"""]

        if split_info:
            badge = "<span style='background:#E8F5E8;padding:2px 6px;border-radius:4px;'>"
            metric = split_info['performance_metric']
            parts.append(f"""
<div style='margin:8px 0;'>
<strong>📊 Data Split Information:</strong><br>
• Training Set: {split_info['train_size']} samples ({split_info['train_ratio']:.1%})<br>
• Validation Set: {split_info['val_size']} samples ({split_info['val_ratio']:.1%})<br>
</div>
<div style='margin:8px 0;'>
<strong>📈 Model Performance:</strong><br>
• Training {metric}: {badge}<strong>{split_info['train_performance']:.4f}</strong></span><br>
• Validation {metric}: {badge}<strong>{split_info['val_performance']:.4f}</strong></span><br>
</div>""")

        if classification:
            # Probabilities are optional; the line is omitted when unavailable.
            prob_line = ""
            try:
                probabilities = model.predict_proba(new_point.reshape(1, -1))[0]
                # Key by the model's real class labels, not by position, so
                # labels other than 0..n-1 are reported correctly.
                class_labels = getattr(model, "classes_", range(len(probabilities)))
                shown = {
                    (label.item() if hasattr(label, "item") else label): f"{prob:.3f}"
                    for label, prob in zip(class_labels, probabilities)
                }
                prob_line = f"• Class Probabilities: {shown}<br>\n"
            except Exception:
                prob_line = ""
            prediction_line = (
                "• Predicted Class: <span style='background:#FFE5B4;padding:2px 6px;border-radius:4px;'>"
                f"<strong>{int(prediction)}</strong></span><br>\n{prob_line}"
            )
            focus_step = "2. Focus on misclassified examples by adjusting weights<br>"
            combine_step = "3. Combine predictions using weighted voting<br>"
        else:
            prediction_line = (
                "• Predicted Value: <span style='background:#FFE5B4;padding:2px 6px;border-radius:4px;'>"
                f"<strong>{prediction:.3f}</strong></span><br>\n"
            )
            focus_step = "2. Focus on poorly predicted examples by adjusting weights<br>"
            combine_step = "3. Combine predictions using weighted averaging<br>"

        parts.append(f"""
<div style='margin:8px 0;'>
<strong>🎯 Final Prediction:</strong><br>
{prediction_line}</div>
<div style='margin:8px 0;'>
<strong>⚡ AdaBoost Process:</strong><br>
1. Train weak learners sequentially<br>
{focus_step}
{combine_step}
4. Final prediction aggregates all {model.n_estimators} learners<br>
</div>
</div>
""")
        return "".join(parts)
    except Exception as e:
        return f"""
<div style='background:#FFF4F4;border-left:6px solid #C4314B;padding:14px 16px;border-radius:10px;'>
<strong>🚀 AdaBoost Process</strong><br><br>
Error generating aggregation display: {str(e)}
</div>
"""