PrashanthB461 commited on
Commit
3962d18
Β·
verified Β·
1 Parent(s): 3506a7b

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +88 -37
app.py CHANGED
@@ -87,9 +87,10 @@ class AttendanceSystem:
87
  self.recognition_cooldown = 10 # Cooldown to prevent duplicates
88
  self.session_log: List[str] = []
89
  self.session_marked_present = set() # Track who's already marked present in this session
90
- self.session_registered = set() # Track who's already auto-registered in this session
91
  self.face_recognition_buffer = {} # Buffer for multiple detections before confirming
92
- self.buffer_threshold = 2 # Reduced threshold for faster recognition
 
93
  self.frame_skip_counter = 0 # Skip frames for better performance
94
 
95
  # Initialize
@@ -176,31 +177,33 @@ class AttendanceSystem:
176
  except Exception as e:
177
  return f"❌ Registration error: {e}", self.get_registered_workers_info()
178
 
179
- def _register_worker_auto(self, face_image: np.ndarray, face_embedding: List[float]) -> Optional[Tuple[str, str]]:
 
180
  try:
181
- # Check for duplicates with more lenient threshold for auto-registration
182
- if self._is_duplicate_face(face_embedding, threshold=12.0):
 
183
  return None
184
 
185
  worker_id = f"W{self.next_worker_id:04d}"
186
-
187
- # Check if already auto-registered in this session
188
- if worker_id in self.session_registered:
189
- return None
190
-
191
  worker_name = f"Unknown Worker {self.next_worker_id}"
 
 
192
  self._add_worker_to_system(worker_id, worker_name, face_embedding, face_image)
193
  self.save_local_worker_data()
194
 
195
- # Mark as registered in this session
196
- self.session_registered.add(worker_id)
 
197
 
198
  log_msg = f"πŸ†• [{datetime.now().strftime('%H:%M:%S')}] Auto-registered: {worker_name} ({worker_id})"
199
  self.session_log.append(log_msg)
200
  logger.info(log_msg)
 
201
  return worker_id, worker_name
202
  except Exception as e:
203
  logger.error(f"❌ Auto-registration error: {e}")
 
204
  return None
205
 
206
  def _add_worker_to_system(self, worker_id: str, name: str, embedding: List[float], image_array: np.ndarray):
@@ -221,18 +224,22 @@ class AttendanceSystem:
221
  logger.error(f"❌ Salesforce sync error for {worker_id}: {e}")
222
 
223
  def _is_duplicate_face(self, embedding: List[float], threshold: float = 10.0) -> bool:
224
- """Enhanced duplicate detection"""
225
  if not self.known_face_embeddings:
226
  return False
227
 
228
  embedding_array = np.array(embedding)
229
- for known_embedding in self.known_face_embeddings:
230
  # Use both euclidean distance and cosine similarity
231
  euclidean_dist = np.linalg.norm(embedding_array - known_embedding)
232
  cosine_sim = np.dot(embedding_array, known_embedding) / (np.linalg.norm(embedding_array) * np.linalg.norm(known_embedding))
233
 
 
 
 
234
  # If either similarity is high or distance is low, consider it duplicate
235
- if cosine_sim > 0.80 or euclidean_dist < threshold:
 
236
  return True
237
 
238
  return False
@@ -297,7 +304,7 @@ class AttendanceSystem:
297
  )
298
 
299
  # Combined score (lower is better)
300
- combined_score = euclidean_dist * (1 - cosine_sim)
301
 
302
  if combined_score < best_score:
303
  best_score = combined_score
@@ -305,15 +312,19 @@ class AttendanceSystem:
305
 
306
  return best_match_idx, best_score
307
 
 
 
 
 
308
  # --- Video Processing ---
309
  def process_frame(self, frame: np.ndarray) -> np.ndarray:
310
  """
311
- Enhanced frame processing with better accuracy and duplicate prevention
312
  """
313
  try:
314
  # Skip frames for better performance
315
  self.frame_skip_counter += 1
316
- if self.frame_skip_counter % 3 != 0: # Process every 3rd frame
317
  return frame
318
 
319
  # Use multiple detection backends for better results
@@ -333,8 +344,8 @@ class AttendanceSystem:
333
  confidence = face_obj.get('confidence', 0.0)
334
  print(f" Face #{i+1}: Confidence Score = {confidence:.2f}")
335
 
336
- # More lenient confidence threshold
337
- if confidence < 0.85:
338
  print(" -> Confidence too low, skipping.")
339
  continue
340
 
