# NOTE(review): the three lines that were here ("Spaces:", "Runtime error",
# "Runtime error") were extraction/paste artifacts, not Python — flagged for removal.
#!/usr/bin/env python3
"""
GPU-accelerated implementation of multivariate Gaussian overlap calculation using CuPy.
This provides massive speedup for large-scale analyses by processing all task pairs simultaneously.
"""
import numpy as np
import warnings
from typing import Optional

# Try to import CuPy for GPU acceleration.
# GPU_AVAILABLE: CuPy importable; GPU_READY: a CUDA device actually answers.
try:
    import cupy as cp
    GPU_AVAILABLE = True
    print("β CuPy GPU acceleration available")
except ImportError:
    GPU_AVAILABLE = False
    cp = None
    print("β οΈ CuPy not available. Install with: pip install cupy-cuda12x")

# Check for CUDA availability
if GPU_AVAILABLE:
    try:
        # Test if CUDA is actually available by touching device 0.
        device = cp.cuda.Device(0)
        GPU_READY = True
        print(f"π GPU ready: Device {device.id} (RTX detected)")
    except Exception:
        # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
        # are not swallowed; any CUDA/driver failure disables GPU paths.
        GPU_READY = False
        GPU_AVAILABLE = False
        print("β οΈ CUDA not available, disabling GPU acceleration")
else:
    GPU_READY = False
def compute_overlap_batch_gpu(means1_batch, vars1_batch, means2_batch, vars2_batch,
                              tol=1e-12, biomechanical_filter=False):
    """
    GPU-accelerated batch overlap computation using CuPy.

    Processes all subjects simultaneously with full GPU vectorization:
    a single broadcast produces every (phase_i, phase_j) pair at once.

    Parameters:
        means1_batch: np.ndarray shape (n_subjects, n_phases, n_features)
        vars1_batch:  np.ndarray shape (n_subjects, n_phases, n_features)
        means2_batch: np.ndarray shape (n_subjects, n_phases, n_features)
        vars2_batch:  np.ndarray shape (n_subjects, n_phases, n_features)
        tol: float, tolerance for variance validity
        biomechanical_filter: bool, apply biomechanical filtering

    Returns:
        np.ndarray shape (n_subjects, n_phases, n_phases) - overlap values in [0, 1]

    Raises:
        RuntimeError: if CuPy is not importable.
    """
    if not GPU_AVAILABLE:
        raise RuntimeError("CuPy not available for GPU computation")

    n_subjects, n_phases, n_features = means1_batch.shape

    # Transfer to GPU - single transfer for all data (float32 halves bandwidth).
    means1_gpu = cp.asarray(means1_batch, dtype=cp.float32)
    vars1_gpu = cp.asarray(vars1_batch, dtype=cp.float32)
    means2_gpu = cp.asarray(means2_batch, dtype=cp.float32)
    vars2_gpu = cp.asarray(vars2_batch, dtype=cp.float32)

    # BUG FIX: removed the dead `cp.zeros((n_subjects, 150, 150))` pre-allocation
    # that was immediately overwritten below (and hard-coded 150 phases).

    # Broadcasting: (S, P, 1, F) vs (S, 1, P, F) -> all phase pairs (S, P, P, F).
    means1_exp = means1_gpu[:, :, cp.newaxis, :]
    vars1_exp = vars1_gpu[:, :, cp.newaxis, :]
    means2_exp = means2_gpu[:, cp.newaxis, :, :]
    vars2_exp = vars2_gpu[:, cp.newaxis, :, :]

    # Compute all differences and pooled variances simultaneously.
    diff = means1_exp - means2_exp
    var_sum = vars1_exp + vars2_exp

    # Validity: no NaNs in either operand and every pooled variance above tol.
    valid_mask = (~cp.isnan(diff).any(axis=3) &
                  ~cp.isnan(var_sum).any(axis=3) &
                  (var_sum > tol).all(axis=3))  # (n_subjects, n_phases, n_phases)

    # Quadratic form only where valid; cp.where keeps invalid entries at 0.
    quad_terms = cp.where(valid_mask[:, :, :, cp.newaxis],
                          diff * diff / var_sum,
                          0.0)
    quad_sum = cp.sum(quad_terms, axis=3)

    # exp(-q/2) underflows float32 well before q/2 = 20; clamp those to exact 0.
    safe_exp_mask = valid_mask & (quad_sum * 0.5 <= 20.0)
    overlap_batch_gpu = cp.where(safe_exp_mask,
                                 cp.exp(-0.5 * quad_sum),
                                 0.0)

    # Apply biomechanical filtering if requested
    if biomechanical_filter:
        overlap_batch_gpu = _apply_biomechanical_filter_gpu(
            overlap_batch_gpu, means1_gpu, vars1_gpu, means2_gpu, vars2_gpu, tol
        )

    # Transfer back to CPU - single transfer
    result = cp.asnumpy(overlap_batch_gpu).astype(np.float64)
    # Final clipping on CPU
    np.clip(result, 0.0, 1.0, out=result)
    return result
