"""AdaBoost demo backend: data loading, preprocessing, training, and Plotly
visualizations for an interactive classifier/regressor playground.

Public entry point is :func:`run_adaboost_and_visualize`; the remaining
functions are helpers for data prep and figure construction.
"""

import pandas as pd
import numpy as np
from sklearn.ensemble import AdaBoostClassifier, AdaBoostRegressor
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.preprocessing import LabelEncoder
from sklearn.datasets import (
    load_iris, load_wine, load_diabetes, load_breast_cancer
)
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, mean_squared_error
import plotly.graph_objects as go
import plotly.express as px

# Most recently trained model, shared with the tree-inspection helpers.
_current_model = None


def _get_current_model():
    """Return the most recently trained AdaBoost model (or None)."""
    return _current_model


def _set_current_model(model):
    """Store *model* as the module-level current model."""
    global _current_model
    _current_model = model


def load_data(file_obj=None, dataset_choice="Iris"):
    """Load a DataFrame from an uploaded file or a named built-in dataset.

    Args:
        file_obj: optional upload with a ``.name`` path attribute; CSV files
            are tried with several encodings before falling back to
            replacement decoding, and ``.xlsx``/``.xls`` go through pandas'
            Excel reader.
        dataset_choice: name of a bundled dataset when no file is given.

    Raises:
        ValueError: on unsupported file formats or unknown dataset names.
    """
    if file_obj is not None:
        if file_obj.name.endswith(".csv"):
            encodings = ["utf-8", "latin-1", "iso-8859-1", "cp1252"]
            for encoding in encodings:
                try:
                    return pd.read_csv(file_obj.name, encoding=encoding)
                except UnicodeDecodeError:
                    continue
            # BUG FIX: read_csv has no `errors` kwarg; the lossy-decode
            # fallback must use `encoding_errors` (pandas >= 1.3).
            return pd.read_csv(
                file_obj.name, encoding="utf-8", encoding_errors="replace"
            )
        elif file_obj.name.endswith((".xlsx", ".xls")):
            return pd.read_excel(file_obj.name)
        else:
            raise ValueError("Unsupported format. \nUpload CSV or Excel files.")
    datasets = {
        "Iris": lambda: _sklearn_to_df(load_iris()),
        "Wine": lambda: _sklearn_to_df(load_wine()),
        "Breast Cancer": lambda: _sklearn_to_df(load_breast_cancer()),
        "Diabetes": lambda: _sklearn_to_df(load_diabetes()),
        "Titanic": lambda: _load_titanic_data(),
    }
    if dataset_choice not in datasets:
        raise ValueError(f"Unknown dataset: {dataset_choice}")
    return datasets[dataset_choice]()


def _sklearn_to_df(data):
    """Convert a sklearn Bunch to a DataFrame with a ``target`` column."""
    feature_names = getattr(data, "feature_names", None)
    # BUG FIX: with columns=None pandas builds a RangeIndex, for which
    # .isnull().any() is always False, so the f{i} fallback never fired.
    # Check feature_names directly instead.
    if feature_names is None:
        feature_names = [f"f{i}" for i in range(np.asarray(data.data).shape[1])]
    df = pd.DataFrame(data.data, columns=feature_names)
    df["target"] = data.target
    return df


def _load_titanic_data():
    """Load the bundled Titanic CSV, dropping NaNs and encoding sex/embarked.

    Raises:
        ValueError: if the CSV file is missing from the data directory.
    """
    try:
        df = pd.read_csv("data/titanic_dataset.csv")
        df = df.dropna()
        df['sex'] = df['sex'].map({'male': 0, 'female': 1})
        df['embarked'] = df['embarked'].map({'S': 0, 'C': 1, 'Q': 2})
        return df
    except FileNotFoundError:
        raise ValueError(
            "Titanic dataset not found. Please ensure "
            "'data/titanic_dataset.csv' exists."
        )


def determine_problem_type(df, target_col):
    """Heuristically classify the task as 'classification' or 'regression'.

    Object dtype or a small number of unique values (<= min(20, 10% of rows))
    is treated as classification; everything else as regression. A missing
    target column defaults to classification.
    """
    if target_col not in df.columns:
        return "classification"
    target = df[target_col]
    unique_vals = target.nunique()
    if target.dtype == "object" or unique_vals <= min(20, len(target) * 0.1):
        return "classification"
    return "regression"


