# hedrekao
# HF deploy: clean snapshot without local artifacts
# a361db3
"""
TDOA-Based Direction of Arrival (DoA) Estimation
Calculates Time Difference of Arrival (TDOA) between microphone pairs
to estimate the direction of arrival for each speaker.
Hearing Aid Array Configuration:
- Left Front (LF): Channel 0
- Left Rear (LR): Channel 1
- Right Front (RF): Channel 2
- Right Rear (RR): Channel 3
Microphone spacing: ~15mm (typical hearing aid array)
"""
import numpy as np
from scipy.fft import fft, ifft
import warnings
# Typical hearing aid microphone spacing, in millimetres.
MICROPHONE_SPACING_MM = 15.0
# The same spacing in metres; used as the default `mic_spacing_m` by the
# DoA functions in this module.
MICROPHONE_SPACING_M = MICROPHONE_SPACING_MM / 1000.0
def compute_cross_correlation(signal1, signal2, max_lag_ms=5.0, sr=44100):
    """
    Find the time delay between two signals using FFT-based cross-correlation.

    Only the first 2 seconds (measured on signal1) of each signal are analysed
    to keep the computation cheap.

    Args:
        signal1: Reference signal (1-D array-like)
        signal2: Test signal (1-D array-like)
        max_lag_ms: Maximum lag to search on either side of zero (milliseconds)
        sr: Sample rate (Hz)

    Returns:
        lag_samples: Lag (in samples) of the strongest correlation peak.
            The correlation evaluated is sum_n signal1[n + lag] * signal2[n],
            so a POSITIVE lag means signal1 is a delayed copy of signal2
            (signal1 lags signal2) and a NEGATIVE lag means signal2 lags
            signal1.
        correlation: Signed value of that peak, normalised by the geometric
            mean of the two window energies so that it lies in [-1, 1].
            Returns (0, 0.0) when either signal is empty.
    """
    max_lag_samples = max(int(sr * max_lag_ms / 1000), 0)

    # Limit analysis window to first 2 seconds for speed
    window_samples = min(len(signal1), int(2 * sr))
    sig1_windowed = np.asarray(signal1[:window_samples], dtype=float)
    sig2_windowed = np.asarray(signal2[:window_samples], dtype=float)

    # Nothing to correlate: report "no delay, no confidence" instead of
    # failing inside the FFT length computation.
    if sig1_windowed.size == 0 or sig2_windowed.size == 0:
        return 0, 0.0

    # Scale both windows to comparable amplitude (epsilon avoids divide-by-zero)
    sig1_norm = sig1_windowed / (np.std(sig1_windowed) + 1e-10)
    sig2_norm = sig2_windowed / (np.std(sig2_windowed) + 1e-10)

    # Zero-pad to a power of two that holds the full linear correlation, so
    # the circular FFT correlation does not wrap valid lags onto each other.
    fft_len = 2 ** int(np.ceil(np.log2(len(sig1_norm) + len(sig2_norm) - 1)))
    fft1 = fft(sig1_norm, fft_len)
    fft2 = fft(sig2_norm, fft_len)
    correlation_fft = ifft(fft1 * np.conj(fft2)).real

    # Pick out lags -max..+max explicitly. Negative lags live at the end of
    # the FFT output, which negative indices address directly. (A slice such
    # as correlation_fft[-0:] would select the whole array when the lag
    # budget is zero, so slicing is avoided here.) Each side is capped at the
    # largest lag for which the two windows still overlap.
    neg_lag = min(max_lag_samples, len(sig2_norm) - 1)
    pos_lag = min(max_lag_samples, len(sig1_norm) - 1)
    lags = np.arange(-neg_lag, pos_lag + 1)
    correlation = correlation_fft[lags]

    # Find peak (largest magnitude, sign preserved in the returned value)
    peak_idx = int(np.argmax(np.abs(correlation)))

    # Normalise by the window energies rather than by the peak itself;
    # dividing by the peak would always yield +/-1 and carry no information.
    energy_norm = np.sqrt(np.sum(sig1_norm ** 2) * np.sum(sig2_norm ** 2)) + 1e-10
    return int(lags[peak_idx]), float(correlation[peak_idx] / energy_norm)
