#!/usr/bin/env python3
"""
GPU-accelerated implementation of multivariate Gaussian overlap calculation using CuPy.

This provides massive speedup for large-scale analyses by processing all task
pairs simultaneously.
"""

import time
import warnings
from typing import Optional

import numpy as np

# Try to import CuPy for GPU acceleration
try:
    import cupy as cp
    GPU_AVAILABLE = True
    print("✅ CuPy GPU acceleration available")
except ImportError:
    GPU_AVAILABLE = False
    cp = None
    print("⚠️ CuPy not available. Install with: pip install cupy-cuda12x")

# Check for CUDA availability
if GPU_AVAILABLE:
    try:
        # Query the runtime so a missing driver/device is detected here rather
        # than at the first kernel launch.
        if cp.cuda.runtime.getDeviceCount() < 1:
            raise RuntimeError("no CUDA device found")
        device = cp.cuda.Device(0)
        GPU_READY = True
        print(f"🚀 GPU ready: Device {device.id} (RTX detected)")
    except Exception:
        GPU_READY = False
        GPU_AVAILABLE = False
        print("⚠️ CUDA not available, disabling GPU acceleration")
else:
    GPU_READY = False


# Number of gait-cycle phases assumed by the memory estimators when the caller
# does not say otherwise (the original code hard-coded this value everywhere).
_DEFAULT_N_PHASES = 150
_BYTES_PER_GB = 1024 ** 3

# Biomechanical filter constants (applied to feature 0, treated as torque).
_NEGLIGIBLE_THRESHOLD = 0.1
_AMPABLE_THRESHOLD = 0.2
_CI_FACTOR = 1.96
_RAMP_START = 0.2
_RAMP_END = 0.5

# Fractions of the failed chunk size tried by the progressive reduction.
_REDUCTION_FACTORS = (0.5, 0.25, 0.125, 0.0625)


def _is_gpu_oom(exc):
    """Return True when *exc* looks like a GPU out-of-memory error."""
    return "OutOfMemoryError" in str(type(exc)) or "out of memory" in str(exc).lower()


def _free_gpu_bytes():
    """Return bytes obtainable for new arrays: free device memory plus cached pool blocks."""
    free_device_bytes, _total_bytes = cp.cuda.Device().mem_info
    return free_device_bytes + cp.get_default_memory_pool().free_bytes()


def _pairwise_overlap_gpu(means1_gpu, vars1_gpu, means2_gpu, vars2_gpu, tol, subject_valid=None):
    """
    Compute exp(-0.5 * sum_f (m1 - m2)^2 / (v1 + v2)) for every phase pair on the GPU.

    Inputs have shape (..., n_phases, n_features); the result has shape
    (..., n_phases, n_phases). ``subject_valid`` (shape ``...``) optionally
    zeroes out whole leading-index entries.
    """
    # Broadcast phase_i against phase_j:
    # (..., n_phases, 1, n_features) vs (..., 1, n_phases, n_features)
    diff = means1_gpu[..., :, cp.newaxis, :] - means2_gpu[..., cp.newaxis, :, :]
    var_sum = vars1_gpu[..., :, cp.newaxis, :] + vars2_gpu[..., cp.newaxis, :, :]

    # A phase pair is valid only if no feature is NaN and every summed
    # variance exceeds the tolerance.
    valid = (~cp.isnan(diff).any(axis=-1)
             & ~cp.isnan(var_sum).any(axis=-1)
             & (var_sum > tol).all(axis=-1))
    if subject_valid is not None:
        valid = subject_valid[..., cp.newaxis, cp.newaxis] & valid

    # cp.where evaluates both branches; it only masks the invalid quotients.
    quad_terms = cp.where(valid[..., cp.newaxis], diff * diff / var_sum, 0.0)
    del diff, var_sum  # release the largest intermediates before reducing
    quad_sum = cp.sum(quad_terms, axis=-1)
    del quad_terms

    # Underflow protection: exponents below -20 are reported as exactly 0.
    safe_exp_mask = valid & (quad_sum * 0.5 <= 20.0)
    return cp.where(safe_exp_mask, cp.exp(-0.5 * quad_sum), 0.0)


def _norm_cdf_gpu(x):
    """Standard normal CDF via the Abramowitz and Stegun polynomial approximation."""
    t = 1.0 / (1.0 + 0.2316419 * cp.abs(x))
    d = 0.3989423 * cp.exp(-x * x / 2.0)
    prob = d * t * (0.3193815 + t * (-0.3565638 + t * (1.781478
                    + t * (-1.821256 + t * 1.330274))))
    return cp.where(x > 0, 1.0 - prob, prob)


