ab2207 commited on
Commit
a5bc248
·
verified ·
1 Parent(s): 513ef60
Files changed (1) hide show
  1. app.py +381 -762
app.py CHANGED
@@ -1,16 +1,12 @@
1
- """
2
- Face Recognition Tool (YOLO + DeepFace + ChromaDB)
3
- """
4
-
5
  # --- Standard Libraries ---
6
  import logging
7
  import atexit
8
  import tempfile
9
  import os
 
10
  from dataclasses import dataclass
11
  from typing import Any, Dict, List, Tuple, Optional
12
  from pathlib import Path
13
- import hashlib
14
 
15
  # --- Computer Vision & UI Libraries ---
16
  import cv2
@@ -18,884 +14,507 @@ import numpy as np
18
  import gradio as gr
19
  from ultralytics import YOLO
20
 
21
- # --- HuggingFace Hub API ---
22
- try:
23
- from huggingface_hub import HfApi
24
- HF_HUB_AVAILABLE = True
25
- except ImportError:
26
- HF_HUB_AVAILABLE = False
27
- logging.warning("⚠️ huggingface_hub not installed - auto-commit disabled")
28
-
29
- # --- Face Recognition Libraries ---
30
  try:
31
  from deepface import DeepFace
32
  DEEPFACE_AVAILABLE = True
33
  except ImportError:
34
  DEEPFACE_AVAILABLE = False
35
- logging.warning("⚠️ DeepFace not installed")
36
 
37
  try:
38
  import chromadb
39
- from chromadb.config import Settings
40
  CHROMADB_AVAILABLE = True
41
  except ImportError:
42
  CHROMADB_AVAILABLE = False
43
- logging.warning("⚠️ ChromaDB not installed")
44
 
45
  # --- Configure Logging ---
46
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
47
  logger = logging.getLogger(__name__)
48
 
49
  # ====================================================
50
- # TEMPORARY FILE CLEANUP
51
  # ====================================================
 
52
  TEMP_FILES = []
53
 
54
  def cleanup_temp_files():
55
- """Clean up temporary files on exit."""
56
  for f in TEMP_FILES:
57
  try:
58
  if os.path.exists(f):
59
  os.remove(f)
60
- logger.info(f"🗑️ Cleaned up: {f}")
61
- except Exception as e:
62
- logger.warning(f"⚠️ Failed to delete {f}: {e}")
63
 
64
  atexit.register(cleanup_temp_files)
65
 
66
  def create_temp_file(suffix=".mp4") -> str:
67
- """Creates a temporary file and registers it for cleanup."""
68
  path = tempfile.mktemp(suffix=suffix)
69
  TEMP_FILES.append(path)
70
  return path
71
 
72
  # ====================================================
73
- # SCALABLE FACE DATABASE (ChromaDB)
74
  # ====================================================
75
- class ScalableFaceDatabase:
76
- """Production-ready face recognition system using ChromaDB."""
77
-
78
- def __init__(
79
- self,
80
- known_faces_dir: str = "known_faces",
81
- db_path: str = "./chroma_db",
82
- model_name: str = "Facenet512"
83
- ):
84
- self.known_faces_dir = Path(known_faces_dir)
85
- self.model_name = model_name
86
- self.db_path = db_path
87
-
88
- # HuggingFace Hub API for auto-commit
89
- self.hf_token = os.getenv("HF_TOKEN")
90
- self.space_id = os.getenv("SPACE_ID")
91
-
92
- if HF_HUB_AVAILABLE and self.hf_token and self.space_id:
93
- self.hf_api = HfApi()
94
- logger.info(f"✅ Auto-commit enabled for Space: {self.space_id}")
95
- else:
96
- self.hf_api = None
97
- if not self.hf_token:
98
- logger.warning("⚠️ HF_TOKEN not set - auto-commit disabled")
99
- if not self.space_id:
100
- logger.warning("⚠️ SPACE_ID not set - auto-commit disabled")
101
-
102
- if not CHROMADB_AVAILABLE:
103
- logger.error("❌ ChromaDB not available")
104
- self.client = None
105
- self.collection = None
106
  return
107
-
108
- # Initialize ChromaDB
109
- logger.info("🔧 Initializing ChromaDB...")
110
  try:
111
  self.client = chromadb.PersistentClient(path=db_path)
112
-
113
  self.collection = self.client.get_or_create_collection(
114
  name="face_embeddings",
115
  metadata={"hnsw:space": "cosine"}
116
  )
 
117
 
118
- existing_count = self.collection.count()
119
- logger.info(f"📊 Database contains {existing_count} face embeddings")
120
-
121
- if self.known_faces_dir.exists():
122
- self._index_faces_from_folders()
123
  else:
124
- logger.info(f"📁 Creating {self.known_faces_dir} directory...")
125
- self.known_faces_dir.mkdir(parents=True, exist_ok=True)
126
 
127
  except Exception as e:
128
- logger.error(f"❌ ChromaDB initialization failed: {e}")
129
- self.client = None
130
- self.collection = None
131
-
132
- def _get_image_hash(self, img_path: Path) -> str:
133
- """Generate unique hash for an image."""
134
  with open(img_path, 'rb') as f:
135
  return hashlib.md5(f.read()).hexdigest()
136
-
137
- def _index_faces_from_folders(self):
138
- """Auto-index faces from known_faces/ folder structure."""
139
- logger.info("🔄 Scanning for faces to index...")
140
-
141
- person_dirs = [d for d in self.known_faces_dir.iterdir() if d.is_dir()]
142
- indexed_count = 0
143
-
144
- for person_dir in person_dirs:
145
- folder_name = person_dir.name
146
- parts = folder_name.split('_', 1)
147
-
148
- if len(parts) != 2:
149
- logger.warning(f"⚠️ Skipping '{folder_name}' - use format: 'ID_Name'")
150
- continue
151
-
152
- person_id, person_name = parts
153
- person_name = person_name.replace('_', ' ').title()
154
 
155
- image_files = list(person_dir.glob("*.jpg")) + \
156
- list(person_dir.glob("*.png")) + \
157
- list(person_dir.glob("*.jpeg")) + \
158
- list(person_dir.glob("*.webp")) + \
159
- list(person_dir.glob("*.avif")) + \
160
- list(person_dir.glob("*.bmp")) + \
161
- list(person_dir.glob("*.tiff"))
162
 
163
- for img_path in image_files:
164
  try:
165
- img_hash = self._get_image_hash(img_path)
166
- existing = self.collection.get(ids=[img_hash], include=[])
167
-
168
- if existing['ids']:
169
  continue
170
-
171
- if not DEEPFACE_AVAILABLE:
172
- logger.warning("⚠️ DeepFace not available, skipping indexing")
173
- return
174
-
175
- embedding_obj = DeepFace.represent(
176
- img_path=str(img_path),
177
- model_name=self.model_name,
178
  enforce_detection=False
179
  )
180
 
181
- if not embedding_obj:
182
- continue
183
-
184
- embedding = embedding_obj[0]["embedding"]
185
-
186
- self.collection.add(
187
- embeddings=[embedding],
188
- documents=[str(img_path)],
189
- metadatas=[{
190
- "person_id": person_id,
191
- "person_name": person_name,
192
- "image_file": img_path.name
193
- }],
194
- ids=[img_hash]
195
- )
196
-
197
- indexed_count += 1
198
- logger.info(f"✅ Indexed: {person_name} (ID: {person_id}) - {img_path.name}")
199
-
200
  except Exception as e:
201
- logger.error(f" Failed to index {img_path}: {e}")
202
 
203
- if indexed_count > 0:
204
- logger.info(f"💾 Indexed {indexed_count} new face(s)")
205
- logger.info(f"📊 Total faces in database: {self.collection.count()}")
206
  else:
207
- logger.info("ℹ️ No new faces to index")
208
-
209
- def add_person(self, person_id: str, person_name: str, face_image: np.ndarray) -> Tuple[bool, str]:
210
- """Add a new person to the database and commit to git."""
211
- if not CHROMADB_AVAILABLE or self.collection is None:
212
- return False, " ChromaDB not available"
213
-
214
- if not DEEPFACE_AVAILABLE:
215
- return False, "❌ DeepFace not available"
216
-
217
- try:
218
- folder_name = f"{person_id}_{person_name.lower().replace(' ', '_')}"
219
- person_dir = self.known_faces_dir / folder_name
220
- person_dir.mkdir(parents=True, exist_ok=True)
221
-
222
- # ✅ NEW: Auto-increment filename for multiple images
223
- existing_images = list(person_dir.glob(f"{person_id}_*.jpg"))
224
- next_num = len(existing_images) + 1
225
- img_filename = f"{person_id}_{next_num}.jpg"
226
- img_path = person_dir / img_filename
227
-
228
- cv2.imwrite(str(img_path), cv2.cvtColor(face_image, cv2.COLOR_RGB2BGR))
229
- logger.info(f"💾 Saved image: {img_path}")
230
-
231
- commit_success = False
232
- if self.hf_api and self.hf_token and self.space_id:
233
- try:
234
- self.hf_api.upload_file(
235
- path_or_fileobj=str(img_path),
236
- path_in_repo=f"known_faces/{folder_name}/{img_filename}",
237
- repo_id=self.space_id,
238
- repo_type="space",
239
- token=self.hf_token,
240
- commit_message=f"Add image #{next_num} for {person_name} (ID: {person_id})"
241
- )
242
- logger.info(f"✅ Committed to git!")
243
- commit_success = True
244
- except Exception as e:
245
- logger.warning(f"⚠️ Git commit failed: {e}")
246
-
247
- embedding_obj = DeepFace.represent(
248
- img_path=str(img_path),
249
- model_name=self.model_name,
250
- enforce_detection=True
251
- )
252
-
253
- if not embedding_obj:
254
- return False, "❌ No face detected in image"
255
-
256
- embedding = embedding_obj[0]["embedding"]
257
- img_hash = self._get_image_hash(img_path)
258
-
259
- self.collection.add(
260
- embeddings=[embedding],
261
- documents=[str(img_path)],
262
- metadatas=[{
263
- "person_id": person_id,
264
- "person_name": person_name,
265
- "image_file": img_filename
266
- }],
267
- ids=[img_hash]
268
- )
269
-
270
- success_msg = f"✅ Added image #{next_num} for {person_name} (ID: {person_id})\n"
271
- if commit_success:
272
- success_msg += "🔒 Committed to git - PERMANENT!"
273
- else:
274
- success_msg += "⚠️ Saved locally only"
275
-
276
- return True, success_msg
277
-
278
- except Exception as e:
279
- logger.error(f"❌ Error: {e}")
280
- return False, f"❌ Error: {str(e)}"
281
-
282
- def recognize_face(
283
- self,
284
- face_image: np.ndarray,
285
- threshold: float = 0.45
286
- ) -> Dict[str, Any]:
287
- """Recognize a face using ChromaDB vector search."""
288
- if not CHROMADB_AVAILABLE or self.collection is None:
289
- return {"match": False, "person_id": "unknown", "name": "Unknown", "distance": 999}
290
-
291
- if not DEEPFACE_AVAILABLE:
292
- return {"match": False, "person_id": "unknown", "name": "Unknown", "distance": 999}
293
-
294
- if self.collection.count() == 0:
295
- return {"match": False, "person_id": "unknown", "name": "Unknown", "distance": 999}
296
 
 
 
 
297
  try:
298
- temp_path = "temp_face.jpg"
299
- cv2.imwrite(temp_path, cv2.cvtColor(face_image, cv2.COLOR_RGB2BGR))
 
300
 
301
- embedding_obj = DeepFace.represent(
302
  img_path=temp_path,
303
- model_name=self.model_name,
304
  enforce_detection=False
305
  )
306
-
307
- if os.path.exists(temp_path):
308
- os.remove(temp_path)
309
-
310
- if not embedding_obj:
311
- return {"match": False, "person_id": "unknown", "name": "Unknown", "distance": 999}
312
-
313
- face_embedding = embedding_obj[0]["embedding"]
314
-
315
  results = self.collection.query(
316
- query_embeddings=[face_embedding],
317
- n_results=1,
318
- include=["metadatas", "distances"]
319
  )
320
-
321
- if not results['ids'][0]:
322
- return {"match": False, "person_id": "unknown", "name": "Unknown", "distance": 999}
323
-
324
  distance = results['distances'][0][0]
325
  metadata = results['metadatas'][0][0]
326
-
327
- if distance < threshold:
 
328
  return {
329
  "match": True,
330
- "person_id": metadata['person_id'],
331
- "name": metadata['person_name'],
332
- "distance": distance
333
  }
334
- else:
335
- return {"match": False, "person_id": "unknown", "name": "Unknown", "distance": distance}
336
-
337
- except Exception as e:
338
- logger.error(f"❌ Recognition error: {e}")
339
- return {"match": False, "person_id": "unknown", "name": "Unknown", "distance": 999}
340
-
341
- def get_all_persons(self) -> List[Dict[str, str]]:
342
- """Get list of all registered persons."""
343
- if not CHROMADB_AVAILABLE or self.collection is None:
344
- return []
345
-
346
- try:
347
- all_data = self.collection.get(include=["metadatas"])
348
-
349
- persons = {}
350
- for metadata in all_data['metadatas']:
351
- person_id = metadata['person_id']
352
- if person_id not in persons:
353
- persons[person_id] = {
354
- "id": person_id,
355
- "name": metadata['person_name']
356
- }
357
-
358
- return list(persons.values())
359
- except Exception as e:
360
- logger.error(f"Error getting persons: {e}")
361
- return []
362
-
363
- def delete_person(self, person_id: str) -> Tuple[bool, str]:
364
- """Remove a person from the database."""
365
- if not CHROMADB_AVAILABLE or self.collection is None:
366
- return False, "❌ ChromaDB not available"
367
-
368
- try:
369
- all_data = self.collection.get(include=["metadatas"])
370
- ids_to_delete = [
371
- all_data['ids'][i]
372
- for i, meta in enumerate(all_data['metadatas'])
373
- if meta['person_id'] == person_id
374
- ]
375
-
376
- if ids_to_delete:
377
- self.collection.delete(ids=ids_to_delete)
378
- logger.info(f"🗑️ Deleted {len(ids_to_delete)} embeddings for ID: {person_id}")
379
- return True, f"✅ Deleted person ID: {person_id}"
380
- else:
381
- return False, f"❌ Person ID not found: {person_id}"
382
-
383
- except Exception as e:
384
- logger.error(f"Error deleting: {e}")
385
- return False, f"❌ Error: {str(e)}"
386
 
