ab2207 committed on
Commit
b6f1be7
·
verified ·
1 Parent(s): 6f206ca

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +165 -354
app.py CHANGED
@@ -37,22 +37,27 @@ logger = logging.getLogger(__name__)
37
  # 1. CONFIGURATION & UTILITIES
38
  # ====================================================
39
 
40
- # --- HYBRID PRIVACY SETTINGS ---
41
- CONFIDENCE_THRESHOLD = 0.3 # High Sensitivity (Catches mostly everything)
42
- TARGET_MOSAIC_GRID = 8 # Max Resolution: Face is divided into 12x12 grid, (Lower = Stronger Blur)
43
- MIN_PIXEL_SIZE = 15 # Min Block Size: Blocks cannot be smaller than 12px, (Higher = Stronger Blur for small faces)
44
- COVERAGE_SCALE = 1.1 # 110% Coverage (Padding around face)
 
 
 
 
 
 
 
 
45
 
46
  TEMP_FILES = []
47
 
48
  def cleanup_temp_files():
49
- """Clean up temporary video files on exit."""
50
  for f in TEMP_FILES:
51
  try:
52
- if os.path.exists(f):
53
- os.remove(f)
54
- except Exception:
55
- pass
56
 
57
  atexit.register(cleanup_temp_files)
58
 
@@ -61,484 +66,290 @@ def create_temp_file(suffix=".mp4") -> str:
61
  TEMP_FILES.append(path)
62
  return path
63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
  # ====================================================
65
- # 2. THE DATABASE LAYER (Backend Only - No UI)
66
  # ====================================================
67
  class FaceDatabase:
68
- """
69
- Handles loading known faces from the 'known_faces' folder.
70
- Runs automatically on startup.
71
- """
72
  def __init__(self, db_path="./chroma_db", faces_dir="known_faces"):
73
  self.faces_dir = Path(faces_dir)
74
- self.client = None
75
  self.collection = None
76
  self.is_active = False
77
 
78
  if not DEEPFACE_AVAILABLE or not CHROMADB_AVAILABLE:
79
- logger.warning("❌ Database unavailable (Missing dependencies)")
80
  return
81
 
82
  try:
83
  self.client = chromadb.PersistentClient(path=db_path)
84
- self.collection = self.client.get_or_create_collection(
85
- name="face_embeddings",
86
- metadata={"hnsw:space": "cosine"}
87
- )
88
  self.is_active = True
89
-
90
- # Auto-index on startup
91
- if self.faces_dir.exists():
92
- self._scan_and_index()
93
- else:
94
- self.faces_dir.mkdir(parents=True, exist_ok=True)
95
- logger.info(f"📁 Created {faces_dir} folder. Add images here!")
96
-
97
  except Exception as e:
98
  logger.error(f"❌ DB Init Error: {e}")
99
- self.is_active = False
100
 
101
  def _get_hash(self, img_path: Path) -> str:
102
- with open(img_path, 'rb') as f:
103
- return hashlib.md5(f.read()).hexdigest()
104
 
105
  def _scan_and_index(self):
106
- """Scans folders and adds new images to ChromaDB."""
107
  logger.info("🔄 Scanning 'known_faces' folder...")
108
- count = 0
109
  for person_dir in self.faces_dir.iterdir():
110
  if not person_dir.is_dir(): continue
111
 
112
- # Folder format expectation: "001_John_Doe"
113
  parts = person_dir.name.split('_', 1)
114
- if len(parts) < 2:
115
- # Fallback for folders like "John"
116
- p_id = "000"
117
- p_name = person_dir.name
118
- else:
119
- p_id, p_name = parts[0], parts[1].replace('_', ' ')
120
-
121
- images = list(person_dir.glob("*.jpg")) + list(person_dir.glob("*.png")) + list(person_dir.glob("*.webp"))
122
 
 
123
  for img_path in images:
 
124
  try:
125
  img_hash = self._get_hash(img_path)
126
- # Check if already indexed
127
- if self.collection.get(ids=[img_hash])['ids']:
128
- continue
129
-
130
- # Generate Embedding
131
- embedding_objs = DeepFace.represent(
132
- img_path=str(img_path),
133
- model_name="Facenet512",
134
- enforce_detection=False
135
- )
136
 
 
137
  if embedding_objs:
138
- embedding = embedding_objs[0]["embedding"]
139
  self.collection.add(
140
  ids=[img_hash],
141
- embeddings=[embedding],
142
  metadatas=[{"id": p_id, "name": p_name, "file": img_path.name}]
143
  )
144
- count += 1
145
  logger.info(f"✅ Indexed: {p_name}")
146
  except Exception as e:
147
- logger.error(f"⚠️ Failed to index {img_path.name}: {e}")
148
-
149
- if count > 0:
150
- logger.info(f"📥 Added {count} new faces to database.")
151
- else:
152
- logger.info("ℹ️ Database is up to date.")
153
 
