File size: 9,672 Bytes
a361db3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
"""
TDOA-Based Direction of Arrival (DoA) Estimation

Calculates Time Difference of Arrival (TDOA) between microphone pairs
to estimate the direction of arrival for each speaker.

Hearing Aid Array Configuration:
- Left Front (LF): Channel 0
- Left Rear (LR): Channel 1
- Right Front (RF): Channel 2
- Right Rear (RR): Channel 3

Microphone spacing: ~15mm (typical hearing aid array)
"""

import numpy as np
from scipy.fft import fft, ifft
import warnings

# Typical hearing aid microphone spacing (mm)
MICROPHONE_SPACING_MM = 15.0
MICROPHONE_SPACING_M = MICROPHONE_SPACING_MM / 1000.0


def compute_cross_correlation(signal1, signal2, max_lag_ms=5.0, sr=44100):
    """
    Compute cross-correlation between two signals to find time delay using FFT.

    Args:
        signal1: Reference signal (channel 0)
        signal2: Test signal (channel 1)
        max_lag_ms: Maximum lag to search (milliseconds)
        sr: Sample rate (Hz)

    Returns:
        lag_samples: Delay in samples (positive = signal2 lags signal1)
        correlation: Peak cross-correlation value
    """
    max_lag_samples = int(sr * max_lag_ms / 1000)

    # Limit analysis window to first 2 seconds for speed
    window_samples = min(len(signal1), int(2 * sr))
    sig1_windowed = signal1[:window_samples]
    sig2_windowed = signal2[:window_samples]

    # Normalize
    sig1_norm = sig1_windowed / (np.std(sig1_windowed) + 1e-10)
    sig2_norm = sig2_windowed / (np.std(sig2_windowed) + 1e-10)

    # Use FFT-based correlation for speed (more efficient than np.correlate for long signals)
    fft_len = 2 ** int(np.ceil(np.log2(len(sig1_norm) + len(sig2_norm) - 1)))

    fft1 = fft(sig1_norm, fft_len)
    fft2 = fft(sig2_norm, fft_len)

    correlation_fft = ifft(fft1 * np.conj(fft2)).real
    correlation = np.concatenate([correlation_fft[-(max_lag_samples):], correlation_fft[:(max_lag_samples+1)]])

    # Find peak
    peak_idx = np.argmax(np.abs(correlation))
    lag_samples = peak_idx - max_lag_samples
    peak_value = correlation[peak_idx]

    return lag_samples, peak_value / (np.max(np.abs(correlation)) + 1e-10)


def estimate_doa_from_tdoa(lag_samples, sr, mic_spacing_m=MICROPHONE_SPACING_M, speed_of_sound=343.0):
    """
    Convert TDOA (time delay) to Direction of Arrival angle.

    For a linear array or 2D microphone configuration:
    - DoA = arcsin(c * tau / d)
    where:
      - c = speed of sound
      - tau = time delay
      - d = microphone spacing

    Args:
        lag_samples: Delay in samples
        sr: Sample rate (Hz)
        mic_spacing_m: Microphone spacing (meters)
        speed_of_sound: Speed of sound (m/s)

    Returns:
        doa_deg: Direction of Arrival in degrees
    """
    tau = lag_samples / sr  # Convert to seconds

    # Compute argument for arcsin
    arg = (speed_of_sound * tau) / mic_spacing_m

    # Clamp to valid range [-1, 1]
    arg = np.clip(arg, -1.0, 1.0)

    # Convert to degrees
    doa_rad = np.arcsin(arg)
    doa_deg = np.degrees(doa_rad)

    return doa_deg


