seifbenayed committed on
Commit
85f8ca4
·
1 Parent(s): 46b21a8
Files changed (1) hide show
  1. app.py +463 -462
app.py CHANGED
@@ -26,226 +26,270 @@ from torchvision import transforms
26
 
27
  # Add current directory to path
28
  if not os.getcwd() in sys.path:
29
- sys.path.append(os.getcwd())
30
 
31
  # Detectron2 imports - wrapped in try-except to make them optional
32
  try:
33
- from detectron2.engine import DefaultPredictor
34
- from detectron2.config import get_cfg
35
- from detectron2.utils.visualizer import Visualizer, ColorMode
36
- from detectron2 import model_zoo
37
- DETECTRON2_AVAILABLE = True
38
  except ImportError:
39
- print("Warning: Detectron2 is not installed. Damage detection will not be available.")
40
- DETECTRON2_AVAILABLE = False
41
 
42
  # Check for custom path for models
43
  try:
44
- from configs.get_config import load_config
45
- from models import *
46
- MODELS_IMPORTED = True
47
  except ImportError:
48
- print("Warning: Custom models couldn't be imported. Only damage detection will work.")
49
- MODELS_IMPORTED = False
50
 
51
def _mps_available():
    """Return True when this torch build exposes a usable Apple MPS backend.

    Guarded with hasattr so old torch versions without ``torch.backends.mps``
    do not raise AttributeError.
    """
    return (hasattr(torch, 'backends')
            and hasattr(torch.backends, 'mps')
            and torch.backends.mps.is_available())


def setup_device(device_str):
    """Set up the computation device based on user input and availability.

    Args:
        device_str: One of 'auto', 'cuda', 'mps', 'cpu'. 'auto' picks the
            best available backend (cuda > mps > cpu).

    Returns:
        torch.device: The selected device; falls back to CPU when the
        requested backend is unavailable.
    """
    if device_str == 'auto':
        if torch.cuda.is_available():
            return torch.device('cuda:0')
        if _mps_available():
            return torch.device('mps')
        return torch.device('cpu')
    if device_str == 'cuda' and torch.cuda.is_available():
        return torch.device('cuda:0')
    if device_str == 'mps' and _mps_available():
        return torch.device('mps')
    # Bug fix: the original also warned for an explicit 'cpu' request, even
    # though CPU is always available. Only warn for genuinely unavailable devices.
    if device_str != 'cpu':
        print(f"Warning: Device {device_str} not available, using CPU instead.")
    return torch.device('cpu')
 
 
 
 
 
 
 
67
 
68
def setup_damage_detector(model_path, threshold=0.7):
    """Set up the damage detection model using Detectron2.

    Args:
        model_path: Path to the trained Mask R-CNN weights (.pth).
        threshold: Score threshold for keeping predicted instances.

    Returns:
        (predictor, cfg): the DefaultPredictor and its config, or
        (None, None) when Detectron2 / the weights are unavailable, or
        (None, cfg) when predictor construction fails.
    """
    if not DETECTRON2_AVAILABLE:
        print("Detectron2 is not installed. Cannot set up damage detector.")
        return None, None

    if model_path is None or not os.path.exists(model_path):
        print("No damage model specified or file not found. Skipping damage detection.")
        return None, None

    cfg = get_cfg()
    cfg.merge_from_file(model_zoo.get_config_file("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml"))
    cfg.MODEL.WEIGHTS = model_path
    cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1  # Only one class (damage)
    cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = threshold

    # Explicitly use CPU on Apple Silicon: Detectron2 has no MPS support.
    # Bug fix: guard the mps lookup with hasattr (consistent with
    # setup_device) so torch builds without the mps backend don't raise
    # AttributeError here.
    if hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
        cfg.MODEL.DEVICE = "cpu"
        print("Mac MPS detected - forcing Detectron2 to use CPU")

    try:
        predictor = DefaultPredictor(cfg)
        return predictor, cfg
    except Exception as e:
        print(f"Error setting up damage detector: {e}")
        return None, cfg
 
 
 
 
 
 
 
 
 
 
 
95
 
96
def load_deepfake_model(model_path, cfg_path, device):
    """Load the deepfake classifier described by *cfg_path* with weights from *model_path*.

    Args:
        model_path: Path to the checkpoint (.pth).
        cfg_path: Path to the model config (.yaml).
        device: torch.device to move the model onto.

    Returns:
        (model, cfg) on success; (None, None) when prerequisites are missing
        or loading fails.
    """
    if not MODELS_IMPORTED:
        print("Custom models module not imported. Cannot load deepfake model.")
        return None, None

    if model_path is None or not os.path.exists(model_path):
        print("No deepfake model specified or file not found. Skipping deepfake detection.")
        return None, None

    if cfg_path is None or not os.path.exists(cfg_path):
        print("No deepfake config specified or file not found. Skipping deepfake detection.")
        return None, None

    try:
        cfg = load_config(cfg_path)
        model = build_model(cfg.MODEL, MODELS)

        print(f"Loading deepfake model from: {model_path}")
        # NOTE(review): torch.load unpickles arbitrary objects - only load
        # checkpoints from trusted sources.
        ckpt = torch.load(model_path, map_location='cpu')

        # Checkpoints may wrap the weights in a {'state_dict': ...} dict.
        state = ckpt['state_dict'] if isinstance(ckpt, dict) and 'state_dict' in ckpt else ckpt
        model.load_state_dict(state)

        # Move to the requested device, honour optional fp64 precision, eval mode.
        model = model.to(device)
        if hasattr(cfg.MODEL, 'precision') and cfg.MODEL.precision == 'fp64':
            model = model.to(torch.float64)
        model.eval()

        return model, cfg
    except Exception as e:
        print(f"Error loading deepfake model: {e}")
        import traceback
        traceback.print_exc()
        return None, None
 
 
 
 
 
 
 
