File size: 44,821 Bytes
b26156a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 |
#!/usr/bin/env python3
"""
Machine learning pose classification script.
Features:
1. Train classifiers on pose landmark inputs
2. Use selected landmark coordinates as features
3. Use folder names as class labels
4. Train and evaluate models
Usage:
python ml_pose_classifier.py [--data DATA_DIR] [--model MODEL_TYPE] [--test-size RATIO]
"""
import json
import argparse
import numpy as np
import time
from pathlib import Path
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
# from sklearn.pipeline import Pipeline # not used
from sklearn.neural_network import MLPRegressor
import joblib
import matplotlib.pyplot as plt
# seaborn is optional; it is only used to draw the confusion-matrix heatmap.
try:
    import seaborn as sns
    SEABORN_AVAILABLE = True
except ImportError:
    SEABORN_AVAILABLE = False

# skl2onnx is optional; it is only needed for ONNX export.
# The `onnx` package itself is imported lazily where it is needed.
try:
    from skl2onnx import convert_sklearn
    from skl2onnx.common.data_types import FloatTensorType
    ONNX_AVAILABLE = True
except ImportError:
    ONNX_AVAILABLE = False

# onnxruntime is not imported in this section, so there is nothing that could
# raise ImportError: the flag is a plain constant rather than the result of a
# try/except whose handler could never run.
ONNX_RUNTIME_AVAILABLE = False
class PoseClassifier:
def __init__(self, model_type='random_forest'):
    """
    Set up an untrained pose classifier.

    Args:
        model_type: model type ('random_forest', 'svm', 'gradient_boost',
            'logistic', 'distilled_rf'). The value is only stored here; it
            is interpreted when a model is built.
    """
    self.model_type = model_type
    self.model = None
    # Only filled in by the distillation flow: the student (MLP) model.
    self.student_model = None
    self.scaler = StandardScaler()
    self.label_encoder = LabelEncoder()

    # Joints used as input features. The original author notes these follow
    # the MediaPipe keypoint naming and that 'nose' acts as the reference
    # point (expected to be 0,0,0) -- TODO confirm against the data producer.
    self.target_joints = [
        'nose',
        'left_shoulder',
        'right_shoulder',
        'left_elbow',
        'right_elbow',
        'left_wrist',
        'right_wrist',
        'left_hip',
        'right_hip',
        'left_knee',
        'right_knee',
        'left_ankle',
        'right_ankle',
    ]

    # Three feature columns (x, y, z) per joint, in joint order.
    self.feature_columns = [
        f'{joint}_{axis}'
        for joint in self.target_joints
        for axis in ('x', 'y', 'z')
    ]

    print(f"Target joints: {len(self.target_joints)}")
    print(f"Feature dimension: {len(self.feature_columns)}")
    print("Joint list:", ', '.join(self.target_joints))
def _get_model(self):
    """Create a classifier based on the selected model type.

    Returns:
        An unfitted estimator configured for ``self.model_type``.

    Raises:
        ValueError: if ``self.model_type`` is not a supported name.
    """
    kind = self.model_type

    # 'distilled_rf' needs a random-forest teacher, so it is built with
    # exactly the same settings as the plain 'random_forest' model.
    if kind in ('random_forest', 'distilled_rf'):
        return RandomForestClassifier(
            n_estimators=100,
            max_depth=15,
            min_samples_split=5,
            min_samples_leaf=2,
            random_state=42,
            n_jobs=-1
        )

    if kind == 'svm':
        return SVC(
            C=1.0,
            kernel='rbf',
            gamma='scale',
            random_state=42
        )

    if kind == 'gradient_boost':
        return GradientBoostingClassifier(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=6,
            random_state=42
        )

    if kind == 'logistic':
        return LogisticRegression(
            # NOTE(review): in scikit-learn C is documented as the inverse
            # regularization strength, so 10.0 presumably loosens (not
            # increases) regularization -- confirm the intent.
            C=10.0,
            max_iter=2000,  # generous iteration budget for convergence
            solver='lbfgs',
            # NOTE(review): newer scikit-learn releases may deprecate this
            # argument -- check against the pinned version.
            multi_class='multinomial',
            random_state=42,
            n_jobs=-1
        )

    raise ValueError(f"Unsupported model type: {self.model_type}")
