"""
TDOA-Based Direction of Arrival (DoA) Estimation

Calculates Time Difference of Arrival (TDOA) between microphone pairs
to estimate the direction of arrival for each speaker.

Hearing Aid Array Configuration:
- Left Front (LF): Channel 0
- Left Rear (LR): Channel 1
- Right Front (RF): Channel 2
- Right Rear (RR): Channel 3

Microphone spacing: ~15 mm (typical hearing aid array)
"""

import warnings

import numpy as np
from scipy.fft import fft, ifft


# Nominal spacing between microphones of the array, in millimetres.
MICROPHONE_SPACING_MM = 15.0
# The same spacing in metres; used as the default spacing by the DoA functions.
MICROPHONE_SPACING_M = MICROPHONE_SPACING_MM / 1000.0


def compute_cross_correlation(signal1, signal2, max_lag_ms=5.0, sr=44100):
    """
    Find the delay between two signals from their FFT-based cross-correlation.

    Only the first two seconds of the signals are analysed. Both excerpts are
    mean-removed and scaled to unit variance before correlating, so the
    reported peak is a normalised correlation coefficient rather than a raw
    (always +/-1 after self-normalisation) value.

    Args:
        signal1: Reference signal (1-D array-like)
        signal2: Test signal (1-D array-like)
        max_lag_ms: Maximum absolute lag to search (milliseconds)
        sr: Sample rate (Hz)

    Returns:
        lag_samples: Lag (in samples) of the strongest correlation peak within
            the search range. Negative = signal2 is a delayed copy of signal1,
            positive = signal1 is a delayed copy of signal2.
        correlation: Normalised cross-correlation at that lag, in [-1, 1].

        (0, 0.0) is returned when either input is empty or when the
        correlation is identically zero (e.g. silent input).
    """
    sig1 = np.asarray(signal1, dtype=float).ravel()
    sig2 = np.asarray(signal2, dtype=float).ravel()

    # Limit the analysis to (at most) the first two seconds of audio.
    window_samples = min(len(sig1), int(2 * sr))
    sig1 = sig1[:window_samples]
    sig2 = sig2[:window_samples]
    if sig1.size == 0 or sig2.size == 0:
        return 0, 0.0

    # Zero-mean, unit-variance excerpts; the epsilon guards constant signals.
    sig1_norm = (sig1 - sig1.mean()) / (np.std(sig1) + 1e-10)
    sig2_norm = (sig2 - sig2.mean()) / (np.std(sig2) + 1e-10)

    # Zero-pad to the next power of two >= the linear-correlation length so
    # the circular correlation computed via the FFT does not wrap around.
    linear_len = sig1.size + sig2.size - 1
    fft_len = 1 << (linear_len - 1).bit_length()

    spectrum1 = fft(sig1_norm, fft_len)
    spectrum2 = fft(sig2_norm, fft_len)
    # xcorr[k] = sum_n sig1[n + k] * sig2[n]; negative k lives at the array end.
    xcorr = ifft(spectrum1 * np.conj(spectrum2)).real

    # Clamp the search range to lags that actually exist for these lengths.
    max_lag_samples = int(sr * max_lag_ms / 1000)
    max_lag_samples = max(0, min(max_lag_samples, max(sig1.size, sig2.size) - 1))

    # Index-based selection (negative indices wrap) also works for a zero
    # search range, where slicing with [-0:] would return the whole array.
    lags = np.arange(-max_lag_samples, max_lag_samples + 1)
    correlation = xcorr[lags]

    peak_idx = int(np.argmax(np.abs(correlation)))
    peak_value = correlation[peak_idx]
    if peak_value == 0:
        return 0, 0.0

    # By Cauchy-Schwarz the unit-variance correlation is bounded by
    # sqrt(len1 * len2), so this quotient is a coefficient in [-1, 1].
    coefficient = float(peak_value / np.sqrt(sig1.size * sig2.size))
    return int(lags[peak_idx]), coefficient


