# github_sync / A5b / classification_adaboost.py
# Bachstelze
# test baseline with cv only
# f5e4068
import os
import pickle
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from scipy import stats
from typing import List, Tuple, Dict, Any
from sklearn.model_selection import (
train_test_split, StratifiedKFold, cross_validate
)
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
classification_report, confusion_matrix
)
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import (
RandomForestClassifier,
VotingClassifier,
BaggingClassifier,
StackingClassifier,
)
import xgboost as xgb
import lightgbm as lgb
# Suppress every warning category so library chatter does not clutter the log.
warnings.filterwarnings('ignore')

# Evaluation settings shared by the whole script.
RANDOM_STATE = 42
N_SPLITS = 5
CHAMPION_F1 = 0.6110  # Score from A4

# Seed NumPy's global generator with the same value the estimators use.
np.random.seed(RANDOM_STATE)

# Input CSVs are read from <parent of the current working directory>/Datasets_all,
# so the script has to be launched from a directory directly below the repo root.
REPO_ROOT = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
DATA_DIR = os.path.join(REPO_ROOT, 'Datasets_all')

# Artifacts (pickled model, plots) go to ./models, created here if missing.
OUT_DIR = Path('models')
OUT_DIR.mkdir(exist_ok=True)
class WeightedDecisionTree(DecisionTreeClassifier):
    """Decision tree used as the weak learner of the boosting ensemble.

    This is a thin subclass of ``DecisionTreeClassifier`` that narrows the
    constructor to the four hyper-parameters tuned in this file and supplies
    different defaults (``max_depth=5``, ``random_state=42``). Sample
    weighting itself is implemented by the parent class; ``fit`` only
    forwards its arguments.

    Parameters
    ----------
    max_depth : int
        Maximum depth of the tree.
    min_samples_split : int
        Minimum number of samples required to split an internal node.
    min_samples_leaf : int
        Minimum number of samples required at a leaf.
    random_state : int
        Seed forwarded to the parent estimator.
    """

    def __init__(self, max_depth: int = 5, min_samples_split: int = 2,
                 min_samples_leaf: int = 1, random_state: int = 42):
        super().__init__(
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            random_state=random_state,
        )

    def fit(self, X, y, sample_weight=None, check_input=True):
        """Fit the tree, optionally weighting each training sample.

        ``check_input`` is accepted and forwarded so that the override keeps
        the parent's ``fit`` signature; callers that pass it (as scikit-learn
        ensembles do) keep working with this subclass.

        Returns
        -------
        WeightedDecisionTree
            The fitted estimator (``self``).
        """
        return super().fit(
            X, y, sample_weight=sample_weight, check_input=check_input
        )
