#!/usr/bin/env python3
"""
Standardized Timing Benchmarking Framework for Classification Models

This framework provides fair and consistent timing benchmarks for comparing
classification models (A4, A5, A5b, A6) with metrics for:
- Inference time (mean, std, min, max, percentiles)
- Memory usage
- Prediction accuracy
- Model size
- Feature extraction time

Usage:
    python benchmark_timing.py [--samples N] [--repeats M] [--output FILE]
                               [--compare] [--recommend] [--single-sample]

Author: Benchmark Framework v1.0
"""

import os
import re
import sys
import json
import pickle
import statistics
import time
import tracemalloc
import warnings
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

# Add project root to path
project_root = os.path.abspath(os.path.dirname(__file__))
sys.path.insert(0, project_root)

# Import model paths
from all_classification import (
    a4_rf,
    a5_ensemnble,
    a5b_adaboost,
    a5b_bagging_tree,
    a6_svm
)

# Import custom classes for unpickling
from adaboost_classes import (
    AdaBoostEnsemble,
    WeightedDecisionTree
)

# ============================================================================
# Configuration
# ============================================================================

REPO_ROOT = os.path.abspath(os.path.join(project_root, '..'))
DATA_DIR = os.path.join(REPO_ROOT, 'Datasets_all')
OUTPUT_DIR = os.path.join(project_root, 'benchmark_results')

# Weak link categories (14 classes)
WEAKLINK_CATEGORIES = [
    'ExcessiveForwardLean', 'ForwardHead', 'LeftArmFallForward',
    'LeftAsymmetricalWeightShift', 'LeftHeelRises', 'LeftKneeMovesInward',
    'LeftKneeMovesOutward', 'LeftShoulderElevation', 'RightArmFallForward',
    'RightAsymmetricalWeightShift', 'RightHeelRises', 'RightKneeMovesInward',
    'RightKneeMovesOutward', 'RightShoulderElevation'
]

# Duplicate NASM columns
DUPLICATE_NASM_COLS = [
    'No_1_NASM_Deviation',
    'No_2_NASM_Deviation',
    'No_3_NASM_Deviation',
    'No_4_NASM_Deviation',
    'No_5_NASM_Deviation',
]

EXCLUDE_COLS = ['ID', 'WeakestLink', 'EstimatedScore']
EXPECTED_CLASSES = WEAKLINK_CATEGORIES.copy()

# Benchmark parameters
DEFAULT_NUM_SAMPLES = 100
DEFAULT_NUM_REPEATS = 10
DEFAULT_OUTPUT_FILE = None

# ============================================================================
# Data Classes for Results
# ============================================================================

@dataclass
class ModelMetrics:
    """Metrics for a single model benchmark."""
    model_name: str
    model_path: str

    # Timing metrics (seconds)
    inference_time_mean: float = 0.0
    inference_time_std: float = 0.0
    inference_time_min: float = 0.0
    inference_time_max: float = 0.0
    inference_time_p50: float = 0.0
    inference_time_p95: float = 0.0
    inference_time_p99: float = 0.0

    # Memory metrics (bytes)
    memory_usage_mean: float = 0.0
    memory_usage_std: float = 0.0
    memory_usage_peak: float = 0.0

    # Prediction metrics
    accuracy: float = 0.0
    predictions_correct: int = 0
    predictions_total: int = 0

    # Model characteristics
    model_size_bytes: int = 0
    num_features: int = 0
    num_parameters: int = 0
    model_type: str = ""

    # Feature extraction time (seconds)
    feature_extraction_time_mean: float = 0.0

    # Raw timing samples
    timing_samples: List[float] = field(default_factory=list)
    memory_samples: List[float] = field(default_factory=list)

    # Status
    status: str = "SUCCESS"
    error_message: str = ""


@dataclass
class BenchmarkResults:
    """Complete benchmark results for all models."""
    timestamp: str
    num_samples: int
    num_repeats: int
    models: Dict[str, ModelMetrics] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dictionary for JSON serialization."""
        return {
            'timestamp': self.timestamp,
            'num_samples': self.num_samples,
            'num_repeats': self.num_repeats,
            'models': {
                name: {
                    **asdict(metrics),
                    'timing_samples': list(metrics.timing_samples),
                    'memory_samples': list(metrics.memory_samples)
                }
                for name, metrics in self.models.items()
            }
        }

    def to_json(self, filepath: Optional[str] = None) -> str:
        """Export to a JSON string, optionally writing it to a file."""
        data = self.to_dict()
        json_str = json.dumps(data, indent=2, default=str)
        if filepath:
            os.makedirs(os.path.dirname(filepath) or '.', exist_ok=True)
            with open(filepath, 'w') as f:
                f.write(json_str)
        return json_str
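# Example (hypothetical usage, not executed): round-tripping results through
# JSON for offline analysis. The file name is illustrative only.
#
#   results = run_benchmark(num_samples=50, num_repeats=5)
#   results.to_json('benchmark_results/example.json')
#   with open('benchmark_results/example.json') as f:
#       data = json.load(f)
#   print(data['models']['A4 Random Forest']['inference_time_mean'])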
# ============================================================================
# Data Loading Functions
# ============================================================================

def load_and_prepare_data() -> Dict[str, Any]:
    """Load and prepare data following the same pipeline as classification_baseline.py.

    Returns:
        Dictionary containing:
        - feature_columns: List of feature column names
        - scaler: Fitted StandardScaler
        - X_train, X_test: Feature matrices (unscaled and scaled variants)
        - y_train, y_test: Target arrays
        - merged_df: Merged dataframe
    """
    # Load datasets
    movement_features_df = pd.read_csv(os.path.join(DATA_DIR, 'aimoscores.csv'))
    weaklink_scores_df = pd.read_csv(os.path.join(DATA_DIR, 'scores_and_weaklink.csv'))

    print(f'  Movement features shape: {movement_features_df.shape}')
    print(f'  Weak link scores shape: {weaklink_scores_df.shape}')

    # Create WeakestLink target column: the category with the highest score
    weaklink_scores_df['WeakestLink'] = (
        weaklink_scores_df[WEAKLINK_CATEGORIES].idxmax(axis=1)
    )

    # Merge datasets
    target_df = weaklink_scores_df[['ID', 'WeakestLink']].copy()
    merged_df = movement_features_df.merge(target_df, on='ID', how='inner')
    print(f'  Merged dataset shape: {merged_df.shape}')

    # Extract feature columns - include ALL columns except EXCLUDE_COLS
    feature_columns = [c for c in merged_df.columns if c not in EXCLUDE_COLS]
    X = merged_df[feature_columns].values
    y = merged_df['WeakestLink'].values

    print(f'  Feature matrix shape: {X.shape}')
    print(f'  Number of features: {len(feature_columns)}')
    print(f'  Number of classes: {len(np.unique(y))}')

    # Create a stratified train/test split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Fit scaler on training data only
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    return {
        'feature_columns': feature_columns,
        'scaler': scaler,
        'X_train': X_train,
        'X_train_scaled': X_train_scaled,
        'y_train': y_train,
        'X_test': X_test,
        'X_test_scaled': X_test_scaled,
        'y_test': y_test,
        'merged_df': merged_df,
    }


def create_samples_from_test_data(
    data: Dict[str, Any],
    num_samples: int
) -> Tuple[np.ndarray, np.ndarray]:
    """Create samples from test data for benchmarking.

    Args:
        data: Dictionary from load_and_prepare_data()
        num_samples: Number of samples to select

    Returns:
        Tuple of (sample_features, true_labels)
    """
    # Use the held-out test data for benchmarking
    X_test = data['X_test']
    y_test = data['y_test']

    # Select the first num_samples from the test set
    n_samples = min(num_samples, len(X_test))
    sample_features = X_test[:n_samples]
    true_labels = y_test[:n_samples]

    return sample_features, true_labels


# ============================================================================
# Model Loading Functions
# ============================================================================

def load_model(model_path: str, model_name: str) -> Tuple[Any, Optional[Any], Optional[List[str]], Any]:
    """Load a model from a pickle file.

    Args:
        model_path: Path to the pickle file
        model_name: Name of the model for logging

    Returns:
        Tuple of (model, scaler, feature_columns, artifact)
    """
    full_path = os.path.join(project_root, model_path)

    if not os.path.exists(full_path):
        print(f"  ⚠️ Model file not found: {full_path}")
        return None, None, None, None

    try:
        with open(full_path, 'rb') as f:
            artifact = pickle.load(f)

        # Extract model and scaler based on artifact structure
        if isinstance(artifact, dict):
            model = artifact.get('model')
            scaler = artifact.get('scaler')
            feature_columns = artifact.get('feature_columns')
        else:
            # A6 SVM is a Pipeline object
            model = artifact
            scaler = None
            feature_columns = None

            # Extract the scaler from the pipeline if one exists: a fitted
            # transformer step that has no predict() method
            if hasattr(model, 'steps') and len(model.steps) >= 1:
                for step_name, step_obj in model.steps:
                    if hasattr(step_obj, 'transform'):
                        if hasattr(step_obj, 'n_features_in_') and not hasattr(step_obj, 'predict'):
                            scaler = step_obj
                            break

            # Recover feature columns from the first pipeline step, unless
            # they are the generic x0, x1, ... names sklearn generates for
            # transformers fitted on plain numpy arrays
            if hasattr(model, 'steps') and len(model.steps) > 0:
                first_step = model.steps[0][1]
                if hasattr(first_step, 'get_feature_names_out'):
                    try:
                        names = first_step.get_feature_names_out()
                        if not all(re.fullmatch(r'x\d+', n) for n in names):
                            feature_columns = list(names)
                    except Exception:
                        pass

        print(f"  ✓ Loaded {model_name}")
        return model, scaler, feature_columns, artifact

    except Exception as e:
        print(f"  ✗ Error loading {model_name}: {e}")
        return None, None, None, None


def get_model_info(model: Any) -> Dict[str, Any]:
    """Extract model information for benchmarking.

    Args:
        model: The trained model

    Returns:
        Dictionary with model characteristics
    """
    info = {
        'model_type': type(model).__name__,
        'num_parameters': 0,
        'num_features': 0
    }

    # Count parameters based on model type
    if hasattr(model, 'n_estimators'):
        info['num_parameters'] += getattr(model, 'n_estimators', 0)
    if hasattr(model, 'estimators_'):
        info['num_parameters'] += len(getattr(model, 'estimators_', []))
    if hasattr(model, 'n_features_in_'):
        info['num_features'] = model.n_features_in_
    if hasattr(model, 'classes_'):
        info['num_classes'] = len(model.classes_)

    # For ensemble models, fall back to the first fitted base estimator
    if hasattr(model, 'estimators_'):
        for est in getattr(model, 'estimators_', []):
            if hasattr(est, 'n_features_in_'):
                info['num_features'] = est.n_features_in_
                break

    return info
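# Example (hypothetical usage, not executed): loading one artifact and
# inspecting it. Assumes the A4 pickle stores a dict with 'model', 'scaler'
# and 'feature_columns' keys, as load_model() expects.
#
#   model, scaler, cols, artifact = load_model(a4_rf, 'A4 Random Forest')
#   if model is not None:
#       print(get_model_info(model))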
# ============================================================================
# Benchmarking Functions
# ============================================================================

def prepare_model_features(
    model: Any,
    scaler: Optional[Any],
    model_feature_columns: Optional[List[str]],
    feature_columns: List[str],
    sample_features: np.ndarray
) -> Tuple[np.ndarray, Optional[Any]]:
    """Select the feature columns a model expects and the scaler to apply.

    Args:
        model: The trained model
        scaler: Scaler loaded alongside the model (may be None)
        model_feature_columns: Expected feature columns for the model
        feature_columns: All available feature columns
        sample_features: Input feature matrix

    Returns:
        Tuple of (test_features, scaler_to_use)
    """
    if model_feature_columns is not None:
        available_features = [f for f in model_feature_columns if f in feature_columns]
        if len(available_features) > 0:
            # Convert column names to indices for the numpy array
            feature_indices = [feature_columns.index(f) for f in available_features]
            return sample_features[:, feature_indices], scaler
        return sample_features, scaler

    # model_feature_columns is None - likely the A6 SVM pipeline
    test_features = sample_features
    scaler_to_use = scaler
    if hasattr(model, 'steps'):
        # The pipeline scales internally, so skip the external scaler
        scaler_to_use = None
        if len(model.steps) > 0:
            first_step = model.steps[0][1]
            n_expected = getattr(first_step, 'n_features_in_', None)
            if n_expected is not None:
                # Drop duplicate NASM columns if that yields the expected width
                dup_indices = [i for i, c in enumerate(feature_columns)
                               if c in DUPLICATE_NASM_COLS]
                valid_indices = [i for i in range(len(feature_columns))
                                 if i not in dup_indices]
                if len(valid_indices) == n_expected:
                    test_features = sample_features[:, valid_indices]
                else:
                    # Fallback: slice to the expected number of features
                    test_features = sample_features[:, :n_expected]
    return test_features, scaler_to_use


def measure_inference_time(
    model: Any,
    scaler: Optional[Any],
    sample_features: np.ndarray,
    model_feature_columns: Optional[List[str]],
    feature_columns: List[str],
    num_repeats: int,
    single_sample_mode: bool = False
) -> Tuple[List[float], List[float], Optional[str]]:
    """Measure inference time for a model.

    Args:
        model: The trained model
        scaler: Scaler for feature preprocessing
        sample_features: Input features
        model_feature_columns: Expected feature columns for the model
        feature_columns: All available feature columns
        num_repeats: Number of repetitions for averaging
        single_sample_mode: If True, time each sample individually
            (single sample latency) instead of the whole batch

    Returns:
        Tuple of (timing_samples, memory_samples, error_message)
    """
    timing_samples = []
    memory_samples = []

    try:
        # Select the features and scaler the model expects
        test_features, scaler_to_use = prepare_model_features(
            model, scaler, model_feature_columns, feature_columns, sample_features
        )

        # Determine how many predictions to make
        if single_sample_mode:
            # Single sample mode: time each sample individually, num_repeats times
            num_predictions = num_repeats * len(test_features)
        else:
            # Batch mode: num_repeats predictions over all samples at once
            num_predictions = num_repeats

        for i in range(num_predictions):
            # Start memory tracking
            tracemalloc.start()
            start_time = time.perf_counter()

            # Make prediction
            if single_sample_mode:
                # Single sample prediction: one row at a time
                single_sample = test_features[i % len(test_features)].reshape(1, -1)
                if scaler_to_use is not None:
                    features = scaler_to_use.transform(single_sample)
                else:
                    features = single_sample
            else:
                # Batch prediction: all samples at once
                if scaler_to_use is not None:
                    features = scaler_to_use.transform(test_features)
                else:
                    features = test_features

            prediction = model.predict(features)

            end_time = time.perf_counter()
            current, peak = tracemalloc.get_traced_memory()
            tracemalloc.stop()

            # Record measurements
            timing_samples.append(end_time - start_time)
            memory_samples.append(peak)

        return timing_samples, memory_samples, None

    except Exception as e:
        return [], [], str(e)
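# Note on methodology: tracemalloc is started before the timed region, so its
# tracing overhead is included in every timing sample. Absolute latencies are
# therefore best read comparatively across models; a minimal sketch of a
# timing-only measurement (no memory tracking) would be:
#
#   start = time.perf_counter()
#   model.predict(features)
#   elapsed = time.perf_counter() - start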
def calculate_percentiles(values: List[float]) -> Dict[str, float]:
    """Calculate percentiles for a list of values.

    Args:
        values: List of numeric values

    Returns:
        Dictionary with percentile values
    """
    if not values:
        return {'p50': 0.0, 'p95': 0.0, 'p99': 0.0}

    sorted_values = sorted(values)
    n = len(sorted_values)

    return {
        'p50': sorted_values[int(n * 0.50)],
        'p95': sorted_values[int(n * 0.95)],
        'p99': sorted_values[int(n * 0.99)]
    }
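# Note: calculate_percentiles() uses a nearest-rank lookup on the sorted
# samples, e.g. with 100 samples p95 is the 96th smallest value (index 95).
# With NumPy already imported, an interpolating alternative would be:
#
#   p50, p95, p99 = np.percentile(timing_samples, [50, 95, 99])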
def benchmark_single_model(
    model_name: str,
    model_path: str,
    sample_features: np.ndarray,
    true_labels: np.ndarray,
    feature_columns: List[str],
    num_repeats: int,
    single_sample_mode: bool = False
) -> ModelMetrics:
    """Benchmark a single model.

    Args:
        model_name: Name of the model
        model_path: Path to the model file
        sample_features: Input features for benchmarking
        true_labels: Ground truth labels
        feature_columns: All available feature columns
        num_repeats: Number of repetitions
        single_sample_mode: If True, measure each sample individually
            (single sample latency)

    Returns:
        ModelMetrics object with benchmark results
    """
    metrics = ModelMetrics(model_name=model_name, model_path=model_path)

    print(f"\n  Benchmarking {model_name}...")

    # Load model
    model, scaler, model_feature_columns, artifact = load_model(model_path, model_name)

    if model is None:
        metrics.status = "LOAD_ERROR"
        metrics.error_message = "Failed to load model"
        return metrics

    # Get model info
    model_info = get_model_info(model)
    metrics.model_type = model_info.get('model_type', type(model).__name__)
    metrics.num_features = model_info.get('num_features', 0)

    # Get model size on disk
    try:
        metrics.model_size_bytes = os.path.getsize(os.path.join(project_root, model_path))
    except OSError:
        metrics.model_size_bytes = 0

    # Run inference benchmarks
    timing_samples, memory_samples, error = measure_inference_time(
        model, scaler, sample_features, model_feature_columns,
        feature_columns, num_repeats, single_sample_mode=single_sample_mode
    )

    if error:
        metrics.status = "INFERENCE_ERROR"
        metrics.error_message = error
        return metrics

    # Store raw samples
    metrics.timing_samples = timing_samples
    metrics.memory_samples = memory_samples

    # Calculate timing statistics
    if timing_samples:
        metrics.inference_time_mean = statistics.mean(timing_samples)
        metrics.inference_time_std = statistics.stdev(timing_samples) if len(timing_samples) > 1 else 0.0
        metrics.inference_time_min = min(timing_samples)
        metrics.inference_time_max = max(timing_samples)

        percentiles = calculate_percentiles(timing_samples)
        metrics.inference_time_p50 = percentiles['p50']
        metrics.inference_time_p95 = percentiles['p95']
        metrics.inference_time_p99 = percentiles['p99']

    # Calculate memory statistics
    if memory_samples:
        metrics.memory_usage_mean = statistics.mean(memory_samples)
        metrics.memory_usage_std = statistics.stdev(memory_samples) if len(memory_samples) > 1 else 0.0
        metrics.memory_usage_peak = max(memory_samples)

    # Test accuracy on the same samples
    try:
        test_features, scaler_to_use = prepare_model_features(
            model, scaler, model_feature_columns, feature_columns, sample_features
        )

        if scaler_to_use is not None:
            features = scaler_to_use.transform(test_features)
        else:
            features = test_features

        predictions = model.predict(features)

        # Calculate accuracy
        correct = np.sum(predictions == true_labels)
        metrics.predictions_correct = int(correct)
        metrics.predictions_total = len(true_labels)
        metrics.accuracy = correct / len(true_labels)

    except Exception as e:
        print(f"  ⚠️ Accuracy calculation failed: {e}")

    metrics.status = "SUCCESS"
    return metrics
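# Example (hypothetical usage, not executed): benchmarking a single model in
# isolation, reusing the shared data pipeline:
#
#   data = load_and_prepare_data()
#   X, y = create_samples_from_test_data(data, 50)
#   m = benchmark_single_model('A4 Random Forest', a4_rf, X, y,
#                              data['feature_columns'], num_repeats=5)
#   print(f"{m.inference_time_mean * 1000:.3f} ms")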
def run_benchmark(
    num_samples: int = DEFAULT_NUM_SAMPLES,
    num_repeats: int = DEFAULT_NUM_REPEATS,
    output_file: Optional[str] = None,
    single_sample_mode: bool = False
) -> BenchmarkResults:
    """Run the complete benchmark on all models.

    Args:
        num_samples: Number of samples to benchmark
        num_repeats: Number of repetitions (per sample in single sample
            mode, per batch otherwise)
        output_file: Optional output file path for results
        single_sample_mode: If True, measure each sample individually
            (single sample latency)

    Returns:
        BenchmarkResults object with all results
    """
    print("=" * 70)
    print("STANDARDIZED TIMING BENCHMARKING FRAMEWORK")
    print("=" * 70)
    print("\nConfiguration:")
    print(f"  Number of samples: {num_samples}")
    print(f"  Number of repeats: {num_repeats}")
    if single_sample_mode:
        print(f"  Total predictions per model: {num_samples * num_repeats}")
    else:
        print(f"  Batch predictions per model: {num_repeats} (batch size {num_samples})")
    print()

    # Load data
    print("Loading data...")
    data = load_and_prepare_data()
    print()

    # Create samples
    sample_features, true_labels = create_samples_from_test_data(data, num_samples)
    print(f"Created {len(sample_features)} test samples for benchmarking")
    print()

    # Define models to benchmark
    models_to_benchmark = [
        ('A4 Random Forest', a4_rf),
        ('A5 Ensemble', a5_ensemnble),
        ('A5b Adaboost', a5b_adaboost),
        ('A5b Bagging Trees', a5b_bagging_tree),
        ('A6 SVM', a6_svm),
    ]

    # Initialize results
    results = BenchmarkResults(
        timestamp=datetime.now().isoformat(),
        num_samples=num_samples,
        num_repeats=num_repeats
    )

    # Benchmark each model
    print("=" * 70)
    print("Running Benchmarks")
    print("=" * 70)

    for model_name, model_path in models_to_benchmark:
        metrics = benchmark_single_model(
            model_name=model_name,
            model_path=model_path,
            sample_features=sample_features,
            true_labels=true_labels,
            feature_columns=data['feature_columns'],
            num_repeats=num_repeats,
            single_sample_mode=single_sample_mode
        )
        results.models[model_name] = metrics

        # Print summary for this model
        print(f"\n  {model_name} Results:")
        print(f"    Status: {metrics.status}")
        if metrics.status == "SUCCESS":
            print(f"    Inference Time:")
            print(f"      Mean: {metrics.inference_time_mean*1000:.3f} ms")
            print(f"      Std:  {metrics.inference_time_std*1000:.3f} ms")
            print(f"      P50:  {metrics.inference_time_p50*1000:.3f} ms")
            print(f"      P95:  {metrics.inference_time_p95*1000:.3f} ms")
            print(f"      P99:  {metrics.inference_time_p99*1000:.3f} ms")
            print(f"    Memory Usage:")
            print(f"      Mean: {metrics.memory_usage_mean/1024:.1f} KB")
            print(f"      Peak: {metrics.memory_usage_peak/1024:.1f} KB")
            print(f"    Accuracy: {metrics.accuracy*100:.1f}% "
                  f"({metrics.predictions_correct}/{metrics.predictions_total})")
            print(f"    Model Size: {metrics.model_size_bytes/1024:.1f} KB")
            print(f"    Features: {metrics.num_features}")
        else:
            print(f"    Error: {metrics.error_message}")

    print()

    # Save results
    if output_file is None:
        output_file = os.path.join(
            OUTPUT_DIR,
            f"benchmark_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        )

    results.to_json(output_file)
    print(f"Results saved to: {output_file}")

    return results


def run_single_sample_benchmark(
    num_samples: int = DEFAULT_NUM_SAMPLES,
    num_repeats: int = DEFAULT_NUM_REPEATS,
    output_file: Optional[str] = None
) -> BenchmarkResults:
    """Run the benchmark measuring single sample prediction latency.

    This measures the latency of individual predictions rather than batch
    predictions, giving a more realistic view of per-request performance.

    Args:
        num_samples: Number of samples to benchmark
        num_repeats: Number of repetitions per sample
        output_file: Optional output file path for results

    Returns:
        BenchmarkResults object with all results
    """
    return run_benchmark(
        num_samples=num_samples,
        num_repeats=num_repeats,
        output_file=output_file,
        single_sample_mode=True
    )


# ============================================================================
# Comparison and Analysis Functions
# ============================================================================

def print_comparison_table(results: BenchmarkResults):
    """Print a formatted comparison table of all models."""
    print("\n" + "=" * 90)
    print("MODEL COMPARISON SUMMARY")
    print("=" * 90)

    # Header
    print(f"{'Model':<20} {'Time (ms)':<15} {'Std':<10} {'P95':<10} "
          f"{'Acc (%)':<10} {'Mem (KB)':<12} {'Size (KB)':<12}")
    print("-" * 90)

    # Sort by inference time, fastest first; failed models sink to the bottom
    sorted_models = sorted(
        results.models.items(),
        key=lambda x: x[1].inference_time_mean if x[1].status == "SUCCESS" else float('inf')
    )

    for model_name, metrics in sorted_models:
        if metrics.status == "SUCCESS":
            time_ms = metrics.inference_time_mean * 1000
            std_ms = metrics.inference_time_std * 1000
            p95_ms = metrics.inference_time_p95 * 1000
            acc = metrics.accuracy * 100
            mem_kb = metrics.memory_usage_mean / 1024
            size_kb = metrics.model_size_bytes / 1024
            print(f"{model_name:<20} {time_ms:<15.3f} {std_ms:<10.3f} {p95_ms:<10.3f} "
                  f"{acc:<10.1f} {mem_kb:<12.1f} {size_kb:<12.1f}")
        else:
            print(f"{model_name:<20} {'ERROR':<15} {'-':<10} {'-':<10} "
                  f"{'-':<10} {'-':<12} {'-':<12}")

    print("=" * 90)
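# Example (hypothetical usage, not executed): running the full benchmark
# programmatically and printing the comparison table without the CLI:
#
#   results = run_benchmark(num_samples=50, num_repeats=5)
#   print_comparison_table(results)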
def find_optimal_model(results: BenchmarkResults, priority: str = "speed"):
    """Find the optimal model based on the specified criterion.

    Args:
        results: BenchmarkResults object
        priority: Optimization priority ("speed", "accuracy", "memory", "balanced")

    Returns:
        Tuple of (best_model_name, best_metrics)
    """
    valid_models = {
        name: metrics for name, metrics in results.models.items()
        if metrics.status == "SUCCESS"
    }

    if not valid_models:
        return None, None

    if priority == "speed":
        # Minimum inference time
        best = min(valid_models.items(), key=lambda x: x[1].inference_time_mean)
    elif priority == "accuracy":
        # Maximum accuracy
        best = max(valid_models.items(), key=lambda x: x[1].accuracy)
    elif priority == "memory":
        # Minimum memory usage
        best = min(valid_models.items(), key=lambda x: x[1].memory_usage_mean)
    elif priority == "balanced":
        # Balanced score: weighted combination of latency, error rate and memory
        def balanced_score(item):
            metrics = item[1]
            time_score = metrics.inference_time_mean      # seconds
            acc_score = 1 - metrics.accuracy              # error rate
            mem_score = metrics.memory_usage_mean / 1e6   # scale bytes to MB
            # Weighted sum; lower is better (weights can be adjusted)
            return 0.5 * time_score + 0.3 * acc_score + 0.2 * mem_score
        best = min(valid_models.items(), key=balanced_score)
    else:
        best = min(valid_models.items(), key=lambda x: x[1].inference_time_mean)

    return best
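# Worked example for the "balanced" score (illustrative numbers): a model
# with 0.010 s mean latency, 0.90 accuracy and 2 MB mean memory scores
#
#   0.5 * 0.010 + 0.3 * 0.10 + 0.2 * 2.0 = 0.435
#
# The score mixes raw units (seconds, error rate, megabytes), so here the
# memory term dominates; min-max normalizing each metric across models first
# would make the 0.5/0.3/0.2 weights unit-free.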
def print_recommendations(results: BenchmarkResults):
    """Print model recommendations based on different criteria."""
    print("\n" + "=" * 70)
    print("MODEL RECOMMENDATIONS")
    print("=" * 70)

    criteria = [
        ("Fastest Inference", "speed"),
        ("Highest Accuracy", "accuracy"),
        ("Lowest Memory Usage", "memory"),
        ("Best Balanced Performance", "balanced"),
    ]

    for description, priority in criteria:
        model_name, metrics = find_optimal_model(results, priority)
        if model_name:
            print(f"\n{description}:")
            print(f"  Model: {model_name}")
            if priority == "speed":
                print(f"  Inference Time: {metrics.inference_time_mean*1000:.3f} ms")
            elif priority == "accuracy":
                print(f"  Accuracy: {metrics.accuracy*100:.1f}%")
            elif priority == "memory":
                print(f"  Memory Usage: {metrics.memory_usage_mean/1024:.1f} KB")
            elif priority == "balanced":
                print(f"  Inference Time: {metrics.inference_time_mean*1000:.3f} ms")
                print(f"  Accuracy: {metrics.accuracy*100:.1f}%")
                print(f"  Memory Usage: {metrics.memory_usage_mean/1024:.1f} KB")
        else:
            print(f"\n{description}:")
            print("  No valid models found")


# ============================================================================
# Main Entry Point
# ============================================================================

def main():
    """Main entry point for the benchmarking framework."""
    import argparse

    parser = argparse.ArgumentParser(
        description='Standardized Timing Benchmarking Framework for Classification Models'
    )
    parser.add_argument(
        '--samples', '-n', type=int, default=DEFAULT_NUM_SAMPLES,
        help=f'Number of samples to benchmark (default: {DEFAULT_NUM_SAMPLES})'
    )
    parser.add_argument(
        '--repeats', '-r', type=int, default=DEFAULT_NUM_REPEATS,
        help=f'Number of repeats per sample (default: {DEFAULT_NUM_REPEATS})'
    )
    parser.add_argument(
        '--output', '-o', type=str, default=DEFAULT_OUTPUT_FILE,
        help='Output file for results (default: benchmark_results/<timestamp>.json)'
    )
    parser.add_argument(
        '--compare', '-c', action='store_true',
        help='Print comparison table after benchmarking'
    )
    parser.add_argument(
        '--recommend', '-R', action='store_true',
        help='Print model recommendations after benchmarking'
    )
    parser.add_argument(
        '--single-sample', '-s', action='store_true',
        help='Measure single sample prediction latency (default: batch mode)'
    )

    args = parser.parse_args()

    # Run benchmark
    if args.single_sample:
        results = run_single_sample_benchmark(
            num_samples=args.samples,
            num_repeats=args.repeats,
            output_file=args.output
        )
    else:
        results = run_benchmark(
            num_samples=args.samples,
            num_repeats=args.repeats,
            output_file=args.output
        )

    # Print comparison table if requested
    if args.compare:
        print_comparison_table(results)

    # Print recommendations if requested
    if args.recommend:
        print_recommendations(results)

    # Return results for programmatic use
    return results


if __name__ == "__main__":
    results = main()