Spaces:
Runtime error
Runtime error
| import pynput | |
| import sys | |
| sys.path.append('../../') | |
| from src.music.config import SYNTH_RECORDED_AUDIO_PATH, RATE_AUDIO_SAVE | |
| from datetime import datetime | |
| import numpy as np | |
| import os | |
| import wave | |
| from ctypes import * | |
| from contextlib import contextmanager | |
| import pyaudio | |
# ctypes prototype of the callback handed to ALSA's
# snd_lib_error_set_handler(): it returns nothing and receives, in order,
# a file name, a line number, a function name, an error code and a format
# string.
ERROR_HANDLER_FUNC = CFUNCTYPE(
    None,      # return type
    c_char_p,  # filename
    c_int,     # line
    c_char_p,  # function
    c_int,     # err
    c_char_p,  # fmt
)


def py_error_handler(filename, line, function, err, fmt):
    """Discard an error report; all five arguments are ignored."""
    return None


# C-callable wrapper around the no-op handler, kept at module level so the
# same object can be passed to the library whenever it is needed.
c_error_handler = ERROR_HANDLER_FUNC(py_error_handler)
@contextmanager
def noalsaerr():
    """Context manager that silences ALSA error messages inside its body.

    On entry the no-op ``c_error_handler`` is installed as the error handler
    of ``libasound.so``; on exit the handler is reset by passing ``None``.

    The ``@contextmanager`` decorator is required: without it this function
    returns a plain generator, which cannot be used in a ``with`` statement.

    Raises:
        OSError: if ``libasound.so`` cannot be loaded.
    """
    asound = cdll.LoadLibrary('libasound.so')
    asound.snd_lib_error_set_handler(c_error_handler)
    try:
        yield
    finally:
        # Reset the handler even when the body of the ``with`` raised.
        asound.snd_lib_error_set_handler(None)
# Name of the key most recently reported as pressed by the keyboard
# callbacks, or None when no key is currently recorded.
KEY_PRESSED = None
def on_press(key):
    """Keyboard-listener callback: remember the name of the pressed key.

    Args:
        key: The key object delivered by the listener. Only objects exposing
            a ``name`` attribute are recorded (presumably pynput's special
            keys such as space/enter/esc -- confirm against pynput's docs).
    """
    global KEY_PRESSED
    try:
        KEY_PRESSED = key.name
    except AttributeError:
        # Keys without a ``name`` leave the recorded state untouched. Only
        # this specific error is ignored so that unrelated failures (and
        # KeyboardInterrupt) are no longer silently swallowed.
        pass
def on_release(key):
    """Keyboard-listener callback: forget the recorded key once it is released.

    The state is cleared only when the released key is the one currently
    stored in ``KEY_PRESSED``. Releasing some other key (for example a
    modifier or a character key) while the recorded key is still held no
    longer wipes the state.

    Args:
        key: The key object delivered by the listener; objects without a
            ``name`` attribute are treated as having the name ``None``.
    """
    global KEY_PRESSED
    if getattr(key, 'name', None) == KEY_PRESSED:
        KEY_PRESSED = None
def is_pressed(key):
    """Tell whether *key* is the key name currently stored in ``KEY_PRESSED``.

    Args:
        key: Key name to test, e.g. ``'space'`` or ``'esc'``.

    Returns:
        The result of comparing the stored name with *key*.
    """
    # Reading a module-level name needs no ``global`` declaration.
    current = KEY_PRESSED
    return current == key
# Keyboard listener: started when the module is imported so that the
# on_press / on_release callbacks keep KEY_PRESSED up to date.
listener = pynput.keyboard.Listener(
    on_press=on_press,
    on_release=on_release,
)
listener.start()