class AdaBoostEnsemble(ClassifierMixin, BaseEstimator):
    """Multi-class AdaBoost ensemble of weighted decision trees.

    Each tree is grown on the current sample weights; the weights are then
    shifted towards the samples that tree got wrong.

    The algorithm:
    1. Initialize equal weights for all training samples.
    2. For each tree in the ensemble:
       - train a decision tree on the weighted data,
       - compute its weighted error rate,
       - compute the tree weight ``alpha`` (with the SAMME ``log(K - 1)``
         term so that a tree only has to beat random guessing among ``K``
         classes, not 50% accuracy, to receive a positive vote),
       - multiply the weights of misclassified samples by ``exp(alpha)`` and
         of correct samples by ``exp(-alpha)``, then renormalize.
    3. Predict by alpha-weighted voting.

    Parameters
    ----------
    n_estimators : int
        Maximum number of boosting rounds.
    max_depth, min_samples_split, min_samples_leaf : int
        Forwarded to every ``WeightedDecisionTree``.
    random_state : int or None
        Base seed; round ``i`` uses ``random_state + i``. ``None`` leaves
        every tree unseeded.

    Attributes
    ----------
    trees : list of WeightedDecisionTree
        Fitted weak learners, in training order.
    tree_weights : list of float
        ``alpha`` of each tree, aligned with ``trees``.
    classes_ : ndarray or None
        Sorted unique labels seen in ``fit`` (``None`` before fitting).
    n_classes : int
        ``len(classes_)`` (0 before fitting).
    """

    def __init__(
        self,
        n_estimators: int = 50,
        max_depth: int = 5,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        random_state: int = 42
    ):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.random_state = random_state
        # Fitted state. Kept here for backward compatibility with callers
        # that read these before fitting; fit() rebuilds all of it.
        self.trees: List[WeightedDecisionTree] = []
        self.tree_weights: List[float] = []
        self.n_classes: int = 0
        self.classes_ = None

    def _initialize_weights(self, n_samples: int) -> np.ndarray:
        """Return uniform weights summing to 1 for ``n_samples`` samples."""
        return np.ones(n_samples) / n_samples

    def _update_weights(
        self,
        weights: np.ndarray,
        y_true: np.ndarray,
        y_pred: np.ndarray,
        alpha: float
    ) -> np.ndarray:
        """Re-weight samples after one boosting round.

        Misclassified samples are multiplied by ``exp(alpha)`` and correctly
        classified ones by ``exp(-alpha)``; the result is renormalized to
        sum to 1.
        """
        direction = np.where(y_true != y_pred, 1.0, -1.0)
        updated_weights = weights * np.exp(alpha * direction)
        return updated_weights / updated_weights.sum()

    def _compute_weighted_error(
        self,
        weights: np.ndarray,
        y_true: np.ndarray,
        y_pred: np.ndarray
    ) -> float:
        """Return the share of total weight carried by misclassified samples."""
        misclassified = (y_true != y_pred).astype(float)
        return np.sum(weights * misclassified) / np.sum(weights)

    def _compute_alpha(self, error: float) -> float:
        """Return the voting weight of a tree with the given weighted error.

        ``alpha = 0.5 * (log((1 - error) / error) + log(K - 1))`` where ``K``
        is ``self.n_classes``. The ``log(K - 1)`` term vanishes for ``K <= 2``,
        which gives the classic binary AdaBoost weight. The extremes are
        clamped to +/-10 to avoid ``log(0)`` and division by zero.
        """
        if error <= 0:
            return 10.0  # Very high weight for perfect classifier
        if error >= 1:
            return -10.0  # Very negative weight for completely wrong classifier
        multiclass_term = np.log(self.n_classes - 1) if self.n_classes > 2 else 0.0
        return float(0.5 * (np.log((1 - error) / error) + multiclass_term))

    def fit(self, X: np.ndarray, y: np.ndarray) -> 'AdaBoostEnsemble':
        """Fit the ensemble from scratch on ``X`` / ``y``.

        Any trees from a previous ``fit`` call are discarded first, so
        refitting the same object never accumulates learners.

        Raises
        ------
        ValueError
            If ``X`` contains no samples.
        """
        y = np.asarray(y)
        n_samples = X.shape[0]
        if n_samples == 0:
            raise ValueError("Cannot fit AdaBoostEnsemble on an empty dataset.")

        self.classes_ = np.unique(y)
        self.n_classes = len(self.classes_)
        # Reset fitted state: without this a second fit() would append
        # another n_estimators trees to the previous ones.
        self.trees = []
        self.tree_weights = []

        weights = self._initialize_weights(n_samples)
        for i in range(self.n_estimators):
            seed = None if self.random_state is None else self.random_state + i
            tree = WeightedDecisionTree(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                min_samples_leaf=self.min_samples_leaf,
                random_state=seed
            )
            tree.fit(X, y, sample_weight=weights)
            y_pred = tree.predict(X)

            error = self._compute_weighted_error(weights, y, y_pred)
            alpha = self._compute_alpha(error)

            self.trees.append(tree)
            self.tree_weights.append(alpha)
            print(f"Tree {i+1}/{self.n_estimators}: Error={error:.4f}, Alpha={alpha:.4f}")

            if error <= 0:
                # Nothing is misclassified, so the renormalized weights would
                # be unchanged and later rounds would refit the same problem.
                break
            weights = self._update_weights(weights, y, y_pred, alpha)
        return self

    def _check_is_fitted(self) -> None:
        """Raise ``NotFittedError`` when no tree has been trained yet."""
        if not self.trees or self.classes_ is None:
            from sklearn.exceptions import NotFittedError
            raise NotFittedError(
                "This AdaBoostEnsemble instance is not fitted yet. "
                "Call 'fit' before using this estimator."
            )

    def _weighted_votes(self, X: np.ndarray, tree_weights) -> np.ndarray:
        """Accumulate per-class votes, one column per entry of ``classes_``.

        Each tree adds ``tree_weights[t]`` to the column of the class it
        predicts for every row of ``X``.
        """
        classes = self.classes_
        votes = np.zeros((X.shape[0], len(classes)))
        for tree, weight in zip(self.trees, tree_weights):
            predictions = np.asarray(tree.predict(X))
            # Broadcast to an (n_samples, n_classes) one-hot mask.
            votes += weight * (predictions[:, None] == classes[None, :])
        return votes

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return, for each row of ``X``, the class with the largest weighted vote."""
        self._check_is_fitted()
        votes = self._weighted_votes(X, self.tree_weights)
        return self.classes_[np.argmax(votes, axis=1)]

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return each class's share of the total ``|alpha|``-weighted vote.

        Rows sum to 1. These are vote proportions, not calibrated
        probabilities. If every tree weight is zero the votes carry no
        information and a uniform distribution is returned.
        """
        self._check_is_fitted()
        magnitudes = [abs(w) for w in self.tree_weights]
        votes = self._weighted_votes(X, magnitudes)
        total_weight = sum(magnitudes)
        if total_weight == 0:
            return np.full(votes.shape, 1.0 / votes.shape[1])
        return votes / total_weight