def estimate_doa_from_tdoa(lag_samples, sr, mic_spacing_m=MICROPHONE_SPACING_M, speed_of_sound=343.0):
    """
    Convert TDOA (time delay) to Direction of Arrival angle.

    Uses the far-field relation for a single microphone pair:
    - DoA = arcsin(c * tau / d)
    where:
    - c = speed of sound
    - tau = time delay (lag_samples / sr)
    - d = microphone spacing

    Args:
        lag_samples: Delay in samples (scalar or array)
        sr: Sample rate (Hz), must be positive
        mic_spacing_m: Microphone spacing (meters), must be positive
        speed_of_sound: Speed of sound (m/s)

    Returns:
        doa_deg: Direction of Arrival in degrees, in [-90, 90]; the sign
            follows the sign of lag_samples.

    Raises:
        ValueError: If sr or mic_spacing_m is not positive.
    """
    if sr <= 0:
        raise ValueError("sr must be positive")
    if mic_spacing_m <= 0:
        raise ValueError("mic_spacing_m must be positive")

    tau = lag_samples / sr

    # Integer-sample lags can imply a path difference larger than the
    # spacing; clip so arcsin stays inside its domain instead of giving NaN.
    arg = np.clip((speed_of_sound * tau) / mic_spacing_m, -1.0, 1.0)

    return np.degrees(np.arcsin(arg))


def estimate_speaker_doa(stereo_signal, sr, speaker_energy_threshold=0.01):
    """
    Estimate Direction of Arrival for a speaker using microphone pair TDOA analysis.

    Channel pairs used:
    - 2- or 3-channel input: channels 0 and 1 form the left/right pair.
    - 4-channel hearing aid input (LF, LR, RF, RR): the front pair LF-RF
      (channels 0 and 2) gives the left/right angle and the left pair LF-LR
      (channels 0 and 1) gives the front/rear angle.

    A pair only contributes when its cross-correlation value exceeds 0.1.

    Args:
        stereo_signal: 2D array of shape (n_samples, 2) or (n_samples, 4)
        sr: Sample rate
        speaker_energy_threshold: Skip if mean signal energy is below threshold

    Returns:
        None when the input is not 2-D, has fewer than two channels, has no
        samples, or its mean energy is below the threshold. Otherwise
        doa_dict: Dictionary with DoA estimates
            - 'lr_angle': Left-right direction (-90 to +90, negative=left),
              or None if the pair was rejected
            - 'fb_angle': Front-back direction, or None
            - 'lr_confidence' / 'fb_confidence': correlation value of each
              accepted pair (keys absent when the pair was rejected)
            - 'estimated_angle': Primary DoA estimate (0°=front, 90°=right,
              180°=rear, 270°=left), or None without a left/right estimate
            - 'confidence': Mean of the accepted pair confidences (0-1);
              0.0 when there is no left/right estimate
            - 'method': Description of the estimation method
    """
    # Float conversion also prevents integer PCM from overflowing when squared.
    stereo_signal = np.asarray(stereo_signal, dtype=float)
    if stereo_signal.ndim != 2 or stereo_signal.shape[1] < 2:
        return None
    if stereo_signal.shape[0] == 0:
        return None

    if np.mean(stereo_signal ** 2) < speaker_energy_threshold:
        return None

    doa_dict = {
        'lr_angle': None,
        'fb_angle': None,
        'estimated_angle': None,
        'confidence': 0.0,
        'method': 'TDOA cross-correlation'
    }

    four_channel = stereo_signal.shape[1] >= 4
    left = stereo_signal[:, 0]
    # In the 4-channel layout channel 1 is Left Rear, so the opposite-side
    # microphone for the left/right pair is channel 2 (Right Front).
    right = stereo_signal[:, 2] if four_channel else stereo_signal[:, 1]

    # NOTE(review): both pairs use the default microphone spacing of
    # estimate_doa_from_tdoa; the left/right pair is presumably wider apart
    # than the front/rear pair -- confirm the intended geometry.
    try:
        lag, xcorr_val = compute_cross_correlation(left, right, max_lag_ms=2.0, sr=sr)
        if xcorr_val > 0.1:
            doa_dict['lr_angle'] = estimate_doa_from_tdoa(lag, sr)
            doa_dict['lr_confidence'] = xcorr_val

        if four_channel:
            lag, xcorr_val = compute_cross_correlation(
                stereo_signal[:, 0], stereo_signal[:, 1], max_lag_ms=2.0, sr=sr
            )
            if xcorr_val > 0.1:
                doa_dict['fb_angle'] = estimate_doa_from_tdoa(lag, sr)
                doa_dict['fb_confidence'] = xcorr_val

        lr_angle = doa_dict['lr_angle']
        if lr_angle is not None:
            # Quantise the left/right angle into a coarse sector.
            if lr_angle < -30:
                estimated = 270
            elif lr_angle > 30:
                estimated = 90
            else:
                estimated = 0

            # A clearly positive front/back angle overrides the sector: rear.
            fb_angle = doa_dict['fb_angle']
            if fb_angle is not None and fb_angle > 10:
                estimated = 180

            doa_dict['estimated_angle'] = estimated

            # Average only the confidences that were actually produced;
            # 'fb_confidence' is absent for stereo input or a rejected pair.
            confidences = [doa_dict['lr_confidence']]
            if 'fb_confidence' in doa_dict:
                confidences.append(doa_dict['fb_confidence'])
            doa_dict['confidence'] = sum(confidences) / len(confidences)
    except Exception as e:
        # Best-effort: report the failure and return whatever was estimated.
        warnings.warn(f"DoA estimation failed: {e}")

    return doa_dict


