#!/usr/bin/env python3
"""
Script to load and execute all classification models with one sample.
Tests models from A4, A5, A5b, and A6.

Data loading adapted from classification_baseline.py to use the same
data processing pipeline for consistent feature extraction.

NOTE: A4 Random Forest model was trained WITH the 5 duplicate NASM columns
(No_1_NASM_Deviation through No_5_NASM_Deviation), while other models (A5, A5b, A6)
were trained WITHOUT them. This script loads data WITH the duplicate columns
to support the A4 model, and filters them out for other models as needed.
"""
| import os | |
| import sys | |
| import pickle | |
| import warnings | |
| import numpy as np | |
| import pandas as pd | |
| from pathlib import Path | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.model_selection import train_test_split | |
| # Suppress warnings for cleaner output | |
| warnings.filterwarnings('ignore') | |
# Add project root to sys.path so sibling modules resolve.
# FIX: the original inserted project_root into sys.path twice; once is enough.
project_root = os.path.abspath(os.path.dirname(__file__))
sys.path.insert(0, project_root)

# Import model artifact paths from all_classification.py
| from all_classification import ( | |
| a4_rf, | |
| a5_ensemnble, | |
| a5b_adaboost, | |
| a5b_bagging_tree, | |
| a6_svm | |
| ) | |
| # Import custom classes from A5b classification_adaboost.py | |
| # These are needed for unpickling the AdaBoost model | |
| #sys.path.insert(0, os.path.join(project_root, '..', 'A5b')) | |
| from adaboost_classes import ( | |
| AdaBoostEnsemble, | |
| WeightedDecisionTree | |
| ) | |
# Data paths: the repository root is one level above this script's directory,
# and all CSV datasets live in its Datasets_all folder.
REPO_ROOT = os.path.abspath(os.path.join(project_root, '..'))
DATA_DIR = os.path.join(REPO_ROOT, 'Datasets_all')
# Weaklink categories (14 classes).
WEAKLINK_CATEGORIES = [
    'ExcessiveForwardLean', 'ForwardHead', 'LeftArmFallForward',
    'LeftAsymmetricalWeightShift', 'LeftHeelRises', 'LeftKneeMovesInward',
    'LeftKneeMovesOutward', 'LeftShoulderElevation', 'RightArmFallForward',
    'RightAsymmetricalWeightShift', 'RightHeelRises', 'RightKneeMovesInward',
    'RightKneeMovesOutward', 'RightShoulderElevation'
]

# Duplicate NASM columns to remove (as in classification_baseline.py)
# NOTE: A4 Random Forest model was trained WITH these 5 duplicate columns,
# so they must be kept in the data for A4 to work correctly
DUPLICATE_NASM_COLS = [
    'No_1_NASM_Deviation',
    'No_2_NASM_Deviation',
    'No_3_NASM_Deviation',
    'No_4_NASM_Deviation',
    'No_5_NASM_Deviation',
]

# Columns to exclude when extracting features (identifiers and targets).
EXCLUDE_COLS = ['ID', 'WeakestLink', 'EstimatedScore']

# Expected classification classes (14 weaklink categories).
# FIX: the original duplicated the full WEAKLINK_CATEGORIES literal here;
# derive it instead so the two lists can never drift apart.
EXPECTED_CLASSES = list(WEAKLINK_CATEGORIES)
def load_and_prepare_data():
    """Replicate the classification_baseline.py data pipeline.

    The 5 duplicate NASM columns are deliberately retained because the A4
    Random Forest model was trained with them; the other models (A5, A5b,
    A6) drop them later based on their saved feature_columns.

    Returns:
        dict holding the feature column names, the fitted StandardScaler,
        raw and scaled train/test splits, and the merged dataframe.
    """
    features_df = pd.read_csv(os.path.join(DATA_DIR, 'aimoscores.csv'))
    labels_df = pd.read_csv(os.path.join(DATA_DIR, 'scores_and_weaklink.csv'))
    print('Movement features shape:', features_df.shape)
    print('Weak link scores shape:', labels_df.shape)

    # Duplicate NASM columns are intentionally NOT removed here.
    print('NOTE: Keeping duplicate NASM columns for A4 Random Forest model compatibility')

    # The target class is whichever weak-link category scored highest per row.
    labels_df['WeakestLink'] = labels_df[WEAKLINK_CATEGORIES].idxmax(axis=1)
    print('Weakest Link class distribution:')
    print(labels_df['WeakestLink'].value_counts())

    # Join the features with the derived target on the shared ID column.
    merged_df = features_df.merge(
        labels_df[['ID', 'WeakestLink']].copy(), on='ID', how='inner'
    )
    print('Merged dataset shape:', merged_df.shape)

    # Every column except identifiers/targets is used as a feature, which
    # keeps the 5 duplicate NASM columns in place for A4.
    feature_columns = [col for col in merged_df.columns if col not in EXCLUDE_COLS]
    X = merged_df[feature_columns].values
    y = merged_df['WeakestLink'].values
    print(f'Feature matrix shape : {X.shape}')
    print(f'Number of features : {len(feature_columns)}')
    print(f'Number of classes : {len(np.unique(y))}')

    # Same stratified 80/20 split as the baseline script.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Fit the scaler on training data only to avoid test-set leakage.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    return {
        'feature_columns': feature_columns,
        'scaler': scaler,
        'X_train': X_train,
        'X_train_scaled': X_train_scaled,
        'y_train': y_train,
        'X_test': X_test,
        'X_test_scaled': X_test_scaled,
        'y_test': y_test,
        'merged_df': merged_df,
    }
