dthh commited on
Commit
1e20859
·
verified ·
1 Parent(s): f6e85c1

Upload Models.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. Models.py +298 -0
Models.py ADDED
@@ -0,0 +1,298 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ # import sys
3
+ from functools import partial
4
+ from pathlib import Path
5
+
6
+ import torch
7
+ from huggingface_hub import hf_hub_download
8
+ from torch import Tensor, nn
9
+ from torchvision import models, transforms
10
+ import pandas as pd
11
+
12
+
13
+ class ModelInterface:
14
+ def __init__(self, config):
15
+ # TODO: doc string
16
+ # TODO: default values for config.get(...)
17
+ self.device = torch.device(
18
+ f"cuda:{config.get('gpu_kernel')}" if torch.cuda.is_available() else "cpu"
19
+ )
20
+ normalization = (const["NORM_MEAN"], const["NORM_SD"])
21
+ # TODO: config is changed by transform['normalize'] = normalization
22
+ transform = config.get("transform_surface")
23
+ transform["normalize"] = normalization
24
+ self.transform_surface = transform
25
+ transform = config.get("transform_road_type")
26
+ transform["normalize"] = normalization
27
+ self.transform_road_type = transform
28
+ self.model_root = Path(config.get("model_root"))
29
+ self.models = config.get("models")
30
+ self.hf_model_repo = config.get("hf_model_repo")
31
+
32
+ @staticmethod
33
+ def custom_crop(img, crop_style=None):
34
+ im_width, im_height = img.size
35
+ if crop_style == const["CROP_LOWER_MIDDLE_HALF"]:
36
+ top = im_height / 2
37
+ left = im_width / 4
38
+ height = im_height / 2
39
+ width = im_width / 2
40
+ elif crop_style == const["CROP_LOWER_HALF"]:
41
+ top = im_height / 2
42
+ left = 0
43
+ height = im_height / 2
44
+ width = im_width
45
+ else: # None, or not valid
46
+ return img
47
+
48
+ cropped_img = transforms.functional.crop(img, top, left, height, width)
49
+ return cropped_img
50
+
51
+ def transform(
52
+ self,
53
+ resize=None,
54
+ crop=None,
55
+ to_tensor=True,
56
+ normalize=None,
57
+ ):
58
+ """
59
+ Create a PyTorch image transformation function based on specified parameters.
60
+
61
+ Parameters:
62
+ - resize (tuple or None): Target size for resizing, e.g. (height, width).
63
+ - crop (string): crop style e.g. 'lower_middle_third'
64
+ - to_tensor (bool): Converts the PIL Image (H x W x C) in the range [0, 255] to a torch.FloatTensor of shape (C x H x W) in the range [0.0, 1.0]
65
+ - normalize (tuple of lists [r, g, b] or None): Mean and standard deviation for normalization.
66
+
67
+ Returns:
68
+ PyTorch image transformation function.
69
+ """
70
+ transform_list = []
71
+
72
+ if crop is not None:
73
+ transform_list.append(
74
+ transforms.Lambda(partial(self.custom_crop, crop_style=crop))
75
+ )
76
+
77
+ if resize is not None:
78
+ if isinstance(resize, int):
79
+ resize = (resize, resize)
80
+ transform_list.append(transforms.Resize(resize))
81
+
82
+ if to_tensor:
83
+ transform_list.append(transforms.ToTensor())
84
+
85
+ if normalize is not None:
86
+ transform_list.append(transforms.Normalize(*normalize))
87
+
88
+ composed_transform = transforms.Compose(transform_list)
89
+ return composed_transform
90
+
91
+ def preprocessing(self, img_data_raw, transform):
92
+ transform = self.transform(**transform)
93
+ img_data = torch.stack([transform(img) for img in img_data_raw])
94
+ return img_data
95
+
96
+ def load_model(self, model):
97
+ model_path = self.model_root / model
98
+ # load model data from hugging face if not locally available
99
+ if not os.path.exists(model_path):
100
+ print(
101
+ f"Model file not found at {model_path}. Downloading from Hugging Face..."
102
+ )
103
+ try:
104
+ os.makedirs(self.model_root, exist_ok=True)
105
+ model_path = hf_hub_download(
106
+ repo_id=self.hf_model_repo, filename=model, local_dir=self.model_root
107
+ )
108
+ print(f"Model file downloaded to {model_path}.")
109
+ except Exception as e:
110
+ print(f"An unexpected error occurred while downloading the model: {e}")
111
+ return None, {}, False
112
+
113
+ model_state = torch.load(model_path, map_location=self.device)
114
+ model_name = model_state["model_name"]
115
+ is_regression = model_state["is_regression"]
116
+ class_to_idx = model_state["class_to_idx"]
117
+ num_classes = 1 if is_regression else len(class_to_idx.items())
118
+ model_state_dict = model_state["model_state_dict"]
119
+ model_cls = model_mapping[model_name]
120
+ model = model_cls(num_classes=num_classes)
121
+ model.load_state_dict(model_state_dict)
122
+
123
+ return model, class_to_idx, is_regression
124
+
125
+ def predict(self, model, data):
126
+ model.to(self.device)
127
+ model.eval()
128
+
129
+ image_batch = data.to(self.device)
130
+
131
+ with torch.no_grad():
132
+ batch_outputs = model(image_batch)
133
+ # batch_classes, batch_values = model.get_class_and_value(batch_outputs)
134
+ batch_values = model.get_class_probabilities(batch_outputs)
135
+
136
+ return batch_values
137
+
138
+ @staticmethod
139
+ def predict_value_to_class(batch_values, class_to_idx, ids, level=""):
140
+ columns = ["id", "level", "value", "class"]
141
+ batch_size = list(batch_values.shape)
142
+ if len(batch_size) < 2:
143
+ batch_size = [batch_size[0], 1]
144
+ df = pd.DataFrame(columns=columns, index=range(batch_size[0] * batch_size[1]))
145
+ idx_to_class = {i: cls for cls, i in class_to_idx.items()}
146
+
147
+ if batch_size[1] == 1:
148
+ batch_classes = [
149
+ idx_to_class[
150
+ min(
151
+ max(idx.item(), min(list(class_to_idx.values()))),
152
+ max(list(class_to_idx.values())),
153
+ )
154
+ ]
155
+ for idx in batch_values.round().int()
156
+ ]
157
+ i = 0
158
+ for id, value, cls in zip(ids, batch_values, batch_classes):
159
+ df.iloc[i] = [id, level, value.item(), cls]
160
+ i += 1
161
+ else:
162
+ batch_classes = [idx_to_class[idx.item()] for idx in torch.argmax(batch_values, dim=1)]
163
+ i = 0
164
+ for id, values in zip(ids, batch_values):
165
+ for idx, value in enumerate(values.tolist()):
166
+ df.iloc[i] = [id, level, value, idx_to_class[idx]]
167
+ i += 1
168
+
169
+ return df, batch_classes
170
+
171
+ def batch_classifications(self, img_data_raw, img_ids=None):
172
+ # default image ids
173
+ if img_ids is None:
174
+ img_ids = range(len(img_data_raw))
175
+
176
+ df = pd.DataFrame()
177
+
178
+ # road type
179
+ level = "road_type"
180
+ model_file = self.models.get(level)
181
+ if model_file is not None:
182
+ model, class_to_idx, _ = self.load_model(model=model_file)
183
+ if model is None:
184
+ print(f"Road type model '{model_file}' is not found.\n"
185
+ + "Road type prediction is skipped.")
186
+ else:
187
+ data = self.preprocessing(img_data_raw, self.transform_road_type)
188
+ values = self.predict(model, data)
189
+ df_tmp, _ = self.predict_value_to_class(
190
+ values,
191
+ class_to_idx,
192
+ img_ids,
193
+ level,
194
+ )
195
+ df = pd.concat([df, df_tmp], ignore_index=True)
196
+
197
+ # surface type
198
+ level = "surface_type"
199
+ model_file = self.models.get(level)
200
+ if model_file is not None:
201
+ model, class_to_idx, _ = self.load_model(model=model_file)
202
+ if model is None:
203
+ print(f"Surface type model '{model_file}' is not found.\n"
204
+ + "Surface type prediction is skipped.")
205
+ else:
206
+ data = self.preprocessing(img_data_raw, self.transform_surface)
207
+ values = self.predict(model, data)
208
+ df_tmp, classes = self.predict_value_to_class(
209
+ values,
210
+ class_to_idx,
211
+ img_ids,
212
+ level,
213
+ )
214
+ df = pd.concat([df, df_tmp], ignore_index=True)
215
+
216
+ # surface quality
217
+ level = "surface_quality"
218
+ sub_models = self.models.get(level)
219
+ if sub_models is not None:
220
+ surface_indices = {}
221
+ for i, surface_type in enumerate(classes):
222
+ if surface_type not in surface_indices:
223
+ surface_indices[surface_type] = []
224
+ surface_indices[surface_type].append(i)
225
+
226
+ for surface_type, indices in surface_indices.items():
227
+ model_file = sub_models.get(surface_type)
228
+ if model_file is not None:
229
+ model, class_to_idx, _ = self.load_model(model=model_file)
230
+ if model is None:
231
+ print(f"Quality model '{model_file}' is not found.\n"
232
+ + f"Quality prediction is skipped for surface '{surface_type}'.")
233
+ else:
234
+ values = self.predict(model, data[indices])
235
+ df_tmp, _ = self.predict_value_to_class(
236
+ values,
237
+ class_to_idx,
238
+ [img_ids[i] for i in indices],
239
+ level,
240
+ )
241
+ df = pd.concat([df, df_tmp], ignore_index=True)
242
+
243
+ return df
244
+
245
+
246
+ class CustomEfficientNetV2SLinear(nn.Module):
247
+ def __init__(self, num_classes, avg_pool=1):
248
+ super(CustomEfficientNetV2SLinear, self).__init__()
249
+
250
+ model = models.efficientnet_v2_s(weights="IMAGENET1K_V1")
251
+ # adapt output layer
252
+ in_features = model.classifier[-1].in_features * (avg_pool * avg_pool)
253
+ fc = nn.Linear(in_features, num_classes, bias=True)
254
+ model.classifier[-1] = fc
255
+
256
+ self.features = model.features
257
+ self.avgpool = nn.AdaptiveAvgPool2d(avg_pool)
258
+ self.classifier = model.classifier
259
+ if num_classes == 1:
260
+ self.criterion = nn.MSELoss
261
+ self.is_regression = True
262
+ else:
263
+ self.criterion = nn.CrossEntropyLoss
264
+ self.is_regression = False
265
+
266
+ def get_class_probabilities(self, x):
267
+ if self.is_regression:
268
+ x = x.flatten()
269
+ else:
270
+ x = nn.functional.softmax(x, dim=1)
271
+ return x
272
+
273
+ def forward(self, x: Tensor) -> Tensor:
274
+ x = self.features(x)
275
+
276
+ x = self.avgpool(x)
277
+ x = torch.flatten(x, 1)
278
+
279
+ x = self.classifier(x)
280
+
281
+ return x
282
+
283
+ # def get_optimizer_layers(self):
284
+ # return self.classifier
285
+
286
+
287
+ # Model settings
288
+ const = {
289
+ "EFFNET_LINEAR": "efficientNetV2SLinear",
290
+ "CROP_LOWER_MIDDLE_HALF": "lower_middle_half",
291
+ "CROP_LOWER_HALF": "lower_half",
292
+ "NORM_MEAN": [0.42834484577178955, 0.4461250305175781, 0.4350937306880951],
293
+ "NORM_SD": [0.22991590201854706, 0.23555299639701843, 0.26348039507865906],
294
+ }
295
+
296
+ model_mapping = {
297
+ const["EFFNET_LINEAR"]: CustomEfficientNetV2SLinear,
298
+ }