|
|
import os |
|
|
|
|
|
from functools import partial |
|
|
from pathlib import Path |
|
|
import logging |
|
|
|
|
|
import torch |
|
|
from huggingface_hub import hf_hub_download |
|
|
from torch import Tensor, nn |
|
|
from torchvision import models, transforms |
|
|
import pandas as pd |
|
|
from collections import defaultdict |
|
|
|
|
|
|
|
|
class ModelInterface:
    """
    Interface for managing image classification and regression tasks.

    Resolves model files (local storage with a Hugging Face download
    fallback), builds image preprocessing pipelines, and runs batched
    predictions for road type, surface type, and per-surface-type
    surface quality.
    """

    def __init__(self, config):
        """
        Initialize the ModelInterface.

        Parameters:
            config (dict): Configuration dictionary containing the following keys:
                - gpu_kernel (int): GPU index to use for computations. Defaults to the
                  first available GPU if available, otherwise CPU.
                - transform_surface (dict): Parameters for surface type and quality image
                  transformations, including resize, crop, and normalization settings.
                - transform_road_type (dict): Parameters for road type image
                  transformations, similar to surface transformations.
                - model_root (str): Directory path where model files are stored locally.
                  Defaults to folder name 'models'.
                - models (dict): Dictionary mapping prediction levels (e.g., 'road_type',
                  'surface_type') to model file names.
                - hf_model_repo (str): Hugging Face repository ID for downloading models
                  if not found locally.
        """
        self.device = self._validate_device(config.get("gpu_kernel", ""))
        self.model_root = Path(config.get("model_root", "models"))
        self.models = config.get("models")
        self.hf_model_repo = config.get("hf_model_repo", "")
        # Download/load every configured model up front so missing or broken
        # files fail fast instead of mid-prediction.
        self._validate_models()
        self._default_normalization = (NORM_MEAN, NORM_SD)
        self.transform_surface = self._validate_transform(config.get("transform_surface", None), "surface_type")
        self.transform_road_type = self._validate_transform(config.get("transform_road_type", None), "road_type")

    def _validate_device(self, gpu_kernel):
        """
        Resolve the torch device to run on.

        Parameters:
            gpu_kernel (int or str): GPU index; an empty string selects the default GPU.

        Returns:
            torch.device: The selected CUDA device, or CPU if CUDA is
            unavailable or selection fails.
        """
        try:
            cuda = "cuda" if gpu_kernel == '' else f"cuda:{gpu_kernel}"
            return torch.device(
                cuda if torch.cuda.is_available() else "cpu"
            )
        except Exception as e:
            logging.warning(f"An unexpected error occurred while selecting GPU: {e}\n"
                            + "Falling back to CPU.")
            return torch.device("cpu")

    def _validate_models(self):
        """
        Check if model files exist and download from hugging face if not.

        Raises:
            TypeError: If no models are defined in the configuration.
        """
        if not self.models:
            raise TypeError("No models are defined.")

        log_model_not_defined = "No model for '{level_string}' is defined. Prediction is skipped."

        # Surface type first: its class mapping is needed below to validate
        # the per-surface-type quality sub-models.
        surface_class_to_idx = {}
        level = "surface_type"
        model_file = self.models.get(level)
        if model_file is None:
            logging.info(log_model_not_defined.format(level_string=model_to_info_string[level]))
        else:
            self.download_model(model_file)
            _, surface_class_to_idx, _ = self.load_model(model=model_file)

        level = "surface_quality"
        sub_models = self.models.get(level)
        # BUG FIX: previously this tested the stale `model_file` from the
        # surface_type section instead of `sub_models`, so a missing quality
        # config was never reported and a defined one could be skipped.
        if sub_models is None:
            logging.info(log_model_not_defined.format(level_string=model_to_info_string[level]))
        else:
            # One quality sub-model per predicted surface class.
            for surface_type in surface_class_to_idx:
                model_file = sub_models.get(surface_type)
                if model_file is None:
                    logging.info(log_model_not_defined.format(level_string=surface_type))
                else:
                    self.download_model(model_file)
                    self.load_model(model=model_file)

        level = "road_type"
        model_file = self.models.get(level)
        if model_file is None:
            logging.info(log_model_not_defined.format(level_string=model_to_info_string[level]))
        else:
            self.download_model(model_file)
            self.load_model(model=model_file)

    def _validate_transform(self, transform, level):
        """
        Validate the transformation for a given model type if the model exists.

        Parameters:
            - transform (dict): transformation.
            - level (str): model level.

        Returns:
            dict: transformation (with default normalization filled in), or the
            input unchanged when no model is configured for `level`.
        """
        if level in self.models:
            if transform is None:
                logging.warning(f"No transformation for {model_to_info_string[level]} prediction defined.")
                transform = {}

            if "normalize" not in transform:
                logging.info(f"No normalization parameters for {model_to_info_string[level]} prediction provided. Using default values.")
                transform["normalize"] = self._default_normalization

        return transform

    def download_model(self, model):
        """
        Download a model from Hugging Face repository.

        Only downloads when the file is not already present locally.

        Parameters:
            - model (str): Model file name.

        Returns:
            None

        Raises:
            Exception: Re-raises any error from the Hugging Face download.
        """
        model_path = self.model_root / model

        if not model_path.exists():
            logging.info(
                f"Model file not found at {model_path}. Downloading from Hugging Face..."
            )
            try:
                self.model_root.mkdir(parents=True, exist_ok=True)
                model_path = hf_hub_download(
                    repo_id=self.hf_model_repo, filename=model, local_dir=self.model_root
                )
                logging.info(f"Model file downloaded successfully to {model_path}.")
            except Exception as e:
                logging.error(f"An unexpected error occurred while downloading the model {model}: {e}")
                raise e

    @staticmethod
    def custom_crop(img, crop_style=None):
        """
        Crop an image according to the specified style.

        Parameters:
            - img (PIL.Image): Input image to be cropped.
            - crop_style (str, optional): Style of cropping (e.g., 'lower_middle_half').

        Returns:
            PIL.Image: Cropped image, or the original image when the crop
            style is unknown.
        """
        im_width, im_height = img.size
        if crop_style == CROP_LOWER_MIDDLE_HALF:
            # Lower half vertically, middle half horizontally.
            top = im_height / 2
            left = im_width / 4
            height = im_height / 2
            width = im_width / 2
        elif crop_style == CROP_LOWER_HALF:
            # Lower half vertically, full width.
            top = im_height / 2
            left = 0
            height = im_height / 2
            width = im_width
        else:
            logging.warning(f"Cropping method {crop_style} is not defined. Image is not cropped.")
            return img

        return transforms.functional.crop(img, top, left, height, width)

    def transform(
        self,
        resize=None,
        crop=None,
        to_tensor=True,
        normalize=None,
    ):
        """
        Create a PyTorch image transformation function based on specified parameters.

        Parameters:
            - resize ((int, int) or int, optional): Target size for resizing, e.g.
              (height, width). If int, then used for both height and width.
            - crop (str, optional): crop style e.g. 'lower_middle_third'
            - to_tensor (bool, optional): Converts the PIL Image (H x W x C) in the range
              [0, 255] to a torch.FloatTensor of shape (C x H x W) in the range [0.0, 1.0]
            - normalize (tuple of lists [r, g, b], optional): Mean and standard deviation
              for normalization.

        Returns:
            PyTorch image transformation function.
        """
        transform_list = []

        if crop is not None:
            # partial() binds the crop style so the Lambda is picklable-friendly
            # and stateless per call.
            transform_list.append(
                transforms.Lambda(partial(self.custom_crop, crop_style=crop))
            )

        if resize is not None:
            if isinstance(resize, int):
                resize = (resize, resize)
            transform_list.append(transforms.Resize(resize))

        if to_tensor:
            transform_list.append(transforms.ToTensor())

        if normalize is not None:
            transform_list.append(transforms.Normalize(*normalize))

        return transforms.Compose(transform_list)

    def preprocessing(self, img_data_raw, transform):
        """
        Preprocess raw image data using a specified transformation.

        Parameters:
            - img_data_raw (list): List of raw images to preprocess.
            - transform (dict): Dictionary of transformation parameters.

        Returns:
            torch.Tensor: Preprocessed image tensor (batched along dim 0).

        Raises:
            ValueError: If `img_data_raw` is empty.
        """
        if not img_data_raw:
            raise ValueError("Image data is empty.")

        transform = self.transform(**transform)
        return torch.stack([transform(img) for img in img_data_raw])

    def load_model(self, model):
        """
        Load a model from local storage.

        Parameters:
            - model (str): Model file name.

        Returns:
            nn.Module: Loaded model.
            dict: Mapping of classes to indices.
            bool: Whether the model is for regression.

        Raises:
            Exception: Re-raises any error from deserialization or state loading.
        """
        model_path = self.model_root / model
        try:
            # NOTE: torch.load unpickles arbitrary objects — only load
            # checkpoints from trusted sources.
            model_state = torch.load(model_path, map_location=self.device)
            model_name = model_state["model_name"]
            is_regression = model_state["is_regression"]
            class_to_idx = model_state["class_to_idx"]
            # Regression heads emit a single scalar output.
            num_classes = 1 if is_regression else len(class_to_idx)
            model_cls = model_mapping[model_name]
            model = model_cls(num_classes=num_classes)
            model.load_state_dict(model_state["model_state_dict"])
        except Exception as e:
            logging.error(f"An unexpected error occurred while loading the model {model_path}: {e}")
            raise e

        return model, class_to_idx, is_regression

    def predict(self, model, data):
        """
        Perform predictions using the specified model and input data.

        Parameters:
            - model (nn.Module): The model to use for predictions.
            - data (torch.Tensor): Batch of input data.

        Returns:
            torch.Tensor: Predicted values or class probabilities.
        """
        model.to(self.device)
        model.eval()

        image_batch = data.to(self.device)

        with torch.no_grad():
            batch_outputs = model(image_batch)

        return model.get_class_probabilities(batch_outputs)

    @staticmethod
    def predict_to_classes(batch_values, class_to_idx):
        """
        Map predicted values to classes.

        Parameters:
            - batch_values (torch.Tensor): Batch of prediction values. 1-D for
              regression outputs, 2-D (batch x classes) for classification.
            - class_to_idx (dict): Mapping from class names to indices.

        Returns:
            list: List of predicted values.
            list: List of predicted classes.
        """
        idx_to_class = {i: cls for cls, i in class_to_idx.items()}

        if batch_values.dim() < 2:
            # Regression: round to the nearest class index and clamp into the
            # valid index range so out-of-range predictions still map to a class.
            lo = min(class_to_idx.values())
            hi = max(class_to_idx.values())
            classes = [
                idx_to_class[min(max(idx.item(), lo), hi)]
                for idx in batch_values.round().int()
            ]
        else:
            # Classification: pick the class with the highest probability.
            classes = [idx_to_class[idx.item()] for idx in torch.argmax(batch_values, dim=1)]

        values = batch_values.tolist()
        return values, classes

    def batch_classifications(self, img_data_raw, img_ids=None):
        """
        Perform batch classification for multiple prediction levels (road type,
        surface type, surface quality).

        Parameters:
            - img_data_raw (list): List of raw images to classify.
            - img_ids (list, optional): List of IDs corresponding to the images.
              Defaults to indices.

        Returns:
            list: Combined list of image ids and predictions across levels. Each
            entry is [id, road_class, road_value, surface_class, surface_value,
            quality_class, quality_value]; entries are None where the
            corresponding model is not configured.
        """
        if not img_data_raw:
            logging.info("Input data is empty. No predictions performed.")
            return []

        if img_ids is None:
            img_ids = range(len(img_data_raw))

        n = len(img_data_raw)
        road_classes = [None] * n
        road_values = [None] * n
        surface_classes = [None] * n
        surface_values = [None] * n
        quality_classes = [None] * n
        quality_values = [None] * n

        model_file = self.models.get("road_type")
        if model_file is not None:
            model, class_to_idx, _ = self.load_model(model=model_file)
            data = self.preprocessing(img_data_raw, self.transform_road_type)
            values = self.predict(model, data)
            road_values, road_classes = self.predict_to_classes(values, class_to_idx)

        # Keep the surface-transformed batch: quality models reuse it below.
        surface_data = None
        model_file = self.models.get("surface_type")
        if model_file is not None:
            model, class_to_idx, _ = self.load_model(model=model_file)
            surface_data = self.preprocessing(img_data_raw, self.transform_surface)
            values = self.predict(model, surface_data)
            surface_values, surface_classes = self.predict_to_classes(values, class_to_idx)

        sub_models = self.models.get("surface_quality")
        if sub_models is not None:
            if surface_data is None:
                # BUG FIX: quality models are chosen per predicted surface type,
                # so without surface-type predictions this step previously reused
                # an unrelated (or undefined) batch. Skip instead.
                logging.info("No surface type predictions available. Surface quality prediction is skipped.")
            else:
                # Group image indices by their predicted surface type so each
                # quality sub-model runs once on its own sub-batch.
                surface_indices = defaultdict(list)
                for i, surface_type in enumerate(surface_classes):
                    surface_indices[surface_type].append(i)

                for surface_type, indices in surface_indices.items():
                    model_file = sub_models.get(surface_type)
                    if model_file is not None:
                        model, class_to_idx, _ = self.load_model(model=model_file)
                        values = self.predict(model, surface_data[indices])
                        values, classes = self.predict_to_classes(values, class_to_idx)
                        # Scatter sub-batch results back to their original positions.
                        for idx, vl, cls in zip(indices, values, classes):
                            quality_values[idx] = vl
                            quality_classes[idx] = cls

        return [
            [
                img_ids[i],
                road_classes[i],
                road_values[i],
                surface_classes[i],
                surface_values[i],
                quality_classes[i],
                quality_values[i],
            ]
            for i in range(n)
        ]
