"""High-performance face-recognition attendance system.

A Gradio web app that tracks faces in a video stream (webcam or uploaded
file) with a fast centroid tracker, identifies them with DeepFace/Facenet
embeddings, logs attendance, and syncs worker records with Salesforce
(with a local pickle cache as fallback).
"""

# Suppress TensorFlow oneDNN warnings.
# NOTE: this env var must be set BEFORE DeepFace (which imports TensorFlow)
# is imported below, so it intentionally precedes the other imports.
import os
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'

# Standard Library Imports
import base64
import json
import logging
import pickle
import queue
import threading
import time
from collections import OrderedDict
from datetime import datetime, date
from io import BytesIO
from typing import Tuple, Optional, List, Dict

# Third-Party Imports
import cv2
import gradio as gr
import numpy as np
from PIL import Image
import requests
from dotenv import load_dotenv
from deepface import DeepFace
from retrying import retry
from simple_salesforce import Salesforce
from scipy.spatial import distance as dist

# --- CONFIGURATION ---
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
load_dotenv()

# --- API & SALESFORCE CREDENTIALS ---
HF_API_URL = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base"
HF_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN")

# SECURITY: credentials must come from the environment (.env file).
# Real username/password/security-token values were previously committed
# here as fallback defaults — never embed live secrets in source; rotate
# any credentials that were exposed.
SF_CREDENTIALS = {
    "username": os.getenv("SF_USERNAME", ""),
    "password": os.getenv("SF_PASSWORD", ""),
    "security_token": os.getenv("SF_SECURITY_TOKEN", ""),
    "domain": os.getenv("SF_DOMAIN", "login"),
}

# --- PERFORMANCE & ACCURACY TUNING ---
# NOTE(review): 0.6 is compared against raw Euclidean distance between
# Facenet embeddings below; DeepFace's own Facenet euclidean threshold is
# an order of magnitude larger — confirm this value matches real data,
# otherwise nearly every face will fail to match and be auto-registered.
FACE_MATCH_THRESHOLD = 0.6       # Stricter threshold for accurate matching
# NOTE(review): declared but currently unused anywhere in this module.
AUTO_REGISTER_CONFIDENCE = 0.98  # Confidence to register a new face
MAX_DISAPPEARED_FRAMES = 20      # How many frames to keep tracking a lost face