387
- # Global database instance
388
- FACE_DB: Optional[ScalableFaceDatabase] = None
 
389
 
390
- def get_face_database() -> ScalableFaceDatabase:
391
- """Get or create the global face database."""
392
- global FACE_DB
393
- if FACE_DB is None:
394
- FACE_DB = ScalableFaceDatabase()
395
- return FACE_DB
396
 
397
- FIXED_CONFIDENCE = 0.55
 
398
 
399
  # ====================================================
400
- # YOLO DETECTOR
401
  # ====================================================
402
- @dataclass
403
- class DetectionConfig:
404
- min_confidence: float = 0.6
405
- model_path: str = "yolov8n-face.pt"
406
-
407
- class YOLOv8FaceDetector:
408
- def __init__(self, config: DetectionConfig):
409
  try:
410
- logger.info(f"📦 Loading YOLO model: {config.model_path}")
411
- self.model = YOLO(config.model_path)
412
- logger.info("✅ YOLO model loaded")
413
  except Exception as e:
414
- logger.error(f"❌ YOLO loading failed: {e}")
415
- raise RuntimeError(f"Cannot load YOLO model") from e
416
-
417
- def detect_faces(
418
- self,
419
- image: np.ndarray,
420
- conf_threshold: float,
421
- recognize: bool = False
422
- ) -> Tuple[List[Dict[str, Any]], np.ndarray]:
423
- """Detect and optionally recognize faces."""
424
- results = self.model(image, conf=conf_threshold, verbose=False)
425
  faces = []
426
- annotated_image = image.copy()
427
-
428
- face_db = get_face_database() if recognize else None
429
-
430
  for r in results:
431
- if r.boxes is None:
432
- continue
433
  for box in r.boxes:
434
  x1, y1, x2, y2 = map(int, box.xyxy[0])
