ab2207 commited on
Commit
4215391
·
verified ·
1 Parent(s): a0ce7d9

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +417 -334
app.py CHANGED
@@ -1,14 +1,10 @@
1
- # [FINAL] Intelligent Face Privacy Tool
2
- # Merges YOLOv8 detection, DeepFace recognition, and blurring into a single, robust workflow.
3
-
4
  # --- Standard Libraries ---
5
  import logging
6
  import atexit
7
  import tempfile
8
  import os
9
  import hashlib
10
- from abc import ABC, abstractmethod
11
- from dataclasses import dataclass, field
12
  from typing import Any, Dict, List, Tuple, Optional
13
  from pathlib import Path
14
 
@@ -18,37 +14,46 @@ import numpy as np
18
  import gradio as gr
19
  from ultralytics import YOLO
20
 
21
- # --- Optional Face Recognition & Database Libraries ---
22
  try:
23
  from deepface import DeepFace
24
  DEEPFACE_AVAILABLE = True
25
  except ImportError:
26
  DEEPFACE_AVAILABLE = False
27
- logging.warning("⚠️ DeepFace not installed. Recognition features will be disabled. Install with: pip install deepface")
28
 
29
  try:
30
  import chromadb
31
  CHROMADB_AVAILABLE = True
32
  except ImportError:
33
  CHROMADB_AVAILABLE = False
34
- logging.warning("⚠️ ChromaDB not installed. Recognition features will be disabled. Install with: pip install chromadb")
35
 
36
  # --- Configure Logging ---
37
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
38
  logger = logging.getLogger(__name__)
39
 
40
  # ====================================================
41
- # SHARED UTILITIES
42
  # ====================================================
43
 
44
- # --- Temporary File Cleanup ---
 
 
 
 
 
45
  TEMP_FILES = []
 
46
  def cleanup_temp_files():
 
47
  for f in TEMP_FILES:
48
  try:
49
- if os.path.exists(f): os.remove(f); logger.info(f"🗑️ Cleaned up temp file: {f}")
50
- except Exception as e:
51
- logger.warning(f"⚠️ Failed to delete temp file {f}: {e}")
 
 
52
  atexit.register(cleanup_temp_files)
53
 
54
  def create_temp_file(suffix=".mp4") -> str:
@@ -56,370 +61,448 @@ def create_temp_file(suffix=".mp4") -> str:
56
  TEMP_FILES.append(path)
57
  return path
58
 
59
- # --- Sensitivity Mapping ---
60
- SENSITIVITY_MAP = {"Low (Catch More)": 0.3, "Balanced (Default)": 0.5, "High (Very Strict)": 0.7}
61
- def get_confidence_from_sensitivity(sensitivity: str) -> float:
62
- return SENSITIVITY_MAP.get(sensitivity, 0.5)
63
-
64
  # ====================================================
65
- # CONFIGURATION DATA CLASSES
66
  # ====================================================