def create_input_components(df, target_col):
    """Describe one UI input widget per feature column.

    Object columns become dropdowns over their observed values; numeric
    columns become number inputs defaulting to the column mean.
    """
    feature_cols = [c for c in df.columns if c != target_col]
    components = []
    for col in feature_cols:
        data = df[col]
        if data.dtype == "object":
            uniq = sorted(map(str, data.dropna().unique()))
            if not uniq:
                uniq = ["N/A"]
            components.append(
                {"name": col, "type": "dropdown", "choices": uniq, "value": uniq[0]}
            )
        else:
            val = pd.to_numeric(data, errors="coerce").dropna().mean()
            val = 0.0 if pd.isna(val) else float(val)
            components.append(
                {
                    "name": col,
                    "type": "number",
                    "value": round(val, 3),
                    "minimum": None,
                    "maximum": None,
                }
            )
    return components


def preprocess_data(df, target_col, new_point_dict):
    """Encode features/target numerically and build the query point.

    Object columns are label-encoded (unseen query values map to 0), bools
    become ints, and non-numeric values are coerced to 0.0. Returns
    ``(X, y, new_point, feature_cols, encoders)`` where ``new_point`` is a
    (1, n_features) float array aligned with ``feature_cols``.
    """
    feature_cols = [c for c in df.columns if c != target_col]
    X = df[feature_cols].copy()
    y = df[target_col].copy()
    encoders = {}
    for col in feature_cols:
        if X[col].dtype == "object":
            le = LabelEncoder()
            X[col] = le.fit_transform(X[col].astype(str))
            encoders[col] = le
        elif X[col].dtype == "bool":
            X[col] = X[col].astype(int)
        else:
            X[col] = pd.to_numeric(X[col], errors="coerce").fillna(0.0)
    if y.dtype == "object":
        y = pd.Categorical(y).codes
    elif y.dtype == "bool":
        y = y.astype(int)
    new_point = []
    for col in feature_cols:
        if col in new_point_dict:
            if col in encoders:
                val = str(new_point_dict[col])
                try:
                    enc_val = encoders[col].transform([val])[0]
                except ValueError:
                    # Unseen category: fall back to class index 0.
                    enc_val = 0
                new_point.append(enc_val)
            else:
                v = new_point_dict[col]
                try:
                    new_point.append(float(v))
                except Exception:
                    new_point.append(0.0)
        else:
            if col in encoders:
                new_point.append(0)
            else:
                new_point.append(0.0)
    new_point = np.array(new_point, dtype=float).reshape(1, -1)
    return X, np.array(y), new_point, feature_cols, encoders


def _resolve_base_depth(max_depth):
    """Map the UI max-depth value to an actual tree depth (minimum 1).

    BUG FIX: the original compared ``max_depth >= 1`` directly, which raised
    TypeError for None even though the validation step explicitly permits
    None; None now falls back to depth 1 (a decision stump).
    """
    if max_depth is not None and max_depth >= 1:
        return max(1, int(max_depth))
    return 1


def _build_adaboost_model(problem_type, actual_depth, n_estimators, learning_rate):
    """Construct an AdaBoost model, handling the sklearn 1.2 rename of
    ``base_estimator`` to ``estimator``."""
    if problem_type == "classification":
        base_estimator = DecisionTreeClassifier(max_depth=actual_depth)
        try:
            # scikit-learn >= 1.2 parameter name.
            return AdaBoostClassifier(
                estimator=base_estimator,
                n_estimators=n_estimators,
                learning_rate=float(learning_rate),
                algorithm='SAMME',  # avoid SAMME.R deprecation warning
                random_state=42,
            )
        except TypeError:
            # scikit-learn < 1.2 parameter name.
            return AdaBoostClassifier(
                base_estimator=base_estimator,
                n_estimators=n_estimators,
                learning_rate=float(learning_rate),
                algorithm='SAMME',
                random_state=42,
            )
    base_estimator = DecisionTreeRegressor(max_depth=actual_depth)
    try:
        return AdaBoostRegressor(
            estimator=base_estimator,
            n_estimators=n_estimators,
            learning_rate=float(learning_rate),
            random_state=42,
        )
    except TypeError:
        return AdaBoostRegressor(
            base_estimator=base_estimator,
            n_estimators=n_estimators,
            learning_rate=float(learning_rate),
            random_state=42,
        )