# Default maximum length of a single recording, in seconds.
LEN_RECORDINGS = 40
class AudioRecorder:
    """Keyboard-driven recorder for the default audio input device.

    Audio is captured through PyAudio as mono, 16-bit samples. A finished
    take is written to a temporary WAV file and transcoded with ``ffmpeg``
    to the requested output path. Key state is polled through the
    module-level ``is_pressed`` helper.
    """

    # Seconds slept between two keyboard polls while waiting for a key, so
    # the idle loops do not spin a CPU core.
    _POLL_INTERVAL = 0.01

    def __init__(self, chunk=2**10, rate=44100, place='', len_recording=LEN_RECORDINGS, drop_beginning=0.5):
        """Open the input stream and leave it stopped until it is needed.

        Args:
            chunk: Number of frames read from the stream per call.
            rate: Sampling rate of the input stream, in Hz.
            place: Prefix used when generating default file names.
            len_recording: Default maximum duration of a recording, in seconds.
            drop_beginning: Seconds discarded from the start of every saved
                recording (removes the sound of the key press).
        """
        self.chunk = chunk
        self.rate = rate
        with noalsaerr():
            self.audio = pyaudio.PyAudio()
        self.channels = 1
        self.format = pyaudio.paInt16
        self.stream = self.audio.open(format=self.format,
                                      channels=self.channels,
                                      rate=rate,
                                      input=True,
                                      frames_per_buffer=chunk)
        self.stream.stop_stream()
        # Number of leading chunks covering ``drop_beginning`` seconds.
        self.drop_beginning_chunks = int(drop_beginning * self.rate / self.chunk)
        self.place = place
        self.len_recordings = len_recording
        # Raw chunks of the most recent take; filled by record_next_N_seconds.
        self.recording = []

    def get_filename(self):
        """Return ``<place>_<timestamp>.mp3`` built from the current local time."""
        now = datetime.now()
        return self.place + '_' + now.strftime("%b_%d_%Y_%Hh%Mm%Ss") + '.mp3'

    def read_last_chunk(self):
        """Read and return the next ``self.chunk`` frames from the stream."""
        return self.stream.read(self.chunk)

    def live_read(self):
        """Print a running input-level meter until Esc is pressed.

        Each line shows the chunk counter, a level value (twice the mean
        absolute sample value of the chunk) and a bar of ``#`` characters
        proportional to that level.
        """
        if self.stream.is_stopped():
            self.stream.start_stream()
        i = 0
        try:
            while not is_pressed('esc'):
                data = np.frombuffer(self.stream.read(self.chunk), dtype=np.int16)
                # Widen before abs(): abs(-32768) wraps around in int16.
                peak = np.average(np.abs(data.astype(np.int32))) * 2
                bars = "#" * int(50 * peak / 2 ** 16)
                i += 1
                print("%04d %05d %s" % (i, peak, bars))
        finally:
            # Leave the stream stopped even if a read failed.
            self.stream.stop_stream()

    def record_next_N_seconds(self, n=None, saving_path=None):
        """Record up to *n* seconds and save the take to *saving_path*.

        Recording stops early when Enter is pressed and is discarded when
        BackSpace is pressed.

        Args:
            n: Maximum duration in seconds; defaults to ``self.len_recordings``.
            saving_path: Output file; defaults to ``SYNTH_RECORDED_AUDIO_PATH``
                followed by ``self.get_filename()``.

        Returns:
            The path of the saved file, or ``None`` when the recording was
            cancelled or failed (no output file exists in those cases).
        """
        if saving_path is None:
            saving_path = SYNTH_RECORDED_AUDIO_PATH + self.get_filename()
        if n is None:
            n = self.len_recordings
        print(f'Recording the next {n} secs.'
              f'\n\tPress Enter to end the recording;'
              f'\n\tPress BackSpace (<--) to cancel the recording;'
              f'\n\tSaving to {saving_path}')
        try:
            self.stream.start_stream()
            cancelled = False
            self.recording = []
            i_chunk = 0
            try:
                # chunk / rate is the duration of one chunk in seconds.
                while not is_pressed('enter') and self.chunk / self.rate * i_chunk < n:
                    self.recording.append(self.read_last_chunk())
                    i_chunk += 1
                    if is_pressed('backspace'):
                        cancelled = True
                        print('\n \t--> Recording cancelled! (you pressed BackSpace)')
                        break
            finally:
                # Always stop the stream, including when a read raised.
                self.stream.stop_stream()
            if cancelled:
                return None
            # Drop the first chunks to remove the keyboard sound.
            self.recording = self.recording[self.drop_beginning_chunks:]
            self._save_recording(saving_path)
            print(f'\n--> Recording saved, duration: {self.chunk / self.rate * i_chunk:.2f} secs.')
            return saving_path
        except Exception as exc:
            print('\n --> The recording failed.')
            # Surface the reason instead of hiding it.
            print(f'\t{type(exc).__name__}: {exc}')
            return None

    def _save_recording(self, saving_path):
        """Write ``self.recording`` to *saving_path* via a temporary WAV file.

        Raises:
            RuntimeError: if ffmpeg exits with a non-zero status. The
                temporary WAV file is kept in that case so the take is not
                lost.
        """
        import subprocess  # local import: the file's import block is left untouched

        root, _ = os.path.splitext(saving_path)
        wav_path = root + '.wav'
        if wav_path == saving_path:
            # The intermediate file must differ from the ffmpeg output file.
            wav_path = root + '_raw.wav'
        with wave.open(wav_path, 'wb') as wave_file:
            wave_file.setnchannels(self.channels)
            wave_file.setsampwidth(self.audio.get_sample_size(self.format))
            wave_file.setframerate(self.rate)
            wave_file.writeframes(b''.join(self.recording))
        # Argument list (no shell): paths containing quotes, spaces or shell
        # metacharacters are passed to ffmpeg verbatim.
        command = ['ffmpeg', '-i', wav_path, '-vn', '-loglevel', 'panic', '-y',
                   '-ac', '1', '-ar', str(int(RATE_AUDIO_SAVE)), '-b:a', '320k',
                   saving_path]
        completed = subprocess.run(command)
        if completed.returncode != 0:
            raise RuntimeError(
                f'ffmpeg exited with status {completed.returncode}; '
                f'raw audio kept at {wav_path}')
        os.remove(wav_path)

    def _wait_for_key(self, *names):
        """Block until one of *names* is the pressed key and return that name."""
        import time  # local import: the file's import block is left untouched

        while True:
            for name in names:
                if is_pressed(name):
                    return name
            time.sleep(self._POLL_INTERVAL)

    def record_one(self):
        """Wait for Space, record a single take and return its path.

        Returns:
            The value returned by ``record_next_N_seconds``, or ``None`` when
            Esc is pressed before a recording was started.
        """
        print('Starting the recording loop!\n\tPress BackSpace to cancel the current recording;\n\tPress Esc to quit the loop (only works while not recording)')
        print('-------\nReady to record!')
        print('Press space to start a recording\n')
        # Esc is honoured here because the banner above advertises it.
        if self._wait_for_key('space', 'esc') == 'esc':
            return None
        return self.record_next_N_seconds()

    def run(self):
        """Record takes in a loop: Space starts one, Esc closes the recorder and exits."""
        print('Starting the recording loop!\n\tPress BackSpace to cancel the current recording;\n\tPress Esc to quit the loop (only works while not recording)')
        while True:
            print('-------\nReady to record!')
            print('Press space to start a recording\n')
            if self._wait_for_key('space', 'esc') == 'esc':
                print('End of the recording session. See you soon!')
                self.close()
                break
            self.record_next_N_seconds()

    def close(self):
        """Close the input stream and terminate the PyAudio session."""
        self.stream.close()
        self.audio.terminate()
if __name__ == '__main__':
    # Script entry point: record a single take with the 'home' file prefix.
    audio_recorder = AudioRecorder(place='home')
    try:
        audio_recorder.record_one()
    finally:
        # record_one() does not close the recorder itself; release the
        # stream and the PyAudio session whether or not recording succeeded.
        audio_recorder.close()