def _apply_biomechanical_filter_gpu(overlap_batch, means1_batch, vars1_batch,
                                    means2_batch, vars2_batch, tol):
    """Apply biomechanical filtering on GPU using vectorized operations.

    Three-level rule driven by the torque channel (feature 0):
      m0  both phases negligible            -> overlap forced to 1.0
      m1  one negligible, other "ampable"   -> overlap left as-is
      m2  everything else                   -> overlap pulled toward 1.0 in
          proportion to a sign-mismatch probability / mean-difference penalty
          (Pdiff == 1 keeps the raw overlap; Pdiff == 0 forces it to 1.0).

    Parameters:
        overlap_batch: cp.ndarray (n_subjects, n_phases, n_phases) raw overlaps
        means*_batch/vars*_batch: cp.ndarray (n_subjects, n_phases, n_features)
        tol: float, floor applied to std deviations to avoid divide-by-zero

    Returns:
        cp.ndarray (n_subjects, n_phases, n_phases) filtered overlaps
    """
    negligible_threshold = 0.1
    ampable_threshold = 0.2
    ci_factor = 1.96  # 95% confidence half-width in std units

    # Only the first feature (torque) drives the biomechanical rules.
    # (Removed the unused `n_subjects` local from the original.)
    means1_torque = means1_batch[:, :, 0]  # (n_subjects, n_phases)
    means2_torque = means2_batch[:, :, 0]
    vars1_torque = vars1_batch[:, :, 0]
    vars2_torque = vars2_batch[:, :, 0]

    # Vectorized std and confidence-interval bounds.
    std1 = cp.sqrt(vars1_torque)
    std2 = cp.sqrt(vars2_torque)
    ci_lo1 = means1_torque - ci_factor * std1
    ci_hi1 = means1_torque + ci_factor * std1
    ci_lo2 = means2_torque - ci_factor * std2
    ci_hi2 = means2_torque + ci_factor * std2

    # Negligible: the whole CI lies inside [-threshold, +threshold].
    negligible1 = ((ci_lo1 >= -negligible_threshold) &
                   (ci_hi1 <= negligible_threshold))
    negligible2 = ((ci_lo2 >= -negligible_threshold) &
                   (ci_hi2 <= negligible_threshold))
    # "Ampable": mean torque magnitude clearly above threshold.
    ampable1 = cp.abs(means1_torque) > ampable_threshold
    ampable2 = cp.abs(means2_torque) > ampable_threshold

    # Broadcast per-phase flags onto the (phase_i, phase_j) grid.
    neg1_exp = negligible1[:, :, cp.newaxis]  # (n_subjects, n_phases, 1)
    neg2_exp = negligible2[:, cp.newaxis, :]  # (n_subjects, 1, n_phases)
    amp1_exp = ampable1[:, :, cp.newaxis]
    amp2_exp = ampable2[:, cp.newaxis, :]

    # Three-level filtering masks.
    m0 = neg1_exp & neg2_exp                               # both negligible
    m1 = ((neg1_exp & amp2_exp) | (neg2_exp & amp1_exp))   # amplitude conflict
    m2 = ~(m0 | m1)                                        # sign-reversal cases

    # Negligible-negligible rule: overlaps become 1.0.
    overlap_batch = cp.where(m0, 1.0, overlap_batch)

    if cp.any(m2):
        s_idx, i_idx, j_idx = cp.where(m2)
        if len(s_idx) > 0:
            # Floor the stds so z-scores stay finite.
            std1_safe = cp.maximum(std1, tol)
            std2_safe = cp.maximum(std2, tol)
            z1 = means1_torque / std1_safe
            z2 = means2_torque / std2_safe

            def norm_cdf_gpu(x):
                # Abramowitz & Stegun 26.2.17 polynomial approximation of Phi(x).
                t = 1.0 / (1.0 + 0.2316419 * cp.abs(x))
                d = 0.3989423 * cp.exp(-x * x / 2.0)
                prob = d * t * (0.3193815 + t * (-0.3565638 +
                                t * (1.781478 + t * (-1.821256 + t * 1.330274))))
                return cp.where(x > 0, 1.0 - prob, prob)

            Ppos1 = norm_cdf_gpu(z1)
            Ppos2 = norm_cdf_gpu(z2)
            # P(signs differ) = P(+,-) + P(-,+) for the selected pairs only.
            Pdiff_sign = (Ppos1[s_idx, i_idx] * (1.0 - Ppos2[s_idx, j_idx]) +
                          (1.0 - Ppos1[s_idx, i_idx]) * Ppos2[s_idx, j_idx])

            # Mean-difference penalty: linear ramp 0 below 0.2, 1 above 0.5.
            mean_diff = cp.abs(means1_torque[s_idx, i_idx] - means2_torque[s_idx, j_idx])
            s_thresh, e_thresh = 0.2, 0.5
            penalty = cp.clip((mean_diff - s_thresh) / (e_thresh - s_thresh), 0.0, 1.0)

            # Take the stronger of the two evidences of difference.
            Pdiff = cp.maximum(Pdiff_sign, penalty)

            # Scale the distance-from-1 by Pdiff (Pdiff=1 keeps raw overlap).
            current_overlaps = overlap_batch[s_idx, i_idx, j_idx]
            output_diff = 1.0 - current_overlaps
            scaled_output_diff = output_diff * Pdiff
            overlap_batch[s_idx, i_idx, j_idx] = 1.0 - scaled_output_diff
    return overlap_batch
