github_sync / A6 /test_classification_loading.py
Bachstelze
add time bench and viz
a639edc
#!/usr/bin/env python3
"""
Script to load and execute all classification models with one sample.
Tests models from A4, A5, A5b, and A6.
Data loading adapted from classification_baseline.py to use the same
data processing pipeline for consistent feature extraction.
NOTE: A4 Random Forest model was trained WITH the 5 duplicate NASM columns
(No_1_NASM_Deviation through No_5_NASM_Deviation), while other models (A5, A5b, A6)
were trained WITHOUT them. This script loads data WITH the duplicate columns
to support the A4 model, and filters them out for other models as needed.
"""
import os
import sys
import pickle
import warnings
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')
# Add project root to path
project_root = os.path.abspath(os.path.dirname(__file__))
sys.path.insert(0, project_root)
# Import model paths from all_classification.py
sys.path.insert(0, project_root)
from all_classification import (
a4_rf,
a5_ensemnble,
a5b_adaboost,
a5b_bagging_tree,
a6_svm
)
# Import custom classes from A5b classification_adaboost.py
# These are needed for unpickling the AdaBoost model
#sys.path.insert(0, os.path.join(project_root, '..', 'A5b'))
from adaboost_classes import (
AdaBoostEnsemble,
WeightedDecisionTree
)
# Data paths: the datasets live in 'Datasets_all' one directory above this script.
REPO_ROOT = os.path.abspath(os.path.join(project_root, '..'))
DATA_DIR = os.path.join(REPO_ROOT, 'Datasets_all')

# Weaklink categories (14 classes). The target label of a row is the category
# column holding the highest score.
WEAKLINK_CATEGORIES = [
    'ExcessiveForwardLean', 'ForwardHead', 'LeftArmFallForward',
    'LeftAsymmetricalWeightShift', 'LeftHeelRises', 'LeftKneeMovesInward',
    'LeftKneeMovesOutward', 'LeftShoulderElevation', 'RightArmFallForward',
    'RightAsymmetricalWeightShift', 'RightHeelRises', 'RightKneeMovesInward',
    'RightKneeMovesOutward', 'RightShoulderElevation',
]

# Duplicate NASM columns. classification_baseline.py removes them, but this
# script deliberately KEEPS them in the loaded data because the A4 Random
# Forest model was trained WITH these 5 columns. They are only dropped per
# model, for models that were trained without them.
DUPLICATE_NASM_COLS = [
    'No_1_NASM_Deviation',
    'No_2_NASM_Deviation',
    'No_3_NASM_Deviation',
    'No_4_NASM_Deviation',
    'No_5_NASM_Deviation',
]

# Columns to exclude when extracting features
EXCLUDE_COLS = ['ID', 'WeakestLink', 'EstimatedScore']

# Expected classification classes: exactly the 14 weaklink categories.
# Derived from WEAKLINK_CATEGORIES (as an independent copy) so the two lists
# cannot drift apart.
EXPECTED_CLASSES = list(WEAKLINK_CATEGORIES)
def load_and_prepare_data():
    """Build the train/test feature matrices used to smoke-test the models.

    Reads 'aimoscores.csv' (features) and 'scores_and_weaklink.csv' (scores)
    from DATA_DIR, labels every row with the WEAKLINK_CATEGORIES column that
    has the highest value, inner-joins features and labels on 'ID', and
    performs a stratified 80/20 split (random_state=42). A StandardScaler is
    fitted on the training part only.

    The 5 duplicate NASM columns are intentionally kept: the A4 Random Forest
    model was trained with them, and the other models select their own
    feature subset later.

    Returns:
        dict with keys 'feature_columns', 'scaler', 'X_train',
        'X_train_scaled', 'y_train', 'X_test', 'X_test_scaled', 'y_test'
        and 'merged_df'.
    """
    features_csv = os.path.join(DATA_DIR, 'aimoscores.csv')
    scores_csv = os.path.join(DATA_DIR, 'scores_and_weaklink.csv')
    movement_features_df = pd.read_csv(features_csv)
    weaklink_scores_df = pd.read_csv(scores_csv)
    print('Movement features shape:', movement_features_df.shape)
    print('Weak link scores shape:', weaklink_scores_df.shape)

    # The duplicate NASM columns stay in place for the A4 model; models that
    # were trained without them filter by their own saved feature list.
    print('NOTE: Keeping duplicate NASM columns for A4 Random Forest model compatibility')

    # Target label = name of the category column with the maximum score.
    category_scores = weaklink_scores_df[WEAKLINK_CATEGORIES]
    weaklink_scores_df['WeakestLink'] = category_scores.idxmax(axis=1)
    print('Weakest Link class distribution:')
    print(weaklink_scores_df['WeakestLink'].value_counts())

    # Attach the label to the feature rows.
    labels_by_id = weaklink_scores_df[['ID', 'WeakestLink']].copy()
    merged_df = pd.merge(movement_features_df, labels_by_id, on='ID', how='inner')
    print('Merged dataset shape:', merged_df.shape)

    # Every column that is not an identifier/target is a feature.
    feature_columns = []
    for column in merged_df.columns:
        if column not in EXCLUDE_COLS:
            feature_columns.append(column)

    X = merged_df[feature_columns].values
    y = merged_df['WeakestLink'].values
    print(f'Feature matrix shape : {X.shape}')
    print(f'Number of features : {len(feature_columns)}')
    print(f'Number of classes : {len(np.unique(y))}')

    # Same split parameters as the baseline script.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Scaling statistics come from the training rows only.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    return dict(
        feature_columns=feature_columns,
        scaler=scaler,
        X_train=X_train,
        X_train_scaled=X_train_scaled,
        y_train=y_train,
        X_test=X_test,
        X_test_scaled=X_test_scaled,
        y_test=y_test,
        merged_df=merged_df,
    )
