Spaces:
Sleeping
Sleeping
| import os | |
| import pickle | |
| import warnings | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from pathlib import Path | |
| from scipy import stats | |
| from typing import List, Tuple, Dict, Any | |
| from sklearn.model_selection import ( | |
| train_test_split, StratifiedKFold, cross_validate | |
| ) | |
| from sklearn.base import BaseEstimator, ClassifierMixin | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.metrics import ( | |
| accuracy_score, precision_score, recall_score, f1_score, | |
| classification_report, confusion_matrix | |
| ) | |
| from sklearn.tree import DecisionTreeClassifier | |
| from sklearn.ensemble import ( | |
| RandomForestClassifier, | |
| VotingClassifier, | |
| BaggingClassifier, | |
| StackingClassifier, | |
| ) | |
| import xgboost as xgb | |
| import lightgbm as lgb | |
# ---------------------------------------------------------------------------
# Global configuration
# ---------------------------------------------------------------------------
# Reproducibility / evaluation settings shared by the whole script.
RANDOM_STATE = 42
N_SPLITS = 5
CHAMPION_F1 = 0.6110  # Score from A4

# Silence every warning category and seed NumPy's legacy global RNG.
warnings.filterwarnings('ignore')
np.random.seed(42)

# Input data is looked up in a 'Datasets_all' folder located one level above
# the current working directory (not relative to this file).
REPO_ROOT = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
DATA_DIR = os.path.join(REPO_ROOT, 'Datasets_all')

# Output artifacts (pickled model, plots) go to ./models, created on demand.
OUT_DIR = Path('models')
OUT_DIR.mkdir(exist_ok=True)
class WeightedDecisionTree(DecisionTreeClassifier):
    """
    Thin DecisionTreeClassifier subclass used as the boosting base learner.

    It narrows the constructor to the four hyper-parameters the ensemble
    tunes and forwards ``sample_weight`` to the parent ``fit`` so the tree
    is grown on weighted training errors.
    """

    def __init__(self, max_depth: int = 5, min_samples_split: int = 2,
                 min_samples_leaf: int = 1, random_state: int = 42):
        """Configure the underlying tree; all other settings keep sklearn defaults."""
        super().__init__(
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            random_state=random_state,
        )

    def fit(self, X, y, sample_weight=None, check_input=True):
        """
        Fit the decision tree with optional per-sample weights.

        Parameters
        ----------
        X, y : training features and labels, passed straight to the parent.
        sample_weight : optional array of per-sample weights.
        check_input : forwarded to ``DecisionTreeClassifier.fit`` so this
            override keeps the parent's full call signature.

        Returns
        -------
        self
        """
        return super().fit(
            X, y, sample_weight=sample_weight, check_input=check_input
        )