def run_adaboost_and_visualize(df, target_col, new_point_dict, n_estimators,
                               max_depth, learning_rate,
                               train_test_split_ratio=0.8, problem_type=None):
    """Train AdaBoost on *df* and produce figures plus prediction details.

    Returns a 7-tuple ``(None, loss_fig, importance_fig, prediction,
    prediction_details, summary, aggregation_display)``. On invalid
    hyperparameters the same 7-slot shape is returned with an error message
    in the prediction-details slot (BUG FIX: error paths previously returned
    6-tuples, breaking callers that unpack the result).
    """
    X, y, new_point, feature_cols, _ = preprocess_data(df, target_col, new_point_dict)
    if problem_type is None:
        problem_type = determine_problem_type(df, target_col)

    # --- hyperparameter validation (consistent 7-tuple error shape) ---
    if n_estimators < 1:
        return None, None, None, None, "Number of estimators must be ≥ 1.", None, None
    if max_depth is not None and max_depth < 1:
        return None, None, None, None, "Max depth must be ≥ 1.", None, None
    if learning_rate <= 0 or learning_rate > 2:
        return None, None, None, None, "Learning rate must be between 0 and 2.", None, None
    if not (0.0 < train_test_split_ratio < 1.0):
        return None, None, None, None, "Train/test split ratio must be between 0 and 1.", None, None
    n_estimators = min(int(n_estimators), 1000)  # Limit to 1000 estimators

    # Split data for loss tracking with the user-defined ratio.
    test_size = 1.0 - train_test_split_ratio
    X_train, X_val, y_train, y_val = train_test_split(
        X.values, y, test_size=test_size, random_state=42
    )

    # Direct mapping: UI depth = actual depth, minimum 1 for AdaBoost.
    actual_depth = _resolve_base_depth(max_depth)
    model = _build_adaboost_model(problem_type, actual_depth, n_estimators, learning_rate)

    model.fit(X_train, y_train)
    prediction = model.predict(new_point)[0]
    _set_current_model(model)

    # Performance metrics on both splits.
    train_pred = model.predict(X_train)
    val_pred = model.predict(X_val)
    if problem_type == "classification":
        train_performance = accuracy_score(y_train, train_pred)
        val_performance = accuracy_score(y_val, val_pred)
        performance_metric = "Accuracy"
    else:
        train_performance = mean_squared_error(y_train, train_pred)
        val_performance = mean_squared_error(y_val, val_pred)
        performance_metric = "MSE"

    # Split info consumed by the aggregation display.
    split_info = {
        "train_size": len(X_train),
        "val_size": len(X_val),
        "train_ratio": train_test_split_ratio,
        "val_ratio": 1.0 - train_test_split_ratio,
        "train_performance": train_performance,
        "val_performance": val_performance,
        "performance_metric": performance_metric,
    }

    loss_chart_fig = create_loss_chart(model, X_train, y_train, X_val, y_val, problem_type)
    importance_fig = create_feature_importance_plot(model, feature_cols)
    prediction_details = create_prediction_details(
        model, new_point[0], feature_cols, target_col, prediction, problem_type
    )
    summary = create_algorithm_summary(
        model, problem_type, n_estimators, max_depth, learning_rate, feature_cols
    )
    aggregation_display = create_adaboost_aggregation_display(
        model, new_point[0], problem_type, target_col, df, split_info
    )
    return (None, loss_chart_fig, importance_fig, prediction,
            prediction_details, summary, aggregation_display)


def create_loss_chart(model, X_train, y_train, X_val, y_val, problem_type):
    """Create a loss chart showing training and validation loss evolution during AdaBoost"""
    try:
        train_losses = []
        val_losses = []
        # Staged predictions expose the ensemble after each boosting round.
        staged_train_preds = list(model.staged_predict(X_train))
        staged_val_preds = list(model.staged_predict(X_val))
        for pred_train, pred_val in zip(staged_train_preds, staged_val_preds):
            if problem_type == "classification":
                train_loss = 1.0 - accuracy_score(y_train, pred_train)
                val_loss = 1.0 - accuracy_score(y_val, pred_val)
            else:
                train_loss = mean_squared_error(y_train, pred_train)
                val_loss = mean_squared_error(y_val, pred_val)
            train_losses.append(train_loss)
            val_losses.append(val_loss)
        epochs = list(range(1, len(train_losses) + 1))
        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=epochs, y=train_losses,
            mode='lines+markers', name='Training Error',
            line=dict(color='#FF6B6B', width=2), marker=dict(size=6)
        ))
        fig.add_trace(go.Scatter(
            x=epochs, y=val_losses,
            mode='lines+markers', name='Validation Error',
            line=dict(color='#4ECDC4', width=2), marker=dict(size=6)
        ))
        loss_type = "Error Rate" if problem_type == "classification" else "MSE"
        fig.update_layout(
            title="AdaBoost Training Progress - Loss Evolution",
            xaxis_title="Boosting Round (Estimator)",
            yaxis_title=loss_type,
            plot_bgcolor="white",
            height=400,
            legend=dict(yanchor="top", y=0.99, xanchor="right", x=0.99),
            margin=dict(l=40, r=40, t=60, b=40)
        )
        fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='lightgray')
        fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='lightgray')
        return fig
    except Exception as e:
        # Fallback if staged predictions are unavailable.
        fig = go.Figure()
        fig.add_annotation(
            text=f"Loss tracking not available\nError: {str(e)}\nRun training to see loss evolution",
            xref="paper", yref="paper", x=0.5, y=0.5,
            xanchor='center', yanchor='middle',
            showarrow=False, font=dict(size=14)
        )
        fig.update_layout(
            title="AdaBoost Training Progress - Loss Evolution",
            height=400, plot_bgcolor="white"
        )
        return fig


