|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| import os
|
| from tabulate import tabulate
|
| from tensorflow.python.keras.utils.layer_utils import count_params
|
| from typing import Dict, Optional, Tuple, List
|
| import tensorflow as tf
|
| from tensorflow.keras.models import Model
|
| import numpy as np
|
| import sklearn
|
| from pathlib import Path
|
| from onnx import ModelProto
|
| import onnxruntime
|
| import mlflow
|
| from common.utils import log_to_file
|
| import torch
|
|
|
|
|
def ai_interp_input_quant(ai_interp, data: np.array, file_extension: str):
    """Quantize input data to match the interpreter's first input tensor.

    For quantized (int8/uint8) inputs the data is rescaled with the tensor's
    scale and zero-point; for float inputs it is simply cast to float32.

    Args:
        ai_interp: AiRunner-like interpreter exposing get_inputs(), where each
            input detail has `dtype`, `scale` and `zero_point` attributes.
        data (np.array): Input data to quantize.
        file_extension (str): Unused by this function (kept for interface
            compatibility with callers).

    Returns:
        np.array: The data converted to the input tensor's dtype.
    """
    in_details = ai_interp.get_inputs()[0]
    target_dtype = in_details.dtype

    if target_dtype not in (np.uint8, np.int8):
        # Float model: no quantization needed, just normalize the dtype.
        return data.astype(np.float32)

    rescaled = (data / in_details.scale[0]) + in_details.zero_point[0]
    if target_dtype == np.int8:
        # NOTE(review): the extra -128 offset suggests scale/zero_point are
        # expressed in uint8 space for int8 tensors — confirm against the
        # AiRunner quantization convention.
        return (rescaled - 128).astype(np.int8)
    return rescaled.astype(np.uint8)
|
|
|
def ai_interp_outputs_dequant(ai_interp, predictions: np.array):
    """Dequantize the interpreter's outputs back to float32.

    Outputs whose scale and zero-point are both zero are treated as float
    outputs and passed through unchanged (after a float32 cast); all other
    outputs are dequantized as scale * (raw - zero_point).

    Args:
        ai_interp: AiRunner-like interpreter exposing get_outputs(), where each
            output detail has `scale` and `zero_point` attributes.
        predictions (np.array): Raw predictions, one array per model output.

    Returns:
        list: Dequantized float32 arrays, one per model output.
    """
    dequantized = []
    for idx, detail in enumerate(ai_interp.get_outputs()):
        values = predictions[idx].astype(np.float32)
        scale = detail.scale[0]
        zero_point = detail.zero_point[0]
        if scale == 0 and zero_point == 0:
            # Float output: nothing to dequantize.
            dequantized.append(values)
        else:
            dequantized.append(scale * (values - zero_point))
    return dequantized
|
|
|
|
|
def ai_runner_interp(target: str, name_model: str):
    """Returns an interpreter for ST Edge AI on-target (or emulated) inference.

    Args:
        target (str): Target to run the model on ('stedgeai_host' for the
            emulated host target, 'stedgeai_n6' or 'stedgeai_h7p' for a real
            board over serial). Any other value yields no interpreter.
        name_model (str): Name of the model, used in the error message if the
            connection to the target cannot be established.

    Returns:
        The connected AiRunner interpreter, or None if `target` is not an
        ST Edge AI target.

    Raises:
        TypeError: If the connection to the target fails.
    """
    if target in ['stedgeai_host', 'stedgeai_n6', 'stedgeai_h7p']:
        print(f"Loading {target} for ST Edge AI inference of {name_model}")
        # Imported lazily so this module stays usable when the ST Edge AI
        # runtime is not installed and a non-ST target is requested.
        from common.stm_ai_runner import AiRunner
        if target == 'stedgeai_host':
            ai_runner_desc = 'st_ai_ws'
        if target in ['stedgeai_n6', 'stedgeai_h7p']:
            ai_runner_desc = 'serial:921600'
        ai_runner_interpreter = AiRunner()
        if not ai_runner_interpreter.connect(ai_runner_desc):
            raise TypeError(f"model='{name_model}' unable to load the model")
        # Print the I/O tensor details to help debug quantization issues.
        for detail in ai_runner_interpreter.get_inputs():
            print(f" I: {detail.name} {detail.shape} {detail.dtype} {detail.scale} {detail.zero_point}")
        for detail in ai_runner_interpreter.get_outputs():
            print(f" O: {detail.name} {detail.shape} {detail.dtype} {detail.scale} {detail.zero_point}")
    else:
        ai_runner_interpreter = None
    return ai_runner_interpreter