def estimate_doa_from_tdoa(lag_samples, sr, mic_spacing_m=MICROPHONE_SPACING_M, speed_of_sound=343.0):
    """
    Translate an inter-microphone delay into a Direction of Arrival angle.

    Applies the relation
        DoA = arcsin(c * tau / d)
    with c the speed of sound, tau the delay in seconds and d the spacing
    between the two microphones.

    Args:
        lag_samples: Delay in samples
        sr: Sample rate (Hz)
        mic_spacing_m: Microphone spacing (meters)
        speed_of_sound: Speed of sound (m/s)

    Returns:
        doa_deg: Direction of Arrival in degrees, within [-90, 90]
    """
    delay_s = lag_samples / sr
    # Delays larger than the spacing allows would push the arcsin argument
    # outside its domain, so it is saturated at +/-1 (i.e. +/-90 degrees).
    sine_of_angle = np.clip((speed_of_sound * delay_s) / mic_spacing_m, -1.0, 1.0)
    return np.degrees(np.arcsin(sine_of_angle))
def estimate_speaker_doa(stereo_signal, sr, speaker_energy_threshold=0.01):
    """
    Estimate Direction of Arrival for a speaker using microphone pair TDOA analysis.

    Channel pairs used (following the module layout LF=0, LR=1, RF=2, RR=3):
        - 4 or more channels: left/right from channels 0 and 2 (LF-RF),
          front/back from channels 0 and 1 (LF-LR).
        - 2 or 3 channels: left/right from channels 0 and 1, no front/back
          estimate.

    Args:
        stereo_signal: 2D array of shape (n_samples, 2) or (n_samples, 4)
        sr: Sample rate
        speaker_energy_threshold: Skip if the mean squared amplitude over all
            channels is below this threshold

    Returns:
        None when the input is not 2-D, has fewer than 2 channels, or is
        below the energy threshold. Otherwise a dictionary with:
        - 'lr_angle': Left-right angle in degrees (-90 to +90; negative is
          mapped to "left" below), or None if the pair correlated too weakly
        - 'fb_angle': Front-back angle in degrees, or None
        - 'estimated_angle': Coarse bearing (0=front, 90=right, 180=rear,
          270=left), or None when no left-right angle was obtained
        - 'confidence': Mean of the peak correlations of the pairs that
          produced an angle; 0.0 when none did
        - 'method': Description string
        - 'lr_confidence' / 'fb_confidence': Present only for pairs whose
          peak correlation exceeded 0.1
        If an unexpected error occurs a warning is issued and the partially
        filled dictionary is returned.
    """
    # Ensure proper shape
    if stereo_signal.ndim != 2 or stereo_signal.shape[1] < 2:
        return None

    # Check energy threshold
    if np.mean(stereo_signal ** 2) < speaker_energy_threshold:
        return None

    doa_dict = {
        'lr_angle': None,
        'fb_angle': None,
        'estimated_angle': None,
        'confidence': 0.0,
        'method': 'TDOA cross-correlation'
    }

    try:
        has_rear = stereo_signal.shape[1] >= 4

        # Left/right: with the 4-channel layout the opposite-side front mic
        # is channel 2; channel 1 is the same-side rear mic and would only
        # duplicate the front/back measurement below.
        right_ch = 2 if has_rear else 1
        lag, xcorr_val = compute_cross_correlation(
            stereo_signal[:, 0], stereo_signal[:, right_ch], max_lag_ms=2.0, sr=sr
        )
        if xcorr_val > 0.1:  # Only if correlation is strong enough
            doa_dict['lr_angle'] = estimate_doa_from_tdoa(lag, sr)
            doa_dict['lr_confidence'] = xcorr_val

        # Front/back: front vs. rear microphone on the same side
        if has_rear:
            lag, xcorr_val = compute_cross_correlation(
                stereo_signal[:, 0], stereo_signal[:, 1], max_lag_ms=2.0, sr=sr
            )
            if xcorr_val > 0.1:
                doa_dict['fb_angle'] = estimate_doa_from_tdoa(lag, sr)
                doa_dict['fb_confidence'] = xcorr_val

        # Compute estimated angle
        lr_angle = doa_dict['lr_angle']
        if lr_angle is not None:
            # Map left-right angle to compass bearing
            # Negative = left (270°), Positive = right (90°), near zero = front (0°)
            if lr_angle < -30:
                estimated = 270  # Left
            elif lr_angle > 30:
                estimated = 90  # Right
            else:
                estimated = 0  # Front/center

            # Refine with front-back if available
            fb_angle = doa_dict['fb_angle']
            if fb_angle is not None and fb_angle > 10:
                estimated = 180  # Rear

            doa_dict['estimated_angle'] = estimated

        # Confidence: average over the pairs that actually produced an
        # estimate. The keys are optional, so they are looked up defensively
        # instead of indexed (indexing raised KeyError for stereo input).
        confidences = [
            doa_dict[key] for key in ('lr_confidence', 'fb_confidence') if key in doa_dict
        ]
        if confidences:
            doa_dict['confidence'] = sum(confidences) / len(confidences)

        return doa_dict
    except Exception as e:
        # Best-effort API: report the failure but still hand back whatever
        # was filled in so far.
        warnings.warn(f"DoA estimation failed: {e}")
        return doa_dict