|
|
|
|
|
|
|
|
class CustomEfficientNetV2SLinear(nn.Module):
    """
    EfficientNetV2-S backbone with a replaced linear head, usable for
    classification or scalar regression.

    Attributes:
        features (nn.Sequential): Convolutional feature extractor taken from
            the pretrained EfficientNetV2-S.
        avgpool (nn.AdaptiveAvgPool2d): Adaptive average pooling layer.
        classifier (nn.Sequential): Fully connected head producing the outputs.
        is_regression (bool): True when the head has a single output unit.
        criterion (callable): Loss used for training the model.
    """

    def __init__(self, num_classes, avg_pool=1):
        """
        Build the model.

        Parameters:
            - num_classes (int): Number of output units; 1 selects regression mode.
            - avg_pool (int, optional): Output side length of the adaptive pooling.
        """
        super(CustomEfficientNetV2SLinear, self).__init__()

        backbone = models.efficientnet_v2_s(weights="IMAGENET1K_V1")

        # The pooled feature map is avg_pool x avg_pool, so the flattened
        # input of the new head grows quadratically with avg_pool.
        head_in_features = backbone.classifier[-1].in_features * (avg_pool * avg_pool)
        backbone.classifier[-1] = nn.Linear(head_in_features, num_classes, bias=True)

        self.features = backbone.features
        self.avgpool = nn.AdaptiveAvgPool2d(avg_pool)
        self.classifier = backbone.classifier

        # A single output unit means the model regresses a scalar value.
        self.is_regression = num_classes == 1
        # NOTE(review): this stores the loss *class*, not an instance —
        # confirm that training code instantiates it before use.
        self.criterion = nn.MSELoss if self.is_regression else nn.CrossEntropyLoss

    def get_class_probabilities(self, x):
        """
        Convert raw network outputs into per-image predictions: a flat tensor
        of scalars in regression mode, softmax probabilities otherwise.
        """
        if self.is_regression:
            return x.flatten()
        return nn.functional.softmax(x, dim=1)

    def forward(self, x: Tensor) -> Tensor:
        """Run the backbone, pool, flatten, and apply the linear head."""
        feature_maps = self.features(x)
        pooled = torch.flatten(self.avgpool(feature_maps), 1)
        return self.classifier(pooled)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Architecture identifier stored in model checkpoints; keys `model_mapping`.
EFFNET_LINEAR = "efficientNetV2SLinear"

# Crop style identifiers understood by ModelInterface.custom_crop.
CROP_LOWER_MIDDLE_HALF = "lower_middle_half"

CROP_LOWER_HALF = "lower_half"

# Default per-channel (RGB) normalization statistics applied when a transform
# config provides no "normalize" entry.
# NOTE(review): presumably computed over the training dataset — confirm.
NORM_MEAN = [0.42834484577178955, 0.4461250305175781, 0.4350937306880951]

NORM_SD = [0.22991590201854706, 0.23555299639701843, 0.26348039507865906]


# Maps the architecture identifier found in a checkpoint's "model_name" field
# to the class that implements it (used by ModelInterface.load_model).
model_mapping = {
    EFFNET_LINEAR: CustomEfficientNetV2SLinear,
}

# Human-readable names for the prediction levels, used in log messages.
model_to_info_string = {
    "surface_type": "surface type",
    "road_type": "road type",
    "surface_quality": "quality",
}