def load_data(self, data_dir):
    """
    Load pose samples from a directory tree of JSON files.

    Every sub-directory of ``data_dir`` whose name starts with ``label_`` is
    one class (the folder name is the label) and every ``*.json`` file in it
    is one sample. For each target joint the ``x``/``y``/``z`` values are
    read from the file's ``landmarks`` mapping; a missing joint or axis is
    filled with 0.0.

    Args:
        data_dir: Data directory containing label folders

    Returns:
        tuple: (feature data, labels). The features are a float array of
        shape (n_samples, len(self.feature_columns)); the labels are an
        array of folder names. Files that cannot be read, parsed or
        converted to numbers are reported on stdout and skipped.
    """
    data_path = Path(data_dir)
    all_features = []
    all_labels = []
    print(f"Loading data from: {data_path}")

    # iterdir()/glob() yield entries in filesystem order. Sorting makes the
    # sample order stable, which the seeded train/test split needs in order
    # to be reproducible across machines.
    label_dirs = sorted(
        entry for entry in data_path.iterdir()
        if entry.is_dir() and entry.name.startswith('label_')
    )

    for label_dir in label_dirs:
        label = label_dir.name
        json_files = sorted(label_dir.glob('*.json'))
        print(f"Processing {label}: {len(json_files)} files")

        for json_file in json_files:
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                landmarks = data.get('landmarks', {})

                features = []
                missing_joints = []
                for joint in self.target_joints:
                    if joint in landmarks:
                        joint_data = landmarks[joint]
                        # float() rejects null / non-numeric values here, so
                        # a bad file is skipped instead of silently turning
                        # the whole feature matrix into an object array.
                        features.extend(
                            float(joint_data.get(axis, 0.0))
                            for axis in ('x', 'y', 'z')
                        )
                    else:
                        # If a joint is missing, fill with zeros
                        features.extend([0.0, 0.0, 0.0])
                        missing_joints.append(joint)

                if len(features) == len(self.feature_columns):
                    all_features.append(features)
                    all_labels.append(label)
                else:
                    print(f"Skipping file {json_file}: feature dimension mismatch")

                if missing_joints:
                    print(f"File {json_file.name} missing joints: {missing_joints}")
            except (OSError, ValueError, TypeError, AttributeError) as e:
                # Best effort: one unreadable or malformed file must not
                # abort the whole load.
                print(f"Error reading file {json_file}: {e}")
                continue

    print(f"Loaded {len(all_features)} samples")

    # count samples per label
    label_counts = {}
    for label in all_labels:
        label_counts[label] = label_counts.get(label, 0) + 1
    print("Label distribution:")
    for label, count in sorted(label_counts.items()):
        print(f" {label}: {count} samples")

    if all_features:
        feature_array = np.array(all_features, dtype=float)
    else:
        # Keep the result two-dimensional even when nothing was loaded.
        feature_array = np.empty((0, len(self.feature_columns)), dtype=float)
    return feature_array, np.array(all_labels)
def train(self, X, y, test_size=0.2):
    """
    Train the classifier and evaluate it on a held-out split.

    The labels are encoded, the data is split with a fixed seed and
    stratification, and the features are standardized with a scaler fitted
    on the training part only. For ``model_type == 'distilled_rf'`` a
    random-forest teacher is trained first and an MLPRegressor student is
    fitted to the teacher's class probabilities; the reported train/test
    accuracies then belong to the student, while cross-validation is always
    run on ``self.model`` (the teacher in the distilled case).

    Args:
        X: feature data
        y: labels
        test_size: ratio for test split

    Returns:
        dict: a dictionary containing training results (accuracies, CV
        scores, confusion matrix, class names, and the scaled test split
        with its true and predicted labels)
    """
    print(f"\nStarting training for model: {self.model_type}...")
    print(f"Data shape: {X.shape}")
    print(f"Number of labels: {len(np.unique(y))}")

    # Encode labels
    y_encoded = self.label_encoder.fit_transform(y)

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_encoded, test_size=test_size, random_state=42, stratify=y_encoded
    )
    print(f"Train set size: {X_train.shape[0]}")
    print(f"Test set size: {X_test.shape[0]}")

    # standardize features
    X_train_scaled = self.scaler.fit_transform(X_train)
    X_test_scaled = self.scaler.transform(X_test)

    if self.model_type == 'distilled_rf':
        print("Using distillation: train RandomForest teacher, then fit an MLPRegressor student to teacher soft labels")
        teacher = self._get_model()
        teacher.fit(X_train_scaled, y_train)

        # The teacher's class probabilities are the student's soft targets.
        y_train_proba = teacher.predict_proba(X_train_scaled)

        student = MLPRegressor(hidden_layer_sizes=(128, 64, 32),
                               activation='relu',
                               solver='adam',
                               max_iter=1000,
                               learning_rate_init=0.001,
                               random_state=42,
                               early_stopping=True,
                               validation_fraction=0.1)
        print("Training student model to fit teacher probability outputs...")
        print(f"Teacher probability output shape: {y_train_proba.shape}")
        # Multi-output regression, target is probability vector
        student.fit(X_train_scaled, y_train_proba)

        self.model = teacher
        self.student_model = student

        # Evaluate the student exactly the way predict() uses it: argmax
        # over its raw outputs. A softmax in between would not change the
        # argmax, so none is applied.
        y_train_pred_proba = student.predict(X_train_scaled)
        y_test_pred_proba = student.predict(X_test_scaled)
        y_train_pred = np.argmax(y_train_pred_proba, axis=1)
        y_test_pred = np.argmax(y_test_pred_proba, axis=1)
        print(f"Student predicted probability shape: {y_test_pred_proba.shape}")
        print(f"Student training accuracy: {accuracy_score(y_train, y_train_pred):.4f}")
    else:
        # Standard flow: train a single model
        self.model = self._get_model()
        # Drop any student left over from an earlier distilled run;
        # predict() prefers the student whenever one is present.
        self.student_model = None
        self.model.fit(X_train_scaled, y_train)
        y_train_pred = self.model.predict(X_train_scaled)
        y_test_pred = self.model.predict(X_test_scaled)

    # compute accuracies
    train_accuracy = accuracy_score(y_train, y_train_pred)
    test_accuracy = accuracy_score(y_test, y_test_pred)

    # Cross-validate self.model (the teacher in the distilled flow). It is
    # always set at this point, so no "no model" fallback is needed.
    cv_scores = cross_val_score(self.model, X_train_scaled, y_train, cv=5)

    print("\nTraining results:")
    print(f"Train accuracy: {train_accuracy:.4f}")
    print(f"Test accuracy: {test_accuracy:.4f}")
    print(f"5-fold CV accuracy: {cv_scores.mean():.4f} ± {cv_scores.std():.4f}")

    # classification report
    print("\nTest set classification report:")
    target_names = self.label_encoder.classes_
    print(classification_report(y_test, y_test_pred, target_names=target_names))

    # confusion matrix
    cm = confusion_matrix(y_test, y_test_pred)

    return {
        'train_accuracy': train_accuracy,
        'test_accuracy': test_accuracy,
        'cv_scores': cv_scores,
        'confusion_matrix': cm,
        'target_names': target_names,
        'X_test': X_test_scaled,
        'y_test': y_test,
        'y_test_pred': y_test_pred
    }