def _biomechanical_filter_core(overlap_batch, means1_batch, vars1_batch, means2_batch,
                               vars2_batch, tol, subject_valid=None,
                               use_sign_probability=True):
    """
    Shared three-level biomechanical filter for the batch and mega-batch paths.

    Works on feature 0 only. Phase pairs are classified as
    negligible-negligible (overlap forced to 1.0), amplitude conflict (left
    unchanged), or "other" (overlap pulled toward 1.0 by a penalty).

    NOTE(review): phases whose torque mean/variance is NaN are neither
    negligible nor ampable, so they land in the "other" class and their
    overlap becomes NaN. This mirrors the original behavior; confirm it is
    intended.
    """
    means1_torque = means1_batch[..., 0]  # shape (..., n_phases)
    means2_torque = means2_batch[..., 0]
    std1 = cp.sqrt(vars1_batch[..., 0])
    std2 = cp.sqrt(vars2_batch[..., 0])

    # Negligible: the whole 95% interval lies inside +/- the negligible threshold.
    negligible1 = ((means1_torque - _CI_FACTOR * std1 >= -_NEGLIGIBLE_THRESHOLD)
                   & (means1_torque + _CI_FACTOR * std1 <= _NEGLIGIBLE_THRESHOLD))
    negligible2 = ((means2_torque - _CI_FACTOR * std2 >= -_NEGLIGIBLE_THRESHOLD)
                   & (means2_torque + _CI_FACTOR * std2 <= _NEGLIGIBLE_THRESHOLD))
    ampable1 = cp.abs(means1_torque) > _AMPABLE_THRESHOLD
    ampable2 = cp.abs(means2_torque) > _AMPABLE_THRESHOLD

    # Broadcast to phase-pair dimensions: (..., n_phases, 1) vs (..., 1, n_phases)
    neg1 = negligible1[..., :, cp.newaxis]
    neg2 = negligible2[..., cp.newaxis, :]
    amp1 = ampable1[..., :, cp.newaxis]
    amp2 = ampable2[..., cp.newaxis, :]

    both_negligible = neg1 & neg2
    amplitude_conflict = (neg1 & amp2) | (neg2 & amp1)
    needs_penalty = ~(both_negligible | amplitude_conflict)
    if subject_valid is not None:
        valid = subject_valid[..., cp.newaxis, cp.newaxis]
        both_negligible = both_negligible & valid
        amplitude_conflict = amplitude_conflict & valid
        needs_penalty = ~(both_negligible | amplitude_conflict) & valid

    # Negligible-negligible rule: treat the pair as fully overlapping.
    overlap_batch = cp.where(both_negligible, 1.0, overlap_batch)

    idx = cp.where(needs_penalty)
    if idx[0].size == 0:
        return overlap_batch

    # idx = (*leading, phase_i, phase_j); side 1 is indexed by phase_i and
    # side 2 by phase_j.
    idx_side1 = idx[:-1]
    idx_side2 = idx[:-2] + (idx[-1],)

    # Mean-difference penalty: linear ramp from 0 at _RAMP_START to 1 at _RAMP_END.
    mean_diff = cp.abs(means1_torque[idx_side1] - means2_torque[idx_side2])
    penalty = cp.clip((mean_diff - _RAMP_START) / (_RAMP_END - _RAMP_START), 0.0, 1.0)

    if use_sign_probability:
        # Probability that the two torques have opposite signs.
        p_pos1 = _norm_cdf_gpu(means1_torque / cp.maximum(std1, tol))[idx_side1]
        p_pos2 = _norm_cdf_gpu(means2_torque / cp.maximum(std2, tol))[idx_side2]
        sign_mismatch = p_pos1 * (1.0 - p_pos2) + (1.0 - p_pos1) * p_pos2
        penalty = cp.maximum(sign_mismatch, penalty)

    # Shrink the distance from 1.0 by the penalty factor.
    overlap_batch[idx] = 1.0 - (1.0 - overlap_batch[idx]) * penalty
    return overlap_batch