def estimate_speaker_doa(stereo_signal, sr, speaker_energy_threshold=0.01):
    """
    Estimate Direction of Arrival for a speaker using microphone pair TDOA analysis.

    For 4-channel hearing aid array, compute DoA using multiple pairs:
    - Front pair (LF-RF): Gives left/right angle
    - Rear pair (LR-RR): Confirms left/right angle
    - Front-rear pairs (LF-LR, RF-RR): Gives front/rear angle

    Args:
        stereo_signal: 2D array of shape (n_samples, 2) or (n_samples, 4)
        sr: Sample rate
        speaker_energy_threshold: Skip if speaker energy below threshold

    Returns:
        doa_dict: Dictionary with DoA estimates
            - 'lr_angle': Left-right direction (-90 to +90, negative=left)
            - 'fb_angle': Front-back direction
            - 'estimated_angle': Primary DoA estimate (0°=front, 90°=right, 180°=rear, 270°=left)
            - 'confidence': Confidence score (0-1)
    """
    # Ensure proper shape
    if stereo_signal.ndim == 1:
        return None

    if stereo_signal.shape[1] < 2:
        return None

    # Check energy threshold
    signal_energy = np.mean(stereo_signal ** 2)
    if signal_energy < speaker_energy_threshold:
        return None

    doa_dict = {
        'lr_angle': None,
        'fb_angle': None,
        'estimated_angle': None,
        'confidence': 0.0,
        'method': 'TDOA cross-correlation'
    }

    try:
        # If we have 4-channel, use Front-Left vs Front-Right
        if stereo_signal.shape[1] >= 2:
            ch0 = stereo_signal[:, 0]  # Could be LF or another channel
            ch1 = stereo_signal[:, 1]  # Could be RF or another channel

            lag, xcorr_val = compute_cross_correlation(ch0, ch1, max_lag_ms=2.0, sr=sr)

            if xcorr_val > 0.1:  # Only if correlation is strong enough
                lr_angle = estimate_doa_from_tdoa(lag, sr)
                doa_dict['lr_angle'] = lr_angle
                doa_dict['lr_confidence'] = xcorr_val

        # If we have rear channels, analyze front-rear
        if stereo_signal.shape[1] >= 4:
            ch0 = stereo_signal[:, 0]  # Front
            ch1 = stereo_signal[:, 1]  # Rear (same side)

            lag, xcorr_val = compute_cross_correlation(ch0, ch1, max_lag_ms=2.0, sr=sr)

            if xcorr_val > 0.1:
                fb_angle = estimate_doa_from_tdoa(lag, sr)
                doa_dict['fb_angle'] = fb_angle
                doa_dict['fb_confidence'] = xcorr_val

        # Compute estimated angle
        if doa_dict['lr_angle'] is not None:
            # Map left-right angle to compass bearing
            # Negative = left (270°), Positive = right (90°), Zero = center (0° or 180°)
            if doa_dict['lr_angle'] < -30:
                estimated = 270  # Left
            elif doa_dict['lr_angle'] > 30:
                estimated = 90   # Right
            else:
                estimated = 0    # Front/center

            # Refine with front-back if available
            if doa_dict['fb_angle'] is not None:
                if doa_dict['fb_angle'] > 10:
                    estimated = 180  # Rear

            doa_dict['estimated_angle'] = estimated

            # Confidence
            avg_conf = doa_dict.get('lr_confidence', 0.5)
            if doa_dict['fb_confidence']:
                avg_conf = (avg_conf + doa_dict.get('fb_confidence', 0.5)) / 2
            doa_dict['confidence'] = avg_conf

        return doa_dict

    except Exception as e:
        warnings.warn(f"DoA estimation failed: {e}")
        return doa_dict


def estimate_all_speakers_doa(sources, sr, channel_info=None):
    """
    Estimate DoA for multiple separated speaker sources.

    Args:
        sources: Array of shape (n_samples, n_speakers) - separated sources
        sr: Sample rate
        channel_info: Optional info about which channels to use for each comparison

    Returns:
        doa_results: List of DoA dictionaries, one per speaker
    """
    doa_results = []

    for idx in range(sources.shape[1]):
        source = sources[:, idx]

        # For each source, estimate DoA (in practice, this would use the original
        # multi-channel mixture, not the separated source, but here we estimate
        # from energy characteristics)
        doa_dict = {
            'speaker_id': idx,
            'energy': np.sqrt(np.mean(source ** 2)),
            'method': 'TDOA cross-correlation (post-separation estimate)',
            'note': 'True DoA estimation requires access to original multi-channel mixture',
            'estimated_angle': None,
            'confidence': 0.0
        }

        doa_results.append(doa_dict)

    return doa_results


def compute_tdoa_based_doa_for_mixture(mixture_4ch, sr, mic_spacing_m=MICROPHONE_SPACING_M):
    """
    Compute TDOA-based DoA for speaker directions in a 4-channel mixture.

    This should be called BEFORE separation to get true spatial information.

    Args:
        mixture_4ch: 4-channel audio array (n_samples, 4)
        sr: Sample rate
        mic_spacing_m: Microphone spacing

    Returns:
        doa_estimates: Array of estimated angles
    """
    if mixture_4ch.shape[1] != 4:
        raise ValueError("Expected 4-channel input")

    # Channel mapping:
    # 0 = LF (Left Front)
    # 1 = LR (Left Rear)
    # 2 = RF (Right Front)
    # 3 = RR (Right Rear)

    doa_estimates = {}

    # Analyze Front pair (LF vs RF)
    ch_lf = mixture_4ch[:, 0]
    ch_rf = mixture_4ch[:, 2]
    lag_front, xcorr_front = compute_cross_correlation(ch_lf, ch_rf, sr=sr)
    angle_lr = estimate_doa_from_tdoa(lag_front, sr, mic_spacing_m)

    doa_estimates['lr_angle'] = angle_lr
    doa_estimates['lr_xcorr'] = xcorr_front

    # Analyze Front-Rear pair (LF vs LR on left side)
    ch_lf = mixture_4ch[:, 0]
    ch_lr = mixture_4ch[:, 1]
    lag_front_rear, xcorr_fr = compute_cross_correlation(ch_lf, ch_lr, sr=sr)
    angle_fb = estimate_doa_from_tdoa(lag_front_rear, sr, mic_spacing_m)

    doa_estimates['fb_angle'] = angle_fb
    doa_estimates['fb_xcorr'] = xcorr_fr

    return doa_estimates


if __name__ == '__main__':

    # Example: Analyze a 4-channel WAV file
    print("TDOA-Based DoA Estimation Module")
    print("=" * 50)
    print("\nUsage:")
    print("  from doa_tdoa import estimate_speaker_doa, compute_tdoa_based_doa_for_mixture")
    print("  ")
    print("  # For 4-channel mixture (before separation):")
    print("  mixture, sr = sf.read('example.wav')")
    print("  doa_estimates = compute_tdoa_based_doa_for_mixture(mixture, sr)")
    print("  ")
    print("  # For separated speaker:")
    print("  doa_result = estimate_speaker_doa(separated_speaker, sr)")