| |
|
|
| import sweep |
| import numpy as np |
| from numpy import fft |
| from scipy import signal |
| from scipy.io import wavfile |
| import sys |
|
|
| def extract_sweep(pilot, y, pilot_len, sweep_len, silence_len): |
| pilot = np.concatenate([pilot, np.zeros(len(y)-len(pilot))]) |
| N = fft.rfft(pilot) |
| Y = fft.rfft(y) |
| xcorr = fft.irfft(Y * np.conj(N)) |
| pos = np.argmax(np.abs(xcorr[:sweep_len])) |
| pilot_offset = sweep_len+pilot_len+2*silence_len |
| pilot1 = y[pos:pos+pilot_len] |
| pilot2 = y[pilot_offset+pos:pilot_offset+pos+pilot_len] |
| drift_xcorr = fft.irfft(fft.rfft(pilot1) * np.conj(fft.rfft(pilot2))) |
| drift = np.argmax(np.abs(drift_xcorr)) |
| if drift > pilot_len//2: |
| drift = drift - pilot_len |
| print(f"measured drift is {drift} samples ({100*drift/(pilot_len + sweep_len + 2*silence_len)})%"); |
| return y[pos+pilot_len+silence_len//2 : pos+pilot_len+silence_len+sweep_len-drift+silence_len//2] |
|
|
| def deconv_rir(pilot, x, y, Fs=48000, duration=60): |
| pilot_len = Fs |
| sweep_len=Fs*duration |
| silence_len = Fs |
| |
| y = extract_sweep(pilot, y, pilot_len, sweep_len, silence_len) |
| x = np.concatenate([x, np.zeros(sweep_len)]) |
| y = np.concatenate([y, np.zeros(sweep_len-silence_len)]) |
| X = fft.rfft(x) |
| Y = fft.rfft(y) |
| |
| if len(Y) >= len(X): |
| Y = Y[:len(X)] |
| else: |
| Y = np.concatenate([Y, np.zeros(len(X)-len(Y))]) |
| |
| rir = fft.irfft(Y*np.conj(X)/(1.+X*np.conj(X))) |
| |
| direct = np.max(np.abs(rir)) |
| direct_pos = np.argmax(np.abs(rir)) |
| crop_pos = np.argwhere(np.abs(rir[:direct_pos+1]) > .02*direct)[0][0] |
| rir = rir[crop_pos:] |
| |
| noise_floor = np.mean(rir[Fs*10:Fs*20]**2) |
| smoothed = signal.lfilter(np.array([.002]), np.array([1, -.998]), rir[:Fs*10]**2) |
| rir_length = np.argwhere(smoothed > 15*noise_floor)[-1][0] |
| rir = rir[:rir_length] |
| |
| rir = rir/np.sqrt(np.sum(rir**2)) |
| return rir |
|
|
|
|
| if __name__ == '__main__': |
| duration=60 |
| |
| sine = sweep.compute_sweep(duration) |
| |
| _,mic=wavfile.read(sys.argv[1]) |
| |
| pilot=sweep.compute_sweep(1.) |
|
|
| rir = deconv_rir(pilot, sine, mic, duration=duration) |
| rir.astype('float32').tofile(sys.argv[2]) |
|
|