def load_model(model_path, model_name):
    """Load a pickled model artifact and normalize its structure.

    Dict artifacts (A4, A5, A5b) carry 'model'/'scaler'/'feature_columns'
    keys; a non-dict artifact (the A6 SVM Pipeline) has its scaler and
    feature names extracted from the pipeline steps.

    Args:
        model_path: pickle path relative to project_root.
        model_name: human-readable name used in log messages.

    Returns:
        Tuple (model, scaler, feature_columns, artifact); all four are
        None when the file is missing or unpickling fails.
    """
    import re  # kept function-scoped, only needed for pipeline artifacts

    full_path = os.path.join(project_root, model_path)
    if not os.path.exists(full_path):
        print(f" β οΈ Model file not found: {full_path}")
        return None, None, None, None
    try:
        with open(full_path, 'rb') as f:
            artifact = pickle.load(f)

        if isinstance(artifact, dict):
            # Dict artifact: everything is stored under well-known keys.
            model = artifact.get('model')
            scaler = artifact.get('scaler')
            feature_columns = artifact.get('feature_columns')
        else:
            # A6 SVM is a Pipeline object.
            # FIX: initialize scaler/feature_columns up front — the original
            # left `scaler` unassigned (NameError) when the artifact had no steps.
            model = artifact
            scaler = None
            feature_columns = None
            steps = getattr(model, 'steps', None)
            if steps:
                # Prefer a fitted transform-only step (a scaler, not an estimator).
                for _step_name, step_obj in steps:
                    if (hasattr(step_obj, 'transform')
                            and hasattr(step_obj, 'n_features_in_')
                            and not hasattr(step_obj, 'predict')):
                        scaler = step_obj
                        break
                first_step = steps[0][1]
                if scaler is None and hasattr(first_step, 'transform') \
                        and hasattr(first_step, 'n_features_in_'):
                    scaler = first_step
                # Recover real feature names from the first step; generic
                # placeholder names (x0, x1, ...) are ignored.
                if hasattr(first_step, 'get_feature_names_out'):
                    try:
                        names = first_step.get_feature_names_out()
                        if not all(re.fullmatch(r'x\d+', n) for n in names):
                            feature_columns = names
                    except Exception:
                        # FIX: narrowed from a bare `except:` so
                        # KeyboardInterrupt/SystemExit are not swallowed.
                        pass

        print(f" β Loaded {model_name}")
        return model, scaler, feature_columns, artifact
    except Exception as e:
        print(f" β Error loading {model_name}: {e}")
        return None, None, None, None
def predict_with_model(model, scaler, sample_features, model_name):
    """Run a single prediction, optionally scaling the features first.

    Returns:
        Tuple (prediction, probabilities, error). On success `error` is
        None; on any exception the first two are None and `error` holds
        the exception text.
    """
    try:
        inputs = sample_features.copy()
        # Only standalone scalers are applied here; pipelines scale internally.
        if scaler is not None:
            inputs = scaler.transform(inputs)
        label = model.predict(inputs)
        # Probabilities are optional — not every estimator exposes them.
        proba = model.predict_proba(inputs) if hasattr(model, 'predict_proba') else None
        return label, proba, None
    except Exception as exc:
        return None, None, str(exc)
def create_sample_from_training_data(training_data, feature_columns, scaler):
    """Build a one-row test sample from the first training example.

    Returns:
        Tuple (raw_df, scaled_df). When no scaler is supplied, both
        entries are the same unscaled DataFrame.
    """
    first_row = training_data['X_train'][0:1].copy()
    frame = pd.DataFrame(first_row, columns=feature_columns)
    if scaler is None:
        return frame, frame
    return frame, scaler.transform(frame)
def filter_features_for_model(sample_df, model_feature_columns):
    """Project the sample onto the columns the model was trained on.

    Preserves the model's column order; falls back to every available
    column when none of the expected features are present.
    """
    selected = [col for col in model_feature_columns if col in sample_df.columns]
    if not selected:
        print(f" β οΈ No matching features found, using all available")
        selected = sample_df.columns.tolist()
    return sample_df[selected]
