ab2207 committed on
Commit
f90165a
·
verified ·
1 Parent(s): 2a08a48
Files changed (1) hide show
  1. app.py +362 -933
app.py CHANGED
@@ -1,16 +1,10 @@
1
- """
2
- Unified Face Tool - Detection + Privacy + Smart Privacy with Recognition
3
- Combined system with 3 modes: Detection, Blur, and Smart Blur with Identification
4
- """
5
-
6
  # --- Standard Libraries ---
7
  import logging
8
  import atexit
9
  import tempfile
10
  import os
11
  import hashlib
12
- from abc import ABC, abstractmethod
13
- from dataclasses import dataclass, field
14
  from typing import Any, Dict, List, Tuple, Optional
15
  from pathlib import Path
16
 
@@ -20,1051 +14,486 @@ import numpy as np
20
  import gradio as gr
21
  from ultralytics import YOLO
22
 
23
- # --- Face Recognition Libraries (Optional) ---
24
  try:
25
  from deepface import DeepFace
26
  DEEPFACE_AVAILABLE = True
27
  except ImportError:
28
  DEEPFACE_AVAILABLE = False
29
- logging.warning("⚠️ DeepFace not installed - recognition features will fallback to 'Unknown'")
30
 
31
  try:
32
  import chromadb
33
  CHROMADB_AVAILABLE = True
34
  except ImportError:
35
  CHROMADB_AVAILABLE = False
36
- logging.warning("⚠️ ChromaDB not installed - recognition features will fallback to 'Unknown'")
37
 
38
  # --- Configure Logging ---
39
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
40
  logger = logging.getLogger(__name__)
41
 
42
  # ====================================================
43
- # TEMPORARY FILE CLEANUP
44
  # ====================================================
 
 
 
 
 
 
45
# Paths of temporary files created during this session; removed at exit.
TEMP_FILES = []

def cleanup_temp_files():
    """Clean up any temporary files created during the session on exit."""
    for f in TEMP_FILES:
        try:
            if os.path.exists(f):
                os.remove(f)
                logger.info(f"🗑️ Cleaned up temporary file: {f}")
        except Exception as e:
            # Best-effort cleanup: never let exit-time cleanup raise.
            logger.warning(f"⚠️ Failed to delete temporary file {f}: {e}")

atexit.register(cleanup_temp_files)

def create_temp_file(suffix=".mp4") -> str:
    """Create an empty temporary file, register it for exit cleanup, return its path.

    Fix: uses tempfile.mkstemp instead of the deprecated tempfile.mktemp.
    mktemp only reserves a *name*, which is race-prone — another process can
    create the same path between name generation and first use. mkstemp
    atomically creates the file; we close the fd since callers (e.g.
    cv2.VideoWriter) reopen the path themselves.
    """
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)
    TEMP_FILES.append(path)
    return path
64
 
65
  # ====================================================
66
- # SENSITIVITY MAPPING
67
- # ====================================================
68
# ====================================================
# SENSITIVITY MAPPING
# ====================================================
SENSITIVITY_MAP = {
    "Low (Catch More)": 0.3,
    "Balanced (Default)": 0.5,
    "High (Very Strict)": 0.7,
}

def get_confidence_from_sensitivity(sensitivity: str) -> float:
    """Translate a UI sensitivity label into a numeric confidence threshold.

    Unrecognised labels fall back to the balanced default (0.5).
    """
    return SENSITIVITY_MAP.get(sensitivity, SENSITIVITY_MAP["Balanced (Default)"])
77
-
78
- # ====================================================
79
- # CONFIGURATION DATA CLASSES
80
- # ====================================================
81
@dataclass
class BlurConfig:
    """Configuration for blur effects.

    `type` selects the BlurEffect strategy key used by get_blur_effect():
    "gaussian", "pixelate" or "solid".
    """
    type: str = "pixelate"            # blur strategy key
    intensity: float = 25.0           # gaussian strength; % of face size when adaptive
    pixel_size: int = 25              # pixelation cell size in pixels
    solid_color: Tuple[int, int, int] = (0, 0, 0)  # fill colour for solid redaction (black)
    adaptive_blur: bool = True        # scale gaussian kernel with face size
    min_kernel: int = 15              # lower clamp for the adaptive kernel
    max_kernel: int = 95              # upper clamp for the adaptive kernel
91
-
92
@dataclass
class DetectionConfig:
    """Configuration for the face detector."""
    min_confidence: float = 0.5           # default YOLO confidence threshold
    model_path: str = "yolov8n-face.pt"   # weights file loaded by the detector
97
-
98
@dataclass
class AppConfig:
    """Main application configuration."""
    blur: BlurConfig = field(default_factory=BlurConfig)
    detection: DetectionConfig = field(default_factory=DetectionConfig)
    scaling_factor: float = 1.2   # bounding-box enlargement factor before blurring
    forehead_margin: int = 20     # extra pixels above the face box
    face_margin: int = 15         # extra pixels on each side of the face box
106
-
107
  # ====================================================
108
- # BLUR EFFECTS (STRATEGY PATTERN)
109
- # ====================================================
110
class BlurEffect(ABC):
    """Abstract base class for blur effects (strategy pattern)."""
    def __init__(self, config: BlurConfig):
        self.config = config

    @abstractmethod
    def apply(self, image: np.ndarray, roi: Tuple[int, int, int, int]) -> np.ndarray:
        """Apply the blur effect to the region of interest (x, y, w, h).

        Implementations mutate `image` in place and also return it.
        """
        pass
119
-
120
class GaussianBlur(BlurEffect):
    """Gaussian blur with adaptive kernel sizing for a natural look."""

    def apply(self, image: np.ndarray, roi: Tuple[int, int, int, int]) -> np.ndarray:
        """Blur the ROI in place and return the (mutated) image."""
        left, top, width, height = roi
        patch = image[top:top + height, left:left + width]
        if patch.size == 0:
            return image

        cfg = self.config
        if cfg.adaptive_blur:
            # Kernel grows with the face so small faces aren't over-blurred.
            proposed = int(min(width, height) * (cfg.intensity / 100.0))
            kernel = min(max(proposed, cfg.min_kernel), cfg.max_kernel)
        else:
            kernel = int(cfg.intensity)

        kernel |= 1  # cv2.GaussianBlur requires an odd kernel size
        image[top:top + height, left:left + width] = cv2.GaussianBlur(
            patch, (kernel, kernel), 0
        )
        return image
