"""
TDOA-Based Direction of Arrival (DoA) Estimation

Calculates Time Difference of Arrival (TDOA) between microphone pairs
to estimate the direction of arrival for each speaker.

Hearing Aid Array Configuration:
- Left Front (LF): Channel 0
- Left Rear (LR): Channel 1
- Right Front (RF): Channel 2
- Right Rear (RR): Channel 3

Microphone spacing: ~15mm (typical hearing aid array)
"""

import warnings

import numpy as np
from scipy.fft import fft, ifft

# Typical hearing aid microphone spacing (mm)
MICROPHONE_SPACING_MM = 15.0
MICROPHONE_SPACING_M = MICROPHONE_SPACING_MM / 1000.0


def compute_cross_correlation(signal1, signal2, max_lag_ms=5.0, sr=44100):
    """
    Find the time delay between two signals via FFT-based cross-correlation.

    Only the first 2 seconds of each signal are analysed (for speed). The
    correlation is searched for its largest-magnitude value within
    +/- ``max_lag_ms``.

    Sign convention: the correlation computed here is
    ``r[k] = sum_n signal1[n + k] * signal2[n]``, so
    - a NEGATIVE lag means signal2 is a delayed copy of signal1
      (signal2 lags signal1);
    - a POSITIVE lag means signal1 lags signal2.

    Args:
        signal1: Reference signal (1-D array-like)
        signal2: Test signal (1-D array-like)
        max_lag_ms: Maximum lag to search (milliseconds)
        sr: Sample rate (Hz)

    Returns:
        lag_samples: Delay in samples (see sign convention above)
        correlation: Normalized cross-correlation coefficient at the peak,
            in [-1, 1]. Returns ``(0, 0.0)`` when either signal is empty
            or silent.
    """
    sig1 = np.asarray(signal1, dtype=float)
    sig2 = np.asarray(signal2, dtype=float)

    # Limit analysis window to first 2 seconds for speed
    window_samples = min(len(sig1), int(2 * sr))
    sig1 = sig1[:window_samples]
    sig2 = sig2[:window_samples]
    if sig1.size == 0 or sig2.size == 0:
        return 0, 0.0

    # Scale each signal to unit L2 norm so that, by Cauchy-Schwarz, every
    # correlation value is a true coefficient in [-1, 1]. (Dividing the peak
    # by the peak itself, as was done previously, always yielded +/-1.)
    norm1 = np.linalg.norm(sig1)
    norm2 = np.linalg.norm(sig2)
    if norm1 == 0.0 or norm2 == 0.0:
        return 0, 0.0
    unit1 = sig1 / norm1
    unit2 = sig2 / norm2

    # Zero-pad to a power of two >= the linear-correlation length so the
    # circular FFT correlation does not wrap around.
    linear_len = sig1.size + sig2.size - 1
    fft_len = 1 << (linear_len - 1).bit_length()
    xcorr = ifft(fft(unit1, fft_len) * np.conj(fft(unit2, fft_len))).real

    # Clamp the search range to lags that actually exist for these lengths.
    # Slicing from ``fft_len - n_neg`` (rather than ``-n_neg``) keeps the
    # negative-lag part empty when n_neg == 0.
    max_lag_samples = max(int(sr * max_lag_ms / 1000), 0)
    n_neg = min(max_lag_samples, sig2.size - 1)
    n_pos = min(max_lag_samples, sig1.size - 1)
    lag_window = np.concatenate([xcorr[fft_len - n_neg:], xcorr[:n_pos + 1]])

    # Find peak (largest magnitude, so inverted-polarity matches are found
    # too and reported with a negative coefficient)
    peak_idx = int(np.argmax(np.abs(lag_window)))
    lag_samples = peak_idx - n_neg
    peak_value = float(np.clip(lag_window[peak_idx], -1.0, 1.0))

    return lag_samples, peak_value


def estimate_doa_from_tdoa(lag_samples, sr,
                           mic_spacing_m=MICROPHONE_SPACING_M,
                           speed_of_sound=343.0):
    """
    Convert TDOA (time delay) to Direction of Arrival angle.

    Uses the far-field relation for a two-microphone pair:
    - DoA = arcsin(c * tau / d)
    where:
    - c = speed of sound
    - tau = time delay
    - d = microphone spacing

    Args:
        lag_samples: Delay in samples
        sr: Sample rate (Hz)
        mic_spacing_m: Microphone spacing (meters)
        speed_of_sound: Speed of sound (m/s)

    Returns:
        doa_deg: Direction of Arrival in degrees, in [-90, 90]
    """
    tau = lag_samples / sr  # Convert to seconds

    # Delays larger than d / c are physically impossible for this pair;
    # clamp so arcsin stays defined and such lags saturate at +/-90 degrees.
    sin_theta = np.clip((speed_of_sound * tau) / mic_spacing_m, -1.0, 1.0)

    return np.degrees(np.arcsin(sin_theta))


