Spaces:
Sleeping
Sleeping
"""Contains functions to use the BirdNET models."""
import os
import warnings

import numpy as np

import config as cfg
import utils

# Must be set BEFORE TensorFlow is imported below:
# silence TF logging and force CPU-only inference.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
os.environ["CUDA_VISIBLE_DEVICES"] = ""

warnings.filterwarnings("ignore")

# Import TFLite from runtime or Tensorflow;
# import Keras if protobuf model;
# NOTE: we have to use TFLite if we want to use
# the metadata model or want to extract embeddings
try:
    import tflite_runtime.interpreter as tflite
except ModuleNotFoundError:
    from tensorflow import lite as tflite

if not cfg.MODEL_PATH.endswith(".tflite"):
    from tensorflow import keras

# Lazily initialized model handles (populated by the load* functions below).
INTERPRETER: tflite.Interpreter = None    # main BirdNET TFLite model
C_INTERPRETER: tflite.Interpreter = None  # custom classifier
M_INTERPRETER: tflite.Interpreter = None  # metadata (species range) model
PBMODEL = None                            # protobuf (Keras) model, if used
def loadModel(class_output=True):
    """Initializes the BirdNET Model.

    Args:
        class_output: Omits the last layer when False.
    """
    global PBMODEL
    global INTERPRETER
    global INPUT_LAYER_INDEX
    global OUTPUT_LAYER_INDEX

    # Do we have to load the tflite or protobuf model?
    if cfg.MODEL_PATH.endswith(".tflite"):
        # TFLite variant: load the interpreter and cache the tensor indices.
        interpreter = tflite.Interpreter(model_path=cfg.MODEL_PATH, num_threads=cfg.TFLITE_THREADS)
        interpreter.allocate_tensors()

        in_details = interpreter.get_input_details()
        out_details = interpreter.get_output_details()

        INPUT_LAYER_INDEX = in_details[0]["index"]
        # Classification output, or the tensor directly before it
        # (feature embeddings) when class_output is False.
        OUTPUT_LAYER_INDEX = out_details[0]["index"] if class_output else out_details[0]["index"] - 1
        INTERPRETER = interpreter
    else:
        # Protobuf variant.
        # Note: This will throw a bunch of warnings about custom gradients
        # which we will ignore until TF lets us block them.
        PBMODEL = keras.models.load_model(cfg.MODEL_PATH, compile=False)
def loadCustomClassifier():
    """Loads the custom classifier."""
    global C_INTERPRETER
    global C_INPUT_LAYER_INDEX
    global C_OUTPUT_LAYER_INDEX
    global C_INPUT_SIZE

    # Load TFLite model and allocate tensors.
    interpreter = tflite.Interpreter(model_path=cfg.CUSTOM_CLASSIFIER, num_threads=cfg.TFLITE_THREADS)
    interpreter.allocate_tensors()

    in_detail = interpreter.get_input_details()[0]
    out_detail = interpreter.get_output_details()[0]

    C_INPUT_LAYER_INDEX = in_detail["index"]
    # Last input dimension tells callers whether this classifier consumes
    # raw audio or feature embeddings (see predictWithCustomClassifier).
    C_INPUT_SIZE = in_detail["shape"][-1]
    C_OUTPUT_LAYER_INDEX = out_detail["index"]
    C_INTERPRETER = interpreter
def loadMetaModel():
    """Loads the model for species prediction.

    Initializes the model used to predict species list, based on coordinates and week of year.
    """
    global M_INTERPRETER
    global M_INPUT_LAYER_INDEX
    global M_OUTPUT_LAYER_INDEX

    # Load TFLite model and allocate tensors.
    interpreter = tflite.Interpreter(model_path=cfg.MDATA_MODEL_PATH, num_threads=cfg.TFLITE_THREADS)
    interpreter.allocate_tensors()

    # Cache input/output tensor indices for inference.
    M_INPUT_LAYER_INDEX = interpreter.get_input_details()[0]["index"]
    M_OUTPUT_LAYER_INDEX = interpreter.get_output_details()[0]["index"]
    M_INTERPRETER = interpreter