def compute_overlap_batch_gpu(means1_batch, vars1_batch, means2_batch, vars2_batch,
                              tol=1e-12, biomechanical_filter=False):
    """
    GPU-accelerated batch overlap computation using CuPy.

    Processes all subjects and all phase pairs in one broadcasted GPU pass.

    Parameters:
        means1_batch: np.ndarray shape (n_subjects, n_phases, n_features)
        vars1_batch: np.ndarray shape (n_subjects, n_phases, n_features)
        means2_batch: np.ndarray shape (n_subjects, n_phases, n_features)
        vars2_batch: np.ndarray shape (n_subjects, n_phases, n_features)
        tol: float, summed variances must exceed this to be valid
        biomechanical_filter: bool, apply biomechanical filtering

    Returns:
        np.ndarray (float64) shape (n_subjects, n_phases, n_phases) - overlap
        values clipped to [0, 1]; invalid phase pairs are 0.

    Raises:
        RuntimeError: if CuPy/CUDA is not available.
        ValueError: if ``means1_batch`` is not 3-dimensional.
    """
    if not GPU_AVAILABLE:
        raise RuntimeError("CuPy not available for GPU computation")
    if np.ndim(means1_batch) != 3:
        raise ValueError("means1_batch must have shape (n_subjects, n_phases, n_features)")

    # Transfer to GPU - one transfer per input array, computed in float32
    means1_gpu = cp.asarray(means1_batch, dtype=cp.float32)
    vars1_gpu = cp.asarray(vars1_batch, dtype=cp.float32)
    means2_gpu = cp.asarray(means2_batch, dtype=cp.float32)
    vars2_gpu = cp.asarray(vars2_batch, dtype=cp.float32)

    overlap_batch_gpu = _pairwise_overlap_gpu(means1_gpu, vars1_gpu, means2_gpu, vars2_gpu, tol)

    if biomechanical_filter:
        overlap_batch_gpu = _apply_biomechanical_filter_gpu(
            overlap_batch_gpu, means1_gpu, vars1_gpu, means2_gpu, vars2_gpu, tol
        )

    # Transfer back to CPU - single transfer
    result = cp.asnumpy(overlap_batch_gpu).astype(np.float64)
    np.clip(result, 0.0, 1.0, out=result)
    return result


def _apply_biomechanical_filter_gpu(overlap_batch, means1_batch, vars1_batch,
                                    means2_batch, vars2_batch, tol):
    """
    Apply biomechanical filtering on GPU for (n_subjects, n_phases, n_phases) overlaps.

    Uses the larger of the sign-mismatch probability and the mean-difference
    ramp as the penalty.
    """
    return _biomechanical_filter_core(
        overlap_batch, means1_batch, vars1_batch, means2_batch, vars2_batch, tol,
        subject_valid=None, use_sign_probability=True,
    )