154
  def recognize(self, face_img: np.ndarray) -> Dict[str, Any]:
155
- """Returns {'match': bool, 'name': str, 'id': str, 'color': tuple}"""
156
- # Default response (Unknown / Red)
157
- default = {"match": False, "name": "Unknown", "id": "Unknown", "color": (255, 0, 0)}
158
-
159
- if not self.is_active or self.collection is None or self.collection.count() == 0:
160
- return default
161
 
162
  try:
163
- # Create temp file for DeepFace (it prefers paths)
 
164
  temp_path = "temp_query.jpg"
165
  cv2.imwrite(temp_path, cv2.cvtColor(face_img, cv2.COLOR_RGB2BGR))
166
 
167
- embedding_objs = DeepFace.represent(
168
- img_path=temp_path,
169
- model_name="Facenet512",
170
- enforce_detection=False
171
- )
172
  if os.path.exists(temp_path): os.remove(temp_path)
173
 
174
  if not embedding_objs: return default
175
 
176
- query_embed = embedding_objs[0]["embedding"]
177
- results = self.collection.query(
178
- query_embeddings=[query_embed],
179
- n_results=1
180
- )
181
-
182
  if not results['ids'][0]: return default
183
 
184
  distance = results['distances'][0][0]
185
  metadata = results['metadatas'][0][0]
186
 
187
- # Threshold: Lower is stricter. 0.45 is a good balance for Facenet512
188
- if distance < 0.45:
189
  return {
190
  "match": True,
191
  "name": metadata['name'],
192
  "id": metadata['id'],
193
- "color": (0, 255, 0) # Green for match
194
  }
195
  return default
196
 
197
  except Exception as e:
198
- logger.error(f"Recognition Error: {e}")
199
  return default
200
 
201
  def get_stats(self):
202
- if self.is_active and self.collection:
203
- return f"✅ Active | {self.collection.count()} Faces Indexed"
204
- return "❌ Offline (Check dependencies or 'known_faces' folder)"
205
 
206
- # Singleton DB
207
  FACE_DB = FaceDatabase()
208
 
209
  # ====================================================
210
- # 3. THE UNIFIED DETECTOR (YOLO)
211
  # ====================================================
212
  class Detector:
213
  def __init__(self):
214
- logger.info("📦 Loading YOLOv8-Face...")
215
- try:
216
- self.model = YOLO("yolov8n-face.pt")
217
- logger.info("✅ Model Loaded.")
218
- except Exception as e:
219
- logger.error(f"❌ Model Load Failed: {e}")
220
- raise e
221
 
222
  def detect(self, image: np.ndarray):
223
- # Uses Hardcoded High Sensitivity
224
- results = self.model(image, conf=CONFIDENCE_THRESHOLD, verbose=False)
225
  faces = []
226
  for r in results:
227
  if r.boxes is None: continue
228
  for box in r.boxes:
229
  x1, y1, x2, y2 = map(int, box.xyxy[0])
230
- faces.append({
231
- "box": (x1, y1, x2-x1, y2-y1),
232
- "conf": float(box.conf[0])
233
- })
234
  return faces
235
 
236
  GLOBAL_DETECTOR = Detector()
237
 
238
- # ====================================================
239
- # 4. CORE LOGIC
240
- # ====================================================
241
-
242
  def apply_blur(image, x, y, w, h):
243
- """
244
- HYBRID ADAPTIVE BLUR:
245
- - Uses Adaptive Grid (12 blocks) for Big Faces.
246
- - Uses Min Pixel Size (12px) for Small Faces to force lower resolution.
247
- """
248
- h_img, w_img = image.shape[:2]
249
-
250
- # --- COVERAGE AREA SCALE ---
251
- pad_w = int(w * (COVERAGE_SCALE - 1.0) / 2)
252
- pad_h = int(h * (COVERAGE_SCALE - 1.0) / 2)
253
-
254
- x = max(0, x - pad_w)
255
- y = max(0, y - pad_h)
256
- w = min(w_img - x, w + (2 * pad_w))
257
- h = min(h_img - y, h + (2 * pad_h))
258
-
259
  roi = image[y:y+h, x:x+w]
260
  if roi.size == 0: return image
261
 