138
 
139
def preprocess_for_deepfake(image, cfg, device):
    """Turn a BGR (or single-channel) numpy image into a normalised input tensor.

    Args:
        image: numpy array, BGR 3-channel (uint8 or float) or non-3-channel.
        cfg: Config providing DATASET.IMAGE_SIZE and DATASET.TRANSFORM.normalize.
        device: torch.device for the resulting tensor.

    Returns:
        A (1, C, H, W) tensor on *device*, or None on failure.
    """
    try:
        # BGR uint8 -> RGB; scale float images into the uint8 range first.
        if len(image.shape) == 3 and image.shape[2] == 3:
            if image.dtype != np.uint8:
                image = (image * 255).astype(np.uint8)
            rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        else:
            rgb = image

        # Resize to the network's expected input size.
        # assumes IMAGE_SIZE is (width, height) for cv2.resize - TODO confirm
        resized = cv2.resize(rgb, (cfg.DATASET.IMAGE_SIZE[0], cfg.DATASET.IMAGE_SIZE[1]))

        # PIL round-trip so torchvision transforms apply cleanly.
        pipeline = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(
                mean=cfg.DATASET.TRANSFORM.normalize.mean,
                std=cfg.DATASET.TRANSFORM.normalize.std,
            ),
        ])

        tensor = pipeline(Image.fromarray(resized)).unsqueeze(0)  # add batch dim
        tensor = tensor.to(device)

        # Match the model's precision when the config asks for fp64.
        if hasattr(cfg.MODEL, 'precision') and cfg.MODEL.precision == 'fp64':
            tensor = tensor.to(torch.float64)

        return tensor
    except Exception as e:
        print(f"Error preprocessing image for deepfake detection: {e}")
        import traceback
        traceback.print_exc()
        return None
175
-
176
def _whole_image_region(img):
    """Fallback: one region spanning the full frame with maximum confidence."""
    h, w = img.shape[:2]
    return [{"box": (0, 0, w, h), "score": 1.0, "mask": None}]


def detect_damage(img, damage_detector):
    """Detect damage regions in an image.

    Args:
        img: BGR numpy image.
        damage_detector: Detectron2 DefaultPredictor, or None.

    Returns:
        (img, raw detector outputs or None, list of region dicts with
        "box" (x1, y1, x2, y2), "score" and "mask" keys). When no detector
        is available, nothing is detected, or an error occurs, the whole
        image is returned as a single region.
    """
    try:
        if img is None:
            raise ValueError("Invalid image")

        # No detector available: analyse the whole frame.
        if damage_detector is None:
            print("No damage detector available. Using whole image as region.")
            return img, None, _whole_image_region(img)

        # Run inference.
        outputs = damage_detector(img)

        # Pull boxes/scores/masks off the (CPU-moved) instances.
        instances = outputs["instances"].to("cpu")
        boxes = instances.pred_boxes.tensor.numpy() if instances.has("pred_boxes") else []
        scores = instances.scores.numpy() if instances.has("scores") else []
        masks = instances.pred_masks.numpy() if instances.has("pred_masks") else []

        damage_regions = []
        for i in range(len(boxes)):
            x1, y1, x2, y2 = map(int, boxes[i])
            damage_regions.append({
                "box": (x1, y1, x2, y2),
                "score": float(scores[i]),
                "mask": masks[i] if len(masks) > i else None,
            })

        if not damage_regions:
            print("No damage detected. Using whole image.")
            damage_regions = _whole_image_region(img)

        return img, outputs, damage_regions
    except Exception as e:
        print(f"Error detecting damage: {e}")
        # Fix: the original tested `'img' in locals()`, which is always true
        # because img is a parameter; only the None check is meaningful.
        if img is not None:
            return img, None, _whole_image_region(img)
        return None, None, []
233
-
234
def _deepfake_probability(roi, deepfake_model, deepfake_cfg, device):
    """Classify one image/ROI with the deepfake model.

    Returns the fake probability as a float, or None when preprocessing
    fails or the model produces no output.
    """
    img_tensor = preprocess_for_deepfake(roi, deepfake_cfg, device)
    if img_tensor is None:
        return None

    with torch.no_grad():
        outputs = deepfake_model(img_tensor)

    # Some models return a list of outputs; take the first.
    if isinstance(outputs, list):
        outputs = outputs[0]

    if isinstance(outputs, dict) and 'cls' in outputs:
        cls_prob = outputs['cls'].sigmoid().cpu().numpy()
    else:
        # Assume the output is directly the classification logit/probability.
        cls_prob = outputs.sigmoid().cpu().numpy() if hasattr(outputs, 'sigmoid') else outputs.cpu().numpy()

    if cls_prob.size == 0:
        return None
    return float(cls_prob[0][0] if cls_prob.ndim > 1 else cls_prob[0])