def main():
    """Smoke-test every saved classification model on a single sample.

    Loads the data pipeline once, builds a one-row sample, then for each
    model: loads the pickle, aligns the sample's feature columns with what
    the model expects, predicts, and prints a summary table at the end.
    """
    print("=" * 60)
    print("Testing All Classification Models with One Sample")
    print("=" * 60)
    print()

    # Data is loaded WITH the 5 duplicate NASM columns for A4 compatibility;
    # the other models filter them out via their saved feature_columns.
    print("Loading data...")
    data = load_and_prepare_data()
    print()

    sample_features, sample_features_scaled = create_sample_from_training_data(
        data, data['feature_columns'], data['scaler']
    )
    print(f"Sample data shape: {sample_features.shape}")
    print(f"Number of features (including duplicates): {len(data['feature_columns'])}")
    print()

    # (name, pickle path) pairs imported from all_classification.py.
    models_to_test = [
        ('A4 Random Forest', a4_rf),
        ('A5 Ensemble', a5_ensemnble),
        ('A5b Adaboost', a5b_adaboost),
        ('A5b Bagging Trees', a5b_bagging_tree),
        ('A6 SVM', a6_svm),
    ]

    results = []
    for model_name, model_path in models_to_test:
        print(f"Testing {model_name}...")
        model, scaler, model_feature_columns, artifact = load_model(model_path, model_name)
        if model is None:
            print(f" Skipping {model_name} due to load error")
            results.append((model_name, 'LOAD_ERROR', None, None, None))
            print()
            continue

        # Determine which feature columns to feed this model.
        if model_feature_columns is not None:
            # Artifact declares its feature set: filter the sample to match.
            test_features = filter_features_for_model(sample_features, model_feature_columns)
            print(f" Model expects {len(model_feature_columns)} features, using {len(test_features.columns)} available")
        elif hasattr(model, 'steps'):
            # Pipeline with generic/unknown feature names (e.g. A6 SVM trained
            # without the 5 duplicate NASM columns). Drop those duplicates so
            # the feature count matches what the pipeline's scaler expects.
            first_step = model.steps[0][1]
            n_expected = getattr(first_step, 'n_features_in_', None)
            cols_without_dupes = [c for c in sample_features.columns if c not in DUPLICATE_NASM_COLS]
            if n_expected is not None and len(cols_without_dupes) == n_expected:
                test_features = sample_features[cols_without_dupes]
                print(f" Pipeline expects {n_expected} features β dropped duplicate NASM cols, using {len(test_features.columns)} features")
            else:
                # Fallback: just take the first n_expected columns.
                test_features = sample_features.iloc[:, :n_expected] if n_expected else sample_features
                print(f" Pipeline expects {n_expected} features, sliced sample to {len(test_features.columns)} features")
        else:
            test_features = sample_features
            print(f" Using all {len(sample_features.columns)} available features")

        # Pipelines already embed their scaler; applying the standalone scaler
        # again would double-scale, so only pass it for dict artifacts.
        if model_feature_columns is None and hasattr(model, 'steps'):
            scaler_to_use = None
        else:
            scaler_to_use = scaler

        prediction, prediction_proba, error = predict_with_model(
            model, scaler_to_use, test_features, model_name
        )
        if error:
            print(f" β Prediction error: {error}")
            results.append((model_name, 'PREDICTION_ERROR', None, None, error))
            print()
            continue

        print(f" β Prediction: {prediction[0]}")
        if prediction_proba is not None:
            print(f" β Prediction probabilities shape: {prediction_proba.shape}")
            # FIX: predict_proba columns follow model.classes_, which need not
            # match the hard-coded EXPECTED_CLASSES order; prefer the model's
            # own label order when it is available.
            if hasattr(model, 'classes_'):
                class_names = list(model.classes_)
            else:
                class_names = EXPECTED_CLASSES
            top_classes_idx = np.argsort(prediction_proba[0])[-3:][::-1]
            top_classes = [class_names[i] for i in top_classes_idx]
            top_probs = [prediction_proba[0][i] for i in top_classes_idx]
            print(f" β Top 3 classes: {list(zip(top_classes, [f'{p:.3f}' for p in top_probs]))}")
        print(f" β Model type: {type(model).__name__}")
        if hasattr(model, 'classes_'):
            print(f" β Model classes: {list(model.classes_)}")
        results.append((model_name, 'SUCCESS', prediction, prediction_proba, None))
        print()

    # Final per-model pass/fail summary.
    print("=" * 60)
    print("Summary")
    print("=" * 60)
    for model_name, status, prediction, proba, error in results:
        if status == 'SUCCESS':
            pred_str = prediction[0] if prediction is not None else 'N/A'
            print(f" {model_name}: β SUCCESS - Prediction: {pred_str}")
        else:
            print(f" {model_name}: β {status} - {error}")
    print()
    print("All models tested!")
# Run the smoke test only when executed as a script, not on import.
if __name__ == "__main__":
    main()