435
- confidence = float(box.conf[0])
436
-
437
- face_info = {
438
- "x": x1, "y": y1,
439
- "width": x2 - x1,
440
- "height": y2 - y1,
441
- "confidence": confidence
442
- }
443
-
444
- if recognize and face_db:
445
- face_crop = image[y1:y2, x1:x2]
446
- if face_crop.size > 0:
447
- recognition_result = face_db.recognize_face(face_crop)
448
- face_info.update(recognition_result)
449
-
450
- color = (0, 255, 0) if recognition_result["match"] else (255, 0, 0)
451
- cv2.rectangle(annotated_image, (x1, y1), (x2, y2), color, 3)
452
-
453
- if recognition_result["match"]:
454
- label = f"{recognition_result['name']} ({recognition_result['person_id']})"
455
- else:
456
- label = "Unknown"
457
-
458
- (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
459
- cv2.rectangle(annotated_image, (x1, y1 - h - 10), (x1 + w, y1), color, -1)
460
- cv2.putText(annotated_image, label, (x1, y1 - 5),
461
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
462
- else:
463
- cv2.rectangle(annotated_image, (x1, y1), (x2, y2), (0, 255, 0), 3)
464
- label = "Face"
465
- (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
466
- cv2.rectangle(annotated_image, (x1, y1 - h - 10), (x1 + w, y1), (0, 255, 0), -1)
467
- cv2.putText(annotated_image, label, (x1, y1 - 5),
468
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 2)
469
-
470
- faces.append(face_info)
471
-
472
- return faces, annotated_image
473
 
474
- GLOBAL_DETECTOR: Optional[YOLOv8FaceDetector] = None
475
-
476
- def get_global_detector() -> YOLOv8FaceDetector:
477
- global GLOBAL_DETECTOR
478
- if GLOBAL_DETECTOR is None:
479
- GLOBAL_DETECTOR = YOLOv8FaceDetector(DetectionConfig())
480
- return GLOBAL_DETECTOR
481
 
482
  # ====================================================
483
- # GRADIO HANDLERS
484
  # ====================================================
485
- def recognize_image(image):
486
- """Recognize faces in image."""
487
- if image is None:
488
- return None, "⚠️ No image provided"
 
 
 
 
 
 
489
 
490
- try:
491
- detector = get_global_detector()
492
- faces, annotated = detector.detect_faces(image, FIXED_CONFIDENCE, recognize=True)
493
-
494
- if faces:
495
- known = sum(1 for f in faces if f.get("match", False))
496
- unknown = len(faces) - known
497
-
498
- result = f"### 📊 Detection Results\n\n"
499
- result += f"**Total Faces:** {len(faces)}\n\n"
500
- result += f"- ✅ **Known:** {known}\n"
501
- result += f"- **Unknown:** {unknown}\n\n"
502
-
503
- if known > 0:
504
- result += "**Identified People:**\n"
505
- for i, f in enumerate(faces, 1):
506
- if f.get("match"):
507
- result += f"{i}. {f['name']} (ID: {f['person_id']})\n"
508
- else:
509
- result = "❌ **No faces detected**"
510
-
511
- return annotated, result
512
- except Exception as e:
513
- logger.error(f"Error: {e}")
514
- return image, f"❌ Error: {str(e)}"
515
 
516
- def recognize_video(video_file, progress=gr.Progress()):
517
- """Process video and recognize faces across frames with temporal smoothing."""
518
- if video_file is None:
519
- return None, "⚠️ No video provided"
 
 
520
 
521
- try:
522
- detector = get_global_detector()
523
- db = get_face_database()
524
-
525
- cap = cv2.VideoCapture(video_file.name)
526
- if not cap.isOpened():
527
- return None, "❌ Cannot open video file"
528
-
529
- fps = int(cap.get(cv2.CAP_PROP_FPS))
530
- width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
531
- height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
532
- total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
533
-
534
- output_path = create_temp_file(suffix=".mp4")
535
- fourcc = cv2.VideoWriter_fourcc(*'mp4v')
536
- out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
537
-
538
- # ✅ NEW: Frame processing optimization
539
- PROCESS_EVERY_N_FRAMES = 15 # Only run DeepFace every 15 frames
540
-
541
- frame_count = 0
542
- total_detections = 0
543
- known_detections = {}
544
-
545
- # NEW: Temporal smoothing - cache recognition results
546
- last_recognition_results = {} # {face_position_key: recognition_data}
547
-
548
- logger.info(f"📹 Processing video: {total_frames} frames at {fps} FPS")
549
- logger.info(f"⚡ Performance mode: Processing every {PROCESS_EVERY_N_FRAMES}th frame")
550
-
551
- while cap.isOpened():
552
- ret, frame = cap.read()
553
- if not ret:
554
- break
555
-
556
- frame_count += 1
557
- progress(frame_count / total_frames, desc=f"Frame {frame_count}/{total_frames}")
558
-
559
- frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
560
-
561
- # NEW: Smart frame processing
562
- if frame_count % PROCESS_EVERY_N_FRAMES == 0:
563
- # Run FULL recognition (expensive - includes DeepFace)
564
- faces, _ = detector.detect_faces(frame_rgb, FIXED_CONFIDENCE, recognize=True)
565
-
566
- # Update recognition cache
567
- last_recognition_results = {}
568
- for face in faces:
569
- # Create position-based key for tracking
570
- face_key = (face['x'] // 50, face['y'] // 50)
571
- last_recognition_results[face_key] = {
572
- 'match': face.get('match', False),
573
- 'person_id': face.get('person_id', 'unknown'),
574
- 'name': face.get('name', 'Unknown'),
575
- 'distance': face.get('distance', 999)
576
- }
577
-
578
- logger.info(f"Frame {frame_count}: Full recognition - {len(faces)} faces")
579
- else:
580
- # Just detect (FAST - no DeepFace), apply cached recognition
581
- faces, _ = detector.detect_faces(frame_rgb, FIXED_CONFIDENCE, recognize=False)
582
 
583
- # Apply cached recognition to detected faces
584
- for face in faces:
585
- face_key = (face['x'] // 50, face['y'] // 50)
586
-
587
- # Look for cached result near this position
588
- cached = None
589
- for key, result in last_recognition_results.items():
590
- # Check if face is in similar position (within 2 grid cells)
591
- if abs(key[0] - face_key[0]) <= 2 and abs(key[1] - face_key[1]) <= 2:
592
- cached = result
593
- break
594
 
595
- if cached:
596
- face['match'] = cached['match']
597
- face['person_id'] = cached['person_id']
598
- face['name'] = cached['name']
599
- face['distance'] = cached['distance']
600
- else:
601
- face['match'] = False
602
- face['person_id'] = 'unknown'
603
- face['name'] = 'Unknown'
604
 
605
- # Redraw annotations with current/cached results
606
- annotated_rgb = frame_rgb.copy()
607
- for face in faces:
608
- x1, y1 = face['x'], face['y']
609
- x2, y2 = x1 + face['width'], y1 + face['height']
610
-
611
- if face.get('match', False):
612
- color = (0, 255, 0)
613
- label = f"{face['name']} ({face['person_id']})"
614
- else:
615
- color = (255, 0, 0)
616
- label = "Unknown"
617
-
618
- cv2.rectangle(annotated_rgb, (x1, y1), (x2, y2), color, 3)
619
- (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
620
- cv2.rectangle(annotated_rgb, (x1, y1 - h - 10), (x1 + w, y1), color, -1)
621
- cv2.putText(annotated_rgb, label, (x1, y1 - 5),
622
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
623
 
624
- # Count detections for statistics
625
- for face in faces:
626
- total_detections += 1
627
- if face.get("match"):
628
- person_id = face.get("person_id")
629
- if person_id not in known_detections:
630
- known_detections[person_id] = 0
631
- known_detections[person_id] += 1
632
 
633
- # Write frame to output
634
- annotated_bgr = cv2.cvtColor(annotated_rgb, cv2.COLOR_RGB2BGR)
635
- out.write(annotated_bgr)
636
 
637
- if frame_count % 30 == 0:
638
- logger.info(f"Processed {frame_count}/{total_frames}")
639
-
640
- cap.release()
641
- out.release()
642
-
643
- # Generate results summary
644
- result = f"### 📊 Video Processing Complete\n\n"
645
- result += f"**Frames:** {frame_count} | **Duration:** {frame_count/fps:.1f}s\n"
646
- result += f"**Total Detections:** {total_detections}\n\n"
647
-
648
- if known_detections:
649
- result += f"**Recognized People:**\n"
650
- persons = db.get_all_persons()
651
- for person_id, count in known_detections.items():
652
- name = next((p['name'] for p in persons if p['id'] == person_id), "Unknown")
653
- result += f"- {name} (ID: {person_id}): {count} times\n"
654
- else:
655
- result += f"**No known people detected**\n"
656
-
657
- unknown = total_detections - sum(known_detections.values())
658
- if unknown > 0:
659
- result += f"\n**Unknown faces:** {unknown} detections\n"
660
-
661
- logger.info(f"✅ Video complete: {output_path}")
662
- return output_path, result
663
-
664
- except Exception as e:
665
- logger.error(f"Error: {e}")
666
- return None, f"❌ Error: {str(e)}"
667
 
668
- def recognize_webcam(image):
669
- """Recognize faces in webcam stream."""
670
- if image is None:
671
- return None
672
- try:
673
- detector = get_global_detector()
674
- _, annotated = detector.detect_faces(image, FIXED_CONFIDENCE, recognize=True)
675
- return annotated
676
- except Exception as e:
677
- logger.error(f"Webcam error: {e}")
678
- return image
679
-
680
- def add_person_handler(person_id, name, image):
681
- """Add person to database."""
682
- if not person_id or not name:
683
- return "⚠️ Please provide ID and Name", refresh_people_list()
684
- if image is None:
685
- return "⚠️ Please upload an image", refresh_people_list()
686
-
687
- db = get_face_database()
688
- success, message = db.add_person(person_id, name, image)
689
- return message, refresh_people_list()
690
-
691
- def delete_person_handler(person_id):
692
- """Delete person from database."""
693
- if not person_id:
694
- return "⚠️ Please provide Person ID", refresh_people_list()
695
-
696
- db = get_face_database()
697
- success, message = db.delete_person(person_id)
698
- return message, refresh_people_list()
699
-
700
- def refresh_people_list():
701
- """Get formatted list of all people."""
702
- db = get_face_database()
703
- persons = db.get_all_persons()
704
-
705
- if not persons:
706
- return "### 📭 Database is Empty\n\nAdd people using the form above!"
707
-
708
- result = f"### 👥 Registered People ({len(persons)})\n\n"
709
- for p in sorted(persons, key=lambda x: x['id']):
710
- result += f"- **{p['name']}** (ID: `{p['id']}`)\n"
711
 
712
- return result
 
713
 
714
- def get_database_stats():
715
- """Get database statistics."""
716
- db = get_face_database()
717
- if not CHROMADB_AVAILABLE or db.collection is None:
718
- return "### ⚠️ ChromaDB Not Available"
719
 
720
- count = db.collection.count()
721
- persons = db.get_all_persons()
722
-
723
- stats = f"### 📊 Database Statistics\n\n"
724
- stats += f"- **Total Embeddings:** {count}\n"
725
- stats += f"- **Unique People:** {len(persons)}\n"
726
- stats += f"- **Status:** {'✅ Active' if count > 0 else '📭 Empty'}\n\n"
727
-
728
- if db.hf_api and db.hf_token and db.space_id:
729
- stats += f"- **Auto-Commit:** ✅ Enabled\n"
730
- else:
731
- stats += f"- **Auto-Commit:** ⚠️ Disabled\n"
732
 
733
- return stats
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
734
 
735
  # ====================================================
736
- # GRADIO UI
737
  # ====================================================
738
- with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue"), title="Face Recognition") as demo:
739
- gr.Markdown("# 🎭 Face Recognition System")
740
- gr.Markdown("**YOLO + DeepFace + ChromaDB** • Image, Video & Webcam Support")
741
-
742
- if not DEEPFACE_AVAILABLE or not CHROMADB_AVAILABLE:
743
- gr.Markdown("### ⚠️ Missing Dependencies")
744
- if not DEEPFACE_AVAILABLE:
745
- gr.Markdown("- Install: `pip install deepface tf-keras`")
746
- if not CHROMADB_AVAILABLE:
747
- gr.Markdown("- Install: `pip install chromadb`")
748
 
 
 
 
 
 
749
  with gr.Row():
 
 
 
 
 
 
 
 
 
 
 
 
 
 
750
  with gr.Column(scale=3):
751
  with gr.Tabs():
752
- with gr.TabItem("🎭 Face Recognition"):
 
 
 
 
 
753
  with gr.Tabs():
754
- with gr.TabItem("📷 Image"):
755
- with gr.Row():
756
- img_in = gr.Image(
757
- sources=["upload", "clipboard"],
758
- type="numpy",
759
- label="Input Image",
760
- height=500
761
- )
762
- with gr.Column():
763
- img_out = gr.Image(
764
- type="numpy",
765
- label="Recognition Result",
766
- height=400
767
- )
768
- img_status = gr.Markdown("_Upload an image_")
769
 
770
- with gr.Row():
771
- recognize_btn = gr.Button("🔍 Recognize Faces", variant="primary", scale=3)
772
- gr.ClearButton([img_in, img_out, img_status], scale=1)
 
773
 
774
- # EXAMPLES SECTION
775
- gr.Examples(
776
- examples=[
777
- ["examples/messi_test.webp"],
778
- ["examples/mark_test.webp"],
779
- ["examples/group_photo.png"],
780
- ["examples/group2.webp"],
781
- ["examples/not_face_1.jpg"],
782
- ["examples/single_face.jpg"],
783
- ["examples/two_faces.png"],
784
- ],
785
- inputs=img_in,
786
- label="Example Images - Click to Try"
787
- )
788
-
789
- with gr.TabItem("🎥 Video"):
790
- with gr.Row():
791
- vid_in = gr.File(
792
- file_types=[".mp4", ".avi", ".mov", ".mkv"],
793
- label="Upload Video File"
794
- )
795
- with gr.Column():
796
- vid_out = gr.Video(
797
- label="Processed Video",
798
- height=400
799
- )
800
- vid_status = gr.Markdown("_Upload a video_")
801
-
802
- with gr.Row():
803
- process_vid_btn = gr.Button("🔍 Process Video", variant="primary", scale=3)
804
- gr.ClearButton([vid_in, vid_out, vid_status], scale=1)
805
-
806
- with gr.TabItem("📹 Webcam"):
807
- with gr.Row():
808
- web_in = gr.Image(
809
- sources=["webcam"],
810
- type="numpy",
811
- streaming=True,
812
- label="Live Feed",
813
- height=500
814
- )
815
- web_out = gr.Image(
816
- type="numpy",
817
- label="Recognition",
818
- height=500
819
- )
820
-
821
- with gr.TabItem("🗄️ Database"):
822
- with gr.Row():
823
- db_stats = gr.Markdown(value="Loading...")
824
-
825
- gr.Markdown("---")
826
 
827
- with gr.Row():
828
- with gr.Column():
829
- gr.Markdown("#### ➕ Add Person")
830
- gr.Markdown("💡 *Tip: Add same person 5-10 times with different angles for better recognition*")
831
- add_id = gr.Textbox(label="Person ID", placeholder="004")
832
- add_name = gr.Textbox(label="Name", placeholder="Alice")
833
- add_img = gr.Image(
834
- sources=["upload", "webcam"],
835
- type="numpy",
836
- label="Face Photo"
837
- )
838
- add_btn = gr.Button("Add to Database", variant="primary")
839
- add_status = gr.Markdown("")
840
-
841
- with gr.Column():
842
- gr.Markdown("#### 🗑️ Delete Person")
843
- del_id = gr.Textbox(label="Person ID", placeholder="004")
844
- del_btn = gr.Button("Delete", variant="stop")
845
- del_status = gr.Markdown("")
846
 
847
- gr.Markdown("---")
848
- people_list = gr.Markdown(value="Loading...")
849
- refresh_btn = gr.Button("🔄 Refresh List")
850
-
851
- # Event Handlers
852
- recognize_btn.click(
853
- recognize_image,
854
- inputs=[img_in],
855
- outputs=[img_out, img_status]
856
- )
 
 
 
 
 
 
 
 
857
 
858
- process_vid_btn.click(
859
- recognize_video,
860
- inputs=[vid_in],
861
- outputs=[vid_out, vid_status]
862
- )
863
 
864
- web_in.stream(
865
- recognize_webcam,
866
- inputs=[web_in],
867
- outputs=web_out
868
- )
869
 
870
- add_btn.click(
871
- add_person_handler,
872
- inputs=[add_id, add_name, add_img],
873
- outputs=[add_status, people_list]
874
- )
 
 
 
 
875
 
876
- del_btn.click(
877
- delete_person_handler,
878
- inputs=[del_id],
879
- outputs=[del_status, people_list]
880
- )
 
881
 
882
- refresh_btn.click(
883
- refresh_people_list,
884
- outputs=people_list
885
- )
886
 
887
- demo.load(get_database_stats, outputs=db_stats)
888
- demo.load(refresh_people_list, outputs=people_list)
889
 
890
- # ====================================================
891
- # MAIN
892
- # ====================================================
893
  if __name__ == "__main__":
894
- logger.info("🚀 Starting Face Recognition System...")
895
  try:
896
- get_global_detector()
897
- get_face_database()
898
- logger.info("✅ Ready! Launching...")
899
  demo.launch()
900
  except Exception as e:
901
- logger.error(f"Startup failed: {e}")
 
 
 
 
 
1
  # --- Standard Libraries ---
2
  import logging
3
  import atexit
4
  import tempfile
5
  import os
6
+ import hashlib
7
  from dataclasses import dataclass
8
  from typing import Any, Dict, List, Tuple, Optional
9
  from pathlib import Path
 
10
 
11
  # --- Computer Vision & UI Libraries ---
12
  import cv2
 
14
  import gradio as gr
15
  from ultralytics import YOLO
16
 
17
+ # --- Face Recognition Libraries (Optional / Fallback) ---
 
 
 
 
 
 
 
 
18
  try:
19
  from deepface import DeepFace
20
  DEEPFACE_AVAILABLE = True
21
  except ImportError:
22
  DEEPFACE_AVAILABLE = False
23
+ logging.warning("⚠️ DeepFace not installed - Recognition will be disabled (Fallback Mode).")
24
 
25
  try:
26
  import chromadb
 
27
  CHROMADB_AVAILABLE = True
28
  except ImportError:
29
  CHROMADB_AVAILABLE = False
30
+ logging.warning("⚠️ ChromaDB not installed - Database features disabled.")
31
 
32
  # --- Configure Logging ---
33
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
34
  logger = logging.getLogger(__name__)
35
 
36
  # ====================================================
37
+ # 1. CONFIGURATION & UTILITIES
38
  # ====================================================
39
+
40
  TEMP_FILES = []
41
 
42
  def cleanup_temp_files():
43
+ """Clean up temporary video files on exit."""
44
  for f in TEMP_FILES:
45
  try:
46
  if os.path.exists(f):
47
  os.remove(f)
48
+ except Exception:
49
+ pass
 
50
 
51
  atexit.register(cleanup_temp_files)
52
 
53
  def create_temp_file(suffix=".mp4") -> str:
 
54
  path = tempfile.mktemp(suffix=suffix)
55
  TEMP_FILES.append(path)
56
  return path
57
 
58
  # ====================================================
59
+ # 2. THE DATABASE LAYER (Backend Only - No UI)
60
  # ====================================================
61
class FaceDatabase:
    """
    Handles loading known faces from the 'known_faces' folder.
    Runs automatically on startup.

    Embeddings (Facenet512 via DeepFace) are persisted in a ChromaDB
    collection using cosine distance; each image is keyed by its MD5 hash
    so re-scans skip already-indexed files.
    """
    def __init__(self, db_path="./chroma_db", faces_dir="known_faces"):
        """Open/create the ChromaDB collection and index the faces folder.

        Leaves the database inactive (is_active=False) when DeepFace or
        ChromaDB is missing, or when initialization fails — recognize()
        then degrades to always returning "Unknown".
        """
        self.faces_dir = Path(faces_dir)
        self.client = None
        self.collection = None
        self.is_active = False

        if not DEEPFACE_AVAILABLE or not CHROMADB_AVAILABLE:
            logger.warning("❌ Database unavailable (Missing dependencies)")
            return

        try:
            self.client = chromadb.PersistentClient(path=db_path)
            self.collection = self.client.get_or_create_collection(
                name="face_embeddings",
                metadata={"hnsw:space": "cosine"}
            )
            self.is_active = True

            # Auto-index on startup
            if self.faces_dir.exists():
                self._scan_and_index()
            else:
                self.faces_dir.mkdir(parents=True, exist_ok=True)
                logger.info(f"📁 Created {faces_dir} folder. Add images here!")

        except Exception as e:
            logger.error(f"❌ DB Init Error: {e}")
            self.is_active = False

    def _get_hash(self, img_path: Path) -> str:
        """Return the MD5 hex digest of the file contents (used as a stable ID)."""
        with open(img_path, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()

    def _scan_and_index(self):
        """Scans folders and adds new images to ChromaDB."""
        logger.info("🔄 Scanning 'known_faces' folder...")
        count = 0
        for person_dir in self.faces_dir.iterdir():
            if not person_dir.is_dir(): continue

            # Folder format expectation: "001_John_Doe"
            parts = person_dir.name.split('_', 1)
            if len(parts) < 2:
                # Fallback for folders like "John"
                p_id = "000"
                p_name = person_dir.name
            else:
                p_id, p_name = parts[0], parts[1].replace('_', ' ')

            # NOTE(review): extensions are matched case-sensitively;
            # "*.JPG" files would be skipped — confirm that is acceptable.
            images = list(person_dir.glob("*.jpg")) + list(person_dir.glob("*.png")) + list(person_dir.glob("*.webp"))

            for img_path in images:
                try:
                    img_hash = self._get_hash(img_path)
                    # Check if already indexed
                    if self.collection.get(ids=[img_hash])['ids']:
                        continue

                    # Generate Embedding
                    embedding_objs = DeepFace.represent(
                        img_path=str(img_path),
                        model_name="Facenet512",
                        enforce_detection=False
                    )

                    if embedding_objs:
                        embedding = embedding_objs[0]["embedding"]
                        self.collection.add(
                            ids=[img_hash],
                            embeddings=[embedding],
                            metadatas=[{"id": p_id, "name": p_name, "file": img_path.name}]
                        )
                        count += 1
                        logger.info(f"✅ Indexed: {p_name}")
                except Exception as e:
                    logger.error(f"⚠️ Failed to index {img_path.name}: {e}")

        if count > 0:
            logger.info(f"📥 Added {count} new faces to database.")
        else:
            logger.info("ℹ️ Database is up to date.")

    def recognize(self, face_img: np.ndarray) -> Dict[str, Any]:
        """Returns {'match': bool, 'name': str, 'id': str, 'color': tuple}

        Embeds the crop with Facenet512 and queries the nearest neighbour;
        a cosine distance below 0.45 counts as a match. Any failure (or an
        inactive/empty database) yields the "Unknown" default.
        """
        # Default response (Unknown / Red)
        default = {"match": False, "name": "Unknown", "id": "Unknown", "color": (255, 0, 0)}

        if not self.is_active or self.collection is None or self.collection.count() == 0:
            return default

        try:
            # Create temp file for DeepFace (it prefers paths)
            # NOTE(review): fixed shared filename — concurrent recognize()
            # calls (e.g. webcam streaming) can race on this file.
            # Also assumes face_img is RGB (cvtColor RGB2BGR) — cv2 video
            # frames are BGR; confirm the caller's colour space.
            temp_path = "temp_query.jpg"
            cv2.imwrite(temp_path, cv2.cvtColor(face_img, cv2.COLOR_RGB2BGR))

            embedding_objs = DeepFace.represent(
                img_path=temp_path,
                model_name="Facenet512",
                enforce_detection=False
            )
            if os.path.exists(temp_path): os.remove(temp_path)

            if not embedding_objs: return default

            query_embed = embedding_objs[0]["embedding"]
            results = self.collection.query(
                query_embeddings=[query_embed],
                n_results=1
            )

            if not results['ids'][0]: return default

            distance = results['distances'][0][0]
            metadata = results['metadatas'][0][0]

            # Threshold: Lower is stricter. 0.45 is a good balance for Facenet512
            if distance < 0.45:
                return {
                    "match": True,
                    "name": metadata['name'],
                    "id": metadata['id'],
                    "color": (0, 255, 0)  # Green for match
                }
            return default

        except Exception as e:
            logger.error(f"Recognition Error: {e}")
            return default

    def get_stats(self):
        """Human-readable one-line status string for the UI panel."""
        if self.is_active and self.collection:
            return f"✅ Active | {self.collection.count()} Faces Indexed"
        return "❌ Offline (Check dependencies or 'known_faces' folder)"
 
 
199
 
200
# Singleton DB — constructed at import time; may scan 'known_faces' and
# touch disk before the UI is built.
FACE_DB = FaceDatabase()
202
 
203
  # ====================================================
204
+ # 3. THE UNIFIED DETECTOR (YOLO)
205
  # ====================================================
206
class Detector:
    """Thin wrapper around the YOLOv8 face-detection model."""

    def __init__(self):
        logger.info("📦 Loading YOLOv8-Face...")
        try:
            self.model = YOLO("yolov8n-face.pt")
            logger.info("✅ Model Loaded.")
        except Exception as e:
            logger.error(f"❌ Model Load Failed: {e}")
            raise e

    def detect(self, image: np.ndarray, conf: float = 0.5):
        """Run inference and return [{'box': (x, y, w, h), 'conf': float}, ...]."""
        detections = []
        for result in self.model(image, conf=conf, verbose=False):
            boxes = result.boxes
            if boxes is None:
                continue
            for det in boxes:
                left, top, right, bottom = map(int, det.xyxy[0])
                detections.append({
                    "box": (left, top, right - left, bottom - top),
                    "conf": float(det.conf[0]),
                })
        return detections
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
228
 
229
# Module-level singleton: loads YOLO weights at import time (raises on failure).
GLOBAL_DETECTOR = Detector()
 
 
 
 
 
 
230
 
231
  # ====================================================
232
+ # 4. CORE LOGIC: THE "GHOST" PIPELINE
233
  # ====================================================
234
+
235
def apply_blur(image, x, y, w, h, style="pixelate", intensity=20):
    """Destructively redact a face region of `image` in place (also returned).

    Args:
        image: HxWx3 numpy frame; pixels inside the (expanded) box are overwritten.
        x, y, w, h: face bounding box (pixels).
        style: "pixelate" | "gaussian" | "solid". Any other value is a no-op.
        intensity: UI slider value (~5-50); larger means stronger redaction.

    Returns:
        The same `image` array, mutated.
    """
    # Expand blur slightly for safety (margin): 10% of the box width on all
    # sides, clamped to the image bounds.
    h_img, w_img = image.shape[:2]
    margin = int(w * 0.1)
    x = max(0, x - margin)
    y = max(0, y - margin)
    w = min(w_img - x, w + (2 * margin))
    h = min(h_img - y, h + (2 * margin))

    roi = image[y:y+h, x:x+w]
    if roi.size == 0:
        return image

    if style == "pixelate":
        # Shrink the ROI by `pix_size`, then scale back up with
        # nearest-neighbour to produce visible square "pixels".
        # (Fix: removed a leftover `k = max(2, int(100 - intensity) // 2)`
        # computation that was never used.)
        h_roi, w_roi = roi.shape[:2]
        pix_size = max(5, int(intensity))

        small = cv2.resize(roi, (max(1, w_roi // pix_size), max(1, h_roi // pix_size)), interpolation=cv2.INTER_LINEAR)
        pixelated = cv2.resize(small, (w_roi, h_roi), interpolation=cv2.INTER_NEAREST)
        image[y:y+h, x:x+w] = pixelated

    elif style == "gaussian":
        k = (int(intensity) * 2) + 1  # Gaussian kernel size must be odd
        blurred = cv2.GaussianBlur(roi, (k, k), 0)
        image[y:y+h, x:x+w] = blurred

    elif style == "solid":
        # Opaque black box — strongest (irreversible) redaction.
        cv2.rectangle(image, (x, y), (x + w, y + h), (0, 0, 0), -1)

    return image
 
269
 
270
def draw_label(image, x, y, w, text, color, on_blur=False, h=None):
    """Draw a text label for a face box (mutates `image` in place).

    on_blur=False: classic detection look — a filled bar sitting on top of
    the box's upper edge.
    on_blur=True: label centred over the (blurred) face with a filled
    background so it stays readable on top of the blur.

    Args:
        h: optional box height used for vertical centring in on_blur mode.
           Fix (backward-compatible): the original centred vertically with
           the box *width* `w`, which mis-centres tall/narrow boxes. When
           `h` is omitted the old behaviour is kept, so existing callers
           are unaffected.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    scale = 0.6
    thickness = 2
    (tw, th), _ = cv2.getTextSize(text, font, scale, thickness)

    # If face is blurred, we put the text right in the center of the face box
    if on_blur:
        center_x = x + (w // 2) - (tw // 2)
        center_y = y + ((h if h is not None else w) // 2)

        # Draw filled box behind text for readability
        cv2.rectangle(image, (center_x - 5, center_y - th - 5), (center_x + tw + 5, center_y + 5), color, -1)
        # White text
        cv2.putText(image, text, (center_x, center_y), font, scale, (255, 255, 255), thickness)
    else:
        # Standard bounding box look (Top of box)
        cv2.rectangle(image, (x, y - th - 10), (x + tw + 10, y), color, -1)
        cv2.putText(image, text, (x + 5, y - 5), font, scale, (255, 255, 255), thickness)
290
+
291
def process_frame(image, mode, sensitivity, blur_style, blur_amount):
    """
    THE MASTER FUNCTION: detect faces, optionally recognize, then redact/annotate.

    Modes:
    1. "privacy" -> Detect + Blur (No ID)
    2. "data"    -> Detect + Recognize (No Blur)
    3. "smart"   -> Detect + Recognize + Blur + ID Overlay

    Args:
        image: numpy frame or None. NOTE(review): Gradio image inputs arrive
            as RGB while cv2 video frames are BGR; FACE_DB.recognize assumes
            RGB — confirm colour handling for the video path.
        mode: "privacy" | "data" | "smart".
        sensitivity: "High" lowers the detector confidence threshold to 0.3
            (otherwise 0.5).
        blur_style, blur_amount: forwarded to apply_blur().

    Returns:
        A redacted/annotated copy of `image`, or None when `image` is None.
    """
    if image is None: return None

    # 1. Detection (The Snapshot)
    # ---------------------------
    conf = 0.3 if sensitivity == "High" else 0.5
    faces = GLOBAL_DETECTOR.detect(image, conf=conf)

    # Work on a copy so the input frame stays untouched.
    processed_img = image.copy()

    for face in faces:
        x, y, w, h = face['box']

        # 2. Analysis (The "Ghost" Step)
        # ---------------------------
        # Default identity: anonymous, green accent.
        identity = {"name": "", "color": (0, 255, 0)}

        if mode in ["data", "smart"]:
            # Crop and Check DB (crop from the ORIGINAL image, not the copy
            # that may already be blurred).
            face_crop = image[y:y+h, x:x+w]
            if face_crop.size > 0:
                res = FACE_DB.recognize(face_crop)

                if res['match']:
                    label_text = f"ID: {res['id']} ({res['name']})"
                else:
                    label_text = "Unknown"

                identity = {"name": label_text, "color": res['color']}

        # 3. Modification & Annotation
        # ---------------------------

        # -- Mode 1: Raw Privacy --
        if mode == "privacy":
            # Just destroy pixels. No data.
            processed_img = apply_blur(processed_img, x, y, w, h, blur_style, blur_amount)

        # -- Mode 2: Data Layer --
        elif mode == "data":
            # No pixel destruction. Just data boxes.
            cv2.rectangle(processed_img, (x, y), (x+w, y+h), identity['color'], 2)
            draw_label(processed_img, x, y, w, identity['name'], identity['color'], on_blur=False)

        # -- Mode 3: Smart Redaction (The Demo Winner) --
        elif mode == "smart":
            # A. Destroy Pixels (Blur)
            processed_img = apply_blur(processed_img, x, y, w, h, blur_style, blur_amount)

            # B. Overlay Data (The "Ghost" data applied back on top)
            draw_label(processed_img, x, y, w, identity['name'], identity['color'], on_blur=True)

    return processed_img
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
351
 
352
+ # ====================================================
353
+ # 5. VIDEO PROCESSING HELPERS
354
+ # ====================================================
355
def process_video_general(video_path, mode, sensitivity, blur_style, blur_amount, progress=gr.Progress()):
    """Generic frame-by-frame video processor shared by all three tabs.

    Args:
        video_path: path to the uploaded video (None/"" -> no-op).
        mode, sensitivity, blur_style, blur_amount: forwarded to process_frame().
        progress: Gradio progress tracker (default instance is Gradio's
            documented injection pattern, not a shared-mutable-default bug).

    Returns:
        Path to the processed temp video, or None if the input is missing
        or cannot be opened.
    """
    if not video_path:
        return None

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None

    fps = int(cap.get(cv2.CAP_PROP_FPS))
    if fps <= 0:
        # Fix: some containers report 0 FPS; a 0-FPS VideoWriter produces an
        # unplayable file, so fall back to a sane default.
        fps = 30
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    out_path = create_temp_file()
    # Attempt h264 (browser-friendly), fallback to mp4v
    fourcc = cv2.VideoWriter_fourcc(*'avc1')
    out = cv2.VideoWriter(out_path, fourcc, fps, (width, height))
    if not out.isOpened():
        out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

    cnt = 0
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Process using the Master Function
            res_frame = process_frame(frame, mode, sensitivity, blur_style, blur_amount)

            out.write(res_frame)
            cnt += 1
            if total > 0 and cnt % 10 == 0:
                progress(cnt / total, desc=f"Processing Frame {cnt}/{total}")
    finally:
        # Fix: release handles even if a frame fails mid-way so the partial
        # output file is flushed and not left locked.
        cap.release()
        out.release()
    return out_path
390
 
391
  # ====================================================
392
+ # 6. GRADIO INTERFACE (The Story Flow)
393
  # ====================================================
 
 
 
 
 
 
 
 
 
 
394
 
395
# Top-level Gradio app definition. Layout: a settings panel on the left and
# three "story" tabs (privacy -> data -> smart) on the right, each with
# Image / Video / Webcam sub-tabs wired to process_frame / process_video_general.
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue"), title="Smart Redaction Demo") as demo:

    gr.Markdown("# 🛡️ Smart Redaction System")
    gr.Markdown("### From Raw Privacy to Intelligent Security")

    with gr.Row():
        # LEFT COLUMN: SETTINGS (shared by every tab)
        with gr.Column(scale=1, variant="panel"):
            gr.Markdown("### ⚙️ Global Configuration")

            # Global status check (snapshot taken at app build time)
            db_status = gr.Markdown(f"**Database Status:** {FACE_DB.get_stats()}")

            blur_style = gr.Radio(["pixelate", "gaussian", "solid"], label="Blur Style", value="pixelate")
            blur_amount = gr.Slider(5, 50, value=25, label="Blur Intensity")
            sensitivity = gr.Radio(["Balanced", "High"], label="Detection Sensitivity", value="Balanced")

            gr.Info("ℹ️ Tip: Images in 'known_faces' folder are loaded automatically.")

        # RIGHT COLUMN: THE 3 TABS - THE STORY
        with gr.Column(scale=3):
            with gr.Tabs():

                # --- TAB 1: RAW PRIVACY ---
                with gr.TabItem("1️⃣ Raw Privacy (Baseline)"):
                    gr.Markdown("### 🔒 Total Anonymization")
                    gr.Markdown("*Scenario: GDPR Compliance. Everyone is hidden. No data is extracted.*")

                    with gr.Tabs():
                        with gr.TabItem("Image"):
                            p_img_in = gr.Image(label="Input", type="numpy", height=400)
                            p_img_out = gr.Image(label="Anonymized Output", height=400)
                            p_btn = gr.Button("Apply Privacy", variant="primary")

                        with gr.TabItem("Video"):
                            p_vid_in = gr.Video(label="Input Video")
                            p_vid_out = gr.Video(label="Anonymized Output")
                            p_vid_btn = gr.Button("Process Video", variant="primary")

                        with gr.TabItem("Webcam"):
                            p_web_in = gr.Image(sources=["webcam"], streaming=True, type="numpy")
                            p_web_out = gr.Image(label="Live Privacy Feed")

                # --- TAB 2: THE DATA LAYER ---
                with gr.TabItem("2️⃣ The Data Layer (Security)"):
                    gr.Markdown("### 🔍 Recognition & Intelligence")
                    gr.Markdown("*Scenario: Security Control Room. We identify Known vs Unknown. No Privacy.*")

                    with gr.Tabs():
                        with gr.TabItem("Image"):
                            d_img_in = gr.Image(label="Input", type="numpy", height=400)
                            d_img_out = gr.Image(label="Data Output", height=400)
                            d_btn = gr.Button("Analyze Data", variant="primary")

                        with gr.TabItem("Video"):
                            d_vid_in = gr.Video(label="Input Video")
                            d_vid_out = gr.Video(label="Data Output")
                            d_vid_btn = gr.Button("Analyze Video", variant="primary")

                        with gr.TabItem("Webcam"):
                            d_web_in = gr.Image(sources=["webcam"], streaming=True, type="numpy")
                            d_web_out = gr.Image(label="Live Data Feed")

                # --- TAB 3: SMART REDACTION ---
                with gr.TabItem("3️⃣ Smart Redaction (Combined)"):
                    gr.Markdown("### 🛡️ Intelligent Privacy")
                    gr.Markdown("*Scenario: The Solution. Faces are blurred for privacy, but Identities are overlaid for security.*")

                    with gr.Tabs():
                        with gr.TabItem("Image"):
                            s_img_in = gr.Image(label="Input", type="numpy", height=400)
                            s_img_out = gr.Image(label="Smart Redaction Output", height=400)
                            s_btn = gr.Button("Apply Smart Redaction", variant="primary")

                        with gr.TabItem("Video"):
                            s_vid_in = gr.Video(label="Input Video")
                            s_vid_out = gr.Video(label="Smart Redaction Output")
                            s_vid_btn = gr.Button("Process Smart Video", variant="primary")

                        with gr.TabItem("Webcam"):
                            s_web_in = gr.Image(sources=["webcam"], streaming=True, type="numpy")
                            s_web_out = gr.Image(label="Live Smart Feed")

    # =========================================
    # EVENT HANDLERS (Wiring the Logic)
    # Each handler partially applies the mode; the remaining inputs come
    # from the shared settings panel.
    # =========================================

    # Tab 1: Privacy (Mode="privacy")
    p_btn.click(lambda img, s, b, a: process_frame(img, "privacy", s, b, a),
                inputs=[p_img_in, sensitivity, blur_style, blur_amount], outputs=p_img_out)

    p_vid_btn.click(lambda vid, s, b, a: process_video_general(vid, "privacy", s, b, a),
                    inputs=[p_vid_in, sensitivity, blur_style, blur_amount], outputs=p_vid_out)

    p_web_in.stream(lambda img, s, b, a: process_frame(img, "privacy", s, b, a),
                    inputs=[p_web_in, sensitivity, blur_style, blur_amount], outputs=p_web_out)

    # Tab 2: Data (Mode="data")
    d_btn.click(lambda img, s, b, a: process_frame(img, "data", s, b, a),
                inputs=[d_img_in, sensitivity, blur_style, blur_amount], outputs=d_img_out)

    d_vid_btn.click(lambda vid, s, b, a: process_video_general(vid, "data", s, b, a),
                    inputs=[d_vid_in, sensitivity, blur_style, blur_amount], outputs=d_vid_out)

    d_web_in.stream(lambda img, s, b, a: process_frame(img, "data", s, b, a),
                    inputs=[d_web_in, sensitivity, blur_style, blur_amount], outputs=d_web_out)

    # Tab 3: Smart (Mode="smart")
    s_btn.click(lambda img, s, b, a: process_frame(img, "smart", s, b, a),
                inputs=[s_img_in, sensitivity, blur_style, blur_amount], outputs=s_img_out)

    s_vid_btn.click(lambda vid, s, b, a: process_video_general(vid, "smart", s, b, a),
                    inputs=[s_vid_in, sensitivity, blur_style, blur_amount], outputs=s_vid_out)

    s_web_in.stream(lambda img, s, b, a: process_frame(img, "smart", s, b, a),
                    inputs=[s_web_in, sensitivity, blur_style, blur_amount], outputs=s_web_out)
511
 
 
 
 
512
if __name__ == "__main__":
    # Ensure models are loaded before UI starts
    try:
        # Trigger singleton init
        # NOTE(review): GLOBAL_DETECTOR is constructed at import time, so
        # this truthiness check is always True here; it only documents
        # intent (a failed model load would have raised during import).
        if GLOBAL_DETECTOR:
            logger.info("✅ System Ready. Launching...")
        demo.launch()
    except Exception as e:
        # NOTE(review): the exception is logged but swallowed, so the
        # process exits with status 0 even on startup failure — confirm
        # this is intended for the hosting environment.
        logger.error(f"Startup Failed: {e}")