|
|
|
|
|
def get_model_name(model_type: str,
                   input_shape: int,
                   project_name: str) -> str:
    """Returns a string representation of the model name.

    The name is built by joining the model type, the input shape and the
    project name with underscores.

    Args:
        model_type (str): Type of the model.
        input_shape (int): Input shape of the model.
        project_name (str): Name of the project.

    Returns:
        str: String representation of the model name.
    """
    return f"{model_type}_{input_shape}_{project_name}"
|
|
|
|
|
def get_model_name_and_its_input_shape(model_path: str = None,
                                       custom_objects: Dict = None) -> Tuple:
    """
    Load a model from a given file path and return the model name and
    its input shape. Supported model formats are .h5, .keras, .tflite and .onnx.
    The basename of the model file is used as the model name. The input
    shape is extracted from the model.

    Args:
        model_path (str): A path to an .h5, .keras, .tflite or .onnx model file.
        custom_objects (Dict): A dictionary containing custom objects from the model.

    Returns:
        Tuple: A tuple containing the loaded model name and its input shape.
        The input shape is a tuple of length 3.
    Raises:
        RuntimeError: If the model file extension is not one of '.h5',
            '.keras', '.tflite' or '.onnx', or if the input shape of the
            model cannot be extracted.
    """
    model_name = Path(model_path).stem
    file_extension = Path(model_path).suffix

    if file_extension in (".h5", ".keras"):
        # compile=False: only the architecture is needed to read the input shape.
        model = tf.keras.models.load_model(model_path,
                                           custom_objects=custom_objects,
                                           compile=False)
        try:
            input_shape = tuple(model.input.shape[1:])
        except (AttributeError, RuntimeError):
            # Models whose single input is only reachable through the
            # `inputs` list (e.g. functional models).
            input_shape = tuple(model.inputs[0].shape[1:])

    elif file_extension == ".tflite":
        try:
            interpreter = tf.lite.Interpreter(model_path=model_path)
            interpreter.allocate_tensors()
            input_details = interpreter.get_input_details()
            # Keep only the last 3 dimensions (drop the batch dimension).
            input_shape = tuple(input_details[0]['shape'])[-3:]
        except RuntimeError as error:
            raise RuntimeError("\nUnable to extract input shape from .tflite model file\n"
                               f"Received path {model_path}") from error

    elif file_extension == ".onnx":
        try:
            # onnxruntime parses and validates the model file itself, so no
            # separate onnx.ModelProto parsing is needed.
            sess = onnxruntime.InferenceSession(model_path)
            # Keep only the last 3 dimensions (drop the batch dimension).
            input_shape = tuple(sess.get_inputs()[0].shape)[-3:]
        except RuntimeError as error:
            raise RuntimeError("\nUnable to extract input shape from .onnx model file\n"
                               f"Received path {model_path}") from error

    else:
        raise RuntimeError("\nUnknown/unsupported model file type."
                           "\nExpected `.tflite`, `.h5`, `.keras`, or `.onnx`."
                           f"\nReceived path {model_path}")

    return model_name, input_shape