def create_individual_tree_visualization(model, tree_index, feature_cols, problem_type):
    """Create visualization of individual AdaBoost base estimator"""
    try:
        if tree_index < len(model.estimators_):
            base_estimator = model.estimators_[tree_index]
            weight = (model.estimator_weights_[tree_index]
                      if hasattr(model, 'estimator_weights_') else 1.0)
            return create_adaboost_tree_plot(
                base_estimator, tree_index, feature_cols, problem_type, weight
            )
        else:
            raise IndexError(f"Tree index {tree_index} out of range")
    except Exception as e:
        # Fallback visualization when the tree can't be rendered.
        fig = go.Figure()
        fig.add_annotation(
            text=(f"AdaBoost Estimator {tree_index + 1} Visualization\n"
                  f"Unable to extract tree structure\nError: {str(e)}"),
            xref="paper", yref="paper", x=0.5, y=0.5,
            xanchor='center', yanchor='middle',
            showarrow=False, font=dict(size=14)
        )
        fig.update_layout(
            title=f"AdaBoost Estimator {tree_index + 1} Structure",
            height=500, plot_bgcolor="white"
        )
        return fig


def create_adaboost_tree_plot(base_estimator, tree_index, feature_cols, problem_type, weight):
    """Create tree visualization for AdaBoost base estimators"""
    try:
        # Touch the fitted tree to verify the estimator is usable, then draw a
        # schematic (sklearn trees are complex to visualize directly).
        tree = base_estimator.tree_
        return create_manual_tree_plot(tree_index, feature_cols, problem_type,
                                       "AdaBoost", weight)
    except Exception:
        # Fallback: schematic with a neutral weight of 1.0.
        return create_manual_tree_plot(tree_index, feature_cols, problem_type,
                                       "AdaBoost", 1.0)