def load_model(model_path, model_name):
    """Load a pickled model artifact and split it into its usable parts.

    Two artifact layouts are supported:

    * dict: the 'model', 'scaler' and 'feature_columns' entries are
      returned (a missing key yields None).
    * any other object: it is treated as the model itself. If it exposes a
      non-empty ``steps`` sequence of (name, object) pairs (pipeline-like),
      a scaler is looked up among the steps and feature names are read from
      the first step. Otherwise scaler and feature columns are None.

    Args:
        model_path: Path of the pickle file, relative to ``project_root``.
        model_name: Human-readable name, used only in status messages.

    Returns:
        Tuple ``(model, scaler, feature_columns, artifact)``. All four are
        None when the file does not exist or loading fails; failures are
        reported on stdout rather than raised.
    """
    full_path = os.path.join(project_root, model_path)
    if not os.path.exists(full_path):
        print(f" ⚠️ Model file not found: {full_path}")
        return None, None, None, None

    try:
        # NOTE(security): pickle.load can execute arbitrary code. Only load
        # model files that come from a trusted source.
        with open(full_path, 'rb') as f:
            artifact = pickle.load(f)

        if isinstance(artifact, dict):
            model = artifact.get('model')
            scaler = artifact.get('scaler')
            feature_columns = artifact.get('feature_columns')
        else:
            model = artifact
            # Defaults keep plain (non-pipeline) estimators loadable; without
            # them `scaler` would be unbound at the return below.
            scaler = None
            feature_columns = None
            steps = getattr(model, 'steps', None)
            if steps is not None and len(steps) > 0:
                scaler = _find_pipeline_scaler(steps)
                feature_columns = _pipeline_feature_names(steps[0][1])

        print(f" ✓ Loaded {model_name}")
        return model, scaler, feature_columns, artifact
    except Exception as e:
        # Best effort: a broken artifact must not abort the whole test run.
        print(f" ✗ Error loading {model_name}: {e}")
        return None, None, None, None


def _find_pipeline_scaler(steps):
    """Return the step that looks like a fitted scaler, or None.

    Preference goes to the first step that has ``transform`` and
    ``n_features_in_`` but no ``predict``. If no step qualifies, the first
    step is accepted when it has ``transform`` and ``n_features_in_``.
    """
    for _step_name, step_obj in steps:
        is_fitted_transformer = (
            hasattr(step_obj, 'transform') and hasattr(step_obj, 'n_features_in_')
        )
        if is_fitted_transformer and not hasattr(step_obj, 'predict'):
            return step_obj

    first_step = steps[0][1]
    if hasattr(first_step, 'transform') and hasattr(first_step, 'n_features_in_'):
        return first_step
    return None


