| """Module containing common function. | |
| """ | |
| import os | |
| import traceback | |
| import numpy as np | |
| from pathlib import Path | |
| import config as cfg | |


def collect_audio_files(path: str):
    """Collects all audio files in the given directory.

    Args:
        path: The directory to be searched.

    Returns:
        A sorted list of all audio files in the directory.
    """
    # Get all files in directory with os.walk
    files = []

    for root, _, flist in os.walk(path):
        for f in flist:
            if not f.startswith(".") and f.rsplit(".", 1)[-1].lower() in cfg.ALLOWED_FILETYPES:
                files.append(os.path.join(root, f))

    return sorted(files)
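

if __name__ == "__main__":
    # Hedged usage sketch, not part of the module API: "example_audio" is an
    # assumed directory name, and cfg.ALLOWED_FILETYPES controls which
    # extensions are kept. A missing directory simply yields an empty list,
    # because os.walk() produces nothing for it.
    for demo_file in collect_audio_files("example_audio")[:5]:
        print(demo_file)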


def readLines(path: str):
    """Reads the lines into a list.

    Opens the file and reads its contents into a list.
    It is expected to have one line for each species or label.

    Args:
        path: Absolute path to the species file.

    Returns:
        A list of all species inside the file.
    """
    return Path(path).read_text(encoding="utf-8").splitlines() if path else []


def list_subdirectories(path: str):
    """Lists all directories inside a path.

    Retrieves all the subdirectories in a given path without recursion.

    Args:
        path: Directory to be searched.

    Returns:
        A filter sequence containing the names of all subdirectories.
    """
    return filter(lambda el: os.path.isdir(os.path.join(path, el)), os.listdir(path))


def random_split(x, y, val_ratio=0.2):
    """Splits the data into training and validation data.

    Makes sure that each class is represented in both sets.

    Args:
        x: Samples.
        y: One-hot labels.
        val_ratio: The ratio of validation data.

    Returns:
        A tuple of (x_train, y_train, x_val, y_val).
    """
    # Set numpy random seed
    np.random.seed(cfg.RANDOM_SEED)

    # Get number of classes
    num_classes = y.shape[1]

    # Initialize training and validation data
    x_train, y_train, x_val, y_val = [], [], [], []

    # Split data
    for i in range(num_classes):
        # Get indices of current class
        indices = np.where(y[:, i] == 1)[0]

        # Get number of samples for each set
        num_samples = len(indices)
        num_samples_train = max(1, int(num_samples * (1 - val_ratio)))
        num_samples_val = max(0, num_samples - num_samples_train)

        # Randomly choose samples for training and validation
        np.random.shuffle(indices)
        train_indices = indices[:num_samples_train]
        val_indices = indices[num_samples_train:num_samples_train + num_samples_val]

        # Append samples to training and validation data
        x_train.append(x[train_indices])
        y_train.append(y[train_indices])
        x_val.append(x[val_indices])
        y_val.append(y[val_indices])

    # Concatenate data
    x_train = np.concatenate(x_train)
    y_train = np.concatenate(y_train)
    x_val = np.concatenate(x_val)
    y_val = np.concatenate(y_val)

    # Shuffle data
    indices = np.arange(len(x_train))
    np.random.shuffle(indices)
    x_train = x_train[indices]
    y_train = y_train[indices]

    indices = np.arange(len(x_val))
    np.random.shuffle(indices)
    x_val = x_val[indices]
    y_val = y_val[indices]

    return x_train, y_train, x_val, y_val
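

if __name__ == "__main__":
    # Hedged usage sketch for random_split(): the 3-class, 10-samples-per-class
    # synthetic data below is an assumption chosen only to show the split sizes.
    demo_x = np.random.rand(30, 8).astype(np.float32)
    demo_y = np.eye(3, dtype=np.float32)[np.repeat(np.arange(3), 10)]
    x_tr, y_tr, x_va, y_va = random_split(demo_x, demo_y, val_ratio=0.2)
    print(x_tr.shape, x_va.shape)  # (24, 8) (6, 8): 8 train / 2 val per class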


def mixup(x, y, augmentation_ratio=0.25, alpha=0.2):
    """Apply mixup to the given data.

    Mixup is a data augmentation technique that generates new samples by
    mixing two samples and their labels.

    Args:
        x: Samples.
        y: One-hot labels.
        augmentation_ratio: The ratio of augmented samples.
        alpha: The beta distribution parameter.

    Returns:
        Augmented data.
    """
    # Calculate the number of samples to augment based on the ratio
    num_samples_to_augment = int(len(x) * augmentation_ratio)

    for _ in range(num_samples_to_augment):
        # Randomly choose one instance from the dataset
        index = np.random.choice(len(x))
        x1, y1 = x[index], y[index]

        # Randomly choose a different instance from the dataset
        second_index = np.random.choice(len(x))
        while second_index == index:
            second_index = np.random.choice(len(x))
        x2, y2 = x[second_index], y[second_index]

        # Generate a random mixing coefficient (lambda)
        lambda_ = np.random.beta(alpha, alpha)

        # Mix the embeddings and labels
        mixed_x = lambda_ * x1 + (1 - lambda_) * x2
        mixed_y = lambda_ * y1 + (1 - lambda_) * y2

        # Replace one of the original samples and labels with the augmented sample and labels
        x[index] = mixed_x
        y[index] = mixed_y

    return x, y
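

if __name__ == "__main__":
    # Hedged usage sketch for mixup(): synthetic embeddings with assumed sizes.
    # Copies are passed because mixup() overwrites rows of its inputs in place.
    demo_x = np.random.rand(16, 8).astype(np.float32)
    demo_y = np.eye(4, dtype=np.float32)[np.random.randint(0, 4, size=16)]
    mixed_x, mixed_y = mixup(demo_x.copy(), demo_y.copy(), augmentation_ratio=0.25)
    # Mixed labels are convex combinations of one-hot rows, so each row still sums to 1.
    print(np.allclose(mixed_y.sum(axis=1), 1.0))  # True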


def label_smoothing(y, alpha=0.1):
    """Applies label smoothing to the given one-hot labels (in place)."""
    # Subtract alpha from the correct label when it is > 0
    y[y > 0] -= alpha

    # Assign a share of alpha to all other labels
    y[y == 0] = alpha / y.shape[0]

    return y
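

if __name__ == "__main__":
    # Hedged usage sketch for label_smoothing(): a 5-sample, 3-class toy array
    # (an assumption). Note that the smoothing mass is divided by y.shape[0]
    # (the number of rows) as implemented above, and y is modified in place.
    demo_y = np.eye(3, dtype=np.float32)[np.array([0, 1, 2, 0, 1])]
    smoothed = label_smoothing(demo_y.copy(), alpha=0.1)
    print(smoothed[0])  # [0.9  0.02 0.02]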


def upsampling(x, y, ratio=0.5, mode="repeat"):
    """Balance data through upsampling.

    Minority classes are upsampled until they have at least `ratio` times the
    number of samples of the majority class (e.g. 10% for ratio=0.1).

    Args:
        x: Samples.
        y: One-hot labels.
        ratio: The minimum ratio of minority to majority samples.
        mode: The upsampling mode. Either 'repeat', 'mean' or 'smote'.

    Returns:
        Upsampled data.
    """
    # Set numpy random seed
    np.random.seed(cfg.RANDOM_SEED)

    # Determine min number of samples
    min_samples = int(np.max(y.sum(axis=0)) * ratio)

    x_temp = []
    y_temp = []

    if mode == 'repeat':
        # For each class with less than min_samples randomly repeat samples
        for i in range(y.shape[1]):
            # Count only the samples added for this class
            num_added = 0

            while y[:, i].sum() + num_added < min_samples:
                # Randomly choose a sample from the minority class
                random_index = np.random.choice(np.where(y[:, i] == 1)[0])

                # Append the sample and label to a temp list
                x_temp.append(x[random_index])
                y_temp.append(y[random_index])
                num_added += 1
    elif mode == 'mean':
        # For each class with less than min_samples
        # select two random samples and calculate the mean
        for i in range(y.shape[1]):
            # Count only the samples added for this class
            num_added = 0

            while y[:, i].sum() + num_added < min_samples:
                # Randomly choose two samples from the minority class
                random_indices = np.random.choice(np.where(y[:, i] == 1)[0], 2)

                # Calculate the mean of the two samples
                mean = np.mean(x[random_indices], axis=0)

                # Append the mean and label to a temp list
                x_temp.append(mean)
                y_temp.append(y[random_indices[0]])
                num_added += 1
    elif mode == 'smote':
        # For each class with less than min_samples apply SMOTE
        for i in range(y.shape[1]):
            # Count only the samples added for this class
            num_added = 0

            while y[:, i].sum() + num_added < min_samples:
                # Randomly choose a sample from the minority class
                random_index = np.random.choice(np.where(y[:, i] == 1)[0])

                # Get the k nearest neighbors
                k = 5
                distances = np.sqrt(np.sum((x - x[random_index]) ** 2, axis=1))
                indices = np.argsort(distances)[1:k + 1]

                # Randomly choose one of the neighbors
                random_neighbor = np.random.choice(indices)

                # Calculate the difference vector
                diff = x[random_neighbor] - x[random_index]

                # Randomly choose a weight between 0 and 1
                weight = np.random.uniform(0, 1)

                # Calculate the new sample
                new_sample = x[random_index] + weight * diff

                # Append the new sample and label to a temp list
                x_temp.append(new_sample)
                y_temp.append(y[random_index])
                num_added += 1
    # Append the temp list to the original data
    if len(x_temp) > 0:
        x = np.vstack((x, np.array(x_temp)))
        y = np.vstack((y, np.array(y_temp)))

    # Shuffle data
    indices = np.arange(len(x))
    np.random.shuffle(indices)
    x = x[indices]
    y = y[indices]

    return x, y
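

if __name__ == "__main__":
    # Hedged usage sketch for upsampling(): a deliberately imbalanced synthetic
    # set (20 vs. 4 samples); the counts and feature size are assumptions.
    demo_x = np.random.rand(24, 8).astype(np.float32)
    demo_y = np.zeros((24, 2), dtype=np.float32)
    demo_y[:20, 0] = 1
    demo_y[20:, 1] = 1
    up_x, up_y = upsampling(demo_x, demo_y, ratio=0.5, mode="repeat")
    print(up_y.sum(axis=0))  # [20. 10.]: minority class repeated up to 10 samples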


def saveToCache(cache_file: str, x_train: np.ndarray, y_train: np.ndarray, labels: list[str]):
    """Saves the training data to a cache file.

    Args:
        cache_file: The path to the cache file.
        x_train: The training samples.
        y_train: The training labels.
        labels: The list of labels.
    """
    # Create cache directory
    os.makedirs(os.path.dirname(cache_file), exist_ok=True)

    # Save to cache
    np.savez_compressed(cache_file, x_train=x_train, y_train=y_train, labels=labels)


def loadFromCache(cache_file: str):
    """Loads the training data from a cache file.

    Args:
        cache_file: The path to the cache file.

    Returns:
        A tuple of (x_train, y_train, labels).
    """
    # Load from cache
    cache = np.load(cache_file, allow_pickle=True)

    # Get data
    x_train = cache["x_train"]
    y_train = cache["y_train"]
    labels = cache["labels"]

    return x_train, y_train, labels
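

if __name__ == "__main__":
    # Hedged usage sketch of the cache round trip; "cache/demo_cache.npz" is a
    # throwaway file name assumed for the example, not a project path.
    demo_cache = os.path.join("cache", "demo_cache.npz")
    saveToCache(
        demo_cache,
        np.zeros((2, 3), dtype=np.float32),
        np.eye(2, dtype=np.float32),
        ["labelA", "labelB"],
    )
    cached_x, cached_y, cached_labels = loadFromCache(demo_cache)
    print(cached_x.shape, cached_labels)  # (2, 3) ['labelA' 'labelB']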


def clearErrorLog():
    """Clears the error log file.

    For debugging purposes.
    """
    if os.path.isfile(cfg.ERROR_LOG_FILE):
        os.remove(cfg.ERROR_LOG_FILE)


def writeErrorLog(ex: Exception):
    """Writes an exception to the error log.

    Formats the stacktrace and writes it in the error log file configured in the config.

    Args:
        ex: An exception that occurred.
    """
    with open(cfg.ERROR_LOG_FILE, "a") as elog:
        elog.write("".join(traceback.TracebackException.from_exception(ex).format()) + "\n")