# --- FACE TRACKER CLASS ---
class FaceTracker:
    """Centroid-based multi-face tracker.

    Detects candidate faces per frame with a cheap Haar cascade and
    associates them with existing tracks by nearest centroid. Expensive
    DeepFace recognition runs once per new track on a background thread
    so the video loop is never blocked.
    """

    def __init__(self, attendance_system):
        self.attendance_system = attendance_system
        self.next_object_id = 0
        # object_id -> {'centroid', 'bbox', 'name', 'worker_id', 'color'}
        self.objects = OrderedDict()
        # object_id -> consecutive frames without a matching detection
        self.disappeared = OrderedDict()
        self.face_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    def register(self, centroid, bbox):
        """Start tracking a new face and kick off async identification."""
        object_id = self.next_object_id
        self.objects[object_id] = {'centroid': centroid, 'bbox': bbox,
                                   'name': "Identifying...",
                                   'worker_id': None,
                                   'color': (255, 255, 0)}  # Cyan for identifying
        self.disappeared[object_id] = 0
        self.next_object_id += 1
        # Start recognition in a separate thread to not block the video stream.
        # daemon=True so a recognition in flight never prevents process exit.
        threading.Thread(target=self.recognize_face,
                         args=(object_id, bbox),
                         daemon=True).start()

    def recognize_face(self, object_id, bbox):
        """Runs expensive recognition and updates the tracker object."""
        (startX, startY, endX, endY) = bbox
        frame = self.attendance_system.current_frame
        # Guard: the processing loop may not have published a frame yet.
        if frame is None:
            return
        face_image = frame[startY:endY, startX:endX]
        if face_image.size == 0:
            return
        result = self.attendance_system.identify_and_log_face(face_image)
        # The track may have been deregistered while recognition ran.
        if result and object_id in self.objects:
            self.objects[object_id].update({
                'name': result['name'],
                'worker_id': result['worker_id'],
                'color': result['color'],
            })

    def deregister(self, object_id):
        """Drop a track that has been missing too long."""
        del self.objects[object_id]
        del self.disappeared[object_id]

    def update(self, frame):
        """Detect faces in *frame*, update tracks, and draw annotations.

        Returns the annotated frame (drawn in place).
        """
        # Use a fast detector to find potential face locations.
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        rects = self.face_cascade.detectMultiScale(
            gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

        input_centroids = np.zeros((len(rects), 2), dtype="int")
        bboxes = []
        for (i, (x, y, w, h)) in enumerate(rects):
            cX = int(x + w / 2.0)
            cY = int(y + h / 2.0)
            input_centroids[i] = (cX, cY)
            bboxes.append((x, y, x + w, y + h))

        if len(self.objects) == 0:
            # No existing tracks: every detection starts a new one.
            for i in range(len(input_centroids)):
                self.register(input_centroids[i], bboxes[i])
        else:
            # Greedy nearest-centroid assignment between tracks and detections.
            object_ids = list(self.objects.keys())
            object_centroids = [obj['centroid'] for obj in self.objects.values()]
            D = dist.cdist(np.array(object_centroids), input_centroids)
            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]
            used_rows, used_cols = set(), set()
            for (row, col) in zip(rows, cols):
                if row in used_rows or col in used_cols:
                    continue
                object_id = object_ids[row]
                self.objects[object_id]['centroid'] = input_centroids[col]
                self.objects[object_id]['bbox'] = bboxes[col]
                self.disappeared[object_id] = 0
                used_rows.add(row)
                used_cols.add(col)

            # Tracks with no detection this frame age out; unmatched
            # detections become new tracks.
            unused_rows = set(range(D.shape[0])).difference(used_rows)
            unused_cols = set(range(D.shape[1])).difference(used_cols)
            for row in unused_rows:
                object_id = object_ids[row]
                self.disappeared[object_id] += 1
                if self.disappeared[object_id] > MAX_DISAPPEARED_FRAMES:
                    self.deregister(object_id)
            for col in unused_cols:
                self.register(input_centroids[col], bboxes[col])

        # Draw results on the frame.
        for object_id, data in self.objects.items():
            (startX, startY, endX, endY) = data['bbox']
            label = data['name']
            cv2.rectangle(frame, (startX, startY), (endX, endY), data['color'], 2)
            cv2.putText(frame, label, (startX, startY - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, data['color'], 2)
        return frame


# --- SALESFORCE CONNECTION ---
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def connect_to_salesforce() -> Optional[Salesforce]:
    """Connect to Salesforce, retrying up to 3 times (2 s apart).

    Raises on failure so the @retry decorator can re-attempt; after the
    final attempt the exception propagates to the caller.
    """
    try:
        sf = Salesforce(**SF_CREDENTIALS)
        sf.describe()  # Cheap call to verify the session is actually valid.
        logger.info("✅ Successfully connected to Salesforce.")
        return sf
    except Exception as e:
        logger.error(f"❌ Salesforce connection failed: {e}")
        raise


# --- CORE LOGIC ---
class AttendanceSystem:
    """Owns worker data, attendance state, and the video-processing thread."""

    def __init__(self):
        self.processing_thread = None
        self.is_processing = threading.Event()
        self.frame_queue = queue.Queue(maxsize=30)
        self.error_message = None
        self.current_frame = None  # Latest raw frame; read by FaceTracker threads.
        self.known_face_embeddings: List[np.ndarray] = []
        self.known_face_names: List[str] = []
        self.known_face_ids: List[str] = []
        self.next_worker_id: int = 1
        self.session_log: List[str] = []
        self.session_attended_ids = set()
        self.sf = connect_to_salesforce()
        self._create_directories()
        self.load_worker_data()

    def _create_directories(self):
        """Ensure the on-disk cache directory exists."""
        os.makedirs("data/faces", exist_ok=True)

    def load_worker_data(self):
        """Load worker roster from Salesforce, falling back to local cache."""
        # This logic remains the same
        logger.info("Loading worker data...")
        if self.sf:
            try:
                workers = self.sf.query_all(
                    "SELECT Worker_ID__c, Name, Face_Embedding__c FROM Worker__c")['records']
                if not workers:
                    self._load_local_worker_data()
                    return
                temp_embeddings, temp_names, temp_ids, max_id = [], [], [], 0
                for worker in workers:
                    if worker.get('Face_Embedding__c'):
                        temp_embeddings.append(
                            np.array(json.loads(worker['Face_Embedding__c'])))
                        temp_names.append(worker['Name'])
                        temp_ids.append(worker['Worker_ID__c'])
                        try:
                            # Track the highest numeric suffix so new IDs don't collide.
                            worker_num = int(''.join(
                                filter(str.isdigit, worker['Worker_ID__c'])))
                            if worker_num > max_id:
                                max_id = worker_num
                        except (ValueError, TypeError):
                            continue
                self.known_face_embeddings = temp_embeddings
                self.known_face_names = temp_names
                self.known_face_ids = temp_ids
                self.next_worker_id = max_id + 1
                self.save_local_worker_data()
                logger.info(f"✅ Loaded {len(self.known_face_ids)} workers from Salesforce. Next ID: {self.next_worker_id}")
            except Exception as e:
                logger.error(f"❌ Error loading from Salesforce: {e}. Attempting local load.")
                self._load_local_worker_data()
        else:
            logger.warning("Salesforce not connected. Loading from local cache.")
            self._load_local_worker_data()

    def _load_local_worker_data(self):
        """Load the roster from the local pickle cache, if present.

        NOTE(review): unpickling is only safe because this file is written
        by this application itself — never point it at untrusted data.
        """
        try:
            if os.path.exists("data/workers.pkl"):
                with open("data/workers.pkl", "rb") as f:
                    data = pickle.load(f)
                self.known_face_embeddings = data.get("embeddings", [])
                self.known_face_names = data.get("names", [])
                self.known_face_ids = data.get("ids", [])
                self.next_worker_id = data.get("next_id", 1)
                logger.info(f"✅ Loaded {len(self.known_face_ids)} workers from local cache. Next ID: {self.next_worker_id}")
        except Exception as e:
            logger.error(f"❌ Error loading local data: {e}")

    def save_local_worker_data(self):
        """Persist the in-memory roster to the local pickle cache."""
        try:
            worker_data = {"embeddings": self.known_face_embeddings,
                           "names": self.known_face_names,
                           "ids": self.known_face_ids,
                           "next_id": self.next_worker_id}
            with open("data/workers.pkl", "wb") as f:
                pickle.dump(worker_data, f)
        except Exception as e:
            logger.error(f"❌ Error saving local worker data: {e}")

    def register_worker_manual(self, image: Image.Image, name: str) -> Tuple[str, str]:
        """Register a worker from an uploaded photo and a name.

        Returns (status message, refreshed roster markdown) for the UI.
        """
        # This logic remains the same
        if image is None or not name.strip():
            return "❌ Please provide both image and name!", self.get_registered_workers_info()
        try:
            image_array = np.array(image)
            # analyze() with enforce_detection=True acts as a face-presence
            # check: it raises ValueError when no face is found.
            DeepFace.analyze(img_path=image_array, actions=['emotion'],
                             enforce_detection=True)
            embedding = DeepFace.represent(img_path=image_array,
                                           model_name='Facenet',
                                           enforce_detection=False)[0]['embedding']
            if self._is_duplicate_face(embedding):
                return f"❌ Face matches an existing worker!", self.get_registered_workers_info()
            worker_id = f"W{self.next_worker_id:04d}"
            name = name.strip().title()
            self._add_worker_to_system(worker_id, name, embedding, image_array)
            self.save_local_worker_data()
            return f"✅ {name} registered with ID: {worker_id}!", self.get_registered_workers_info()
        except ValueError:
            return "❌ No face detected in the image!", self.get_registered_workers_info()
        except Exception as e:
            logger.error(f"Manual registration error: {e}")
            return f"❌ Registration error: {e}", self.get_registered_workers_info()

    def _register_worker_auto(self, face_image: np.ndarray) -> Optional[Dict]:
        """Auto-register an unrecognized face with a placeholder name.

        Returns {'name', 'worker_id'} on success, None on duplicate/error.
        """
        try:
            embedding = DeepFace.represent(img_path=face_image,
                                           model_name='Facenet',
                                           enforce_detection=False)[0]['embedding']
            if self._is_duplicate_face(embedding):
                return None
            worker_id = f"W{self.next_worker_id:04d}"
            worker_name = f"New Worker {self.next_worker_id}"
            self._add_worker_to_system(worker_id, worker_name, embedding, face_image)
            self.save_local_worker_data()
            self.mark_attendance(worker_id, worker_name, is_new=True)
            return {'name': worker_name, 'worker_id': worker_id}
        except Exception as e:
            logger.error(f"❌ Auto-registration error: {e}")
            return None

    def _add_worker_to_system(self, worker_id: str, name: str,
                              embedding: List[float], image_array: np.ndarray):
        """Append a worker to the in-memory roster and bump the ID counter."""
        self.known_face_embeddings.append(np.array(embedding))
        self.known_face_names.append(name)
        self.known_face_ids.append(worker_id)
        self.next_worker_id += 1
        # ... (Salesforce sync logic remains the same)

    def _is_duplicate_face(self, embedding: List[float]) -> bool:
        """True when *embedding* is within FACE_MATCH_THRESHOLD of a known face."""
        if not self.known_face_embeddings:
            return False
        distances = [np.linalg.norm(np.array(embedding) - known)
                     for known in self.known_face_embeddings]
        return min(distances) < FACE_MATCH_THRESHOLD

    def mark_attendance(self, worker_id: str, worker_name: str, is_new: bool = False):
        """Record attendance once per session per worker and log it."""
        if worker_id in self.session_attended_ids:
            return
        self.session_attended_ids.add(worker_id)
        current_time_str = datetime.now().strftime('%H:%M:%S')
        log_msg = ""
        if is_new:
            log_msg = f"🆕 [{current_time_str}] Registered: {worker_name} ({worker_id})"
        else:
            log_msg = f"✅ [{current_time_str}] Present: {worker_name} ({worker_id})"
        self.session_log.insert(0, log_msg)  # Add to top of the log
        # ... (Salesforce attendance marking logic remains the same)

    def identify_and_log_face(self, face_image: np.ndarray) -> Optional[Dict]:
        """Core recognition logic called by the tracker.

        Returns a dict with 'name', 'worker_id', and a BGR 'color' for the
        tracker overlay: green = known, orange = newly registered,
        red = unknown/error.
        """
        try:
            embedding = DeepFace.represent(img_path=face_image,
                                           model_name='Facenet',
                                           enforce_detection=False)[0]['embedding']
            if self.known_face_embeddings:
                distances = [np.linalg.norm(np.array(embedding) - known)
                             for known in self.known_face_embeddings]
                min_dist = min(distances)
                if min_dist < FACE_MATCH_THRESHOLD:
                    match_index = np.argmin(distances)
                    worker_id = self.known_face_ids[match_index]
                    worker_name = self.known_face_names[match_index]
                    self.mark_attendance(worker_id, worker_name)
                    return {'name': worker_name, 'worker_id': worker_id,
                            'color': (0, 255, 0)}  # Green
            # If no match or no known faces, try to auto-register.
            new_worker = self._register_worker_auto(face_image)
            if new_worker:
                return {'name': new_worker['name'],
                        'worker_id': new_worker['worker_id'],
                        'color': (0, 165, 255)}  # Orange
        except Exception as e:
            logger.error(f"Face identification failed: {e}")
        return {'name': 'Unknown', 'worker_id': None, 'color': (0, 0, 255)}  # Red

    def _processing_loop(self, source):
        """Worker-thread loop: read frames, track faces, publish annotated frames."""
        video_capture = cv2.VideoCapture(source)
        if not video_capture.isOpened():
            self.error_message = "❌ Error: Could not open video source."
            self.is_processing.clear()
            return
        tracker = FaceTracker(self)
        while self.is_processing.is_set():
            ret, frame = video_capture.read()
            if not ret:
                break
            self.current_frame = frame.copy()
            annotated_frame = tracker.update(self.current_frame)
            if not self.frame_queue.full():
                self.frame_queue.put(annotated_frame)
            else:
                # If queue is full, drop the oldest frame to prevent lag.
                try:
                    self.frame_queue.get_nowait()
                except queue.Empty:
                    pass
                self.frame_queue.put(annotated_frame)
            time.sleep(0.01)  # Yield CPU
        video_capture.release()
        self.is_processing.clear()
        logger.info("Processing finished.")

    def start_processing(self, source) -> str:
        """Start the background processing thread for *source* (index or path)."""
        if self.is_processing.is_set():
            return "⚠️ Processing is already active."
        self.session_log.clear()
        self.session_attended_ids.clear()
        self.error_message = None
        while not self.frame_queue.empty():
            self.frame_queue.get()  # Clear queue
        self.is_processing.set()
        self.processing_thread = threading.Thread(target=self._processing_loop,
                                                  args=(source,))
        self.processing_thread.daemon = True
        self.processing_thread.start()
        return "✅ Started processing..."

    def stop_processing(self) -> str:
        """Signal the processing thread to stop and wait briefly for it."""
        if not self.is_processing.is_set():
            return "⚠️ Processing is not currently active."
        self.is_processing.clear()
        if self.processing_thread:
            self.processing_thread.join(timeout=2)
        return "✅ Processing stopped by user."

    def get_registered_workers_info(self) -> str:
        """Return the roster as Markdown (reloads data first)."""
        # This logic remains the same
        self.load_worker_data()
        if not self.known_face_ids:
            return "No workers registered."
        info_list = [f"- **{name}** (ID: {id})"
                     for name, id in sorted(zip(self.known_face_names,
                                                self.known_face_ids))]
        return f"**👥 Registered Workers ({len(info_list)})**\n" + "\n".join(info_list)


# --- GRADIO UI ---
attendance_system = AttendanceSystem()


def create_interface():
    """Build and return the Gradio Blocks application."""
    with gr.Blocks(theme=gr.themes.Soft(), title="Attendance System") as demo:
        gr.Markdown("# 🚀 High-Performance Face Recognition Attendance")
        with gr.Tabs():
            with gr.Tab("⚙️ Controls & Status"):
                # ... UI layout remains the same ...
                gr.Markdown("### 1. Choose Input Source & Start Processing")
                with gr.Row():
                    with gr.Column(scale=1):
                        selected_tab_index = gr.Number(value=0, visible=False)
                        with gr.Tabs() as video_tabs:
                            with gr.Tab("Live Camera", id=0):
                                camera_source = gr.Number(label="Camera Source Index",
                                                          value=0, precision=0)
                            with gr.Tab("Upload Video", id=1):
                                video_file = gr.Video(label="Upload Video File",
                                                      sources=["upload"])
                    with gr.Column(scale=1):
                        start_btn = gr.Button("▶️ Start Processing", variant="primary")
                        stop_btn = gr.Button("⏹️ Stop Processing", variant="stop")
                        status_box = gr.Textbox(label="Status", interactive=False,
                                                value="System Ready.")
                gr.Markdown("### 2. View Results in the 'Output & Log' Tab")
                gr.Markdown("**🎨 Color Coding:** Identifying, Present, New, Unknown")
            with gr.Tab("📊 Output & Log"):
                with gr.Row(equal_height=True):
                    with gr.Column(scale=2):
                        video_output = gr.Image(label="Recognition Output",
                                                interactive=False, type="numpy")
                    with gr.Column(scale=1):
                        session_log_display = gr.Markdown(label="📋 Session Log",
                                                          value="System is ready.")
            with gr.Tab("👤 Worker Management"):
                # ... UI layout remains the same ...
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Register New Worker")
                        register_image = gr.Image(label="Upload Worker's Photo",
                                                  type="pil", sources=["upload"])
                        register_name = gr.Textbox(label="Worker's Full Name")
                        register_btn = gr.Button("Register Worker", variant="primary")
                        register_output = gr.Textbox(label="Registration Status",
                                                     interactive=False)
                    with gr.Column():
                        gr.Markdown("### Current Worker Roster")
                        registered_workers_info = gr.Markdown(
                            value=attendance_system.get_registered_workers_info())
                        refresh_workers_btn = gr.Button("🔄 Refresh List")

        # --- Event Handlers ---
        def on_tab_select(evt: gr.SelectData):
            # Remember which input tab (camera vs. file) is active.
            return evt.index

        video_tabs.select(fn=on_tab_select, inputs=None,
                          outputs=[selected_tab_index])

        def start_wrapper(tab_index, cam_src, vid_path):
            # Validate BEFORE converting: int(None) would raise TypeError.
            if tab_index == 0:
                if cam_src is None:
                    return "Please provide an input source."
                source = int(cam_src)
            else:
                if vid_path is None:
                    return "Please provide an input source."
                source = vid_path
            return attendance_system.start_processing(source)

        start_btn.click(fn=start_wrapper,
                        inputs=[selected_tab_index, camera_source, video_file],
                        outputs=[status_box])
        stop_btn.click(fn=attendance_system.stop_processing, inputs=None,
                       outputs=[status_box])
        register_btn.click(fn=attendance_system.register_worker_manual,
                           inputs=[register_image, register_name],
                           outputs=[register_output, registered_workers_info])
        refresh_workers_btn.click(fn=attendance_system.get_registered_workers_info,
                                  outputs=[registered_workers_info])

        def update_ui_generator():
            """Stream annotated frames, the session log, and status to the UI."""
            last_frame = None
            while True:
                status_text = "Status: Processing..."
                log_md = "\n".join(attendance_system.session_log) or "Awaiting detections..."
                frame = None
                if attendance_system.is_processing.is_set():
                    try:
                        frame = attendance_system.frame_queue.get_nowait()
                        last_frame = frame
                    except queue.Empty:
                        frame = last_frame  # Show last frame if no new one
                else:
                    status_text = "Status: Stopped."
                    log_md = "Processing stopped. Final Log:\n\n" + log_md
                if frame is not None:
                    # OpenCV frames are BGR; Gradio expects RGB.
                    yield cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), log_md, status_text
                else:
                    yield None, log_md, status_text
                time.sleep(1 / 30)  # Aim for ~30 FPS UI updates

        demo.load(fn=update_ui_generator,
                  outputs=[video_output, session_log_display, status_box])
    return demo