def evaluate_cv(model, X, y, cv, name='Model'):
    """Cross-validate ``model`` and summarize the per-fold test scores.

    Accuracy and the weighted-average F1, precision and recall are computed
    with ``cross_validate`` using the splitter ``cv``.

    Returns
    -------
    dict
        ``'Model'`` (the given ``name``), mean and standard deviation of
        accuracy and F1, mean precision and recall, and the raw per-fold F1
        array under ``'_f1_scores'``.
    """
    metric_scorers = dict(
        accuracy='accuracy',
        f1='f1_weighted',
        precision='precision_weighted',
        recall='recall_weighted',
    )
    fold_results = cross_validate(model, X, y, cv=cv, scoring=metric_scorers)

    accuracy_per_fold = fold_results['test_accuracy']
    f1_per_fold = fold_results['test_f1']

    summary = {'Model': name}
    summary['Accuracy_mean'] = accuracy_per_fold.mean()
    summary['Accuracy_std'] = accuracy_per_fold.std()
    summary['F1_mean'] = f1_per_fold.mean()
    summary['F1_std'] = f1_per_fold.std()
    summary['Precision_mean'] = fold_results['test_precision'].mean()
    summary['Recall_mean'] = fold_results['test_recall'].mean()
    # Raw fold scores are kept for the paired significance test later on.
    summary['_f1_scores'] = f1_per_fold
    return summary
# Read the two input tables from DATA_DIR.
movement_features_df = pd.read_csv(os.path.join(DATA_DIR, 'aimoscores.csv'))
weaklink_scores_df = pd.read_csv(os.path.join(DATA_DIR, 'scores_and_weaklink.csv'))
print('Movement features shape:', movement_features_df.shape)
print('Weak link scores shape:', weaklink_scores_df.shape)