def buildLinearClassifier(num_labels, input_size, hidden_units=0, dropout=0.0):
    """Builds a classifier.

    Args:
        num_labels: Output size.
        input_size: Size of the input.
        hidden_units: If > 0, creates another hidden layer with the given number of units.
        dropout: Dropout rate applied before each dense layer when > 0.

    Returns:
        A new classifier.
    """
    # import keras
    from tensorflow import keras

    # Assemble a simple one- or two-layer linear classifier as a layer list.
    stack = [keras.layers.InputLayer(input_shape=(input_size,))]

    # Optional hidden layer, preceded by dropout if requested.
    if hidden_units > 0:
        if dropout > 0:
            stack.append(keras.layers.Dropout(dropout))
        stack.append(keras.layers.Dense(hidden_units, activation="relu"))

    # Dropout before the classification layer if requested.
    if dropout > 0:
        stack.append(keras.layers.Dropout(dropout))

    # Classification layer with sigmoid activation (multi-label scores).
    stack.append(keras.layers.Dense(num_labels))
    stack.append(keras.layers.Activation("sigmoid"))

    return keras.Sequential(stack)
def trainLinearClassifier(classifier,
                          x_train,
                          y_train,
                          epochs,
                          batch_size,
                          learning_rate,
                          val_split,
                          upsampling_ratio,
                          upsampling_mode,
                          train_with_mixup,
                          train_with_label_smoothing,
                          on_epoch_end=None):
    """Trains a custom classifier.

    Trains a new classifier for BirdNET based on the given data.

    Args:
        classifier: The classifier to be trained.
        x_train: Samples.
        y_train: Labels.
        epochs: Number of epochs to train.
        batch_size: Batch size.
        learning_rate: The learning rate during training.
        val_split: Fraction of the data held out for validation.
        upsampling_ratio: If > 0, upsamples the training data to this ratio.
        upsampling_mode: Upsampling strategy (forwarded to utils.upsampling).
        train_with_mixup: Applies mixup augmentation when True.
        train_with_label_smoothing: Applies label smoothing when True.
        on_epoch_end: Optional callback `function(epoch, logs)`.

    Returns:
        (classifier, history)
    """
    # import keras
    from tensorflow import keras

    class _EpochNotifier(keras.callbacks.Callback):
        # Bridges Keras epoch-end events to a plain callable.
        def __init__(self, fn=None) -> None:
            super().__init__()
            self._fn = fn

        def on_epoch_end(self, epoch, logs=None):
            if self._fn:
                self._fn(epoch, logs)

    # Deterministic shuffle before splitting.
    np.random.seed(cfg.RANDOM_SEED)

    order = np.arange(x_train.shape[0])
    np.random.shuffle(order)
    x_train, y_train = x_train[order], y_train[order]

    # Random val split
    x_train, y_train, x_val, y_val = utils.random_split(x_train, y_train, val_split)

    print(f"Training on {x_train.shape[0]} samples, validating on {x_val.shape[0]} samples.", flush=True)

    # Upsample training data (validation data stays untouched).
    if upsampling_ratio > 0:
        x_train, y_train = utils.upsampling(x_train, y_train, upsampling_ratio, upsampling_mode)
        print(f"Upsampled training data to {x_train.shape[0]} samples.", flush=True)

    # Optional augmentations on the training set only.
    if train_with_mixup:
        x_train, y_train = utils.mixup(x_train, y_train)
    if train_with_label_smoothing:
        y_train = utils.label_smoothing(y_train)

    # Early stopping (armed after the first quarter of training) plus user callback.
    callbacks = [
        keras.callbacks.EarlyStopping(
            monitor="val_loss", patience=5, verbose=1, start_from_epoch=epochs // 4, restore_best_weights=True
        ),
        _EpochNotifier(fn=on_epoch_end),
    ]

    # Cosine annealing lr schedule over the whole run (in steps).
    lr_schedule = keras.experimental.CosineDecay(learning_rate, epochs * x_train.shape[0] / batch_size)

    # Compile model
    classifier.compile(
        optimizer=keras.optimizers.Adam(learning_rate=lr_schedule),
        loss="binary_crossentropy",
        metrics=[keras.metrics.AUC(curve="PR", multi_label=False, name="AUPRC")],
    )

    # Train model
    history = classifier.fit(
        x_train, y_train, epochs=epochs, batch_size=batch_size, validation_data=(x_val, y_val), callbacks=callbacks
    )

    return classifier, history
