# Suppress TensorFlow oneDNN warnings (must be set BEFORE TensorFlow/DeepFace is imported)
import os
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'

# Standard Library Imports
import base64
import json
import logging
import pickle
import queue
import threading
import time
from collections import OrderedDict
from datetime import datetime, date, timedelta
from io import BytesIO
from typing import Dict, List, Optional, Tuple

# Third-Party Imports
import cv2
import gradio as gr
import numpy as np
import requests
from deepface import DeepFace
from dotenv import load_dotenv
from PIL import Image
from retrying import retry
from scipy.spatial import distance as dist
from simple_salesforce import Salesforce

# --- CONFIGURATION ---
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
load_dotenv()

# --- API & SALESFORCE CREDENTIALS ---
HF_API_URL = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base"
HF_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN")

# SECURITY NOTE(review): real-looking credentials are committed here as fallback
# defaults. They should be removed, supplied only via environment variables or a
# secrets manager, and the exposed password/security token rotated. Kept verbatim
# only so existing deployments that rely on the defaults keep working.
SF_CREDENTIALS = {
    "username": os.getenv("SF_USERNAME", "smartlabour@attendance.system"),
    "password": os.getenv("SF_PASSWORD", "#Prashanth@1234"),
    "security_token": os.getenv("SF_SECURITY_TOKEN", "7xPmtDFoWlZUGK0V2QSwFZJ6c"),
    "domain": os.getenv("SF_DOMAIN", "login")
}

# --- PERFORMANCE & ACCURACY TUNING ---
FACE_MATCH_THRESHOLD = 0.6       # Stricter threshold for accurate matching
AUTO_REGISTER_CONFIDENCE = 0.98  # Confidence to register a new face
MAX_DISAPPEARED_FRAMES = 20      # Frames to keep tracking a lost face
BATCH_SIZE = 5                   # Process faces in batches for efficiency