if __name__ == "__main__":
    # Helper functions for Salesforce sync (omitted for brevity, they are unchanged)

    def _get_image_caption(self, image: Image.Image) -> str:
        """Caption *image* via the Hugging Face BLIP inference API.

        Monkey-patched onto AttendanceSystem below, so it must accept
        `self` (the previous version omitted it and would have received
        the instance as `image` when called as a method).
        """
        if not HF_API_TOKEN:
            return "Hugging Face API token not configured."
        try:
            buffered = BytesIO()
            image.save(buffered, format="JPEG")
            img_data = buffered.getvalue()
            headers = {"Authorization": f"Bearer {HF_API_TOKEN}"}
            response = requests.post(HF_API_URL, headers=headers,
                                     data=img_data, timeout=10)
            response.raise_for_status()
            result = response.json()
            return result[0].get("generated_text", "No caption found.")
        except Exception as e:
            logger.error(f"Hugging Face API error: {e}")
            return "Caption generation failed."

    def _upload_image_to_salesforce(self, image: Image.Image, record_id: str,
                                    worker_id: str) -> Optional[str]:
        """Upload *image* as a ContentVersion linked to *record_id*.

        Returns a download URL path, or None when not connected / on error.
        Monkey-patched onto AttendanceSystem below.
        """
        if not self.sf:
            return None
        try:
            buffered = BytesIO()
            image.save(buffered, format="JPEG")
            encoded_image = base64.b64encode(buffered.getvalue()).decode('utf-8')
            cv = self.sf.ContentVersion.create({
                'Title': f'Image_{worker_id}',
                'PathOnClient': f'{worker_id}.jpg',
                'VersionData': encoded_image,
                'FirstPublishLocationId': record_id,
            })
            content_doc_link = self.sf.query(
                f"SELECT ContentDocumentId FROM ContentDocumentLink "
                f"WHERE LinkedEntityId = '{record_id}'")['records'][0]
            content_doc_id = content_doc_link['ContentDocumentId']
            return f"/sfc/servlet.shepherd/document/download/{content_doc_id}"
        except Exception as e:
            logger.error(f"Salesforce image upload error: {e}")
            return None

    # Monkey-patch helper methods into the class instance for simplicity.
    AttendanceSystem._get_image_caption = _get_image_caption
    AttendanceSystem._upload_image_to_salesforce = _upload_image_to_salesforce

    app = create_interface()
    app.queue()
    app.launch(server_name="0.0.0.0", server_port=7860,
               show_error=True, debug=True)