def saveLinearClassifier(classifier, model_path, labels):
    """Saves a custom classifier on the hard drive.

    Saves the classifier as a tflite model, as well as the used labels in a .txt.

    Args:
        classifier: The custom classifier.
        model_path: Path the model will be saved at.
        labels: List of labels used for the classifier.
    """
    import tensorflow as tf

    # Reuse the already loaded protobuf model if available.
    saved_model = PBMODEL if PBMODEL else tf.keras.models.load_model(cfg.PB_MODEL, compile=False)

    # Remove activation layer
    classifier.pop()

    combined_model = tf.keras.Sequential([saved_model.embeddings_model, classifier], "basic")

    # Append .tflite if necessary
    if not model_path.endswith(".tflite"):
        model_path += ".tflite"

    # Make folders
    os.makedirs(os.path.dirname(model_path), exist_ok=True)

    # Save model as tflite.
    # Fix: use tf.lite here — the module-level `tflite` may be
    # tflite_runtime.interpreter, which has no TFLiteConverter.
    converter = tf.lite.TFLiteConverter.from_keras_model(combined_model)
    tflite_model = converter.convert()

    # Fix: close the file handle deterministically instead of leaking it.
    with open(model_path, "wb") as model_file:
        model_file.write(tflite_model)

    # Save labels
    with open(model_path.replace(".tflite", "_Labels.txt"), "w") as f:
        for label in labels:
            f.write(label + "\n")