|
|
|
|
|
def check_model_support(model_name: str, version: Optional[str] = None,
                        supported_models: Dict = None,
                        message: Optional[str] = None) -> None:
    """
    Check if a model name and version are supported based on a dictionary of supported models and versions.

    Args:
        model_name(str): The name of the model to check.
        version(str): The version of the model to check. May be set to None by the caller.
        supported_models(Dict[str, List[str]]): A dictionary of supported models and their versions.
        message(str): An error message to print.

    Raises:
        ValueError: If the model name or version is not in the list of
            supported models or versions, or if the version attribute is
            missing or not applicable for the given model.
    """
    if model_name not in supported_models:
        supported_names = list(supported_models.keys())
        raise ValueError(f"\nSupported model names are {supported_names}. "
                         f"Received {model_name}.{message}")

    model_versions = supported_models[model_name]
    if model_versions:
        # This model is versioned: a valid version must be supplied.
        if not version:
            raise ValueError(f"\nMissing `version` attribute for `{model_name}` model.{message}")
        if version not in model_versions:
            raise ValueError(f"\nSupported versions for `{model_name}` model are {model_versions}. "
                             f"Received {version}.{message}")
    else:
        # This model has no versions: supplying one is an error.
        if version:
            raise ValueError("\nThe `version` attribute is not applicable "
                             f"to '{model_name}' model.{message}")
|
|
|
|
|
def check_attribute_value(attribute_value: str, values: List[str] = None,
                          name: str = None, message: str = None) -> None:
    """
    Validate that an attribute value belongs to a list of supported values.

    Args:
        attribute_value(str): The value of the attribute to check.
        values(List[str]): A list of supported values.
        name(str): The name of the attribute being checked.
        message(str): A message to print if the attribute is not supported.

    Raises:
        ValueError: If the attribute value is not in the list of supported values.
    """
    if attribute_value in values:
        return
    raise ValueError(f"\nSupported values for `{name}` attribute are {values}. "
                     f"Received {attribute_value}.{message}")
|
|
|
|
|
def transfer_pretrained_weights(target_model: tf.keras.Model, source_model_path: str = None,
                                end_layer_index: int = None, target_model_name: str = None) -> None:
    """
    Copy the weights of a source model to a target model. Only the backbone weights
    are copied as the two models can have different classifiers.

    Args:
        target_model (tf.keras.Model): The target model.
        source_model_path (str): Path to the source model file (h5 or keras file).
        end_layer_index (int): Index of the last backbone layer (the first layer of the model has index 0).
        target_model_name (str): The name of the target model.

    Raises:
        ValueError: The source model file cannot be found, or the two models
            are incompatible because they have different backbones.
    """
    if source_model_path:
        if not os.path.isfile(source_model_path):
            raise ValueError("Unable to find pretrained model file.\nReceived "
                             f"model path {source_model_path}")
        source_model = tf.keras.models.load_model(source_model_path, compile=False)

        message = (f"\nUnable to transfer to model `{target_model_name}` "
                   f"the weights from model {source_model_path}\n"
                   "Models are incompatible (backbones are different).")
        # The source model must have at least end_layer_index + 1 layers.
        if len(source_model.layers) < end_layer_index + 1:
            raise ValueError(message)
        for i in range(end_layer_index + 1):
            weights = source_model.layers[i].get_weights()
            try:
                target_model.layers[i].set_weights(weights)
            except ValueError as error:
                # Weight shape mismatch: the backbones differ.
                # (The original code raised the message string itself, which
                # is a TypeError at runtime — wrap it in ValueError instead.)
                raise ValueError(message) from error
|
|
|
|
|
def model_summary(model):
    """
    Display a model summary. It is similar to a Keras model summary with
    the additional information:
    - Indices of layers
    - Trainable/non-trainable status of layers
    - Total number of layers
    - Number of trainable layers
    - Number of non-trainable layers
    """
    rows = []
    num_trainable_layers = 0
    for index, layer in enumerate(model.layers):
        layer_kind = layer.__class__.__name__
        # An InputLayer has no output of its own; report the model input shape.
        if layer_kind == "InputLayer":
            shape = model.input.shape
        else:
            shape = layer.output.shape
        trainable = bool(layer.trainable)
        if trainable:
            num_trainable_layers += 1
        rows.append([index, trainable, layer.name, layer_kind,
                     layer.count_params(), shape])
    total_layers = len(model.layers)

    heavy_rule = 108 * '='
    light_rule = 108 * '-'
    print(heavy_rule)
    print(" Model:", model.name)
    print(heavy_rule)
    print(tabulate(rows, headers=["Layer index", "Trainable", "Name", "Type", "Params#", "Output shape"]))
    print(light_rule)
    print("Total params:", model.count_params())
    print("Trainable params: ", count_params(model.trainable_weights))
    print("Non-trainable params: ", count_params(model.non_trainable_weights))
    print(light_rule)
    print("Total layers:", total_layers)
    print("Trainable layers:", num_trainable_layers)
    print("Non-trainable layers:", total_layers - num_trainable_layers)
    print(heavy_rule)
