|
|
import os |
|
|
|
|
|
from functools import partial |
|
|
from pathlib import Path |
|
|
|
|
|
import torch |
|
|
from huggingface_hub import hf_hub_download |
|
|
from torch import Tensor, nn |
|
|
from torchvision import models, transforms |
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
class ModelInterface: |
|
|
def __init__(self, config): |
|
|
|
|
|
|
|
|
self.device = torch.device( |
|
|
f"cuda:{config.get('gpu_kernel')}" if torch.cuda.is_available() else "cpu" |
|
|
) |
|
|
normalization = (const["NORM_MEAN"], const["NORM_SD"]) |
|
|
|
|
|
transform = config.get("transform_surface") |
|
|
transform["normalize"] = normalization |
|
|
self.transform_surface = transform |
|
|
transform = config.get("transform_road_type") |
|
|
transform["normalize"] = normalization |
|
|
self.transform_road_type = transform |
|
|
self.model_root = Path(config.get("model_root")) |
|
|
self.models = config.get("models") |
|
|
self.hf_model_repo = config.get("hf_model_repo") |
|
|
|
|
|
@staticmethod |
|
|
def custom_crop(img, crop_style=None): |
|
|
im_width, im_height = img.size |
|
|
if crop_style == const["CROP_LOWER_MIDDLE_HALF"]: |
|
|
top = im_height / 2 |
|
|
left = im_width / 4 |
|
|
height = im_height / 2 |
|
|
width = im_width / 2 |
|
|
elif crop_style == const["CROP_LOWER_HALF"]: |
|
|
top = im_height / 2 |
|
|
left = 0 |
|
|
height = im_height / 2 |
|
|
width = im_width |
|
|
else: |
|
|
return img |
|
|
|
|
|
cropped_img = transforms.functional.crop(img, top, left, height, width) |
|
|
return cropped_img |
|
|
|
|
|
def transform( |
|
|
self, |
|
|
resize=None, |
|
|
crop=None, |
|
|
to_tensor=True, |
|
|
normalize=None, |
|
|
): |
|
|
""" |
|
|
Create a PyTorch image transformation function based on specified parameters. |
|
|
|
|
|
Parameters: |
|
|
- resize (tuple or None): Target size for resizing, e.g. (height, width). |
|
|
- crop (string): crop style e.g. 'lower_middle_third' |
|
|
- to_tensor (bool): Converts the PIL Image (H x W x C) in the range [0, 255] to a torch.FloatTensor of shape (C x H x W) in the range [0.0, 1.0] |
|
|
- normalize (tuple of lists [r, g, b] or None): Mean and standard deviation for normalization. |
|
|
|
|
|
Returns: |
|
|
PyTorch image transformation function. |
|
|
""" |
|
|
transform_list = [] |
|
|
|
|
|
if crop is not None: |
|
|
transform_list.append( |
|
|
transforms.Lambda(partial(self.custom_crop, crop_style=crop)) |
|
|
) |
|
|
|
|
|
if resize is not None: |
|
|
if isinstance(resize, int): |
|
|
resize = (resize, resize) |
|
|
transform_list.append(transforms.Resize(resize)) |
|
|
|
|
|
if to_tensor: |
|
|
transform_list.append(transforms.ToTensor()) |
|
|
|
|
|
if normalize is not None: |
|
|
transform_list.append(transforms.Normalize(*normalize)) |
|
|
|
|
|
composed_transform = transforms.Compose(transform_list) |
|
|
return composed_transform |
|
|
|
|
|
def preprocessing(self, img_data_raw, transform): |
|
|
transform = self.transform(**transform) |
|
|
img_data = torch.stack([transform(img) for img in img_data_raw]) |
|
|
return img_data |
|
|
|
|
|
def load_model(self, model): |
|
|
model_path = self.model_root / model |
|
|
|
|
|
if not os.path.exists(model_path): |
|
|
print( |
|
|
f"Model file not found at {model_path}. Downloading from Hugging Face..." |
|
|
) |
|
|
try: |
|
|
os.makedirs(self.model_root, exist_ok=True) |
|
|
model_path = hf_hub_download( |
|
|
repo_id=self.hf_model_repo, filename=model, local_dir=self.model_root |
|
|
) |
|
|
print(f"Model file downloaded to {model_path}.") |
|
|
except Exception as e: |
|
|
print(f"An unexpected error occurred while downloading the model: {e}") |
|
|
return None, {}, False |
|
|
|
|
|
model_state = torch.load(model_path, map_location=self.device) |
|
|
model_name = model_state["model_name"] |
|
|
is_regression = model_state["is_regression"] |
|
|
class_to_idx = model_state["class_to_idx"] |
|
|
num_classes = 1 if is_regression else len(class_to_idx.items()) |
|
|
model_state_dict = model_state["model_state_dict"] |
|
|
model_cls = model_mapping[model_name] |
|
|
model = model_cls(num_classes=num_classes) |
|
|
model.load_state_dict(model_state_dict) |
|
|
|
|
|
return model, class_to_idx, is_regression |
|
|
|
|
|
def predict(self, model, data): |
|
|
model.to(self.device) |
|
|
model.eval() |
|
|
|
|
|
image_batch = data.to(self.device) |
|
|
|
|
|
with torch.no_grad(): |
|
|
batch_outputs = model(image_batch) |
|
|
|
|
|
batch_values = model.get_class_probabilities(batch_outputs) |
|
|
|
|
|
return batch_values |
|
|
|
|
|
@staticmethod |
|
|
def predict_value_to_class(batch_values, class_to_idx, ids, level=""): |
|
|
columns = ["id", "level", "value", "class"] |
|
|
batch_size = list(batch_values.shape) |
|
|
if len(batch_size) < 2: |
|
|
batch_size = [batch_size[0], 1] |
|
|
df = pd.DataFrame(columns=columns, index=range(batch_size[0] * batch_size[1])) |
|
|
idx_to_class = {i: cls for cls, i in class_to_idx.items()} |
|
|
|
|
|
if batch_size[1] == 1: |
|
|
batch_classes = [ |
|
|
idx_to_class[ |
|
|
min( |
|
|
max(idx.item(), min(list(class_to_idx.values()))), |
|
|
max(list(class_to_idx.values())), |
|
|
) |
|
|
] |
|
|
for idx in batch_values.round().int() |
|
|
] |
|
|
i = 0 |
|
|
for id, value, cls in zip(ids, batch_values, batch_classes): |
|
|
df.iloc[i] = [id, level, value.item(), cls] |
|
|
i += 1 |
|
|
else: |
|
|
batch_classes = [idx_to_class[idx.item()] for idx in torch.argmax(batch_values, dim=1)] |
|
|
i = 0 |
|
|
for id, values in zip(ids, batch_values): |
|
|
for idx, value in enumerate(values.tolist()): |
|
|
df.iloc[i] = [id, level, value, idx_to_class[idx]] |
|
|
i += 1 |
|
|
|
|
|
return df, batch_classes |
|
|
|
|
|
def batch_classifications(self, img_data_raw, img_ids=None): |
|
|
|
|
|
if img_ids is None: |
|
|
img_ids = range(len(img_data_raw)) |
|
|
|
|
|
df = pd.DataFrame() |
|
|
|
|
|
|
|
|
level = "road_type" |
|
|
model_file = self.models.get(level) |
|
|
if model_file is not None: |
|
|
model, class_to_idx, _ = self.load_model(model=model_file) |
|
|
if model is None: |
|
|
print(f"Road type model '{model_file}' is not found.\n" |
|
|
+ "Road type prediction is skipped.") |
|
|
else: |
|
|
data = self.preprocessing(img_data_raw, self.transform_road_type) |
|
|
values = self.predict(model, data) |
|
|
df_tmp, _ = self.predict_value_to_class( |
|
|
values, |
|
|
class_to_idx, |
|
|
img_ids, |
|
|
level, |
|
|
) |
|
|
df = pd.concat([df, df_tmp], ignore_index=True) |
|
|
|
|
|
|
|
|
level = "surface_type" |
|
|
model_file = self.models.get(level) |
|
|
if model_file is not None: |
|
|
model, class_to_idx, _ = self.load_model(model=model_file) |
|
|
if model is None: |
|
|
print(f"Surface type model '{model_file}' is not found.\n" |
|
|
+ "Surface type prediction is skipped.") |
|
|
else: |
|
|
data = self.preprocessing(img_data_raw, self.transform_surface) |
|
|
values = self.predict(model, data) |
|
|
df_tmp, classes = self.predict_value_to_class( |
|
|
values, |
|
|
class_to_idx, |
|
|
img_ids, |
|
|
level, |
|
|
) |
|
|
df = pd.concat([df, df_tmp], ignore_index=True) |
|
|
|
|
|
|
|
|
level = "surface_quality" |
|
|
sub_models = self.models.get(level) |
|
|
if sub_models is not None: |
|
|
surface_indices = {} |
|
|
for i, surface_type in enumerate(classes): |
|
|
if surface_type not in surface_indices: |
|
|
surface_indices[surface_type] = [] |
|
|
surface_indices[surface_type].append(i) |
|
|
|
|
|
for surface_type, indices in surface_indices.items(): |
|
|
model_file = sub_models.get(surface_type) |
|
|
if model_file is not None: |
|
|
model, class_to_idx, _ = self.load_model(model=model_file) |
|
|
if model is None: |
|
|
print(f"Quality model '{model_file}' is not found.\n" |
|
|
+ f"Quality prediction is skipped for surface '{surface_type}'.") |
|
|
else: |
|
|
values = self.predict(model, data[indices]) |
|
|
df_tmp, _ = self.predict_value_to_class( |
|
|
values, |
|
|
class_to_idx, |
|
|
[img_ids[i] for i in indices], |
|
|
level, |
|
|
) |
|
|
df = pd.concat([df, df_tmp], ignore_index=True) |
|
|
|
|
|
return df |
|
|
|
|
|
|
|
|
class CustomEfficientNetV2SLinear(nn.Module): |
|
|
def __init__(self, num_classes, avg_pool=1): |
|
|
super(CustomEfficientNetV2SLinear, self).__init__() |
|
|
|
|
|
model = models.efficientnet_v2_s(weights="IMAGENET1K_V1") |
|
|
|
|
|
in_features = model.classifier[-1].in_features * (avg_pool * avg_pool) |
|
|
fc = nn.Linear(in_features, num_classes, bias=True) |
|
|
model.classifier[-1] = fc |
|
|
|
|
|
self.features = model.features |
|
|
self.avgpool = nn.AdaptiveAvgPool2d(avg_pool) |
|
|
self.classifier = model.classifier |
|
|
if num_classes == 1: |
|
|
self.criterion = nn.MSELoss |
|
|
self.is_regression = True |
|
|
else: |
|
|
self.criterion = nn.CrossEntropyLoss |
|
|
self.is_regression = False |
|
|
|
|
|
def get_class_probabilities(self, x): |
|
|
if self.is_regression: |
|
|
x = x.flatten() |
|
|
else: |
|
|
x = nn.functional.softmax(x, dim=1) |
|
|
return x |
|
|
|
|
|
def forward(self, x: Tensor) -> Tensor: |
|
|
x = self.features(x) |
|
|
|
|
|
x = self.avgpool(x) |
|
|
x = torch.flatten(x, 1) |
|
|
|
|
|
x = self.classifier(x) |
|
|
|
|
|
return x |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const = { |
|
|
"EFFNET_LINEAR": "efficientNetV2SLinear", |
|
|
"CROP_LOWER_MIDDLE_HALF": "lower_middle_half", |
|
|
"CROP_LOWER_HALF": "lower_half", |
|
|
"NORM_MEAN": [0.42834484577178955, 0.4461250305175781, 0.4350937306880951], |
|
|
"NORM_SD": [0.22991590201854706, 0.23555299639701843, 0.26348039507865906], |
|
|
} |
|
|
|
|
|
model_mapping = { |
|
|
const["EFFNET_LINEAR"]: CustomEfficientNetV2SLinear, |
|
|
} |
|
|
|