@@ -348,7 +359,7 @@ class AttendanceSystem:
348
  continue
349
 
350
  # More lenient minimum face size check
351
- if w < 50 or h < 50:
352
  print(" -> Face too small, skipping.")
353
  continue
354
 
@@ -361,6 +372,7 @@ class AttendanceSystem:
361
  continue
362
 
363
  color, worker_id, worker_name = (0, 0, 255), None, "Unknown"
 
364
 
365
  if self.known_face_embeddings:
366
  # Enhanced matching
@@ -402,34 +414,72 @@ class AttendanceSystem:
402
  del self.face_recognition_buffer[buffer_key]
403
 
404
  else:
405
- # Check if this should be auto-registered
406
- if min_dist > 15.0: # Only register if very different from existing faces
407
- color = (0, 165, 255) # Orange for potential new worker
408
- print(f" βœ— NO MATCH. Attempting to register as new worker...")
409
- new_worker = self._register_worker_auto(face_image, embedding)
410
- if new_worker:
411
- worker_id, worker_name = new_worker[0], new_worker[1]
412
- if self.mark_attendance(worker_id, worker_name):
413
- self.last_recognition_time[worker_id] = time.time()
 
 
 
 
 
414
  else:
415
- print(" βœ— Uncertain match, skipping registration.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
416
  else:
417
- # No known faces, auto-register
 
418
  color = (0, 165, 255) # Orange for new worker
419
- print(" -> No known faces in database. Auto-registering...")
420
- new_worker = self._register_worker_auto(face_image, embedding)
421
  if new_worker:
422
  worker_id, worker_name = new_worker[0], new_worker[1]
 
423
  if self.mark_attendance(worker_id, worker_name):
424
  self.last_recognition_time[worker_id] = time.time()
 
 
425
 
426
- # Clean old buffer entries
427
  current_time = time.time()
428
  keys_to_remove = [k for k, v in self.face_recognition_buffer.items()
429
  if current_time - v['last_time'] > 5.0]
430
  for key in keys_to_remove:
431
  del self.face_recognition_buffer[key]
432
 
 
 
 
 
 
 
433
  label = f"{worker_name}" + (f" ({worker_id})" if worker_id else "")
434
  cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2)