def save_model(self, filepath):
    """Save the trained model and its preprocessing state to disk.

    The bundle holds the model, the distillation student (``None`` when no
    student was trained), the scaler, the label encoder, the model type and
    the joint/feature layout.

    Args:
        filepath: destination path handed to ``joblib.dump``
    """
    model_data = {
        'model': self.model,
        # Persist the student too; otherwise a reloaded 'distilled_rf'
        # classifier would fall back to the teacher.
        'student_model': getattr(self, 'student_model', None),
        'scaler': self.scaler,
        'label_encoder': self.label_encoder,
        'model_type': self.model_type,
        'target_joints': self.target_joints,
        'feature_columns': self.feature_columns
    }
    joblib.dump(model_data, filepath)
    print(f"Model saved to: {filepath}")
def load_model(self, filepath):
    """Load a trained model bundle from disk.

    Args:
        filepath: path of a bundle in the layout this class saves (a dict
            with 'model', 'scaler', 'label_encoder', 'model_type',
            'target_joints', 'feature_columns' and, optionally,
            'student_model')
    """
    # NOTE: joblib.load unpickles arbitrary objects -- only load files that
    # come from a trusted source.
    model_data = joblib.load(filepath)
    self.model = model_data['model']
    # Bundles without a student entry load with no student. Assigning
    # unconditionally also clears a student left over from earlier training
    # on this instance, which predict() would otherwise keep preferring.
    self.student_model = model_data.get('student_model')
    self.scaler = model_data['scaler']
    self.label_encoder = model_data['label_encoder']
    self.model_type = model_data['model_type']
    self.target_joints = model_data['target_joints']
    self.feature_columns = model_data['feature_columns']
    print(f"Model loaded from: {filepath}")
def predict(self, X):
    """Run prediction on input features.

    Args:
        X: feature rows; they are passed through ``self.scaler`` first

    Returns:
        tuple: (decoded labels, scores). With a student model the scores
        are the student's raw outputs, returned unchanged. Otherwise they
        are the model's ``predict_proba`` output, or ``None`` when the
        model has no such method.

    Raises:
        ValueError: if neither a model nor a student model is available.
    """
    student = self.student_model
    base_model = self.model
    if base_model is None and student is None:
        raise ValueError("Model not trained or loaded")

    scaled = self.scaler.transform(X)

    # A student, when present, takes precedence over the base model.
    if student is not None:
        scores = student.predict(scaled)
        winners = np.argmax(scores, axis=1)
        return self.label_encoder.inverse_transform(winners), scores

    class_ids = base_model.predict(scaled)
    if hasattr(base_model, 'predict_proba'):
        class_scores = base_model.predict_proba(scaled)
    else:
        class_scores = None
    return self.label_encoder.inverse_transform(class_ids), class_scores
def predict_single_json(self, json_path):
    """
    Predict pose class for a single JSON file.

    Args:
        json_path: path to the JSON file

    Returns:
        dict: prediction details ('predicted_label', 'confidence_scores',
        'max_confidence' when scores are available, joint coverage info),
        or, when anything goes wrong while reading or predicting, a dict
        with an 'error' message and 'predicted_label' set to None.

    Raises:
        ValueError: if neither a model nor a student model is available.
    """
    # Same availability rule as predict(): a student model alone is enough.
    if self.model is None and self.student_model is None:
        raise ValueError("Model not trained or loaded")

    try:
        # Read JSON file
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        landmarks = data.get('landmarks', {})

        # Extract coordinates of target joints
        features = []
        missing_joints = []
        available_joints = []
        for joint in self.target_joints:
            if joint in landmarks:
                joint_data = landmarks[joint]
                features.extend(
                    joint_data.get(axis, 0.0) for axis in ('x', 'y', 'z')
                )
                available_joints.append(joint)
            else:
                # If a joint is missing, fill with zeros
                features.extend([0.0, 0.0, 0.0])
                missing_joints.append(joint)

        if len(features) != len(self.feature_columns):
            raise ValueError(f"Feature dimension mismatch: expected {len(self.feature_columns)}, got {len(features)}")

        # Force a float matrix so null / non-numeric coordinates fail right
        # here (and are reported below) instead of reaching the model as an
        # object array.
        X = np.array([features], dtype=float)
        predictions, probabilities = self.predict(X)

        # build result dict
        result = {
            'file_path': str(json_path),
            'file_name': Path(json_path).name,
            'predicted_label': predictions[0],
            'confidence_scores': {},
            'available_joints': available_joints,
            'missing_joints': missing_joints,
            'joint_coverage': f"{len(available_joints)}/{len(self.target_joints)}"
        }

        # add per-class confidence scores
        if probabilities is not None:
            sample_scores = probabilities[0]
            for i, label in enumerate(self.label_encoder.classes_):
                result['confidence_scores'][label] = float(sample_scores[i])
            # highest confidence
            result['max_confidence'] = float(sample_scores[np.argmax(sample_scores)])

        return result
    except Exception as e:
        # Deliberate catch-all: callers get a per-file error record instead
        # of an exception.
        return {
            'file_path': str(json_path),
            'file_name': Path(json_path).name,
            'error': str(e),
            'predicted_label': None
        }