def estimate_all_speakers_doa(sources, sr, channel_info=None):
    """
    Build a placeholder DoA record for every separated speaker source.

    No angle is actually estimated here: each record only carries the RMS
    energy of the source, with 'estimated_angle' left as None and
    'confidence' at 0.0. `sr` and `channel_info` are accepted but unused.

    Args:
        sources: Array of shape (n_samples, n_speakers) - separated sources
        sr: Sample rate
        channel_info: Optional info about which channels to use for each comparison

    Returns:
        doa_results: List of DoA dictionaries, one per speaker, in column order
    """
    # One record per column of `sources`; the energy is the column's RMS value.
    return [
        {
            'speaker_id': speaker,
            'energy': np.sqrt(np.mean(sources[:, speaker] ** 2)),
            'method': 'TDOA cross-correlation (post-separation estimate)',
            'note': 'True DoA estimation requires access to original multi-channel mixture',
            'estimated_angle': None,
            'confidence': 0.0
        }
        for speaker in range(sources.shape[1])
    ]
def compute_tdoa_based_doa_for_mixture(mixture_4ch, sr, mic_spacing_m=MICROPHONE_SPACING_M):
    """
    Compute TDOA-based DoA angles from a 4-channel mixture.

    Intended to run on the raw mixture BEFORE source separation, while the
    inter-channel timing is still intact.

    Channel mapping:
        0 = LF (Left Front), 1 = LR (Left Rear),
        2 = RF (Right Front), 3 = RR (Right Rear)

    Args:
        mixture_4ch: 4-channel audio array (n_samples, 4)
        sr: Sample rate
        mic_spacing_m: Microphone spacing (meters), applied to both pairs

    Returns:
        doa_estimates: Dictionary with
            - 'lr_angle' / 'lr_xcorr': angle and peak correlation for the
              front pair (LF vs RF)
            - 'fb_angle' / 'fb_xcorr': angle and peak correlation for the
              left front-rear pair (LF vs LR)

    Raises:
        ValueError: If the input does not have exactly 4 channels.
    """
    if mixture_4ch.shape[1] != 4:
        raise ValueError("Expected 4-channel input")

    # (angle key, correlation key, reference channel, test channel)
    pair_plan = (
        ('lr_angle', 'lr_xcorr', 0, 2),  # Front pair: LF vs RF
        ('fb_angle', 'fb_xcorr', 0, 1),  # Left side: LF vs LR
    )

    doa_estimates = {}
    for angle_key, xcorr_key, ref_ch, test_ch in pair_plan:
        lag, peak = compute_cross_correlation(
            mixture_4ch[:, ref_ch], mixture_4ch[:, test_ch], sr=sr
        )
        doa_estimates[angle_key] = estimate_doa_from_tdoa(lag, sr, mic_spacing_m)
        doa_estimates[xcorr_key] = peak
    return doa_estimates
if __name__ == '__main__':
    # Running the module directly only prints a short usage guide.
    usage_lines = (
        "TDOA-Based DoA Estimation Module",
        "=" * 50,
        "\nUsage:",
        " from doa_tdoa import estimate_speaker_doa, compute_tdoa_based_doa_for_mixture",
        " ",
        " # For 4-channel mixture (before separation):",
        " mixture, sr = sf.read('example.wav')",
        " doa_estimates = compute_tdoa_based_doa_for_mixture(mixture, sr)",
        " ",
        " # For separated speaker:",
        " doa_result = estimate_speaker_doa(separated_speaker, sr)",
    )
    for usage_line in usage_lines:
        print(usage_line)