def check_deepfake(image, damage_regions, deepfake_model, deepfake_cfg, device, threshold=0.5):
    """Check whether damage regions (or the whole image) are deepfakes.

    Args:
        image: BGR numpy image.
        damage_regions: Region dicts from detect_damage; empty/falsy means
            classify the whole image instead.
        deepfake_model: Classifier returning a 'cls' output (or raw logits).
        deepfake_cfg: Config for preprocessing.
        device: torch.device for inference.
        threshold: Probability above which a region is flagged as fake.

    Returns:
        List of result dicts. Full-image results carry "region": "full_image";
        per-region results carry "region_id" and "box". Empty list when no
        model is available or on error.

    Refactor: the inference/sigmoid/threshold logic, previously duplicated
    verbatim in both branches, now lives in _deepfake_probability.
    """
    results = []

    if deepfake_model is None:
        print("No deepfake model available. Skipping deepfake detection.")
        return []

    try:
        # If no damage regions, check the entire image.
        if not damage_regions:
            prob = _deepfake_probability(image, deepfake_model, deepfake_cfg, device)
            if prob is not None:
                results.append({
                    "region": "full_image",
                    "deepfake_prob": prob,
                    "is_fake": bool(prob > threshold),
                })
            return results

        # Process each damage region.
        for i, region in enumerate(damage_regions):
            x1, y1, x2, y2 = region["box"]
            # Clamp coordinates to the image bounds.
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(image.shape[1], x2), min(image.shape[0], y2)

            # Skip degenerate (empty) regions.
            if x2 > x1 and y2 > y1:
                prob = _deepfake_probability(image[y1:y2, x1:x2], deepfake_model, deepfake_cfg, device)
                if prob is None:
                    continue
                results.append({
                    "region_id": i,
                    "box": (x1, y1, x2, y2),
                    "deepfake_prob": prob,
                    "is_fake": bool(prob > threshold),
                })

        return results
    except Exception as e:
        print(f"Error in deepfake detection: {e}")
        import traceback
        traceback.print_exc()
        return []
325
-
326
def visualize_results(image, damage_outputs, deepfake_results, damage_threshold):
    """Render damage overlays and deepfake verdicts onto a copy of *image*.

    Args:
        image: BGR numpy image.
        damage_outputs: Raw Detectron2 outputs, or None.
        deepfake_results: Result dicts from check_deepfake.
        damage_threshold: Unused here; kept for interface compatibility.

    Returns:
        Annotated BGR numpy array; falls back to the unannotated image on error.
    """
    try:
        canvas = image.copy()

        # Overlay Detectron2 instance predictions when available.
        if damage_outputs is not None and DETECTRON2_AVAILABLE:
            try:
                viz = Visualizer(canvas[:, :, ::-1], scale=1.0, instance_mode=ColorMode.IMAGE_BW)
                viz = viz.draw_instance_predictions(damage_outputs["instances"].to("cpu"))
                # np.array() guarantees a plain uint8 array OpenCV can draw on.
                canvas = np.array(viz.get_image()[:, :, ::-1], dtype=np.uint8)
            except Exception as e:
                print(f"Error visualizing damage detection: {e}")
                canvas = image.copy()

        # Annotate each deepfake verdict.
        for verdict in deepfake_results:
            try:
                if "box" in verdict:
                    x1, y1, x2, y2 = verdict["box"]
                    fake_prob = verdict["deepfake_prob"]
                    is_fake = verdict["is_fake"]
                    region_id = verdict.get("region_id", 0)

                    label = f"R{region_id}: {'FAKE' if is_fake else 'REAL'} ({fake_prob*100:.1f}%)"
                    color = (0, 0, 255) if is_fake else (0, 255, 0)  # red=fake, green=real

                    if not isinstance(canvas, np.ndarray):
                        canvas = np.array(canvas, dtype=np.uint8)

                    cv2.rectangle(canvas, (x1, y1), (x2, y2), color, 2)
                    cv2.putText(canvas, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
                elif verdict.get("region") == "full_image":
                    fake_prob = verdict["deepfake_prob"]
                    is_fake = verdict["is_fake"]

                    label = f"Image: {'FAKE' if is_fake else 'REAL'} ({fake_prob*100:.1f}%)"
                    color = (0, 0, 255) if is_fake else (0, 255, 0)  # red=fake, green=real

                    if not isinstance(canvas, np.ndarray):
                        canvas = np.array(canvas, dtype=np.uint8)

                    cv2.putText(canvas, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
            except Exception as e:
                print(f"Error drawing result {verdict}: {e}")

        return canvas
    except Exception as e:
        print(f"Error visualizing results: {e}")
        import traceback
        traceback.print_exc()
        return np.array(image, dtype=np.uint8)  # original image as a plain array
394
 
395
def process_image(input_image, damage_model_path, deepfake_model_path, deepfake_cfg_path,
                  damage_threshold, deepfake_threshold, skip_damage, device_str):
    """Run the full car-damage + deepfake pipeline on one Gradio input.

    Args:
        input_image: Gradio image (dict with "path", filepath str, or RGB numpy array).
        damage_model_path / deepfake_model_path / deepfake_cfg_path: model files.
        damage_threshold / deepfake_threshold: detection thresholds (coerced to float).
        skip_damage: Skip damage detection entirely when True.
        device_str: Device selector passed to setup_device.

    Returns:
        (annotated RGB numpy image or None, newline-joined progress log / error text).
    """
    progress = []

    # Normalise the Gradio input into a BGR numpy image.
    if isinstance(input_image, dict) and "path" in input_image:
        img = cv2.imread(input_image["path"])
    elif isinstance(input_image, str):
        img = cv2.imread(input_image)
    elif isinstance(input_image, np.ndarray):
        img = input_image.copy()  # don't mutate the caller's array
        if len(img.shape) == 3 and img.shape[2] == 3:
            img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)  # Gradio gives RGB
    else:
        return None, "Error: Unsupported image format"

    if img is None:
        return None, "Error: Could not read the image"

    progress.append("Image loaded successfully")

    device = setup_device(device_str)
    progress.append(f"Using device: {device}")

    damage_detector = None
    deepfake_model = None
    deepfake_cfg = None

    # Set up the damage detector unless skipped.
    if not skip_damage and damage_model_path:
        progress.append("Setting up damage detector...")
        damage_detector, _ = setup_damage_detector(damage_model_path, float(damage_threshold))
        if damage_detector is None and DETECTRON2_AVAILABLE:
            progress.append("Failed to initialize damage detector")
        else:
            progress.append("Damage detector initialized successfully")

    # Set up the deepfake detector.
    if deepfake_model_path and deepfake_cfg_path:
        progress.append("Setting up deepfake detector...")
        deepfake_model, deepfake_cfg = load_deepfake_model(deepfake_model_path, deepfake_cfg_path, device)
        if deepfake_model is None:
            progress.append("Failed to initialize deepfake detector")
        else:
            progress.append("Deepfake detector initialized successfully")

    # At least one detector must be usable.
    if damage_detector is None and deepfake_model is None:
        return None, "Error: Neither damage nor deepfake detector is available"

    # Step 1: detect damage regions (or fall back to the whole frame).
    progress.append("Detecting damage regions...")
    start_time = time.time()
    img, damage_outputs, damage_regions = detect_damage(img, damage_detector)
    damage_time = time.time() - start_time

    if img is None:
        return None, "Error: Failed to process image"

    if damage_detector is not None and damage_regions:
        progress.append(f"Detected {len(damage_regions)} damage regions in {damage_time:.3f} seconds")
    else:
        progress.append("Using the whole image for analysis")

    # Step 2: verify regions with the deepfake model.
    deepfake_results = []
    if deepfake_model is not None:
        progress.append("Performing deepfake detection...")
        start_time = time.time()
        deepfake_results = check_deepfake(
            img, damage_regions, deepfake_model, deepfake_cfg, device, float(deepfake_threshold)
        )
        deepfake_time = time.time() - start_time

        if deepfake_results:
            progress.append(f"Deepfake detection completed in {deepfake_time:.3f} seconds")

            # Per-result report lines.
            for result in deepfake_results:
                if "region_id" in result:
                    region_id = result["region_id"]
                    fake_prob = result["deepfake_prob"]
                    is_fake = result["is_fake"]
                    progress.append(f"Region {region_id}: {'FAKE' if is_fake else 'REAL'} (Probability: {fake_prob*100:.2f}%)")
                elif "region" in result and result["region"] == "full_image":
                    fake_prob = result["deepfake_prob"]
                    is_fake = result["is_fake"]
                    progress.append(f"Whole image: {'FAKE' if is_fake else 'REAL'} (Probability: {fake_prob*100:.2f}%)")
        else:
            progress.append("No deepfake detection results")

    # Step 3: render the annotated image.
    progress.append("Generating visualization...")
    result_img = visualize_results(img, damage_outputs, deepfake_results, float(damage_threshold))

    # Convert back to RGB for Gradio.
    if len(result_img.shape) == 3 and result_img.shape[2] == 3:
        result_img = cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB)

    progress.append("Processing complete!")

    return result_img, "\n".join(progress)
