| """ | |
| Title: Speaker Recognition | |
| Author: [Fadi Badine](https://twitter.com/fadibadine) | |
| Date created: 14/06/2020 | |
| Last modified: 19/07/2023 | |
| Description: Classify speakers using Fast Fourier Transform (FFT) and a 1D Convnet. | |
| Accelerator: GPU | |
| Converted to Keras 3 by: [Fadi Badine](https://twitter.com/fadibadine) | |
| """ | |
| """ | |
| ## Introduction | |
| This example demonstrates how to create a model to classify speakers from the | |
| frequency domain representation of speech recordings, obtained via Fast Fourier | |
| Transform (FFT). | |
| It shows the following: | |
| - How to use `tf.data` to load, preprocess and feed audio streams into a model | |
| - How to create a 1D convolutional network with residual | |
| connections for audio classification. | |
| Our process: | |
| - We prepare a dataset of speech samples from different speakers, with the speaker as label. | |
| - We add background noise to these samples to augment our data. | |
| - We take the FFT of these samples. | |
| - We train a 1D convnet to predict the correct speaker given a noisy FFT speech sample. | |
| Note: | |
| - This example should be run with TensorFlow 2.3 or higher, or `tf-nightly`. | |
| - The noise samples in the dataset need to be resampled to a sampling rate of 16000 Hz | |
| before using the code in this example. In order to do this, you will need to have | |
| installed `ffmpg`. | |
| """ | |
| """ | |
| ## Setup | |
| """ | |
| import os | |
| os.environ["KERAS_BACKEND"] = "tensorflow" | |
| import shutil | |
| import numpy as np | |
| import tensorflow as tf | |
| import keras | |
| from pathlib import Path | |
| from IPython.display import display, Audio | |
| # Get the data from https://www.kaggle.com/kongaevans/speaker-recognition-dataset/ | |
| # and save it to ./speaker-recognition-dataset.zip | |
| # then unzip it to ./16000_pcm_speeches | |
| """shell | |
| kaggle datasets download -d kongaevans/speaker-recognition-dataset | |
| unzip -qq speaker-recognition-dataset.zip | |
| """ | |
DATASET_ROOT = "16000_pcm_speeches"

# The folders in which we will put the audio samples and the noise samples
AUDIO_SUBFOLDER = "audio"
NOISE_SUBFOLDER = "noise"

DATASET_AUDIO_PATH = os.path.join(DATASET_ROOT, AUDIO_SUBFOLDER)
DATASET_NOISE_PATH = os.path.join(DATASET_ROOT, NOISE_SUBFOLDER)

# Percentage of samples to use for validation
VALID_SPLIT = 0.1

# Seed to use when shuffling the dataset and the noise
SHUFFLE_SEED = 43

# The sampling rate to use.
# This is the one used in all the audio samples.
# We will resample all the noise to this sampling rate.
# This will also be the output size of the audio wave samples
# (since all samples are 1 second long)
SAMPLING_RATE = 16000

# The factor to multiply the noise with according to:
#   noisy_sample = sample + noise * prop * scale
# where prop = sample_amplitude / noise_amplitude
SCALE = 0.5

BATCH_SIZE = 128
EPOCHS = 1  # For a real training run, use EPOCHS = 100
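"""
As a minimal sketch of the noise scaling formula above (using toy NumPy arrays,
not the actual dataset): the noise is rescaled so that its peak amplitude is
`SCALE` times the peak amplitude of the speech sample before the two are mixed.

```
import numpy as np

sample = np.array([0.0, 0.8, -0.4])  # toy speech waveform, peak amplitude 0.8
noise = np.array([0.1, -0.2, 0.2])   # toy noise waveform, peak amplitude 0.2

prop = sample.max() / noise.max()           # 0.8 / 0.2 = 4.0
noisy_sample = sample + noise * prop * 0.5  # rescaled noise now peaks at 0.4
```
"""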
| """ | |
| ## Data preparation | |
| The dataset is composed of 7 folders, divided into 2 groups: | |
| - Speech samples, with 5 folders for 5 different speakers. Each folder contains | |
| 1500 audio files, each 1 second long and sampled at 16000 Hz. | |
| - Background noise samples, with 2 folders and a total of 6 files. These files | |
| are longer than 1 second (and originally not sampled at 16000 Hz, but we will resample them to 16000 Hz). | |
| We will use those 6 files to create 354 1-second-long noise samples to be used for training. | |
| Let's sort these 2 categories into 2 folders: | |
| - An `audio` folder which will contain all the per-speaker speech sample folders | |
| - A `noise` folder which will contain all the noise samples | |
| """ | |
| """ | |
| Before sorting the audio and noise categories into 2 folders, | |
| we have the following directory structure: | |
| ``` | |
| main_directory/ | |
| ...speaker_a/ | |
| ...speaker_b/ | |
| ...speaker_c/ | |
| ...speaker_d/ | |
| ...speaker_e/ | |
| ...other/ | |
| ..._background_noise_/ | |
| ``` | |
| After sorting, we end up with the following structure: | |
| ``` | |
| main_directory/ | |
| ...audio/ | |
| ......speaker_a/ | |
| ......speaker_b/ | |
| ......speaker_c/ | |
| ......speaker_d/ | |
| ......speaker_e/ | |
| ...noise/ | |
| ......other/ | |
| ......_background_noise_/ | |
| ``` | |
| """ | |
for folder in os.listdir(DATASET_ROOT):
    if os.path.isdir(os.path.join(DATASET_ROOT, folder)):
        if folder in [AUDIO_SUBFOLDER, NOISE_SUBFOLDER]:
            # If folder is `audio` or `noise`, do nothing
            continue
        elif folder in ["other", "_background_noise_"]:
            # If folder is one of the folders that contains noise samples,
            # move it to the `noise` folder
            shutil.move(
                os.path.join(DATASET_ROOT, folder),
                os.path.join(DATASET_NOISE_PATH, folder),
            )
        else:
            # Otherwise, it should be a speaker folder, then move it to
            # the `audio` folder
            shutil.move(
                os.path.join(DATASET_ROOT, folder),
                os.path.join(DATASET_AUDIO_PATH, folder),
            )
| """ | |
| ## Noise preparation | |
| In this section: | |
| - We load all noise samples (which should have been resampled to 16000) | |
| - We split those noise samples to chunks of 16000 samples which | |
| correspond to 1 second duration each | |
| """ | |
# Get the list of all noise files
noise_paths = []
for subdir in os.listdir(DATASET_NOISE_PATH):
    subdir_path = Path(DATASET_NOISE_PATH) / subdir
    if os.path.isdir(subdir_path):
        noise_paths += [
            os.path.join(subdir_path, filepath)
            for filepath in os.listdir(subdir_path)
            if filepath.endswith(".wav")
        ]
if not noise_paths:
    raise RuntimeError(f"Could not find any files at {DATASET_NOISE_PATH}")
print(
    "Found {} files belonging to {} directories".format(
        len(noise_paths), len(os.listdir(DATASET_NOISE_PATH))
    )
)
| """ | |
| Resample all noise samples to 16000 Hz | |
| """ | |
command = (
    "for dir in `ls -1 " + DATASET_NOISE_PATH + "`; do "
    "for file in `ls -1 " + DATASET_NOISE_PATH + "/$dir/*.wav`; do "
    "sample_rate=`ffprobe -hide_banner -loglevel panic -show_streams "
    "$file | grep sample_rate | cut -f2 -d=`; "
    "if [ $sample_rate -ne 16000 ]; then "
    "ffmpeg -hide_banner -loglevel panic -y "
    "-i $file -ar 16000 temp.wav; "
    "mv temp.wav $file; "
    "fi; done; done"
)
os.system(command)
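"""
The embedded shell one-liner above is compact but hard to read. For reference,
an equivalent per-file loop written directly in Python would look like the
sketch below (assuming `ffmpeg` and `ffprobe` are on your path; this is an
illustration, not part of the pipeline):

```
import subprocess

for path in noise_paths:
    # Query the sample rate of the first audio stream
    rate = subprocess.run(
        ["ffprobe", "-v", "error", "-select_streams", "a:0",
         "-show_entries", "stream=sample_rate",
         "-of", "default=noprint_wrappers=1:nokey=1", path],
        capture_output=True, text=True, check=True,
    ).stdout.strip()
    if rate != "16000":
        # Re-encode to 16000 Hz via a temporary file, then replace the original
        subprocess.run(
            ["ffmpeg", "-hide_banner", "-loglevel", "panic", "-y",
             "-i", path, "-ar", "16000", "temp.wav"],
            check=True,
        )
        os.replace("temp.wav", path)
```
"""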
# Split noise into chunks of 16,000 steps each
def load_noise_sample(path):
    sample, sampling_rate = tf.audio.decode_wav(
        tf.io.read_file(path), desired_channels=1
    )
    if sampling_rate == SAMPLING_RATE:
        # Number of slices of 16000 each that can be generated from the noise sample
        slices = int(sample.shape[0] / SAMPLING_RATE)
        sample = tf.split(sample[: slices * SAMPLING_RATE], slices)
        return sample
    else:
        print("Sampling rate for {} is incorrect. Ignoring it".format(path))
        return None


noises = []
for path in noise_paths:
    sample = load_noise_sample(path)
    if sample:
        noises.extend(sample)
noises = tf.stack(noises)

print(
    "{} noise files were split into {} noise samples where each is {} sec. long".format(
        len(noise_paths), noises.shape[0], noises.shape[1] // SAMPLING_RATE
    )
)
| """ | |
| ## Dataset generation | |
| """ | |
def paths_and_labels_to_dataset(audio_paths, labels):
    """Constructs a dataset of audios and labels."""
    path_ds = tf.data.Dataset.from_tensor_slices(audio_paths)
    audio_ds = path_ds.map(
        lambda x: path_to_audio(x), num_parallel_calls=tf.data.AUTOTUNE
    )
    label_ds = tf.data.Dataset.from_tensor_slices(labels)
    return tf.data.Dataset.zip((audio_ds, label_ds))


def path_to_audio(path):
    """Reads and decodes an audio file."""
    audio = tf.io.read_file(path)
    audio, _ = tf.audio.decode_wav(audio, 1, SAMPLING_RATE)
    return audio


def add_noise(audio, noises=None, scale=0.5):
    if noises is not None:
        # Create a random tensor of the same size as audio ranging from
        # 0 to the number of noise stream samples that we have.
        tf_rnd = tf.random.uniform(
            (tf.shape(audio)[0],), 0, noises.shape[0], dtype=tf.int32
        )
        noise = tf.gather(noises, tf_rnd, axis=0)

        # Get the amplitude proportion between the audio and the noise
        prop = tf.math.reduce_max(audio, axis=1) / tf.math.reduce_max(noise, axis=1)
        prop = tf.repeat(tf.expand_dims(prop, axis=1), tf.shape(audio)[1], axis=1)

        # Adding the rescaled noise to audio
        audio = audio + noise * prop * scale

    return audio


def audio_to_fft(audio):
    # Since tf.signal.fft applies FFT on the innermost dimension,
    # we need to squeeze the dimensions and then expand them again
    # after FFT
    audio = tf.squeeze(audio, axis=-1)
    fft = tf.signal.fft(
        tf.cast(tf.complex(real=audio, imag=tf.zeros_like(audio)), tf.complex64)
    )
    fft = tf.expand_dims(fft, axis=-1)

    # Return the absolute value of the first half of the FFT
    # which represents the positive frequencies
    return tf.math.abs(fft[:, : (audio.shape[1] // 2), :])
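"""
As a quick shape sanity check (a sketch on random data, not part of the
pipeline): `audio_to_fft` maps a batch of 1-second waveforms of shape
`(batch, 16000, 1)` to magnitude spectra of shape `(batch, 8000, 1)`,
keeping only the positive-frequency half of the spectrum.

```
dummy = tf.random.uniform((2, SAMPLING_RATE, 1))  # 2 fake 1-second clips
print(audio_to_fft(dummy).shape)  # expected: (2, 8000, 1)
```
"""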
# Get the list of audio file paths along with their corresponding labels
class_names = os.listdir(DATASET_AUDIO_PATH)
print(
    "Our class names: {}".format(
        class_names,
    )
)

audio_paths = []
labels = []
for label, name in enumerate(class_names):
    print(
        "Processing speaker {}".format(
            name,
        )
    )
    dir_path = Path(DATASET_AUDIO_PATH) / name
    speaker_sample_paths = [
        os.path.join(dir_path, filepath)
        for filepath in os.listdir(dir_path)
        if filepath.endswith(".wav")
    ]
    audio_paths += speaker_sample_paths
    labels += [label] * len(speaker_sample_paths)

print(
    "Found {} files belonging to {} classes.".format(len(audio_paths), len(class_names))
)
# Shuffle the paths and labels with the same seed so they stay aligned
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(audio_paths)
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(labels)

# Split into training and validation
num_val_samples = int(VALID_SPLIT * len(audio_paths))
print("Using {} files for training.".format(len(audio_paths) - num_val_samples))
train_audio_paths = audio_paths[:-num_val_samples]
train_labels = labels[:-num_val_samples]

print("Using {} files for validation.".format(num_val_samples))
valid_audio_paths = audio_paths[-num_val_samples:]
valid_labels = labels[-num_val_samples:]

# Create 2 datasets, one for training and the other for validation
train_ds = paths_and_labels_to_dataset(train_audio_paths, train_labels)
train_ds = train_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
    BATCH_SIZE
)

valid_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
valid_ds = valid_ds.shuffle(buffer_size=32 * 8, seed=SHUFFLE_SEED).batch(32)

# Add noise to the training set
train_ds = train_ds.map(
    lambda x, y: (add_noise(x, noises, scale=SCALE), y),
    num_parallel_calls=tf.data.AUTOTUNE,
)

# Transform audio wave to the frequency domain using `audio_to_fft`
train_ds = train_ds.map(
    lambda x, y: (audio_to_fft(x), y), num_parallel_calls=tf.data.AUTOTUNE
)
train_ds = train_ds.prefetch(tf.data.AUTOTUNE)

valid_ds = valid_ds.map(
    lambda x, y: (audio_to_fft(x), y), num_parallel_calls=tf.data.AUTOTUNE
)
valid_ds = valid_ds.prefetch(tf.data.AUTOTUNE)
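"""
At this point, each element of `train_ds` should be a batch of FFT features of
shape `(batch, 8000, 1)` together with integer speaker labels. A quick way to
confirm this (a sketch you can run after building the datasets):

```
for x, y in train_ds.take(1):
    print(x.shape, y.shape)  # expected: (128, 8000, 1) (128,)
```
"""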
| """ | |
| ## Model Definition | |
| """ | |
def residual_block(x, filters, conv_num=3, activation="relu"):
    # Shortcut
    s = keras.layers.Conv1D(filters, 1, padding="same")(x)
    for i in range(conv_num - 1):
        x = keras.layers.Conv1D(filters, 3, padding="same")(x)
        x = keras.layers.Activation(activation)(x)
    x = keras.layers.Conv1D(filters, 3, padding="same")(x)
    x = keras.layers.Add()([x, s])
    x = keras.layers.Activation(activation)(x)
    return keras.layers.MaxPool1D(pool_size=2, strides=2)(x)


def build_model(input_shape, num_classes):
    inputs = keras.layers.Input(shape=input_shape, name="input")

    x = residual_block(inputs, 16, 2)
    x = residual_block(x, 32, 2)
    x = residual_block(x, 64, 3)
    x = residual_block(x, 128, 3)
    x = residual_block(x, 128, 3)

    x = keras.layers.AveragePooling1D(pool_size=3, strides=3)(x)
    x = keras.layers.Flatten()(x)
    x = keras.layers.Dense(256, activation="relu")(x)
    x = keras.layers.Dense(128, activation="relu")(x)

    outputs = keras.layers.Dense(num_classes, activation="softmax", name="output")(x)

    return keras.models.Model(inputs=inputs, outputs=outputs)


model = build_model((SAMPLING_RATE // 2, 1), len(class_names))

model.summary()

# Compile the model using Adam's default learning rate
model.compile(
    optimizer="Adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)
# Add callbacks:
# 'EarlyStopping' to stop training when the model is no longer improving
# (by default, it monitors `val_loss`)
# 'ModelCheckpoint' to always keep the model that has the best val_accuracy
model_save_filename = "model.keras"

earlystopping_cb = keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)
mdlcheckpoint_cb = keras.callbacks.ModelCheckpoint(
    model_save_filename, monitor="val_accuracy", save_best_only=True
)
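"""
Since the checkpoint callback writes the best model to `model.keras`, you can
reload it later for inference (a minimal sketch, once training has produced
the file):

```
restored_model = keras.models.load_model("model.keras")
```
"""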
| """ | |
| ## Training | |
| """ | |
| history = model.fit( | |
| train_ds, | |
| epochs=EPOCHS, | |
| validation_data=valid_ds, | |
| callbacks=[earlystopping_cb, mdlcheckpoint_cb], | |
| ) | |
| """ | |
| ## Evaluation | |
| """ | |
| print(model.evaluate(valid_ds)) | |
| """ | |
| We get ~ 98% validation accuracy. | |
| """ | |
| """ | |
| ## Demonstration | |
| Let's take some samples and: | |
| - Predict the speaker | |
| - Compare the prediction with the real speaker | |
| - Listen to the audio to see that despite the samples being noisy, | |
| the model is still pretty accurate | |
| """ | |
SAMPLES_TO_DISPLAY = 10

test_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
test_ds = test_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
    BATCH_SIZE
)

test_ds = test_ds.map(
    lambda x, y: (add_noise(x, noises, scale=SCALE), y),
    num_parallel_calls=tf.data.AUTOTUNE,
)

for audios, labels in test_ds.take(1):
    # Get the signal FFT
    ffts = audio_to_fft(audios)
    # Predict
    y_pred = model.predict(ffts)
    # Take random samples
    rnd = np.random.randint(0, BATCH_SIZE, SAMPLES_TO_DISPLAY)
    audios = audios.numpy()[rnd, :, :]
    labels = labels.numpy()[rnd]
    y_pred = np.argmax(y_pred, axis=-1)[rnd]

    for index in range(SAMPLES_TO_DISPLAY):
        # For every sample, print the true and predicted label
        # as well as run the voice with the noise
        print(
            "Speaker:\33{} {}\33[0m\tPredicted:\33{} {}\33[0m".format(
                "[92m" if labels[index] == y_pred[index] else "[91m",
                class_names[labels[index]],
                "[92m" if labels[index] == y_pred[index] else "[91m",
                class_names[y_pred[index]],
            )
        )
        display(Audio(audios[index, :, :].squeeze(), rate=SAMPLING_RATE))