139
-
140
class PixelateBlur(BlurEffect):
    """Pixelation effect for a retro/digital privacy look."""

    def apply(self, image: np.ndarray, roi: Tuple[int, int, int, int]) -> np.ndarray:
        """Pixelate the ROI in place and return the (mutated) image."""
        x0, y0, roi_w, roi_h = roi
        patch = image[y0:y0 + roi_h, x0:x0 + roi_w]
        if patch.size == 0:
            return image

        ph, pw = patch.shape[:2]
        cell = self.config.pixel_size
        if cell <= 0:
            # Degenerate configuration: leave the image untouched.
            return image

        # Downscale, then upscale with nearest-neighbour to form blocks.
        shrunk = cv2.resize(
            patch,
            (max(1, pw // cell), max(1, ph // cell)),
            interpolation=cv2.INTER_LINEAR,
        )
        image[y0:y0 + roi_h, x0:x0 + roi_w] = cv2.resize(
            shrunk, (pw, ph), interpolation=cv2.INTER_NEAREST
        )
        return image
157
-
158
class SolidColorBlur(BlurEffect):
    """Solid color rectangle overlay for complete redaction."""

    def apply(self, image: np.ndarray, roi: Tuple[int, int, int, int]) -> np.ndarray:
        """Fill the ROI with the configured solid colour (in place)."""
        x0, y0, box_w, box_h = roi
        # Thickness -1 fills the rectangle completely.
        cv2.rectangle(image, (x0, y0), (x0 + box_w, y0 + box_h),
                      self.config.solid_color, -1)
        return image
164
-
165
def get_blur_effect(config: BlurConfig) -> BlurEffect:
    """Factory returning the BlurEffect strategy named by config.type.

    Raises:
        ValueError: if config.type is not a registered blur strategy.
    """
    registry = {
        "gaussian": GaussianBlur,
        "pixelate": PixelateBlur,
        "solid": SolidColorBlur,
    }
    try:
        return registry[config.type](config)
    except KeyError:
        raise ValueError(f"Unknown blur type: {config.type}") from None
172
-
173
- # ====================================================
174
- # FACE DATABASE (Simplified, no UI)
175
- # ====================================================
176
class SimpleFaceDatabase:
    """Simplified face recognition system using ChromaDB - backend only.

    Known faces live under `known_faces/<ID>_<Name>/*.jpg|jpeg|png`; each
    image is embedded once with DeepFace and stored in a persistent ChromaDB
    collection keyed by the image's MD5 hash, so re-runs skip files that are
    already indexed. If DeepFace/ChromaDB are missing, recognition degrades
    to "Unknown" labels instead of failing.
    """

    def __init__(
        self,
        known_faces_dir: str = "known_faces",
        db_path: str = "./chroma_db",
        model_name: str = "Facenet512"
    ):
        self.known_faces_dir = Path(known_faces_dir)
        self.model_name = model_name
        self.db_path = db_path
        self.available = CHROMADB_AVAILABLE and DEEPFACE_AVAILABLE

        if not self.available:
            logger.warning("⚠️ Face recognition not available - will use 'Unknown' labels")
            self.client = None
            self.collection = None
            return

        # Initialize ChromaDB
        logger.info("🔧 Initializing face recognition database...")
        try:
            self.client = chromadb.PersistentClient(path=db_path)
            self.collection = self.client.get_or_create_collection(
                name="face_embeddings",
                metadata={"hnsw:space": "cosine"}
            )

            existing_count = self.collection.count()
            logger.info(f"📊 Database contains {existing_count} face embeddings")

            if self.known_faces_dir.exists():
                self._index_faces_from_folders()
            else:
                self.known_faces_dir.mkdir(parents=True, exist_ok=True)

        except Exception as e:
            # Degrade gracefully: recognition is an optional feature.
            logger.error(f"❌ Database initialization failed: {e}")
            self.available = False
            self.client = None
            self.collection = None

    def _get_image_hash(self, img_path: Path) -> str:
        """Generate a unique MD5 hash for an image (used as the ChromaDB id)."""
        with open(img_path, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()

    def _index_faces_from_folders(self):
        """Auto-index faces from the known_faces/ folder structure."""
        logger.info("🔄 Scanning for faces to index...")

        person_dirs = [d for d in self.known_faces_dir.iterdir() if d.is_dir()]
        indexed_count = 0

        for person_dir in person_dirs:
            folder_name = person_dir.name
            parts = folder_name.split('_', 1)

            # Folder naming contract: "<ID>_<Name>" (name may contain more '_').
            if len(parts) != 2:
                logger.warning(f"⚠️ Skipping '{folder_name}' - use format: 'ID_Name'")
                continue

            person_id, person_name = parts
            person_name = person_name.replace('_', ' ').title()

            image_files = list(person_dir.glob("*.jpg")) + \
                          list(person_dir.glob("*.png")) + \
                          list(person_dir.glob("*.jpeg"))

            for img_path in image_files:
                try:
                    img_hash = self._get_image_hash(img_path)
                    existing = self.collection.get(ids=[img_hash], include=[])

                    if existing['ids']:
                        continue  # already indexed on a previous run

                    embedding_obj = DeepFace.represent(
                        img_path=str(img_path),
                        model_name=self.model_name,
                        enforce_detection=False
                    )

                    if not embedding_obj:
                        continue

                    embedding = embedding_obj[0]["embedding"]

                    self.collection.add(
                        embeddings=[embedding],
                        documents=[str(img_path)],
                        metadatas=[{
                            "person_id": person_id,
                            "person_name": person_name,
                            "image_file": img_path.name
                        }],
                        ids=[img_hash]
                    )

                    indexed_count += 1
                    logger.info(f"✅ Indexed: {person_name} (ID: {person_id})")

                except Exception as e:
                    logger.error(f" Failed to index {img_path}: {e}")

        if indexed_count > 0:
            logger.info(f"💾 Indexed {indexed_count} new face(s)")

    def recognize_face(self, face_image: np.ndarray, threshold: float = 0.45) -> Dict[str, Any]:
        """Recognize a face crop via ChromaDB nearest-neighbour search.

        Returns a dict with keys `match`, `person_id`, `name`; a cosine
        distance below `threshold` counts as a match.

        Fix over the previous version: the crop is written to a *unique*
        temporary file instead of a fixed "temp_face.jpg" in the CWD (which
        races when concurrent requests hit this method) and the file is
        always removed in a `finally` block, not only on the success path.
        """
        no_match = {"match": False, "person_id": "unknown", "name": "Unknown"}
        if not self.available or self.collection is None or self.collection.count() == 0:
            return no_match

        temp_path = None
        try:
            # DeepFace.represent expects an image path, so round-trip via disk.
            fd, temp_path = tempfile.mkstemp(suffix=".jpg")
            os.close(fd)
            cv2.imwrite(temp_path, cv2.cvtColor(face_image, cv2.COLOR_RGB2BGR))

            embedding_obj = DeepFace.represent(
                img_path=temp_path,
                model_name=self.model_name,
                enforce_detection=False
            )

            if not embedding_obj:
                return no_match

            face_embedding = embedding_obj[0]["embedding"]

            results = self.collection.query(
                query_embeddings=[face_embedding],
                n_results=1,
                include=["metadatas", "distances"]
            )

            if not results['ids'][0]:
                return no_match

            distance = results['distances'][0][0]
            metadata = results['metadatas'][0][0]

            if distance < threshold:
                return {
                    "match": True,
                    "person_id": metadata['person_id'],
                    "name": metadata['person_name']
                }

            return no_match

        except Exception as e:
            logger.error(f"Recognition error: {e}")
            return no_match
        finally:
            # Always clean up the scratch file, even when represent/query raise.
            if temp_path and os.path.exists(temp_path):
                os.remove(temp_path)
332
 
333
# Global database instance (lazily created singleton)
FACE_DB: Optional[SimpleFaceDatabase] = None

def get_face_database() -> SimpleFaceDatabase:
    """Return the process-wide SimpleFaceDatabase, creating it on first use."""
    global FACE_DB
    if FACE_DB is not None:
        return FACE_DB
    FACE_DB = SimpleFaceDatabase()
    return FACE_DB
342
 
343
  # ====================================================
344
- # UNIFIED YOLO DETECTOR
345
  # ====================================================
346
class UnifiedYOLODetector:
    """Unified face detector using the YOLOv8-Face model."""
    def __init__(self, config: DetectionConfig):
        """Load the YOLO weights named in config.model_path.

        Raises:
            RuntimeError: if the model cannot be loaded (chained to the cause).
        """
        try:
            logger.info(f"📦 Loading YOLOv8-Face model: {config.model_path}")
            self.model = YOLO(config.model_path)
            self.min_conf = config.min_confidence
            logger.info("✅ Model loaded successfully.")
        except Exception as e:
            logger.error(f"❌ Failed to load model: {e}")
            raise RuntimeError(f"Model loading failed. Ensure '{config.model_path}' is available.") from e

    def detect_faces(
        self,
        image: np.ndarray,
        conf_threshold: float,
        return_annotated: bool = False,
        recognize: bool = False,
        return_face_info: bool = True
    ) -> Tuple[List[Dict[str, Any]], Optional[np.ndarray]]:
        """Detect faces with optional annotation and recognition.

        Args:
            image: input frame as a numpy array.
            conf_threshold: per-call YOLO confidence threshold.
            return_annotated: draw green boxes + "Face" labels on a copy.
                Annotation is skipped when recognize=True (smart mode draws
                its own labels later).
            recognize: also run each crop through the face database.
            return_face_info: unused in this implementation; kept for
                interface compatibility.

        Returns:
            (faces, annotated_image): faces is a list of dicts with
            x/y/width/height/confidence and, when recognize=True, the
            recognition fields match/person_id/name. annotated_image is None
            unless return_annotated is True.
        """
        results = self.model(image, conf=conf_threshold, verbose=False)
        faces = []
        annotated_image = image.copy() if return_annotated else None

        # Get face database for recognition only if needed (it is lazy/heavy).
        face_db = get_face_database() if recognize else None

        for r in results:
            if r.boxes is None:
                continue
            for box in r.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                confidence = float(box.conf[0])

                face_info = {
                    "x": x1, "y": y1,
                    "width": x2 - x1,
                    "height": y2 - y1,
                    "confidence": confidence
                }

                # Add recognition fields if requested; empty crops get "Unknown".
                if recognize and face_db:
                    face_crop = image[y1:y2, x1:x2]
                    if face_crop.size > 0:
                        recognition_result = face_db.recognize_face(face_crop)
                        face_info.update(recognition_result)
                    else:
                        face_info.update({"match": False, "person_id": "unknown", "name": "Unknown"})

                faces.append(face_info)

                # Draw annotations if requested (Detection mode only).
                if return_annotated and not recognize:
                    cv2.rectangle(annotated_image, (x1, y1), (x2, y2), (0, 255, 0), 3)
                    label = "Face"
                    (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
                    # Filled label background above the box, then black text on it.
                    cv2.rectangle(annotated_image, (x1, y1 - h - 10), (x1 + w, y1), (0, 255, 0), -1)
                    cv2.putText(annotated_image, label, (x1, y1 - 5),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 2)

        return faces, annotated_image
409
 
410
GLOBAL_DETECTOR: Optional[UnifiedYOLODetector] = None

def get_global_detector() -> UnifiedYOLODetector:
    """Return the shared UnifiedYOLODetector, building it lazily on first call."""
    global GLOBAL_DETECTOR
    if GLOBAL_DETECTOR is not None:
        return GLOBAL_DETECTOR
    GLOBAL_DETECTOR = UnifiedYOLODetector(DetectionConfig())
    return GLOBAL_DETECTOR
418
 
419
  # ====================================================
420
- # SMART PRIVACY APPLICATION
421
  # ====================================================
422
class SmartPrivacyApp:
    """Application that combines blur and recognition.

    "Smart privacy" = blur every face, then draw an identity label on top of
    the blurred region (green background for known people, red for unknown).
    """
    def __init__(self, config: AppConfig, detector: UnifiedYOLODetector):
        self.config = config
        self.blur_effect = get_blur_effect(config.blur)
        self.detector = detector

    def _expand_bbox(self, bbox: Dict[str, Any], img_shape: Tuple[int, int]) -> Tuple[int, int, int, int]:
        """Expand a detection box by scaling_factor plus fixed margins.

        The extra forehead margin is applied upward only; the result is
        clamped to the image bounds. Returns (x, y, w, h).
        """
        h_img, w_img = img_shape
        new_w = int(bbox["width"] * self.config.scaling_factor)
        new_h = int(bbox["height"] * self.config.scaling_factor)
        # Keep the enlarged box centred on the original detection.
        x_offset = (new_w - bbox["width"]) // 2
        y_offset = (new_h - bbox["height"]) // 2
        x = max(0, bbox["x"] - x_offset - self.config.face_margin)
        y = max(0, bbox["y"] - y_offset - self.config.forehead_margin)
        w = min(w_img - x, new_w + 2 * self.config.face_margin)
        h = min(h_img - y, new_h + self.config.forehead_margin)
        return x, y, w, h

    def process_smart_privacy(self, image: np.ndarray, conf_threshold: float) -> Tuple[np.ndarray, str]:
        """Blur all faces, then label each with its recognized identity.

        Returns (processed image, human-readable status string).
        """
        writable_image = image.copy()

        # Step 1: Detect and recognize faces
        faces, _ = self.detector.detect_faces(writable_image, conf_threshold, recognize=True)

        # Build status message
        known_count = sum(1 for f in faces if f.get("match", False))
        unknown_count = len(faces) - known_count

        status = ""
        if not DEEPFACE_AVAILABLE or not CHROMADB_AVAILABLE:
            status = "⚠️ Recognition unavailable - all faces marked as Unknown\n"

        # Step 2: Blur ALL faces (known and unknown alike)
        for face in faces:
            expanded_roi = self._expand_bbox(face, writable_image.shape[:2])
            writable_image = self.blur_effect.apply(writable_image, expanded_roi)

        # Step 3: Add labels ON TOP of blurred faces (separate pass so a
        # later blur cannot smear an already-drawn label)
        for face in faces:
            x1, y1 = face["x"], face["y"]
            width, height = face["width"], face["height"]

            # Determine label and background colour
            if face.get("match", False):
                label = f"{face['name']} ({face['person_id']})"
                bg_color = (0, 200, 0)  # Green background for known
            else:
                label = "Unknown"
                bg_color = (200, 0, 0)  # Red background for unknown

            # Draw label with background for readability
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 0.6
            thickness = 2
            (text_width, text_height), baseline = cv2.getTextSize(label, font, font_scale, thickness)

            # Position label at top of face bbox
            label_y = y1 - 5
            if label_y - text_height < 0:  # If label goes off top, put it inside
                label_y = y1 + text_height + 5

            # Draw background rectangle
            cv2.rectangle(writable_image,
                          (x1, label_y - text_height - 5),
                          (x1 + text_width + 10, label_y + 5),
                          bg_color, -1)

            # Draw text
            cv2.putText(writable_image, label,
                        (x1 + 5, label_y),
                        font, font_scale, (255, 255, 255), thickness)

        # Build summary (overwrites the warning when nothing was found)
        if faces:
            status += f"✅ Processed {len(faces)} face(s): {known_count} known, {unknown_count} unknown"
        else:
            status = "❌ No faces detected"

        return writable_image, status

    def process_simple_blur(self, image: np.ndarray, conf_threshold: float) -> np.ndarray:
        """Blur every detected face without recognition (Privacy mode)."""
        writable_image = image.copy()
        faces, _ = self.detector.detect_faces(writable_image, conf_threshold, recognize=False)
        for face in faces:
            expanded_roi = self._expand_bbox(face, writable_image.shape[:2])
            writable_image = self.blur_effect.apply(writable_image, expanded_roi)
        return writable_image
513
 
514
  # ====================================================
515
- # GRADIO HANDLER FUNCTIONS
516
  # ====================================================
517
def get_app_instance(blur_type: str, blur_amount: float, blur_size: float) -> SmartPrivacyApp:
    """Build a SmartPrivacyApp configured from the current UI widget values."""
    blur_cfg = BlurConfig(
        type=blur_type,
        intensity=blur_amount,
        pixel_size=int(blur_amount),
    )
    app_cfg = AppConfig(scaling_factor=blur_size, blur=blur_cfg)
    return SmartPrivacyApp(app_cfg, get_global_detector())
525
-
526
- # ---- Detection Mode Handlers ----
527
- def detect_faces_image(image, sensitivity):
528
- """Detect faces in single image."""
529
- if image is None:
530
- return None, "⚠️ No image provided."
531
- try:
532
- confidence = get_confidence_from_sensitivity(sensitivity)
533
- detector = get_global_detector()
534
- faces, annotated_image = detector.detect_faces(image, confidence, return_annotated=True, recognize=False)
535
-
536
- if faces:
537
- result = f"✅ **{len(faces)} face(s) detected!**"
538
- else:
539
- result = "❌ **No faces detected.**"
540
-
541
- return annotated_image, result
542
- except Exception as e:
543
- logger.error(f"Detection error: {e}")
544
- return image, f"❌ Error: {e}"
545
-
546
def detect_faces_video(video_file, sensitivity, progress=gr.Progress()):
    """Detect faces in a video and write an annotated copy.

    Returns (output_path, status_markdown), or (None, error text) on failure.

    Fixes over the previous version:
      * the capture and writer handles are released in a `finally` block, so
        an exception mid-video no longer leaks them;
      * the error status carries the "❌" prefix used by every other handler.
    """
    if video_file is None:
        return None, "⚠️ No video provided."

    cap = None
    out_vid = None
    try:
        confidence = get_confidence_from_sensitivity(sensitivity)
        detector = get_global_detector()
        cap = cv2.VideoCapture(video_file.name)
        if not cap.isOpened():
            return None, "❌ Cannot open video file."

        out_path = create_temp_file()
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        fps = cap.get(cv2.CAP_PROP_FPS)
        w, h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        out_vid = cv2.VideoWriter(out_path, fourcc, fps, (w, h))

        frame_num, frames_with_faces = 0, 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_num += 1
            # max(total_frames, 1) guards against containers reporting 0 frames.
            progress(frame_num / max(total_frames, 1), desc=f"Frame {frame_num}/{total_frames}")
            faces, annotated_frame = detector.detect_faces(frame, confidence, return_annotated=True, recognize=False)
            if faces:
                frames_with_faces += 1
            out_vid.write(annotated_frame)

        if frames_with_faces > 0:
            result = f"✅ **Faces detected in {frames_with_faces}/{frame_num} frames!**"
        else:
            result = f"❌ **No faces detected in {frame_num} frames.**"

        return out_path, result
    except Exception as e:
        logger.error(f"Video detection error: {e}")
        return None, f"❌ Error: {e}"
    finally:
        # Release handles even when an exception interrupts processing.
        if cap is not None:
            cap.release()
        if out_vid is not None:
            out_vid.release()
588
-
589
def detect_faces_webcam(image, sensitivity):
    """Annotate one webcam frame; returns the input unchanged on error."""
    if image is None:
        return None
    try:
        threshold = get_confidence_from_sensitivity(sensitivity)
        _, annotated = get_global_detector().detect_faces(
            image, threshold, return_annotated=True, recognize=False
        )
        return annotated
    except Exception as e:
        logger.error(f"Webcam detection error: {e}")
        return image
601
-
602
- # ---- Privacy Mode Handlers (Blur only) ----
603
- def process_privacy_image(image, blur_type, blur_amount, blur_size, sensitivity):
604
- """Process single image with blur effect only."""
605
- if image is None:
606
- return None
607
- try:
608
- confidence = get_confidence_from_sensitivity(sensitivity)
609
- app = get_app_instance(blur_type, blur_amount, blur_size)
610
- return app.process_simple_blur(image, confidence)
611
- except Exception as e:
612
- logger.error(f"Privacy processing error: {e}")
613
- return image
614
-
615
def process_privacy_video(video_file, blur_type, blur_amount, blur_size, sensitivity, progress=gr.Progress()):
    """Blur faces in every frame of a video (no recognition).

    Returns (output_path, status), or (None, error text) on failure.

    Fix over the previous version: capture/writer handles are released in a
    `finally` block, so an exception mid-video no longer leaks them.
    """
    if video_file is None:
        return None, "⚠️ No video provided."

    cap = None
    out_vid = None
    try:
        confidence = get_confidence_from_sensitivity(sensitivity)
        app = get_app_instance(blur_type, blur_amount, blur_size)
        cap = cv2.VideoCapture(video_file.name)
        if not cap.isOpened():
            return None, "❌ Cannot open video file."

        out_path = create_temp_file()
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        fps = cap.get(cv2.CAP_PROP_FPS)
        w, h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        out_vid = cv2.VideoWriter(out_path, fourcc, fps, (w, h))

        frame_num = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_num += 1
            # max(total_frames, 1) guards against containers reporting 0 frames.
            progress(frame_num / max(total_frames, 1), desc=f"Processing frame {frame_num}/{total_frames}")
            out_vid.write(app.process_simple_blur(frame, confidence))

        return out_path, f"✅ Processed {frame_num} frames."
    except Exception as e:
        logger.error(f"Video processing error: {e}")
        return None, f"❌ Error: {e}"
    finally:
        # Release handles even when an exception interrupts processing.
        if cap is not None:
            cap.release()
        if out_vid is not None:
            out_vid.release()
649
 
650
def process_privacy_webcam(image, blur_type, blur_amount, blur_size, sensitivity):
    """Blur faces in one webcam frame; returns the input unchanged on error."""
    if image is None:
        return None
    try:
        threshold = get_confidence_from_sensitivity(sensitivity)
        app = get_app_instance(blur_type, blur_amount, blur_size)
        return app.process_simple_blur(image, threshold)
    except Exception as e:
        logger.error(f"Webcam processing error: {e}")
        return image
661
-
662
- # ---- Smart Privacy Mode Handlers (Blur + Identify) ----
663
- def process_smart_image(image, blur_type, blur_amount, blur_size, sensitivity):
664
- """Process image with blur and identification."""
665
- if image is None:
666
- return None, "⚠️ No image provided."
667
- try:
668
- confidence = get_confidence_from_sensitivity(sensitivity)
669
- app = get_app_instance(blur_type, blur_amount, blur_size)
670
- processed_image, status = app.process_smart_privacy(image, confidence)
671
- return processed_image, status
672
- except Exception as e:
673
- logger.error(f"Smart processing error: {e}")
674
- return image, f"❌ Error: {e}"
675
-
676
def process_smart_video(video_file, blur_type, blur_amount, blur_size, sensitivity, progress=gr.Progress()):
    """Process a video with blur and identification.

    For performance, full recognition runs only every PROCESS_EVERY_N frames;
    in-between frames are blurred and re-labelled from the cached recognition
    results (labels may lag slightly when faces move between key frames).

    Returns (output_path, status), or (None, error text) on failure.
    """
    if video_file is None:
        return None, "⚠️ No video provided."
    try:
        confidence = get_confidence_from_sensitivity(sensitivity)
        app = get_app_instance(blur_type, blur_amount, blur_size)
        cap = cv2.VideoCapture(video_file.name)
        if not cap.isOpened():
            return None, "❌ Cannot open video file."

        out_path = create_temp_file()
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        fps = cap.get(cv2.CAP_PROP_FPS)
        w, h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        out_vid = cv2.VideoWriter(out_path, fourcc, fps, (w, h))

        frame_num = 0
        total_known = 0
        total_unknown = 0

        # Process frames (with temporal optimization for performance)
        PROCESS_EVERY_N = 10  # Full recognition every 10 frames
        cached_faces = []

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_num += 1
            progress(frame_num / max(total_frames, 1), desc=f"Processing frame {frame_num}/{total_frames}")

            # Key frame: full blur + recognition, and refresh the face cache
            if frame_num % PROCESS_EVERY_N == 1 or frame_num == 1:
                # Full processing with recognition
                processed_frame, _ = app.process_smart_privacy(frame, confidence)
                detector = get_global_detector()
                # NOTE(review): this re-detects the same frame a second time
                # just to populate the cache — counts therefore tally key
                # frames only, not every appearance of a face.
                cached_faces, _ = detector.detect_faces(frame, confidence, recognize=True)

                # Count for statistics
                for face in cached_faces:
                    if face.get("match", False):
                        total_known += 1
                    else:
                        total_unknown += 1
            else:
                # In-between frame: blur only, then redraw labels from cache
                processed_frame = app.process_simple_blur(frame, confidence)
                # Apply cached labels
                for face in cached_faces:
                    x1, y1 = face["x"], face["y"]

                    if face.get("match", False):
                        label = f"{face['name']} ({face['person_id']})"
                        bg_color = (0, 200, 0)
                    else:
                        label = "Unknown"
                        bg_color = (200, 0, 0)

                    font = cv2.FONT_HERSHEY_SIMPLEX
                    (text_width, text_height), _ = cv2.getTextSize(label, font, 0.6, 2)
                    # Place label above the box, or inside if it would go off-frame
                    label_y = y1 - 5
                    if label_y - text_height < 0:
                        label_y = y1 + text_height + 5

                    cv2.rectangle(processed_frame,
                                  (x1, label_y - text_height - 5),
                                  (x1 + text_width + 10, label_y + 5),
                                  bg_color, -1)
                    cv2.putText(processed_frame, label,
                                (x1 + 5, label_y),
                                font, 0.6, (255, 255, 255), 2)

            out_vid.write(processed_frame)

        cap.release()
        out_vid.release()

        status = f"✅ Processed {frame_num} frames.\n"
        if not DEEPFACE_AVAILABLE or not CHROMADB_AVAILABLE:
            status += "⚠️ Recognition unavailable - all faces marked as Unknown"
        else:
            status += f"Known faces: {total_known}, Unknown: {total_unknown}"

        return out_path, status
    except Exception as e:
        logger.error(f"Smart video processing error: {e}")
        return None, f"❌ Error: {e}"
765
-
766
def process_smart_webcam(image, blur_type, blur_amount, blur_size, sensitivity):
    """Blur + identify faces in a webcam frame; second output is always None."""
    if image is None:
        return None, None
    try:
        threshold = get_confidence_from_sensitivity(sensitivity)
        app = get_app_instance(blur_type, blur_amount, blur_size)
        processed, _ = app.process_smart_privacy(image, threshold)
        return processed, None
    except Exception as e:
        logger.error(f"Smart webcam error: {e}")
        return image, None
778
-
779
- # ====================================================
780
- # GRADIO UI
781
- # ====================================================
782
- with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue"), title="Unified Face Tool") as demo:
783
- gr.Markdown("# 🎯 Unified Face Tool")
784
- gr.Markdown("**Detection • Privacy • Smart Privacy** - AI-powered face processing using YOLOv8")
785
-
786
- # Recognition availability warning
787
- if not DEEPFACE_AVAILABLE or not CHROMADB_AVAILABLE:
788
- with gr.Row():
789
- gr.Markdown("""
790
- ⚠️ **Recognition features limited:** Install optional dependencies for full functionality:
791
- ```bash
792
- pip install deepface tf-keras chromadb
793
- ```
794
- *Smart Privacy will work but all faces will be labeled as 'Unknown'*
795
- """)
796
-
797
- with gr.Row():
798
- # ========== SETTINGS SIDEBAR ==========
799
- with gr.Column(scale=1, variant="panel"):
800
- gr.Markdown("### ⚙️ Settings")
801
-
802
- with gr.Accordion("Detection", open=True):
803
- detection_sensitivity = gr.Radio(
804
- choices=list(SENSITIVITY_MAP.keys()),
805
- value="Balanced (Default)",
806
- label="Sensitivity",
807
- info="How strict face detection should be"
808
- )
809
-
810
- with gr.Accordion("Privacy", open=True):
811
- blur_type = gr.Radio(
812
- ["gaussian", "pixelate", "solid"],
813
- value="pixelate",
814
- label="Blur Type"
815
- )
816
- blur_amount = gr.Slider(
817
- 1, 100,
818
- step=1,
819
- value=15,
820
- label="Blur Intensity"
821
- )
822
- blur_size = gr.Slider(
823
- 1.0, 2.0,
824
- step=0.05,
825
- value=1.1,
826
- label="Coverage Area"
827
- )
828
-
829
- # ========== MAIN CONTENT ==========
830
- with gr.Column(scale=3):
831
  with gr.Tabs():
832
- # ========== DETECTION MODE ==========
833
- with gr.TabItem("🔍 Detection Mode"):
834
- gr.Markdown("### Just detect faces - no modifications")
 
 
 
 
835
 
836
- with gr.Tabs():
837
- with gr.TabItem("📷 Image"):
838
- with gr.Row():
839
- det_img_in = gr.Image(
840
- sources=["upload", "clipboard"],
841
- type="numpy",
842
- label="Input",
843
- height=400
844
- )
845
- with gr.Column():
846
- det_img_out = gr.Image(
847
- type="numpy",
848
- label="Detection Result",
849
- height=350
850
- )
851
- det_img_status = gr.Markdown("_Upload an image_")
852
-
853
- with gr.Row():
854
- det_img_btn = gr.Button("Detect Faces", variant="primary", scale=3)
855
- gr.ClearButton([det_img_in, det_img_out, det_img_status], scale=1)
856
-
857
- with gr.TabItem("🎥 Video"):
858
- with gr.Row():
859
- det_vid_in = gr.File(
860
- file_types=[".mp4", ".mov", ".avi"],
861
- label="Input Video"
862
- )
863
- with gr.Column():
864
- det_vid_out = gr.Video(
865
- label="Annotated Video",
866
- height=400
867
- )
868
- det_vid_status = gr.Markdown("_Upload a video_")
869
- with gr.Row():
870
- det_vid_btn = gr.Button("Analyze Video", variant="primary", scale=3)
871
- gr.ClearButton([det_vid_in, det_vid_out, det_vid_status], scale=1)
872
-
873
- with gr.TabItem("📹 Webcam"):
874
- with gr.Row():
875
- det_web_in = gr.Image(
876
- sources=["webcam"],
877
- type="numpy",
878
- streaming=True,
879
- label="Live Feed",
880
- height=400
881
- )
882
- det_web_out = gr.Image(
883
- type="numpy",
884
- label="Detection",
885
- height=400
886
- )
887
-
888
- # ========== PRIVACY MODE ==========
889
- with gr.TabItem("🔒 Privacy Mode"):
890
- gr.Markdown("### Blur all faces for complete privacy")
891
 
892
- with gr.Tabs():
893
- with gr.TabItem("📷 Image"):
894
- with gr.Row():
895
- priv_img_in = gr.Image(
896
- sources=["upload", "clipboard"],
897
- type="numpy",
898
- label="Input",
899
- height=400
900
- )
901
- priv_img_out = gr.Image(
902
- type="numpy",
903
- label="Protected Image",
904
- height=400
905
- )
906
- with gr.Row():
907
- priv_img_btn = gr.Button("Apply Privacy Blur", variant="primary", scale=3)
908
- gr.ClearButton([priv_img_in, priv_img_out], scale=1)
909
-
910
- with gr.TabItem("🎥 Video"):
911
- with gr.Row():
912
- priv_vid_in = gr.File(
913
- file_types=[".mp4", ".mov", ".avi"],
914
- label="Input Video"
915
- )
916
- with gr.Column():
917
- priv_vid_out = gr.Video(
918
- label="Protected Video",
919
- height=400
920
- )
921
- priv_vid_status = gr.Markdown("")
922
- with gr.Row():
923
- priv_vid_btn = gr.Button("Process Video", variant="primary", scale=3)
924
- gr.ClearButton([priv_vid_in, priv_vid_out, priv_vid_status], scale=1)
925
-
926
- with gr.TabItem("📹 Webcam"):
927
- with gr.Row():
928
- priv_web_in = gr.Image(
929
- sources=["webcam"],
930
- type="numpy",
931
- streaming=True,
932
- label="Live Feed",
933
- height=400
934
- )
935
- priv_web_out = gr.Image(
936
- type="numpy",
937
- label="Protected Feed",
938
- height=400
939
- )
940
-
941
- # ========== SMART PRIVACY MODE ==========
942
- with gr.TabItem("🎯 Smart Privacy Mode"):
943
- gr.Markdown("### **The Innovation:** Blur faces while preserving identity information")
944
- gr.Markdown("*All faces are blurred but labeled with their identity*")
945
 
946
- with gr.Tabs():
947
- with gr.TabItem("📷 Image"):
948
- with gr.Row():
949
- smart_img_in = gr.Image(
950
- sources=["upload", "clipboard"],
951
- type="numpy",
952
- label="Input",
953
- height=400
954
- )
955
- with gr.Column():
956
- smart_img_out = gr.Image(
957
- type="numpy",
958
- label="Smart Privacy Result",
959
- height=350
960
- )
961
- smart_img_status = gr.Markdown("_Upload an image_")
962
-
963
- with gr.Row():
964
- smart_img_btn = gr.Button("🎯 Apply Smart Privacy", variant="primary", scale=3)
965
- gr.ClearButton([smart_img_in, smart_img_out, smart_img_status], scale=1)
966
-
967
- with gr.TabItem("🎥 Video"):
968
- with gr.Row():
969
- smart_vid_in = gr.File(
970
- file_types=[".mp4", ".mov", ".avi"],
971
- label="Input Video"
972
- )
973
- with gr.Column():
974
- smart_vid_out = gr.Video(
975
- label="Smart Privacy Video",
976
- height=400
977
- )
978
- smart_vid_status = gr.Markdown("")
979
- with gr.Row():
980
- smart_vid_btn = gr.Button("🎯 Process with Smart Privacy", variant="primary", scale=3)
981
- gr.ClearButton([smart_vid_in, smart_vid_out, smart_vid_status], scale=1)
982
-
983
- with gr.TabItem("📹 Webcam"):
984
- with gr.Row():
985
- smart_web_in = gr.Image(
986
- sources=["webcam"],
987
- type="numpy",
988
- streaming=True,
989
- label="Live Feed",
990
- height=400
991
- )
992
- with gr.Column():
993
- smart_web_out = gr.Image(
994
- type="numpy",
995
- label="Smart Privacy Feed",
996
- height=400
997
- )
998
- smart_web_status = gr.Markdown("")
999
 
1000
- # ========== EVENT HANDLERS ==========
1001
-
1002
- # Detection Mode
1003
- det_img_btn.click(
1004
- detect_faces_image,
1005
- inputs=[det_img_in, detection_sensitivity],
1006
- outputs=[det_img_out, det_img_status]
1007
- )
1008
- det_vid_btn.click(
1009
- detect_faces_video,
1010
- inputs=[det_vid_in, detection_sensitivity],
1011
- outputs=[det_vid_out, det_vid_status]
1012
- )
1013
- det_web_in.stream(
1014
- detect_faces_webcam,
1015
- inputs=[det_web_in, detection_sensitivity],
1016
- outputs=det_web_out
1017
- )
1018
 
1019
- # Privacy Mode
1020
- priv_img_btn.click(
1021
- process_privacy_image,
1022
- inputs=[priv_img_in, blur_type, blur_amount, blur_size, detection_sensitivity],
1023
- outputs=priv_img_out
1024
- )
1025
- priv_vid_btn.click(
1026
- process_privacy_video,
1027
- inputs=[priv_vid_in, blur_type, blur_amount, blur_size, detection_sensitivity],
1028
- outputs=[priv_vid_out, priv_vid_status]
1029
- )
1030
- priv_web_in.stream(
1031
- process_privacy_webcam,
1032
- inputs=[priv_web_in, blur_type, blur_amount, blur_size, detection_sensitivity],
1033
- outputs=priv_web_out
1034
- )
1035
-
1036
- # Smart Privacy Mode
1037
- smart_img_btn.click(
1038
- process_smart_image,
1039
- inputs=[smart_img_in, blur_type, blur_amount, blur_size, detection_sensitivity],
1040
- outputs=[smart_img_out, smart_img_status]
1041
- )
1042
- smart_vid_btn.click(
1043
- process_smart_video,
1044
- inputs=[smart_vid_in, blur_type, blur_amount, blur_size, detection_sensitivity],
1045
- outputs=[smart_vid_out, smart_vid_status]
1046
- )
1047
- smart_web_in.stream(
1048
- process_smart_webcam,
1049
- inputs=[smart_web_in, blur_type, blur_amount, blur_size, detection_sensitivity],
1050
- outputs=[smart_web_out, smart_web_status]
1051
- )
1052
 
1053
- # ====================================================
1054
- # MAIN ENTRY POINT
1055
- # ====================================================
1056
  if __name__ == "__main__":
1057
- logger.info("🚀 Initializing Unified Face Tool...")
1058
  try:
1059
- # Initialize detector
1060
- get_global_detector()
1061
-
1062
- # Initialize face database (if available)
1063
- if DEEPFACE_AVAILABLE and CHROMADB_AVAILABLE:
1064
- get_face_database()
1065
-
1066
- logger.info("✅ Systems ready. Launching Gradio interface...")
1067
  demo.launch()
1068
  except Exception as e:
1069
- logger.error(f"Startup failed: {e}")
1070
- logger.info("💡 Make sure 'yolov8n-face.pt' is available")
 
 
 
 
 
 
1
  # --- Standard Libraries ---
2
  import logging
3
  import atexit
4
  import tempfile
5
  import os
6
  import hashlib
7
+ from dataclasses import dataclass
 
8
  from typing import Any, Dict, List, Tuple, Optional
9
  from pathlib import Path
10
 
 
14
  import gradio as gr
15
  from ultralytics import YOLO
16
 
17
+ # --- Face Recognition Libraries (Optional / Fallback) ---
18
  try:
19
  from deepface import DeepFace
20
  DEEPFACE_AVAILABLE = True
21
  except ImportError:
22
  DEEPFACE_AVAILABLE = False
23
+ logging.warning("⚠️ DeepFace not installed - Recognition will be disabled (Fallback Mode).")
24
 
25
  try:
26
  import chromadb
27
  CHROMADB_AVAILABLE = True
28
  except ImportError:
29
  CHROMADB_AVAILABLE = False
30
+ logging.warning("⚠️ ChromaDB not installed - Database features disabled.")
31
 
32
  # --- Configure Logging ---
33
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
34
  logger = logging.getLogger(__name__)
35
 
36
  # ====================================================
37
+ # 1. CONFIGURATION & UTILITIES
38
  # ====================================================
39
+
40
+ # --- HARDCODED DEFAULTS (EXPERT MODE) ---
41
+ CONFIDENCE_THRESHOLD = 0.3 # High Sensitivity
42
+ TARGET_MOSAIC_GRID = 12 # Adaptive Grid (12 blocks across face)
43
+ COVERAGE_SCALE = 1.1 # 110% Coverage (Margin)
44
+
45
  TEMP_FILES = []
46
 
47
  def cleanup_temp_files():
48
+ """Clean up temporary video files on exit."""
49
  for f in TEMP_FILES:
50
  try:
51
  if os.path.exists(f):
52
  os.remove(f)
53
+ except Exception:
54
+ pass
 
55
 
56
  atexit.register(cleanup_temp_files)
57
 
58
  def create_temp_file(suffix=".mp4") -> str:
 
59
  path = tempfile.mktemp(suffix=suffix)
60
  TEMP_FILES.append(path)
61
  return path
62
 
63
  # ====================================================
64
+ # 2. THE DATABASE LAYER (Backend Only - No UI)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65
  # ====================================================
66
+ class FaceDatabase:
67
+ """
68
+ Handles loading known faces from the 'known_faces' folder.
69
+ Runs automatically on startup.
70
+ """
71
+ def __init__(self, db_path="./chroma_db", faces_dir="known_faces"):
72
+ self.faces_dir = Path(faces_dir)
73
+ self.client = None
74
+ self.collection = None
75
+ self.is_active = False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
76
 
77
+ if not DEEPFACE_AVAILABLE or not CHROMADB_AVAILABLE:
78
+ logger.warning("❌ Database unavailable (Missing dependencies)")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
  return
80
+
 
 
81
  try:
82
  self.client = chromadb.PersistentClient(path=db_path)
83
  self.collection = self.client.get_or_create_collection(
84
  name="face_embeddings",
85
  metadata={"hnsw:space": "cosine"}
86
  )
87
+ self.is_active = True
88
 
89
+ # Auto-index on startup
90
+ if self.faces_dir.exists():
91
+ self._scan_and_index()
 
 
92
  else:
93
+ self.faces_dir.mkdir(parents=True, exist_ok=True)
94
+ logger.info(f"📁 Created {faces_dir} folder. Add images here!")
95
 
96
  except Exception as e:
97
+ logger.error(f"❌ DB Init Error: {e}")
98
+ self.is_active = False
99
+
100
+ def _get_hash(self, img_path: Path) -> str:
 
 
 
101
  with open(img_path, 'rb') as f:
102
  return hashlib.md5(f.read()).hexdigest()
103
+
104
+ def _scan_and_index(self):
105
+ """Scans folders and adds new images to ChromaDB."""
106
+ logger.info("🔄 Scanning 'known_faces' folder...")
107
+ count = 0
108
+ for person_dir in self.faces_dir.iterdir():
109
+ if not person_dir.is_dir(): continue
 
 
 
 
 
 
 
 
110
 
111
+ # Folder format expectation: "001_John_Doe"
112
+ parts = person_dir.name.split('_', 1)
113
+ if len(parts) < 2:
114
+ # Fallback for folders like "John"
115
+ p_id = "000"
116
+ p_name = person_dir.name
117
+ else:
118
+ p_id, p_name = parts[0], parts[1].replace('_', ' ')
119
 
120
+ images = list(person_dir.glob("*.jpg")) + list(person_dir.glob("*.png")) + list(person_dir.glob("*.webp"))
 
 
121
 
122
+ for img_path in images:
123
  try:
124
+ img_hash = self._get_hash(img_path)
125
+ # Check if already indexed
126
+ if self.collection.get(ids=[img_hash])['ids']:
 
127
  continue
128
+
129
+ # Generate Embedding
130
+ embedding_objs = DeepFace.represent(
131
+ img_path=str(img_path),
132
+ model_name="Facenet512",
133
  enforce_detection=False
134
  )
135
 
136
+ if embedding_objs:
137
+ embedding = embedding_objs[0]["embedding"]
138
+ self.collection.add(
139
+ ids=[img_hash],
140
+ embeddings=[embedding],
141
+ metadatas=[{"id": p_id, "name": p_name, "file": img_path.name}]
142
+ )
143
+ count += 1
144
+ logger.info(f"✅ Indexed: {p_name}")
 
 
 
 
 
 
 
 
 
 
145
  except Exception as e:
146
+ logger.error(f"⚠️ Failed to index {img_path.name}: {e}")
147
 
148
+ if count > 0:
149
+ logger.info(f"📥 Added {count} new faces to database.")
150
+ else:
151
+ logger.info("ℹ️ Database is up to date.")
152
+
153
+ def recognize(self, face_img: np.ndarray) -> Dict[str, Any]:
154
+ """Returns {'match': bool, 'name': str, 'id': str, 'color': tuple}"""
155
+ # Default response (Unknown / Red)
156
+ default = {"match": False, "name": "Unknown", "id": "Unknown", "color": (255, 0, 0)}
157
 
158
+ if not self.is_active or self.collection is None or self.collection.count() == 0:
159
+ return default
160
+
161
  try:
162
+ # Create temp file for DeepFace (it prefers paths)
163
+ temp_path = "temp_query.jpg"
164
+ cv2.imwrite(temp_path, cv2.cvtColor(face_img, cv2.COLOR_RGB2BGR))
165
 
166
+ embedding_objs = DeepFace.represent(
167
  img_path=temp_path,
168
+ model_name="Facenet512",
169
  enforce_detection=False
170
  )
171
+ if os.path.exists(temp_path): os.remove(temp_path)
172
+
173
+ if not embedding_objs: return default
174
+
175
+ query_embed = embedding_objs[0]["embedding"]
 
 
 
 
176
  results = self.collection.query(
177
+ query_embeddings=[query_embed],
178
+ n_results=1
 
179
  )
180
+
181
+ if not results['ids'][0]: return default
182
+
 
183
  distance = results['distances'][0][0]
184
  metadata = results['metadatas'][0][0]
185
+
186
+ # Threshold: Lower is stricter. 0.45 is a good balance for Facenet512
187
+ if distance < 0.45:
188
  return {
189
  "match": True,
190
+ "name": metadata['name'],
191
+ "id": metadata['id'],
192
+ "color": (0, 255, 0) # Green for match
193
  }
194
+ return default
195
+
 
196
  except Exception as e:
197
+ logger.error(f"Recognition Error: {e}")
198
+ return default
199
 
200
+ def get_stats(self):
201
+ if self.is_active and self.collection:
202
+ return f"✅ Active | {self.collection.count()} Faces Indexed"
203
+ return "❌ Offline (Check dependencies or 'known_faces' folder)"
204
 
205
+ # Singleton DB
206
+ FACE_DB = FaceDatabase()
 
 
 
 
207
 
208
  # ====================================================
209
+ # 3. THE UNIFIED DETECTOR (YOLO)
210
  # ====================================================
211
+ class Detector:
212
+ def __init__(self):
213
+ logger.info("📦 Loading YOLOv8-Face...")
214
  try:
215
+ self.model = YOLO("yolov8n-face.pt")
216
+ logger.info("✅ Model Loaded.")
 
 
217
  except Exception as e:
218
+ logger.error(f"❌ Model Load Failed: {e}")
219
+ raise e
220
+
221
+ def detect(self, image: np.ndarray):
222
+ # Uses Hardcoded High Sensitivity
223
+ results = self.model(image, conf=CONFIDENCE_THRESHOLD, verbose=False)
 
 
 
 
 
 
 
224
  faces = []
 
 
 
 
 
225
  for r in results:
226
+ if r.boxes is None: continue
 
227
  for box in r.boxes:
228
  x1, y1, x2, y2 = map(int, box.xyxy[0])
229
+ faces.append({
230
+ "box": (x1, y1, x2-x1, y2-y1),
231
+ "conf": float(box.conf[0])
232
+ })
233
+ return faces
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
234
 
235
+ GLOBAL_DETECTOR = Detector()
 
 
 
 
 
 
 
236
 
237
  # ====================================================
238
+ # 4. CORE LOGIC
239
  # ====================================================
 
 
 
 
 
 
240
 
241
+ def apply_blur(image, x, y, w, h):
242
+ """
243
+ ADAPTIVE BLUR: Solves the 'Big Image' problem.
244
+ Ensures face is always reduced to TARGET_MOSAIC_GRID blocks.
245
+ """
246
+ h_img, w_img = image.shape[:2]
247
+
248
+ # --- COVERAGE AREA SCALE ---
249
+ # We add padding based on COVERAGE_SCALE
250
+ pad_w = int(w * (COVERAGE_SCALE - 1.0) / 2)
251
+ pad_h = int(h * (COVERAGE_SCALE - 1.0) / 2)
252
+
253
+ x = max(0, x - pad_w)
254
+ y = max(0, y - pad_h)
255
+ w = min(w_img - x, w + (2 * pad_w))
256
+ h = min(h_img - y, h + (2 * pad_h))
257
+
258
+ roi = image[y:y+h, x:x+w]
259
+ if roi.size == 0: return image
260
+
261
+ h_roi, w_roi = roi.shape[:2]
262
+
263
+ # --- ADAPTIVE GRID LOGIC ---
264
+ # We force the face down to a tiny grid (TARGET_MOSAIC_GRID)
265
+ aspect = w_roi / h_roi
266
+ target_w = TARGET_MOSAIC_GRID
267
+ target_h = int(TARGET_MOSAIC_GRID / aspect)
268
+
269
+ target_w = max(2, target_w)
270
+ target_h = max(2, target_h)
271
 
272
+ # Downscale (Destroy Detail)
273
+ small = cv2.resize(roi, (target_w, target_h), interpolation=cv2.INTER_LINEAR)
274
+ # Upscale (Pixelate)
275
+ pixelated = cv2.resize(small, (w_roi, h_roi), interpolation=cv2.INTER_NEAREST)
276
+
277
+ image[y:y+h, x:x+w] = pixelated
278
+ return image
279
+
280
+ def draw_label(image, x, y, w, text, color, on_blur=False):
281
+ """Annotation step: Draws text."""
282
+ font = cv2.FONT_HERSHEY_SIMPLEX
283
+ scale = 0.6
284
+ thickness = 2
285
+ (tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
286
+
287
+ if on_blur:
288
+ center_x = x + (w // 2) - (tw // 2)
289
+ center_y = y + (w // 2)
290
+ cv2.rectangle(image, (center_x - 5, center_y - th - 5), (center_x + tw + 5, center_y + 5), color, -1)
291
+ cv2.putText(image, text, (center_x, center_y), font, scale, (255, 255, 255), thickness)
292
+ else:
293
+ cv2.rectangle(image, (x, y - th - 10), (x + tw + 10, y), color, -1)
294
+ cv2.putText(image, text, (x + 5, y - 5), font, scale, (255, 255, 255), thickness)
295
+
296
+ def process_frame(image, mode):
297
+ """
298
+ THE MASTER FUNCTION.
299
+ Returns: (processed_image, log_string)
300
+ """
301
+ if image is None: return None, "No Image"
302
+
303
+ # 1. Detection
304
+ faces = GLOBAL_DETECTOR.detect(image)
305
+ processed_img = image.copy()
306
+ log_entries = []
307
+
308
+ for i, face in enumerate(faces):
309
+ x, y, w, h = face['box']
310
 
311
+ # 2. Analysis
312
+ identity = {"name": "", "color": (0, 255, 0)}
 
 
313
 
314
+ if mode in ["data", "smart"]:
315
+ # Crop and Check DB
316
+ face_crop = image[y:y+h, x:x+w]
317
+ if face_crop.size > 0:
318
+ res = FACE_DB.recognize(face_crop)
319
+
320
+ if res['match']:
321
+ label_text = f"ID: {res['id']} ({res['name']})"
322
+ log_entries.append(f"✅ Face #{i+1}: MATCH - {res['name']} (ID: {res['id']})")
323
+ else:
324
+ label_text = "Unknown"
325
+ log_entries.append(f"⚠️ Face #{i+1}: UNKNOWN")
326
+
327
+ identity = {"name": label_text, "color": res['color']}
328
+ else:
329
+ log_entries.append(f"🔒 Face #{i+1}: Anonymized")
330
+
331
+ # 3. Modification
332
+ if mode == "privacy":
333
+ processed_img = apply_blur(processed_img, x, y, w, h)
334
 
335
+ elif mode == "data":
336
+ cv2.rectangle(processed_img, (x, y), (x+w, y+h), identity['color'], 2)
337
+ draw_label(processed_img, x, y, w, identity['name'], identity['color'], on_blur=False)
 
338
 
339
+ elif mode == "smart":
340
+ processed_img = apply_blur(processed_img, x, y, w, h)
341
+ draw_label(processed_img, x, y, w, identity['name'], identity['color'], on_blur=True)
 
 
342
 
343
+ # Create Log String
344
+ if not log_entries:
345
+ final_log = "No faces detected."
346
+ else:
347
+ final_log = "--- Detection Report ---\n" + "\n".join(log_entries)
 
 
 
 
 
 
 
348
 
349
+ return processed_img, final_log
 
 
 
 
 
 
 
350
 
351
  # ====================================================
352
+ # 5. VIDEO PROCESSING HELPERS
353
  # ====================================================
354
+ def process_video_general(video_path, mode, progress=gr.Progress()):
355
+ """Generic video processor."""
356
+ if not video_path: return None
357
+
358
+ cap = cv2.VideoCapture(video_path)
359
+ if not cap.isOpened(): return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
360
 
361
+ fps = int(cap.get(cv2.CAP_PROP_FPS))
362
+ width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
363
+ height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
364
+ total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
365
+
366
+ out_path = create_temp_file()
367
+ fourcc = cv2.VideoWriter_fourcc(*'avc1')
368
+ out = cv2.VideoWriter(out_path, fourcc, fps, (width, height))
369
+ if not out.isOpened():
370
+ out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
371
+
372
+ cnt = 0
373
+ while cap.isOpened():
374
+ ret, frame = cap.read()
375
+ if not ret: break
376
 
377
+ # Process using the Master Function (Ignore log for video)
378
+ res_frame, _ = process_frame(frame, mode)
 
 
379
 
380
+ out.write(res_frame)
381
+ cnt += 1
382
+ if total > 0 and cnt % 10 == 0:
383
+ progress(cnt/total, desc=f"Processing Frame {cnt}/{total}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
384
 
385
+ cap.release()
386
+ out.release()
387
+ return out_path
 
 
 
388
 
389
+ # ====================================================
390
+ # 6. GRADIO INTERFACE
391
+ # ====================================================
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
392
 
393
+ with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue"), title="Smart Redaction Demo") as demo:
394
+
395
+ gr.Markdown("# 🛡️ Smart Redaction System")
396
+ gr.Markdown("### From Raw Privacy to Intelligent Security")
397
+
398
+ # Unified Config Info Row
399
+ with gr.Row():
400
+ gr.Markdown(f"**System Status:** {FACE_DB.get_stats()}")
401
+ gr.Markdown(f"**Config:** Adaptive Pixelate | High Sensitivity | 110% Coverage")
402
 
403
+ with gr.Tabs():
 
 
 
 
 
 
404
 
405
+ # --- TAB 1: RAW PRIVACY ---
406
+ with gr.TabItem("1️⃣ Raw Privacy (Baseline)"):
407
+ gr.Markdown("### 🔒 Total Anonymization")
408
+ gr.Markdown("*Scenario: GDPR Compliance. Everyone is hidden. No data is extracted.*")
 
 
409
 
410
+ with gr.Tabs():
411
+ with gr.TabItem("Image"):
412
+ p_img_in = gr.Image(label="Input", type="numpy", height=400)
413
+ p_img_out = gr.Image(label="Anonymized Output", height=400)
414
+ p_btn = gr.Button("Apply Privacy", variant="primary")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
415
 
416
+ with gr.TabItem("Video"):
417
+ p_vid_in = gr.Video(label="Input Video")
418
+ p_vid_out = gr.Video(label="Anonymized Output")
419
+ p_vid_btn = gr.Button("Process Video", variant="primary")
 
420
 
421
+ with gr.TabItem("Webcam"):
422
+ p_web_in = gr.Image(sources=["webcam"], streaming=True, type="numpy")
423
+ p_web_out = gr.Image(label="Live Privacy Feed")
424
+
425
+ # --- TAB 2: THE DATA LAYER ---
426
+ with gr.TabItem("2️⃣ The Data Layer (Security)"):
427
+ gr.Markdown("### 🔍 Recognition & Intelligence")
428
+ gr.Markdown("*Scenario: Security Control Room. We identify Known vs Unknown. No Privacy.*")
429
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
430
  with gr.Tabs():
431
+ with gr.TabItem("Image"):
432
+ with gr.Row():
433
+ d_img_in = gr.Image(label="Input", type="numpy", height=400)
434
+ with gr.Column():
435
+ d_img_out = gr.Image(label="Data Output", height=400)
436
+ d_log_out = gr.Textbox(label="Detection Log", lines=4)
437
+ d_btn = gr.Button("Analyze Data", variant="primary")
438
 
439
+ with gr.TabItem("Video"):
440
+ d_vid_in = gr.Video(label="Input Video")
441
+ d_vid_out = gr.Video(label="Data Output")
442
+ d_vid_btn = gr.Button("Analyze Video", variant="primary")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
443
 
444
+ with gr.TabItem("Webcam"):
445
+ d_web_in = gr.Image(sources=["webcam"], streaming=True, type="numpy")
446
+ d_web_out = gr.Image(label="Live Data Feed")
447
+
448
+ # --- TAB 3: SMART REDACTION ---
449
+ with gr.TabItem("3️⃣ Smart Redaction (Combined)"):
450
+ gr.Markdown("### 🛡️ Intelligent Privacy")
451
+ gr.Markdown("*Scenario: The Solution. Faces are blurred for privacy, but Identities are overlaid for security.*")
452
+
453
+ with gr.Tabs():
454
+ with gr.TabItem("Image"):
455
+ with gr.Row():
456
+ s_img_in = gr.Image(label="Input", type="numpy", height=400)
457
+ with gr.Column():
458
+ s_img_out = gr.Image(label="Smart Redaction Output", height=400)
459
+ s_log_out = gr.Textbox(label="Detection Log", lines=4)
460
+ s_btn = gr.Button("Apply Smart Redaction", variant="primary")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
461
 
462
+ with gr.TabItem("Video"):
463
+ s_vid_in = gr.Video(label="Input Video")
464
+ s_vid_out = gr.Video(label="Smart Redaction Output")
465
+ s_vid_btn = gr.Button("Process Smart Video", variant="primary")
466
+
467
+ with gr.TabItem("Webcam"):
468
+ s_web_in = gr.Image(sources=["webcam"], streaming=True, type="numpy")
469
+ s_web_out = gr.Image(label="Live Smart Feed")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
470
 
471
+ # =========================================
472
+ # EVENT HANDLERS
473
+ # =========================================
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
474
 
475
+ # Tab 1: Privacy (Mode="privacy")
476
+ # Note: We use [0] index for streaming/image to just get the image, ignoring log
477
+ p_btn.click(lambda img: process_frame(img, "privacy")[0], inputs=[p_img_in], outputs=p_img_out)
478
+ p_vid_btn.click(lambda vid: process_video_general(vid, "privacy"), inputs=[p_vid_in], outputs=p_vid_out)
479
+ p_web_in.stream(lambda img: process_frame(img, "privacy")[0], inputs=[p_web_in], outputs=p_web_out)
480
+
481
+ # Tab 2: Data (Mode="data")
482
+ # Image returns [image, log]
483
+ d_btn.click(lambda img: process_frame(img, "data"), inputs=[d_img_in], outputs=[d_img_out, d_log_out])
484
+ d_vid_btn.click(lambda vid: process_video_general(vid, "data"), inputs=[d_vid_in], outputs=d_vid_out)
485
+ d_web_in.stream(lambda img: process_frame(img, "data")[0], inputs=[d_web_in], outputs=d_web_out)
486
+
487
+ # Tab 3: Smart (Mode="smart")
488
+ # Image returns [image, log]
489
+ s_btn.click(lambda img: process_frame(img, "smart"), inputs=[s_img_in], outputs=[s_img_out, s_log_out])
490
+ s_vid_btn.click(lambda vid: process_video_general(vid, "smart"), inputs=[s_vid_in], outputs=s_vid_out)
491
+ s_web_in.stream(lambda img: process_frame(img, "smart")[0], inputs=[s_web_in], outputs=s_web_out)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
492
 
 
 
 
493
  if __name__ == "__main__":
 
494
  try:
495
+ if GLOBAL_DETECTOR:
496
+ logger.info("✅ System Ready. Launching...")
 
 
 
 
 
 
497
  demo.launch()
498
  except Exception as e:
499
+ logger.error(f"Startup Failed: {e}")