262
- h_roi, w_roi = roi.shape[:2]
263
-
264
- # --- HYBRID LOGIC ---
265
- # 1. Max blocks allowed by Adaptive Rule
266
- grid_adaptive = TARGET_MOSAIC_GRID
267
-
268
- # 2. Max blocks allowed by Min Pixel Rule
269
- # If face is 100px wide, and min pixel is 12px, we allow max 8 blocks.
270
- grid_pixel_limit = max(1, w_roi // MIN_PIXEL_SIZE)
271
-
272
- # 3. Take the stricter (lower) grid count
273
- final_grid_size = min(grid_adaptive, grid_pixel_limit)
274
- final_grid_size = max(2, final_grid_size) # Ensure at least 2x2
275
-
276
- # Calculate Block Dimensions
277
- aspect = w_roi / h_roi
278
- target_w = final_grid_size
279
- target_h = int(final_grid_size / aspect)
280
-
281
- target_w = max(2, target_w)
282
- target_h = max(2, target_h)
283
-
284
- # Downscale (Destroy Detail)
285
- small = cv2.resize(roi, (target_w, target_h), interpolation=cv2.INTER_LINEAR)
286
- # Upscale (Pixelate)
287
- pixelated = cv2.resize(small, (w_roi, h_roi), interpolation=cv2.INTER_NEAREST)
288
 
 
 
 
289
  image[y:y+h, x:x+w] = pixelated
290
  return image
291
 
292
- def draw_label(image, x, y, w, h, text, color):
293
  """
294
- 1. Always draw the border (The primary signal).
295
- 2. Attempt to draw text inside.
296
- 3. If text doesn't fit geometrically, hide it automatically.
297
  """
298
- # 1. Dynamic Border Thickness
299
- # Big face = 2px border, Tiny face = 1px border (so it doesn't look like a blob)
300
- thickness = 2 if w > 40 else 1
301
  cv2.rectangle(image, (x, y), (x+w, y+h), color, thickness)
302
 
303
- # 2. Geometry Check: Can we fit text?
304
- # We define a 'Minimum Readable Font' (Scale 0.4 is the smallest legible size)
305
- min_font_scale = 0.4
306
  font = cv2.FONT_HERSHEY_SIMPLEX
307
- font_thickness = 1
 
 
 
 
 
 
 
 
 
 
 
 
 
308
 
309
- (tw, th), _ = cv2.getTextSize(text, font, min_font_scale, font_thickness)
310
-
311
- # PADDING: We need at least 2 pixels on sides
312
- required_width = tw + 4
313
-
314
- # THE RULE: If the text is wider than the face, DO NOT DRAW IT.
315
- if required_width > w:
316
- return
317
-
318
- # 3. If we passed the check, draw it centered
319
- center_x = x + (w // 2) - (tw // 2)
320
- center_y = y + (h // 2) + (th // 2)
321
-
322
- # Background Strip (for readability against blur)
323
- bg_tl = (center_x - 2, center_y - th - 2)
324
- bg_br = (center_x + tw + 2, center_y + 2)
325
- cv2.rectangle(image, bg_tl, bg_br, color, -1)
326
 
327
- # Text
328
- cv2.putText(image, text, (center_x, center_y), font, min_font_scale, (255, 255, 255), font_thickness, cv2.LINE_AA)
329
-
330
 
331
  def process_frame(image, mode):
332
- """
333
- MASTER FUNCTION (Standardized).
334
- """
335
  if image is None: return None, "No Image"
336
 
337
- # 1. Detection
338
  faces = GLOBAL_DETECTOR.detect(image)
339
  processed_img = image.copy()
340
  log_entries = []
341
- labels_to_draw = []
342
 
343
- # =================================================
344
- # PASS 1: BLUR & ANALYZE
345
- # =================================================
346
- for i, face in enumerate(faces):
347
- x, y, w, h = face['box']
348
-
349
- # Default: Unknown/Red
350
- full_log_text = "Unknown"
351
- img_label_text = "Unknown"
352
- color = (255, 0, 0)
353
 
 
 
 
 
 
 
 
 
 
 
 
354
  if mode in ["data", "smart"]:
355
- face_crop = image[y:y+h, x:x+w]
 
 
356
  if face_crop.size > 0:
357
  res = FACE_DB.recognize(face_crop)
358
-
359
  if res['match']:
360
- # Match: Green
361
- full_log_text = f"MATCH - {res['name']} (ID: {res['id']})"
362
- log_entries.append(f"✅ Face #{i+1}: {full_log_text}")
363
- img_label_text = f"ID: {res['id']}"
364
- color = (0, 255, 0)
365
  else:
366
- # No Match: Red
367
- full_log_text = "UNKNOWN"
368
- log_entries.append(f"⚠️ Face #{i+1}: {full_log_text}")
369
- img_label_text = "Unknown"
370
- color = (255, 0, 0)
371
- else:
372
- log_entries.append(f"🔒 Face #{i+1}: Anonymized")
373
-
374
- # Action: Blur
375
- if mode == "privacy" or mode == "smart":
376
- processed_img = apply_blur(processed_img, x, y, w, h)
377
-
378
- # Action: Queue Badge
379
- if mode == "data" or mode == "smart":
380
- labels_to_draw.append({
381
- "x": x, "y": y, "w": w, "h": h,
382
- "text": img_label_text, "color": color
383
- })
384
-
385
- # =================================================
386
- # PASS 2: DRAW BADGES (Top Layer)
387
- # =================================================
388
- for item in labels_to_draw:
389
- draw_label(processed_img, item['x'], item['y'], item['w'], item['h'], item['text'], item['color'])
390
-
391
- # Create Log
392
- if not log_entries:
393
- final_log = "No faces detected."
394
- else:
395
- final_log = "--- Detection Report ---\n" + "\n".join(log_entries)
396
 
 
 
 
 
 
 
 
 
 
 
 
397
  return processed_img, final_log
398
-
399
 
400
  # ====================================================
401
- # 5. VIDEO PROCESSING HELPERS
402
  # ====================================================
403
- def process_video_general(video_path, mode, progress=gr.Progress()):
404
- """Generic video processor."""
405
  if not video_path: return None
406
-
407
  cap = cv2.VideoCapture(video_path)
408
- if not cap.isOpened(): return None
409
-
410
- fps = int(cap.get(cv2.CAP_PROP_FPS))
411
- width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
412
- height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
413
- total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
414
 
415
  out_path = create_temp_file()
416
- fourcc = cv2.VideoWriter_fourcc(*'avc1')
417
- out = cv2.VideoWriter(out_path, fourcc, fps, (width, height))
418
- if not out.isOpened():
419
- out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
420
 
 
421
  cnt = 0
422
  while cap.isOpened():
423
  ret, frame = cap.read()
424
  if not ret: break
425
-
426
- # Process using the Master Function (Ignore log for video)
427
  res_frame, _ = process_frame(frame, mode)
428
-
429
  out.write(res_frame)
430
  cnt += 1
431
- if total > 0 and cnt % 10 == 0:
432
- progress(cnt/total, desc=f"Processing Frame {cnt}/{total}")
433
 
434
  cap.release()
435
  out.release()
436
  return out_path
437
 
438
- # ====================================================
439
- # 6. GRADIO INTERFACE
440
- # ====================================================
441
-
442
- with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue"), title="Smart Redaction Demo") as demo:
443
 
444
- gr.Markdown("# 🛡️ Smart Redaction System")
445
- gr.Markdown("### From Raw Privacy to Intelligent Security")
446
-
447
- # Unified Config Info Row
448
- with gr.Row():
449
- gr.Markdown(f"**System Status:** {FACE_DB.get_stats()}")
450
-
451
  with gr.Tabs():
452
-
453
- # --- TAB 1: RAW PRIVACY ---
454
- with gr.TabItem("1️⃣ Raw Privacy (Baseline)"):
455
- gr.Markdown("### 🔒 Total Anonymization")
456
- gr.Markdown("*GDPR Compliance: Everyone is hidden. No data is extracted.*")
457
-
458
- with gr.Tabs():
459
- with gr.TabItem("Image"):
460
- p_img_in = gr.Image(label="Input", type="numpy", height=400)
461
- p_img_out = gr.Image(label="Anonymized Output", height=400)
462
- p_btn = gr.Button("Apply Privacy", variant="primary")
463
-
464
- with gr.TabItem("Video"):
465
- p_vid_in = gr.Video(label="Input Video")
466
- p_vid_out = gr.Video(label="Anonymized Output")
467
- p_vid_btn = gr.Button("Process Video", variant="primary")
468
-
469
- with gr.TabItem("Webcam"):
470
- p_web_in = gr.Image(sources=["webcam"], streaming=True, type="numpy")
471
- p_web_out = gr.Image(label="Live Privacy Feed")
472
-
473
- # --- TAB 2: THE DATA LAYER ---
474
- with gr.TabItem("2️⃣ The Data Layer (Security)"):
475
- gr.Markdown("### 🔍 Recognition & Intelligence")
476
- gr.Markdown("*Security Control Room. We identify Known vs Unknown. No Privacy.*")
477
-
478
- with gr.Tabs():
479
- with gr.TabItem("Image"):
480
- with gr.Row():
481
- d_img_in = gr.Image(label="Input", type="numpy", height=400)
482
- with gr.Column():
483
- d_img_out = gr.Image(label="Data Output", height=400)
484
- d_log_out = gr.Textbox(label="Detection Log", lines=4)
485
- d_btn = gr.Button("Analyze Data", variant="primary")
486
-
487
- with gr.TabItem("Video"):
488
- d_vid_in = gr.Video(label="Input Video")
489
- d_vid_out = gr.Video(label="Data Output")
490
- d_vid_btn = gr.Button("Analyze Video", variant="primary")
491
-
492
- with gr.TabItem("Webcam"):
493
- d_web_in = gr.Image(sources=["webcam"], streaming=True, type="numpy")
494
- d_web_out = gr.Image(label="Live Data Feed")
495
-
496
- # --- TAB 3: SMART REDACTION ---
497
- with gr.TabItem("3️⃣ Smart Redaction (Combined)"):
498
- gr.Markdown("### 🛡️ Intelligent Privacy")
499
- gr.Markdown("*The Solution: Faces are blurred for privacy, but Identities are overlaid for security.*")
500
-
501
- with gr.Tabs():
502
- with gr.TabItem("Image"):
503
- with gr.Row():
504
- s_img_in = gr.Image(label="Input", type="numpy", height=400)
505
- with gr.Column():
506
- s_img_out = gr.Image(label="Smart Redaction Output", height=400)
507
- s_log_out = gr.Textbox(label="Detection Log", lines=4)
508
- s_btn = gr.Button("Apply Smart Redaction", variant="primary")
509
-
510
- with gr.TabItem("Video"):
511
- s_vid_in = gr.Video(label="Input Video")
512
- s_vid_out = gr.Video(label="Smart Redaction Output")
513
- s_vid_btn = gr.Button("Process Smart Video", variant="primary")
514
-
515
- with gr.TabItem("Webcam"):
516
- s_web_in = gr.Image(sources=["webcam"], streaming=True, type="numpy")
517
- s_web_out = gr.Image(label="Live Smart Feed")
518
-
519
- # =========================================
520
- # EVENT HANDLERS
521
- # =========================================
522
-
523
- # Tab 1: Privacy (Mode="privacy")
524
- p_btn.click(lambda img: process_frame(img, "privacy")[0], inputs=[p_img_in], outputs=p_img_out)
525
- p_vid_btn.click(lambda vid: process_video_general(vid, "privacy"), inputs=[p_vid_in], outputs=p_vid_out)
526
- p_web_in.stream(lambda img: process_frame(img, "privacy")[0], inputs=[p_web_in], outputs=p_web_out)
527
-
528
- # Tab 2: Data (Mode="data")
529
- d_btn.click(lambda img: process_frame(img, "data"), inputs=[d_img_in], outputs=[d_img_out, d_log_out])
530
- d_vid_btn.click(lambda vid: process_video_general(vid, "data"), inputs=[d_vid_in], outputs=d_vid_out)
531
- d_web_in.stream(lambda img: process_frame(img, "data")[0], inputs=[d_web_in], outputs=d_web_out)
532
-
533
- # Tab 3: Smart (Mode="smart")
534
- s_btn.click(lambda img: process_frame(img, "smart"), inputs=[s_img_in], outputs=[s_img_out, s_log_out])
535
- s_vid_btn.click(lambda vid: process_video_general(vid, "smart"), inputs=[s_vid_in], outputs=s_vid_out)
536
- s_web_in.stream(lambda img: process_frame(img, "smart")[0], inputs=[s_web_in], outputs=s_web_out)
537
 
538
  if __name__ == "__main__":
539
- try:
540
- if GLOBAL_DETECTOR:
541
- logger.info("✅ System Ready. Launching...")
542
- demo.launch()
543
- except Exception as e:
544
- logger.error(f"Startup Failed: {e}")
 
37
  # 1. CONFIGURATION & UTILITIES
38
  # ====================================================
39
 
40
# --- TUNED PARAMETERS ---

# 1. Detection Sensitivity: minimum YOLO confidence for a box to count as a face.
DETECTION_CONFIDENCE = 0.4

# 2. Recognition Strictness (LOWER = STRICTER)
# 0.30 is the sweet spot for Facenet512.
# Anything above 0.40 causes the "Lupita/Spader" identity confusion.
RECOGNITION_THRESHOLD = 0.30

# 3. Visual Settings
TARGET_MOSAIC_GRID = 10  # Resolution of the blur (mosaic blocks per side)
MIN_PIXEL_SIZE = 12      # Minimum pixel block size of the mosaic
COVERAGE_SCALE = 1.2     # 120% Coverage (Padding around face to catch hair/ears)

# Paths of generated temp videos; removed by cleanup_temp_files() at interpreter exit.
TEMP_FILES = []
55
 
56
def cleanup_temp_files():
    """Best-effort removal of every temp video registered in TEMP_FILES."""
    for tmp_path in TEMP_FILES:
        try:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        except Exception:
            # Registered with atexit: never let cleanup raise during shutdown.
            pass
 
 
61
 
62
  atexit.register(cleanup_temp_files)
63
 
 
66
  TEMP_FILES.append(path)
67
  return path
68
 
69
+ def get_padded_coords(image, x, y, w, h, scale=COVERAGE_SCALE):
70
+ """
71
+ UNIFIED COORDINATE SYSTEM:
72
+ Calculates the padded coordinates once so Blur and Box match perfectly.
73
+ """
74
+ h_img, w_img = image.shape[:2]
75
+
76
+ # Calculate padding amount
77
+ pad_w = int(w * (scale - 1.0) / 2)
78
+ pad_h = int(h * (scale - 1.0) / 2)
79
+
80
+ # Apply padding with boundary checks
81
+ new_x = max(0, x - pad_w)
82
+ new_y = max(0, y - pad_h)
83
+ new_w = min(w_img - new_x, w + (2 * pad_w))
84
+ new_h = min(h_img - new_y, h + (2 * pad_h))
85
+
86
+ return new_x, new_y, new_w, new_h
87
+
88
  # ====================================================
89
+ # 2. THE DATABASE LAYER
90
  # ====================================================
91
class FaceDatabase:
    """
    Loads known faces from `faces_dir` into a persistent ChromaDB collection
    (Facenet512 embeddings, cosine distance) and answers nearest-neighbour
    identity queries.

    Expected folder layout: known_faces/<ID>_<Name>/<image files>.
    """

    def __init__(self, db_path="./chroma_db", faces_dir="known_faces"):
        self.faces_dir = Path(faces_dir)
        # Always define client/collection so attribute access is safe even
        # when dependencies are missing or init fails.
        self.client = None
        self.collection = None
        self.is_active = False

        if not DEEPFACE_AVAILABLE or not CHROMADB_AVAILABLE:
            return

        try:
            self.client = chromadb.PersistentClient(path=db_path)
            self.collection = self.client.get_or_create_collection(name="face_embeddings", metadata={"hnsw:space": "cosine"})
            self.is_active = True
            # Auto-index on startup; create the folder on first run.
            if self.faces_dir.exists(): self._scan_and_index()
            else: self.faces_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            logger.error(f"❌ DB Init Error: {e}")
            self.is_active = False

    def _get_hash(self, img_path: Path) -> str:
        """MD5 of the file bytes — stable, content-based ChromaDB id."""
        with open(img_path, 'rb') as f: return hashlib.md5(f.read()).hexdigest()

    def _scan_and_index(self):
        """Walk `faces_dir` and embed any image not already in the collection."""
        logger.info("🔄 Scanning 'known_faces' folder...")
        for person_dir in self.faces_dir.iterdir():
            if not person_dir.is_dir(): continue

            # Folder name "001_John_Doe" -> id "001", name "John Doe";
            # a folder with no underscore falls back to id "000".
            parts = person_dir.name.split('_', 1)
            p_id = parts[0] if len(parts) > 1 else "000"
            p_name = parts[1].replace('_', ' ') if len(parts) > 1 else person_dir.name

            images = list(person_dir.glob("*.*"))
            for img_path in images:
                if img_path.suffix.lower() not in ['.jpg', '.png', '.webp', '.jpeg']: continue
                try:
                    img_hash = self._get_hash(img_path)
                    # Content hash as id => re-runs skip already-indexed files.
                    if self.collection.get(ids=[img_hash])['ids']: continue

                    embedding_objs = DeepFace.represent(img_path=str(img_path), model_name="Facenet512", enforce_detection=False)
                    if embedding_objs:
                        self.collection.add(
                            ids=[img_hash],
                            embeddings=[embedding_objs[0]["embedding"]],
                            metadatas=[{"id": p_id, "name": p_name, "file": img_path.name}]
                        )
                        logger.info(f"✅ Indexed: {p_name}")
                except Exception as e:
                    logger.error(f"⚠️ Skip {img_path.name}: {e}")

    def recognize(self, face_img: np.ndarray) -> Dict[str, Any]:
        """Return {'match', 'name', 'id', 'color'} for the closest indexed face."""
        default = {"match": False, "name": "Unknown", "id": "Unknown", "color": (255, 0, 0)}  # Red
        if not self.is_active or self.collection is None or self.collection.count() == 0:
            return default

        try:
            # DeepFace preprocessing is most consistent when fed a file path,
            # so round-trip the crop through a temp JPEG (RGB -> BGR for imwrite).
            temp_path = "temp_query.jpg"
            cv2.imwrite(temp_path, cv2.cvtColor(face_img, cv2.COLOR_RGB2BGR))

            embedding_objs = DeepFace.represent(img_path=temp_path, model_name="Facenet512", enforce_detection=False)
            if os.path.exists(temp_path): os.remove(temp_path)

            if not embedding_objs: return default

            results = self.collection.query(query_embeddings=[embedding_objs[0]["embedding"]], n_results=1)
            if not results['ids'][0]: return default

            distance = results['distances'][0][0]
            metadata = results['metadatas'][0][0]

            # --- SECURITY FIX: STRICT THRESHOLD ---
            # Cosine distance below the threshold counts as a match.
            if distance < RECOGNITION_THRESHOLD:
                return {
                    "match": True,
                    "name": metadata['name'],
                    "id": metadata['id'],
                    "color": (0, 255, 0)  # Green
                }
            return default

        except Exception as e:
            # Log instead of swallowing silently so recognition failures are
            # visible in the server logs (the return value still degrades safely).
            logger.error(f"Recognition Error: {e}")
            return default

    def get_stats(self):
        """Short status string for the UI header."""
        return f"✅ Active | {self.collection.count()} Faces" if (self.is_active and self.collection) else "❌ Offline"
 
 
175
 
 
176
  FACE_DB = FaceDatabase()
177
 
178
  # ====================================================
179
+ # 3. DETECTOR & DRAWING LOGIC
180
  # ====================================================
181
class Detector:
    """Thin wrapper around the YOLOv8 face-detection model."""

    def __init__(self):
        # Weights are resolved/cached by the ultralytics loader.
        self.model = YOLO("yolov8n-face.pt")

    def detect(self, image: np.ndarray):
        """Run detection on one frame; return face boxes as (x, y, w, h) tuples."""
        detections = self.model(image, conf=DETECTION_CONFIDENCE, verbose=False)
        boxes = []
        for result in detections:
            if result.boxes is None:
                continue
            for det in result.boxes:
                left, top, right, bottom = (int(v) for v in det.xyxy[0])
                # Convert corner format (x1, y1, x2, y2) to X, Y, W, H.
                boxes.append((left, top, right - left, bottom - top))
        return boxes
194
 
195
  GLOBAL_DETECTOR = Detector()
196
 
 
 
 
 
197
def apply_blur(image, x, y, w, h):
    """
    Pixelate the region (x, y, w, h) of `image` in place and return it.

    The mosaic grid adapts: at most TARGET_MOSAIC_GRID blocks per side, but
    never blocks smaller than MIN_PIXEL_SIZE pixels (stronger blur on small
    faces), and always at least a 2x2 grid.
    """
    roi = image[y:y+h, x:x+w]
    if roi.size == 0: return image

    # BUG FIX: use the ACTUAL roi dimensions. When the box is clipped by the
    # frame border, (w, h) and roi.shape disagree, and upscaling to (w, h)
    # made the slice assignment below raise a shape-mismatch error.
    h_roi, w_roi = roi.shape[:2]

    # Adaptive Grid Logic
    grid_pixel_limit = max(1, w_roi // MIN_PIXEL_SIZE)
    final_grid_size = max(2, min(TARGET_MOSAIC_GRID, grid_pixel_limit))

    # Downscale to the mosaic grid (destroys detail), then upscale back with
    # NEAREST to produce the pixelated look.
    small = cv2.resize(roi, (final_grid_size, final_grid_size), interpolation=cv2.INTER_LINEAR)
    pixelated = cv2.resize(small, (w_roi, h_roi), interpolation=cv2.INTER_NEAREST)
    image[y:y+h, x:x+w] = pixelated
    return image
210
 
211
def draw_smart_label(image, x, y, w, h, text, color):
    """
    Draw a bounding box plus a filled label banner placed OUTSIDE the box
    (above it, or below when the box touches the top edge of the frame).
    """
    border = 2
    cv2.rectangle(image, (x, y), (x + w, y + h), color, border)

    typeface = cv2.FONT_HERSHEY_SIMPLEX
    scale = 0.6  # Slightly larger for readability
    weight = 2
    (text_w, text_h), _baseline = cv2.getTextSize(text, typeface, scale, weight)

    # Prefer a banner above the box; flip below it when there is no headroom.
    baseline_y = y - 10 if y - text_h - 15 >= 0 else y + h + text_h + 10

    # Centre the banner horizontally on the face box.
    anchor_x = x + (w // 2) - (text_w // 2)

    # Filled background rectangle so the text stays readable over any content.
    margin = 5
    banner_tl = (anchor_x - margin, baseline_y - text_h - margin)
    banner_br = (anchor_x + text_w + margin, baseline_y + margin)
    cv2.rectangle(image, banner_tl, banner_br, color, -1)

    cv2.putText(image, text, (anchor_x, baseline_y), typeface, scale,
                (255, 255, 255), weight, cv2.LINE_AA)
 
245
 
246
def process_frame(image, mode):
    """
    Master per-frame pipeline.

    Args:
        image: frame as a numpy array (None tolerated).
        mode: "privacy" (blur only), "data" (recognize + label, no blur),
              or "smart" (blur + identity label).

    Returns:
        (processed_image, log_string) — the log summarizes per-face results.
    """
    if image is None: return None, "No Image"

    faces = GLOBAL_DETECTOR.detect(image)
    processed_img = image.copy()
    log_entries = []

    # Queue for drawing so labels appear ON TOP of blur
    draw_queue = []

    for i, (raw_x, raw_y, raw_w, raw_h) in enumerate(faces):

        # --- STEP 1: CALCULATE UNIFIED COORDINATES ---
        # We use these padded coordinates for EVERYTHING (Crop, Blur, Box)
        # This prevents the "Bleeding" visual glitch.
        px, py, pw, ph = get_padded_coords(processed_img, raw_x, raw_y, raw_w, raw_h)

        # Defaults
        label_text = "Unknown"
        box_color = (200, 0, 0)  # Dark Red default
        log_text = "Unknown"

        # --- STEP 2: RECOGNITION (Data/Smart Mode) ---
        if mode in ["data", "smart"]:
            # Crop using the PADDED area to give the model more context (hair, chin)
            face_crop = processed_img[py:py+ph, px:px+pw]

            if face_crop.size > 0:
                res = FACE_DB.recognize(face_crop)
                if res['match']:
                    label_text = f"{res['name']} ({res['id']})"
                    box_color = (0, 200, 0)  # Green
                    log_text = f"MATCH: {res['name']}"
                else:
                    log_text = "Unknown Person"

            draw_queue.append((px, py, pw, ph, label_text, box_color))
            log_entries.append(f"Face #{i+1}: {log_text}")

        # --- STEP 3: PRIVACY (Privacy/Smart Mode) ---
        # Blur AFTER cropping so recognition above saw the clear face.
        if mode in ["privacy", "smart"]:
            processed_img = apply_blur(processed_img, px, py, pw, ph)
            if mode == "privacy":
                log_entries.append(f"Face #{i+1}: Redacted")

    # --- STEP 4: DRAW UI (Last Layer) ---
    for (dx, dy, dw, dh, txt, col) in draw_queue:
        draw_smart_label(processed_img, dx, dy, dw, dh, txt, col)

    final_log = "--- Report ---\n" + "\n".join(log_entries) if log_entries else "No faces."
    return processed_img, final_log
 
297
 
298
  # ====================================================
299
+ # 4. VIDEO & GRADIO SETUP
300
  # ====================================================
301
def process_video(video_path, mode, progress=gr.Progress()):
    """
    Run process_frame over every frame of a video and re-encode the result.

    Args:
        video_path: path to the input video (falsy -> no-op).
        mode: passed through to process_frame ("privacy"/"data"/"smart").
        progress: Gradio progress tracker (injected by Gradio at call time).

    Returns:
        Path of the processed temp file, or None if the input is missing/unreadable.
    """
    if not video_path: return None

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened(): return None

    # Some containers report fps == 0; fall back so VideoWriter stays valid.
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    width, height = int(cap.get(3)), int(cap.get(4))

    out_path = create_temp_file()
    out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    cnt = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret: break
        # Video mode ignores the per-frame log.
        res_frame, _ = process_frame(frame, mode)
        out.write(res_frame)
        cnt += 1
        # BUG FIX: frame count can be 0/unknown for some streams — guard the
        # division to avoid ZeroDivisionError mid-processing.
        if total > 0 and cnt % 10 == 0: progress(cnt/total)

    cap.release()
    out.release()
    return out_path
323
 
324
# Build the Gradio UI. Declared at module level so `demo` exists for hosting
# platforms that import the module (e.g. HF Spaces).
with gr.Blocks(theme=gr.themes.Soft(), title="Secure Redaction V2") as demo:
    gr.Markdown("# 🛡️ Smart Redaction System (Patched V2)")
    gr.Markdown(f"**Engine Status:** {FACE_DB.get_stats()} | **Security Threshold:** {RECOGNITION_THRESHOLD}")

    with gr.Tabs():
        # Tab 1: blur every face; no recognition, no data extraction.
        with gr.Tab("1️⃣ Raw Privacy"):
            with gr.Row():
                p_in = gr.Image(label="Input", type="numpy")
                p_out = gr.Image(label="Redacted Output")
            p_btn = gr.Button("Anonymize", variant="primary")
            # process_frame returns (image, log); privacy tab shows only the image.
            p_btn.click(lambda x: process_frame(x, "privacy")[0], p_in, p_out)

        # Tab 2: recognition only — clear faces with identity labels + log.
        with gr.Tab("2️⃣ Security Data"):
            with gr.Row():
                d_in = gr.Image(label="Input", type="numpy")
                with gr.Column():
                    d_out = gr.Image(label="Analyst View (Clear Face)")
                    d_log = gr.Textbox(label="Logs")
            d_btn = gr.Button("Analyze", variant="primary")
            d_btn.click(lambda x: process_frame(x, "data"), d_in, [d_out, d_log])

        # Tab 3: combined — blurred faces with identity overlay.
        with gr.Tab("3️⃣ Smart Mode"):
            with gr.Row():
                s_in = gr.Image(label="Input", type="numpy")
                with gr.Column():
                    s_out = gr.Image(label="Smart Output (Blurred + ID)")
                    s_log = gr.Textbox(label="Logs")
            s_btn = gr.Button("Execute Smart Redaction", variant="primary")
            s_btn.click(lambda x: process_frame(x, "smart"), s_in, [s_out, s_log])

if __name__ == "__main__":
    demo.launch()