nayan90k committed on
Commit
0fba8bd
·
verified ·
1 Parent(s): 159f25c

Upload 5 files

Browse files
Files changed (5) hide show
  1. app.py +121 -0
  2. best.pt +3 -0
  3. inference_core.py +363 -0
  4. labels.json +7 -0
  5. requirements.txt +6 -0
app.py ADDED
@@ -0,0 +1,121 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ from __future__ import annotations
3
+
4
+ import os
5
+ from pathlib import Path
6
+
7
+ import cv2
8
+ import gradio as gr
9
+
10
+ from inference_core import MaskClassifierPyTorch, parse_class_thresholds
11
+
12
+
13
+ MODEL_PATH = Path("best.pt")
14
+
15
+ def _env_float(name: str, default: float) -> float:
16
+ value = os.getenv(name)
17
+ if value is None or value.strip() == "":
18
+ return default
19
+ return float(value)
20
+
21
+
22
+ classifier = MaskClassifierPyTorch(
23
+ model_path=MODEL_PATH,
24
+ use_mediapipe=True,
25
+ min_top_confidence=_env_float("MASK_MIN_TOP_CONFIDENCE", 0.0),
26
+ min_margin=_env_float("MASK_MIN_MARGIN", 0.0),
27
+ class_thresholds=parse_class_thresholds(os.getenv("MASK_CLASS_THRESHOLDS")),
28
+ reject_label="uncertain",
29
+ )
30
+
31
+
32
+ def predict_image(input_image):
33
+ if input_image is None:
34
+ return "No image provided", None
35
+
36
+ image_bgr = cv2.cvtColor(input_image, cv2.COLOR_RGB2BGR)
37
+ result = classifier.predict_from_bgr(image_bgr)
38
+ if not result["ok"]:
39
+ return result["error"], None
40
+
41
+ scores = result["scores"]
42
+ text = f"Label: {result['label']}\nConfidence: {result['confidence']:.4f}"
43
+ return text, scores
44
+
45
+
46
+ def predict_video(video_path: str, sample_every_n_frames: int):
47
+ if not video_path:
48
+ return "No video provided"
49
+
50
+ cap = cv2.VideoCapture(video_path)
51
+ if not cap.isOpened():
52
+ return "Could not open video"
53
+
54
+ frame_idx = 0
55
+ preds = []
56
+ while True:
57
+ ok, frame = cap.read()
58
+ if not ok:
59
+ break
60
+ if frame_idx % max(1, int(sample_every_n_frames)) == 0:
61
+ result = classifier.predict_from_bgr(frame)
62
+ if result["ok"]:
63
+ preds.append(result)
64
+ frame_idx += 1
65
+ cap.release()
66
+
67
+ if not preds:
68
+ return "No detectable face found in sampled frames"
69
+
70
+ counts = {}
71
+ conf_sum = {}
72
+ for p in preds:
73
+ label = p["label"]
74
+ counts[label] = counts.get(label, 0) + 1
75
+ conf_sum[label] = conf_sum.get(label, 0.0) + p["confidence"]
76
+
77
+ non_uncertain_counts = {k: v for k, v in counts.items() if k != "uncertain"}
78
+ if non_uncertain_counts:
79
+ top_label = max(non_uncertain_counts, key=non_uncertain_counts.get)
80
+ avg_conf = conf_sum[top_label] / counts[top_label]
81
+ else:
82
+ top_label = "uncertain"
83
+ avg_conf = conf_sum[top_label] / counts[top_label]
84
+
85
+ lines = [
86
+ f"Frames scanned: {frame_idx}",
87
+ f"Frames predicted: {len(preds)}",
88
+ f"Final label: {top_label}",
89
+ f"Avg confidence: {avg_conf:.4f}",
90
+ f"Label counts: {counts}",
91
+ ]
92
+ return "\n".join(lines)
93
+
94
+
95
+ with gr.Blocks(title="Face Mask Detection") as demo:
96
+ gr.Markdown("# Face Mask Detection (MobileNetV2 + ONNX INT8)")
97
+ gr.Markdown("Upload an image or video to run mask classification.")
98
+
99
+ with gr.Tab("Image"):
100
+ image_input = gr.Image(type="numpy", label="Input Image")
101
+ image_btn = gr.Button("Predict")
102
+ image_text = gr.Textbox(label="Result")
103
+ image_scores = gr.Label(label="Class Probabilities")
104
+ image_btn.click(
105
+ fn=predict_image, inputs=[image_input], outputs=[image_text, image_scores]
106
+ )
107
+
108
+ with gr.Tab("Video"):
109
+ video_input = gr.Video(label="Input Video")
110
+ frame_stride = gr.Slider(
111
+ minimum=1, maximum=60, value=15, step=1, label="Sample every N frames"
112
+ )
113
+ video_btn = gr.Button("Predict")
114
+ video_text = gr.Textbox(label="Result", lines=8)
115
+ video_btn.click(
116
+ fn=predict_video, inputs=[video_input, frame_stride], outputs=[video_text]
117
+ )
118
+
119
+
120
+ if __name__ == "__main__":
121
+ demo.launch()
best.pt ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ version https://git-lfs.github.com/spec/v1
2
+ oid sha256:f58b6f7a07858598c15df1ed4595df96ad0936e224edb01b730c09fe90e58641
3
+ size 27063625
inference_core.py ADDED
@@ -0,0 +1,363 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ from __future__ import annotations
3
+
4
+ import json
5
+ from pathlib import Path
6
+
7
+ import cv2
8
+ import numpy as np
9
+ import torch
10
+ import torch.nn as nn
11
+ from torchvision import models
12
+
13
+
14
+ def parse_class_thresholds(spec: str | None) -> dict[str, float]:
15
+ if not spec:
16
+ return {}
17
+
18
+ thresholds: dict[str, float] = {}
19
+ items = [item.strip() for item in spec.split(",") if item.strip()]
20
+ for item in items:
21
+ if "=" not in item:
22
+ raise ValueError(
23
+ f"Invalid threshold item '{item}'. Expected format: class=value"
24
+ )
25
+ label, raw_value = item.split("=", 1)
26
+ label = label.strip()
27
+ if not label:
28
+ raise ValueError("Class label cannot be empty in class threshold spec")
29
+ value = float(raw_value.strip())
30
+ if value < 0.0 or value > 1.0:
31
+ raise ValueError(f"Threshold for '{label}' must be in [0, 1], got {value}")
32
+ thresholds[label] = value
33
+ return thresholds
34
+
35
+
36
+ class FaceDetector:
37
+ def __init__(self, use_mediapipe: bool = True, min_confidence: float = 0.5):
38
+ self._backend = "none"
39
+ self._detector = None
40
+ self._min_confidence = min_confidence
41
+
42
+ if use_mediapipe:
43
+ try:
44
+ import mediapipe as mp
45
+
46
+ self._detector = mp.solutions.face_detection.FaceDetection(
47
+ model_selection=1,
48
+ min_detection_confidence=min_confidence,
49
+ )
50
+ self._backend = "mediapipe"
51
+ except Exception:
52
+ self._detector = None
53
+
54
+ def close(self) -> None:
55
+ if self._backend == "mediapipe" and self._detector is not None:
56
+ self._detector.close()
57
+
58
+ def _largest_bbox(self, detections, width: int, height: int):
59
+ largest = None
60
+ largest_det = None
61
+ area_max = -1.0
62
+ for d in detections:
63
+ bbox = d.location_data.relative_bounding_box
64
+ w = max(0.0, bbox.width) * width
65
+ h = max(0.0, bbox.height) * height
66
+ area = w * h
67
+ if area > area_max:
68
+ area_max = area
69
+ largest = bbox
70
+ largest_det = d
71
+ return largest, largest_det
72
+
73
+ def detect_largest_face_with_meta(self, image_bgr: np.ndarray, margin: float = 0.2):
74
+ h, w = image_bgr.shape[:2]
75
+
76
+ if self._backend != "mediapipe" or self._detector is None:
77
+ meta = {"bbox": [0, 0, w, h], "keypoints": []}
78
+ return image_bgr, meta
79
+
80
+ rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
81
+ result = self._detector.process(rgb)
82
+ if not result.detections:
83
+ return None, None
84
+
85
+ bbox, detection = self._largest_bbox(result.detections, w, h)
86
+ if bbox is None:
87
+ return None, None
88
+
89
+ x = bbox.xmin * w
90
+ y = bbox.ymin * h
91
+ bw = bbox.width * w
92
+ bh = bbox.height * h
93
+ cx = x + bw / 2.0
94
+ cy = y + bh / 2.0
95
+ side = max(bw, bh) * (1.0 + margin)
96
+
97
+ x1 = int(max(0, cx - side / 2.0))
98
+ y1 = int(max(0, cy - side / 2.0))
99
+ x2 = int(min(w, cx + side / 2.0))
100
+ y2 = int(min(h, cy + side / 2.0))
101
+ if x2 <= x1 or y2 <= y1:
102
+ return None, None
103
+
104
+ keypoints = []
105
+ if detection is not None:
106
+ for kp in detection.location_data.relative_keypoints:
107
+ keypoints.append([int(kp.x * w), int(kp.y * h)])
108
+
109
+ meta = {"bbox": [x1, y1, x2, y2], "keypoints": keypoints}
110
+ return image_bgr[y1:y2, x1:x2], meta
111
+
112
+ def detect_largest_face(
113
+ self, image_bgr: np.ndarray, margin: float = 0.2
114
+ ) -> np.ndarray | None:
115
+ crop, _ = self.detect_largest_face_with_meta(image_bgr=image_bgr, margin=margin)
116
+ return crop
117
+
118
+
119
+ class MaskClassifierONNX:
120
+ def __init__(
121
+ self,
122
+ model_path: Path,
123
+ labels_path: Path | None = None,
124
+ use_mediapipe: bool = True,
125
+ min_top_confidence: float = 0.0,
126
+ min_margin: float = 0.0,
127
+ class_thresholds: dict[str, float] | None = None,
128
+ reject_label: str = "uncertain",
129
+ ):
130
+ self.model_path = Path(model_path)
131
+ providers = ["CPUExecutionProvider"]
132
+ self.session = ort.InferenceSession(str(self.model_path), providers=providers)
133
+ self.input_name = self.session.get_inputs()[0].name
134
+ self.output_name = self.session.get_outputs()[0].name
135
+ self.class_names = self._load_class_names(labels_path)
136
+ self.detector = FaceDetector(use_mediapipe=use_mediapipe)
137
+ self.min_top_confidence = float(min_top_confidence)
138
+ self.min_margin = float(min_margin)
139
+ self.class_thresholds = dict(class_thresholds or {})
140
+ self.reject_label = reject_label
141
+
142
+ if self.min_top_confidence < 0.0 or self.min_top_confidence > 1.0:
143
+ raise ValueError("min_top_confidence must be in [0, 1]")
144
+ if self.min_margin < 0.0 or self.min_margin > 1.0:
145
+ raise ValueError("min_margin must be in [0, 1]")
146
+ for label, value in self.class_thresholds.items():
147
+ if value < 0.0 or value > 1.0:
148
+ raise ValueError(
149
+ f"class threshold for '{label}' must be in [0, 1], got {value}"
150
+ )
151
+
152
+ def _load_class_names(self, labels_path: Path | None) -> list[str]:
153
+ candidate = labels_path
154
+ if candidate is None:
155
+ candidate = self.model_path.with_suffix(".labels.json")
156
+ if candidate.exists():
157
+ payload = json.loads(candidate.read_text(encoding="utf-8"))
158
+ if isinstance(payload, list):
159
+ return payload
160
+ if isinstance(payload, dict) and "class_names" in payload:
161
+ return list(payload["class_names"])
162
+ return ["with_mask", "incorrect_mask", "without_mask"]
163
+
164
+ @staticmethod
165
+ def preprocess(image_bgr: np.ndarray, image_size: int = 224) -> np.ndarray:
166
+ img = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
167
+ img = cv2.resize(img, (image_size, image_size), interpolation=cv2.INTER_AREA)
168
+ arr = img.astype(np.float32) / 255.0
169
+ mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
170
+ std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
171
+ arr = (arr - mean) / std
172
+ arr = np.transpose(arr, (2, 0, 1))
173
+ return np.expand_dims(arr, axis=0)
174
+
175
+ @staticmethod
176
+ def softmax(logits: np.ndarray) -> np.ndarray:
177
+ z = logits - np.max(logits, axis=1, keepdims=True)
178
+ exp = np.exp(z)
179
+ return exp / np.sum(exp, axis=1, keepdims=True)
180
+
181
+ def _apply_decision_policy(self, probs: np.ndarray) -> dict:
182
+ top_idx = int(np.argmax(probs))
183
+ top_label = self.class_names[top_idx]
184
+ top_conf = float(probs[top_idx])
185
+
186
+ if len(probs) > 1:
187
+ sorted_idx = np.argsort(probs)[::-1]
188
+ second_conf = float(probs[int(sorted_idx[1])])
189
+ margin = top_conf - second_conf
190
+ else:
191
+ margin = 1.0
192
+
193
+ if top_conf < self.min_top_confidence:
194
+ return {
195
+ "label": self.reject_label,
196
+ "decision_reason": "top_confidence_below_min",
197
+ "raw_label": top_label,
198
+ "raw_confidence": top_conf,
199
+ "margin": float(margin),
200
+ }
201
+
202
+ class_threshold = self.class_thresholds.get(top_label)
203
+ if class_threshold is not None and top_conf < class_threshold:
204
+ return {
205
+ "label": self.reject_label,
206
+ "decision_reason": "class_threshold_not_met",
207
+ "raw_label": top_label,
208
+ "raw_confidence": top_conf,
209
+ "margin": float(margin),
210
+ }
211
+
212
+ if margin < self.min_margin:
213
+ return {
214
+ "label": self.reject_label,
215
+ "decision_reason": "margin_below_min",
216
+ "raw_label": top_label,
217
+ "raw_confidence": top_conf,
218
+ "margin": float(margin),
219
+ }
220
+
221
+ return {
222
+ "label": top_label,
223
+ "decision_reason": "accepted",
224
+ "raw_label": top_label,
225
+ "raw_confidence": top_conf,
226
+ "margin": float(margin),
227
+ }
228
+
229
+ def predict_from_bgr(self, image_bgr: np.ndarray) -> dict:
230
+ face, meta = self.detector.detect_largest_face_with_meta(image_bgr, margin=0.2)
231
+ if face is None:
232
+ return {
233
+ "ok": False,
234
+ "error": "No face detected",
235
+ "label": None,
236
+ "confidence": None,
237
+ "scores": None,
238
+ "face_bbox": None,
239
+ "face_keypoints": None,
240
+ }
241
+
242
+ inp = self.preprocess(face)
243
+ logits = self.session.run([self.output_name], {self.input_name: inp})[0]
244
+ probs = self.softmax(logits)[0]
245
+ policy = self._apply_decision_policy(probs)
246
+
247
+ return {
248
+ "ok": True,
249
+ "label": policy["label"],
250
+ "confidence": policy["raw_confidence"],
251
+ "raw_label": policy["raw_label"],
252
+ "raw_confidence": policy["raw_confidence"],
253
+ "margin": policy["margin"],
254
+ "decision_reason": policy["decision_reason"],
255
+ "scores": {
256
+ name: float(probs[i]) for i, name in enumerate(self.class_names)
257
+ },
258
+ "face_bbox": meta.get("bbox") if meta else None,
259
+ "face_keypoints": meta.get("keypoints") if meta else None,
260
+ }
261
+
262
+
263
+ class MaskClassifierPyTorch:
264
+ def __init__(
265
+ self,
266
+ model_path,
267
+ labels_path = None,
268
+ use_mediapipe: bool = True,
269
+ min_top_confidence: float = 0.0,
270
+ min_margin: float = 0.0,
271
+ class_thresholds: dict = None,
272
+ reject_label: str = "uncertain",
273
+ ):
274
+ self.model_path = Path(model_path)
275
+ self.class_names = self._load_class_names(labels_path)
276
+ self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
277
+
278
+ self.model = models.mobilenet_v2(weights=None)
279
+ self.model.classifier[1] = nn.Linear(self.model.classifier[1].in_features, len(self.class_names))
280
+ state_dict = torch.load(self.model_path, map_location=self.device)
281
+ if 'model_state_dict' in state_dict:
282
+ state_dict = state_dict['model_state_dict']
283
+ self.model.load_state_dict(state_dict)
284
+ self.model.to(self.device)
285
+ self.model.eval()
286
+
287
+ self.detector = FaceDetector(use_mediapipe=use_mediapipe)
288
+ self.min_top_confidence = float(min_top_confidence)
289
+ self.min_margin = float(min_margin)
290
+ self.class_thresholds = dict(class_thresholds or {})
291
+ self.reject_label = reject_label
292
+
293
+ def _load_class_names(self, labels_path) -> list[str]:
294
+ candidate = labels_path
295
+ if candidate is None:
296
+ candidate = Path("labels.json")
297
+ if candidate.exists():
298
+ payload = json.loads(candidate.read_text(encoding="utf-8"))
299
+ if isinstance(payload, list):
300
+ return payload
301
+ if isinstance(payload, dict) and "class_names" in payload:
302
+ return list(payload["class_names"])
303
+ return ["with_mask", "incorrect_mask", "without_mask"]
304
+
305
+ @staticmethod
306
+ def preprocess(image_bgr: np.ndarray, image_size: int = 224) -> np.ndarray:
307
+ img = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
308
+ img = cv2.resize(img, (image_size, image_size), interpolation=cv2.INTER_AREA)
309
+ arr = img.astype(np.float32) / 255.0
310
+ mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
311
+ std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
312
+ arr = (arr - mean) / std
313
+ arr = np.transpose(arr, (2, 0, 1))
314
+ return np.expand_dims(arr, axis=0)
315
+
316
+ def _apply_decision_policy(self, probs: np.ndarray) -> dict:
317
+ top_idx = int(np.argmax(probs))
318
+ top_label = self.class_names[top_idx]
319
+ top_conf = float(probs[top_idx])
320
+
321
+ if len(probs) > 1:
322
+ sorted_idx = np.argsort(probs)[::-1]
323
+ second_conf = float(probs[int(sorted_idx[1])])
324
+ margin = top_conf - second_conf
325
+ else:
326
+ margin = 1.0
327
+
328
+ if top_conf < self.min_top_confidence:
329
+ return {"label": self.reject_label, "raw_label": top_label, "raw_confidence": top_conf}
330
+
331
+ class_threshold = self.class_thresholds.get(top_label)
332
+ if class_threshold is not None and top_conf < class_threshold:
333
+ return {"label": self.reject_label, "raw_label": top_label, "raw_confidence": top_conf}
334
+
335
+ if margin < self.min_margin:
336
+ return {"label": self.reject_label, "raw_label": top_label, "raw_confidence": top_conf}
337
+
338
+ return {"label": top_label, "raw_label": top_label, "raw_confidence": top_conf}
339
+
340
+ def predict_from_bgr(self, image_bgr: np.ndarray) -> dict:
341
+ face, meta = self.detector.detect_largest_face_with_meta(image_bgr, margin=0.2)
342
+ if face is None:
343
+ return {"ok": False, "error": "No face detected"}
344
+
345
+ inp = self.preprocess(face)
346
+ tensor_inp = torch.from_numpy(inp).to(self.device).float()
347
+
348
+ with torch.no_grad():
349
+ outputs = self.model(tensor_inp)
350
+ probs = torch.nn.functional.softmax(outputs[0], dim=0).cpu().numpy()
351
+
352
+ policy = self._apply_decision_policy(probs)
353
+
354
+ scores = {}
355
+ for i, class_name in enumerate(self.class_names):
356
+ scores[class_name] = float(probs[i])
357
+
358
+ return {
359
+ "ok": True,
360
+ "label": policy["label"],
361
+ "confidence": policy["raw_confidence"],
362
+ "scores": scores
363
+ }
labels.json ADDED
@@ -0,0 +1,7 @@
 
 
 
 
 
 
 
 
1
+ {
2
+ "class_names": [
3
+ "incorrect_mask",
4
+ "with_mask",
5
+ "without_mask"
6
+ ]
7
+ }
requirements.txt ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ gradio
2
+ numpy
3
+ opencv-python-headless
4
+ mediapipe
5
+ torch
6
+ torchvision