# surface_test / Models.py
# Uploaded with huggingface_hub by dthh (commit 1e20859, verified)
import os
# import sys
from functools import partial
from pathlib import Path
import torch
from huggingface_hub import hf_hub_download
from torch import Tensor, nn
from torchvision import models, transforms
import pandas as pd
class ModelInterface:
    """Run road-type / surface-type / surface-quality image classification.

    Wraps checkpoint download (Hugging Face Hub), model loading, image
    preprocessing and batched prediction behind a single interface that is
    driven entirely by a config mapping.
    """

    def __init__(self, config):
        """Initialize device, preprocessing transforms and model locations.

        Parameters:
        - config (mapping with .get): expects keys 'gpu_kernel',
          'transform_surface', 'transform_road_type', 'model_root',
          'models' and 'hf_model_repo'.
        """
        # Use the configured CUDA device when available, otherwise the CPU.
        self.device = torch.device(
            f"cuda:{config.get('gpu_kernel')}" if torch.cuda.is_available() else "cpu"
        )
        normalization = (const["NORM_MEAN"], const["NORM_SD"])
        # Copy the transform dicts before adding the 'normalize' entry so the
        # caller's config is NOT mutated (fixes the old TODO about config
        # being changed in place).
        transform = dict(config.get("transform_surface"))
        transform["normalize"] = normalization
        self.transform_surface = transform
        transform = dict(config.get("transform_road_type"))
        transform["normalize"] = normalization
        self.transform_road_type = transform
        self.model_root = Path(config.get("model_root"))
        self.models = config.get("models")
        self.hf_model_repo = config.get("hf_model_repo")

    @staticmethod
    def custom_crop(img, crop_style=None):
        """Crop an image according to a named crop style.

        Parameters:
        - img: image exposing .size as (width, height) (i.e. a PIL image).
        - crop_style (str or None): const['CROP_LOWER_MIDDLE_HALF'] or
          const['CROP_LOWER_HALF']; any other value returns img unchanged.

        Returns the cropped image, or the original image for unknown styles.
        """
        im_width, im_height = img.size
        if crop_style == const["CROP_LOWER_MIDDLE_HALF"]:
            top = im_height / 2
            left = im_width / 4
            height = im_height / 2
            width = im_width / 2
        elif crop_style == const["CROP_LOWER_HALF"]:
            top = im_height / 2
            left = 0
            height = im_height / 2
            width = im_width
        else:  # None, or not a valid style
            return img
        # NOTE(review): coordinates may be fractional; PIL crops accept
        # floats but tensor inputs would not — assumes PIL input, confirm.
        return transforms.functional.crop(img, top, left, height, width)

    def transform(
        self,
        resize=None,
        crop=None,
        to_tensor=True,
        normalize=None,
    ):
        """
        Create a PyTorch image transformation function based on specified parameters.

        Parameters:
        - resize (tuple, int or None): Target size for resizing, e.g. (height, width);
          an int is expanded to a square (size, size).
        - crop (string or None): crop style, e.g. 'lower_middle_half'.
        - to_tensor (bool): Converts the PIL Image (H x W x C) in the range [0, 255]
          to a torch.FloatTensor of shape (C x H x W) in the range [0.0, 1.0].
        - normalize (tuple of lists [r, g, b] or None): Mean and standard deviation
          for normalization.

        Returns:
        PyTorch image transformation function (transforms.Compose).
        """
        transform_list = []
        if crop is not None:
            # partial() binds the crop style so Compose can call it with the image only.
            transform_list.append(
                transforms.Lambda(partial(self.custom_crop, crop_style=crop))
            )
        if resize is not None:
            if isinstance(resize, int):
                resize = (resize, resize)
            transform_list.append(transforms.Resize(resize))
        if to_tensor:
            transform_list.append(transforms.ToTensor())
        if normalize is not None:
            transform_list.append(transforms.Normalize(*normalize))
        return transforms.Compose(transform_list)

    def preprocessing(self, img_data_raw, transform):
        """Apply the transform described by the dict `transform` to every raw
        image and stack the results into a single (N, C, H, W) batch tensor.
        """
        transform = self.transform(**transform)
        return torch.stack([transform(img) for img in img_data_raw])

    def load_model(self, model):
        """Load a model checkpoint, downloading it from the Hub if missing.

        Parameters:
        - model (str): checkpoint filename relative to self.model_root.

        Returns:
        (model, class_to_idx, is_regression); on download failure returns
        (None, {}, False) so callers can skip the prediction level.
        """
        model_path = self.model_root / model
        # Load model data from Hugging Face if not locally available.
        if not os.path.exists(model_path):
            print(
                f"Model file not found at {model_path}. Downloading from Hugging Face..."
            )
            try:
                os.makedirs(self.model_root, exist_ok=True)
                model_path = hf_hub_download(
                    repo_id=self.hf_model_repo, filename=model, local_dir=self.model_root
                )
                print(f"Model file downloaded to {model_path}.")
            except Exception as e:
                # Best-effort: report and signal the caller to skip this level.
                print(f"An unexpected error occurred while downloading the model: {e}")
                return None, {}, False
        # SECURITY NOTE(review): torch.load unpickles arbitrary objects; only
        # load checkpoints from the trusted configured repo (consider
        # weights_only=True where the checkpoint format allows it).
        model_state = torch.load(model_path, map_location=self.device)
        model_name = model_state["model_name"]
        is_regression = model_state["is_regression"]
        class_to_idx = model_state["class_to_idx"]
        # Regression heads output a single value; classifiers one per class.
        num_classes = 1 if is_regression else len(class_to_idx)
        model_cls = model_mapping[model_name]
        net = model_cls(num_classes=num_classes)
        net.load_state_dict(model_state["model_state_dict"])
        return net, class_to_idx, is_regression

    def predict(self, model, data):
        """Run the model on a preprocessed batch and return per-image values
        (class probabilities, or raw values for regression models).
        """
        model.to(self.device)
        model.eval()
        image_batch = data.to(self.device)
        with torch.no_grad():
            batch_outputs = model(image_batch)
            batch_values = model.get_class_probabilities(batch_outputs)
        return batch_values

    @staticmethod
    def predict_value_to_class(batch_values, class_to_idx, ids, level=""):
        """Convert raw prediction values into a tidy DataFrame plus class labels.

        Parameters:
        - batch_values (Tensor): 1-D for regression (one value per image) or
          2-D (N, num_classes) for classification probabilities.
        - class_to_idx (dict): class name -> index mapping from the checkpoint.
        - ids: per-image identifiers.
        - level (str): prediction level tag written into the 'level' column.

        Returns:
        (df, batch_classes) where df has columns ['id', 'level', 'value',
        'class'] (one row per image for regression, one per image x class for
        classification) and batch_classes is the predicted class per image.
        """
        columns = ["id", "level", "value", "class"]
        shape = list(batch_values.shape)
        if len(shape) < 2:
            # Regression output is 1-D: treat it as a single-column batch.
            shape = [shape[0], 1]
        df = pd.DataFrame(columns=columns, index=range(shape[0] * shape[1]))
        idx_to_class = {i: cls for cls, i in class_to_idx.items()}
        if shape[1] == 1:
            # Regression: round to the nearest class index, clamped to the
            # valid index range so out-of-range values map to the extremes.
            lo = min(class_to_idx.values())
            hi = max(class_to_idx.values())
            batch_classes = [
                idx_to_class[min(max(idx.item(), lo), hi)]
                for idx in batch_values.round().int()
            ]
            rows = zip(ids, batch_values, batch_classes)
            for i, (img_id, value, cls) in enumerate(rows):
                df.iloc[i] = [img_id, level, value.item(), cls]
        else:
            # Classification: argmax picks the class, every probability is
            # still written out as its own row.
            batch_classes = [
                idx_to_class[idx.item()] for idx in torch.argmax(batch_values, dim=1)
            ]
            i = 0
            for img_id, values in zip(ids, batch_values):
                for idx, value in enumerate(values.tolist()):
                    df.iloc[i] = [img_id, level, value, idx_to_class[idx]]
                    i += 1
        return df, batch_classes

    def batch_classifications(self, img_data_raw, img_ids=None):
        """Predict road type, surface type and surface quality for a batch.

        Parameters:
        - img_data_raw: sequence of raw (PIL) images.
        - img_ids: optional per-image identifiers; defaults to 0..n-1.

        Returns a DataFrame with columns ['id', 'level', 'value', 'class']
        accumulated over all configured prediction levels.
        """
        # default image ids
        if img_ids is None:
            img_ids = range(len(img_data_raw))
        df = pd.DataFrame()
        # Outputs of the surface-type stage; the quality stage below depends
        # on them. Initialized up front so a skipped or failed surface-type
        # stage no longer raises a NameError (bug fix).
        classes = []
        data = None
        # road type
        level = "road_type"
        model_file = self.models.get(level)
        if model_file is not None:
            model, class_to_idx, _ = self.load_model(model=model_file)
            if model is None:
                print(f"Road type model '{model_file}' is not found.\n"
                      + "Road type prediction is skipped.")
            else:
                data = self.preprocessing(img_data_raw, self.transform_road_type)
                values = self.predict(model, data)
                df_tmp, _ = self.predict_value_to_class(
                    values,
                    class_to_idx,
                    img_ids,
                    level,
                )
                df = pd.concat([df, df_tmp], ignore_index=True)
        # surface type
        level = "surface_type"
        model_file = self.models.get(level)
        if model_file is not None:
            model, class_to_idx, _ = self.load_model(model=model_file)
            if model is None:
                print(f"Surface type model '{model_file}' is not found.\n"
                      + "Surface type prediction is skipped.")
            else:
                data = self.preprocessing(img_data_raw, self.transform_surface)
                values = self.predict(model, data)
                df_tmp, classes = self.predict_value_to_class(
                    values,
                    class_to_idx,
                    img_ids,
                    level,
                )
                df = pd.concat([df, df_tmp], ignore_index=True)
        # surface quality (one sub-model per predicted surface type)
        level = "surface_quality"
        sub_models = self.models.get(level)
        if sub_models is not None:
            if not classes:
                # Without surface-type predictions there is nothing to refine.
                print("No surface type predictions available.\n"
                      + "Surface quality prediction is skipped.")
            else:
                # Group image indices by predicted surface type.
                surface_indices = {}
                for i, surface_type in enumerate(classes):
                    surface_indices.setdefault(surface_type, []).append(i)
                for surface_type, indices in surface_indices.items():
                    model_file = sub_models.get(surface_type)
                    if model_file is not None:
                        model, class_to_idx, _ = self.load_model(model=model_file)
                        if model is None:
                            print(f"Quality model '{model_file}' is not found.\n"
                                  + f"Quality prediction is skipped for surface '{surface_type}'.")
                        else:
                            # Reuse the surface-type preprocessed batch.
                            values = self.predict(model, data[indices])
                            df_tmp, _ = self.predict_value_to_class(
                                values,
                                class_to_idx,
                                [img_ids[i] for i in indices],
                                level,
                            )
                            df = pd.concat([df, df_tmp], ignore_index=True)
        return df