def _estimate_pair_angle(reference, other, sr, min_correlation=0.1):
    """Return (angle_deg, correlation) for one mic pair, or None if the peak is too weak."""
    lag, xcorr_val = compute_cross_correlation(reference, other, max_lag_ms=2.0, sr=sr)
    if xcorr_val > min_correlation:
        return float(estimate_doa_from_tdoa(lag, sr)), float(xcorr_val)
    return None


def estimate_speaker_doa(stereo_signal, sr, speaker_energy_threshold=0.01):
    """
    Estimate Direction of Arrival for a speaker using microphone pair TDOA analysis.

    Channel selection:
    - 2 or 3 channels: channel 0 is treated as left, channel 1 as right.
    - 4 or more channels: the module's hearing-aid layout is assumed
      (0=LF, 1=LR, 2=RF, 3=RR); the left/right angle uses LF vs RF and the
      front/rear angle uses LF vs LR.

    NOTE(review): both pairs are converted with the default
    MICROPHONE_SPACING_M (15 mm). The left-right pair presumably spans a
    larger distance than the front-rear pair - confirm the intended spacing.

    Args:
        stereo_signal: 2D array of shape (n_samples, 2) or (n_samples, 4)
        sr: Sample rate
        speaker_energy_threshold: Skip if speaker energy below threshold

    Returns:
        None if the input is not 2-D, has fewer than 2 channels, is empty,
        or its mean squared amplitude is below the threshold. Otherwise a
        dictionary with DoA estimates:
        - 'lr_angle': Left-right direction (-90 to +90, negative=left),
          or None if the pair's correlation peak was too weak
        - 'fb_angle': Front-back direction (positive=rear), or None
        - 'estimated_angle': Primary DoA estimate
          (0=front, 90=right, 180=rear, 270=left), or None
        - 'confidence': Confidence score (0-1)
        - 'method': Description of the estimation method
        - 'lr_confidence' / 'fb_confidence': present only when the
          corresponding angle was estimated
    """
    # Work in float so squaring integer PCM samples cannot overflow.
    signal = np.asarray(stereo_signal, dtype=float)

    # Ensure proper shape
    if signal.ndim != 2 or signal.shape[1] < 2 or signal.shape[0] == 0:
        return None

    # Check energy threshold
    if np.mean(signal ** 2) < speaker_energy_threshold:
        return None

    doa_dict = {
        'lr_angle': None,
        'fb_angle': None,
        'estimated_angle': None,
        'confidence': 0.0,
        'method': 'TDOA cross-correlation'
    }

    n_channels = signal.shape[1]
    # With the 4-channel layout the right-front mic is channel 2; for plain
    # stereo the right channel is channel 1.
    right_idx = 2 if n_channels >= 4 else 1

    try:
        lr_estimate = _estimate_pair_angle(signal[:, 0], signal[:, right_idx], sr)
        if lr_estimate is not None:
            doa_dict['lr_angle'], doa_dict['lr_confidence'] = lr_estimate

        # If we have rear channels, analyze front-rear on the left side
        if n_channels >= 4:
            fb_estimate = _estimate_pair_angle(signal[:, 0], signal[:, 1], sr)
            if fb_estimate is not None:
                doa_dict['fb_angle'], doa_dict['fb_confidence'] = fb_estimate

        lr_angle = doa_dict['lr_angle']
        if lr_angle is not None:
            # Map left-right angle to compass bearing
            # Negative = left (270), Positive = right (90), Zero = center
            if lr_angle < -30:
                estimated = 270  # Left
            elif lr_angle > 30:
                estimated = 90  # Right
            else:
                estimated = 0  # Front/center

            # Refine with front-back if available
            fb_angle = doa_dict['fb_angle']
            if fb_angle is not None and fb_angle > 10:
                estimated = 180  # Rear

            doa_dict['estimated_angle'] = estimated

            # Confidence: mean of the pair confidences that are available.
            # 'fb_confidence' is absent for stereo input, so it must be
            # looked up with .get() rather than indexed.
            confidence = doa_dict['lr_confidence']
            fb_confidence = doa_dict.get('fb_confidence')
            if fb_confidence:
                confidence = (confidence + fb_confidence) / 2
            doa_dict['confidence'] = confidence

        return doa_dict

    except Exception as e:
        # Best-effort API: report the failure but still hand back whatever
        # was estimated so far.
        warnings.warn(f"DoA estimation failed: {e}")
        return doa_dict