# No_1 ... No_5 NASM deviation columns are removed from the features
# (the constant's name marks them as duplicates).
DUPLICATE_NASM_COLS = [f'No_{idx}_NASM_Deviation' for idx in range(1, 6)]
movement_features_df = movement_features_df.drop(columns=DUPLICATE_NASM_COLS)
print('Shape after duplicate removal:', movement_features_df.shape)

# Candidate weak-link columns. The target is the name of the column holding
# the row-wise maximum; on ties idxmax keeps the first column in this order.
weaklink_categories = [
    'ExcessiveForwardLean',
    'ForwardHead',
    'LeftArmFallForward',
    'LeftAsymmetricalWeightShift',
    'LeftHeelRises',
    'LeftKneeMovesInward',
    'LeftKneeMovesOutward',
    'LeftShoulderElevation',
    'RightArmFallForward',
    'RightAsymmetricalWeightShift',
    'RightHeelRises',
    'RightKneeMovesInward',
    'RightKneeMovesOutward',
    'RightShoulderElevation',
]
weaklink_scores_df['WeakestLink'] = (
    weaklink_scores_df[weaklink_categories].idxmax(axis='columns')
)
print('Weakest Link class distribution:')
print(weaklink_scores_df['WeakestLink'].value_counts())

# Attach the target to the features; only IDs present in both tables survive.
target_df = weaklink_scores_df.loc[:, ['ID', 'WeakestLink']].copy()
merged_df = pd.merge(movement_features_df, target_df, on='ID', how='inner')
print('Merged dataset shape:', merged_df.shape)

# Every remaining column except the identifier, the target and
# 'EstimatedScore' is used as a model input.
EXCLUDE_COLS = ['ID', 'WeakestLink', 'EstimatedScore']
feature_columns = [col for col in merged_df.columns if col not in EXCLUDE_COLS]
X = merged_df[feature_columns].values
y = merged_df['WeakestLink'].values
print(f'Feature matrix shape : {X.shape}')
print(f'Number of features : {len(feature_columns)}')
print(f'Number of classes : {len(np.unique(y))}')
# Stratified 80/20 hold-out split.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=RANDOM_STATE,
)

# Standardize using statistics learned from the training portion only.
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)
print(f'Training samples : {X_train.shape[0]}')
print(f'Test samples : {X_test.shape[0]}')

# Shuffled, stratified folds shared by every cross-validated model below.
cv_strategy = StratifiedKFold(
    n_splits=N_SPLITS,
    shuffle=True,
    random_state=RANDOM_STATE,
)
# Train AdaBoost ensemble
print("\n" + "="*60)
print("TRAINING ADABOOST ENSEMBLE")
print("="*60)
adaboost_model = AdaBoostEnsemble(
    n_estimators=50,
    max_depth=5,
    min_samples_split=5,
    min_samples_leaf=2,
    random_state=RANDOM_STATE
)
adaboost_model.fit(X_train_scaled, y_train)

# Cross-validation. cross_validate fits clones of the estimator, so the
# model fitted above is left untouched by this call.
adaboost_cv = evaluate_cv(
    adaboost_model, X_train_scaled, y_train, cv_strategy,
    name='AdaBoost Ensemble'
)

# Test set evaluation. The model is already fitted on the full training
# split; fitting it a second time here would only repeat the identical
# training run, so the existing fit is reused.
y_pred_adaboost = adaboost_model.predict(X_test_scaled)
test_f1_adaboost = f1_score(y_test, y_pred_adaboost, average='weighted', zero_division=0)
test_acc_adaboost = accuracy_score(y_test, y_pred_adaboost)
test_prec_adaboost = precision_score(y_test, y_pred_adaboost, average='weighted', zero_division=0)
test_rec_adaboost = recall_score(y_test, y_pred_adaboost, average='weighted', zero_division=0)