def evaluate_test_directory(self, test_dir):
    """
    Evaluate all data in a test directory.

    Every sub-directory named ``label_*`` is one true class and every
    ``*.json`` file inside it is classified with ``predict_single_json``.
    Files whose prediction reports an error are counted as errors and left
    out of the accuracy, the confusion matrix and the detailed results.

    Args:
        test_dir: path to the test data directory

    Returns:
        dict: dictionary containing detailed evaluation results: per-label
        statistics, overall accuracy and counts, a nested-dict confusion
        matrix (rows are true labels; columns are the true labels plus any
        other label the model predicted), per-file results and timing
        statistics.

    Raises:
        ValueError: if no model is available or ``test_dir`` does not exist.
    """
    if self.model is None and self.student_model is None:
        raise ValueError("Model not trained or loaded")

    test_path = Path(test_dir)
    if not test_path.exists():
        raise ValueError(f"Test directory does not exist: {test_dir}")

    # Wall-clock stamps are kept for reporting; durations are measured with
    # the monotonic high-resolution counter so that fast predictions do not
    # read as 0 on platforms with a coarse time.time().
    start_time = time.time()
    run_clock_start = time.perf_counter()
    print(f"Starting evaluation on test dataset: {test_path}")
    print(f"Start time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}")

    all_results = []
    label_stats = {}
    total_prediction_time = 0.0
    prediction_count = 0
    default_coverage = f"0/{len(self.target_joints)}"

    # Sorted traversal keeps the report order stable between runs.
    label_dirs = sorted(
        entry for entry in test_path.iterdir()
        if entry.is_dir() and entry.name.startswith('label_')
    )

    for label_dir in label_dirs:
        true_label = label_dir.name
        json_files = sorted(label_dir.glob('*.json'))
        print(f"Evaluating {true_label}: {len(json_files)} files")

        stats = {
            'total': len(json_files),
            'correct': 0,
            'incorrect': 0,
            'errors': 0,
            'predictions': {},
            'confidence_scores': [],
            'prediction_times': []
        }
        label_stats[true_label] = stats

        for json_file in json_files:
            # Single prediction timing
            pred_clock_start = time.perf_counter()
            result = self.predict_single_json(json_file)
            single_prediction_time = time.perf_counter() - pred_clock_start
            total_prediction_time += single_prediction_time
            prediction_count += 1

            if 'error' in result:
                stats['errors'] += 1
                print(f" Error: {json_file.name} - {result['error']}")
                continue

            predicted_label = result['predicted_label']
            is_correct = predicted_label == true_label
            if is_correct:
                stats['correct'] += 1
            else:
                stats['incorrect'] += 1

            # Count prediction distribution
            stats['predictions'][predicted_label] = (
                stats['predictions'].get(predicted_label, 0) + 1
            )

            # Record confidence and prediction time
            if 'max_confidence' in result:
                stats['confidence_scores'].append(result['max_confidence'])
            stats['prediction_times'].append(single_prediction_time)

            # Save detailed result
            all_results.append({
                'file_path': str(json_file),
                'file_name': json_file.name,
                'true_label': true_label,
                'predicted_label': predicted_label,
                'is_correct': is_correct,
                'confidence': result.get('max_confidence', 0.0),
                'confidence_scores': result.get('confidence_scores', {}),
                'joint_coverage': result.get('joint_coverage', default_coverage),
                'prediction_time': single_prediction_time
            })

    end_time = time.time()
    total_execution_time = time.perf_counter() - run_clock_start

    # compute aggregate statistics
    total_samples = sum(s['total'] for s in label_stats.values())
    total_correct = sum(s['correct'] for s in label_stats.values())
    total_errors = sum(s['errors'] for s in label_stats.values())
    total_tested = total_samples - total_errors
    overall_accuracy = total_correct / total_tested if total_tested > 0 else 0.0
    avg_prediction_time = total_prediction_time / prediction_count if prediction_count > 0 else 0.0

    # Build the confusion matrix. The model may predict a class that has no
    # folder in this test directory, so the columns are the true labels plus
    # every label that was actually predicted; indexing only by the folder
    # names would raise KeyError for such predictions.
    columns = dict.fromkeys(label_stats)
    columns.update(dict.fromkeys(item['predicted_label'] for item in all_results))
    matrix = {true_label: dict.fromkeys(columns, 0) for true_label in label_stats}
    for item in all_results:
        matrix[item['true_label']][item['predicted_label']] += 1

    return {
        'label_stats': label_stats,
        'overall_accuracy': overall_accuracy,
        'total_samples': total_samples,
        'total_correct': total_correct,
        'total_errors': total_errors,
        'total_tested': total_tested,
        'confusion_matrix': matrix,
        'detailed_results': all_results,
        'timing_stats': {
            'total_execution_time': total_execution_time,
            'total_prediction_time': total_prediction_time,
            'avg_prediction_time': avg_prediction_time,
            'prediction_count': prediction_count,
            'start_time': start_time,
            'end_time': end_time,
            'overhead_time': total_execution_time - total_prediction_time
        }
    }
