models-moved / Models.py
dthh's picture
Upload Models.py with huggingface_hub
917fb3c verified
import os
# import sys
from functools import partial
from pathlib import Path
import logging
import torch
from huggingface_hub import hf_hub_download
from torch import Tensor, nn
from torchvision import models, transforms
import pandas as pd
from collections import defaultdict
class ModelInterface:
"""
Interface for managing image classification and regression tasks.
"""
def __init__(self, config):
"""
Initialize the ModelInterface.
Parameters:
config (dict): Configuration dictionary containing the following keys:
- gpu_kernel (int): GPU index to use for computations. Defaults to the first available GPU if available, otherwise CPU.
- transform_surface (dict): Parameters for surface type and quality image transformations, including resize, crop, and normalization settings.
- transform_road_type (dict): Parameters for road type image transformations, similar to surface transformations.
- model_root (str): Directory path where model files are stored locally. Defaults to folder name 'models'.
- models (dict): Dictionary mapping prediction levels (e.g., 'road_type', 'surface_type') to model file names.
- hf_model_repo (str): Hugging Face repository ID for downloading models if not found locally.
"""
self.device = self._validate_device(config.get('gpu_kernel', ''))
self.model_root = Path(config.get("model_root", "models"))
self.models = config.get("models")
self.hf_model_repo = config.get("hf_model_repo", "")
self._validate_models()
self._default_normalization = (NORM_MEAN, NORM_SD)
self.transform_surface = self._validate_transform(config.get("transform_surface", None), "surface_type")
self.transform_road_type = self._validate_transform(config.get("transform_road_type", None), "road_type")
def _validate_device(self, gpu_kernel):
try:
cuda = "cuda" if gpu_kernel == '' else f"cuda:{gpu_kernel}"
return torch.device(
cuda if torch.cuda.is_available() else "cpu"
)
except Exception as e:
logging.warning(f"An unexpected error occurred while selecting GPU: {e}\n"
+ "Falling back to CPU.")
return torch.device("cpu")
def _validate_models(self):
"""
Check if model files exist and download from hugging face if not.
"""
if not self.models:
raise TypeError("No models are defined.")
log_model_not_defined = "No model for '{level_string}' is defined. Prediction is skipped."
# check surface type model
level = "surface_type"
model_file = self.models.get(level)
if model_file is None:
logging.info(log_model_not_defined.format(level_string=model_to_info_string[level]))
else:
self.download_model(model_file)
_, surface_class_to_idx, _ = self.load_model(model=model_file)
# check quality models
level = "surface_quality"
sub_models = self.models.get(level)
if model_file is None:
logging.info(log_model_not_defined.format(level_string=model_to_info_string[level]))
else:
for surface_type in surface_class_to_idx:
model_file = sub_models.get(surface_type)
if model_file is None:
logging.info(log_model_not_defined.format(level_string=surface_type))
else:
self.download_model(model_file)
self.load_model(model=model_file)
# check road type model
level = "road_type"
model_file = self.models.get(level)
if model_file is None:
logging.info(log_model_not_defined.format(level_string=model_to_info_string[level]))
else:
self.download_model(model_file)
self.load_model(model=model_file)
def _validate_transform(self, transform, level):
"""
Validate the transformation for a given model type if the model exists.
Parameters:
- transform (dict): transformation.
- level (str): model level.
Returns:
dict: transformation.
"""
if level in self.models:
if transform is None:
logging.warning(f"No transformation for {model_to_info_string[level]} prediction defined.")
transform = {}
if "normalize" not in transform:
logging.info(f"No normalization parameters for {model_to_info_string[level]} prediction provided. Using default values.")
transform["normalize"] = self._default_normalization
return transform
def download_model(self, model):
"""
Download a model from Hugging Face repository.
Parameters:
- model (str): Model file name.
Returns:
None
"""
model_path = self.model_root / model
# load model data from hugging face if not locally available
if not os.path.exists(model_path):
logging.info(
f"Model file not found at {model_path}. Downloading from Hugging Face..."
)
try:
os.makedirs(self.model_root, exist_ok=True)
model_path = hf_hub_download(
repo_id=self.hf_model_repo, filename=model, local_dir=self.model_root
)
logging.info(f"Model file downloaded successfully to {model_path}.")
except Exception as e:
logging.error(f"An unexpected error occurred while downloading the model {model}: {e}")
raise e
@staticmethod
def custom_crop(img, crop_style=None):
"""
Crop an image according to the specified style.
Parameters:
- img (PIL.Image): Input image to be cropped.
- crop_style (str, optional): Style of cropping (e.g., 'lower_middle_half').
Returns:
PIL.Image: Cropped image.
"""
im_width, im_height = img.size
if crop_style == CROP_LOWER_MIDDLE_HALF:
top = im_height / 2
left = im_width / 4
height = im_height / 2
width = im_width / 2
elif crop_style == CROP_LOWER_HALF:
top = im_height / 2
left = 0
height = im_height / 2
width = im_width
else: # None, or not valid
logging.warning(f"Cropping method {crop_style} is not defined. Image is not cropped.")
return img
cropped_img = transforms.functional.crop(img, top, left, height, width)
return cropped_img
def transform(
self,
resize=None,
crop=None,
to_tensor=True,
normalize=None,
):
"""
Create a PyTorch image transformation function based on specified parameters.
Parameters:
- resize ((int, int) or int, optional): Target size for resizing, e.g. (height, width). If int, then used for both height and width.
- crop (str, optional): crop style e.g. 'lower_middle_third'
- to_tensor (bool, optional): Converts the PIL Image (H x W x C) in the range [0, 255] to a torch.FloatTensor of shape (C x H x W) in the range [0.0, 1.0]
- normalize (tuple of lists [r, g, b], optional): Mean and standard deviation for normalization.
Returns:
PyTorch image transformation function.
"""
transform_list = []
if crop is not None:
transform_list.append(
transforms.Lambda(partial(self.custom_crop, crop_style=crop))
)
if resize is not None:
if isinstance(resize, int):
resize = (resize, resize)
transform_list.append(transforms.Resize(resize))
if to_tensor:
transform_list.append(transforms.ToTensor())
if normalize is not None:
transform_list.append(transforms.Normalize(*normalize))
composed_transform = transforms.Compose(transform_list)
return composed_transform
def preprocessing(self, img_data_raw, transform):
"""
Preprocess raw image data using a specified transformation.
Parameters:
- img_data_raw (list): List of raw images to preprocess.
- transform (dict): Dictionary of transformation parameters.
Returns:
torch.Tensor: Preprocessed image tensor.
"""
if not img_data_raw:
raise ValueError("Image data is empty.")
transform = self.transform(**transform)
img_data = torch.stack([transform(img) for img in img_data_raw])
return img_data
def load_model(self, model):
"""
Load a model from local storage.
Parameters:
- model (str): Model file name.
Returns:
nn.Module: Loaded model.
dict: Mapping of classes to indices.
bool: Whether the model is for regression.
"""
model_path = self.model_root / model
try:
model_state = torch.load(model_path, map_location=self.device)
model_name = model_state["model_name"]
is_regression = model_state["is_regression"]
class_to_idx = model_state["class_to_idx"]
num_classes = 1 if is_regression else len(class_to_idx.items())
model_state_dict = model_state["model_state_dict"]
model_cls = model_mapping[model_name]
model = model_cls(num_classes=num_classes)
model.load_state_dict(model_state_dict)
except Exception as e:
logging.error(f"An unexpected error occurred while loading the model {model_path}: {e}")
raise e
return model, class_to_idx, is_regression
def predict(self, model, data):
"""
Perform predictions using the specified model and input data.
Parameters:
- model (nn.Module): The model to use for predictions.
- data (torch.Tensor): Batch of input data.
Returns:
torch.Tensor: Predicted values or class probabilities.
"""
model.to(self.device)
model.eval()
image_batch = data.to(self.device)
with torch.no_grad():
batch_outputs = model(image_batch)
# batch_classes, batch_values = model.get_class_and_value(batch_outputs)
batch_values = model.get_class_probabilities(batch_outputs)
return batch_values
@staticmethod
def predict_to_classes(batch_values, class_to_idx):
"""
Map predicted values to classes.
Parameters:
- batch_values (torch.Tensor): Batch of prediction values.
- class_to_idx (dict): Mapping from class names to indices.
Returns:
list: List of predicted values.
list: List of predicted classes.
"""
idx_to_class = {i: cls for cls, i in class_to_idx.items()}
if len(list(batch_values.shape)) < 2:
classes = [
idx_to_class[
min(
max(idx.item(), min(list(class_to_idx.values()))),
max(list(class_to_idx.values())),
)
]
for idx in batch_values.round().int()
]
values = batch_values.tolist()
else:
classes = [idx_to_class[idx.item()] for idx in torch.argmax(batch_values, dim=1)]
values = batch_values.tolist()
return values, classes
def batch_classifications(self, img_data_raw, img_ids=None):
"""
Perform batch classification for multiple prediction levels (road type, surface type, surface quality).
Parameters:
- img_data_raw (list): List of raw images to classify.
- img_ids (list, optional): List of IDs corresponding to the images. Defaults to indices.
Returns:
list: Combined list of image ids and predictions across levels.
"""
if not img_data_raw:
logging.info("Input data is empty. No predictions performed.")
return []
# default image ids
if img_ids is None:
img_ids = range(len(img_data_raw))
# default values
road_classes = [None] * len(img_data_raw)
road_values = [None] * len(img_data_raw)
surface_classes = [None] * len(img_data_raw)
surface_values = [None] * len(img_data_raw)
quality_classes = [None] * len(img_data_raw)
quality_values = [None] * len(img_data_raw)
# road type
level = "road_type"
model_file = self.models.get(level)
if model_file is not None:
model, class_to_idx, _ = self.load_model(model=model_file)
data = self.preprocessing(img_data_raw, self.transform_road_type)
values = self.predict(model, data)
road_values, road_classes = self.predict_to_classes(values, class_to_idx)
# surface type
level = "surface_type"
model_file = self.models.get(level)
if model_file is not None:
model, class_to_idx, _ = self.load_model(model=model_file)
data = self.preprocessing(img_data_raw, self.transform_surface)
values = self.predict(model, data)
surface_values, surface_classes = self.predict_to_classes(values, class_to_idx)
# surface quality
level = "surface_quality"
sub_models = self.models.get(level)
if sub_models is not None:
surface_indices = defaultdict(list)
for i, surface_type in enumerate(surface_classes):
surface_indices[surface_type].append(i)
quality_values = [None] * len(img_data_raw)
quality_classes = [None] * len(img_data_raw)
for surface_type, indices in surface_indices.items():
model_file = sub_models.get(surface_type)
if model_file is not None:
model, class_to_idx, _ = self.load_model(model=model_file)
values = self.predict(model, data[indices])
values, classes = self.predict_to_classes(values, class_to_idx)
for idx, vl, cls in zip(indices, values, classes):
quality_values[idx] = vl
quality_classes[idx] = cls
# final results combination
final_results = [
[
img_ids[i],
road_classes[i],
road_values[i],
surface_classes[i],
surface_values[i],
quality_classes[i],
quality_values[i],
]
for i in range(len(img_data_raw))
]
return final_results
class CustomEfficientNetV2SLinear(nn.Module):
"""
Custom implementation of EfficientNetV2-S with a linear classifier for classification or regression tasks.
Attributes:
features (nn.Sequential): Feature extractor from EfficientNetV2-S.
avgpool (nn.AdaptiveAvgPool2d): Adaptive average pooling layer.
classifier (nn.Sequential): Fully connected layers for classification.
is_regression (bool): Whether the model is configured for regression tasks.
criterion (callable): Loss function used for training the model.
"""
def __init__(self, num_classes, avg_pool=1):
super(CustomEfficientNetV2SLinear, self).__init__()
model = models.efficientnet_v2_s(weights="IMAGENET1K_V1")
# adapt output layer
in_features = model.classifier[-1].in_features * (avg_pool * avg_pool)
fc = nn.Linear(in_features, num_classes, bias=True)
model.classifier[-1] = fc
self.features = model.features
self.avgpool = nn.AdaptiveAvgPool2d(avg_pool)
self.classifier = model.classifier
if num_classes == 1:
self.criterion = nn.MSELoss
self.is_regression = True
else:
self.criterion = nn.CrossEntropyLoss
self.is_regression = False
def get_class_probabilities(self, x):
if self.is_regression:
x = x.flatten()
else:
x = nn.functional.softmax(x, dim=1)
return x
def forward(self, x: Tensor) -> Tensor:
x = self.features(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.classifier(x)
return x
# def get_optimizer_layers(self):
# return self.classifier
# Constants
EFFNET_LINEAR = "efficientNetV2SLinear"
CROP_LOWER_MIDDLE_HALF = "lower_middle_half"
CROP_LOWER_HALF = "lower_half"
NORM_MEAN = [0.42834484577178955, 0.4461250305175781, 0.4350937306880951]
NORM_SD = [0.22991590201854706, 0.23555299639701843, 0.26348039507865906]
model_mapping = {
EFFNET_LINEAR: CustomEfficientNetV2SLinear,
}
model_to_info_string = {
"surface_type": "surface type",
"road_type": "road type",
"surface_quality": "quality",
}