def save_raven_model(classifier, model_path, labels):
    """Saves the combined model in Raven's SavedModel format.

    Exports the BirdNET embeddings backbone plus the trained classifier as a
    TF SavedModel with a "basic" signature, then writes the label, class, and
    model-config files next to it.

    Args:
        classifier: The trained classifier head (Keras model).
        model_path: Destination path; a trailing ".tflite" is stripped.
        labels: List of label strings, one per classifier output.
    """
    import tensorflow as tf
    import csv
    import json

    # Reuse the already loaded protobuf model if available.
    saved_model = PBMODEL if PBMODEL else tf.keras.models.load_model(cfg.PB_MODEL, compile=False)
    combined_model = tf.keras.Sequential([saved_model.embeddings_model, classifier], "basic")

    # Make signatures
    class SignatureModule(tf.Module):
        # Wraps the Keras model so it can be exported with a named signature.
        def __init__(self, keras_model):
            super().__init__()
            self.model = keras_model

        def basic(self, inputs):
            return {"scores": self.model(inputs)}

    smodel = SignatureModule(combined_model)
    signatures = {
        "basic": smodel.basic,
    }

    # Save signature model
    os.makedirs(os.path.dirname(model_path), exist_ok=True)
    # A SavedModel is a directory, so drop a ".tflite" suffix if present.
    model_path = model_path[:-7] if model_path.endswith(".tflite") else model_path
    tf.saved_model.save(smodel, model_path, signatures=signatures)

    # Save label file
    # Semantic keys: first 4 chars of the label (spaces removed) + 1-based index.
    labelIds = [label[:4].replace(" ", "") + str(i) for i, label in enumerate(labels, 1)]
    labels_dir = os.path.join(model_path, "labels")
    os.makedirs(labels_dir, exist_ok=True)

    with open(os.path.join(labels_dir, "label_names.csv"), "w", newline="") as labelsfile:
        labelwriter = csv.writer(labelsfile)
        labelwriter.writerows(zip(labelIds, labels))

    # Save class names file
    classes_dir = os.path.join(model_path, "classes")
    os.makedirs(classes_dir, exist_ok=True)

    with open(os.path.join(classes_dir, "classes.csv"), "w", newline="") as classesfile:
        classeswriter = csv.writer(classesfile)
        for labelId in labelIds:
            # One row per class: id, 0.25 threshold, configured frequency band.
            classeswriter.writerow((labelId, 0.25, cfg.SIG_FMIN, cfg.SIG_FMAX, False))

    # Save model config
    model_config = os.path.join(model_path, "model_config.json")

    with open(model_config, "w") as modelconfigfile:
        modelconfig = {
            "specVersion": 1,
            # NOTE(review): "MODEL_VESION" matches the attribute name declared in
            # config — do not "correct" the spelling here alone.
            "modelDescription": "Custom classifier trained with BirdNET "
            + cfg.MODEL_VESION
            + " embeddings.\nBirdNET was developed by the K. Lisa Yang Center for Conservation Bioacoustics at the Cornell Lab of Ornithology in collaboration with Chemnitz University of Technology.\n\nhttps://birdnet.cornell.edu",
            "modelTypeConfig": {"modelType": "RECOGNITION"},
            "signatures": [
                {
                    "signatureName": "basic",
                    "modelInputs": [{"inputName": "inputs", "sampleRate": 48000.0, "inputConfig": ["batch", "samples"]}],
                    "modelOutputs": [{"outputName": "scores", "outputType": "SCORES"}],
                }
            ],
            "globalSemanticKeys": labelIds,
        }
        json.dump(modelconfig, modelconfigfile, indent=2)
def predictFilter(lat, lon, week):
    """Predicts the probability for each species.

    Args:
        lat: The latitude.
        lon: The longitude.
        week: The week of the year [1-48]. Use -1 for yearlong.

    Returns:
        A list of probabilities for all species.
    """
    global M_INTERPRETER

    # Lazily load the metadata model on first use.
    # Fix: identity comparison with None (`is`) instead of `==`.
    if M_INTERPRETER is None:
        loadMetaModel()

    # Prepare mdata as sample
    sample = np.expand_dims(np.array([lat, lon, week], dtype="float32"), 0)

    # Run inference
    M_INTERPRETER.set_tensor(M_INPUT_LAYER_INDEX, sample)
    M_INTERPRETER.invoke()

    return M_INTERPRETER.get_tensor(M_OUTPUT_LAYER_INDEX)[0]
def explore(lat: float, lon: float, week: int):
    """Predicts the species list.

    Predicts the species list based on the coordinates and week of year.

    Args:
        lat: The latitude.
        lon: The longitude.
        week: The week of the year [1-48]. Use -1 for yearlong.

    Returns:
        A sorted list of tuples with the score and the species.
    """
    # Run the metadata model, zero out everything below the threshold,
    # pair scores with labels and sort best-first.
    scores = predictFilter(lat, lon, week)
    scores = np.where(scores >= cfg.LOCATION_FILTER_THRESHOLD, scores, 0)

    ranked = sorted(zip(scores, cfg.LABELS), key=lambda entry: entry[0], reverse=True)

    return ranked
def flat_sigmoid(x, sensitivity=-1):
    """Applies a flattened sigmoid to raw scores.

    Values are clipped to [-15, 15] before exponentiation so extreme
    inputs cannot overflow and scores saturate smoothly.

    Args:
        x: Score value(s); scalar or array-like.
        sensitivity: Slope factor; the default -1 yields a standard sigmoid.

    Returns:
        Sigmoid score(s) in (0, 1) with the same shape as `x`.
    """
    clipped = np.clip(x, -15, 15)
    return 1 / (1.0 + np.exp(sensitivity * clipped))