def print_evaluation_report(self, eval_results):
    """
    Print a detailed evaluation report.

    Sections: overall statistics, timing statistics (when present),
    per-label statistics, confusion matrix, per-label prediction
    distribution, the highest-confidence mistakes, and the prediction-time
    distribution.

    Args:
        eval_results: dictionary returned by evaluate_test_directory
    """
    timing_stats = eval_results.get('timing_stats', {})
    detailed_results = eval_results['detailed_results']

    print("\n" + "=" * 80)
    print("Test dataset evaluation report")
    print("=" * 80)

    # Overall statistics
    print(f"Total samples: {eval_results['total_samples']}")
    print(f"Successfully tested: {eval_results['total_tested']}")
    print(f"Errors: {eval_results['total_errors']}")
    print(
        f"Overall accuracy: {eval_results['overall_accuracy']:.4f} "
        f"({eval_results['total_correct']}/{eval_results['total_tested']})"
    )

    # Timing statistics
    if timing_stats:
        total_time = timing_stats['total_execution_time']
        prediction_time = timing_stats['total_prediction_time']
        avg_time = timing_stats['avg_prediction_time']
        overhead_time = timing_stats['overhead_time']
        prediction_count = timing_stats['prediction_count']

        # An empty test directory (or a coarse clock) can give a total time
        # of exactly 0; report zero rates instead of dividing by zero.
        if total_time > 0:
            throughput = prediction_count / total_time
            efficiency = (prediction_time / total_time) * 100
        else:
            throughput = 0.0
            efficiency = 0.0

        print("\nTiming statistics:")
        print("-" * 50)
        print(f"Total execution time: {total_time:.4f} s")
        print(f"Total prediction time: {prediction_time:.4f} s")
        print(f"Overhead time: {overhead_time:.4f} s")
        print(f"Average prediction time: {avg_time * 1000:.2f} ms")
        print(f"Prediction throughput: {throughput:.2f} preds/s")
        print(
            f"Prediction efficiency: {efficiency:.1f}% "
            f"(prediction time / total)"
        )

    # Per-label detailed statistics
    print("\nPer-label stats:")
    print("-" * 80)
    print(
        f"{'Label':<10} {'Total':<6} {'Correct':<6} {'Wrong':<6} "
        f"{'Accuracy':<8} {'AvgConf':<10} {'AvgPredTime':<12}"
    )
    print("-" * 80)
    for label, stats in sorted(eval_results['label_stats'].items()):
        tested = stats['total'] - stats['errors']
        accuracy = stats['correct'] / tested if tested > 0 else 0.0
        avg_confidence = (
            np.mean(stats['confidence_scores']) if stats['confidence_scores'] else 0.0
        )
        times = stats.get('prediction_times')
        avg_pred_time = np.mean(times) if times else 0.0
        print(
            f"{label:<10} {stats['total']:<6} {stats['correct']:<6} {stats['incorrect']:<6} "
            f"{accuracy:.4f} {avg_confidence:.4f} {avg_pred_time * 1000:.2f}ms"
        )

    # Confusion matrix
    print("\nConfusion matrix:")
    print("-" * 60)
    labels = sorted(eval_results['label_stats'].keys())

    # Header row. The corner text lives in a variable because a backslash
    # inside an f-string expression is a syntax error before Python 3.12.
    corner = 'True\\Pred'
    print(f"{corner:<12}", end="")
    for label in labels:
        print(f"{label:<10}", end="")
    print()

    # Data rows
    for true_label in labels:
        row = eval_results['confusion_matrix'].get(true_label, {})
        print(f"{true_label:<12}", end="")
        for pred_label in labels:
            count = row.get(pred_label, 0)
            print(f"{count:<10}", end="")
        print()

    # Per-label prediction distribution
    print("\nPer-label prediction distribution:")
    print("-" * 80)
    for true_label, stats in sorted(eval_results['label_stats'].items()):
        if stats['predictions']:
            print(f"{true_label}:")
            total_predictions = sum(stats['predictions'].values())
            for pred_label, count in sorted(stats['predictions'].items()):
                percentage = (count / total_predictions) * 100
                print(f" -> {pred_label}: {count} ({percentage:.1f}%)")

    # Error analysis
    print("\nError analysis:")
    print("-" * 40)
    incorrect_results = sorted(
        (r for r in detailed_results if not r['is_correct']),
        key=lambda r: r['confidence'],
        reverse=True,
    )
    if incorrect_results:
        # The most confident mistakes come first.
        print("Highest-confidence incorrect predictions (top 10):")
        for i, result in enumerate(incorrect_results[:10]):
            pred_time = result.get('prediction_time', 0) * 1000  # ms
            print(
                f"{i + 1:2d}. {result['file_name']}: {result['true_label']} -> {result['predicted_label']} "
                f"(conf: {result['confidence']:.4f}, time: {pred_time:.2f}ms)"
            )
    else:
        print("No incorrect predictions found.")

    # Performance analysis
    if timing_stats and detailed_results:
        print("\nPerformance analysis:")
        print("-" * 40)
        prediction_times = [
            r['prediction_time'] for r in detailed_results if 'prediction_time' in r
        ]
        if prediction_times:
            min_time = min(prediction_times) * 1000
            max_time = max(prediction_times) * 1000
            median_time = np.median(prediction_times) * 1000
            std_time = np.std(prediction_times) * 1000
            print("Prediction time distribution:")
            print(f" Fastest: {min_time:.2f}ms")
            print(f" Slowest: {max_time:.2f}ms")
            print(f" Median: {median_time:.2f}ms")
            print(f" Stddev: {std_time:.2f}ms")

    print("\n" + "=" * 80)