|
|
|
|
|
def count_h5_parameters(output_dir: str = None, model: tf.keras.Model = None):
    """Log the parameter count of a float Keras model.

    The count is logged as an MLflow metric, appended to the run log file
    and printed to the console.

    Args:
        output_dir (str): Directory of the log file used by log_to_file.
        model (tf.keras.Model): The model whose parameters are counted.
    """
    n_params = model.count_params()
    mlflow.log_metric("nb_params", n_params)
    message = f"Nb params of float model : {n_params}"
    log_to_file(output_dir, message)
    print(f"[INFO] : {message}")
|
|
|
|
|
def count_tflite_parameters(output_dir: str = None,
                            model_path: str = None,
                            num_threads: Optional[int] = 1):
    """Log the parameter count of a quantized .tflite model.

    The count is logged as an MLflow metric, appended to the run log file
    and printed to the console.

    NOTE(review): this sums the element counts of *all* tensors reported by
    the interpreter, which may include non-weight tensors — confirm whether
    only weight tensors should be counted.

    Args:
        output_dir (str): Directory of the log file used by log_to_file.
        model_path (str): Path to the .tflite model file.
        num_threads (Optional[int]): Number of interpreter threads (default 1).
    """
    interpreter = tf.lite.Interpreter(model_path=model_path, num_threads=num_threads)

    total_params = 0
    for tensor in interpreter.get_tensor_details():
        shape = tensor['shape']
        if shape is None:
            continue
        # Element count of the tensor (product of its dimensions;
        # a scalar tensor counts as 1).
        tensor_size = 1
        for dim in shape:
            tensor_size *= dim
        total_params += tensor_size
    mlflow.log_metric("quantized_nb_params", total_params)
    log_to_file(output_dir, f"Nb params of quantized model : {total_params}")
    print(f"[INFO] : Nb params of quantized model : {total_params}")
|
|
|
|
|
def tf_dataset_to_np_array(input_ds, nchw=True, labels_included=True):
    """
    Converts a TensorFlow dataset into two NumPy arrays containing the data and labels.

    Iterates over the dataset, casts the image data to float32 and
    concatenates all batches along the batch dimension.

    Parameters:
    - input_ds (tf.data.Dataset): A dataset yielding (images, labels) batches,
      or image-only batches when labels_included is False.
    - nchw (bool): When True, transpose the data from channels-last to
      channels-first layout: (N, H, W, C) -> (N, C, H, W) for 4D data, and
      (N, L, C) -> (N, C, L) for 3D data.
    - labels_included (bool): Whether the dataset yields labels along with
      the images (True) or just the images (False).

    Returns:
    - tuple: (data, labels) as NumPy arrays. When labels_included is False,
      the second element is the empty label accumulator.

    Raises:
    - ValueError: If nchw is True and the data has neither 3 nor 4 dimensions.

    Note:
    - The function will fail if the dataset does not yield data in the
      expected format.
    """
    data_batches = []
    label_batches = []
    if labels_included:
        for images, labels in input_ds:
            data_batches.append(tf.cast(images, dtype=tf.float32).numpy())
            label_batches.append(labels)
        label_batches = np.concatenate(label_batches, axis=0)
    else:
        for images in input_ds:
            data_batches.append(tf.cast(images, dtype=tf.float32).numpy())
    data = np.concatenate(data_batches, axis=0)

    if nchw and data is not None:
        if data.ndim == 4:
            # (N, H, W, C) -> (N, C, H, W)
            axes_order = (0, 3, 1, 2)
        elif data.ndim == 3:
            # (N, L, C) -> (N, C, L)
            axes_order = (0, 2, 1)
        else:
            raise ValueError("The input array must have either 3 or 4 dimensions.")
        data = np.transpose(data, axes_order)

    return data, label_batches