# --- FACE TRACKER CLASS ---
class FaceTracker:
    """Centroid-based multi-face tracker.

    Detects faces with a Haar cascade, associates detections with existing
    tracks by nearest centroid, and launches asynchronous recognition (via
    ``AttendanceSystem.identify_and_log_face``) for each newly seen face.
    """

    def __init__(self, attendance_system):
        self.attendance_system = attendance_system
        self.next_object_id = 0
        self.objects = OrderedDict()      # object_id -> {centroid, bbox, name, worker_id, color}
        self.disappeared = OrderedDict()  # object_id -> consecutive frames without a detection
        self.face_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    def register(self, centroid, bbox):
        """Start tracking a new face and identify it on a background thread."""
        object_id = self.next_object_id
        self.objects[object_id] = {
            'centroid': centroid, 'bbox': bbox, 'name': "Identifying...",
            'worker_id': None, 'color': (255, 255, 0)  # Cyan while identifying
        }
        self.disappeared[object_id] = 0
        self.next_object_id += 1
        # daemon=True (consistent with the processing thread) so a stuck
        # recognition call can never block interpreter shutdown.
        threading.Thread(target=self.recognize_face, args=(object_id, bbox),
                         daemon=True).start()

    def recognize_face(self, object_id, bbox):
        """Crop the face from the current frame and identify it (runs off-thread)."""
        (startX, startY, endX, endY) = bbox
        face_image = self.attendance_system.current_frame[startY:endY, startX:endX]
        if face_image.size == 0:
            return
        result = self.attendance_system.identify_and_log_face(face_image)
        # The track may have been deregistered while recognition was running.
        if result and object_id in self.objects:
            self.objects[object_id].update({
                'name': result['name'],
                'worker_id': result['worker_id'],
                'color': result['color']
            })

    def deregister(self, object_id):
        """Drop a track that has been missing for too long."""
        del self.objects[object_id]
        del self.disappeared[object_id]

    def _mark_disappeared(self, object_id):
        """Count one missed frame; deregister once the miss limit is exceeded."""
        self.disappeared[object_id] += 1
        if self.disappeared[object_id] > MAX_DISAPPEARED_FRAMES:
            self.deregister(object_id)

    def _draw(self, frame):
        """Annotate *frame* with every tracked bounding box and label; return it."""
        for data in self.objects.values():
            (startX, startY, endX, endY) = data['bbox']
            cv2.rectangle(frame, (startX, startY), (endX, endY), data['color'], 2)
            cv2.putText(frame, data['name'], (startX, startY - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, data['color'], 2)
        return frame

    def update(self, frame):
        """Detect faces in *frame*, update all tracks, and return the annotated frame."""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        rects = self.face_cascade.detectMultiScale(
            gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

        # BUGFIX: previously a frame with zero detections (while tracks existed)
        # reached dist.cdist(...).min(axis=1) on an empty axis and raised
        # ValueError, killing the processing loop. Age out all tracks instead.
        if len(rects) == 0:
            for object_id in list(self.disappeared.keys()):
                self._mark_disappeared(object_id)
            return self._draw(frame)

        input_centroids = np.zeros((len(rects), 2), dtype="int")
        bboxes = []
        for (i, (x, y, w, h)) in enumerate(rects):
            input_centroids[i] = (int(x + w / 2.0), int(y + h / 2.0))
            bboxes.append((x, y, x + w, y + h))

        if len(self.objects) == 0:
            for i in range(len(input_centroids)):
                self.register(input_centroids[i], bboxes[i])
        else:
            object_ids = list(self.objects.keys())
            object_centroids = [obj['centroid'] for obj in self.objects.values()]
            # Greedy nearest-centroid assignment between tracks and detections.
            D = dist.cdist(np.array(object_centroids), input_centroids)
            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]
            used_rows, used_cols = set(), set()
            for (row, col) in zip(rows, cols):
                if row in used_rows or col in used_cols:
                    continue
                object_id = object_ids[row]
                self.objects[object_id]['centroid'] = input_centroids[col]
                self.objects[object_id]['bbox'] = bboxes[col]
                self.disappeared[object_id] = 0
                used_rows.add(row)
                used_cols.add(col)
            # Unmatched tracks age out; unmatched detections become new tracks.
            for row in set(range(D.shape[0])).difference(used_rows):
                self._mark_disappeared(object_ids[row])
            for col in set(range(D.shape[1])).difference(used_cols):
                self.register(input_centroids[col], bboxes[col])

        return self._draw(frame)


# --- SALESFORCE CONNECTION ---
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def connect_to_salesforce() -> Optional[Salesforce]:
    """Open and smoke-test a Salesforce session (retried 3x, 2 s apart).

    Raises on failure after retries are exhausted; callers that can run
    offline should catch and fall back to ``None``.
    """
    try:
        sf = Salesforce(**SF_CREDENTIALS)
        sf.describe()  # cheap call to verify the session actually works
        logger.info("✅ Successfully connected to Salesforce.")
        return sf
    except Exception as e:
        logger.error(f"❌ Salesforce connection failed: {e}")
        raise


# --- CORE LOGIC ---
class AttendanceSystem:
    """Face-recognition attendance engine backed by Salesforce with a local pickle cache."""

    def __init__(self):
        self.processing_thread = None
        self.is_processing = threading.Event()
        self.frame_queue = queue.Queue(maxsize=30)
        self.error_message = None
        self.current_frame = None
        self.known_face_embeddings: List[List[np.ndarray]] = []  # Store multiple embeddings per worker
        self.known_face_names: List[str] = []
        self.known_face_ids: List[str] = []
        self.next_worker_id: int = 1
        self.session_log_present: List[str] = []  # Workers marked present
        self.session_log_new: List[str] = []      # Newly registered workers
        self.session_attended_ids: Dict[str, datetime] = {}  # Track attendance with timestamps
        # BUGFIX: connect_to_salesforce() re-raises once its retries are exhausted,
        # which used to crash startup on any Salesforce outage. Fall back to
        # offline mode (self.sf = None) — every call site already guards `if self.sf:`.
        try:
            self.sf = connect_to_salesforce()
        except Exception:
            logger.warning("Proceeding without Salesforce (offline mode).")
            self.sf = None
        self._create_directories()
        self.load_worker_data()

    def _create_directories(self):
        """Ensure the local data directories exist."""
        os.makedirs("data/faces", exist_ok=True)

    def load_worker_data(self):
        """Load the worker roster from Salesforce, falling back to the local cache."""
        logger.info("Loading worker data...")
        if self.sf:
            try:
                workers = self.sf.query_all(
                    "SELECT Worker_ID__c, Name, Face_Embeddings__c FROM Worker__c")['records']
                if not workers:
                    self._load_local_worker_data()
                    return
                temp_embeddings, temp_names, temp_ids, max_id = [], [], [], 0
                for worker in workers:
                    if worker.get('Face_Embeddings__c'):
                        embeddings = [np.array(emb)
                                      for emb in json.loads(worker['Face_Embeddings__c'])]
                        temp_embeddings.append(embeddings)
                        temp_names.append(worker['Name'])
                        temp_ids.append(worker['Worker_ID__c'])
                        try:
                            # Track the highest numeric suffix so new IDs never collide.
                            worker_num = int(''.join(filter(str.isdigit, worker['Worker_ID__c'])))
                            if worker_num > max_id:
                                max_id = worker_num
                        except (ValueError, TypeError):
                            continue
                self.known_face_embeddings = temp_embeddings
                self.known_face_names = temp_names
                self.known_face_ids = temp_ids
                self.next_worker_id = max_id + 1
                self.save_local_worker_data()
                logger.info(f"✅ Loaded {len(self.known_face_ids)} workers from Salesforce. Next ID: {self.next_worker_id}")
            except Exception as e:
                logger.error(f"❌ Error loading from Salesforce: {e}. Attempting local load.")
                self._load_local_worker_data()
        else:
            logger.warning("Salesforce not connected. Loading from local cache.")
            self._load_local_worker_data()

    def _load_local_worker_data(self):
        """Restore the roster from the pickled local cache, if present."""
        try:
            if os.path.exists("data/workers.pkl"):
                with open("data/workers.pkl", "rb") as f:
                    data = pickle.load(f)
                self.known_face_embeddings = data.get("embeddings", [])
                self.known_face_names = data.get("names", [])
                self.known_face_ids = data.get("ids", [])
                self.next_worker_id = data.get("next_id", 1)
                logger.info(f"✅ Loaded {len(self.known_face_ids)} workers from local cache. Next ID: {self.next_worker_id}")
        except Exception as e:
            logger.error(f"❌ Error loading local data: {e}")

    def save_local_worker_data(self):
        """Persist the in-memory roster to the pickled local cache."""
        try:
            worker_data = {
                "embeddings": self.known_face_embeddings,
                "names": self.known_face_names,
                "ids": self.known_face_ids,
                "next_id": self.next_worker_id
            }
            with open("data/workers.pkl", "wb") as f:
                pickle.dump(worker_data, f)
        except Exception as e:
            logger.error(f"❌ Error saving local worker data: {e}")

    def register_worker_manual(self, image: Image.Image, name: str) -> Tuple[str, str]:
        """Register a worker from an uploaded photo.

        Returns a (status message, roster markdown) tuple for the UI.
        """
        if image is None or not name.strip():
            return "❌ Please provide both image and name!", self.get_registered_workers_info()
        try:
            image_array = np.array(image)
            # enforce_detection=True makes analyze() raise ValueError if no face is found.
            DeepFace.analyze(img_path=image_array, actions=['emotion'], enforce_detection=True)
            embedding = DeepFace.represent(img_path=image_array, model_name='Facenet',
                                           enforce_detection=False)[0]['embedding']
            if self._is_duplicate_face(embedding):
                return f"❌ Face matches an existing worker!", self.get_registered_workers_info()
            worker_id = f"W{self.next_worker_id:04d}"
            name = name.strip().title()
            self._add_worker_to_system(worker_id, name, [embedding], image_array)
            self.save_local_worker_data()
            return f"✅ {name} registered with ID: {worker_id}!", self.get_registered_workers_info()
        except ValueError:
            return "❌ No face detected in the image!", self.get_registered_workers_info()
        except Exception as e:
            logger.error(f"Manual registration error: {e}")
            return f"❌ Registration error: {e}", self.get_registered_workers_info()

    def _register_worker_auto(self, face_image: np.ndarray) -> Optional[Dict]:
        """Auto-register an unrecognized face when detection confidence is high enough."""
        try:
            analysis = DeepFace.analyze(img_path=face_image, actions=['emotion'],
                                        enforce_detection=False)
            if analysis[0]['face_confidence'] < AUTO_REGISTER_CONFIDENCE:
                return None
            embedding = DeepFace.represent(img_path=face_image, model_name='Facenet',
                                           enforce_detection=False)[0]['embedding']
            if self._is_duplicate_face(embedding):
                return None
            worker_id = f"W{self.next_worker_id:04d}"
            worker_name = f"New Worker {self.next_worker_id}"
            self._add_worker_to_system(worker_id, worker_name, [embedding], face_image)
            self.save_local_worker_data()
            self.mark_attendance(worker_id, worker_name, is_new=True)
            return {'name': worker_name, 'worker_id': worker_id}
        except Exception as e:
            logger.error(f"❌ Auto-registration error: {e}")
            return None

    def _add_worker_to_system(self, worker_id: str, name: str,
                              embeddings: List[np.ndarray], image_array: np.ndarray):
        """Add a worker to the in-memory roster and (best-effort) sync to Salesforce."""
        self.known_face_embeddings.append(embeddings)
        self.known_face_names.append(name)
        self.known_face_ids.append(worker_id)
        self.next_worker_id += 1
        if self.sf:
            try:
                # NOTE(review): for the auto path image_array is a BGR OpenCV frame,
                # so BGR2RGB is correct; the manual path passes an already-RGB PIL
                # array, which this swaps to BGR — confirm intended before changing.
                image = Image.fromarray(cv2.cvtColor(image_array, cv2.COLOR_BGR2RGB))
                caption = self._get_image_caption(image)
                worker_record = self.sf.Worker__c.create({
                    'Worker_ID__c': worker_id,
                    'Name': name,
                    'Face_Embeddings__c': json.dumps([emb.tolist() for emb in embeddings]),
                    'Image_Caption__c': caption
                })
                content_doc_id = self._upload_image_to_salesforce(image, worker_record['id'], worker_id)
                if content_doc_id:
                    self.sf.Worker__c.update(worker_record['id'], {'Image_URL__c': content_doc_id})
            except Exception as e:
                logger.error(f"❌ Salesforce worker sync error: {e}")

    def _is_duplicate_face(self, embedding: List[float]) -> bool:
        """Return True if *embedding* is within FACE_MATCH_THRESHOLD of any known face."""
        if not self.known_face_embeddings:
            return False
        for worker_embeddings in self.known_face_embeddings:
            for known_emb in worker_embeddings:
                distance = np.linalg.norm(np.array(embedding) - np.array(known_emb))
                if distance < FACE_MATCH_THRESHOLD:
                    return True
        return False

    def mark_attendance(self, worker_id: str, worker_name: str, is_new: bool = False):
        """Record attendance once per 24 h per worker; sync to Salesforce if connected."""
        now = datetime.now()
        if worker_id in self.session_attended_ids:
            last_attendance = self.session_attended_ids[worker_id]
            if now - last_attendance < timedelta(hours=24):
                return  # already marked within the last 24 hours
        self.session_attended_ids[worker_id] = now
        current_time_str = now.strftime('%H:%M:%S')
        log_msg = (f"🆕 [{current_time_str}] Registered: {worker_name} ({worker_id})"
                   if is_new else
                   f"✅ [{current_time_str}] Present: {worker_name} ({worker_id})")
        target_log = self.session_log_new if is_new else self.session_log_present
        target_log.insert(0, log_msg)  # newest entries first
        if self.sf:
            try:
                self.sf.Attendance__c.create({
                    'Worker_ID__c': worker_id,
                    'Name': worker_name,
                    'Attendance_Date__c': now.date().isoformat(),
                    'Attendance_Time__c': current_time_str,
                    'Status__c': 'Present'
                })
            except Exception as e:
                logger.error(f"❌ Salesforce attendance sync error: {e}")

    def identify_and_log_face(self, face_image: np.ndarray) -> Optional[Dict]:
        """Identify a cropped face, mark attendance, and return display info.

        Returns a dict with 'name', 'worker_id' and a BGR 'color':
        green = known worker, orange = auto-registered, red = unknown.
        """
        try:
            embedding = DeepFace.represent(img_path=face_image, model_name='Facenet',
                                           enforce_detection=False)[0]['embedding']
            if self.known_face_embeddings:
                min_dist = float('inf')
                match_index = -1
                for i, worker_embeddings in enumerate(self.known_face_embeddings):
                    for known_emb in worker_embeddings:
                        distance = np.linalg.norm(np.array(embedding) - np.array(known_emb))
                        if distance < min_dist and distance < FACE_MATCH_THRESHOLD:
                            min_dist = distance
                            match_index = i
                if match_index >= 0:
                    worker_id = self.known_face_ids[match_index]
                    worker_name = self.known_face_names[match_index]
                    self.mark_attendance(worker_id, worker_name)
                    return {'name': worker_name, 'worker_id': worker_id, 'color': (0, 255, 0)}  # Green
            new_worker = self._register_worker_auto(face_image)
            if new_worker:
                return {'name': new_worker['name'], 'worker_id': new_worker['worker_id'],
                        'color': (0, 165, 255)}  # Orange
        except Exception as e:
            logger.error(f"Face identification failed: {e}")
        return {'name': 'Unknown', 'worker_id': None, 'color': (0, 0, 255)}  # Red

    def _processing_loop(self, source):
        """Read frames from *source*, run the tracker, and feed the UI frame queue."""
        video_capture = cv2.VideoCapture(source)
        if not video_capture.isOpened():
            self.error_message = f"❌ Error: Could not open video source: {source}"
            logger.error(self.error_message)
            self.is_processing.clear()
            return
        tracker = FaceTracker(self)
        frame_count = 0
        while self.is_processing.is_set():
            ret, frame = video_capture.read()
            if not ret:
                logger.info("Video ended or failed to read frame.")
                break
            self.current_frame = frame.copy()
            annotated_frame = tracker.update(self.current_frame)
            logger.debug(f"Processed frame {frame_count}, queue size: {self.frame_queue.qsize()}")
            if not self.frame_queue.full():
                self.frame_queue.put(annotated_frame)
            else:
                # Queue full: drop the oldest frame so the UI always shows fresh video.
                try:
                    self.frame_queue.get_nowait()
                except queue.Empty:
                    pass
                self.frame_queue.put(annotated_frame)
            frame_count += 1
            time.sleep(0.01)  # yield the GIL / cap the read rate slightly
        video_capture.release()
        self.is_processing.clear()
        logger.info("Processing finished.")

    def start_processing(self, source) -> str:
        """Start the background processing loop on *source*; returns a status string."""
        if self.is_processing.is_set():
            return "⚠️ Processing is already active."
        self.session_log_present.clear()
        self.session_log_new.clear()
        self.session_attended_ids.clear()
        self.error_message = None
        while not self.frame_queue.empty():
            self.frame_queue.get()
        self.is_processing.set()
        self.processing_thread = threading.Thread(target=self._processing_loop, args=(source,))
        self.processing_thread.daemon = True
        self.processing_thread.start()
        return "✅ Started processing..."

    def stop_processing(self) -> str:
        """Signal the processing loop to stop and wait briefly for it to exit."""
        if not self.is_processing.is_set():
            return "⚠️ Processing is not currently active."
        self.is_processing.clear()
        if self.processing_thread:
            self.processing_thread.join(timeout=2)
        return "✅ Processing stopped by user."

    def get_registered_workers_info(self) -> str:
        """Return a markdown summary of the roster (reloads worker data first)."""
        self.load_worker_data()
        if not self.known_face_ids:
            return "No workers registered."
        info_list = [f"- **{name}** (ID: {wid})"
                     for name, wid in sorted(zip(self.known_face_names, self.known_face_ids))]
        return f"**👥 Registered Workers ({len(info_list)})**\n" + "\n".join(info_list)

    # BUGFIX: the two methods below were previously defined inside the
    # `if __name__ == "__main__":` block (so they were missing entirely when the
    # module was imported), and _get_image_caption lacked `self`, raising
    # TypeError on every call from _add_worker_to_system.
    def _get_image_caption(self, image: Image.Image) -> str:
        """Caption *image* via the Hugging Face BLIP inference API (best-effort)."""
        if not HF_API_TOKEN:
            return "Hugging Face API token not configured."
        try:
            buffered = BytesIO()
            image.save(buffered, format="JPEG")
            img_data = buffered.getvalue()
            headers = {"Authorization": f"Bearer {HF_API_TOKEN}"}
            response = requests.post(HF_API_URL, headers=headers, data=img_data, timeout=10)
            response.raise_for_status()
            result = response.json()
            return result[0].get("generated_text", "No caption found.")
        except Exception as e:
            logger.error(f"Hugging Face API error: {e}")
            return "Caption generation failed."

    def _upload_image_to_salesforce(self, image: Image.Image, record_id: str,
                                    worker_id: str) -> Optional[str]:
        """Upload *image* as a ContentVersion linked to *record_id*; return a download URL."""
        if not self.sf:
            return None
        try:
            buffered = BytesIO()
            image.save(buffered, format="JPEG")
            encoded_image = base64.b64encode(buffered.getvalue()).decode('utf-8')
            cv = self.sf.ContentVersion.create({
                'Title': f'Image_{worker_id}',
                'PathOnClient': f'{worker_id}.jpg',
                'VersionData': encoded_image,
                'FirstPublishLocationId': record_id
            })
            content_doc_link = self.sf.query(
                f"SELECT ContentDocumentId FROM ContentDocumentLink WHERE LinkedEntityId = '{record_id}'"
            )['records'][0]
            content_doc_id = content_doc_link['ContentDocumentId']
            return f"/sfc/servlet.shepherd/document/download/{content_doc_id}"
        except Exception as e:
            logger.error(f"Salesforce image upload error: {e}")
            return None


# --- GRADIO UI ---
attendance_system = AttendanceSystem()


def create_interface():
    """Build and return the Gradio Blocks UI wired to the global attendance_system."""
    with gr.Blocks(theme=gr.themes.Soft(), title="Attendance System") as demo:
        gr.Markdown("# 🚀 High-Performance Face Recognition Attendance")
        with gr.Tabs():
            with gr.Tab("⚙️ Controls & Status"):
                gr.Markdown("### 1. Choose Input Source & Start Processing")
                with gr.Row():
                    with gr.Column(scale=1):
                        # Hidden state holding which source tab is selected (0=camera, 1=file).
                        selected_tab_index = gr.Number(value=0, visible=False)
                        with gr.Tabs() as video_tabs:
                            with gr.Tab("Live Camera", id=0):
                                camera_source = gr.Number(label="Camera Source Index", value=0, precision=0)
                            with gr.Tab("Upload Video", id=1):
                                video_file = gr.Video(label="Upload Video File", sources=["upload"])
                    with gr.Column(scale=1):
                        start_btn = gr.Button("▶️ Start Processing", variant="primary")
                        stop_btn = gr.Button("⏹️ Stop Processing", variant="stop")
                        status_box = gr.Textbox(label="Status", interactive=False, value="System Ready.")
                gr.Markdown("### 2. View Results in the 'Output & Log' Tab")
                gr.Markdown("**🎨 Color Coding:** Identifying, Present, New, Unknown")
            with gr.Tab("📊 Output & Log"):
                with gr.Row(equal_height=True):
                    with gr.Column(scale=2):
                        video_output = gr.Image(label="Recognition Output", interactive=False, type="numpy")
                    with gr.Column(scale=1):
                        gr.Markdown("### 📋 Present Workers")
                        session_log_present = gr.Markdown(label="Present Workers Log",
                                                          value="Awaiting detections...")
                        gr.Markdown("### 🆕 Newly Registered Workers")
                        session_log_new = gr.Markdown(label="New Workers Log",
                                                      value="Awaiting new registrations...")
            with gr.Tab("👤 Worker Management"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Register New Worker")
                        register_image = gr.Image(label="Upload Worker's Photo", type="pil", sources=["upload"])
                        register_name = gr.Textbox(label="Worker's Full Name")
                        register_btn = gr.Button("Register Worker", variant="primary")
                        register_output = gr.Textbox(label="Registration Status", interactive=False)
                    with gr.Column():
                        gr.Markdown("### Current Worker Roster")
                        registered_workers_info = gr.Markdown(
                            value=attendance_system.get_registered_workers_info())
                        refresh_workers_btn = gr.Button("🔄 Refresh List")

        def on_tab_select(evt: gr.SelectData):
            # Remember which input tab the user is on for start_wrapper.
            return evt.index

        video_tabs.select(fn=on_tab_select, inputs=None, outputs=[selected_tab_index])

        def start_wrapper(tab_index, cam_src, vid_path):
            source = int(cam_src) if tab_index == 0 else vid_path
            if tab_index == 1 and vid_path is None:
                return "❌ Please upload a video file."
            return (attendance_system.start_processing(source)
                    if source is not None else "Please provide an input source.")

        start_btn.click(fn=start_wrapper,
                        inputs=[selected_tab_index, camera_source, video_file],
                        outputs=[status_box])
        stop_btn.click(fn=attendance_system.stop_processing, inputs=None, outputs=[status_box])
        register_btn.click(fn=attendance_system.register_worker_manual,
                           inputs=[register_image, register_name],
                           outputs=[register_output, registered_workers_info])
        refresh_workers_btn.click(fn=attendance_system.get_registered_workers_info,
                                  outputs=[registered_workers_info])

        def update_ui_generator():
            """Stream frames and session logs to the UI at ~30 updates/second."""
            last_frame = None
            while True:
                status_text = ("Status: Processing..." if attendance_system.is_processing.is_set()
                               else "Status: Stopped.")
                present_md = "\n".join(attendance_system.session_log_present) or "Awaiting detections..."
                new_md = "\n".join(attendance_system.session_log_new) or "Awaiting new registrations..."
                frame = None
                if attendance_system.is_processing.is_set():
                    try:
                        frame = attendance_system.frame_queue.get_nowait()
                        last_frame = frame
                        logger.debug("Retrieved frame from queue for UI update")
                    except queue.Empty:
                        frame = last_frame  # keep showing the most recent frame
                        logger.debug("Using last frame due to empty queue")
                else:
                    present_md = "Processing stopped. Final Present Log:\n\n" + present_md
                    new_md = "Processing stopped. Final New Workers Log:\n\n" + new_md
                if frame is not None:
                    yield cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), present_md, new_md, status_text
                else:
                    yield None, present_md, new_md, status_text
                time.sleep(1 / 30)

        demo.load(fn=update_ui_generator,
                  outputs=[video_output, session_log_present, session_log_new, status_box])
    return demo


if __name__ == "__main__":
    app = create_interface()
    app.queue()
    app.launch(server_name="0.0.0.0", server_port=7860, show_error=True, debug=True)