def compute_overlap_batch_gpu_chunked(means1_batch, vars1_batch, means2_batch, vars2_batch,
                                      chunk_size=None, **kwargs):
    """
    Chunked GPU processing for datasets that don't fit in GPU memory.

    Parameters:
        means1_batch, vars1_batch, means2_batch, vars2_batch: np.ndarray
            shape (n_subjects, n_phases, n_features)
        chunk_size: int or None, subjects per GPU call. When None it is
            estimated from free GPU memory.
        **kwargs: forwarded to ``compute_overlap_batch_gpu``.

    Returns:
        np.ndarray shape (n_subjects, n_phases, n_phases)

    Raises:
        RuntimeError: if CuPy/CUDA is not available.
        ValueError: if ``chunk_size`` is given and is less than 1.
    """
    if not GPU_AVAILABLE:
        raise RuntimeError("CuPy not available for GPU computation")

    n_subjects, n_phases, n_features = means1_batch.shape

    if chunk_size is None:
        # Use real free device memory: the pool's free_bytes() alone only
        # counts blocks the pool already holds and is typically 0.
        available_memory = _free_gpu_bytes()
        # Per-subject cost includes the (n_phases, n_phases, n_features) intermediates.
        bytes_per_subject = estimate_mega_batch_memory(1, 1, n_features, n_phases) * _BYTES_PER_GB
        estimated_chunk_size = max(1, int(available_memory * 0.8 // max(bytes_per_subject, 1)))
        chunk_size = min(estimated_chunk_size, n_subjects)
        print(f"🔧 Auto-determined GPU chunk size: {chunk_size} subjects")
    elif chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")

    if chunk_size >= n_subjects:
        # Process all at once
        return compute_overlap_batch_gpu(means1_batch, vars1_batch,
                                         means2_batch, vars2_batch, **kwargs)

    # Process in chunks
    results = [
        compute_overlap_batch_gpu(
            means1_batch[start:start + chunk_size],
            vars1_batch[start:start + chunk_size],
            means2_batch[start:start + chunk_size],
            vars2_batch[start:start + chunk_size],
            **kwargs
        )
        for start in range(0, n_subjects, chunk_size)
    ]
    return np.concatenate(results, axis=0)


def benchmark_gpu_vs_cpu():
    """Benchmark GPU vs CPU performance on random sample data and print the results."""
    if not GPU_AVAILABLE:
        print("GPU not available for benchmarking")
        return

    # Create test data
    n_subjects = 10
    n_features = 4
    shape = (n_subjects, _DEFAULT_N_PHASES, n_features)

    print(f"🔧 Benchmarking with {n_subjects} subjects, {n_features} features...")

    means1 = np.random.randn(*shape).astype(np.float32)
    vars1 = np.abs(np.random.randn(*shape)).astype(np.float32) + 0.1
    means2 = np.random.randn(*shape).astype(np.float32)
    vars2 = np.abs(np.random.randn(*shape)).astype(np.float32) + 0.1

    # Warm up GPU so kernel compilation is not counted in the timing
    _ = compute_overlap_batch_gpu(means1[:2], vars1[:2], means2[:2], vars2[:2])

    # Benchmark GPU (the device-to-host copy at the end synchronizes)
    start = time.perf_counter()
    result_gpu = compute_overlap_batch_gpu(means1, vars1, means2, vars2)
    gpu_time = time.perf_counter() - start
    print(f"🚀 GPU time: {gpu_time:.4f} seconds")

    # Benchmark CPU (Numba fallback)
    try:
        from .numba_overlap import compute_overlap_batch
    except ImportError:
        print("❌ Numba not available for CPU comparison")
        return

    start = time.perf_counter()
    result_cpu = compute_overlap_batch(means1, vars1, means2, vars2)
    cpu_time = time.perf_counter() - start
    print(f"🔧 CPU time: {cpu_time:.4f} seconds")

    speedup = cpu_time / gpu_time
    print(f"📈 GPU Speedup: {speedup:.1f}x")

    # Check accuracy
    max_diff = np.max(np.abs(result_gpu.astype(np.float64) - result_cpu))
    print(f"🎯 Max difference: {max_diff:.2e}")


def compute_overlap_batch_gpu_mega(all_means1_batch, all_vars1_batch, all_means2_batch,
                                   all_vars2_batch, valid_mask, tol=1e-12,
                                   biomechanical_filter=False):
    """
    MEGA-BATCH GPU computation: process ALL task pairs simultaneously.

    Processes task pairs × subjects × phase pairs in a single GPU call.

    Parameters:
        all_means1_batch: np.ndarray shape (n_task_pairs, n_subjects_max, n_phases, n_features)
        all_vars1_batch: np.ndarray shape (n_task_pairs, n_subjects_max, n_phases, n_features)
        all_means2_batch: np.ndarray shape (n_task_pairs, n_subjects_max, n_phases, n_features)
        all_vars2_batch: np.ndarray shape (n_task_pairs, n_subjects_max, n_phases, n_features)
        valid_mask: np.ndarray shape (n_task_pairs, n_subjects_max) - bool mask for valid subjects
        tol: float, summed variances must exceed this to be valid
        biomechanical_filter: bool, apply biomechanical filtering

    Returns:
        np.ndarray (float64) shape (n_task_pairs, n_subjects_max, n_phases, n_phases)
        - overlap values clipped to [0, 1]; masked-out subjects are 0.

    Raises:
        RuntimeError: if CuPy/CUDA is not available.
    """
    if not GPU_AVAILABLE:
        raise RuntimeError("CuPy not available for mega-batch GPU computation")

    n_task_pairs, n_subjects_max, n_phases, n_features = all_means1_batch.shape
    n_phase_pairs = n_phases * n_phases

    print(f"🚀 GPU Mega-batch: Processing {n_task_pairs} task pairs × "
          f"{n_subjects_max} subjects × {n_phase_pairs} phase pairs")
    print(f"📊 Total computations: {n_task_pairs * n_subjects_max * n_phase_pairs:,}")

    # Transfer ALL data to GPU - one transfer per input array
    means1_gpu = cp.asarray(all_means1_batch, dtype=cp.float32)
    vars1_gpu = cp.asarray(all_vars1_batch, dtype=cp.float32)
    means2_gpu = cp.asarray(all_means2_batch, dtype=cp.float32)
    vars2_gpu = cp.asarray(all_vars2_batch, dtype=cp.float32)
    valid_gpu = cp.asarray(valid_mask, dtype=cp.bool_)

    # 5D broadcast over (task pair, subject, phase_i, phase_j, feature)
    overlap_batch_gpu = _pairwise_overlap_gpu(
        means1_gpu, vars1_gpu, means2_gpu, vars2_gpu, tol, subject_valid=valid_gpu
    )

    if biomechanical_filter:
        overlap_batch_gpu = _apply_biomechanical_filter_gpu_mega(
            overlap_batch_gpu, means1_gpu, vars1_gpu, means2_gpu, vars2_gpu, valid_gpu, tol
        )

    # Transfer back to CPU - single transfer for ALL results
    print("📥 Transferring results from GPU...")
    result = cp.asnumpy(overlap_batch_gpu).astype(np.float64)
    np.clip(result, 0.0, 1.0, out=result)

    print("✅ Mega-batch GPU computation complete!")
    return result


def _apply_biomechanical_filter_gpu_mega(overlap_batch, means1_batch, vars1_batch,
                                         means2_batch, vars2_batch, valid_mask, tol):
    """
    Apply biomechanical filtering for mega-batch overlaps on GPU.

    Unlike the per-batch filter, this uses only the mean-difference ramp as
    the penalty (no sign-mismatch probability), and only touches subjects
    flagged in ``valid_mask``.
    """
    return _biomechanical_filter_core(
        overlap_batch, means1_batch, vars1_batch, means2_batch, vars2_batch, tol,
        subject_valid=valid_mask, use_sign_probability=False,
    )


def estimate_mega_batch_memory(n_task_pairs, n_subjects_max, n_features,
                               n_phases=_DEFAULT_N_PHASES):
    """
    Estimate GPU memory (in GB) needed for one mega-batch call.

    Accounts for the broadcast intermediates of shape
    (n_task_pairs, n_subjects_max, n_phases, n_phases, n_features), which
    dominate for large feature counts. All arrays are assumed float32.
    """
    float32_bytes = 4
    per_subject_entries = n_task_pairs * n_subjects_max

    # Four input arrays (means1, vars1, means2, vars2), pre-broadcasting
    input_size = 4 * per_subject_entries * n_phases * n_features * float32_bytes
    # Output array
    output_size = per_subject_entries * n_phases * n_phases * float32_bytes
    # Conservative estimate: four live 5D tensors (diff, var_sum, quad_terms, temp)
    broadcast_5d_size = per_subject_entries * n_phases * n_phases * n_features * float32_bytes
    intermediate_5d_size = broadcast_5d_size * 4

    total_bytes = input_size + output_size + intermediate_5d_size
    return total_bytes / _BYTES_PER_GB


def get_available_gpu_memory_gb():
    """
    Return the usable GPU memory budget in GB.

    The budget is 70% of the currently free device memory, but never less
    than 0.5 GB. Returns 0.0 without a GPU, and 5.0 if the query fails.
    """
    if not GPU_AVAILABLE:
        return 0.0

    try:
        free_bytes, _total_bytes = cp.cuda.Device().mem_info
    except Exception:
        # Fallback: assume 5GB available
        return 5.0

    # Use 70% of free memory as safety margin
    available_gb = free_bytes * 0.7 / _BYTES_PER_GB
    return max(0.5, available_gb)  # Ensure at least 0.5GB for minimal chunking


def calculate_optimal_chunk_size(total_pairs, n_subjects_max, n_features,
                                 target_memory_gb: Optional[float] = None,
                                 n_phases=_DEFAULT_N_PHASES):
    """
    Return the largest number of task pairs (1..total_pairs) whose estimated
    mega-batch memory fits in ``target_memory_gb``.

    ``target_memory_gb`` defaults to ``get_available_gpu_memory_gb()``.
    Always returns at least 1; returns 1 when no GPU is available.
    """
    if not GPU_AVAILABLE:
        return 1

    if target_memory_gb is None:
        target_memory_gb = get_available_gpu_memory_gb()

    # Binary search for the largest chunk that fits
    low, high = 1, total_pairs
    best = 1
    while low <= high:
        mid = (low + high) // 2
        if estimate_mega_batch_memory(mid, n_subjects_max, n_features, n_phases) <= target_memory_gb:
            best = mid
            low = mid + 1
        else:
            high = mid - 1

    # Ensure at least 1 task pair per chunk
    return max(1, best)


def get_available_ram_gb():
    """Return available system RAM in GB (assumes 16.0 when psutil is not installed)."""
    try:
        import psutil
    except ImportError:
        # Fallback: assume 16GB available
        return 16.0
    return psutil.virtual_memory().available / _BYTES_PER_GB


def calculate_ram_max_chunk_size(n_subjects_max, n_features, available_ram_gb,
                                 n_phases=_DEFAULT_N_PHASES):
    """
    Return the maximum number of task pairs whose four float32 input arrays
    fit in 70% of ``available_ram_gb``. Always returns at least 1.
    """
    # Four arrays of shape (chunk, n_subjects_max, n_phases, n_features), 4 bytes each
    bytes_per_task_pair = 4 * n_subjects_max * n_phases * n_features * 4

    # Use 70% of available RAM as safety margin
    safe_ram_bytes = available_ram_gb * 0.7 * _BYTES_PER_GB
    max_chunk_size = int(safe_ram_bytes / bytes_per_task_pair)
    return max(1, max_chunk_size)


def calculate_optimal_chunk_size_dual_constraint(total_pairs, n_subjects_max, n_features,
                                                 n_phases=_DEFAULT_N_PHASES):
    """
    Calculate the chunk size (task pairs) respecting BOTH GPU memory and system RAM.

    For more than 100 features the budgets are shrunk by a power-law penalty
    and the result is additionally capped at ``50000 / n_features`` pairs.

    Returns the most restrictive of the two limits (at least 1); returns 1
    when no GPU is available.
    """
    if not GPU_AVAILABLE:
        return 1

    gpu_memory_gb = get_available_gpu_memory_gb()
    ram_memory_gb = get_available_ram_gb()
    large_feature_count = n_features > 100

    gpu_budget_gb = gpu_memory_gb
    ram_budget_gb = ram_memory_gb
    if large_feature_count:
        print(f"⚠️ Large feature count ({n_features}) detected - using conservative chunking")
        # Power-law penalty (exponent 1.5) on top of the linear growth the
        # memory estimate already models.
        feature_penalty = (n_features / 100) ** 1.5
        gpu_budget_gb = gpu_memory_gb / feature_penalty
        # RAM is penalized half as strongly, but the budget must never exceed
        # what is actually available (the divisor is clamped at 1).
        ram_budget_gb = ram_memory_gb / max(1.0, feature_penalty * 0.5)

    gpu_max_chunk = calculate_optimal_chunk_size(total_pairs, n_subjects_max, n_features,
                                                 gpu_budget_gb, n_phases)
    ram_max_chunk = calculate_ram_max_chunk_size(n_subjects_max, n_features,
                                                 ram_budget_gb, n_phases)

    # Use the most restrictive constraint
    optimal_chunk = min(gpu_max_chunk, ram_max_chunk, total_pairs)

    print("🔧 Dual-constraint analysis:")
    print(f"   GPU memory: {gpu_memory_gb:.2f} GB → max {gpu_max_chunk} pairs")
    print(f"   RAM memory: {ram_memory_gb:.2f} GB → max {ram_max_chunk} pairs")
    print(f"   Using most restrictive: {optimal_chunk} pairs per chunk")

    if large_feature_count:
        # Cap at a reasonable maximum for large feature counts (rough heuristic)
        max_safe_chunk = max(1, int(50000 / n_features))
        if optimal_chunk >= max_safe_chunk:
            optimal_chunk = max_safe_chunk
            print(f"   🔒 Capped at {optimal_chunk} pairs due to large feature count")

    return max(1, optimal_chunk)


def compute_overlap_batch_gpu_mega_chunked(all_means1_batch, all_vars1_batch, all_means2_batch,
                                           all_vars2_batch, valid_mask, tol=1e-12,
                                           biomechanical_filter=False, progress_callback=None):
    """
    Chunked mega-batch GPU computation: process task pairs in memory-sized chunks.

    The chunk size is derived from free GPU memory and system RAM. Each chunk
    keeps all subjects. Out-of-memory errors trigger smaller chunks and, as a
    last resort, CPU processing.

    Parameters:
        all_means1_batch: np.ndarray shape (n_task_pairs, n_subjects_max, n_phases, n_features)
        all_vars1_batch: np.ndarray shape (n_task_pairs, n_subjects_max, n_phases, n_features)
        all_means2_batch: np.ndarray shape (n_task_pairs, n_subjects_max, n_phases, n_features)
        all_vars2_batch: np.ndarray shape (n_task_pairs, n_subjects_max, n_phases, n_features)
        valid_mask: np.ndarray shape (n_task_pairs, n_subjects_max) - bool mask for valid subjects
        tol: float, tolerance for variance validity
        biomechanical_filter: bool, apply biomechanical filtering
        progress_callback: callable taking a float in [0, 1]; called per chunk
            (not called when a single mega-batch succeeds)

    Returns:
        np.ndarray shape (n_task_pairs, n_subjects_max, n_phases, n_phases) - overlap values

    Raises:
        RuntimeError: if CuPy/CUDA is not available.
    """
    if not GPU_AVAILABLE:
        raise RuntimeError("CuPy not available for chunked mega-batch GPU computation")

    n_task_pairs, n_subjects_max, n_phases, n_features = all_means1_batch.shape

    # Calculate optimal chunk size using dual constraints (GPU + RAM)
    chunk_size = calculate_optimal_chunk_size_dual_constraint(
        n_task_pairs, n_subjects_max, n_features, n_phases
    )

    print("🔧 Chunking Strategy:")
    print(f"   Total task pairs: {n_task_pairs:,}")
    print(f"   Optimal chunk size: {chunk_size:,} task pairs")
    print(f"   Number of chunks: {(n_task_pairs + chunk_size - 1) // chunk_size}")

    # Try single batch first, but catch out-of-memory errors
    if chunk_size >= n_task_pairs:
        print("🚀 Attempting single mega-batch processing...")
        try:
            return compute_overlap_batch_gpu_mega(
                all_means1_batch, all_vars1_batch, all_means2_batch, all_vars2_batch,
                valid_mask, tol, biomechanical_filter
            )
        except Exception as exc:
            if not _is_gpu_oom(exc):
                raise
            print("⚠️ Single batch failed with memory error, forcing chunking...")
            # Return cached blocks to the device before re-measuring.
            cp.get_default_memory_pool().free_all_blocks()
            # Recalculate with a much more conservative budget:
            # at most 3GB or 30% of the available estimate.
            available_memory = get_available_gpu_memory_gb()
            conservative_memory = min(available_memory * 0.3, 3.0)
            chunk_size = calculate_optimal_chunk_size(
                n_task_pairs, n_subjects_max, n_features, conservative_memory, n_phases
            )
            chunk_size = max(1, chunk_size // 2)  # Further reduce chunk size
            print(f"🔧 Fallback chunk size: {chunk_size} pairs (conservative estimate)")

    # Process in chunks
    print(f"🔄 Processing {n_task_pairs:,} task pairs in chunks of {chunk_size:,}...")

    total_chunks = (n_task_pairs + chunk_size - 1) // chunk_size
    results = []

    for chunk_num, chunk_start in enumerate(range(0, n_task_pairs, chunk_size), start=1):
        chunk_end = min(chunk_start + chunk_size, n_task_pairs)
        print(f"🚀 Processing chunk {chunk_num}/{total_chunks} "
              f"(task pairs {chunk_start}:{chunk_end})...")

        # Extract chunk data (views, no copy)
        chunk_means1 = all_means1_batch[chunk_start:chunk_end]
        chunk_vars1 = all_vars1_batch[chunk_start:chunk_end]
        chunk_means2 = all_means2_batch[chunk_start:chunk_end]
        chunk_vars2 = all_vars2_batch[chunk_start:chunk_end]
        chunk_valid = valid_mask[chunk_start:chunk_end]

        start_time = time.perf_counter()
        try:
            chunk_result = compute_overlap_batch_gpu_mega(
                chunk_means1, chunk_vars1, chunk_means2, chunk_vars2,
                chunk_valid, tol, biomechanical_filter
            )
        except Exception as exc:
            if not _is_gpu_oom(exc):
                raise
            print(f"   ⚠️ Chunk {chunk_num} still too large, attempting progressive reduction...")
            chunk_result = _process_chunk_with_progressive_reduction(
                chunk_means1, chunk_vars1, chunk_means2, chunk_vars2,
                chunk_valid, tol, biomechanical_filter, chunk_num
            )
        chunk_time = time.perf_counter() - start_time

        results.append(chunk_result)

        # Progress reporting
        if progress_callback:
            progress_callback(chunk_end / n_task_pairs * 0.9)  # Save 10% for final aggregation

        # Performance metrics
        valid_computations = np.sum(chunk_valid) * n_phases * n_phases
        throughput = valid_computations / chunk_time if chunk_time > 0 else 0
        print(f"   ✅ Chunk {chunk_num} complete: {chunk_time:.2f}s, "
              f"{throughput:,.0f} computations/sec")

        # Memory cleanup between chunks
        cp.get_default_memory_pool().free_all_blocks()

    print("🔧 Combining chunk results...")
    final_result = np.concatenate(results, axis=0)

    if progress_callback:
        progress_callback(1.0)

    print("✅ Chunked mega-batch processing complete!")
    print(f"📊 Final result shape: {final_result.shape}")

    return final_result


def _process_chunk_with_progressive_reduction(chunk_means1, chunk_vars1, chunk_means2,
                                              chunk_vars2, chunk_valid, tol,
                                              biomechanical_filter, chunk_num):
    """
    Process a chunk with progressive size reduction after an out-of-memory error.

    Tries sub-chunks of 1/2, 1/4, 1/8 and 1/16 of the chunk on the GPU. If all
    fail, falls back to the Numba CPU routine one task pair at a time; if that
    module cannot be imported, returns zeros and emits a warning.

    Returns:
        np.ndarray (float64) shape (chunk_size, n_subjects_max, n_phases, n_phases)
    """
    chunk_size = chunk_means1.shape[0]
    n_subjects_max = chunk_valid.shape[1]
    n_phases = chunk_means1.shape[2]

    tried_sizes = set()
    for factor in _REDUCTION_FACTORS:
        sub_chunk_size = max(1, int(chunk_size * factor))
        if sub_chunk_size in tried_sizes:
            # Small chunks map several factors to the same size; retrying an
            # identical size cannot succeed.
            continue
        tried_sizes.add(sub_chunk_size)

        print(f"     🔄 Trying sub-chunk size: {sub_chunk_size} pairs "
              f"({factor*100:.1f}% of original)")

        # Start each attempt with the pool's cached blocks returned to the device.
        cp.get_default_memory_pool().free_all_blocks()
        try:
            sub_results = []
            for start_idx in range(0, chunk_size, sub_chunk_size):
                end_idx = min(start_idx + sub_chunk_size, chunk_size)
                sub_results.append(compute_overlap_batch_gpu_mega(
                    chunk_means1[start_idx:end_idx],
                    chunk_vars1[start_idx:end_idx],
                    chunk_means2[start_idx:end_idx],
                    chunk_vars2[start_idx:end_idx],
                    chunk_valid[start_idx:end_idx],
                    tol, biomechanical_filter
                ))
                # Clear GPU memory between sub-chunks
                cp.get_default_memory_pool().free_all_blocks()
        except Exception as exc:
            if not _is_gpu_oom(exc):
                raise
            print(f"     ❌ Sub-chunk size {sub_chunk_size} still too large")
            continue

        final_result = np.concatenate(sub_results, axis=0)
        print(f"     ✅ Progressive reduction successful with {sub_chunk_size}-pair sub-chunks")
        return final_result

    # All GPU attempts failed: fall back to CPU processing for this chunk.
    print("     ❌ All reduction attempts failed - chunk too large for GPU mega-batch")
    print("     💡 Recommendation: Use smaller time windows or switch to sequential processing")
    print("     🔄 Falling back to CPU-based processing for this chunk...")

    result_shape = (chunk_size, n_subjects_max, n_phases, n_phases)
    try:
        from .numba_overlap import compute_overlap_batch_numba_ultra_fast
    except ImportError:
        print("     ❌ CPU fallback not available - creating zero results")
        warnings.warn(
            f"Chunk {chunk_num}: GPU ran out of memory and the CPU fallback is "
            "unavailable; returning all-zero overlaps for this chunk.",
            RuntimeWarning,
            stacklevel=2,
        )
        # Last resort: return zeros
        return np.zeros(result_shape, dtype=np.float64)

    # float64 matches the dtype returned by the GPU path; rows of invalid
    # subjects stay zero.
    final_result = np.zeros(result_shape, dtype=np.float64)
    for pair_idx in range(chunk_size):
        valid_indices = np.where(chunk_valid[pair_idx])[0]
        if len(valid_indices) == 0:
            continue
        # NOTE(review): tol and biomechanical_filter are not forwarded here;
        # the Numba routine's signature is not visible in this file - confirm.
        final_result[pair_idx, valid_indices] = compute_overlap_batch_numba_ultra_fast(
            chunk_means1[pair_idx][valid_indices],
            chunk_vars1[pair_idx][valid_indices],
            chunk_means2[pair_idx][valid_indices],
            chunk_vars2[pair_idx][valid_indices],
        )

    print("     ✅ CPU fallback processing completed")
    return final_result


def _run_self_test():
    """Run the benchmark and a small mega-batch smoke test (requires a GPU)."""
    print("🧪 Testing GPU overlap calculation...")

    if not GPU_AVAILABLE:
        print("❌ GPU testing requires CuPy and CUDA")
        return

    benchmark_gpu_vs_cpu()

    # Test mega-batch functionality
    print("\n🚀 Testing mega-batch functionality...")

    # Create test data for multiple task pairs
    n_task_pairs = 5
    n_subjects_max = 3
    n_features = 4
    shape = (n_task_pairs, n_subjects_max, _DEFAULT_N_PHASES, n_features)

    all_means1 = np.random.randn(*shape).astype(np.float32)
    all_vars1 = np.abs(np.random.randn(*shape)).astype(np.float32) + 0.1
    all_means2 = np.random.randn(*shape).astype(np.float32)
    all_vars2 = np.abs(np.random.randn(*shape)).astype(np.float32) + 0.1
    valid_mask = np.ones((n_task_pairs, n_subjects_max), dtype=bool)

    start = time.perf_counter()
    result = compute_overlap_batch_gpu_mega(all_means1, all_vars1, all_means2,
                                            all_vars2, valid_mask)
    elapsed = time.perf_counter() - start

    n_computations = n_task_pairs * n_subjects_max * _DEFAULT_N_PHASES * _DEFAULT_N_PHASES
    print(f"✅ Mega-batch result shape: {result.shape}")
    print(f"⏱️ Mega-batch time: {elapsed:.4f}s")
    print(f"📊 Throughput: {n_computations / elapsed:,.0f} computations/sec")


if __name__ == "__main__":
    _run_self_test()