def _pipeline_feature_names(first_step):
    """Return real column names known to *first_step* as a list, or None.

    None is returned when the step has no ``get_feature_names_out``, when
    the lookup fails, or when every name is a generic placeholder of the
    form x0, x1, ... (which carries no column information).
    """
    import re  # local import: this module does not import `re` at file level

    if not hasattr(first_step, 'get_feature_names_out'):
        return None
    try:
        names = first_step.get_feature_names_out()
        if all(re.fullmatch(r'x\d+', n) for n in names):
            return None
        return list(names)
    except Exception:
        # Feature names are optional; callers fall back to feature counts.
        return None
def predict_with_model(model, scaler, sample_features, model_name):
    """Run *model* on a copy of *sample_features*, optionally scaling first.

    Args:
        model: Object with ``predict`` and, optionally, ``predict_proba``.
        scaler: Object with ``transform``, or None to skip scaling.
        sample_features: Input rows; must provide ``copy()``.
        model_name: Accepted for interface symmetry; not used here.

    Returns:
        ``(prediction, prediction_proba, None)`` on success, where
        ``prediction_proba`` is None if the model has no ``predict_proba``.
        ``(None, None, message)`` if any step raises; the exception text is
        returned instead of being propagated.
    """
    try:
        inputs = sample_features.copy()
        if scaler is not None:
            inputs = scaler.transform(inputs)

        labels = model.predict(inputs)
        # Probabilities are optional: not every estimator exposes them.
        probabilities = (
            model.predict_proba(inputs) if hasattr(model, 'predict_proba') else None
        )
    except Exception as exc:
        return None, None, str(exc)
    return labels, probabilities, None
def create_sample_from_training_data(training_data, feature_columns, scaler):
    """Turn the first training row into a one-row test sample.

    Args:
        training_data: Mapping whose 'X_train' entry holds the training rows.
        feature_columns: Column names for the resulting DataFrame.
        scaler: Object with ``transform``, or None.

    Returns:
        ``(sample_df, scaled)``: the one-row DataFrame and the result of
        ``scaler.transform(sample_df)``. Without a scaler the same DataFrame
        is returned in both positions.
    """
    # Slice (not index) so the row keeps its 2-D shape; copy to leave the
    # training matrix untouched.
    first_row = training_data['X_train'][:1].copy()
    frame = pd.DataFrame(first_row, columns=feature_columns)

    if scaler is None:
        return frame, frame
    return frame, scaler.transform(frame)
def filter_features_for_model(sample_df, model_feature_columns):
    """Select the columns of *sample_df* that the model was trained on.

    Columns are returned in the order given by *model_feature_columns*;
    names missing from *sample_df* are skipped. If none of the requested
    names exist, a warning is printed and all columns are returned.
    """
    known_columns = sample_df.columns
    selected = []
    for name in model_feature_columns:
        if name in known_columns:
            selected.append(name)

    if not selected:
        print(f" ⚠️ No matching features found, using all available")
        selected = known_columns.tolist()
    return sample_df[selected]
