import os
from functools import partial
from pathlib import Path

import torch
from huggingface_hub import hf_hub_download
from torch import Tensor, nn
from torchvision import models, transforms

import pandas as pd


class ModelInterface:
    """Load road-image classification/regression models and run batched predictions.

    Expects a ``config`` mapping with the keys:
      - ``gpu_kernel``: CUDA device index (used only when CUDA is available)
      - ``transform_surface`` / ``transform_road_type``: kwargs for :meth:`transform`
      - ``model_root``: local directory holding model checkpoints
      - ``models``: mapping of prediction level -> checkpoint filename
        (``surface_quality`` maps to a sub-dict of surface type -> filename)
      - ``hf_model_repo``: Hugging Face repo id used as a download fallback
    """

    def __init__(self, config):
        self.device = torch.device(
            f"cuda:{config.get('gpu_kernel')}" if torch.cuda.is_available() else "cpu"
        )

        normalization = (const["NORM_MEAN"], const["NORM_SD"])

        # Copy the transform dicts before injecting the normalization so the
        # caller's config is not mutated (previously flagged as a TODO).
        transform = dict(config.get("transform_surface"))
        transform["normalize"] = normalization
        self.transform_surface = transform

        transform = dict(config.get("transform_road_type"))
        transform["normalize"] = normalization
        self.transform_road_type = transform

        self.model_root = Path(config.get("model_root"))
        self.models = config.get("models")
        self.hf_model_repo = config.get("hf_model_repo")

    @staticmethod
    def custom_crop(img, crop_style=None):
        """Crop a PIL image according to a named crop style.

        Parameters:
        - img: PIL image (must expose ``.size`` as ``(width, height)``).
        - crop_style (str or None): one of the ``CROP_*`` values in ``const``.
          ``None`` or any unknown style returns the image unchanged.

        Returns the cropped (or original) image.
        """
        im_width, im_height = img.size
        if crop_style == const["CROP_LOWER_MIDDLE_HALF"]:
            top = im_height / 2
            left = im_width / 4
            height = im_height / 2
            width = im_width / 2
        elif crop_style == const["CROP_LOWER_HALF"]:
            top = im_height / 2
            left = 0
            height = im_height / 2
            width = im_width
        else:
            # None, or not a valid crop style: no-op.
            return img

        return transforms.functional.crop(img, top, left, height, width)

    def transform(
        self,
        resize=None,
        crop=None,
        to_tensor=True,
        normalize=None,
    ):
        """Create a PyTorch image transformation pipeline.

        Parameters:
        - resize (int, tuple or None): target size for resizing; a single int
          is expanded to ``(int, int)``.
        - crop (str or None): crop style, e.g. ``'lower_middle_half'``.
        - to_tensor (bool): convert the PIL image (H x W x C, range [0, 255])
          to a FloatTensor of shape (C x H x W) in the range [0.0, 1.0].
        - normalize (tuple of lists [r, g, b] or None): mean and standard
          deviation for normalization.

        Returns a ``transforms.Compose`` callable.
        """
        transform_list = []
        if crop is not None:
            transform_list.append(
                transforms.Lambda(partial(self.custom_crop, crop_style=crop))
            )
        if resize is not None:
            if isinstance(resize, int):
                resize = (resize, resize)
            transform_list.append(transforms.Resize(resize))
        if to_tensor:
            transform_list.append(transforms.ToTensor())
        if normalize is not None:
            transform_list.append(transforms.Normalize(*normalize))
        return transforms.Compose(transform_list)

    def preprocessing(self, img_data_raw, transform):
        """Apply the given transform kwargs to each raw image and stack into a batch."""
        transform = self.transform(**transform)
        return torch.stack([transform(img) for img in img_data_raw])

    def load_model(self, model):
        """Load a model checkpoint, downloading it from Hugging Face if absent.

        Parameters:
        - model (str): checkpoint filename relative to ``self.model_root``.

        Returns ``(model, class_to_idx, is_regression)``; on download failure
        returns ``(None, {}, False)`` so callers can skip that prediction level.
        """
        model_path = self.model_root / model

        # Fetch the checkpoint from Hugging Face if it is not available locally.
        if not model_path.exists():
            print(
                f"Model file not found at {model_path}. Downloading from Hugging Face..."
            )
            try:
                self.model_root.mkdir(parents=True, exist_ok=True)
                model_path = hf_hub_download(
                    repo_id=self.hf_model_repo, filename=model, local_dir=self.model_root
                )
                print(f"Model file downloaded to {model_path}.")
            except Exception as e:
                # Best-effort: report the failure and let the caller skip.
                print(f"An unexpected error occurred while downloading the model: {e}")
                return None, {}, False

        # NOTE(security): torch.load unpickles arbitrary objects; only load
        # checkpoints from trusted sources (here: the configured HF repo).
        model_state = torch.load(model_path, map_location=self.device)
        model_name = model_state["model_name"]
        is_regression = model_state["is_regression"]
        class_to_idx = model_state["class_to_idx"]
        num_classes = 1 if is_regression else len(class_to_idx)

        model_cls = model_mapping[model_name]
        net = model_cls(num_classes=num_classes)
        net.load_state_dict(model_state["model_state_dict"])
        return net, class_to_idx, is_regression

    def predict(self, model, data):
        """Run a forward pass on a batch and return per-class values.

        Returns the model's ``get_class_probabilities`` output: softmax
        probabilities for classification, flattened values for regression.
        """
        model.to(self.device)
        model.eval()

        image_batch = data.to(self.device)
        with torch.no_grad():
            batch_outputs = model(image_batch)
            batch_values = model.get_class_probabilities(batch_outputs)
        return batch_values

    @staticmethod
    def predict_value_to_class(batch_values, class_to_idx, ids, level=""):
        """Convert raw model outputs into a long-format prediction DataFrame.

        For regression output (1-D ``batch_values``) each value is rounded and
        clamped into the valid class-index range, yielding one row per image.
        For classification output (2-D) one row per (image, class) probability
        is emitted and the argmax class per image is returned.

        Returns ``(df, batch_classes)`` where ``df`` has columns
        ``[id, level, value, class]``.
        """
        columns = ["id", "level", "value", "class"]
        idx_to_class = {i: cls for cls, i in class_to_idx.items()}

        shape = list(batch_values.shape)
        if len(shape) < 2:
            shape = [shape[0], 1]

        rows = []
        if shape[1] == 1:
            lo = min(class_to_idx.values())
            hi = max(class_to_idx.values())
            # Round the regression output and clamp to the known index range
            # so out-of-range predictions still map to a valid class.
            batch_classes = [
                idx_to_class[min(max(idx.item(), lo), hi)]
                for idx in batch_values.round().int()
            ]
            for img_id, value, cls in zip(ids, batch_values, batch_classes):
                rows.append([img_id, level, value.item(), cls])
        else:
            batch_classes = [
                idx_to_class[idx.item()] for idx in torch.argmax(batch_values, dim=1)
            ]
            for img_id, values in zip(ids, batch_values):
                for idx, value in enumerate(values.tolist()):
                    rows.append([img_id, level, value, idx_to_class[idx]])

        # Build the DataFrame in one shot instead of per-row .iloc assignment.
        df = pd.DataFrame(rows, columns=columns)
        return df, batch_classes

    def batch_classifications(self, img_data_raw, img_ids=None):
        """Run road-type, surface-type and surface-quality predictions on a batch.

        Parameters:
        - img_data_raw: sequence of raw (PIL) images.
        - img_ids: optional sequence of ids, defaults to ``range(len(img_data_raw))``.

        Returns a long-format DataFrame with columns ``[id, level, value, class]``.
        Any level whose model is unconfigured or fails to load is skipped.
        """
        if img_ids is None:
            img_ids = range(len(img_data_raw))

        df = pd.DataFrame()
        # Sentinels for the surface-type stage results; the quality stage
        # depends on them. (Bug fix: previously these names were unbound —
        # a NameError — whenever the surface-type stage did not run.)
        surface_classes = None
        surface_data = None

        # road type
        level = "road_type"
        model_file = self.models.get(level)
        if model_file is not None:
            model, class_to_idx, _ = self.load_model(model=model_file)
            if model is None:
                print(
                    f"Road type model '{model_file}' is not found.\n"
                    + "Road type prediction is skipped."
                )
            else:
                data = self.preprocessing(img_data_raw, self.transform_road_type)
                values = self.predict(model, data)
                df_tmp, _ = self.predict_value_to_class(
                    values,
                    class_to_idx,
                    img_ids,
                    level,
                )
                df = pd.concat([df, df_tmp], ignore_index=True)

        # surface type
        level = "surface_type"
        model_file = self.models.get(level)
        if model_file is not None:
            model, class_to_idx, _ = self.load_model(model=model_file)
            if model is None:
                print(
                    f"Surface type model '{model_file}' is not found.\n"
                    + "Surface type prediction is skipped."
                )
            else:
                surface_data = self.preprocessing(img_data_raw, self.transform_surface)
                values = self.predict(model, surface_data)
                df_tmp, surface_classes = self.predict_value_to_class(
                    values,
                    class_to_idx,
                    img_ids,
                    level,
                )
                df = pd.concat([df, df_tmp], ignore_index=True)

        # surface quality: one model per predicted surface type, applied only
        # to the images predicted as that surface (reuses the surface batch).
        level = "surface_quality"
        sub_models = self.models.get(level)
        if sub_models is not None:
            if surface_classes is None:
                print(
                    "Surface quality prediction requires surface type predictions "
                    "and is skipped."
                )
            else:
                # Group image indices by their predicted surface type.
                surface_indices = {}
                for i, surface_type in enumerate(surface_classes):
                    surface_indices.setdefault(surface_type, []).append(i)

                for surface_type, indices in surface_indices.items():
                    model_file = sub_models.get(surface_type)
                    if model_file is None:
                        continue
                    model, class_to_idx, _ = self.load_model(model=model_file)
                    if model is None:
                        print(
                            f"Quality model '{model_file}' is not found.\n"
                            + f"Quality prediction is skipped for surface '{surface_type}'."
                        )
                    else:
                        values = self.predict(model, surface_data[indices])
                        df_tmp, _ = self.predict_value_to_class(
                            values,
                            class_to_idx,
                            [img_ids[i] for i in indices],
                            level,
                        )
                        df = pd.concat([df, df_tmp], ignore_index=True)

        return df