def estimate_all_speakers_doa(sources, sr, channel_info=None):
    """
    Build a placeholder DoA record for each separated speaker source.

    No angle is estimated here: every record has 'estimated_angle' None and
    'confidence' 0.0, and carries a note that true DoA estimation needs the
    original multi-channel mixture.

    Args:
        sources: Array of shape (n_samples, n_speakers) - separated sources.
            A 1-D array is treated as a single speaker.
        sr: Sample rate (currently unused)
        channel_info: Optional info about which channels to use for each
            comparison (currently unused)

    Returns:
        doa_results: List of DoA dictionaries, one per speaker, with keys
            'speaker_id', 'energy' (RMS amplitude of the source), 'method',
            'note', 'estimated_angle' and 'confidence'.
    """
    # Float conversion prevents integer PCM from overflowing when squared.
    sources = np.asarray(sources, dtype=float)
    if sources.ndim == 1:
        sources = sources[:, np.newaxis]

    return [
        {
            'speaker_id': idx,
            'energy': np.sqrt(np.mean(sources[:, idx] ** 2)),
            'method': 'TDOA cross-correlation (post-separation estimate)',
            'note': 'True DoA estimation requires access to original multi-channel mixture',
            'estimated_angle': None,
            'confidence': 0.0
        }
        for idx in range(sources.shape[1])
    ]


def compute_tdoa_based_doa_for_mixture(mixture_4ch, sr, mic_spacing_m=MICROPHONE_SPACING_M):
    """
    Compute TDOA-based DoA for speaker directions in a 4-channel mixture.

    This should be called BEFORE separation to get true spatial information.

    Two microphone pairs are analysed:
    - LF-RF (channels 0 and 2) for the left/right angle
    - LF-LR (channels 0 and 1) for the front/back angle

    Args:
        mixture_4ch: 4-channel audio array (n_samples, 4)
        sr: Sample rate
        mic_spacing_m: Microphone spacing (meters), applied to both pairs

    Returns:
        doa_estimates: Dictionary with keys
            - 'lr_angle': left/right angle in degrees
            - 'lr_xcorr': cross-correlation value of the LF-RF pair
            - 'fb_angle': front/back angle in degrees
            - 'fb_xcorr': cross-correlation value of the LF-LR pair

    Raises:
        ValueError: If the input is not a 2-D array with exactly 4 channels.
    """
    mixture_4ch = np.asarray(mixture_4ch)
    if mixture_4ch.ndim != 2 or mixture_4ch.shape[1] != 4:
        raise ValueError("Expected 4-channel input")

    # (result key prefix, reference channel, test channel)
    channel_pairs = (
        ('lr', 0, 2),  # Left Front vs Right Front
        ('fb', 0, 1),  # Left Front vs Left Rear
    )

    doa_estimates = {}
    for prefix, ref_ch, test_ch in channel_pairs:
        lag, xcorr = compute_cross_correlation(
            mixture_4ch[:, ref_ch], mixture_4ch[:, test_ch], sr=sr
        )
        doa_estimates[f'{prefix}_angle'] = estimate_doa_from_tdoa(lag, sr, mic_spacing_m)
        doa_estimates[f'{prefix}_xcorr'] = xcorr

    return doa_estimates


if __name__ == '__main__':
    # Running the module directly only prints a short usage guide.
    print("TDOA-Based DoA Estimation Module")
    print("=" * 50)
    print("\nUsage:")
    print(" from doa_tdoa import estimate_speaker_doa, compute_tdoa_based_doa_for_mixture")
    print(" ")
    print(" # For 4-channel mixture (before separation):")
    print(" mixture, sr = sf.read('example.wav')")
    print(" doa_estimates = compute_tdoa_based_doa_for_mixture(mixture, sr)")
    print(" ")
    print(" # For separated speaker:")
    print(" doa_result = estimate_speaker_doa(separated_speaker, sr)")