67
-
68
- @dataclass
69
- class BlurConfig:
70
- type: str = "pixelate"
71
- intensity: float = 25.0
72
- pixel_size: int = 25
73
- scaling_factor: float = 1.1
74
-
75
- @dataclass
76
- class DetectionConfig:
77
- model_path: str = "yolov8n-face.pt"
78
-
79
- # ====================================================
80
- # BLUR EFFECTS (Strategy Pattern)
81
- # ====================================================
82
-
83
- class BlurEffect(ABC):
84
- def __init__(self, config: BlurConfig): self.config = config
85
- @abstractmethod
86
- def apply(self, image: np.ndarray, roi: Tuple[int, int, int, int]) -> np.ndarray: pass
87
-
88
- class PixelateBlur(BlurEffect):
89
- def apply(self, image: np.ndarray, roi: Tuple[int, int, int, int]) -> np.ndarray:
90
- x, y, w, h = roi
91
- face_roi = image[y:y+h, x:x+w]
92
- if face_roi.size == 0: return image
93
- h_roi, w_roi = face_roi.shape[:2]
94
- pixel_size = max(1, self.config.pixel_size)
95
- small = cv2.resize(face_roi, (max(1, w_roi // pixel_size), max(1, h_roi // pixel_size)), interpolation=cv2.INTER_LINEAR)
96
- pixelated = cv2.resize(small, (w_roi, h_roi), interpolation=cv2.INTER_NEAREST)
97
- image[y:y+h, x:x+w] = pixelated
98
- return image
99
-
100
- class GaussianBlur(BlurEffect):
101
- def apply(self, image: np.ndarray, roi: Tuple[int, int, int, int]) -> np.ndarray:
102
- x, y, w, h = roi
103
- face_roi = image[y:y+h, x:x+w]
104
- if face_roi.size == 0: return image
105
- kernel_val = int(self.config.intensity) | 1
106
- blurred_roi = cv2.GaussianBlur(face_roi, (kernel_val, kernel_val), 0)
107
- image[y:y+h, x:x+w] = blurred_roi
108
- return image
109
-
110
- def get_blur_effect(config: BlurConfig) -> BlurEffect:
111
- effects = {"pixelate": PixelateBlur, "gaussian": GaussianBlur}
112
- cls = effects.get(config.type)
113
- if not cls: raise ValueError(f"Unknown blur type: {config.type}")
114
- return cls(config)
115
-
116
- # ====================================================
117
- # FACE DATABASE (ChromaDB + DeepFace)
118
- # ====================================================
119
-
120
  class FaceDatabase:
121
- def __init__(self, db_path: str = "./chroma_db", known_faces_dir: str = "known_faces", model_name: str = "Facenet512"):
122
- self.model_name = model_name
123
- self.known_faces_dir = Path(known_faces_dir)
124
- self.is_ready = False
 
 
 
 
 
 
125
  if not DEEPFACE_AVAILABLE or not CHROMADB_AVAILABLE:
126
- logger.error("❌ DeepFace or ChromaDB not available. Face recognition is disabled.")
127
  return
128
 
129
- logger.info("🔧 Initializing Face Database...")
130
  try:
131
- client = chromadb.PersistentClient(path=db_path)
132
- self.collection = client.get_or_create_collection(name="face_embeddings", metadata={"hnsw:space": "cosine"})
133
- self._index_known_faces()
134
- self.is_ready = self.collection.count() > 0
135
- if not self.is_ready:
136
- logger.warning("⚠️ Face database is empty. All faces will be marked as 'Unknown'.")
 
 
 
 
137
  else:
138
- logger.info(f"✅ Face database ready with {self.collection.count()} known faces.")
 
 
139
  except Exception as e:
140
- logger.error(f"❌ ChromaDB initialization failed: {e}")
141
-
142
- def _get_image_hash(self, img_path: Path) -> str:
143
- with open(img_path, 'rb') as f: return hashlib.md5(f.read()).hexdigest()
144
-
145
- def _index_known_faces(self):
146
- if not self.known_faces_dir.exists():
147
- logger.info(f"📁 Creating '{self.known_faces_dir}' directory for known faces.")
148
- self.known_faces_dir.mkdir(parents=True, exist_ok=True)
149
- return
150
-
151
- logger.info(f"🔄 Scanning '{self.known_faces_dir}' for faces to index...")
152
- indexed_count = 0
153
- for person_dir in [d for d in self.known_faces_dir.iterdir() if d.is_dir()]:
154
- folder_name = person_dir.name
155
- parts = folder_name.split('_', 1)
156
- if len(parts) != 2:
157
- logger.warning(f"⚠️ Skipping folder '{folder_name}'. Use format 'ID_Name' (e.g., '001_John_Doe').")
158
- continue
159
- person_id, person_name = parts[0], parts[1].replace('_', ' ').title()
160
- for img_path in list(person_dir.glob("*.[jp][pn]g")) + list(person_dir.glob("*.webp")):
 
 
 
 
 
161
  try:
162
- img_hash = self._get_image_hash(img_path)
163
- if self.collection.get(ids=[img_hash])['ids']: continue
164
- embedding = DeepFace.represent(str(img_path), model_name=self.model_name, enforce_detection=False)[0]["embedding"]
165
- self.collection.add(embeddings=[embedding], metadatas=[{"id": person_id, "name": person_name}], ids=[img_hash])
166
- indexed_count += 1
167
- logger.info(f"✅ Indexed: {person_name} (ID: {person_id}) from {img_path.name}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
168
  except Exception as e:
169
- logger.error(f" Failed to index {img_path}: {e}")
170
- if indexed_count > 0:
171
- logger.info(f"💾 Indexed {indexed_count} new faces.")
 
 
 
 
 
 
 
 
 
 
 
172
 
173
- def recognize_face(self, face_image: np.ndarray, threshold: float = 0.4) -> Dict[str, Any]:
174
- if not self.is_ready:
175
- return {"match": False, "name": "Unknown"}
176
  try:
177
- embedding = DeepFace.represent(face_image, model_name=self.model_name, enforce_detection=False)[0]["embedding"]
178
- results = self.collection.query(query_embeddings=[embedding], n_results=1, include=["metadatas", "distances"])
179
- if not results['ids'][0]:
180
- return {"match": False, "name": "Unknown"}
181
- distance, metadata = results['distances'][0][0], results['metadatas'][0][0]
182
- if distance < threshold:
183
- return {"match": True, "name": metadata.get('name', 'Known'), "id": metadata.get('id', 'N/A')}
184
- return {"match": False, "name": "Unknown"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
  except Exception as e:
186
- logger.warning(f"⚠️ Recognition failed for a face: {e}")
187
- return {"match": False, "name": "Unknown"}
 
 
 
 
 
 
 
 
188
 
189
  # ====================================================
190
- # UNIFIED FACE DETECTOR
191
  # ====================================================
192
-
193
- class YOLOv8FaceDetector:
194
- def __init__(self, config: DetectionConfig):
195
  try:
196
- self.model = YOLO(config.model_path)
197
- logger.info("✅ YOLOv8 face model loaded successfully.")
198
  except Exception as e:
199
- raise RuntimeError(f"❌ Failed to load YOLOv8 model: {e}")
200
-
201
- def detect_faces(self, image: np.ndarray, conf_threshold: float, face_db: Optional[FaceDatabase]) -> List[Dict[str, Any]]:
202
- results = self.model(image, conf=conf_threshold, verbose=False)
203
- detected_faces = []
 
 
204
  for r in results:
205
  if r.boxes is None: continue
206
  for box in r.boxes:
207
  x1, y1, x2, y2 = map(int, box.xyxy[0])
208
- face_info = {"box": (x1, y1, x2 - x1, y2 - y1)}
209
-
210
- # Perform recognition if a database is provided
211
- if face_db and face_db.is_ready:
212
- face_crop = image[y1:y2, x1:x2]
213
- if face_crop.size > 0:
214
- rec_result = face_db.recognize_face(face_crop)
215
- face_info.update(rec_result)
216
- else:
217
- face_info.update({"match": False, "name": "Unknown"})
218
- else:
219
- face_info.update({"match": False, "name": "Unknown"})
220
-
221
- detected_faces.append(face_info)
222
- return detected_faces
223
 
224
  # ====================================================
225
- # CORE APPLICATION LOGIC
226
  # ====================================================
227
 
228
- class IntelligentPrivacyApp:
229
- def __init__(self, blur_config: BlurConfig, detector: YOLOv8FaceDetector, face_db: Optional[FaceDatabase]):
230
- self.blur_effect = get_blur_effect(blur_config)
231
- self.detector = detector
232
- self.face_db = face_db
233
- self.blur_config = blur_config
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
234
 
235
- def _expand_bbox(self, box: Tuple[int, int, int, int]) -> Tuple[int, int, int, int]:
236
- x, y, w, h = box
237
- new_w = int(w * self.blur_config.scaling_factor)
238
- new_h = int(h * self.blur_config.scaling_factor)
239
- x_offset, y_offset = (new_w - w) // 2, (new_h - h) // 2
240
- return max(0, x - x_offset), max(0, y - y_offset), new_w, new_h
241
-
242
- def process_frame(self, frame: np.ndarray, conf_threshold: float, mode: str) -> np.ndarray:
243
- processed_frame = frame.copy()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
244
 
245
- try:
246
- faces = self.detector.detect_faces(processed_frame, conf_threshold, self.face_db if mode == "intelligent" else None)
247
- except Exception as e:
248
- logger.error(f"❌ Face detection failed: {e}")
249
- gr.Warning("Face detection process failed. Cannot process frame.")
250
- return frame # Return original frame on critical error
251
-
252
- for face in faces:
253
- # Always apply blur
254
- expanded_box = self._expand_bbox(face["box"])
255
- processed_frame = self.blur_effect.apply(processed_frame, expanded_box)
256
-
257
- # Apply label on top of blur if in intelligent mode
258
- if mode == "intelligent":
259
- is_known = face.get("match", False)
260
- label = face.get("name", "Unknown") if is_known else "Unknown"
261
- color = (0, 255, 0) if is_known else (255, 0, 0)
262
-
263
- x, y, _, _ = face["box"]
264
- (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
265
-
266
- # Draw label background
267
- label_y_pos = y - 10
268
- bg_y1 = label_y_pos - h - 5
269
- bg_y2 = label_y_pos + 5
270
- cv2.rectangle(processed_frame, (x, bg_y1), (x + w, bg_y2), color, -1)
271
 
272
- # Draw label text
273
- cv2.putText(processed_frame, label, (x, label_y_pos), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
274
 
275
- return processed_frame
276
 
277
  # ====================================================
278
- # GRADIO HANDLER FUNCTIONS
279
  # ====================================================
280
-
281
- # --- Global Instances ---
282
- DETECTOR = YOLOv8FaceDetector(DetectionConfig())
283
- FACE_DB = FaceDatabase()
284
-
285
- def get_app_instance(blur_type: str, blur_amount: float, blur_size: float) -> IntelligentPrivacyApp:
286
- blur_config = BlurConfig(
287
- type=blur_type,
288
- intensity=blur_amount,
289
- pixel_size=int(blur_amount),
290
- scaling_factor=blur_size
291
- )
292
- return IntelligentPrivacyApp(blur_config, DETECTOR, FACE_DB)
293
-
294
- def process_media(media, blur_type, blur_amount, blur_size, sensitivity, mode, progress=gr.Progress()):
295
- if media is None: return None, "No media provided."
296
 
297
- app = get_app_instance(blur_type, blur_amount, blur_size)
298
- confidence = get_confidence_from_sensitivity(sensitivity)
299
-
300
- # --- Image Processing ---
301
- if isinstance(media, np.ndarray):
302
- result = app.process_frame(media, confidence, mode)
303
- return result, "Image processing complete."
304
 
305
- # --- Video Processing ---
306
- if hasattr(media, 'name'):
307
- try:
308
- cap = cv2.VideoCapture(media.name)
309
- if not cap.isOpened(): return None, "❌ Cannot open video file."
310
-
311
- out_path = create_temp_file()
312
- fourcc = cv2.VideoWriter_fourcc(*'mp4v')
313
- fps, w, h = cap.get(cv2.CAP_PROP_FPS), int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
314
- total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
315
- out_vid = cv2.VideoWriter(out_path, fourcc, fps, (w, h))
316
-
317
- for frame_num in progress.tqdm(range(total_frames), desc="Processing Video"):
318
- ret, frame = cap.read()
319
- if not ret: break
320
- processed_frame = app.process_frame(frame, confidence, mode)
321
- out_vid.write(processed_frame)
322
-
323
- cap.release(); out_vid.release()
324
- return out_path, f"✅ Video processing complete ({total_frames} frames)."
325
- except Exception as e:
326
- logger.error(f"❌ Video processing error: {e}")
327
- gr.Error(f"Video processing failed: {e}")
328
- return None, f"❌ Error: {e}"
329
 
330
- return None, "Unsupported media type."
331
-
332
- def process_stream(frame, blur_type, blur_amount, blur_size, sensitivity, mode):
333
- if frame is None: return None
334
- app = get_app_instance(blur_type, blur_amount, blur_size)
335
- confidence = get_confidence_from_sensitivity(sensitivity)
336
- return app.process_frame(frame, confidence, mode)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
337
 
338
  # ====================================================
339
- # GRADIO UI
340
  # ====================================================
341
- with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue"), title="Intelligent Face Privacy Tool") as demo:
342
- gr.Markdown("# 🎯 Intelligent Face Privacy Tool")
343
- gr.Markdown("An advanced tool to apply context-aware privacy to faces in media. Faces are blurred and labeled as 'Known' or 'Unknown'.")
344
-
345
- if not DEEPFACE_AVAILABLE or not CHROMADB_AVAILABLE:
346
- gr.Markdown("### ⚠️ Recognition Features Disabled\n`deepface` or `chromadb` not found. 'Intelligent Privacy' mode will label all faces as 'Unknown'.\nInstall with: `pip install deepface chromadb`")
347
 
 
 
 
 
 
 
348
  with gr.Row():
349
- # ========== SETTINGS SIDEBAR ==========
350
- with gr.Column(scale=1):
351
- with gr.Column(variant="panel"):
352
- gr.Markdown("### ⚙️ Processing Settings")
353
- detection_sensitivity = gr.Radio(
354
- choices=list(SENSITIVITY_MAP.keys()), value="Balanced (Default)",
355
- label="Detection Sensitivity", info="How strict the face detection should be."
356
- )
357
- with gr.Accordion("Blur Customization", open=True):
358
- blur_type = gr.Radio(["pixelate", "gaussian"], value="pixelate", label="Blur Type")
359
- blur_amount = gr.Slider(5, 100, step=1, value=25, label="Blur Intensity / Pixel Size")
360
- blur_size = gr.Slider(1.0, 1.5, step=0.05, value=1.1, label="Coverage Area (Scale)")
361
-
362
- # ========== MAIN CONTENT AREA ==========
363
- with gr.Column(scale=3):
364
  with gr.Tabs():
365
- # ========== INTELLIGENT PRIVACY TAB ==========
366
- with gr.TabItem("🎯 Intelligent Privacy"):
367
- gr.Markdown("### Blur all faces and label them as 'Known' or 'Unknown'.")
368
- with gr.Tabs() as intelligent_tabs:
369
- with gr.TabItem("📷 Image"):
370
- with gr.Row():
371
- img_in_intel = gr.Image(sources=["upload", "clipboard"], type="numpy", label="Input Image", height=400)
372
- img_out_intel = gr.Image(type="numpy", label="Processed Image", height=400)
373
- img_status_intel = gr.Markdown()
374
- intel_img_btn = gr.Button("Apply Intelligent Privacy", variant="primary")
375
- with gr.TabItem("🎥 Video"):
376
- with gr.Row():
377
- vid_in_intel = gr.File(file_types=[".mp4", ".mov"], label="Input Video")
378
- with gr.Column():
379
- vid_out_intel = gr.Video(label="Processed Video", height=400)
380
- vid_status_intel = gr.Markdown()
381
- intel_vid_btn = gr.Button("Process Video", variant="primary")
382
- with gr.TabItem("📹 Webcam"):
383
- with gr.Row():
384
- web_in_intel = gr.Image(sources=["webcam"], streaming=True, type="numpy", label="Live Webcam", height=400)
385
- web_out_intel = gr.Image(type="numpy", label="Processed Feed", height=400)
386
-
387
- # ========== SIMPLE PRIVACY TAB ==========
388
- with gr.TabItem("🔒 Simple Privacy"):
389
- gr.Markdown("### Blur all faces without any labels. Useful for total anonymization.")
390
- with gr.Tabs():
391
- with gr.TabItem("📷 Image"):
392
- with gr.Row():
393
- img_in_simple = gr.Image(sources=["upload", "clipboard"], type="numpy", label="Input Image", height=400)
394
- img_out_simple = gr.Image(type="numpy", label="Protected Image", height=400)
395
- img_status_simple = gr.Markdown()
396
- simple_img_btn = gr.Button("Apply Simple Blur", variant="secondary")
397
- with gr.TabItem("🎥 Video"):
398
- with gr.Row():
399
- vid_in_simple = gr.File(file_types=[".mp4", ".mov"], label="Input Video")
400
- with gr.Column():
401
- vid_out_simple = gr.Video(label="Protected Video", height=400)
402
- vid_status_simple = gr.Markdown()
403
- simple_vid_btn = gr.Button("Process Video", variant="secondary")
404
- with gr.TabItem("📹 Webcam"):
405
- with gr.Row():
406
- web_in_simple = gr.Image(sources=["webcam"], streaming=True, type="numpy", label="Live Webcam", height=400)
407
- web_out_simple = gr.Image(type="numpy", label="Protected Feed", height=400)
408
-
409
- # ========== EVENT HANDLERS ==========
410
- # --- Intelligent Privacy Mode ---
411
- intel_img_btn.click(lambda *args: process_media(*args, mode="intelligent"), [img_in_intel, blur_type, blur_amount, blur_size, detection_sensitivity], [img_out_intel, img_status_intel])
412
- intel_vid_btn.click(lambda *args: process_media(*args, mode="intelligent"), [vid_in_intel, blur_type, blur_amount, blur_size, detection_sensitivity], [vid_out_intel, vid_status_intel])
413
- web_in_intel.stream(lambda *args: process_stream(*args, mode="intelligent"), [web_in_intel, blur_type, blur_amount, blur_size, detection_sensitivity], web_out_intel)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
414
 
415
- # --- Simple Privacy Mode ---
416
- simple_img_btn.click(lambda *args: process_media(*args, mode="simple"), [img_in_simple, blur_type, blur_amount, blur_size, detection_sensitivity], [img_out_simple, img_status_simple])
417
- simple_vid_btn.click(lambda *args: process_media(*args, mode="simple"), [vid_in_simple, blur_type, blur_amount, blur_size, detection_sensitivity], [vid_out_simple, vid_status_simple])
418
- web_in_simple.stream(lambda *args: process_stream(*args, mode="simple"), [web_in_simple, blur_type, blur_amount, blur_size, detection_sensitivity], web_out_simple)
 
 
 
 
 
 
 
 
 
 
419
 
420
- # ====================================================
421
- # MAIN ENTRY POINT
422
- # ====================================================
423
  if __name__ == "__main__":
424
- logger.info("🚀 Initializing Intelligent Face Privacy Tool...")
425
- demo.launch()
 
 
 
 
 
 
 
 
1
  # --- Standard Libraries ---
2
  import logging
3
  import atexit
4
  import tempfile
5
  import os
6
  import hashlib
7
+ from dataclasses import dataclass
 
8
  from typing import Any, Dict, List, Tuple, Optional
9
  from pathlib import Path
10
 
 
14
  import gradio as gr
15
  from ultralytics import YOLO
16
 
17
+ # --- Face Recognition Libraries (Optional / Fallback) ---
18
  try:
19
  from deepface import DeepFace
20
  DEEPFACE_AVAILABLE = True
21
  except ImportError:
22
  DEEPFACE_AVAILABLE = False
23
+ logging.warning("⚠️ DeepFace not installed - Recognition will be disabled (Fallback Mode).")
24
 
25
  try:
26
  import chromadb
27
  CHROMADB_AVAILABLE = True
28
  except ImportError:
29
  CHROMADB_AVAILABLE = False
30
+ logging.warning("⚠️ ChromaDB not installed - Database features disabled.")
31
 
32
  # --- Configure Logging ---
33
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
34
  logger = logging.getLogger(__name__)
35
 
36
  # ====================================================
37
+ # 1. CONFIGURATION & UTILITIES
38
  # ====================================================
39
 
40
+ # --- HYBRID PRIVACY SETTINGS ---
41
+ CONFIDENCE_THRESHOLD = 0.3 # High Sensitivity (Catches mostly everything)
42
+ TARGET_MOSAIC_GRID = 12 # Max Resolution: Face is divided into 12x12 grid
43
+ MIN_PIXEL_SIZE = 12 # Min Block Size: Blocks cannot be smaller than 12px
44
+ COVERAGE_SCALE = 1.1 # 110% Coverage (Padding around face)
45
+
46
  TEMP_FILES = []
47
+
48
  def cleanup_temp_files():
49
+ """Clean up temporary video files on exit."""
50
  for f in TEMP_FILES:
51
  try:
52
+ if os.path.exists(f):
53
+ os.remove(f)
54
+ except Exception:
55
+ pass
56
+
57
  atexit.register(cleanup_temp_files)
58
 
59
  def create_temp_file(suffix=".mp4") -> str:
 
61
  TEMP_FILES.append(path)
62
  return path
63
 
 
 
 
 
 
64
  # ====================================================
65
+ # 2. THE DATABASE LAYER (Backend Only - No UI)
66
  # ====================================================
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67
  class FaceDatabase:
68
+ """
69
+ Handles loading known faces from the 'known_faces' folder.
70
+ Runs automatically on startup.
71
+ """
72
+ def __init__(self, db_path="./chroma_db", faces_dir="known_faces"):
73
+ self.faces_dir = Path(faces_dir)
74
+ self.client = None
75
+ self.collection = None
76
+ self.is_active = False
77
+
78
  if not DEEPFACE_AVAILABLE or not CHROMADB_AVAILABLE:
79
+ logger.warning("❌ Database unavailable (Missing dependencies)")
80
  return
81
 
 
82
  try:
83
+ self.client = chromadb.PersistentClient(path=db_path)
84
+ self.collection = self.client.get_or_create_collection(
85
+ name="face_embeddings",
86
+ metadata={"hnsw:space": "cosine"}
87
+ )
88
+ self.is_active = True
89
+
90
+ # Auto-index on startup
91
+ if self.faces_dir.exists():
92
+ self._scan_and_index()
93
  else:
94
+ self.faces_dir.mkdir(parents=True, exist_ok=True)
95
+ logger.info(f"📁 Created {faces_dir} folder. Add images here!")
96
+
97
  except Exception as e:
98
+ logger.error(f"❌ DB Init Error: {e}")
99
+ self.is_active = False
100
+
101
+ def _get_hash(self, img_path: Path) -> str:
102
+ with open(img_path, 'rb') as f:
103
+ return hashlib.md5(f.read()).hexdigest()
104
+
105
+ def _scan_and_index(self):
106
+ """Scans folders and adds new images to ChromaDB."""
107
+ logger.info("🔄 Scanning 'known_faces' folder...")
108
+ count = 0
109
+ for person_dir in self.faces_dir.iterdir():
110
+ if not person_dir.is_dir(): continue
111
+
112
+ # Folder format expectation: "001_John_Doe"
113
+ parts = person_dir.name.split('_', 1)
114
+ if len(parts) < 2:
115
+ # Fallback for folders like "John"
116
+ p_id = "000"
117
+ p_name = person_dir.name
118
+ else:
119
+ p_id, p_name = parts[0], parts[1].replace('_', ' ')
120
+
121
+ images = list(person_dir.glob("*.jpg")) + list(person_dir.glob("*.png")) + list(person_dir.glob("*.webp"))
122
+
123
+ for img_path in images:
124
  try:
125
+ img_hash = self._get_hash(img_path)
126
+ # Check if already indexed
127
+ if self.collection.get(ids=[img_hash])['ids']:
128
+ continue
129
+
130
+ # Generate Embedding
131
+ embedding_objs = DeepFace.represent(
132
+ img_path=str(img_path),
133
+ model_name="Facenet512",
134
+ enforce_detection=False
135
+ )
136
+
137
+ if embedding_objs:
138
+ embedding = embedding_objs[0]["embedding"]
139
+ self.collection.add(
140
+ ids=[img_hash],
141
+ embeddings=[embedding],
142
+ metadatas=[{"id": p_id, "name": p_name, "file": img_path.name}]
143
+ )
144
+ count += 1
145
+ logger.info(f"✅ Indexed: {p_name}")
146
  except Exception as e:
147
+ logger.error(f"⚠️ Failed to index {img_path.name}: {e}")
148
+
149
+ if count > 0:
150
+ logger.info(f"📥 Added {count} new faces to database.")
151
+ else:
152
+ logger.info("ℹ️ Database is up to date.")
153
+
154
+ def recognize(self, face_img: np.ndarray) -> Dict[str, Any]:
155
+ """Returns {'match': bool, 'name': str, 'id': str, 'color': tuple}"""
156
+ # Default response (Unknown / Red)
157
+ default = {"match": False, "name": "Unknown", "id": "Unknown", "color": (255, 0, 0)}
158
+
159
+ if not self.is_active or self.collection is None or self.collection.count() == 0:
160
+ return default
161
 
 
 
 
162
  try:
163
+ # Create temp file for DeepFace (it prefers paths)
164
+ temp_path = "temp_query.jpg"
165
+ cv2.imwrite(temp_path, cv2.cvtColor(face_img, cv2.COLOR_RGB2BGR))
166
+
167
+ embedding_objs = DeepFace.represent(
168
+ img_path=temp_path,
169
+ model_name="Facenet512",
170
+ enforce_detection=False
171
+ )
172
+ if os.path.exists(temp_path): os.remove(temp_path)
173
+
174
+ if not embedding_objs: return default
175
+
176
+ query_embed = embedding_objs[0]["embedding"]
177
+ results = self.collection.query(
178
+ query_embeddings=[query_embed],
179
+ n_results=1
180
+ )
181
+
182
+ if not results['ids'][0]: return default
183
+
184
+ distance = results['distances'][0][0]
185
+ metadata = results['metadatas'][0][0]
186
+
187
+ # Threshold: Lower is stricter. 0.45 is a good balance for Facenet512
188
+ if distance < 0.45:
189
+ return {
190
+ "match": True,
191
+ "name": metadata['name'],
192
+ "id": metadata['id'],
193
+ "color": (0, 255, 0) # Green for match
194
+ }
195
+ return default
196
+
197
  except Exception as e:
198
+ logger.error(f"Recognition Error: {e}")
199
+ return default
200
+
201
+ def get_stats(self):
202
+ if self.is_active and self.collection:
203
+ return f"✅ Active | {self.collection.count()} Faces Indexed"
204
+ return "❌ Offline (Check dependencies or 'known_faces' folder)"
205
+
206
+ # Singleton DB
207
+ FACE_DB = FaceDatabase()
208
 
209
  # ====================================================
210
+ # 3. THE UNIFIED DETECTOR (YOLO)
211
  # ====================================================
212
+ class Detector:
213
+ def __init__(self):
214
+ logger.info("📦 Loading YOLOv8-Face...")
215
  try:
216
+ self.model = YOLO("yolov8n-face.pt")
217
+ logger.info("✅ Model Loaded.")
218
  except Exception as e:
219
+ logger.error(f"❌ Model Load Failed: {e}")
220
+ raise e
221
+
222
+ def detect(self, image: np.ndarray):
223
+ # Uses Hardcoded High Sensitivity
224
+ results = self.model(image, conf=CONFIDENCE_THRESHOLD, verbose=False)
225
+ faces = []
226
  for r in results:
227
  if r.boxes is None: continue
228
  for box in r.boxes:
229
  x1, y1, x2, y2 = map(int, box.xyxy[0])
230
+ faces.append({
231
+ "box": (x1, y1, x2-x1, y2-y1),
232
+ "conf": float(box.conf[0])
233
+ })
234
+ return faces
235
+
236
+ GLOBAL_DETECTOR = Detector()
 
 
 
 
 
 
 
 
237
 
238
  # ====================================================
239
+ # 4. CORE LOGIC
240
  # ====================================================
241
 
242
+ def apply_blur(image, x, y, w, h):
243
+ """
244
+ HYBRID ADAPTIVE BLUR:
245
+ - Uses Adaptive Grid (12 blocks) for Big Faces.
246
+ - Uses Min Pixel Size (12px) for Small Faces to force lower resolution.
247
+ """
248
+ h_img, w_img = image.shape[:2]
249
+
250
+ # --- COVERAGE AREA SCALE ---
251
+ pad_w = int(w * (COVERAGE_SCALE - 1.0) / 2)
252
+ pad_h = int(h * (COVERAGE_SCALE - 1.0) / 2)
253
+
254
+ x = max(0, x - pad_w)
255
+ y = max(0, y - pad_h)
256
+ w = min(w_img - x, w + (2 * pad_w))
257
+ h = min(h_img - y, h + (2 * pad_h))
258
+
259
+ roi = image[y:y+h, x:x+w]
260
+ if roi.size == 0: return image
261
+
262
+ h_roi, w_roi = roi.shape[:2]
263
+
264
+ # --- HYBRID LOGIC ---
265
+ # 1. Max blocks allowed by Adaptive Rule
266
+ grid_adaptive = TARGET_MOSAIC_GRID
267
+
268
+ # 2. Max blocks allowed by Min Pixel Rule
269
+ # If face is 100px wide, and min pixel is 12px, we allow max 8 blocks.
270
+ grid_pixel_limit = max(1, w_roi // MIN_PIXEL_SIZE)
271
+
272
+ # 3. Take the stricter (lower) grid count
273
+ final_grid_size = min(grid_adaptive, grid_pixel_limit)
274
+ final_grid_size = max(2, final_grid_size) # Ensure at least 2x2
275
+
276
+ # Calculate Block Dimensions
277
+ aspect = w_roi / h_roi
278
+ target_w = final_grid_size
279
+ target_h = int(final_grid_size / aspect)
280
+
281
+ target_w = max(2, target_w)
282
+ target_h = max(2, target_h)
283
+
284
+ # Downscale (Destroy Detail)
285
+ small = cv2.resize(roi, (target_w, target_h), interpolation=cv2.INTER_LINEAR)
286
+ # Upscale (Pixelate)
287
+ pixelated = cv2.resize(small, (w_roi, h_roi), interpolation=cv2.INTER_NEAREST)
288
 
289
+ image[y:y+h, x:x+w] = pixelated
290
+ return image
291
+
292
+ def draw_label(image, x, y, w, text, color, on_blur=False):
293
+ """Annotation step: Draws text."""
294
+ font = cv2.FONT_HERSHEY_SIMPLEX
295
+ scale = 0.6
296
+ thickness = 2
297
+ (tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
298
+
299
+ if on_blur:
300
+ center_x = x + (w // 2) - (tw // 2)
301
+ center_y = y + (w // 2)
302
+ cv2.rectangle(image, (center_x - 5, center_y - th - 5), (center_x + tw + 5, center_y + 5), color, -1)
303
+ cv2.putText(image, text, (center_x, center_y), font, scale, (255, 255, 255), thickness)
304
+ else:
305
+ cv2.rectangle(image, (x, y - th - 10), (x + tw + 10, y), color, -1)
306
+ cv2.putText(image, text, (x + 5, y - 5), font, scale, (255, 255, 255), thickness)
307
+
308
+ def process_frame(image, mode):
309
+ """
310
+ THE MASTER FUNCTION.
311
+ Returns: (processed_image, log_string)
312
+ """
313
+ if image is None: return None, "No Image"
314
+
315
+ # 1. Detection
316
+ faces = GLOBAL_DETECTOR.detect(image)
317
+ processed_img = image.copy()
318
+ log_entries = []
319
+
320
+ for i, face in enumerate(faces):
321
+ x, y, w, h = face['box']
322
 
323
+ # 2. Analysis
324
+ identity = {"name": "", "color": (0, 255, 0)}
325
+
326
+ if mode in ["data", "smart"]:
327
+ # Crop and Check DB
328
+ face_crop = image[y:y+h, x:x+w]
329
+ if face_crop.size > 0:
330
+ res = FACE_DB.recognize(face_crop)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
331
 
332
+ if res['match']:
333
+ label_text = f"ID: {res['id']} ({res['name']})"
334
+ log_entries.append(f"✅ Face #{i+1}: MATCH - {res['name']} (ID: {res['id']})")
335
+ else:
336
+ label_text = "Unknown"
337
+ log_entries.append(f"⚠️ Face #{i+1}: UNKNOWN")
338
+
339
+ identity = {"name": label_text, "color": res['color']}
340
+ else:
341
+ log_entries.append(f"🔒 Face #{i+1}: Anonymized")
342
+
343
+ # 3. Modification
344
+ if mode == "privacy":
345
+ processed_img = apply_blur(processed_img, x, y, w, h)
346
+
347
+ elif mode == "data":
348
+ cv2.rectangle(processed_img, (x, y), (x+w, y+h), identity['color'], 2)
349
+ draw_label(processed_img, x, y, w, identity['name'], identity['color'], on_blur=False)
350
+
351
+ elif mode == "smart":
352
+ processed_img = apply_blur(processed_img, x, y, w, h)
353
+ draw_label(processed_img, x, y, w, identity['name'], identity['color'], on_blur=True)
354
+
355
+ # Create Log String
356
+ if not log_entries:
357
+ final_log = "No faces detected."
358
+ else:
359
+ final_log = "--- Detection Report ---\n" + "\n".join(log_entries)
360
 
361
+ return processed_img, final_log
362
 
363
  # ====================================================
364
+ # 5. VIDEO PROCESSING HELPERS
365
  # ====================================================
366
+ def process_video_general(video_path, mode, progress=gr.Progress()):
367
+ """Generic video processor."""
368
+ if not video_path: return None
 
 
 
 
 
 
 
 
 
 
 
 
 
369
 
370
+ cap = cv2.VideoCapture(video_path)
371
+ if not cap.isOpened(): return None
 
 
 
 
 
372
 
373
+ fps = int(cap.get(cv2.CAP_PROP_FPS))
374
+ width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
375
+ height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
376
+ total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
377
 
378
+ out_path = create_temp_file()
379
+ fourcc = cv2.VideoWriter_fourcc(*'avc1')
380
+ out = cv2.VideoWriter(out_path, fourcc, fps, (width, height))
381
+ if not out.isOpened():
382
+ out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
383
+
384
+ cnt = 0
385
+ while cap.isOpened():
386
+ ret, frame = cap.read()
387
+ if not ret: break
388
+
389
+ # Process using the Master Function (Ignore log for video)
390
+ res_frame, _ = process_frame(frame, mode)
391
+
392
+ out.write(res_frame)
393
+ cnt += 1
394
+ if total > 0 and cnt % 10 == 0:
395
+ progress(cnt/total, desc=f"Processing Frame {cnt}/{total}")
396
+
397
+ cap.release()
398
+ out.release()
399
+ return out_path
400
 
401
  # ====================================================
402
+ # 6. GRADIO INTERFACE
403
  # ====================================================
 
 
 
 
 
 
404
 
405
+ with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue"), title="Smart Redaction Demo") as demo:
406
+
407
+ gr.Markdown("# 🛡️ Smart Redaction System")
408
+ gr.Markdown("### From Raw Privacy to Intelligent Security")
409
+
410
+ # Unified Config Info Row
411
  with gr.Row():
412
+ gr.Markdown(f"**System Status:** {FACE_DB.get_stats()}")
413
+ gr.Markdown(f"**Config:** Hybrid Pixelate | High Sensitivity | 110% Coverage")
414
+
415
+ with gr.Tabs():
416
+
417
+ # --- TAB 1: RAW PRIVACY ---
418
+ with gr.TabItem("1️⃣ Raw Privacy (Baseline)"):
419
+ gr.Markdown("### 🔒 Total Anonymization")
420
+ gr.Markdown("*Scenario: GDPR Compliance. Everyone is hidden. No data is extracted.*")
421
+
 
 
 
 
 
422
  with gr.Tabs():
423
+ with gr.TabItem("Image"):
424
+ p_img_in = gr.Image(label="Input", type="numpy", height=400)
425
+ p_img_out = gr.Image(label="Anonymized Output", height=400)
426
+ p_btn = gr.Button("Apply Privacy", variant="primary")
427
+
428
+ with gr.TabItem("Video"):
429
+ p_vid_in = gr.Video(label="Input Video")
430
+ p_vid_out = gr.Video(label="Anonymized Output")
431
+ p_vid_btn = gr.Button("Process Video", variant="primary")
432
+
433
+ with gr.TabItem("Webcam"):
434
+ p_web_in = gr.Image(sources=["webcam"], streaming=True, type="numpy")
435
+ p_web_out = gr.Image(label="Live Privacy Feed")
436
+
437
+ # --- TAB 2: THE DATA LAYER ---
438
+ with gr.TabItem("2️⃣ The Data Layer (Security)"):
439
+ gr.Markdown("### 🔍 Recognition & Intelligence")
440
+ gr.Markdown("*Scenario: Security Control Room. We identify Known vs Unknown. No Privacy.*")
441
+
442
+ with gr.Tabs():
443
+ with gr.TabItem("Image"):
444
+ with gr.Row():
445
+ d_img_in = gr.Image(label="Input", type="numpy", height=400)
446
+ with gr.Column():
447
+ d_img_out = gr.Image(label="Data Output", height=400)
448
+ d_log_out = gr.Textbox(label="Detection Log", lines=4)
449
+ d_btn = gr.Button("Analyze Data", variant="primary")
450
+
451
+ with gr.TabItem("Video"):
452
+ d_vid_in = gr.Video(label="Input Video")
453
+ d_vid_out = gr.Video(label="Data Output")
454
+ d_vid_btn = gr.Button("Analyze Video", variant="primary")
455
+
456
+ with gr.TabItem("Webcam"):
457
+ d_web_in = gr.Image(sources=["webcam"], streaming=True, type="numpy")
458
+ d_web_out = gr.Image(label="Live Data Feed")
459
+
460
+ # --- TAB 3: SMART REDACTION ---
461
+ with gr.TabItem("3️⃣ Smart Redaction (Combined)"):
462
+ gr.Markdown("### 🛡️ Intelligent Privacy")
463
+ gr.Markdown("*Scenario: The Solution. Faces are blurred for privacy, but Identities are overlaid for security.*")
464
+
465
+ with gr.Tabs():
466
+ with gr.TabItem("Image"):
467
+ with gr.Row():
468
+ s_img_in = gr.Image(label="Input", type="numpy", height=400)
469
+ with gr.Column():
470
+ s_img_out = gr.Image(label="Smart Redaction Output", height=400)
471
+ s_log_out = gr.Textbox(label="Detection Log", lines=4)
472
+ s_btn = gr.Button("Apply Smart Redaction", variant="primary")
473
+
474
+ with gr.TabItem("Video"):
475
+ s_vid_in = gr.Video(label="Input Video")
476
+ s_vid_out = gr.Video(label="Smart Redaction Output")
477
+ s_vid_btn = gr.Button("Process Smart Video", variant="primary")
478
+
479
+ with gr.TabItem("Webcam"):
480
+ s_web_in = gr.Image(sources=["webcam"], streaming=True, type="numpy")
481
+ s_web_out = gr.Image(label="Live Smart Feed")
482
+
483
+ # =========================================
484
+ # EVENT HANDLERS
485
+ # =========================================
486
 
487
+ # Tab 1: Privacy (Mode="privacy")
488
+ p_btn.click(lambda img: process_frame(img, "privacy")[0], inputs=[p_img_in], outputs=p_img_out)
489
+ p_vid_btn.click(lambda vid: process_video_general(vid, "privacy"), inputs=[p_vid_in], outputs=p_vid_out)
490
+ p_web_in.stream(lambda img: process_frame(img, "privacy")[0], inputs=[p_web_in], outputs=p_web_out)
491
+
492
+ # Tab 2: Data (Mode="data")
493
+ d_btn.click(lambda img: process_frame(img, "data"), inputs=[d_img_in], outputs=[d_img_out, d_log_out])
494
+ d_vid_btn.click(lambda vid: process_video_general(vid, "data"), inputs=[d_vid_in], outputs=d_vid_out)
495
+ d_web_in.stream(lambda img: process_frame(img, "data")[0], inputs=[d_web_in], outputs=d_web_out)
496
+
497
+ # Tab 3: Smart (Mode="smart")
498
+ s_btn.click(lambda img: process_frame(img, "smart"), inputs=[s_img_in], outputs=[s_img_out, s_log_out])
499
+ s_vid_btn.click(lambda vid: process_video_general(vid, "smart"), inputs=[s_vid_in], outputs=s_vid_out)
500
+ s_web_in.stream(lambda img: process_frame(img, "smart")[0], inputs=[s_web_in], outputs=s_web_out)
501
 
 
 
 
502
  if __name__ == "__main__":
503
+ try:
504
+ if GLOBAL_DETECTOR:
505
+ logger.info("✅ System Ready. Launching...")
506
+ demo.launch()
507
+ except Exception as e:
508
+ logger.error(f"Startup Failed: {e}")