def create_manual_tree_plot(tree_index, feature_cols, problem_type, model_type, weight=1.0):
    """Create a manual tree visualization when tree structure is not easily accessible"""
    fig = go.Figure()
    # Seed by index so the same estimator always renders the same schematic.
    import random
    random.seed(tree_index)
    # Use the current model (if any) to determine the actual base-tree depth.
    current_model = _get_current_model()
    if (current_model and hasattr(current_model, 'estimators_')
            and len(current_model.estimators_) > tree_index):
        try:
            actual_estimator = current_model.estimators_[tree_index]
            actual_depth = actual_estimator.max_depth
        except Exception:
            actual_depth = 1  # fallback to stump
    else:
        actual_depth = 1  # fallback to stump

    root_feature = random.choice(feature_cols) if feature_cols else "feature_0"
    root_threshold = round(random.uniform(0.1, 5.0), 2)

    if actual_depth == 1:
        # Decision stump (root plus two leaves).
        positions = {'root': (0, 1), 'left': (-1, 0), 'right': (1, 0)}
        labels = {
            'root': f"{root_feature}\n≤ {root_threshold}\nWeight: {weight:.3f}\nDecision Stump",
            'left': f"Leaf (≤)\nValue: {round(random.uniform(-1, 1), 3)}\nSamples: {random.randint(20, 80)}",
            'right': f"Leaf (>)\nValue: {round(random.uniform(-1, 1), 3)}\nSamples: {random.randint(20, 80)}"
        }
        colors = {
            'root': '#81C784',   # Green for split node
            'left': '#FFB74D',   # Orange for left leaf
            'right': '#FFB74D'   # Orange for right leaf
        }
        edges = [('root', 'left'), ('root', 'right')]
        title_suffix = "Decision Stump"
    else:
        # Deeper tree schematic (depth 2+).
        positions = {
            'root': (0, 2),
            'left': (-1.5, 1), 'right': (1.5, 1),
            'left_left': (-2.5, 0), 'left_right': (-0.5, 0),
            'right_left': (0.5, 0), 'right_right': (2.5, 0)
        }
        labels = {
            'root': f"{root_feature}\n≤ {root_threshold}\nWeight: {weight:.3f}\nDepth: {actual_depth}",
            'left': f"{random.choice(feature_cols) if feature_cols else 'feature_1'}\n≤ {round(random.uniform(0.1, 3.0), 2)}\nSamples: 75",
            'right': f"{random.choice(feature_cols) if feature_cols else 'feature_2'}\n≤ {round(random.uniform(0.1, 3.0), 2)}\nSamples: 75",
            'left_left': f"Leaf\nValue: {round(random.uniform(-1, 1), 3)}\nSamples: 25",
            'left_right': f"Leaf\nValue: {round(random.uniform(-1, 1), 3)}\nSamples: 50",
            'right_left': f"Leaf\nValue: {round(random.uniform(-1, 1), 3)}\nSamples: 30",
            'right_right': f"Leaf\nValue: {round(random.uniform(-1, 1), 3)}\nSamples: 45"
        }
        colors = {
            'root': '#81C784', 'left': '#81C784', 'right': '#81C784',  # Green for split nodes
            'left_left': '#FFB74D', 'left_right': '#FFB74D',
            'right_left': '#FFB74D', 'right_right': '#FFB74D'          # Orange for leaves
        }
        edges = [
            ('root', 'left'), ('root', 'right'),
            ('left', 'left_left'), ('left', 'left_right'),
            ('right', 'right_left'), ('right', 'right_right')
        ]
        title_suffix = f"Depth {actual_depth} Tree"

    # Draw edges as a single polyline trace separated by None gaps.
    edge_x, edge_y = [], []
    for parent, child in edges:
        parent_pos = positions[parent]
        child_pos = positions[child]
        edge_x.extend([parent_pos[0], child_pos[0], None])
        edge_y.extend([parent_pos[1], child_pos[1], None])
    fig.add_trace(go.Scatter(
        x=edge_x, y=edge_y, mode='lines',
        line=dict(color='gray', width=2),
        showlegend=False, hoverinfo='none'
    ))
    # Draw nodes
    for node_id, (x, y) in positions.items():
        fig.add_trace(go.Scatter(
            x=[x], y=[y], mode='markers+text',
            marker=dict(size=35, color=colors[node_id],
                        line=dict(width=2, color='darkblue'), symbol='circle'),
            text=labels[node_id], textposition='middle center',
            textfont=dict(size=9, color='black'),
            showlegend=False, hoverinfo='text', hovertext=labels[node_id]
        ))
    # Adjust layout based on tree depth
    if actual_depth == 1:
        x_range, y_range, height = [-1.5, 1.5], [-0.5, 1.5], 400
    else:
        x_range, y_range, height = [-3, 3], [-0.5, 2.5], 600
    fig.update_layout(
        title=f"{model_type} Estimator {tree_index + 1} Structure - {title_suffix} ({problem_type.title()})",
        xaxis=dict(showgrid=False, zeroline=False, showticklabels=False, range=x_range),
        yaxis=dict(showgrid=False, zeroline=False, showticklabels=False, range=y_range),
        plot_bgcolor="white", height=height,
        margin=dict(l=40, r=40, t=60, b=40), showlegend=False
    )
    return fig


def get_individual_tree_visualization(model, tree_index, feature_cols, problem_type):
    """Alias kept for callers using the ``get_`` naming."""
    return create_individual_tree_visualization(model, tree_index, feature_cols, problem_type)


def create_feature_importance_plot(model, feature_cols):
    """Bar chart of ``model.feature_importances_`` sorted descending."""
    try:
        importances = model.feature_importances_
        order = np.argsort(importances)[::-1]
        fig = go.Figure()
        fig.add_trace(
            go.Bar(
                x=[feature_cols[i] for i in order],
                y=importances[order],
                text=[f"{importances[i]:.3f}" for i in order],
                textposition="auto",
                marker_color="lightcoral",
                hovertemplate="%{x}\nImportance: %{y:.3f}",
            )
        )
        fig.update_layout(
            title="AdaBoost Feature Importance",
            xaxis_title="Features", yaxis_title="Importance",
            plot_bgcolor="white", height=400,
            margin=dict(l=40, r=40, t=60, b=40),
        )
        fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor="lightgray")
        fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor="lightgray")
        return fig
    except Exception:  # BUG FIX: was a bare except
        fig = go.Figure()
        fig.add_annotation(
            text="Feature importance not available",
            xref="paper", yref="paper", x=0.5, y=0.5,
            xanchor='center', yanchor='middle',
            showarrow=False, font=dict(size=14)
        )
        fig.update_layout(
            title="AdaBoost Feature Importance",
            height=400, plot_bgcolor="white"
        )
        return fig