def plot_confusion_matrix(self, cm, target_names, save_path=None):
    """Draw the confusion matrix, optionally save it, then show it.

    Args:
        cm: confusion matrix; it is indexed as ``cm[i, j]`` and its
            ``shape``/``size``/``max()`` are used, so an integer numpy
            array is presumably expected -- TODO confirm with callers
        target_names: class names used as tick labels on both axes
        save_path: if given, the figure is also written to this path
    """
    plt.figure(figsize=(10, 8))

    if not SEABORN_AVAILABLE:
        # Fallback using matplotlib only
        image = plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
        plt.colorbar(image)
        positions = np.arange(len(target_names))
        plt.xticks(positions, target_names, rotation=45, ha='right')
        plt.yticks(positions, target_names)

        # Annotate cells; cells above half of the maximum get white text.
        thresh = cm.max() / 2.0 if cm.size else 0
        for row, col in np.ndindex(cm.shape[0], cm.shape[1]):
            value = cm[row, col]
            text_color = "white" if value > thresh else "black"
            plt.text(col, row, format(value, 'd'),
                     ha="center", va="center",
                     color=text_color)
    else:
        sns.heatmap(
            cm,
            annot=True,
            fmt='d',
            cmap='Blues',
            xticklabels=target_names,
            yticklabels=target_names,
        )

    plt.title(f"{self.model_type.title()} model confusion matrix")
    plt.xlabel('Predicted')
    plt.ylabel('True')

    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        print(f"Confusion matrix saved to: {save_path}")

    plt.show()
def export_to_onnx(self, model_type='random_forest', output_path=None):
    """
    Export the trained model to ONNX format (only models supported by Barracuda).

    Note: Barracuda does not support LinearClassifier layers (e.g.,
    LogisticRegression/SVM) — only tree models are supported.

    When a student model exists it is exported instead of ``self.model``.
    A companion ``<stem>_labels.json`` file holding the class names, joint
    list and scaler parameters is written next to the ONNX file.

    Args:
        model_type: model type the caller expects to export; if it differs
            from the trained ``self.model_type`` a warning is printed and
            the trained model is exported anyway
        output_path: destination of the ONNX file; defaults to
            ``pose_classifier_<export name>.onnx``

    Returns:
        The ONNX output path on success, or None when export is unavailable,
        unsupported for this model type, or fails.
    """
    if not ONNX_AVAILABLE:
        print("Error: ONNX export is unavailable. Please install skl2onnx and onnx packages:")
        print("pip install skl2onnx onnx")
        return None

    if getattr(self, 'model', None) is None:
        print("Error: Model is not trained yet. Please train the model first.")
        return None

    # The trained model type wins over the requested one.
    if hasattr(self, 'model_type') and self.model_type != model_type:
        print(f"Warning: Currently trained {self.model_type} model, but requested to export {model_type} model")
        print(f"Will export currently trained {self.model_type} model")
        model_name = self.model_type
    else:
        model_name = model_type

    # Barracuda only supports tree models, not LinearClassifier
    if model_name in ['logistic', 'svm']:
        print(f"❌ Barracuda/Unity does not support ONNX import for {model_name} models (LinearClassifier layer).")
        print("Please use random_forest or gradient_boost for export.")
        return None

    # If student_model exists -> export student_model (MLP), otherwise export self.model
    student = getattr(self, 'student_model', None)
    if student is not None:
        model_to_export = student
        export_name = 'distilled_mlp'
        print("Detected student_model. Exporting student (MLP) to ONNX (suitable for Unity/Barracuda).")
    else:
        model_to_export = self.model
        export_name = model_name

    # Generate output file path
    if output_path is None:
        output_path = f"pose_classifier_{export_name}.onnx"
    print(f"About to export model to: {output_path}, export target: {export_name}")

    try:
        feature_count = len(self.target_joints) * 3
        initial_type = [('float_input', FloatTensorType([None, feature_count]))]
        onnx_model = convert_sklearn(
            model_to_export,
            initial_types=initial_type,
            target_opset=12
        )
        with open(output_path, "wb") as f:
            f.write(onnx_model.SerializeToString())
        print(f"✅ Successfully exported {export_name} model to ONNX format: {output_path}")

        # Derive the companion file name from the stem rather than with
        # str.replace('.onnx', ...): for a path without a lowercase '.onnx'
        # the replace is a no-op and the JSON would overwrite the ONNX file
        # that was just written. This also accepts pathlib.Path inputs.
        onnx_file = Path(output_path)
        label_mapping_path = onnx_file.with_name(f"{onnx_file.stem}_labels.json")

        # Save label mapping and Scaler parameters
        label_mapping = {
            'label_encoder_classes': self.label_encoder.classes_.tolist(),
            'model_type': export_name,
            'feature_count': feature_count,
            'target_joints': self.target_joints,
            'description': f'Pose classifier - {len(self.target_joints)} landmarks with x,y,z coordinates',
            'scaler_mean': self.scaler.mean_.tolist(),
            'scaler_scale': self.scaler.scale_.tolist()
        }
        with open(label_mapping_path, 'w', encoding='utf-8') as f:
            json.dump(label_mapping, f, ensure_ascii=False, indent=2)
        print(f"✅ Label mapping and scaler parameters saved to: {label_mapping_path}")
        print("⚠️ Note: The exported ONNX expects inputs to be standardized with scaler_mean/scaler_scale.")
        return output_path
    except Exception as e:
        # Deliberate catch-all: export is best effort and reports failure
        # through the None return value.
        print(f"❌ ONNX export failed: {str(e)}")
        import traceback
        traceback.print_exc()
        return None