def compute_overlap_batch_gpu_chunked(means1_batch, vars1_batch, means2_batch, vars2_batch,
                                      chunk_size=None, **kwargs):
    """
    Chunked GPU processing for very large datasets that don't fit in GPU memory.

    When ``chunk_size`` is None it is derived from *free device memory* and the
    per-subject footprint (inputs + broadcast intermediates + output), then each
    chunk is delegated to :func:`compute_overlap_batch_gpu`.

    Parameters mirror compute_overlap_batch_gpu; extra ``kwargs`` are forwarded.

    Returns:
        np.ndarray shape (n_subjects, n_phases, n_phases)

    Raises:
        RuntimeError: if CuPy is not importable.
    """
    if not GPU_AVAILABLE:
        raise RuntimeError("CuPy not available for GPU computation")

    n_subjects, n_phases, n_features = means1_batch.shape
    if chunk_size is None:
        # BUG FIX: the original used mempool.free_bytes(), which reports bytes
        # cached *inside the CuPy pool* (0 before any allocation), not free
        # device memory — so chunk_size always collapsed to 1. Query the device
        # instead: Device().mem_info == (free_bytes, total_bytes).
        free_device_bytes = cp.cuda.Device().mem_info[0]
        # Per-subject float32 footprint: 4 input slices, 4 broadcast
        # (n_phases, n_phases, n_features) intermediates, and the output matrix.
        bytes_per_subject = (4 * n_phases * n_features * 4 +
                             4 * n_phases * n_phases * n_features * 4 +
                             n_phases * n_phases * 4)
        estimated_chunk_size = max(1, int(free_device_bytes * 0.8 // bytes_per_subject))
        chunk_size = min(estimated_chunk_size, n_subjects)
        print(f"π§ Auto-determined GPU chunk size: {chunk_size} subjects")

    if chunk_size >= n_subjects:
        # Everything fits: process all at once.
        return compute_overlap_batch_gpu(means1_batch, vars1_batch,
                                         means2_batch, vars2_batch, **kwargs)

    # Process subjects in contiguous chunks and stitch results back together.
    results = []
    for i in range(0, n_subjects, chunk_size):
        end_idx = min(i + chunk_size, n_subjects)
        chunk_result = compute_overlap_batch_gpu(
            means1_batch[i:end_idx],
            vars1_batch[i:end_idx],
            means2_batch[i:end_idx],
            vars2_batch[i:end_idx],
            **kwargs
        )
        results.append(chunk_result)
    return np.concatenate(results, axis=0)
def benchmark_gpu_vs_cpu():
    """Benchmark GPU vs CPU performance on sample data.

    Prints GPU timing for the batch kernel and, when the Numba CPU fallback
    is importable, CPU timing, the speedup factor and the max numerical
    difference between the two results. No return value.
    """
    if not GPU_AVAILABLE:
        print("GPU not available for benchmarking")
        return

    import time

    # Create test data
    n_subjects = 10
    n_features = 4
    print(f"π§ Benchmarking with {n_subjects} subjects, {n_features} features...")
    means1 = np.random.randn(n_subjects, 150, n_features).astype(np.float32)
    vars1 = np.abs(np.random.randn(n_subjects, 150, n_features)).astype(np.float32) + 0.1
    means2 = np.random.randn(n_subjects, 150, n_features).astype(np.float32)
    vars2 = np.abs(np.random.randn(n_subjects, 150, n_features)).astype(np.float32) + 0.1

    # Warm up GPU (kernel caches / memory pool) so timing reflects steady state.
    # NOTE: the early return above guarantees GPU_AVAILABLE here, so the
    # original's repeated `if GPU_AVAILABLE` checks and its unreachable
    # `else` branch (gpu_time = inf) were removed as dead code.
    _ = compute_overlap_batch_gpu(means1[:2], vars1[:2], means2[:2], vars2[:2])

    # Benchmark GPU
    start = time.time()
    result_gpu = compute_overlap_batch_gpu(means1, vars1, means2, vars2)
    gpu_time = time.time() - start
    print(f"π GPU time: {gpu_time:.4f} seconds")

    # Benchmark CPU (Numba fallback)
    try:
        from .numba_overlap import compute_overlap_batch
        start = time.time()
        result_cpu = compute_overlap_batch(means1, vars1, means2, vars2)
        cpu_time = time.time() - start
        print(f"π§ CPU time: {cpu_time:.4f} seconds")
        speedup = cpu_time / gpu_time
        print(f"π GPU Speedup: {speedup:.1f}x")
        # Check accuracy against the CPU reference.
        max_diff = np.max(np.abs(result_gpu.astype(np.float64) - result_cpu))
        print(f"π― Max difference: {max_diff:.2e}")
    except ImportError:
        print("β Numba not available for CPU comparison")
def compute_overlap_batch_gpu_mega(all_means1_batch, all_vars1_batch, all_means2_batch, all_vars2_batch,
                                   valid_mask, tol=1e-12, biomechanical_filter=False):
    """
    MEGA-BATCH GPU computation: Process ALL task pairs simultaneously.

    Processes task_pairs Γ subjects Γ phase pairs in a single set of 5D
    broadcast kernels — the maximum-GPU-utilization path.

    Parameters:
        all_means1_batch: np.ndarray shape (n_task_pairs, n_subjects_max, 150, n_features)
        all_vars1_batch:  np.ndarray shape (n_task_pairs, n_subjects_max, 150, n_features)
        all_means2_batch: np.ndarray shape (n_task_pairs, n_subjects_max, 150, n_features)
        all_vars2_batch:  np.ndarray shape (n_task_pairs, n_subjects_max, 150, n_features)
        valid_mask: np.ndarray shape (n_task_pairs, n_subjects_max) - bool mask for valid subjects
        tol: float, tolerance for variance validity
        biomechanical_filter: bool, apply biomechanical filtering

    Returns:
        np.ndarray shape (n_task_pairs, n_subjects_max, 150, 150) - overlap values in [0, 1]

    Raises:
        RuntimeError: if CuPy is not importable.
    """
    if not GPU_AVAILABLE:
        raise RuntimeError("CuPy not available for mega-batch GPU computation")

    n_task_pairs, n_subjects_max, n_phases, n_features = all_means1_batch.shape
    print(f"π GPU Mega-batch: Processing {n_task_pairs} task pairs Γ {n_subjects_max} subjects Γ {150*150} phase pairs")
    print(f"π Total computations: {n_task_pairs * n_subjects_max * 150 * 150:,}")

    # Transfer ALL data to GPU in a single transfer (float32 to halve footprint).
    means1_gpu = cp.asarray(all_means1_batch, dtype=cp.float32)
    vars1_gpu = cp.asarray(all_vars1_batch, dtype=cp.float32)
    means2_gpu = cp.asarray(all_means2_batch, dtype=cp.float32)
    vars2_gpu = cp.asarray(all_vars2_batch, dtype=cp.float32)
    valid_gpu = cp.asarray(valid_mask, dtype=cp.bool_)

    # BUG FIX: removed the dead `cp.zeros(...)` pre-allocation of the output —
    # it was immediately replaced by the cp.where below and only wasted memory.

    # MEGA BROADCASTING: (T, S, P, 1, F) vs (T, S, 1, P, F)
    # -> all phase pairs at once: (T, S, P, P, F).
    means1_exp = means1_gpu[:, :, :, cp.newaxis, :]
    vars1_exp = vars1_gpu[:, :, :, cp.newaxis, :]
    means2_exp = means2_gpu[:, :, cp.newaxis, :, :]
    vars2_exp = vars2_gpu[:, :, cp.newaxis, :, :]

    # Compute ALL differences and pooled variances simultaneously.
    diff = means1_exp - means2_exp
    var_sum = vars1_exp + vars2_exp

    # Subject validity broadcast over all phase pairs.
    subject_valid = valid_gpu[:, :, cp.newaxis, cp.newaxis]
    # NaN and variance validity for ALL data simultaneously.
    nan_valid = (~cp.isnan(diff).any(axis=4) &
                 ~cp.isnan(var_sum).any(axis=4) &
                 (var_sum > tol).all(axis=4))
    full_valid_mask = subject_valid & nan_valid

    # Quadratic form only where valid.
    quad_terms = cp.where(full_valid_mask[:, :, :, :, cp.newaxis],
                          diff * diff / var_sum,
                          0.0)
    quad_sum = cp.sum(quad_terms, axis=4)

    # exp(-q/2) underflows float32 well before q/2 = 20; clamp those to 0.
    safe_exp_mask = full_valid_mask & (quad_sum * 0.5 <= 20.0)
    overlap_batch_gpu = cp.where(safe_exp_mask,
                                 cp.exp(-0.5 * quad_sum),
                                 0.0)

    # Apply biomechanical filtering if requested
    if biomechanical_filter:
        overlap_batch_gpu = _apply_biomechanical_filter_gpu_mega(
            overlap_batch_gpu, means1_gpu, vars1_gpu, means2_gpu, vars2_gpu, valid_gpu, tol
        )

    # Transfer back to CPU - single transfer for ALL results
    print("π₯ Transferring results from GPU...")
    result = cp.asnumpy(overlap_batch_gpu).astype(np.float64)
    # Final clipping
    np.clip(result, 0.0, 1.0, out=result)
    print(f"β Mega-batch GPU computation complete!")
    return result
def _apply_biomechanical_filter_gpu_mega(overlap_batch, means1_batch, vars1_batch,
                                         means2_batch, vars2_batch, valid_mask, tol):
    """Apply biomechanical filtering for mega-batch on GPU.

    Same three-level rule as _apply_biomechanical_filter_gpu, but over 4D
    (task_pair, subject, phase_i, phase_j) data, with one simplification:
    sign-reversal (m2) cases use only the linear mean-difference ramp — the
    normal-CDF sign-mismatch probability is skipped for performance at this
    scale.

    Parameters:
        overlap_batch: cp.ndarray (n_task_pairs, n_subjects_max, P, P)
        means*/vars*:  cp.ndarray (n_task_pairs, n_subjects_max, P, n_features)
        valid_mask:    cp.ndarray bool (n_task_pairs, n_subjects_max)
        tol: unused here (kept for signature parity with the non-mega filter)

    Returns:
        cp.ndarray (n_task_pairs, n_subjects_max, P, P) filtered overlaps
    """
    negligible_threshold = 0.1
    ampable_threshold = 0.2
    ci_factor = 1.96  # 95% confidence half-width in std units

    # Only the first feature (torque) drives the biomechanical rules.
    # (Removed the unused `n_task_pairs, n_subjects_max` unpacking.)
    means1_torque = means1_batch[:, :, :, 0]  # (n_task_pairs, n_subjects_max, P)
    means2_torque = means2_batch[:, :, :, 0]
    vars1_torque = vars1_batch[:, :, :, 0]
    vars2_torque = vars2_batch[:, :, :, 0]

    # Vectorized std and confidence-interval bounds for ALL task pairs.
    std1 = cp.sqrt(vars1_torque)
    std2 = cp.sqrt(vars2_torque)
    ci_lo1 = means1_torque - ci_factor * std1
    ci_hi1 = means1_torque + ci_factor * std1
    ci_lo2 = means2_torque - ci_factor * std2
    ci_hi2 = means2_torque + ci_factor * std2

    # Negligible: the whole CI lies inside [-threshold, +threshold].
    negligible1 = ((ci_lo1 >= -negligible_threshold) &
                   (ci_hi1 <= negligible_threshold))
    negligible2 = ((ci_lo2 >= -negligible_threshold) &
                   (ci_hi2 <= negligible_threshold))
    # "Ampable": mean torque magnitude clearly above threshold.
    ampable1 = cp.abs(means1_torque) > ampable_threshold
    ampable2 = cp.abs(means2_torque) > ampable_threshold

    # Broadcast per-phase flags onto the (phase_i, phase_j) grid.
    neg1_exp = negligible1[:, :, :, cp.newaxis]  # (T, S, P, 1)
    amp1_exp = ampable1[:, :, :, cp.newaxis]
    neg2_exp = negligible2[:, :, cp.newaxis, :]  # (T, S, 1, P)
    amp2_exp = ampable2[:, :, cp.newaxis, :]

    # Restrict every rule to valid subjects.
    valid_exp = valid_mask[:, :, cp.newaxis, cp.newaxis]

    # Three-level filtering masks for ALL task pairs.
    m0 = (neg1_exp & neg2_exp) & valid_exp                                 # negligible-negligible
    m1 = ((neg1_exp & amp2_exp) | (neg2_exp & amp1_exp)) & valid_exp       # amplitude conflicts
    m2 = ~(m0 | m1) & valid_exp                                            # sign-reversal cases

    # Negligible-negligible rule: overlaps become 1.0.
    overlap_batch = cp.where(m0, 1.0, overlap_batch)

    if cp.any(m2):
        t_idx, s_idx, i_idx, j_idx = cp.where(m2)
        if len(t_idx) > 0:
            # Mean-difference penalty: linear ramp 0 below 0.2, 1 above 0.5.
            mean_diff = cp.abs(means1_torque[t_idx, s_idx, i_idx] -
                               means2_torque[t_idx, s_idx, j_idx])
            s_thresh, e_thresh = 0.2, 0.5
            penalty = cp.clip((mean_diff - s_thresh) / (e_thresh - s_thresh), 0.0, 1.0)

            # Scale the distance-from-1 by the penalty (penalty=1 keeps raw overlap,
            # penalty=0 forces overlap to 1.0).
            current_overlaps = overlap_batch[t_idx, s_idx, i_idx, j_idx]
            output_diff = 1.0 - current_overlaps
            scaled_output_diff = output_diff * penalty
            overlap_batch[t_idx, s_idx, i_idx, j_idx] = 1.0 - scaled_output_diff
    return overlap_batch
def estimate_mega_batch_memory(n_task_pairs, n_subjects_max, n_features):
    """Estimate GPU memory (GB) required for mega-batch processing.

    Accounts for the 5D broadcasting performed during GPU computation:
    inputs of shape (n_task_pairs, n_subjects_max, 150, n_features) expand to
    (n_task_pairs, n_subjects_max, 150, 150, n_features) intermediates — the
    150x150 expansion dominates for large feature counts.
    """
    float32 = 4  # bytes per element
    base = n_task_pairs * n_subjects_max * 150

    # Four float32 input arrays, pre-broadcast.
    input_bytes = 4 * base * n_features * float32
    # One float32 output array of all phase-pair overlaps.
    output_bytes = base * 150 * float32
    # Four simultaneous 5D intermediates (diff, var_sum, quad_terms, ...) —
    # a conservative estimate of the real memory hogs.
    intermediate_bytes = 4 * (base * 150 * n_features * float32)

    return (input_bytes + output_bytes + intermediate_bytes) / (1024 ** 3)
def get_available_gpu_memory_gb():
    """Return usable GPU memory in GB.

    Budgets 70% of the device's free memory (safety margin), never less than
    0.5 GB. Returns 0.0 when CuPy is unavailable, and falls back to 5.0 GB
    when the device query itself fails.
    """
    if not GPU_AVAILABLE:
        return 0.0
    try:
        # Device().mem_info == (free_bytes, total_bytes).
        # (Removed the unused total/used computations from the original.)
        free_bytes = cp.cuda.Device().mem_info[0]
        # Use 70% of free memory as safety margin
        available_gb = free_bytes * 0.7 / (1024 ** 3)
        return max(0.5, available_gb)  # Ensure at least 0.5GB for minimal chunking
    except Exception:
        # Narrowed from a bare `except:`; any CUDA query failure lands here.
        # Fallback: assume 5GB available for RTX series
        return 5.0
def calculate_optimal_chunk_size(total_pairs, n_subjects_max, n_features, target_memory_gb=None):
    """Return the largest task-pair chunk whose estimated footprint fits in memory.

    Binary-searches chunk sizes in [1, total_pairs] against
    estimate_mega_batch_memory; defaults the budget to the available GPU
    memory. Always returns at least 1.
    """
    if not GPU_AVAILABLE:
        return 1
    if target_memory_gb is None:
        target_memory_gb = get_available_gpu_memory_gb()

    # Binary search: keep the biggest chunk size that still fits the budget.
    lo, hi, best = 1, total_pairs, 1
    while lo <= hi:
        candidate = (lo + hi) // 2
        if estimate_mega_batch_memory(candidate, n_subjects_max, n_features) <= target_memory_gb:
            best = candidate
            lo = candidate + 1
        else:
            hi = candidate - 1

    # Guarantee at least one task pair per chunk.
    return max(1, best)
def get_available_ram_gb():
    """Return available system RAM in GB (16.0 fallback if psutil is absent)."""
    try:
        import psutil
    except ImportError:
        # Conservative assumption when psutil is not installed.
        return 16.0
    return psutil.virtual_memory().available / (1024 ** 3)
def calculate_ram_max_chunk_size(n_subjects_max, n_features, available_ram_gb):
    """Return the largest chunk size whose staged numpy inputs fit in RAM.

    Each task pair requires four float32 arrays (means1/vars1/means2/vars2),
    each of shape (n_subjects_max, 150, n_features). Only 70% of the reported
    RAM is budgeted as a safety margin; result is at least 1.
    """
    # 4 arrays Γ 4 bytes per float32 element.
    per_pair_bytes = 4 * n_subjects_max * 150 * n_features * 4
    budget_bytes = available_ram_gb * 0.7 * (1024 ** 3)
    return max(1, int(budget_bytes / per_pair_bytes))
def calculate_optimal_chunk_size_dual_constraint(total_pairs, n_subjects_max, n_features):
    """
    Calculate optimal chunk size considering BOTH GPU memory and system RAM constraints.
    This prevents out-of-memory errors by respecting both:
    1. GPU memory limits (for CuPy processing)
    2. System RAM limits (for numpy array allocation)
    CRITICAL: For very large feature counts (>100), the 5D broadcasting becomes
    prohibitively expensive, so we use much more conservative estimates.
    Returns the minimum chunk size that satisfies both constraints (always >= 1).
    """
    # No GPU -> chunking is moot; caller uses a different code path.
    if not GPU_AVAILABLE:
        return 1
    # Get available memory for both constraints
    gpu_memory_gb = get_available_gpu_memory_gb()
    ram_memory_gb = get_available_ram_gb()
    # CRITICAL: For large feature counts, the 5D broadcasting dominates memory usage
    # We need to be much more conservative
    if n_features > 100:
        print(f"β οΈ Large feature count ({n_features}) detected - using conservative chunking")
        # For large features, memory usage scales roughly with features^2 due to broadcasting
        # Use a much smaller base and scale down aggressively
        feature_penalty = (n_features / 100) ** 1.5  # superlinear (power-1.5) penalty, > 1 here
        conservative_gpu_memory = gpu_memory_gb / feature_penalty
        # NOTE(review): dividing by (feature_penalty * 0.5) *increases* the RAM
        # budget whenever feature_penalty < 2 (i.e. 100 < n_features < ~159);
        # the comment says "RAM less affected" but a milder divisor > 1 was
        # probably intended — verify.
        conservative_ram_memory = ram_memory_gb / (feature_penalty * 0.5)  # RAM less affected
        gpu_max_chunk = calculate_optimal_chunk_size(total_pairs, n_subjects_max, n_features, conservative_gpu_memory)
        ram_max_chunk = calculate_ram_max_chunk_size(n_subjects_max, n_features, conservative_ram_memory)
    else:
        # Normal calculation for reasonable feature counts
        gpu_max_chunk = calculate_optimal_chunk_size(total_pairs, n_subjects_max, n_features, gpu_memory_gb)
        ram_max_chunk = calculate_ram_max_chunk_size(n_subjects_max, n_features, ram_memory_gb)
    # Use the most restrictive constraint (never more than total_pairs)
    optimal_chunk = min(gpu_max_chunk, ram_max_chunk, total_pairs)
    print(f"π§ Dual-constraint analysis:")
    print(f" GPU memory: {gpu_memory_gb:.2f} GB β max {gpu_max_chunk} pairs")
    print(f" RAM memory: {ram_memory_gb:.2f} GB β max {ram_max_chunk} pairs")
    print(f" Using most restrictive: {optimal_chunk} pairs per chunk")
    # For very large feature counts, ensure we don't go too high
    if n_features > 100:
        # Cap at a reasonable maximum for large feature counts
        max_safe_chunk = max(1, int(50000 / n_features))  # Rough heuristic
        optimal_chunk = min(optimal_chunk, max_safe_chunk)
        if optimal_chunk == max_safe_chunk:
            print(f" π Capped at {optimal_chunk} pairs due to large feature count")
    return max(1, optimal_chunk)
def compute_overlap_batch_gpu_mega_chunked(all_means1_batch, all_vars1_batch, all_means2_batch, all_vars2_batch,
                                           valid_mask, tol=1e-12, biomechanical_filter=False, progress_callback=None):
    """
    Chunked mega-batch GPU computation: Process task pairs in optimal chunks.

    Automatically determines chunk size (GPU memory + system RAM constraints),
    attempts a single mega-batch when everything fits, and otherwise streams
    task-pair chunks through compute_overlap_batch_gpu_mega with OOM recovery
    (progressive sub-chunk reduction via _process_chunk_with_progressive_reduction).

    Parameters:
        all_means1_batch: np.ndarray shape (n_task_pairs, n_subjects_max, 150, n_features)
        all_vars1_batch:  np.ndarray shape (n_task_pairs, n_subjects_max, 150, n_features)
        all_means2_batch: np.ndarray shape (n_task_pairs, n_subjects_max, 150, n_features)
        all_vars2_batch:  np.ndarray shape (n_task_pairs, n_subjects_max, 150, n_features)
        valid_mask: np.ndarray shape (n_task_pairs, n_subjects_max) - bool mask for valid subjects
        tol: float, tolerance for variance validity
        biomechanical_filter: bool, apply biomechanical filtering
        progress_callback: callable taking a float in [0, 1], or None

    Returns:
        np.ndarray shape (n_task_pairs, n_subjects_max, 150, 150) - overlap values

    Raises:
        RuntimeError: if CuPy is not importable.
    """
    if not GPU_AVAILABLE:
        raise RuntimeError("CuPy not available for chunked mega-batch GPU computation")

    import time  # hoisted: the original re-imported time on every chunk iteration

    n_task_pairs, n_subjects_max, n_phases, n_features = all_means1_batch.shape

    # Calculate optimal chunk size using dual constraints (GPU + RAM)
    chunk_size = calculate_optimal_chunk_size_dual_constraint(n_task_pairs, n_subjects_max, n_features)
    print(f"π§ Chunking Strategy:")
    print(f" Total task pairs: {n_task_pairs:,}")
    print(f" Optimal chunk size: {chunk_size:,} task pairs")
    print(f" Number of chunks: {(n_task_pairs + chunk_size - 1) // chunk_size}")

    # Try single batch first, but catch out-of-memory errors
    if chunk_size >= n_task_pairs:
        print("π Attempting single mega-batch processing...")
        try:
            return compute_overlap_batch_gpu_mega(
                all_means1_batch, all_vars1_batch, all_means2_batch, all_vars2_batch,
                valid_mask, tol, biomechanical_filter
            )
        except Exception as e:
            # String-matching keeps this independent of which CuPy OOM class fires.
            if "OutOfMemoryError" in str(type(e)) or "out of memory" in str(e).lower():
                print(f"β οΈ Single batch failed with memory error, forcing chunking...")
                # BUG FIX: the original referenced an undefined `available_memory`
                # here, raising NameError on exactly this recovery path. Re-query
                # the device and take a much more conservative budget instead.
                conservative_memory = min(get_available_gpu_memory_gb() * 0.3, 3.0)
                chunk_size = calculate_optimal_chunk_size(n_task_pairs, n_subjects_max, n_features, conservative_memory)
                chunk_size = max(1, chunk_size // 2)  # Further reduce chunk size
                print(f"π§ Fallback chunk size: {chunk_size} pairs (conservative estimate)")
            else:
                raise

    # Process in chunks
    print(f"π Processing {n_task_pairs:,} task pairs in chunks of {chunk_size:,}...")
    results = []
    total_chunks = (n_task_pairs + chunk_size - 1) // chunk_size
    for chunk_start in range(0, n_task_pairs, chunk_size):
        chunk_end = min(chunk_start + chunk_size, n_task_pairs)
        chunk_num = len(results) + 1
        print(f"π Processing chunk {chunk_num}/{total_chunks} (task pairs {chunk_start}:{chunk_end})...")

        # Extract chunk data (views, no copies)
        chunk_means1 = all_means1_batch[chunk_start:chunk_end]
        chunk_vars1 = all_vars1_batch[chunk_start:chunk_end]
        chunk_means2 = all_means2_batch[chunk_start:chunk_end]
        chunk_vars2 = all_vars2_batch[chunk_start:chunk_end]
        chunk_valid = valid_mask[chunk_start:chunk_end]

        start_time = time.time()
        try:
            chunk_result = compute_overlap_batch_gpu_mega(
                chunk_means1, chunk_vars1, chunk_means2, chunk_vars2,
                chunk_valid, tol, biomechanical_filter
            )
            chunk_time = time.time() - start_time
        except Exception as e:
            if "OutOfMemoryError" in str(type(e)) or "out of memory" in str(e).lower():
                print(f" β οΈ Chunk {chunk_num} still too large, attempting progressive reduction...")
                # Progressive reduction: try smaller and smaller sub-chunks.
                chunk_result = _process_chunk_with_progressive_reduction(
                    chunk_means1, chunk_vars1, chunk_means2, chunk_vars2,
                    chunk_valid, tol, biomechanical_filter, chunk_num
                )
                chunk_time = time.time() - start_time
            else:
                raise
        results.append(chunk_result)

        # Progress reporting
        progress = chunk_end / n_task_pairs
        if progress_callback:
            progress_callback(progress * 0.9)  # Save 10% for final aggregation

        # Performance metrics (valid subjects Γ all phase pairs)
        valid_computations = np.sum(chunk_valid) * 150 * 150
        throughput = valid_computations / chunk_time if chunk_time > 0 else 0
        print(f" β Chunk {chunk_num} complete: {chunk_time:.2f}s, {throughput:,.0f} computations/sec")

        # Memory cleanup between chunks so the pool doesn't accumulate.
        if GPU_AVAILABLE:
            cp.get_default_memory_pool().free_all_blocks()

    print("π§ Combining chunk results...")
    final_result = np.concatenate(results, axis=0)
    if progress_callback:
        progress_callback(1.0)
    print(f"β Chunked mega-batch processing complete!")
    print(f"π Final result shape: {final_result.shape}")
    return final_result
def _process_chunk_with_progressive_reduction(chunk_means1, chunk_vars1, chunk_means2, chunk_vars2,
                                              chunk_valid, tol, biomechanical_filter, chunk_num):
    """
    Process a chunk with progressive size reduction if out-of-memory errors occur.
    Tries progressively smaller sub-chunks until successful or reaches minimum size,
    then falls back to a Numba CPU path, and as a last resort returns zeros.

    Parameters:
        chunk_means1/chunk_vars1/chunk_means2/chunk_vars2:
            np.ndarray slices for this chunk, shape (chunk_size, n_subjects_max, 150, n_features)
        chunk_valid: np.ndarray bool, shape (chunk_size, n_subjects_max)
        tol, biomechanical_filter: forwarded to compute_overlap_batch_gpu_mega
        chunk_num: int, identifies the chunk in log output only

    Returns:
        np.ndarray shape (chunk_size, n_subjects_max, 150, 150)
    """
    chunk_size = chunk_means1.shape[0]
    # Try progressively smaller sub-chunks: 50%, 25%, 12.5%, etc.
    reduction_factors = [0.5, 0.25, 0.125, 0.0625]  # Down to 1/16th
    for factor in reduction_factors:
        sub_chunk_size = max(1, int(chunk_size * factor))
        print(f" π Trying sub-chunk size: {sub_chunk_size} pairs ({factor*100:.1f}% of original)")
        try:
            # Process the chunk in sub-chunks
            sub_results = []
            for start_idx in range(0, chunk_size, sub_chunk_size):
                end_idx = min(start_idx + sub_chunk_size, chunk_size)
                sub_result = compute_overlap_batch_gpu_mega(
                    chunk_means1[start_idx:end_idx],
                    chunk_vars1[start_idx:end_idx],
                    chunk_means2[start_idx:end_idx],
                    chunk_vars2[start_idx:end_idx],
                    chunk_valid[start_idx:end_idx],
                    tol, biomechanical_filter
                )
                sub_results.append(sub_result)
                # Clear GPU memory between sub-chunks
                if GPU_AVAILABLE:
                    cp.get_default_memory_pool().free_all_blocks()
            # Combine all sub-results
            final_result = np.concatenate(sub_results, axis=0)
            print(f" β Progressive reduction successful with {sub_chunk_size}-pair sub-chunks")
            return final_result
        except Exception as e:
            # String-matching the exception keeps this independent of which
            # CuPy OOM class is raised; anything else is re-raised unchanged.
            if "OutOfMemoryError" in str(type(e)) or "out of memory" in str(e).lower():
                print(f" β Sub-chunk size {sub_chunk_size} still too large")
                continue
            else:
                raise e
    # If all reduction attempts failed, we need to fall back to sequential processing
    # Processing one pair at a time with GPU overhead is actually slower than CPU
    print(f" β All reduction attempts failed - chunk too large for GPU mega-batch")
    print(f" π‘ Recommendation: Use smaller time windows or switch to sequential processing")
    print(f" π Falling back to CPU-based processing for this chunk...")
    # Fall back to CPU processing for this chunk
    try:
        from .numba_overlap import compute_overlap_batch_numba_ultra_fast
        # Process on CPU using Numba (much faster than single GPU pairs)
        cpu_results = []
        for i in range(chunk_size):
            means1_i = chunk_means1[i]  # Shape: (n_subjects, 150, n_features)
            vars1_i = chunk_vars1[i]
            means2_i = chunk_means2[i]
            vars2_i = chunk_vars2[i]
            valid_i = chunk_valid[i]  # Shape: (n_subjects,)
            # Process valid subjects only; invalid rows stay zero.
            valid_indices = np.where(valid_i)[0]
            if len(valid_indices) > 0:
                cpu_result = compute_overlap_batch_numba_ultra_fast(
                    means1_i[valid_indices], vars1_i[valid_indices],
                    means2_i[valid_indices], vars2_i[valid_indices]
                )
                # Reshape to expected (1, n_subjects_max, 150, 150) format
                full_result = np.zeros((1, chunk_valid.shape[1], 150, 150), dtype=np.float32)
                full_result[0, valid_indices] = cpu_result
                cpu_results.append(full_result)
            else:
                # No valid subjects
                empty_result = np.zeros((1, chunk_valid.shape[1], 150, 150), dtype=np.float32)
                cpu_results.append(empty_result)
        final_result = np.concatenate(cpu_results, axis=0)
        print(f" β CPU fallback processing completed")
        return final_result
    except ImportError:
        print(f" β CPU fallback not available - creating zero results")
        # Last resort: return zeros
        final_result = np.zeros((chunk_size, chunk_valid.shape[1], 150, 150), dtype=np.float32)
        return final_result
| if __name__ == "__main__": | |
| print("π§ͺ Testing GPU overlap calculation...") | |
| if GPU_AVAILABLE: | |
| benchmark_gpu_vs_cpu() | |
| # Test mega-batch functionality | |
| print("\nπ Testing mega-batch functionality...") | |
| # Create test data for multiple task pairs | |
| n_task_pairs = 5 | |
| n_subjects_max = 3 | |
| n_features = 4 | |
| all_means1 = np.random.randn(n_task_pairs, n_subjects_max, 150, n_features).astype(np.float32) | |
| all_vars1 = np.abs(np.random.randn(n_task_pairs, n_subjects_max, 150, n_features)).astype(np.float32) + 0.1 | |
| all_means2 = np.random.randn(n_task_pairs, n_subjects_max, 150, n_features).astype(np.float32) | |
| all_vars2 = np.abs(np.random.randn(n_task_pairs, n_subjects_max, 150, n_features)).astype(np.float32) + 0.1 | |
| valid_mask = np.ones((n_task_pairs, n_subjects_max), dtype=bool) | |
| import time | |
| start = time.time() | |
| result = compute_overlap_batch_gpu_mega(all_means1, all_vars1, all_means2, all_vars2, valid_mask) | |
| end = time.time() | |
| print(f"β Mega-batch result shape: {result.shape}") | |
| print(f"β±οΈ Mega-batch time: {end - start:.4f}s") | |
| print(f"π Throughput: {n_task_pairs * n_subjects_max * 150 * 150 / (end - start):,.0f} computations/sec") | |
| else: | |
| print("β GPU testing requires CuPy and CUDA") |