| import os |
| import queue |
| import sys |
| import time |
|
|
| import matplotlib.pyplot as plt |
| import numpy as np |
| import sounddevice as sd |
| import soundfile as sf |
| from gpiozero import Buzzer |
| from scipy import signal |
| from scipy.io import wavfile |
|
|
| buzzer = Buzzer(2) |
| assert np |
|
|
|
|
| |
|
|
| |
| if os.path.exists("rec1.wav") | os.path.exists("rec2.wav") | os.path.exists("rec3.wav"): |
| if input("Delete old files? (y/n)") == "y": |
| os.remove("rec1.wav") |
| os.remove("rec2.wav") |
| os.remove("rec3.wav") |
|
|
| q1 = queue.Queue() |
| q2 = queue.Queue() |
| q3 = queue.Queue() |
|
|
| |
| fs = 48000 |
|
|
| sd.default.samplerate = fs |
|
|
| print(sd.query_devices()) |
|
|
| |
| def callback1(indata, frames, time, status): |
| """This is called (from a separate thread) for each audio block.""" |
| if status: |
| print(status, file=sys.stderr) |
| q1.put(indata.copy()) |
|
|
|
|
| def callback2(indata, frames, time, status): |
| """This is called (from a separate thread) for each audio block.""" |
| if status: |
| print(status, file=sys.stderr) |
| q2.put(indata.copy()) |
|
|
|
|
| def callback3(indata, frames, time, status): |
| """This is called (from a separate thread) for each audio block.""" |
| if status: |
| print(status, file=sys.stderr) |
| q3.put(indata.copy()) |
|
|
| |
| def calc_delay(file1, file2, file3): |
| |
| data1 = wavfile.read(file1)[1] |
| data2 = wavfile.read(file2)[1] |
| data3 = wavfile.read(file3)[1] |
|
|
| peak1, _ = signal.find_peaks(data1, height=7000) |
| peak2, _ = signal.find_peaks(data2, height=7000) |
| peak3, _ = signal.find_peaks(data3, height=7000) |
| if peak1.size == 0 or peak2.size == 0 or peak3.size == 0: |
| |
| raise Exception("No peak found, change the height value") |
| for i in peak1: |
| frame1 = i |
| if frame1 / fs > 1: |
| break |
| for i in peak2: |
| frame2 = i |
| if frame2 / fs > 1: |
| break |
| for i in peak3: |
| frame3 = i |
| if frame3 / fs > 1: |
| break |
| return [max([frame1, frame2, frame3]) - x for x in [frame1, frame2, frame3]] |
|
|
| |
| def find_order(file1, file2, file3): |
| data1 = wavfile.read(file1)[1] |
| data2 = wavfile.read(file2)[1] |
| data3 = wavfile.read(file3)[1] |
|
|
| peak1, _ = signal.find_peaks(data1, height=7000) |
| peak2, _ = signal.find_peaks(data2, height=7000) |
| peak3, _ = signal.find_peaks(data3, height=7000) |
| if peak1.size == 0 or peak2.size == 0 or peak3.size == 0: |
| |
| raise Exception( |
| "No peak found, change the height value or try again but louder" |
| ) |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
|
|
| return [ |
| x[0] |
| for x in sorted( |
| [[1, np.average(peak1)], [2, np.average(peak2)], [3, np.average(peak3)]], |
| key=lambda x: x[1], |
| ) |
| ] |
|
|
| |
| def record( |
| buzzer_sync=False, |
| order_mics=False, |
| duration=0.0, |
| delay=[0, 0, 0], |
| file_name_1="rec1.wav", |
| file_name_2="rec2.wav", |
| file_name_3="rec3.wav", |
| ): |
| |
|
|
| try: |
| with sf.SoundFile( |
| file_name_1, mode="x", samplerate=fs, channels=1 |
| ) as file1, sf.SoundFile( |
| file_name_2, mode="x", samplerate=fs, channels=1 |
| ) as file2, sf.SoundFile( |
| file_name_3, mode="x", samplerate=fs, channels=1 |
| ) as file3: |
| start_time = time.time() |
| if delay != [0, 0, 0]: |
| for sample in np.zeros(delay[0]): |
| file1.write(sample) |
| for sample in np.zeros(delay[1]): |
| file2.write(sample) |
| for sample in np.zeros(delay[2]): |
| file3.write(sample) |
| with sd.InputStream( |
| samplerate=fs, device=1, channels=1, callback=callback1 |
| ), sd.InputStream( |
| samplerate=fs, device=2, channels=1, callback=callback2 |
| ), sd.InputStream( |
| samplerate=fs, device=3, channels=1, callback=callback3 |
| ): |
| if duration != 0.0: |
| print("Recording for " + str(duration) + " seconds") |
| else: |
| print("press Ctrl+C to stop the recording") |
| while True: |
| |
| |
| dur_time = time.time() - start_time |
| file1.write(q1.get()) |
| file2.write(q2.get()) |
| file3.write(q3.get()) |
| if buzzer_sync: |
| if dur_time >= 1.2: |
| buzzer.off() |
| elif dur_time >= 1: |
| buzzer.on() |
| if duration != 0.0 and dur_time >= duration: |
| |
| raise KeyboardInterrupt |
|
|
| except KeyboardInterrupt: |
| print("\nRecording finished: ") |
| |
| if buzzer_sync: |
| return calc_delay(file_name_1, file_name_2, file_name_3) |
| |
| if order_mics: |
| print("Order: ", find_order(file_name_1, file_name_2, file_name_3)) |
|
|
| |
| def collect_data(delay_times): |
| |
| no = 0 |
| |
| |
| location = "" |
| background = "" |
| for i2 in range(4): |
| |
| if i2 == 0: |
| location = "my room" |
| if i2 == 1: |
| location = "living room" |
| if i2 == 2: |
| location = "living room coffee table" |
| if i2 == 3: |
| location = "other room" |
| for j in range(6): |
| |
| if j == 0: |
| background = "nothing" |
| if j == 1: |
| background = "one music" |
| if j == 2: |
| background = "one talking" |
| if j == 3: |
| background = "one talking and one music" |
| if j == 4: |
| background = "two talking" |
| if j == 5: |
| background = "two music" |
| if j == 6: |
| background = "random coughing or clapping" |
| for i in range(10): |
| |
| if i <= 5: |
| if os.path.exists(f"rec_{no}_a_t.wav"): |
| no += 1 |
| continue |
| else: |
| if os.path.exists(f"rec_{no}_a_f.wav"): |
| no += 1 |
| continue |
| |
| buzzer.on() |
| time.sleep(0.2) |
| buzzer.off() |
| |
| if i == 0: |
| print("#" * 80) |
| print("recording no. : " + str(no) + "/" + str(10 * 4 * 6)) |
| print("location: " + location) |
| print("background: " + background) |
| input("enter to start, hey at position 1:") |
| buzzer.on() |
| time.sleep(0.1) |
| buzzer.off() |
| print("#" * 10 + "recording started" + "#" * 10) |
| record( |
| duration=3, |
| file_name_1=f"rec_{no}_a_t.wav", |
| file_name_2=f"rec_{no}_b_t.wav", |
| file_name_3=f"rec_{no}_c_t.wav", |
| delay=delay_times, |
| ) |
| q1.queue.clear() |
| q2.queue.clear() |
| q3.queue.clear() |
| no += 1 |
| if i == 1: |
| print("#" * 80) |
| print("recording no. : " + str(no) + "/" + str(10 * 4 * 6)) |
| print("location: " + location) |
| print("background: " + background) |
| input("enter to start, hey at position 1.5:") |
| buzzer.on() |
| time.sleep(0.1) |
| buzzer.off() |
| print("#" * 10 + "recording started" + "#" * 10) |
| record( |
| duration=3, |
| file_name_1=f"rec_{no}_a_t.wav", |
| file_name_2=f"rec_{no}_b_t.wav", |
| file_name_3=f"rec_{no}_c_t.wav", |
| delay=delay_times, |
| ) |
| q1.queue.clear() |
| q2.queue.clear() |
| q3.queue.clear() |
| no += 1 |
| if i == 2: |
| print("#" * 80) |
| print("recording no. : " + str(no) + "/" + str(10 * 4 * 6)) |
| print("location: " + location) |
| print("background: " + background) |
| input("enter to start, hey at position 2:") |
| buzzer.on() |
| time.sleep(0.1) |
| buzzer.off() |
| print("#" * 10 + "recording started" + "#" * 10) |
| record( |
| duration=3, |
| file_name_1=f"rec_{no}_a_t.wav", |
| file_name_2=f"rec_{no}_b_t.wav", |
| file_name_3=f"rec_{no}_c_t.wav", |
| delay=delay_times, |
| ) |
| q1.queue.clear() |
| q2.queue.clear() |
| q3.queue.clear() |
| no += 1 |
| if i == 3: |
| print("#" * 80) |
| print("recording no. : " + str(no) + "/" + str(10 * 4 * 6)) |
| print("location: " + location) |
| print("background: " + background) |
| input("enter to start, hey at position 2.5:") |
| buzzer.on() |
| time.sleep(0.1) |
| buzzer.off() |
| print("#" * 10 + "recording started" + "#" * 10) |
| record( |
| duration=3, |
| file_name_1=f"rec_{no}_a_t.wav", |
| file_name_2=f"rec_{no}_b_t.wav", |
| file_name_3=f"rec_{no}_c_t.wav", |
| delay=delay_times, |
| ) |
| q1.queue.clear() |
| q2.queue.clear() |
| q3.queue.clear() |
| no += 1 |
| if i == 4: |
| print("#" * 80) |
| print("recording no. : " + str(no) + "/" + str(10 * 4 * 6)) |
| print("location: " + location) |
| print("background: " + background) |
| input("enter to start, hey at position 3:") |
| buzzer.on() |
| time.sleep(0.1) |
| buzzer.off() |
| print("#" * 10 + "recording started" + "#" * 10) |
| record( |
| duration=3, |
| file_name_1=f"rec_{no}_a_t.wav", |
| file_name_2=f"rec_{no}_b_t.wav", |
| file_name_3=f"rec_{no}_c_t.wav", |
| delay=delay_times, |
| ) |
| q1.queue.clear() |
| q2.queue.clear() |
| q3.queue.clear() |
| no += 1 |
| if i == 5: |
| print("#" * 80) |
| print("recording no. : " + str(no) + "/" + str(10 * 4 * 6)) |
| print("location: " + location) |
| print("background: " + background) |
| input("enter to start, hey at position 3.5 or 0.5:") |
| buzzer.on() |
| time.sleep(0.1) |
| buzzer.off() |
| print("#" * 10 + "recording started" + "#" * 10) |
| record( |
| duration=3, |
| file_name_1=f"rec_{no}_a_t.wav", |
| file_name_2=f"rec_{no}_b_t.wav", |
| file_name_3=f"rec_{no}_c_t.wav", |
| delay=delay_times, |
| ) |
| q1.queue.clear() |
| q2.queue.clear() |
| q3.queue.clear() |
| no += 1 |
| if i == 6: |
| print("#" * 80) |
| print("recording no. : " + str(no) + "/" + str(10 * 4 * 6)) |
| print("location: " + location) |
| print("background: " + background) |
| input("enter to start, nothing:") |
| buzzer.on() |
| time.sleep(0.1) |
| buzzer.off() |
| print("#" * 10 + "recording started" + "#" * 10) |
| record( |
| duration=3, |
| file_name_1=f"rec_{no}_a_f.wav", |
| file_name_2=f"rec_{no}_b_f.wav", |
| file_name_3=f"rec_{no}_c_f.wav", |
| delay=delay_times, |
| ) |
| no += 1 |
| if i == 7: |
| print("#" * 80) |
| print("recording no. : " + str(no) + "/" + str(10 * 4 * 6)) |
| print("location: " + location) |
| print("background: " + background) |
| input("enter to start, nothing:") |
| buzzer.on() |
| time.sleep(0.1) |
| buzzer.off() |
| print("#" * 10 + "recording started" + "#" * 10) |
| record( |
| duration=3, |
| file_name_1=f"rec_{no}_a_f.wav", |
| file_name_2=f"rec_{no}_b_f.wav", |
| file_name_3=f"rec_{no}_c_f.wav", |
| delay=delay_times, |
| ) |
| q1.queue.clear() |
| q2.queue.clear() |
| q3.queue.clear() |
| no += 1 |
| if i == 8: |
| print("#" * 80) |
| print("recording no. : " + str(no) + "/" + str(10 * 4 * 6)) |
| print("location: " + location) |
| print("background: " + background) |
| input("enter to start, random noise/talking at any position:") |
| buzzer.on() |
| time.sleep(0.1) |
| buzzer.off() |
| print("#" * 10 + "recording started" + "#" * 10) |
| record( |
| duration=3, |
| file_name_1=f"rec_{no}_a_f.wav", |
| file_name_2=f"rec_{no}_b_f.wav", |
| file_name_3=f"rec_{no}_c_f.wav", |
| delay=delay_times, |
| ) |
| q1.queue.clear() |
| q2.queue.clear() |
| q3.queue.clear() |
| no += 1 |
| if i == 9: |
| print("#" * 80) |
| print("recording no. : " + str(no) + "/" + str(10 * 4 * 6)) |
| print("location: " + location) |
| print("background: " + background) |
| input("enter to start, random noise/talking at any position:") |
| buzzer.on() |
| time.sleep(0.1) |
| buzzer.off() |
| print("#" * 10 + "recording started" + "#" * 10) |
| record( |
| duration=3, |
| file_name_1=f"rec_{no}_a_f.wav", |
| file_name_2=f"rec_{no}_b_f.wav", |
| file_name_3=f"rec_{no}_c_f.wav", |
| delay=delay_times, |
| ) |
| q1.queue.clear() |
| q2.queue.clear() |
| q3.queue.clear() |
| no += 1 |
|
|
|
|
| |
| print("#" * 80) |
| print("calculating delay times") |
| delay_times = record(buzzer_sync=True, duration=1.5) |
| print("Delay times: ", delay_times) |
| q1.queue.clear() |
| q2.queue.clear() |
| q3.queue.clear() |
| os.remove("rec1.wav") |
| os.remove("rec2.wav") |
| os.remove("rec3.wav") |
| print("#" * 80) |
| print("#" * 80) |
| print("calculating microphone locations") |
| print("tap the microphones in order then ^c") |
| |
| record( |
| order_mics=True, |
| delay=delay_times, |
| ) |
| q1.queue.clear() |
| q2.queue.clear() |
| q3.queue.clear() |
| os.remove("rec1.wav") |
| os.remove("rec2.wav") |
| os.remove("rec3.wav") |
| print("#" * 80) |
| collect_data(delay_times) |
|
|