def export_to_tflite(self, output_path=None):
    """
    Export student_model (MLP) to TFLite format.

    The conversion chain is scikit-learn -> ONNX -> TensorFlow SavedModel ->
    TFLite. The intermediate artifacts live in a temporary directory that is
    removed even when a conversion step fails.

    Dependencies: skl2onnx, onnx, onnx-tf, tensorflow

    Args:
        output_path: destination of the ``.tflite`` file; defaults to
            ``pose_classifier_distilled_mlp.tflite``

    Returns:
        The output path on success, or None when there is no student model
        or a required package is missing. Errors raised by the conversion
        steps themselves propagate to the caller.
    """
    if self.student_model is None:
        print("❌ Only exporting student_model (MLPRegressor) to TFLite is supported. Please train with --model distilled_rf first.")
        return None

    try:
        import onnx
        from skl2onnx import convert_sklearn
        from skl2onnx.common.data_types import FloatTensorType
        from onnx_tf.backend import prepare
        import tensorflow as tf
    except ImportError:
        print("❌ You need to install skl2onnx, onnx, onnx-tf, tensorflow.")
        print("pip install skl2onnx onnx onnx-tf tensorflow")
        return None

    import os
    import tempfile

    feature_count = len(self.target_joints) * 3
    initial_type = [('float_input', FloatTensorType([None, feature_count]))]

    # Work in a private temporary directory: fixed file names in the current
    # directory could clobber user files and were left behind whenever a
    # later conversion step raised.
    with tempfile.TemporaryDirectory(prefix="pose_tflite_") as work_dir:
        # 1. Export to ONNX
        print("Exporting student_model to ONNX...")
        onnx_model = convert_sklearn(
            self.student_model,
            initial_types=initial_type,
            target_opset=12
        )
        onnx_path = os.path.join(work_dir, "temp_student.onnx")
        with open(onnx_path, "wb") as f:
            f.write(onnx_model.SerializeToString())
        print(f"✅ ONNX export successful: {onnx_path}")

        # 2. ONNX -> TensorFlow SavedModel
        print("Converting ONNX to TensorFlow SavedModel...")
        tf_model = prepare(onnx.load(onnx_path))
        tf_saved_path = os.path.join(work_dir, "temp_student_tf")
        tf_model.export_graph(tf_saved_path)
        print(f"✅ SavedModel export successful: {tf_saved_path}")

        # 3. SavedModel -> TFLite
        print("Converting SavedModel to TFLite...")
        converter = tf.lite.TFLiteConverter.from_saved_model(tf_saved_path)
        tflite_model = converter.convert()

    if output_path is None:
        output_path = "pose_classifier_distilled_mlp.tflite"
    with open(output_path, "wb") as f:
        f.write(tflite_model)
    print(f"✅ TFLite export successful: {output_path}")
    return output_path
def _resolve_model_path(args):
    """Return the model file to load for evaluate/predict mode, or None (after printing an error)."""
    if args.load_model:
        return args.load_model
    # Fall back to the file name that training mode writes by default.
    default_model = f"pose_classifier_{args.model}.pkl"
    if Path(default_model).exists():
        return default_model
    print(
        f"Error: Need to specify model file path (--load-model) or ensure default model file exists: {default_model}"
    )
    return None