504
-
505
def create_gradio_interface():
    """Build the Gradio Blocks UI for the damage / deepfake pipeline.

    Returns:
        The assembled gr.Blocks app (not yet launched).
    """
    with gr.Blocks(title="Car Damage & Deepfake Detection") as app:
        gr.Markdown("# Car Damage Detection & Deepfake Verification")
        gr.Markdown("Upload an image to detect car damage and check if it's a deepfake")

        with gr.Tab("Basic Interface"):
            with gr.Row():
                with gr.Column(scale=1):
                    input_image = gr.Image(type="numpy", label="Input Image")

                    # Simple controls
                    skip_damage = gr.Checkbox(label="Skip Damage Detection", value=False)
                    damage_threshold = gr.Slider(minimum=0.1, maximum=1.0, value=0.7, step=0.05,
                                                 label="Damage Detection Threshold")
                    deepfake_threshold = gr.Slider(minimum=0.1, maximum=1.0, value=0.5, step=0.05,
                                                   label="Deepfake Detection Threshold")
                    device = gr.Dropdown(choices=["auto", "cuda", "cpu", "mps"], value="auto",
                                         label="Computation Device")

                    process_btn = gr.Button("Process Image", variant="primary")

                with gr.Column(scale=1):
                    output_image = gr.Image(type="numpy", label="Result")
                    output_text = gr.Textbox(label="Detection Results", lines=10)

        with gr.Tab("Advanced Settings"):
            with gr.Row():
                with gr.Column():
                    damage_model_path = gr.Textbox(label="Damage Model Path",
                                                   placeholder="Path to damage detection model (.pth)")
                    deepfake_model_path = gr.Textbox(label="Deepfake Model Path",
                                                     placeholder="Path to deepfake detection model (.pth)")
                    deepfake_cfg_path = gr.Textbox(label="Deepfake Config Path",
                                                   placeholder="Path to deepfake model config (.yaml)")

        # Wire the button to the processing pipeline.
        process_btn.click(
            fn=process_image,
            inputs=[
                input_image,
                damage_model_path,
                deepfake_model_path,
                deepfake_cfg_path,
                damage_threshold,
                deepfake_threshold,
                skip_damage,
                device,
            ],
            outputs=[output_image, output_text],
        )

        # Examples
        gr.Markdown("## Examples")
        gr.Markdown("Note: Examples will only work if you have the appropriate models installed.")

    return app
561
# Build the UI at import time so hosting platforms (e.g. HF Spaces) can find `app`.
app = create_gradio_interface()


if __name__ == "__main__":
    # Launch the Gradio interface when run as a script.
    app.launch(share=True)  # Set share=False in production
 
26
 
27
  # Add current directory to path
28
  if not os.getcwd() in sys.path:
29
+ sys.path.append(os.getcwd())
30
 
31
  # Detectron2 imports - wrapped in try-except to make them optional
32
  try:
33
+ from detectron2.engine import DefaultPredictor
34
+ from detectron2.config import get_cfg
35
+ from detectron2.utils.visualizer import Visualizer, ColorMode
36
+ from detectron2 import model_zoo
37
+ DETECTRON2_AVAILABLE = True
38
  except ImportError:
39
+ print("Warning: Detectron2 is not installed. Damage detection will not be available.")
40
+ DETECTRON2_AVAILABLE = False
41
 
42
  # Check for custom path for models
43
  try:
44
+ from configs.get_config import load_config
45
+ from models import *
46
+ MODELS_IMPORTED = True
47
  except ImportError:
48
+ print("Warning: Custom models couldn't be imported. Only damage detection will work.")
49
+ MODELS_IMPORTED = False
50
 
51
  def setup_device(device_str):
52
+ """Set up the computation device based on user input and availability"""
53
+ if device_str == 'auto':
54
+ if torch.cuda.is_available():
 
 
 
 
 
 
55
  return torch.device('cuda:0')
56
+ elif hasattr(torch, 'backends') and hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
57
  return torch.device('mps')
58
  else:
 
59
  return torch.device('cpu')
60
+ elif device_str == 'cuda' and torch.cuda.is_available():
61
+ return torch.device('cuda:0')
62
+ elif device_str == 'mps' and hasattr(torch, 'backends') and hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
63
+ return torch.device('mps')
64
+ else:
65
+ print(f"Warning: Device {device_str} not available, using CPU instead.")
66
+ return torch.device('cpu')
67
 
68
  def setup_damage_detector(model_path, threshold=0.7):
69
+ """Set up the damage detection model using Detectron2"""
70
+ if not DETECTRON2_AVAILABLE:
71
+ print("Detectron2 is not installed. Cannot set up damage detector.")
72
+ return None, None
 
 
 
 
 
 
 
 
 
 
73
 
74
+ if model_path is None or not os.path.exists(model_path):
75
+ print("No damage model specified or file not found. Skipping damage detection.")
76
+ return None, None
 
77
 
78
+ cfg = get_cfg()
79
+ cfg.merge_from_file(model_zoo.get_config_file("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml"))
80
+ cfg.MODEL.WEIGHTS = model_path
81
+ cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1 # Only one class (damage)
82
+ cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = threshold
83
+
84
+ # Explicitly set to use CPU if on Mac (MPS)
85
+ if torch.backends.mps.is_available():
86
+ cfg.MODEL.DEVICE = "cpu"
87
+ print("Mac MPS detected - forcing Detectron2 to use CPU")
88
+
89
+ try:
90
+ predictor = DefaultPredictor(cfg)
91
+ return predictor, cfg
92
+ except Exception as e:
93
+ print(f"Error setting up damage detector: {e}")
94
+ return None, cfg
95
 
96
  def load_deepfake_model(model_path, cfg_path, device):
97
+ """Load the deepfake detection model"""
98
+ if not MODELS_IMPORTED:
99
+ print("Custom models module not imported. Cannot load deepfake model.")
100
+ return None, None
101
+
102
+ if model_path is None or not os.path.exists(model_path):
103
+ print("No deepfake model specified or file not found. Skipping deepfake detection.")
104
+ return None, None
105
+
106
+ if cfg_path is None or not os.path.exists(cfg_path):
107
+ print("No deepfake config specified or file not found. Skipping deepfake detection.")
108
+ return None, None
109
+
110
+ try:
111
+ # Load config
112
+ cfg = load_config(cfg_path)
113
+
114
+ # Build model
115
+ model = build_model(cfg.MODEL, MODELS)
116
+
117
+ # Load weights
118
+ print(f"Loading deepfake model from: {model_path}")
119
+ checkpoint = torch.load(model_path, map_location='cpu')
120
+
121
+ if isinstance(checkpoint, dict) and 'state_dict' in checkpoint:
122
+ model.load_state_dict(checkpoint['state_dict'])
123
+ else:
124
+ model.load_state_dict(checkpoint)
125
 
126
+ # Move model to device and set to evaluation mode
127
+ model = model.to(device)
128
+ if hasattr(cfg.MODEL, 'precision') and cfg.MODEL.precision == 'fp64':
129
+ model = model.to(torch.float64)
130
+ model.eval()
131
+
132
+ return model, cfg
133
+ except Exception as e:
134
+ print(f"Error loading deepfake model: {e}")
135
+ import traceback
136
+ traceback.print_exc()
137
+ return None, None
138
+
139
+ def preprocess_for_deepfake(image, cfg, device):
140
+ """Preprocess an image for deepfake detection"""
141
+ try:
142
+ # Convert to RGB if needed
143
+ if len(image.shape) == 3 and image.shape[2] == 3:
144
+ if image.dtype != np.uint8:
145
+ image = (image * 255).astype(np.uint8)
146
+ rgb_img = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
147
+ else:
148
+ rgb_img = image
149
 
150
+ # Resize
151
+ img_resized = cv2.resize(rgb_img, (cfg.DATASET.IMAGE_SIZE[0], cfg.DATASET.IMAGE_SIZE[1]))
152
+
153
+ # Convert to PIL and apply transforms
154
+ transform = transforms.Compose([
155
+ transforms.ToTensor(),
156
+ transforms.Normalize(
157
+ mean=cfg.DATASET.TRANSFORM.normalize.mean,
158
+ std=cfg.DATASET.TRANSFORM.normalize.std
159
+ )
160
+ ])
161
+
162
+ img_tensor = transform(Image.fromarray(img_resized)).unsqueeze(0) # Add batch dimension
163
+ img_tensor = img_tensor.to(device)
164
+
165
+ # Convert to correct precision
166
+ if hasattr(cfg.MODEL, 'precision') and cfg.MODEL.precision == 'fp64':
167
+ img_tensor = img_tensor.to(torch.float64)
168
 