def predict(sample):
    """Uses the main net to predict a sample.

    Args:
        sample: Audio sample.

    Returns:
        The prediction scores for the sample.
    """
    # Route to the custom classifier when one is configured.
    # Fix: identity comparison with None (`is`) instead of `!=`/`==`.
    if cfg.CUSTOM_CLASSIFIER is not None:
        return predictWithCustomClassifier(sample)

    global INTERPRETER

    # Lazily load the model on first use.
    if INTERPRETER is None and PBMODEL is None:
        loadModel()

    if PBMODEL is None:
        # Reshape input tensor to the current batch size.
        INTERPRETER.resize_tensor_input(INPUT_LAYER_INDEX, [len(sample), *sample[0].shape])
        INTERPRETER.allocate_tensors()

        # Make a prediction (Audio only for now)
        INTERPRETER.set_tensor(INPUT_LAYER_INDEX, np.array(sample, dtype="float32"))
        INTERPRETER.invoke()
        prediction = INTERPRETER.get_tensor(OUTPUT_LAYER_INDEX)

        return prediction
    else:
        # Make a prediction (Audio only for now)
        # NOTE(review): this calls the embeddings sub-model rather than the
        # full classifier head — confirm this is intended for the protobuf path.
        prediction = PBMODEL.embeddings_model.predict(sample)

        return prediction
def predictWithCustomClassifier(sample):
    """Uses the custom classifier to make a prediction.

    Args:
        sample: Audio sample.

    Returns:
        The prediction scores for the sample.
    """
    global C_INTERPRETER
    global C_INPUT_SIZE

    # Lazily load the custom classifier on first use.
    # Fix: identity comparison with None (`is`) instead of `==`.
    if C_INTERPRETER is None:
        loadCustomClassifier()

    # A 144000-sample input means the classifier consumes raw audio;
    # otherwise feed it BirdNET feature embeddings.
    vector = embeddings(sample) if C_INPUT_SIZE != 144000 else sample

    # Reshape input tensor to the current batch size.
    C_INTERPRETER.resize_tensor_input(C_INPUT_LAYER_INDEX, [len(vector), *vector[0].shape])
    C_INTERPRETER.allocate_tensors()

    # Make a prediction
    C_INTERPRETER.set_tensor(C_INPUT_LAYER_INDEX, np.array(vector, dtype="float32"))
    C_INTERPRETER.invoke()
    prediction = C_INTERPRETER.get_tensor(C_OUTPUT_LAYER_INDEX)

    return prediction
def embeddings(sample):
    """Extracts the embeddings for a sample.

    Args:
        sample: Audio samples.

    Returns:
        The embeddings.
    """
    global INTERPRETER

    # Lazily load the model without the classification head so
    # OUTPUT_LAYER_INDEX points at the embeddings tensor.
    # Fix: identity comparison with None (`is`) instead of `==`.
    if INTERPRETER is None:
        loadModel(False)
    # NOTE(review): if loadModel() was already called with class_output=True,
    # OUTPUT_LAYER_INDEX still points at the classification output — confirm
    # callers never mix predict() and embeddings() on the same interpreter.

    # Reshape input tensor to the current batch size.
    INTERPRETER.resize_tensor_input(INPUT_LAYER_INDEX, [len(sample), *sample[0].shape])
    INTERPRETER.allocate_tensors()

    # Extract feature embeddings
    INTERPRETER.set_tensor(INPUT_LAYER_INDEX, np.array(sample, dtype="float32"))
    INTERPRETER.invoke()
    features = INTERPRETER.get_tensor(OUTPUT_LAYER_INDEX)

    return features