435
  cv2.putText(frame, label, (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
@@ -463,7 +513,7 @@ class AttendanceSystem:
463
  self.session_log.clear()
464
  self.last_recognition_time.clear()
465
  self.session_marked_present.clear() # Reset session attendance tracking
466
- self.session_registered.clear() # Reset session registration tracking
467
  self.face_recognition_buffer.clear() # Reset recognition buffer
468
  self.error_message = None
469
  self.last_processed_frame = None
@@ -482,6 +532,7 @@ class AttendanceSystem:
482
  self.last_processed_frame = None
483
  self.final_log = None
484
  self.face_recognition_buffer.clear()
 
485
  return "βœ… Processing stopped by user."
486
 
487
  # --- Helper & Reporting ---
@@ -541,7 +592,7 @@ def create_interface():
541
  stop_btn = gr.Button("⏹️ Stop Processing", variant="stop")
542
  status_box = gr.Textbox(label="Status", interactive=False, value="System Ready.")
543
  gr.Markdown("### 2. View Results in the 'Output & Log' Tab")
544
- gr.Markdown("**🎨 Color Coding:** <font color='green'>Green</font> = Known, <font color='orange'>Orange</font> = New, <font color='red'>Red</font> = Unknown")
545
 
546
  with gr.Tab("πŸ“Š Output & Log"):
547
  with gr.Row():
 
87
  self.recognition_cooldown = 10 # Cooldown to prevent duplicates
88
  self.session_log: List[str] = []
89
  self.session_marked_present = set() # Track who's already marked present in this session
90
+ self.auto_registration_candidates = {} # Track faces for auto-registration
91
  self.face_recognition_buffer = {} # Buffer for multiple detections before confirming
92
+ self.buffer_threshold = 3 # Increased threshold for more reliable recognition
93
+ self.auto_register_threshold = 5 # Detections needed before auto-registration
94
  self.frame_skip_counter = 0 # Skip frames for better performance
95
 
96
  # Initialize
 
177
  except Exception as e:
178
  return f"❌ Registration error: {e}", self.get_registered_workers_info()
179
 
180
+ def _register_worker_auto(self, face_image: np.ndarray, face_embedding: List[float], embedding_key: str) -> Optional[Tuple[str, str]]:
181
+ """Enhanced auto-registration with better duplicate checking"""
182
  try:
183
+ # Enhanced duplicate check with more permissive threshold for auto-registration
184
+ if self._is_duplicate_face(face_embedding, threshold=15.0):
185
+ print(" -> Face too similar to existing worker, skipping auto-registration.")
186
  return None
187
 
188
  worker_id = f"W{self.next_worker_id:04d}"
 
 
 
 
 
189
  worker_name = f"Unknown Worker {self.next_worker_id}"
190
+
191
+ # Add to system
192
  self._add_worker_to_system(worker_id, worker_name, face_embedding, face_image)
193
  self.save_local_worker_data()
194
 
195
+ # Remove from candidates after successful registration
196
+ if embedding_key in self.auto_registration_candidates:
197
+ del self.auto_registration_candidates[embedding_key]
198
 
199
  log_msg = f"πŸ†• [{datetime.now().strftime('%H:%M:%S')}] Auto-registered: {worker_name} ({worker_id})"
200
  self.session_log.append(log_msg)
201
  logger.info(log_msg)
202
+ print(f" βœ… AUTO-REGISTERED: {worker_name} ({worker_id})")
203
  return worker_id, worker_name
204
  except Exception as e:
205
  logger.error(f"❌ Auto-registration error: {e}")
206
+ print(f" ❌ Auto-registration failed: {e}")
207
  return None
208
 
209
  def _add_worker_to_system(self, worker_id: str, name: str, embedding: List[float], image_array: np.ndarray):
 
224
  logger.error(f"❌ Salesforce sync error for {worker_id}: {e}")
225
 
226
  def _is_duplicate_face(self, embedding: List[float], threshold: float = 10.0) -> bool:
227
+ """Enhanced duplicate detection with configurable threshold"""
228
  if not self.known_face_embeddings:
229
  return False
230
 
231
  embedding_array = np.array(embedding)
232
+ for i, known_embedding in enumerate(self.known_face_embeddings):
233
  # Use both euclidean distance and cosine similarity
234
  euclidean_dist = np.linalg.norm(embedding_array - known_embedding)
235
  cosine_sim = np.dot(embedding_array, known_embedding) / (np.linalg.norm(embedding_array) * np.linalg.norm(known_embedding))
236
 
237
+ # Debug output for duplicate checking
238
+ print(f" Comparing with {self.known_face_names[i]}: Euclidean={euclidean_dist:.2f}, Cosine={cosine_sim:.3f}")
239
+
240
  # If either similarity is high or distance is low, consider it duplicate
241
+ if cosine_sim > 0.75 or euclidean_dist < threshold:
242
+ print(f" -> DUPLICATE detected with {self.known_face_names[i]}")
243
  return True
244
 
245
  return False
 
304
  )
305
 
306
  # Combined score (lower is better)
307
+ combined_score = euclidean_dist * (2 - cosine_sim) # Adjusted weighting
308
 
309
  if combined_score < best_score:
310
  best_score = combined_score
 
312
 
313
  return best_match_idx, best_score
314
 
315
+ def _create_embedding_key(self, embedding: np.ndarray) -> str:
316
+ """Create a unique key for an embedding for tracking purposes"""
317
+ return str(hash(tuple(embedding[:10]))) # Use first 10 values for key
318
+
319
  # --- Video Processing ---
320
  def process_frame(self, frame: np.ndarray) -> np.ndarray:
321
  """
322
+ Enhanced frame processing with improved auto-registration
323
  """
324
  try:
325
  # Skip frames for better performance
326
  self.frame_skip_counter += 1
327
+ if self.frame_skip_counter % 2 != 0: # Process every 2nd frame
328
  return frame
329
 
330
  # Use multiple detection backends for better results
 
344
  confidence = face_obj.get('confidence', 0.0)
345
  print(f" Face #{i+1}: Confidence Score = {confidence:.2f}")
346
 
347
+ # More lenient confidence threshold for auto-registration
348
+ if confidence < 0.80:
349
  print(" -> Confidence too low, skipping.")
350
  continue
351
 
 
359
  continue
360
 
361
  # More lenient minimum face size check
362
+ if w < 40 or h < 40:
363
  print(" -> Face too small, skipping.")
364
  continue
365
 
 
372
  continue
373
 
374
  color, worker_id, worker_name = (0, 0, 255), None, "Unknown"
375
+ embedding_key = self._create_embedding_key(embedding_array)
376
 
377
  if self.known_face_embeddings:
378
  # Enhanced matching
 
414
  del self.face_recognition_buffer[buffer_key]
415
 
416
  else:
417
+ # Unknown face - candidate for auto-registration
418
+ print(f" βœ— NO MATCH. Distance: {min_dist:.2f} - Candidate for auto-registration")
419
+ color = (0, 165, 255) # Orange for potential new worker
420
+
421
+ # Track this face for auto-registration
422
+ if embedding_key not in self.auto_registration_candidates:
423
+ self.auto_registration_candidates[embedding_key] = {
424
+ 'count': 1,
425
+ 'embedding': embedding,
426
+ 'face_image': face_image.copy(),
427
+ 'last_time': time.time(),
428
+ 'quality_score': confidence
429
+ }
430
+ print(f" -> Added to auto-registration candidates (1/{self.auto_register_threshold})")
431
  else:
432
+ candidate = self.auto_registration_candidates[embedding_key]
433
+ candidate['count'] += 1
434
+ candidate['last_time'] = time.time()
435
+
436
+ # Update with better quality image if available
437
+ if confidence > candidate['quality_score']:
438
+ candidate['face_image'] = face_image.copy()
439
+ candidate['quality_score'] = confidence
440
+
441
+ print(f" -> Auto-registration candidate count: ({candidate['count']}/{self.auto_register_threshold})")
442
+
443
+ # Auto-register if we've seen this face enough times
444
+ if candidate['count'] >= self.auto_register_threshold:
445
+ print(f" -> Attempting auto-registration...")
446
+ new_worker = self._register_worker_auto(candidate['face_image'], candidate['embedding'], embedding_key)
447
+ if new_worker:
448
+ worker_id, worker_name = new_worker[0], new_worker[1]
449
+ color = (0, 255, 0) # Change to green for registered
450
+ # Mark attendance for newly registered worker
451
+ if self.mark_attendance(worker_id, worker_name):
452
+ self.last_recognition_time[worker_id] = time.time()
453
+
454
+ # Clean up if not auto-registered, use generic label
455
+ if not worker_id:
456
+ worker_name = "New Face"
457
  else:
458
+ # No known faces in database - immediate auto-registration attempt
459
+ print(" -> No known faces in database. Attempting immediate auto-registration...")
460
  color = (0, 165, 255) # Orange for new worker
461
+ new_worker = self._register_worker_auto(face_image, embedding, embedding_key)
 
462
  if new_worker:
463
  worker_id, worker_name = new_worker[0], new_worker[1]
464
+ color = (0, 255, 0) # Change to green for registered
465
  if self.mark_attendance(worker_id, worker_name):
466
  self.last_recognition_time[worker_id] = time.time()
467
+ else:
468
+ worker_name = "New Face"
469
 
470
+ # Clean old buffer entries and candidates
471
  current_time = time.time()
472
  keys_to_remove = [k for k, v in self.face_recognition_buffer.items()
473
  if current_time - v['last_time'] > 5.0]
474
  for key in keys_to_remove:
475
  del self.face_recognition_buffer[key]
476
 
477
+ # Clean old auto-registration candidates (older than 30 seconds)
478
+ candidates_to_remove = [k for k, v in self.auto_registration_candidates.items()
479
+ if current_time - v['last_time'] > 30.0]
480
+ for key in candidates_to_remove:
481
+ del self.auto_registration_candidates[key]
482
+
483
  label = f"{worker_name}" + (f" ({worker_id})" if worker_id else "")
484
  cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2)
485
  cv2.putText(frame, label, (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
 
513
  self.session_log.clear()
514
  self.last_recognition_time.clear()
515
  self.session_marked_present.clear() # Reset session attendance tracking
516
+ self.auto_registration_candidates.clear() # Reset auto-registration candidates
517
  self.face_recognition_buffer.clear() # Reset recognition buffer
518
  self.error_message = None
519
  self.last_processed_frame = None
 
532
  self.last_processed_frame = None
533
  self.final_log = None
534
  self.face_recognition_buffer.clear()
535
+ self.auto_registration_candidates.clear()
536
  return "βœ… Processing stopped by user."
537
 
538
  # --- Helper & Reporting ---
 
592
  stop_btn = gr.Button("⏹️ Stop Processing", variant="stop")
593
  status_box = gr.Textbox(label="Status", interactive=False, value="System Ready.")
594
  gr.Markdown("### 2. View Results in the 'Output & Log' Tab")
595
+ gr.Markdown("**🎨 Color Coding:** <font color='green'>Green</font> = Known/Registered, <font color='orange'>Orange</font> = New Face, <font color='red'>Red</font> = Unknown")
596
 
597
  with gr.Tab("πŸ“Š Output & Log"):
598
  with gr.Row():