class AdaBoostEnsemble(ClassifierMixin, BaseEstimator):
    """
    AdaBoost ensemble of decision trees grown on weighted training errors.

    The algorithm:
    1. Initialize equal weights for all training samples.
    2. For each tree in the ensemble:
       - train a decision tree on the weighted data,
       - compute its weighted error rate,
       - compute the tree weight (alpha), including the multi-class
         ``log(K - 1)`` term so a tree only gets a negative weight when it
         is worse than random guessing among K classes,
       - multiply sample weights by ``exp(alpha)`` for misclassified samples
         and ``exp(-alpha)`` for correct ones,
       - normalize the weights.
    3. Predict by alpha-weighted voting over the trees.

    Attributes set by ``fit``
    -------------------------
    trees : fitted base learners, in training order.
    tree_weights : alpha of each tree, aligned with ``trees``.
    classes_ : sorted unique labels seen in ``y``.
    n_classes : ``len(classes_)``.
    """

    # Magnitude used for a perfect / totally wrong tree; finite alphas are
    # clipped to the same range so no imperfect tree can outweigh a perfect one.
    _ALPHA_CAP = 10.0

    def __init__(
        self,
        n_estimators: int = 50,
        max_depth: int = 5,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        random_state: int = 42
    ):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.random_state = random_state
        self.trees: List[WeightedDecisionTree] = []
        self.tree_weights: List[float] = []
        self.n_classes: int = 0
        self.classes_: np.ndarray = None

    def _initialize_weights(self, n_samples: int) -> np.ndarray:
        """Return a uniform weight vector of length ``n_samples`` summing to 1."""
        return np.ones(n_samples) / n_samples

    def _update_weights(
        self,
        weights: np.ndarray,
        y_true: np.ndarray,
        y_pred: np.ndarray,
        alpha: float
    ) -> np.ndarray:
        """
        Re-weight samples after a boosting round and renormalize.

        Misclassified samples are multiplied by ``exp(alpha)`` and correctly
        classified ones by ``exp(-alpha)``.
        """
        misclassified = np.asarray(y_true) != np.asarray(y_pred)
        scale = np.where(misclassified, np.exp(alpha), np.exp(-alpha))
        updated_weights = weights * scale
        total = updated_weights.sum()
        if not np.isfinite(total) or total <= 0:
            # Degenerate round (all mass vanished or overflowed): restart
            # from a uniform distribution instead of propagating NaNs.
            return self._initialize_weights(len(updated_weights))
        return updated_weights / total

    def _compute_weighted_error(
        self,
        weights: np.ndarray,
        y_true: np.ndarray,
        y_pred: np.ndarray
    ) -> float:
        """Return the weight-normalized fraction of misclassified samples."""
        misclassified = (np.asarray(y_true) != np.asarray(y_pred)).astype(float)
        return np.sum(weights * misclassified) / np.sum(weights)

    def _compute_alpha(self, error: float) -> float:
        """
        Compute the voting weight of a tree from its weighted error.

        Uses ``0.5 * (log((1 - e) / e) + log(K - 1))`` where K is
        ``self.n_classes``; for K <= 2 the extra term is zero. Errors of
        exactly 0 or 1 map to +/- ``_ALPHA_CAP`` to avoid log(0) and
        division by zero, and finite results are clipped to that range.
        """
        cap = self._ALPHA_CAP
        if error <= 0:
            return cap  # Very high weight for perfect classifier
        if error >= 1:
            return -cap  # Very negative weight for completely wrong classifier
        n_other_classes = max(self.n_classes - 1, 1)
        alpha = 0.5 * (np.log((1 - error) / error) + np.log(n_other_classes))
        return float(np.clip(alpha, -cap, cap))

    def fit(self, X: np.ndarray, y: np.ndarray) -> 'AdaBoostEnsemble':
        """
        Fit the AdaBoost ensemble.

        Any previously fitted trees are discarded first, so calling ``fit``
        repeatedly on the same instance always yields ``n_estimators`` trees.
        Progress is printed once per tree.
        """
        y = np.asarray(y)
        n_samples = np.shape(X)[0]
        self.classes_ = np.unique(y)
        self.n_classes = len(self.classes_)

        # Start from a clean slate on every fit.
        self.trees = []
        self.tree_weights = []

        # Initialize sample weights
        weights = self._initialize_weights(n_samples)
        for i in range(self.n_estimators):
            # A different seed per round so tie-breaking differs between trees.
            tree = WeightedDecisionTree(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                min_samples_leaf=self.min_samples_leaf,
                random_state=self.random_state + i
            )
            tree.fit(X, y, sample_weight=weights)
            y_pred = tree.predict(X)

            error = self._compute_weighted_error(weights, y, y_pred)
            alpha = self._compute_alpha(error)
            weights = self._update_weights(weights, y, y_pred, alpha)

            self.trees.append(tree)
            self.tree_weights.append(alpha)
            print(f"Tree {i+1}/{self.n_estimators}: Error={error:.4f}, Alpha={alpha:.4f}")
        return self

    def _check_fitted(self) -> None:
        """Raise ValueError when no trees have been trained yet."""
        if not self.trees or self.classes_ is None:
            raise ValueError(
                "AdaBoostEnsemble is not fitted yet; call fit() before predicting."
            )

    def _weighted_votes(self, X: np.ndarray) -> np.ndarray:
        """
        Return an (n_samples, n_classes) matrix of signed alpha-weighted votes.

        Each tree adds its alpha to the column of the class it predicts;
        a tree with negative alpha therefore votes against that class.
        """
        self._check_fitted()
        classes = np.asarray(self.classes_)
        votes = np.zeros((np.shape(X)[0], len(classes)))
        for tree, alpha in zip(self.trees, self.tree_weights):
            predictions = np.asarray(tree.predict(X))
            votes += alpha * (predictions[:, None] == classes[None, :])
        return votes

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict the class with the highest weighted vote for each sample."""
        votes = self._weighted_votes(X)
        return self.classes_[np.argmax(votes, axis=1)]

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """
        Predict class probabilities as normalized weighted-vote shares.

        The same signed votes as ``predict`` are used, so the arg-max of the
        returned rows matches ``predict``. Rows containing negative votes are
        shifted to be non-negative before normalizing; a row with no vote
        mass at all falls back to a uniform distribution.
        """
        votes = self._weighted_votes(X)
        # Shift only rows that actually contain negative totals, so the
        # common all-positive case keeps plain vote proportions.
        row_floor = np.minimum(votes.min(axis=1, keepdims=True), 0.0)
        shifted = votes - row_floor
        row_totals = shifted.sum(axis=1, keepdims=True)
        uniform = np.full_like(shifted, 1.0 / shifted.shape[1])
        safe_totals = np.where(row_totals > 0, row_totals, 1.0)
        return np.where(row_totals > 0, shifted / safe_totals, uniform)
def evaluate_cv(model, X, y, cv, name='Model'):
    """
    Cross-validate ``model`` and summarize the per-fold test scores.

    Scores accuracy plus weighted F1 / precision / recall through
    ``cross_validate`` and returns a flat dict: the display ``name``,
    mean and standard deviation for accuracy and F1, the mean for precision
    and recall, and the raw per-fold F1 array under ``'_f1_scores'``.
    """
    metric_spec = {
        'accuracy': 'accuracy',
        'f1': 'f1_weighted',
        'precision': 'precision_weighted',
        'recall': 'recall_weighted',
    }
    fold_scores = cross_validate(model, X, y, cv=cv, scoring=metric_spec)

    # cross_validate prefixes each scorer key with 'test_'.
    acc_folds = fold_scores['test_accuracy']
    f1_folds = fold_scores['test_f1']
    precision_folds = fold_scores['test_precision']
    recall_folds = fold_scores['test_recall']

    summary = {'Model': name}
    summary['Accuracy_mean'] = acc_folds.mean()
    summary['Accuracy_std'] = acc_folds.std()
    summary['F1_mean'] = f1_folds.mean()
    summary['F1_std'] = f1_folds.std()
    summary['Precision_mean'] = precision_folds.mean()
    summary['Recall_mean'] = recall_folds.mean()
    summary['_f1_scores'] = f1_folds
    return summary
# ---------------------------------------------------------------------------
# Load data
# ---------------------------------------------------------------------------
# Two CSV files are read from DATA_DIR: per-sample movement features and the
# weak-link scores from which the classification target is derived.
movement_features_df = pd.read_csv(os.path.join(DATA_DIR, 'aimoscores.csv'))
weaklink_scores_df = pd.read_csv(os.path.join(DATA_DIR, 'scores_and_weaklink.csv'))
print('Movement features shape:', movement_features_df.shape)
print('Weak link scores shape:', weaklink_scores_df.shape)

# Columns removed from the feature table before modelling (named "duplicate"
# by the author; the duplication itself is not verifiable from this script).
DUPLICATE_NASM_COLS = [
    'No_1_NASM_Deviation',
    'No_2_NASM_Deviation',
    'No_3_NASM_Deviation',
    'No_4_NASM_Deviation',
    'No_5_NASM_Deviation',
]
movement_features_df = movement_features_df.drop(DUPLICATE_NASM_COLS, axis=1)
print('Shape after duplicate removal:', movement_features_df.shape)

# Target: for every row, the name of the weak-link column holding the
# largest value.
weaklink_categories = [
    'ExcessiveForwardLean', 'ForwardHead', 'LeftArmFallForward',
    'LeftAsymmetricalWeightShift', 'LeftHeelRises', 'LeftKneeMovesInward',
    'LeftKneeMovesOutward', 'LeftShoulderElevation', 'RightArmFallForward',
    'RightAsymmetricalWeightShift', 'RightHeelRises', 'RightKneeMovesInward',
    'RightKneeMovesOutward', 'RightShoulderElevation',
]
weaklink_scores_df['WeakestLink'] = weaklink_scores_df[weaklink_categories].idxmax(
    axis='columns'
)
print('Weakest Link class distribution:')
print(weaklink_scores_df['WeakestLink'].value_counts())

# Merge Datasets: keep only IDs present in both tables.
target_df = weaklink_scores_df.loc[:, ['ID', 'WeakestLink']].copy()
merged_df = pd.merge(movement_features_df, target_df, on='ID', how='inner')
print('Merged dataset shape:', merged_df.shape)

# Everything except the identifier, the target and 'EstimatedScore' is a feature.
EXCLUDE_COLS = ['ID', 'WeakestLink', 'EstimatedScore']
feature_columns = [col for col in merged_df.columns if col not in EXCLUDE_COLS]
X = merged_df[feature_columns].values
y = merged_df['WeakestLink'].values
print(f'Feature matrix shape : {X.shape}')
print(f'Number of features : {len(feature_columns)}')
print(f'Number of classes : {len(np.unique(y))}')

# Split data: stratified 80/20 hold-out.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=RANDOM_STATE,
    stratify=y,
)

# Standardize using statistics learned from the training split only.
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)
print(f'Training samples : {X_train.shape[0]}')
print(f'Test samples : {X_test.shape[0]}')

# Shared fold definition so every model is cross-validated on the same splits.
cv_strategy = StratifiedKFold(
    n_splits=N_SPLITS,
    shuffle=True,
    random_state=RANDOM_STATE,
)
# Train AdaBoost ensemble
print("\n" + "="*60)
print("TRAINING ADABOOST ENSEMBLE")
print("="*60)
adaboost_model = AdaBoostEnsemble(
    n_estimators=50,
    max_depth=5,
    min_samples_split=5,
    min_samples_leaf=2,
    random_state=RANDOM_STATE
)

# Cross-validation. The estimator is passed unfitted: scikit-learn's
# cross_validate trains a fresh clone per fold, so fitting it beforehand
# adds nothing to these scores.
adaboost_cv = evaluate_cv(
    adaboost_model, X_train_scaled, y_train, cv_strategy,
    name='AdaBoost Ensemble'
)

# Test set evaluation. Fit exactly once on the full training split so the
# instance that is evaluated (and later pickled) holds a single ensemble of
# n_estimators trees rather than the result of two consecutive fits.
adaboost_model.fit(X_train_scaled, y_train)
y_pred_adaboost = adaboost_model.predict(X_test_scaled)
test_f1_adaboost = f1_score(y_test, y_pred_adaboost, average='weighted')
test_acc_adaboost = accuracy_score(y_test, y_pred_adaboost)
test_prec_adaboost = precision_score(y_test, y_pred_adaboost, average='weighted', zero_division=0)
test_rec_adaboost = recall_score(y_test, y_pred_adaboost, average='weighted', zero_division=0)

print("\n" + "="*60)
print("ADABOOST RESULTS")
print("="*60)
print(f'CV F1: {adaboost_cv["F1_mean"]:.4f} +/- {adaboost_cv["F1_std"]:.4f}')
print(f'Test F1: {test_f1_adaboost:.4f}')
print(f'Test Accuracy: {test_acc_adaboost:.4f}')
print(f'Test Precision: {test_prec_adaboost:.4f}')
print(f'Test Recall: {test_rec_adaboost:.4f}')
# Compare with baseline models
# A class-balanced random forest serves as the reference model; it is scored
# with the same CV folds and the same hold-out split as the AdaBoost ensemble.
rf_champion = RandomForestClassifier(
    n_estimators=200,
    max_depth=15,
    min_samples_split=5,
    min_samples_leaf=2,
    class_weight='balanced',
    random_state=RANDOM_STATE,
    n_jobs=-1,
)
rf_cv = evaluate_cv(
    rf_champion,
    X_train_scaled,
    y_train,
    cv_strategy,
    name='Random Forest (Baseline)',
)

# fit() returns the estimator itself, so training and prediction can be chained.
y_pred_rf = rf_champion.fit(X_train_scaled, y_train).predict(X_test_scaled)
test_f1_rf = f1_score(y_test, y_pred_rf, average='weighted')

print("\n" + "="*60)
print("COMPARISON WITH BASELINE")
print("="*60)
print(f'Random Forest CV F1: {rf_cv["F1_mean"]:.4f} +/- {rf_cv["F1_std"]:.4f}')
print(f'Random Forest Test F1: {test_f1_rf:.4f}')
| # Statistical significance test | |
def corrected_resampled_ttest(scores_a, scores_b, n_train, n_test):
    """
    Paired t-test on per-fold scores with a variance correction for
    overlapping training sets.

    The sample variance of the score differences is scaled by
    ``1/k + n_test/n_train`` (k = number of folds) before forming the
    t-statistic, and a two-sided p-value is taken from Student's t with
    ``k - 1`` degrees of freedom.

    Parameters
    ----------
    scores_a, scores_b : 1-D sequences or arrays of equal length holding the
        per-fold scores of the two models (same fold order).
    n_train, n_test : number of training / test samples per fold.

    Returns
    -------
    (t_stat, p_value) as Python floats. When every fold difference is
    identical the variance is zero: the result is ``(0.0, 1.0)`` if that
    difference is zero and ``(+/-inf, 0.0)`` otherwise.

    Raises
    ------
    ValueError
        If the score vectors are not 1-D with the same length, if fewer than
        two folds are given, or if ``n_train`` is not positive.
    """
    a = np.asarray(scores_a, dtype=float)
    b = np.asarray(scores_b, dtype=float)
    if a.ndim != 1 or a.shape != b.shape:
        raise ValueError(
            "scores_a and scores_b must be 1-D sequences of equal length"
        )
    k = a.size
    if k < 2:
        raise ValueError("at least two paired scores are required")
    if n_train <= 0:
        raise ValueError("n_train must be positive")

    diff = a - b
    d_bar = diff.mean()
    s_sq = diff.var(ddof=1)
    var_corr = (1 / k + n_test / n_train) * s_sq

    if not var_corr > 0:
        # Zero variance: avoid 0/0 (NaN) and report the limiting result.
        if d_bar == 0:
            return 0.0, 1.0
        return float(np.copysign(np.inf, d_bar)), 0.0

    t_stat = d_bar / np.sqrt(var_corr)
    # Survival function keeps precision for very small tail probabilities.
    p_value = 2 * stats.t.sf(abs(t_stat), df=k - 1)
    return float(t_stat), float(p_value)
# Approximate per-fold sizes for the variance correction: one fold's worth of
# samples is held out, the remainder is used for training.
n_total = len(X_train_scaled)
n_test_fold = n_total // N_SPLITS
n_train_fold = n_total - n_test_fold

# Per-fold weighted-F1 scores of both models, keyed by display name.
result_map = {
    'AdaBoost Ensemble': adaboost_cv['_f1_scores'],
    'Random Forest': rf_cv['_f1_scores'],
}
adaboost_scores, rf_scores = (
    result_map[model_name]
    for model_name in ('AdaBoost Ensemble', 'Random Forest')
)

t, p = corrected_resampled_ttest(
    adaboost_scores,
    rf_scores,
    n_train_fold,
    n_test_fold,
)
print(f"\nStatistical Test (AdaBoost vs Random Forest):")
print(f" t-statistic: {t:+.3f}")
print(f" p-value: {p:.4f}")
print(f" Significant at α=0.05: {'Yes' if p < 0.05 else 'No'}")
# Save model
# The pickle bundles the fitted ensemble with everything needed to reuse it
# (scaler, feature order) plus the metrics recorded in this run.
cv_summary = {
    'f1_mean': float(adaboost_cv['F1_mean']),
    'f1_std': float(adaboost_cv['F1_std']),
    'accuracy_mean': float(adaboost_cv['Accuracy_mean']),
}
test_summary = {
    'f1': float(test_f1_adaboost),
    'accuracy': float(test_acc_adaboost),
    'precision': float(test_prec_adaboost),
    'recall': float(test_rec_adaboost),
}
artifact = {
    'model': adaboost_model,
    'model_name': 'AdaBoost Ensemble',
    'scaler': scaler,
    'feature_columns': feature_columns,
    'cv_metrics': cv_summary,
    'test_metrics': test_summary,
    'a4_champion_f1': CHAMPION_F1,
    # Relative change of the test F1 versus the stored champion score, in percent.
    'improvement_pct': float((test_f1_adaboost - CHAMPION_F1) / CHAMPION_F1 * 100),
}

out_path = OUT_DIR / 'adaboost_classification.pkl'
with out_path.open('wb') as f:
    pickle.dump(artifact, f)
print(f'\nSaved model to: {out_path}')
# Classification report
print('\nCLASSIFICATION REPORT: AdaBoost Ensemble')
print(classification_report(y_test, y_pred_adaboost, zero_division=0))

# Feature importance analysis (simplified)
print("\n" + "="*60)
print("FEATURE IMPORTANCE ANALYSIS")
print("="*60)

# Unweighted mean of each tree's feature_importances_ (tree alphas are not
# taken into account here).
all_importances = sum(
    (tree.feature_importances_ for tree in adaboost_model.trees),
    np.zeros(len(feature_columns)),
)
avg_importances = all_importances / len(adaboost_model.trees)

importance_df = pd.DataFrame(
    {'Feature': feature_columns, 'Importance': avg_importances}
)
importance_df = importance_df.sort_values('Importance', ascending=False)
print("\nTop 10 Most Important Features:")
print(importance_df.head(10).to_string(index=False))

# Plot feature importance: horizontal bars, most important feature on top.
top_features = importance_df.head(15)
bar_positions = range(len(top_features))
plot_path = OUT_DIR / 'adaboost_feature_importance.png'

fig, ax = plt.subplots(figsize=(12, 8))
ax.barh(bar_positions, top_features['Importance'].values)
ax.set_yticks(bar_positions)
ax.set_yticklabels(top_features['Feature'].values)
ax.set_xlabel('Average Feature Importance')
ax.set_ylabel('Features')
ax.set_title('Top 15 Feature Importance - AdaBoost Ensemble')
ax.invert_yaxis()
fig.tight_layout()
fig.savefig(plot_path, dpi=150)
plt.close(fig)
print(f"\nSaved feature importance plot to: {plot_path}")