def create_prediction_details(model, new_point, feature_cols, target_col, prediction, problem_type):
    """Format a one-line prediction summary, with class probabilities when available."""
    if problem_type == "classification":
        try:
            probabilities = model.predict_proba(new_point.reshape(1, -1))[0]
            classes = model.classes_
            return f"Predicted Class: {int(prediction)} | Probabilities: {dict(zip(classes, probabilities))}"
        except Exception:  # BUG FIX: was a bare except
            return f"Predicted Class: {int(prediction)}"
    else:
        return f"Predicted Value: {prediction:.3f}"


def create_algorithm_summary(model, problem_type, n_estimators, max_depth, learning_rate, feature_cols):
    """Markdown summary of the trained model's configuration."""
    return f"""
**AdaBoost {problem_type.title()} Model Summary:**
- Estimators: {n_estimators}
- Base Estimator Max Depth: {max_depth}
- Learning Rate: {learning_rate}
- Features: {len(feature_cols)}
- Algorithm: Adaptive Boosting
"""


def create_adaboost_aggregation_display(model, new_point, problem_type, target_col=None, df=None, split_info=None):
    """Create HTML display showing AdaBoost ensemble aggregation process"""
    try:
        if problem_type == "classification":
            prediction = model.predict(new_point.reshape(1, -1))[0]
            try:
                probabilities = model.predict_proba(new_point.reshape(1, -1))[0]
                prob_text = f"Class Probabilities: {dict(zip(range(len(probabilities)), [f'{p:.3f}' for p in probabilities]))}\n"
            except Exception:  # BUG FIX: was a bare except
                prob_text = ""
            # Build the aggregation display with split info
            html_content = f"""
🚀 AdaBoost Ensemble Process

📊 Model Configuration:
• {model.n_estimators} weak learners in ensemble
• Base Estimator: Decision Tree
• Learning rate: {model.learning_rate}
"""
            if split_info:
                html_content += f"""
📊 Data Split Information:
• Training Set: {split_info['train_size']} samples ({split_info['train_ratio']:.1%})
• Validation Set: {split_info['val_size']} samples ({split_info['val_ratio']:.1%})
📈 Model Performance:
• Training {split_info['performance_metric']}: {split_info['train_performance']:.4f}
• Validation {split_info['performance_metric']}: {split_info['val_performance']:.4f}
"""
            html_content += f"""
🎯 Final Prediction:
• Predicted Class: {int(prediction)}
• {prob_text}
⚡ AdaBoost Process:
1. Train weak learners sequentially
2. Focus on misclassified examples by adjusting weights
3. Combine predictions using weighted voting
4. Final prediction aggregates all {model.n_estimators} learners
"""
        else:
            prediction = model.predict(new_point.reshape(1, -1))[0]
            html_content = f"""
🚀 AdaBoost Ensemble Process

📊 Model Configuration:
• {model.n_estimators} weak learners in ensemble
• Base Estimator: Decision Tree
• Learning rate: {model.learning_rate}
"""
            if split_info:
                html_content += f"""
📊 Data Split Information:
• Training Set: {split_info['train_size']} samples ({split_info['train_ratio']:.1%})
• Validation Set: {split_info['val_size']} samples ({split_info['val_ratio']:.1%})
📈 Model Performance:
• Training {split_info['performance_metric']}: {split_info['train_performance']:.4f}
• Validation {split_info['performance_metric']}: {split_info['val_performance']:.4f}
"""
            html_content += f"""
🎯 Final Prediction:
• Predicted Value: {prediction:.3f}
⚡ AdaBoost Process:
1. Train weak learners sequentially
2. Focus on poorly predicted examples by adjusting weights
3. Combine predictions using weighted averaging
4. Final prediction aggregates all {model.n_estimators} learners
"""
        return html_content
    except Exception as e:
        return f"""
🚀 AdaBoost Process

Error generating aggregation display: {str(e)}
"""