class CustomEfficientNetV2SLinear(nn.Module):
    """EfficientNetV2-S backbone with a linear head.

    ``num_classes == 1`` puts the model in regression mode (MSE criterion,
    flattened output); otherwise it is a classifier (cross-entropy, softmax
    probabilities via :meth:`get_class_probabilities`).
    """

    def __init__(self, num_classes, avg_pool=1):
        super().__init__()
        model = models.efficientnet_v2_s(weights="IMAGENET1K_V1")

        # Replace the output layer; with avg_pool > 1 the pooled feature map
        # is (avg_pool x avg_pool), so the flattened feature count scales.
        in_features = model.classifier[-1].in_features * (avg_pool * avg_pool)
        fc = nn.Linear(in_features, num_classes, bias=True)
        model.classifier[-1] = fc

        self.features = model.features
        self.avgpool = nn.AdaptiveAvgPool2d(avg_pool)
        self.classifier = model.classifier

        # NOTE: the criterion class (not an instance) is stored; callers
        # presumably instantiate it during training — confirm at call sites.
        if num_classes == 1:
            self.criterion = nn.MSELoss
            self.is_regression = True
        else:
            self.criterion = nn.CrossEntropyLoss
            self.is_regression = False

    def get_class_probabilities(self, x):
        """Map raw outputs to flat values (regression) or softmax probabilities."""
        if self.is_regression:
            x = x.flatten()
        else:
            x = nn.functional.softmax(x, dim=1)
        return x

    def forward(self, x: Tensor) -> Tensor:
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x


# Model settings
const = {
    "EFFNET_LINEAR": "efficientNetV2SLinear",
    "CROP_LOWER_MIDDLE_HALF": "lower_middle_half",
    "CROP_LOWER_HALF": "lower_half",
    "NORM_MEAN": [0.42834484577178955, 0.4461250305175781, 0.4350937306880951],
    "NORM_SD": [0.22991590201854706, 0.23555299639701843, 0.26348039507865906],
}

model_mapping = {
    const["EFFNET_LINEAR"]: CustomEfficientNetV2SLinear,
}