169
+ return img_tensor
170
+ except Exception as e:
171
+ print(f"Error preprocessing image for deepfake detection: {e}")
172
+ import traceback
173
+ traceback.print_exc()
174
+ return None
175
+
176
+ def detect_damage(img, damage_detector):
177
+ """Detect damage in an image"""
178
+ try:
179
+ if img is None:
180
+ raise ValueError("Invalid image")
181
 
182
+ # If no damage detector available, return the whole image as region
183
+ if damage_detector is None:
184
+ print("No damage detector available. Using whole image as region.")
185
+ h, w = img.shape[:2]
186
+ damage_regions = [{
187
+ "box": (0, 0, w, h),
188
+ "score": 1.0,
189
+ "mask": None
190
+ }]
191
+ return img, None, damage_regions
192
 
193
+ # Run inference
194
+ outputs = damage_detector(img)
195
+
196
+ # Get damage regions
197
+ instances = outputs["instances"].to("cpu")
198
+ boxes = instances.pred_boxes.tensor.numpy() if instances.has("pred_boxes") else []
199
+ scores = instances.scores.numpy() if instances.has("scores") else []
200
+ masks = instances.pred_masks.numpy() if instances.has("pred_masks") else []
201
+
202
+ damage_regions = []
203
+ for i in range(len(boxes)):
204
+ x1, y1, x2, y2 = map(int, boxes[i])
205
+ damage_regions.append({
206
+ "box": (x1, y1, x2, y2),
207
+ "score": float(scores[i]),
208
+ "mask": masks[i] if len(masks) > i else None
209
+ })
210
+
211
+ if not damage_regions:
212
+ print("No damage detected. Using whole image.")
213
+ h, w = img.shape[:2]
214
+ damage_regions = [{
215
+ "box": (0, 0, w, h),
216
+ "score": 1.0,
217
+ "mask": None
218
+ }]
219
 
220
+ return img, outputs, damage_regions
221
+ except Exception as e:
222
+ print(f"Error detecting damage: {e}")
223
+ # If error occurs, return the whole image as region
224
+ if 'img' in locals() and img is not None:
225
+ h, w = img.shape[:2]
226
+ damage_regions = [{
227
+ "box": (0, 0, w, h),
228
+ "score": 1.0,
229
+ "mask": None
230
+ }]
231
+ return img, None, damage_regions
232
+ return None, None, []
233
 
234
+ def check_deepfake(image, damage_regions, deepfake_model, deepfake_cfg, device, threshold=0.5):
235
+ """Check if damage regions are deepfakes"""
236
+ results = []
237
+
238
+ if deepfake_model is None:
239
+ print("No deepfake model available. Skipping deepfake detection.")
240
+ return []
241
+
242
+ try:
243
+ # If no damage regions, check the entire image
244
+ if not damage_regions:
245
+ img_tensor = preprocess_for_deepfake(image, deepfake_cfg, device)
246
+ if img_tensor is None:
247
+ return []
248
 
249
+ # Run inference
250
+ with torch.no_grad():
251
+ outputs = deepfake_model(img_tensor)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
252
 
253
+ # Extract outputs
254
+ if isinstance(outputs, list):
255
+ outputs = outputs[0]
 
 
 
 
 
 
 
 
 
256
 
257
+ if isinstance(outputs, dict) and 'cls' in outputs:
258
+ cls_outputs = outputs['cls']
259
+ cls_prob = cls_outputs.sigmoid().cpu().numpy()
260
+ else:
261
+ # Assuming the output is directly the classification probability
262
+ cls_prob = outputs.sigmoid().cpu().numpy() if hasattr(outputs, 'sigmoid') else outputs.cpu().numpy()
 
 
 
 
263
 
264
+ if cls_prob.size > 0:
265
+ is_fake = cls_prob[0][0] > threshold if cls_prob.ndim > 1 else cls_prob[0] > threshold
266
+ confidence = cls_prob[0][0] if cls_prob.ndim > 1 else cls_prob[0]
267
+
268
+ results.append({
269
+ "region": "full_image",
270
+ "deepfake_prob": float(confidence),
271
+ "is_fake": bool(is_fake)
272
+ })
273
 
274
+ return results
275
+
276
+ # Process each damage region
277
+ for i, region in enumerate(damage_regions):
278
+ x1, y1, x2, y2 = region["box"]
279
+ # Ensure coordinates are within image bounds
280
+ x1, y1 = max(0, x1), max(0, y1)
281
+ x2, y2 = min(image.shape[1], x2), min(image.shape[0], y2)
282
 
283
+ # Extract region and check if it's a deepfake
284
+ if x2 > x1 and y2 > y1:
285
+ # Get ROI
286
+ roi = image[y1:y2, x1:x2]
 
 
 
 
287
 
288
+ # Preprocess
289
+ img_tensor = preprocess_for_deepfake(roi, deepfake_cfg, device)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
290
  if img_tensor is None:
291
+ continue
292
+
293
  # Run inference
294
  with torch.no_grad():
295
  outputs = deepfake_model(img_tensor)
 
310
  confidence = cls_prob[0][0] if cls_prob.ndim > 1 else cls_prob[0]
311
 
312
  results.append({
313
+ "region_id": i,
314
+ "box": (x1, y1, x2, y2),
315
  "deepfake_prob": float(confidence),
316
  "is_fake": bool(is_fake)
317
  })