|
|
|
|
|
def torch_dataset_to_np_array(input_loader, nchw=True, labels_included=True, device="cpu"):
    """
    Converts a PyTorch DataLoader into two NumPy arrays containing the data and labels.

    Parameters:
    - input_loader (torch.utils.data.DataLoader): A PyTorch DataLoader yielding batches of
      (images, labels) or (images,) tensors.
    - nchw (bool): When True and the images are 4-dimensional, transpose them
      from the PyTorch NCHW layout to NHWC: (N, C, H, W) -> (N, H, W, C).
      NOTE(review): despite the parameter name, setting this flag produces
      channels-LAST output — confirm the intended semantics with callers.
    - labels_included (bool): Whether the dataset includes labels.
    - device (str): Device to move tensors to before converting (default: 'cpu').

    Returns:
    - tuple: (images_np, labels_np)
        - images_np: numpy.ndarray with all image batches concatenated along the batch dim.
        - labels_np: numpy.ndarray with all labels (if labels_included=True), else None.

    Example:
    ```python
    data, labels = torch_dataset_to_np_array(train_loader)
    print(data.shape, labels.shape)

    images_only, _ = torch_dataset_to_np_array(pred_loader, labels_included=False)
    print(images_only.shape)
    ```
    """
    image_chunks = []
    label_chunks = []

    # Inference-only conversion: no gradients needed.
    with torch.no_grad():
        for batch in input_loader:
            if labels_included:
                images, labels = batch
                label_chunks.append(labels.cpu().numpy())
            else:
                images = batch[0] if isinstance(batch, (list, tuple)) else batch
            images = images.to(device)
            image_chunks.append(images.cpu().numpy())

    images_np = np.concatenate(image_chunks, axis=0)
    labels_np = np.concatenate(label_chunks, axis=0) if labels_included else None

    if nchw and images_np.ndim == 4:
        # NCHW -> NHWC
        images_np = np.transpose(images_np, (0, 2, 3, 1))

    return images_np, labels_np
|
|
|
def compute_confusion_matrix(test_set: tf.data.Dataset = None, model: Model = None) -> Tuple[np.ndarray, float]:
    """
    Computes the confusion matrix and the accuracy of a model on a test set.

    Args:
        test_set (tf.data.Dataset): The test dataset to evaluate the model on,
            yielding (images, labels) batches.
        model (tf.keras.models.Model): The trained model to evaluate.
    Returns:
        Tuple: The confusion matrix and the accuracy (percentage rounded
        to 2 decimal places).
    """
    # Imported here because a bare `import sklearn` (as done at the top of
    # this file) does not guarantee the `sklearn.metrics` submodule is loaded.
    from sklearn.metrics import confusion_matrix

    test_pred = []
    test_labels = []
    for data in test_set:
        test_pred_score = model.predict_on_batch(data[0])
        if test_pred_score.shape[1] > 1:
            # Multi-class output: predicted class is the argmax over scores.
            test_pred.append(np.argmax(test_pred_score, axis=1))
        else:
            # Single sigmoid output: threshold at 0.5 for binary classification.
            test_pred_score = np.where(test_pred_score < 0.5, 0, 1)
            test_pred.append(np.squeeze(test_pred_score))

        # Labels may be one-hot encoded (2D) or sparse (1D).
        batch_labels = np.argmax(data[1], axis=1) if len(data[1].shape) > 1 else data[1]
        test_labels.append(batch_labels)

    labels = np.concatenate(test_labels, axis=0)
    logits = np.concatenate(test_pred, axis=0)
    test_accuracy = round((np.sum(labels == logits) * 100) / len(labels), 2)

    cm = confusion_matrix(labels, logits)
    return cm, test_accuracy