def estimate_all_speakers_doa(sources, sr, channel_info=None):
    """
    Build placeholder DoA records for multiple separated speaker sources.

    No angle is actually estimated here: each record carries the source's
    RMS energy, with 'estimated_angle' set to None and 'confidence' to 0.0,
    because a single separated source has no inter-microphone delay left
    to measure.

    Args:
        sources: Array of shape (n_samples, n_speakers) - separated sources.
            A 1-D array is treated as a single speaker.
        sr: Sample rate (currently unused)
        channel_info: Optional info about which channels to use for each
            comparison (currently unused)

    Returns:
        doa_results: List of DoA dictionaries, one per speaker
    """
    sources = np.asarray(sources)
    if sources.ndim == 1:
        sources = sources[:, np.newaxis]

    return [
        {
            'speaker_id': idx,
            'energy': float(np.sqrt(np.mean(sources[:, idx] ** 2))),
            'method': 'TDOA cross-correlation (post-separation estimate)',
            'note': 'True DoA estimation requires access to original multi-channel mixture',
            'estimated_angle': None,
            'confidence': 0.0
        }
        for idx in range(sources.shape[1])
    ]


def compute_tdoa_based_doa_for_mixture(mixture_4ch, sr, mic_spacing_m=MICROPHONE_SPACING_M):
    """
    Compute TDOA-based DoA for speaker directions in a 4-channel mixture.

    This should be called BEFORE separation to get true spatial information.

    Channel mapping:
        0 = LF (Left Front)
        1 = LR (Left Rear)
        2 = RF (Right Front)
        3 = RR (Right Rear)

    Args:
        mixture_4ch: 4-channel audio array (n_samples, 4)
        sr: Sample rate
        mic_spacing_m: Microphone spacing (meters), applied to both pairs

    Returns:
        doa_estimates: Dictionary with keys
        - 'lr_angle': angle (degrees) from the LF vs RF pair
        - 'lr_xcorr': peak correlation coefficient of that pair
        - 'fb_angle': angle (degrees) from the LF vs LR pair
        - 'fb_xcorr': peak correlation coefficient of that pair

    Raises:
        ValueError: If the input is not a 2-D array with exactly 4 channels.
    """
    mixture = np.asarray(mixture_4ch)
    if mixture.ndim != 2 or mixture.shape[1] != 4:
        raise ValueError("Expected 4-channel input")

    left_front = mixture[:, 0]
    left_rear = mixture[:, 1]
    right_front = mixture[:, 2]

    doa_estimates = {}

    # Analyze Front pair (LF vs RF)
    lag_front, xcorr_front = compute_cross_correlation(left_front, right_front, sr=sr)
    doa_estimates['lr_angle'] = estimate_doa_from_tdoa(lag_front, sr, mic_spacing_m)
    doa_estimates['lr_xcorr'] = xcorr_front

    # Analyze Front-Rear pair (LF vs LR on left side)
    lag_front_rear, xcorr_fr = compute_cross_correlation(left_front, left_rear, sr=sr)
    doa_estimates['fb_angle'] = estimate_doa_from_tdoa(lag_front_rear, sr, mic_spacing_m)
    doa_estimates['fb_xcorr'] = xcorr_fr

    return doa_estimates


if __name__ == '__main__':
    # Example: Analyze a 4-channel WAV file
    print("TDOA-Based DoA Estimation Module")
    print("=" * 50)
    print("\nUsage:")
    print(" from doa_tdoa import estimate_speaker_doa, compute_tdoa_based_doa_for_mixture")
    print(" ")
    print(" # For 4-channel mixture (before separation):")
    print(" mixture, sr = sf.read('example.wav')")
    print(" doa_estimates = compute_tdoa_based_doa_for_mixture(mixture, sr)")
    print(" ")
    print(" # For separated speaker:")
    print(" doa_result = estimate_speaker_doa(separated_speaker, sr)")