class CustomEfficientNetV2SLinear(nn.Module):
    """EfficientNetV2-S backbone (ImageNet-pretrained) with a custom linear head.

    With num_classes == 1 the model acts as a regressor, otherwise as a
    classifier.
    """

    def __init__(self, num_classes, avg_pool=1):
        """Build the network.

        Parameters:
        - num_classes (int): output size of the final linear layer; 1 selects
          regression mode.
        - avg_pool (int): output size of the adaptive average pool; the
          flattened feature count grows by avg_pool**2.
        """
        super().__init__()
        backbone = models.efficientnet_v2_s(weights="IMAGENET1K_V1")
        # Adapt the output layer to num_classes and the chosen pool size.
        in_features = backbone.classifier[-1].in_features * (avg_pool * avg_pool)
        backbone.classifier[-1] = nn.Linear(in_features, num_classes, bias=True)
        self.features = backbone.features
        self.avgpool = nn.AdaptiveAvgPool2d(avg_pool)
        self.classifier = backbone.classifier
        # NOTE(review): self.criterion stores the loss *class*, not an
        # instance — presumably instantiated by training code; confirm.
        if num_classes == 1:
            self.criterion = nn.MSELoss
            self.is_regression = True
        else:
            self.criterion = nn.CrossEntropyLoss
            self.is_regression = False

    def get_class_probabilities(self, x):
        """Map raw outputs to per-image values: flattened values in
        regression mode, softmax probabilities in classification mode."""
        if self.is_regression:
            return x.flatten()
        return nn.functional.softmax(x, dim=1)

    def forward(self, x: Tensor) -> Tensor:
        """Standard features -> pool -> flatten -> classifier forward pass."""
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        return self.classifier(x)
# Model settings
const = {
    # Architecture identifier as stored in checkpoints ('model_name' key).
    "EFFNET_LINEAR": "efficientNetV2SLinear",
    # Named crop styles understood by ModelInterface.custom_crop.
    "CROP_LOWER_MIDDLE_HALF": "lower_middle_half",
    "CROP_LOWER_HALF": "lower_half",
    # Per-channel RGB normalization statistics used by transforms.Normalize;
    # presumably computed on the training dataset — TODO confirm.
    "NORM_MEAN": [0.42834484577178955, 0.4461250305175781, 0.4350937306880951],
    "NORM_SD": [0.22991590201854706, 0.23555299639701843, 0.26348039507865906],
}
# Maps a checkpoint's 'model_name' to its architecture class
# (used by ModelInterface.load_model).
model_mapping = {
    const["EFFNET_LINEAR"]: CustomEfficientNetV2SLinear,
}