def main():
    """
    Command-line entry point of the pose classification tool.

    Parses the command line and runs exactly one of four modes, checked in
    this order:

    1. ``--test-onnx``: placeholder, only reports that it is not implemented.
    2. ``--evaluate DIR``: load a model and print an evaluation report.
    3. ``--predict FILE``: load a model and print the prediction for one JSON.
    4. Otherwise: train a model (or load one with ``--load-model`` unless
       ``--train`` is given), then optionally export it to ONNX / TFLite.

    Errors are reported on stdout and the function returns ``None`` in every
    case; the data directory is only required when training actually happens.
    """
    parser = argparse.ArgumentParser(description="Pose classification machine learning script")
    parser.add_argument("--data", "-d", default="PoseData", help="Pose data directory (default: PoseData)")
    parser.add_argument(
        "--model",
        "-m",
        choices=['random_forest', 'svm', 'gradient_boost', 'logistic', 'distilled_rf'],
        default='random_forest',
        help="Model type (default: random_forest)",
    )
    parser.add_argument("--test-size", "-t", type=float, default=0.2, help="Test set ratio (default: 0.2)")
    parser.add_argument("--save-model", "-s", help="Path to save the trained model")
    parser.add_argument("--load-model", "-l", help="Path to load an already trained model")
    parser.add_argument("--predict", "-p", help="Path of a single JSON file to predict")
    parser.add_argument("--evaluate", "-e", help="Path of a test directory to evaluate all JSON files")
    parser.add_argument("--no-plot", action="store_true", help="Do not display confusion matrix plot")
    parser.add_argument("--train", action="store_true", help="Force training even if --load-model is provided")
    parser.add_argument("--export-onnx", help="Export model to ONNX format; specify output file path")
    parser.add_argument(
        "--export-model-type",
        choices=['random_forest', 'logistic', 'distilled_rf'],
        default='random_forest',
        help="Model type to export (default: random_forest)",
    )
    parser.add_argument("--test-onnx", help="Test an ONNX model; specify ONNX file path")
    parser.add_argument("--onnx-labels", help="ONNX label mapping JSON path (auto-detect if not provided)")
    parser.add_argument("--onnx-test-data", help="ONNX batch test data directory (if not provided, single-sample test)")
    parser.add_argument(
        "--export-tflite",
        help="Export model to TFLite format; specify output path (supported for distilled_rf student model only)",
    )
    args = parser.parse_args()

    print("Pose classification ML tool")
    print("=" * 60)

    # If ONNX test mode
    if args.test_onnx:
        print("ONNX model test mode")
        print(f"ONNX model: {args.test_onnx}")
        print("=" * 60)
        # Create classifier instance for testing
        classifier = PoseClassifier()
        # Note: test_onnx_model is not implemented in this script; this is a placeholder.
        # You can implement it later if needed.
        print("ONNX test requested but functionality is not implemented in this script.")
        return

    # If evaluation mode
    if args.evaluate:
        model_path = _resolve_model_path(args)
        if model_path is None:
            return
        print("Evaluation mode")
        print(f"Test data directory: {args.evaluate}")
        print(f"Model file: {model_path}")
        print("=" * 60)
        # Create classifier and load model
        classifier = PoseClassifier(model_type=args.model)
        classifier.load_model(model_path)
        # Perform comprehensive evaluation; failures are reported, not raised.
        try:
            eval_results = classifier.evaluate_test_directory(args.evaluate)
            classifier.print_evaluation_report(eval_results)
        except Exception as e:
            print(f"Error during evaluation: {e}")
        return

    # Prediction-only mode
    if args.predict:
        model_path = _resolve_model_path(args)
        if model_path is None:
            return
        print("Prediction mode")
        print(f"JSON file: {args.predict}")
        print(f"Model file: {model_path}")
        print("=" * 60)
        # Create classifier and load model
        classifier = PoseClassifier(model_type=args.model)
        classifier.load_model(model_path)
        # Run prediction
        result = classifier.predict_single_json(args.predict)
        # Show prediction result
        print("\nPrediction result:")
        print(f"File: {result['file_name']}")
        if 'error' in result:
            print(f"Error: {result['error']}")
        else:
            print(f"Predicted label: {result['predicted_label']}")
            print(f"Joint coverage: {result['joint_coverage']}")
            if result['confidence_scores']:
                print(f"Max confidence: {result['max_confidence']:.4f}")
                print("\nPer-class confidence:")
                sorted_scores = sorted(result['confidence_scores'].items(), key=lambda x: x[1], reverse=True)
                for label, score in sorted_scores:
                    print(f" {label}: {score:.4f}")
            if result['missing_joints']:
                print(f"\nMissing joints: {', '.join(result['missing_joints'])}")
        return

    # Training mode
    print("Training mode")
    print(f"Data directory: {args.data}")
    print(f"Model type: {args.model}")
    print(f"Test size: {args.test_size}")
    print("=" * 60)

    # A previously trained model is reused unless --train forces retraining.
    reuse_saved_model = bool(args.load_model) and not args.train

    # The dataset is only needed when training; a load-then-export run must
    # not fail just because the data directory is absent.
    if not reuse_saved_model and not Path(args.data).exists():
        print(f"Error: data directory does not exist: {args.data}")
        return

    # Create classifier
    classifier = PoseClassifier(model_type=args.model)

    if reuse_saved_model:
        print(f"Loading existing model: {args.load_model}")
        classifier.load_model(args.load_model)
        print("Model loaded, skipping training step")
    else:
        # Load data
        X, y = classifier.load_data(args.data)
        if len(X) == 0:
            print("Error: no valid data found")
            return
        # Train model
        results = classifier.train(X, y, test_size=args.test_size)
        # Plot confusion matrix (if not disabled); plotting problems must not
        # abort the run before the model is saved.
        if not args.no_plot:
            try:
                classifier.plot_confusion_matrix(
                    results['confusion_matrix'], results['target_names'], save_path=f"confusion_matrix_{args.model}.png"
                )
            except Exception as e:
                print(f"Error while plotting confusion matrix: {e}")
        # Save model (if specified)
        if args.save_model:
            classifier.save_model(args.save_model)
        else:
            # Default save path
            default_path = f"pose_classifier_{args.model}.pkl"
            classifier.save_model(default_path)
        print("\nTraining complete!")
        print(f"Final test accuracy: {results['test_accuracy']:.4f}")

    # Export ONNX if requested
    if args.export_onnx:
        print(f"\nExporting {args.export_model_type} model to ONNX format...")
        onnx_path = classifier.export_to_onnx(model_type=args.export_model_type, output_path=args.export_onnx)
        if onnx_path:
            print(f"✅ ONNX model exported: {onnx_path}")

    # Export TFLite if requested
    if args.export_tflite:
        print("\nExporting student_model to TFLite format...")
        tflite_path = classifier.export_to_tflite(output_path=args.export_tflite)
        if tflite_path:
            print(f"✅ TFLite model exported: {tflite_path}")
# Script entry guard: start the command-line interface only when this file is
# executed directly, never as a side effect of importing it.
if __name__ == "__main__":
    main()
|