def main():
    """Smoke-test every classification model on one training sample.

    For each configured model the script loads the pickle, selects the
    feature columns the model expects, predicts on a single row and prints
    the prediction (plus the three most probable classes when the model
    provides probabilities). A per-model summary is printed at the end.
    Load and prediction failures are reported and do not stop the run.
    """
    print("=" * 60)
    print("Testing All Classification Models with One Sample")
    print("=" * 60)
    print()

    # Data is loaded WITH the 5 duplicate NASM columns for A4 compatibility.
    print("Loading data...")
    data = load_and_prepare_data()
    print()

    # Only the unscaled sample is used: each model applies its own scaler.
    sample_features, _ = create_sample_from_training_data(
        data, data['feature_columns'], data['scaler']
    )
    print(f"Sample data shape: {sample_features.shape}")
    print(f"Number of features (including duplicates): {len(data['feature_columns'])}")
    print()

    models_to_test = [
        ('A4 Random Forest', a4_rf),
        ('A5 Ensemble', a5_ensemnble),
        ('A5b Adaboost', a5b_adaboost),
        ('A5b Bagging Trees', a5b_bagging_tree),
        ('A6 SVM', a6_svm),
    ]

    results = []
    for model_name, model_path in models_to_test:
        print(f"Testing {model_name}...")
        model, scaler, model_feature_columns, _artifact = load_model(model_path, model_name)
        if model is None:
            print(f" Skipping {model_name} due to load error")
            results.append(
                (model_name, 'LOAD_ERROR', None, None, 'model could not be loaded')
            )
            print()
            continue

        test_features = _select_test_features(model, model_feature_columns, sample_features)

        # A pipeline scales internally. Passing its own scaler step separately
        # would scale the sample twice, so it is dropped here.
        if _pipeline_handles_scaling(model, scaler, model_feature_columns):
            scaler_to_use = None
        else:
            scaler_to_use = scaler

        prediction, prediction_proba, error = predict_with_model(
            model, scaler_to_use, test_features, model_name
        )
        if error:
            print(f" ✗ Prediction error: {error}")
            results.append((model_name, 'PREDICTION_ERROR', None, None, error))
            print()
            continue

        print(f" ✓ Prediction: {prediction[0]}")
        if prediction_proba is not None:
            print(f" ✓ Prediction probabilities shape: {prediction_proba.shape}")
            print(f" ✓ Top 3 classes: {_top_classes(model, prediction_proba[0])}")
        print(f" ✓ Model type: {type(model).__name__}")
        if hasattr(model, 'classes_'):
            print(f" ✓ Model classes: {list(model.classes_)}")
        results.append((model_name, 'SUCCESS', prediction, prediction_proba, None))
        print()

    print("=" * 60)
    print("Summary")
    print("=" * 60)
    for model_name, status, prediction, proba, error in results:
        if status == 'SUCCESS':
            pred_str = prediction[0] if prediction is not None else 'N/A'
            print(f" {model_name}: ✓ SUCCESS - Prediction: {pred_str}")
        else:
            print(f" {model_name}: ✗ {status} - {error}")
    print()
    print("All models tested!")


def _select_test_features(model, model_feature_columns, sample_features):
    """Return the columns of *sample_features* to feed into *model*."""
    if model_feature_columns is not None:
        # The artifact knows its training columns: use exactly those.
        test_features = filter_features_for_model(sample_features, model_feature_columns)
        print(f" Model expects {len(model_feature_columns)} features, using {len(test_features.columns)} available")
        return test_features

    if hasattr(model, 'steps'):
        # Pipeline with generic/unknown feature names (e.g. A6 SVM trained
        # without the 5 duplicate NASM columns): match on the feature count
        # expected by the first step.
        steps = model.steps
        first_step = steps[0][1] if len(steps) > 0 else None
        n_expected = getattr(first_step, 'n_features_in_', None)
        cols_without_dupes = [
            c for c in sample_features.columns if c not in DUPLICATE_NASM_COLS
        ]
        if n_expected is not None and len(cols_without_dupes) == n_expected:
            test_features = sample_features[cols_without_dupes]
            print(f" Pipeline expects {n_expected} features — dropped duplicate NASM cols, using {len(test_features.columns)} features")
        else:
            # Fallback: just take the first n_expected columns.
            test_features = sample_features.iloc[:, :n_expected] if n_expected else sample_features
            print(f" Pipeline expects {n_expected} features, sliced sample to {len(test_features.columns)} features")
        return test_features

    print(f" Using all {len(sample_features.columns)} available features")
    return sample_features


def _pipeline_handles_scaling(model, scaler, model_feature_columns):
    """Return True when *scaler* must not be applied outside the model."""
    if not hasattr(model, 'steps'):
        return False
    if model_feature_columns is None:
        # Original rule: a pipeline without known feature names scales itself.
        return True
    # The scaler was taken from the pipeline itself, so predict() applies it.
    return scaler is not None and any(step is scaler for _name, step in model.steps)


def _top_classes(model, probabilities, k=3):
    """Return the *k* most probable (label, 'p.ppp') pairs, best first.

    Probability columns are labelled with ``model.classes_`` when its length
    matches, otherwise with EXPECTED_CLASSES when that matches, otherwise
    with a generic ``class_<index>`` name.
    """
    order = np.argsort(probabilities)[-k:][::-1]
    labels = getattr(model, 'classes_', None)
    if labels is None or len(labels) != len(probabilities):
        labels = EXPECTED_CLASSES if len(EXPECTED_CLASSES) == len(probabilities) else None
    return [
        (str(labels[i]) if labels is not None else f'class_{i}', f'{probabilities[i]:.3f}')
        for i in order
    ]


if __name__ == "__main__":
    main()