318
+
319
+ return results
320
+ except Exception as e:
321
+ print(f"Error in deepfake detection: {e}")
322
+ import traceback
323
+ traceback.print_exc()
324
+ return []
325
+
326
def visualize_results(image, damage_outputs, deepfake_results, damage_threshold):
    """Create visualization of damage detection and deepfake verification.

    Args:
        image: BGR image (numpy array) to annotate; never modified in place.
        damage_outputs: Detectron2 prediction dict carrying an "instances"
            field, or None when damage detection was skipped/unavailable.
        deepfake_results: List of dicts from check_deepfake(); each holds
            either a "box" (x1, y1, x2, y2) or region == "full_image", plus
            "deepfake_prob" and "is_fake".
        damage_threshold: Unused here (thresholding already happened
            upstream); kept for interface compatibility with callers.

    Returns:
        A uint8 numpy array with annotations drawn, or the original image as
        a numpy array if visualization fails entirely.
    """
    try:
        # Work on a copy so the caller's image is left untouched.
        img_copy = image.copy()

        # Overlay Detectron2 instance predictions (masks/boxes) if available.
        if damage_outputs is not None and DETECTRON2_AVAILABLE:
            try:
                v = Visualizer(img_copy[:, :, ::-1], scale=1.0, instance_mode=ColorMode.IMAGE_BW)
                v = v.draw_instance_predictions(damage_outputs["instances"].to("cpu"))
                # Visualizer works in RGB; flip back to BGR.  Force a
                # contiguous uint8 array because the [::-1] slice yields a
                # reversed view that OpenCV drawing calls may reject.
                result_img = np.ascontiguousarray(v.get_image()[:, :, ::-1], dtype=np.uint8)
            except Exception as e:
                print(f"Error visualizing damage detection: {e}")
                result_img = img_copy
        else:
            result_img = img_copy

        # Guarantee a drawable numpy array once, instead of re-checking
        # inside every loop iteration as before.
        if not isinstance(result_img, np.ndarray):
            result_img = np.array(result_img, dtype=np.uint8)

        # Annotate each deepfake verdict (per-region box or whole image).
        for result in deepfake_results:
            try:
                fake_prob = result["deepfake_prob"]
                is_fake = result["is_fake"]
                # BGR: red for fake, green for real.
                color = (0, 0, 255) if is_fake else (0, 255, 0)

                if "box" in result:
                    # Cast to int: region boxes may arrive as floats
                    # (detector outputs), and OpenCV rejects non-integer
                    # pixel coordinates.
                    x1, y1, x2, y2 = (int(c) for c in result["box"])
                    region_id = result.get("region_id", 0)
                    text = f"R{region_id}: {'FAKE' if is_fake else 'REAL'} ({fake_prob*100:.1f}%)"
                    cv2.rectangle(result_img, (x1, y1), (x2, y2), color, 2)
                    cv2.putText(result_img, text, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
                elif "region" in result and result["region"] == "full_image":
                    text = f"Image: {'FAKE' if is_fake else 'REAL'} ({fake_prob*100:.1f}%)"
                    cv2.putText(result_img, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
            except Exception as e:
                # A single malformed result must not abort the whole overlay.
                print(f"Error drawing result {result}: {e}")

        return result_img
    except Exception as e:
        print(f"Error visualizing results: {e}")
        import traceback
        traceback.print_exc()
        # Fall back to the unannotated input as a numpy array.
        return np.array(image, dtype=np.uint8)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
394
 
395
def _load_input_image(input_image):
    """Normalize a Gradio input into a BGR image.

    Returns:
        (image, None) on success, or (None, error message) on failure.
    """
    if isinstance(input_image, dict) and "path" in input_image:
        img = cv2.imread(input_image["path"])
    elif isinstance(input_image, str):
        img = cv2.imread(input_image)
    elif isinstance(input_image, np.ndarray):
        # Copy so the caller's array is never mutated.
        img = input_image.copy()
        # Gradio delivers RGB; OpenCV works in BGR.
        if len(img.shape) == 3 and img.shape[2] == 3:
            img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    else:
        return None, "Error: Unsupported image format"

    if img is None:
        return None, "Error: Could not read the image"
    return img, None


def process_image(input_image, damage_model_path, deepfake_model_path, deepfake_cfg_path,
                  damage_threshold, deepfake_threshold, skip_damage, device_str):
    """Process an image through the car damage and deepfake detection pipeline.

    Args:
        input_image: Gradio image input (numpy array, file path, or dict
            with a "path" key).
        damage_model_path: Path to the Detectron2 damage model weights.
        deepfake_model_path: Path to the deepfake model weights.
        deepfake_cfg_path: Path to the deepfake model config file.
        damage_threshold: Score threshold for damage detections.
        deepfake_threshold: Probability threshold for the fake/real verdict.
        skip_damage: If True, skip damage detection and analyze the whole image.
        device_str: Device selector ("auto", "cuda", "cpu", or "mps").

    Returns:
        (annotated RGB image or None, newline-joined progress/report text).
    """
    progress_info = []

    # Normalize the Gradio input to a BGR numpy image.
    img, load_error = _load_input_image(input_image)
    if load_error:
        return None, load_error
    progress_info.append("Image loaded successfully")

    # Resolve the computation device.
    device = setup_device(device_str)
    progress_info.append(f"Using device: {device}")

    damage_detector = None
    deepfake_model = None
    deepfake_cfg = None

    # Set up the damage detector unless explicitly skipped.
    if not skip_damage and damage_model_path:
        progress_info.append("Setting up damage detector...")
        damage_detector, detector_cfg = setup_damage_detector(damage_model_path, float(damage_threshold))
        # BUGFIX: previously the failure message was gated on
        # DETECTRON2_AVAILABLE, so a None detector was reported as
        # "initialized successfully" when Detectron2 was missing.
        if damage_detector is None:
            progress_info.append("Failed to initialize damage detector")
        else:
            progress_info.append("Damage detector initialized successfully")

    # Set up the deepfake detector when both model and config are given.
    if deepfake_model_path and deepfake_cfg_path:
        progress_info.append("Setting up deepfake detector...")
        deepfake_model, deepfake_cfg = load_deepfake_model(deepfake_model_path, deepfake_cfg_path, device)
        if deepfake_model is None:
            progress_info.append("Failed to initialize deepfake detector")
        else:
            progress_info.append("Deepfake detector initialized successfully")

    # At least one detector must be usable to proceed.
    if damage_detector is None and deepfake_model is None:
        return None, "Error: Neither damage nor deepfake detector is available"

    # Step 1: detect damage regions (or fall back to the whole image).
    progress_info.append("Detecting damage regions...")
    start_time = time.time()
    img, damage_outputs, damage_regions = detect_damage(img, damage_detector)
    damage_time = time.time() - start_time

    if img is None:
        return None, "Error: Failed to process image"

    if damage_detector is not None and damage_regions:
        progress_info.append(f"Detected {len(damage_regions)} damage regions in {damage_time:.3f} seconds")
    else:
        progress_info.append("Using the whole image for analysis")

    # Step 2: run deepfake verification on the regions (or whole image).
    deepfake_results = []
    if deepfake_model is not None:
        progress_info.append("Performing deepfake detection...")
        start_time = time.time()
        deepfake_results = check_deepfake(
            img, damage_regions, deepfake_model, deepfake_cfg, device, float(deepfake_threshold)
        )
        deepfake_time = time.time() - start_time

        if deepfake_results:
            progress_info.append(f"Deepfake detection completed in {deepfake_time:.3f} seconds")

            # Append a per-region (or whole-image) verdict to the report.
            for result in deepfake_results:
                if "region_id" in result:
                    region_id = result["region_id"]
                    fake_prob = result["deepfake_prob"]
                    is_fake = result["is_fake"]
                    progress_info.append(f"Region {region_id}: {'FAKE' if is_fake else 'REAL'} (Probability: {fake_prob*100:.2f}%)")
                elif "region" in result and result["region"] == "full_image":
                    fake_prob = result["deepfake_prob"]
                    is_fake = result["is_fake"]
                    progress_info.append(f"Whole image: {'FAKE' if is_fake else 'REAL'} (Probability: {fake_prob*100:.2f}%)")
        else:
            progress_info.append("No deepfake detection results")

    # Step 3: render the combined visualization.
    progress_info.append("Generating visualization...")
    result_img = visualize_results(img, damage_outputs, deepfake_results, float(damage_threshold))

    # Convert back to RGB for Gradio display.
    if len(result_img.shape) == 3 and result_img.shape[2] == 3:
        result_img = cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB)

    progress_info.append("Processing complete!")

    return result_img, "\n".join(progress_info)
504
+
505
def create_gradio_interface():
    """Build and return the Gradio Blocks app for the detection pipeline."""
    with gr.Blocks(title="Car Damage & Deepfake Detection") as app:
        # Page header.
        gr.Markdown("# Car Damage Detection & Deepfake Verification")
        gr.Markdown("Upload an image to detect car damage and check if it's a deepfake")

        # --- Main tab: image upload, thresholds, and results -------------
        with gr.Tab("Basic Interface"):
            with gr.Row():
                with gr.Column(scale=1):
                    img_input = gr.Image(type="numpy", label="Input Image")

                    # Pipeline controls.
                    chk_skip_damage = gr.Checkbox(label="Skip Damage Detection", value=False)
                    sld_damage_thresh = gr.Slider(minimum=0.1, maximum=1.0, value=0.7, step=0.05,
                                                  label="Damage Detection Threshold")
                    sld_deepfake_thresh = gr.Slider(minimum=0.1, maximum=1.0, value=0.5, step=0.05,
                                                    label="Deepfake Detection Threshold")
                    dd_device = gr.Dropdown(choices=["auto", "cuda", "cpu", "mps"], value="auto",
                                            label="Computation Device")

                    btn_process = gr.Button("Process Image", variant="primary")

                with gr.Column(scale=1):
                    img_output = gr.Image(type="numpy", label="Result")
                    txt_output = gr.Textbox(label="Detection Results", lines=10)

        # --- Advanced tab: model/config file locations --------------------
        with gr.Tab("Advanced Settings"):
            with gr.Row():
                with gr.Column():
                    tb_damage_model = gr.Textbox(label="Damage Model Path",
                                                 placeholder="Path to damage detection model (.pth)")
                    tb_deepfake_model = gr.Textbox(label="Deepfake Model Path",
                                                   placeholder="Path to deepfake detection model (.pth)")
                    tb_deepfake_cfg = gr.Textbox(label="Deepfake Config Path",
                                                 placeholder="Path to deepfake model config (.yaml)")

        # Wire the button to the processing pipeline.
        btn_process.click(
            fn=process_image,
            inputs=[
                img_input,
                tb_damage_model,
                tb_deepfake_model,
                tb_deepfake_cfg,
                sld_damage_thresh,
                sld_deepfake_thresh,
                chk_skip_damage,
                dd_device,
            ],
            outputs=[img_output, txt_output],
        )

        # Placeholder section for example inputs.
        gr.Markdown("## Examples")
        gr.Markdown("Note: Examples will only work if you have the appropriate models installed.")

    return app
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
561
 
562
# Build the Gradio app at import time so hosting platforms (e.g. Hugging
# Face Spaces) can pick up the module-level `app` object.
app = create_gradio_interface()

# When run directly (local testing), launch with debug enabled so detailed
# error messages are shown.
if __name__ == "__main__":
    app.launch(debug=True)  # Enable debug mode to see detailed error messages