print("\n" + "="*60)
print("ADABOOST RESULTS")
print("="*60)
print(f'CV F1: {adaboost_cv["F1_mean"]:.4f} +/- {adaboost_cv["F1_std"]:.4f}')
print(f'Test F1: {test_f1_adaboost:.4f}')
print(f'Test Accuracy: {test_acc_adaboost:.4f}')
print(f'Test Precision: {test_prec_adaboost:.4f}')
print(f'Test Recall: {test_rec_adaboost:.4f}')
# Baseline: class-balanced random forest evaluated on the same folds and the
# same hold-out split as the AdaBoost ensemble.
rf_settings = dict(
    n_estimators=200,
    max_depth=15,
    min_samples_split=5,
    min_samples_leaf=2,
    class_weight='balanced',
    random_state=RANDOM_STATE,
    n_jobs=-1,
)
rf_champion = RandomForestClassifier(**rf_settings)
rf_cv = evaluate_cv(
    rf_champion,
    X_train_scaled,
    y_train,
    cv_strategy,
    name='Random Forest (Baseline)',
)

# Final fit on the whole training split for the hold-out score.
rf_champion.fit(X_train_scaled, y_train)
y_pred_rf = rf_champion.predict(X_test_scaled)
test_f1_rf = f1_score(y_test, y_pred_rf, average='weighted')

banner = "=" * 60
print("\n" + banner)
print("COMPARISON WITH BASELINE")
print(banner)
print(f'Random Forest CV F1: {rf_cv["F1_mean"]:.4f} +/- {rf_cv["F1_std"]:.4f}')
print(f'Random Forest Test F1: {test_f1_rf:.4f}')
# Statistical significance test
def corrected_resampled_ttest(scores_a, scores_b, n_train, n_test):
    """Two-sided corrected resampled t-test (Nadeau & Bengio) on paired scores.

    The variance of the per-fold score differences is inflated by
    ``1/k + n_test/n_train`` to account for the overlap between the training
    sets of different folds.

    Parameters
    ----------
    scores_a, scores_b : array-like of shape (k,)
        Per-fold scores of the two models, paired by fold.
    n_train, n_test : int
        Number of training and test samples in one fold.

    Returns
    -------
    (float, float)
        The t statistic (positive when ``scores_a`` is higher on average) and
        the two-sided p-value with ``k - 1`` degrees of freedom. When all
        differences are identical the variance is zero: ``(0.0, 1.0)`` is
        returned if that common difference is zero, otherwise
        ``(+/-inf, 0.0)``.

    Raises
    ------
    ValueError
        If the inputs are not 1-D arrays of equal length with at least two
        folds, or if ``n_train`` is not positive.
    """
    scores_a = np.asarray(scores_a, dtype=float)
    scores_b = np.asarray(scores_b, dtype=float)
    if scores_a.ndim != 1 or scores_a.shape != scores_b.shape:
        raise ValueError("scores_a and scores_b must be 1-D and of equal length")
    k = scores_a.size
    if k < 2:
        raise ValueError("at least two paired scores are required")
    if n_train <= 0:
        raise ValueError("n_train must be positive")

    diff = scores_a - scores_b
    d_bar = diff.mean()
    s_sq = diff.var(ddof=1)
    var_corr = (1 / k + n_test / n_train) * s_sq

    if var_corr == 0:
        # All fold differences are equal; avoid 0/0 -> nan.
        if d_bar == 0:
            return 0.0, 1.0
        return float(np.copysign(np.inf, d_bar)), 0.0

    t_stat = d_bar / np.sqrt(var_corr)
    # Survival function keeps precision in the tail, unlike 1 - cdf.
    p_value = 2 * stats.t.sf(abs(t_stat), df=k - 1)
    return float(t_stat), float(p_value)
# Per-fold F1 scores of both models, paired by fold (same cv_strategy).
adaboost_scores = adaboost_cv['_f1_scores']
rf_scores = rf_cv['_f1_scores']
result_map = {
    'AdaBoost Ensemble': adaboost_scores,
    'Random Forest': rf_scores,
}

# Approximate fold sizes: one of N_SPLITS parts is held out, the rest trains.
n_total = X_train_scaled.shape[0]
n_test_fold = n_total // N_SPLITS
n_train_fold = n_total - n_test_fold

t, p = corrected_resampled_ttest(
    adaboost_scores, rf_scores, n_train_fold, n_test_fold
)
print(f"\nStatistical Test (AdaBoost vs Random Forest):")
print(f" t-statistic: {t:+.3f}")
print(f" p-value: {p:.4f}")
print(f" Significant at α=0.05: {'Yes' if p < 0.05 else 'No'}")
# Bundle the fitted model with everything needed to reuse it (scaler, feature
# order) and with the metrics computed above, then pickle it into OUT_DIR.
# NOTE(review): the pickle references AdaBoostEnsemble / WeightedDecisionTree
# by name, so loading it presumably requires those classes to be importable.
cv_metrics = {
    'f1_mean': float(adaboost_cv['F1_mean']),
    'f1_std': float(adaboost_cv['F1_std']),
    'accuracy_mean': float(adaboost_cv['Accuracy_mean']),
}
test_metrics = {
    'f1': float(test_f1_adaboost),
    'accuracy': float(test_acc_adaboost),
    'precision': float(test_prec_adaboost),
    'recall': float(test_rec_adaboost),
}
# Relative change of the hold-out F1 versus the A4 champion, in percent.
improvement_pct = float((test_f1_adaboost - CHAMPION_F1) / CHAMPION_F1 * 100)

artifact = {
    'model': adaboost_model,
    'model_name': 'AdaBoost Ensemble',
    'scaler': scaler,
    'feature_columns': feature_columns,
    'cv_metrics': cv_metrics,
    'test_metrics': test_metrics,
    'a4_champion_f1': CHAMPION_F1,
    'improvement_pct': improvement_pct,
}

out_path = OUT_DIR / 'adaboost_classification.pkl'
with out_path.open('wb') as f:
    pickle.dump(artifact, f)
print(f'\nSaved model to: {out_path}')
# Per-class precision / recall / F1 on the hold-out set.
print('\nCLASSIFICATION REPORT: AdaBoost Ensemble')
print(classification_report(y_test, y_pred_adaboost, zero_division=0))

# Feature importance analysis (simplified): plain, unweighted mean of the
# per-tree importances; the trees' alpha weights are not taken into account.
section_rule = "=" * 60
print("\n" + section_rule)
print("FEATURE IMPORTANCE ANALYSIS")
print(section_rule)

all_importances = sum(
    (fitted_tree.feature_importances_ for fitted_tree in adaboost_model.trees),
    np.zeros(len(feature_columns)),
)
avg_importances = all_importances / len(adaboost_model.trees)

importance_df = pd.DataFrame(
    {'Feature': feature_columns, 'Importance': avg_importances}
)
importance_df = importance_df.sort_values('Importance', ascending=False)
print("\nTop 10 Most Important Features:")
print(importance_df.head(10).to_string(index=False))

# Horizontal bar chart of the 15 highest-ranked features, best at the top.
top_features = importance_df.head(15)
plot_path = OUT_DIR / 'adaboost_feature_importance.png'
bar_positions = range(len(top_features))

fig, ax = plt.subplots(figsize=(12, 8))
ax.barh(bar_positions, top_features['Importance'].values)
ax.set_yticks(bar_positions)
ax.set_yticklabels(top_features['Feature'].values)
ax.set_xlabel('Average Feature Importance')
ax.set_ylabel('Features')
ax.set_title('Top 15 Feature Importance - AdaBoost Ensemble')
ax.invert_yaxis()
fig.tight_layout()
fig.savefig(plot_path, dpi=150)
plt.close(fig)
print(f"\nSaved feature importance plot to: {plot_path}")