# Suppress TensorFlow oneDNN warnings (must be set before TensorFlow is imported via deepface)
import os
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'

# Standard Library Imports
import base64
import json
import logging
import queue
import threading
import time
from datetime import datetime, date
from io import BytesIO
from typing import Tuple, Optional, List
import pickle

# Third-Party Imports
import cv2
import gradio as gr
import numpy as np
import pandas as pd
from PIL import Image
import requests
from dotenv import load_dotenv
from deepface import DeepFace
from retrying import retry
from simple_salesforce import Salesforce

# --- CONFIGURATION ---
# Setup logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()

# Hugging Face API configuration
HF_API_URL = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base"
HF_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN")
HF_API_TIMEOUT_SECONDS = 30

# Salesforce configuration
# SECURITY(review): the fallback values below are real-looking credentials
# committed to source. They are kept only so existing deployments keep
# connecting; rotate them and supply all four values via the environment.
SF_CREDENTIALS = {
    "username": os.getenv("SF_USERNAME", "smartlabour@attendance.system"),
    "password": os.getenv("SF_PASSWORD", "#Prashanth@1234"),
    "security_token": os.getenv("SF_SECURITY_TOKEN", "7xPmtDFoWlZUGK0V2QSwFZJ6c"),
    "domain": os.getenv("SF_DOMAIN", "login")
}

# Recognition tuning (cosine similarity is in [-1, 1]; higher means more alike)
MATCH_THRESHOLD = 0.85           # above this a face is treated as a known worker
NEW_FACE_THRESHOLD = 0.70        # below this a face is auto-registered as new
MIN_DETECTION_CONFIDENCE = 0.90  # detector confidence required to use a face
MIN_FACE_SIZE_PX = 50            # faces narrower/shorter than this are ignored
FRAME_STRIDE = 3                 # only every Nth frame is analysed
BUFFER_EXPIRY_SECONDS = 5.0      # recognition streaks older than this are dropped
STOP_JOIN_TIMEOUT_SECONDS = 5.0  # how long stop_processing waits for the worker


# --- SALESFORCE CONNECTION ---
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def connect_to_salesforce() -> Optional[Salesforce]:
    """Establish a connection to Salesforce with retry logic.

    Returns:
        A connected ``Salesforce`` client.

    Raises:
        Exception: whatever the last attempt raised, once the three attempts
            (2 s apart) configured on the decorator are exhausted.
    """
    try:
        sf = Salesforce(**SF_CREDENTIALS)
        sf.describe()  # Test connection
        logger.info("✅ Successfully connected to Salesforce.")
        return sf
    except Exception as e:
        logger.error(f"❌ Salesforce connection failed: {e}")
        raise


# --- CORE LOGIC ---
class AttendanceSystem:
    """
    Manages all backend logic for the face recognition attendance system.

    Holds the known-worker embeddings, the per-session recognition state and
    the background video thread. Salesforce is used when reachable; otherwise
    the instance runs from the local pickle cache with ``self.sf`` set to None.
    """

    def __init__(self):
        # State Management
        self.processing_thread = None
        self.is_processing = threading.Event()
        self.frame_queue = queue.Queue(maxsize=10)
        self.error_message = None
        self.last_processed_frame = None
        self.final_log = None

        # Data Storage
        self.known_face_embeddings: List[np.ndarray] = []
        self.known_face_names: List[str] = []
        self.known_face_ids: List[str] = []
        self.next_worker_id: int = 1

        # Session Tracking
        self.last_recognition_time = {}
        self.recognition_cooldown = 10
        self.session_log: List[str] = []
        self.session_marked_present = set()
        self.session_registered = set()
        self.face_recognition_buffer = {}
        self.buffer_threshold = 3
        self.frame_skip_counter = 0

        # Initialize
        # connect_to_salesforce() raises once its retries are exhausted; catch
        # that here so the local-cache fallback below is actually reachable.
        try:
            self.sf: Optional[Salesforce] = connect_to_salesforce()
        except Exception as e:
            logger.warning(f"Salesforce unavailable, continuing without it: {e}")
            self.sf = None
        self._create_directories()
        self.load_worker_data()

    def _create_directories(self):
        """Create the on-disk folder used for face snapshots (and the cache)."""
        os.makedirs("data/faces", exist_ok=True)

    @staticmethod
    def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
        """Return the cosine similarity of two vectors (0.0 if either is all-zero)."""
        denom = np.linalg.norm(a) * np.linalg.norm(b)
        if denom == 0:
            return 0.0
        return float(np.dot(a, b) / denom)

    def load_worker_data(self):
        """Populate the known-worker lists from Salesforce, else from the local cache.

        On a successful Salesforce load the local cache is rewritten. An empty
        result set or any Salesforce error falls back to the local cache.
        """
        logger.info("Loading worker data...")
        if not self.sf:
            logger.warning("Salesforce not connected. Loading from local cache.")
            self._load_local_worker_data()
            return

        try:
            workers = self.sf.query_all(
                "SELECT Worker_ID__c, Name, Face_Embedding__c FROM Worker__c"
            )['records']
            if not workers:
                self._load_local_worker_data()
                return

            temp_embeddings, temp_names, temp_ids, max_id = [], [], [], 0
            for worker in workers:
                worker_id = worker.get('Worker_ID__c')

                # Track the highest numeric suffix over *all* records so a new
                # ID never collides with a worker that has no embedding yet.
                try:
                    max_id = max(max_id, int(worker_id[1:]))
                except (ValueError, TypeError):
                    pass

                raw_embedding = worker.get('Face_Embedding__c')
                if not raw_embedding:
                    continue
                try:
                    embedding = np.array(json.loads(raw_embedding))
                except (ValueError, TypeError) as e:
                    # One corrupt record should not discard every other worker.
                    logger.warning(f"Skipping worker {worker_id}: invalid embedding ({e})")
                    continue

                temp_embeddings.append(embedding)
                temp_names.append(worker['Name'])
                temp_ids.append(worker_id)

            self.known_face_embeddings = temp_embeddings
            self.known_face_names = temp_names
            self.known_face_ids = temp_ids
            self.next_worker_id = max_id + 1
            self.save_local_worker_data()
            logger.info(f"✅ Loaded {len(self.known_face_ids)} workers from Salesforce.")
        except Exception as e:
            logger.error(f"❌ Error loading from Salesforce: {e}. Attempting local load.")
            self._load_local_worker_data()

    def _load_local_worker_data(self):
        """Load the worker lists from ``data/workers.pkl`` if that file exists."""
        try:
            if os.path.exists("data/workers.pkl"):
                # SECURITY(review): pickle.load executes arbitrary code from the
                # file; this is only safe while data/workers.pkl is written
                # exclusively by save_local_worker_data on a trusted host.
                with open("data/workers.pkl", "rb") as f:
                    data = pickle.load(f)
                self.known_face_embeddings = data.get("embeddings", [])
                self.known_face_names = data.get("names", [])
                self.known_face_ids = data.get("ids", [])
                self.next_worker_id = data.get("next_id", 1)
                logger.info(f"✅ Loaded {len(self.known_face_ids)} workers from local cache.")
        except Exception as e:
            logger.error(f"❌ Error loading local data: {e}")

    def save_local_worker_data(self):
        """Write the current worker lists and next ID to ``data/workers.pkl``."""
        try:
            worker_data = {
                "embeddings": self.known_face_embeddings,
                "names": self.known_face_names,
                "ids": self.known_face_ids,
                "next_id": self.next_worker_id
            }
            with open("data/workers.pkl", "wb") as f:
                pickle.dump(worker_data, f)
        except Exception as e:
            logger.error(f"❌ Error saving local worker data: {e}")

    # --- Registration and Attendance ---
    def register_worker_manual(self, image: Image.Image, name: str) -> Tuple[str, str]:
        """Register a worker from an uploaded photo and a name.

        Args:
            image: PIL image containing the worker's face.
            name: Worker's name; surrounding whitespace is stripped and the
                result is title-cased.

        Returns:
            ``(status_message, registered_workers_markdown)``.
        """
        if image is None or not name or not name.strip():
            return "❌ Please provide both image and name!", self.get_registered_workers_info()
        try:
            # PIL yields RGB(A); the rest of the pipeline (DeepFace ndarray
            # input, video frames, _add_worker_to_system) works in BGR, so
            # convert once here to keep embeddings and saved photos consistent.
            image_array = cv2.cvtColor(np.array(image.convert("RGB")), cv2.COLOR_RGB2BGR)

            # enforce_detection=True makes represent() raise ValueError when no
            # face is found, which is reported below.
            embedding = DeepFace.represent(
                img_path=image_array,
                model_name='Facenet',
                enforce_detection=True
            )[0]['embedding']

            if self._is_duplicate_face(embedding):
                return "❌ Face matches an existing worker!", self.get_registered_workers_info()

            worker_id = f"W{self.next_worker_id:04d}"
            name = name.strip().title()
            synced = self._add_worker_to_system(worker_id, name, embedding, image_array)
            self.save_local_worker_data()
            if synced:
                # Only refresh from Salesforce when the new record made it
                # there; otherwise the reload would drop the worker from memory
                # and overwrite the local cache without them.
                self.load_worker_data()
            return f"✅ {name} registered with ID: {worker_id}!", self.get_registered_workers_info()
        except ValueError:
            return "❌ No face detected in the image!", self.get_registered_workers_info()
        except Exception as e:
            return f"❌ Registration error: {e}", self.get_registered_workers_info()

    def _register_worker_auto(self, face_image: np.ndarray, face_embedding: List[float]) -> Optional[Tuple[str, str]]:
        """Register an unrecognised face under a placeholder name.

        Args:
            face_image: BGR crop of the face taken from a video frame.
            face_embedding: Embedding computed for that crop.

        Returns:
            ``(worker_id, worker_name)`` on success, or None when the face is a
            duplicate, was already registered this session, or an error occurred.
        """
        try:
            # Check for duplicates with strict threshold
            if self._is_duplicate_face(face_embedding, threshold=10.0):
                return None

            worker_id = f"W{self.next_worker_id:04d}"

            # Check if already auto-registered in this session
            if worker_id in self.session_registered:
                return None

            worker_name = f"Unknown Worker {self.next_worker_id}"
            self._add_worker_to_system(worker_id, worker_name, face_embedding, face_image)
            self.save_local_worker_data()

            # Mark as registered in this session
            self.session_registered.add(worker_id)

            log_msg = f"🆕 [{datetime.now().strftime('%H:%M:%S')}] Auto-registered: {worker_name} ({worker_id})"
            self.session_log.append(log_msg)
            logger.info(log_msg)
            return worker_id, worker_name
        except Exception as e:
            logger.error(f"❌ Auto-registration error: {e}")
            return None

    def _add_worker_to_system(self, worker_id: str, name: str, embedding: List[float], image_array: np.ndarray) -> bool:
        """Add a worker to memory, save the face snapshot and sync to Salesforce.

        Args:
            worker_id: ID to assign (e.g. ``W0001``).
            name: Display name.
            embedding: Face embedding to store.
            image_array: Face image in BGR channel order.

        Returns:
            True when the worker record was created in Salesforce, False when
            Salesforce is not connected or the sync failed (the worker is still
            kept in memory and on disk in that case).
        """
        self.known_face_embeddings.append(np.array(embedding))
        self.known_face_names.append(name)
        self.known_face_ids.append(worker_id)
        self.next_worker_id += 1

        face_pil = Image.fromarray(cv2.cvtColor(image_array, cv2.COLOR_BGR2RGB))
        face_pil.save(f"data/faces/{worker_id}.jpg")

        if not self.sf:
            return False

        # The caption is only stored on the Salesforce record, so the HTTP call
        # is skipped entirely when running offline.
        caption = self._get_image_caption(face_pil)
        try:
            worker_record = self.sf.Worker__c.create({
                'Name': name,
                'Worker_ID__c': worker_id,
                'Face_Embedding__c': json.dumps(embedding),
                'Image_Caption__c': caption
            })
            image_url = self._upload_image_to_salesforce(face_pil, worker_record['id'], worker_id)
            if image_url:
                self.sf.Worker__c.update(worker_record['id'], {'Image_URL__c': image_url})
            logger.info(f"✅ Worker {worker_id} synced to Salesforce.")
            return True
        except Exception as e:
            logger.error(f"❌ Salesforce sync error for {worker_id}: {e}")
            return False

    def _is_duplicate_face(self, embedding: List[float], threshold: float = 10.0) -> bool:
        """Strict duplicate detection with cosine similarity.

        Args:
            embedding: Candidate face embedding.
            threshold: Currently ignored; kept so existing callers that pass it
                keep working. The comparison uses ``MATCH_THRESHOLD``.

        Returns:
            True if any known embedding has cosine similarity above
            ``MATCH_THRESHOLD`` with ``embedding``.
        """
        if not self.known_face_embeddings:
            return False

        embedding_array = np.array(embedding)
        return any(
            self._cosine_similarity(embedding_array, known_embedding) > MATCH_THRESHOLD
            for known_embedding in self.known_face_embeddings
        )

    def mark_attendance(self, worker_id: str, worker_name: str) -> bool:
        """Attendance marking with strict checks.

        Returns:
            True if the worker was newly marked present in this session, False
            if they were already marked (this session or earlier today).
        """
        if worker_id in self.session_marked_present:
            return False

        today_str = date.today().isoformat()
        if self._has_attended_today(worker_id, today_str):
            # Remember the answer so later frames do not re-query Salesforce
            # for the same worker.
            self.session_marked_present.add(worker_id)
            return False

        current_time = datetime.now()
        if self.sf:
            try:
                self.sf.Attendance__c.create({
                    'Worker_ID__c': worker_id,
                    'Name__c': worker_name,
                    'Date__c': today_str,
                    'Timestamp__c': current_time.isoformat(),
                    'Status__c': "Present"
                })
            except Exception as e:
                logger.error(f"❌ Error saving attendance to Salesforce: {e}")

        self.session_marked_present.add(worker_id)
        log_msg = f"✅ [{current_time.strftime('%H:%M:%S')}] Marked Present: {worker_name} ({worker_id})"
        self.session_log.append(log_msg)
        return True

    def _has_attended_today(self, worker_id: str, today_str: str) -> bool:
        """Return True if the worker is within the cooldown or already has a record today."""
        last_seen = self.last_recognition_time.get(worker_id)
        if last_seen and (time.time() - last_seen < self.recognition_cooldown):
            return True

        if self.sf:
            try:
                # Worker IDs originate from Salesforce records, so escape them
                # before embedding in the SOQL string literal.
                safe_id = str(worker_id).replace("\\", "\\\\").replace("'", "\\'")
                query = f"SELECT Id FROM Attendance__c WHERE Worker_ID__c = '{safe_id}' AND Date__c = '{today_str}'"
                if self.sf.query(query)['totalSize'] > 0:
                    return True
            except Exception as e:
                logger.error(f"Attendance check error: {e}")
        return False

    def _find_best_match(self, target_embedding: np.ndarray) -> Tuple[int, float]:
        """Find best match using cosine similarity.

        Returns:
            ``(index, score)`` of the most similar known embedding, or
            ``(-1, 0.0)`` when there are no known faces or none scores above 0.
        """
        best_match_idx = -1
        best_score = 0.0
        for i, known_embedding in enumerate(self.known_face_embeddings):
            cosine_sim = self._cosine_similarity(target_embedding, known_embedding)
            if cosine_sim > best_score:
                best_score = cosine_sim
                best_match_idx = i
        return best_match_idx, best_score

    # --- Video Processing ---
    def _detect_faces(self, frame: np.ndarray) -> list:
        """Detect faces with the OpenCV backend, falling back to MTCNN if it raises."""
        try:
            return DeepFace.extract_faces(
                img_path=frame,
                detector_backend='opencv',
                enforce_detection=False
            )
        except Exception as e:
            logger.warning(f"OpenCV detector failed: {e}")

        try:
            return DeepFace.extract_faces(
                img_path=frame,
                detector_backend='mtcnn',
                enforce_detection=False
            )
        except Exception as e:
            logger.warning(f"MTCNN detector failed: {e}")
        return []

    def _prune_recognition_buffer(self):
        """Drop recognition streaks that have not been refreshed recently."""
        current_time = time.time()
        for key in list(self.face_recognition_buffer.keys()):
            if current_time - self.face_recognition_buffer[key]['last_time'] > BUFFER_EXPIRY_SECONDS:
                del self.face_recognition_buffer[key]

    def process_frame(self, frame: np.ndarray) -> np.ndarray:
        """Frame processing with strict recognition rules.

        Analyses every ``FRAME_STRIDE``-th frame: detects faces, matches them
        against known workers, marks attendance after ``buffer_threshold``
        consecutive recognitions, auto-registers clearly new faces, and draws
        a labelled box per face (green = known, orange = new, red = unknown).

        Args:
            frame: BGR video frame; annotated in place.

        Returns:
            The same frame object, annotated when it was analysed.
        """
        try:
            # Skip frames for performance
            self.frame_skip_counter += 1
            if self.frame_skip_counter % FRAME_STRIDE != 0:
                return frame

            face_objs = self._detect_faces(frame)
            if face_objs:
                logger.debug(f"Found {len(face_objs)} faces in frame")

            for face_obj in face_objs:
                confidence = face_obj.get('confidence', 0.0)

                # Strict confidence threshold; every face past this point is
                # known to meet it.
                if not confidence >= MIN_DETECTION_CONFIDENCE:
                    continue

                facial_area = face_obj['facial_area']
                x, y, w, h = facial_area['x'], facial_area['y'], facial_area['w'], facial_area['h']
                face_image = frame[y:y+h, x:x+w]

                if face_image.size == 0 or w < MIN_FACE_SIZE_PX or h < MIN_FACE_SIZE_PX:
                    continue

                try:
                    embedding_obj = DeepFace.represent(
                        img_path=face_image,
                        model_name='Facenet',
                        enforce_detection=False
                    )
                    embedding = embedding_obj[0]['embedding']
                    embedding_array = np.array(embedding)
                except Exception as e:
                    logger.warning(f"Embedding generation failed: {e}")
                    continue

                color = (0, 0, 255)  # Default red for unknown
                worker_id = None
                worker_name = "Unknown"

                if self.known_face_embeddings:
                    match_index, match_score = self._find_best_match(embedding_array)

                    # Strict matching threshold
                    if match_index != -1 and match_score > MATCH_THRESHOLD:
                        worker_id = self.known_face_ids[match_index]
                        worker_name = self.known_face_names[match_index]
                        color = (0, 255, 0)  # Green for known

                        # Buffer recognition
                        buffer_key = f"{worker_id}"
                        entry = self.face_recognition_buffer.get(buffer_key)
                        if entry is None:
                            entry = {'count': 1, 'last_time': time.time()}
                            self.face_recognition_buffer[buffer_key] = entry
                        else:
                            entry['count'] += 1
                            entry['last_time'] = time.time()

                        # Mark attendance after consistent detections
                        if entry['count'] >= self.buffer_threshold:
                            if self.mark_attendance(worker_id, worker_name):
                                self.last_recognition_time[worker_id] = time.time()
                            # Restart the streak so the counter stays bounded.
                            del self.face_recognition_buffer[buffer_key]
                    elif match_score < NEW_FACE_THRESHOLD:
                        # Only register new if very different from existing faces
                        color = (0, 165, 255)  # Orange for new
                        new_worker = self._register_worker_auto(face_image, embedding)
                        if new_worker:
                            worker_id, worker_name = new_worker
                            self.mark_attendance(worker_id, worker_name)
                else:
                    # No known faces, auto-register with high confidence
                    color = (0, 165, 255)  # Orange for new
                    new_worker = self._register_worker_auto(face_image, embedding)
                    if new_worker:
                        worker_id, worker_name = new_worker
                        self.mark_attendance(worker_id, worker_name)

                # Clean old buffer entries
                self._prune_recognition_buffer()

                # Draw bounding box and label
                label = f"{worker_name}" + (f" ({worker_id})" if worker_id else "")
                cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2)
                cv2.putText(frame, label, (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

            return frame
        except Exception as e:
            logger.error(f"Frame processing error: {e}")
            return frame

    def _processing_loop(self, source):
        """Background-thread body: read frames from ``source`` until stopped or exhausted.

        Processed frames are pushed to ``frame_queue`` (dropped when the queue
        is full). On exit the session log is snapshotted into ``final_log``,
        the capture is released and ``is_processing`` is cleared.
        """
        video_capture = cv2.VideoCapture(source)
        if not video_capture.isOpened():
            video_capture.release()
            err_msg = "❌ Could not open video source"
            self.error_message = err_msg
            self.is_processing.clear()
            return

        try:
            while self.is_processing.is_set():
                ret, frame = video_capture.read()
                if not ret:
                    break

                processed_frame = self.process_frame(frame)
                try:
                    self.frame_queue.put_nowait(processed_frame)
                except queue.Full:
                    pass  # The UI is behind; dropping a frame is fine.
                self.last_processed_frame = processed_frame
                time.sleep(0.05)
        finally:
            # Always release the device, even if reading raised.
            self.final_log = self.session_log.copy()
            video_capture.release()
            self.is_processing.clear()

    def start_processing(self, source) -> str:
        """Reset the session state and start the background processing thread.

        Args:
            source: Camera index or video file path passed to ``cv2.VideoCapture``.

        Returns:
            A status message for the UI.
        """
        if self.is_processing.is_set():
            return "⚠️ Processing is already active."
        if self.processing_thread is not None and self.processing_thread.is_alive():
            # Setting the event now would revive the old loop alongside a new
            # thread, so refuse until the previous worker has exited.
            return "⚠️ Previous session is still shutting down. Please try again."

        # Reset session state
        self.session_log.clear()
        self.last_recognition_time.clear()
        self.session_marked_present.clear()
        self.session_registered.clear()
        self.face_recognition_buffer.clear()
        self.error_message = None
        self.last_processed_frame = None
        self.final_log = None
        self.frame_skip_counter = 0

        # Discard frames left over from the previous session.
        while True:
            try:
                self.frame_queue.get_nowait()
            except queue.Empty:
                break

        self.is_processing.set()
        self.processing_thread = threading.Thread(
            target=self._processing_loop,
            args=(source,),
            daemon=True
        )
        self.processing_thread.start()
        return "✅ Started processing..."

    def stop_processing(self) -> str:
        """Signal the processing thread to stop, wait briefly for it, and clear the display state."""
        self.is_processing.clear()

        # Wait for the worker so it cannot overwrite the fields reset below or
        # overlap with a subsequently started session.
        thread = self.processing_thread
        if thread is not None and thread.is_alive() and thread is not threading.current_thread():
            thread.join(timeout=STOP_JOIN_TIMEOUT_SECONDS)

        self.error_message = None
        self.last_processed_frame = None
        self.final_log = None
        self.face_recognition_buffer.clear()
        return "✅ Processing stopped."

    # --- Helper Methods ---
    def _get_image_caption(self, image: Image.Image) -> str:
        """Request a caption for ``image`` from the Hugging Face inference API.

        Returns:
            The generated caption, or a fixed fallback string when the token is
            missing or the request fails.
        """
        if not HF_API_TOKEN:
            return "Hugging Face API token not configured."
        try:
            buffered = BytesIO()
            image.save(buffered, format="JPEG")
            headers = {"Authorization": f"Bearer {HF_API_TOKEN}"}
            # A timeout keeps a stalled API from blocking the caller forever
            # (this runs on the video thread during auto-registration).
            response = requests.post(
                HF_API_URL,
                headers=headers,
                data=buffered.getvalue(),
                timeout=HF_API_TIMEOUT_SECONDS
            )
            response.raise_for_status()
            return response.json()[0].get("generated_text", "No caption found.")
        except Exception as e:
            logger.error(f"Hugging Face API error: {e}")
            return "Caption generation failed."

    def _upload_image_to_salesforce(self, image: Image.Image, record_id: str, worker_id: str) -> Optional[str]:
        """Upload ``image`` as a ContentVersion published to ``record_id``.

        Returns:
            ``"/<ContentVersion id>"`` on success, or None when Salesforce is
            not connected or the upload fails.
        """
        if not self.sf:
            return None
        try:
            buffered = BytesIO()
            image.save(buffered, format="JPEG")
            encoded_image = base64.b64encode(buffered.getvalue()).decode('utf-8')
            cv = self.sf.ContentVersion.create({
                'Title': f'Image_{worker_id}',
                'PathOnClient': f'{worker_id}.jpg',
                'VersionData': encoded_image,
                'FirstPublishLocationId': record_id
            })
            return f"/{cv['id']}"
        except Exception as e:
            logger.error(f"Salesforce image upload error: {e}")
            return None

    def get_registered_workers_info(self) -> str:
        """Return a Markdown list of workers in Salesforce, or a status/error string."""
        if not self.sf:
            return "❌ Salesforce not connected."
        try:
            records = self.sf.query_all(
                "SELECT Name, Worker_ID__c FROM Worker__c ORDER BY Name"
            )['records']
            if not records:
                return "No workers registered."
            worker_list = "\n".join(
                f"- **{w['Name']}** (ID: {w['Worker_ID__c']})" for w in records
            )
            return f"**👥 Registered Workers ({len(records)})**\n{worker_list}"
        except Exception as e:
            return f"Error: {e}"


# --- GRADIO UI ---
attendance_system = AttendanceSystem()


def create_interface():
    """Build the Gradio Blocks app and wire it to the module-level ``attendance_system``."""
    with gr.Blocks(theme=gr.themes.Soft(), title="Attendance System") as demo:
        gr.Markdown("# 🎯 Advanced Face Recognition Attendance System")

        with gr.Tabs():
            with gr.Tab("⚙️ Controls & Status"):
                gr.Markdown("### 1. Choose Input Source & Start Processing")
                with gr.Row():
                    with gr.Column(scale=1):
                        # Hidden field mirroring which input tab is selected.
                        selected_tab_index = gr.Number(value=0, visible=False)
                        with gr.Tabs() as video_tabs:
                            with gr.Tab("Live Camera", id=0):
                                camera_source = gr.Number(label="Camera Source", value=0, precision=0)
                            with gr.Tab("Upload Video", id=1):
                                video_file = gr.Video(label="Upload Video File", sources=["upload"])
                    with gr.Column(scale=1):
                        start_btn = gr.Button("▶️ Start Processing", variant="primary")
                        stop_btn = gr.Button("⏹️ Stop Processing", variant="stop")
                        status_box = gr.Textbox(label="Status", interactive=False, value="System Ready.")
                gr.Markdown("### 2. View Results in the 'Output & Log' Tab")
                gr.Markdown("**🎨 Color Coding:** Green = Known, Orange = New, Red = Unknown")

            with gr.Tab("📊 Output & Log"):
                with gr.Row():
                    with gr.Column(scale=2):
                        video_output = gr.Image(label="Recognition Output", interactive=False)
                    with gr.Column(scale=1):
                        session_log_display = gr.Markdown(label="📋 Session Log", value="System is ready.")

            with gr.Tab("👤 Worker Management"):
                with gr.Row():
                    with gr.Column():
                        register_image = gr.Image(label="Upload Worker's Photo", type="pil")
                        register_name = gr.Textbox(label="Worker's Full Name")
                        register_btn = gr.Button("Register Worker", variant="primary")
                        register_output = gr.Textbox(label="Registration Status", interactive=False)
                    with gr.Column():
                        registered_workers_info = gr.Markdown(value=attendance_system.get_registered_workers_info())
                        refresh_workers_btn = gr.Button("🔄 Refresh List")

        # --- Event Handlers ---
        def on_tab_select(evt: gr.SelectData):
            """Record the index of the selected input tab."""
            return evt.index

        video_tabs.select(fn=on_tab_select, inputs=None, outputs=[selected_tab_index])

        def start_wrapper(tab_index, cam_src, vid_path):
            """Pick the camera index or uploaded file based on the active tab and start."""
            use_camera = tab_index == 0
            source = cam_src if use_camera else vid_path
            if source is None:
                return "Please provide an input source."
            if use_camera:
                # cv2.VideoCapture needs an integer device index.
                source = int(source)
            return attendance_system.start_processing(source)

        start_btn.click(
            fn=start_wrapper,
            inputs=[selected_tab_index, camera_source, video_file],
            outputs=[status_box]
        )
        stop_btn.click(
            fn=attendance_system.stop_processing,
            inputs=None,
            outputs=[status_box]
        )
        register_btn.click(
            fn=attendance_system.register_worker_manual,
            inputs=[register_image, register_name],
            outputs=[register_output, registered_workers_info]
        )
        refresh_workers_btn.click(
            fn=attendance_system.get_registered_workers_info,
            outputs=[registered_workers_info]
        )

        def update_ui_generator():
            """Continuously yield ``(frame, log_markdown)`` for the output tab."""
            while True:
                if attendance_system.error_message:
                    yield None, attendance_system.error_message
                    time.sleep(2)
                    attendance_system.error_message = None
                    continue

                if attendance_system.is_processing.is_set():
                    # Copy first: the processing thread appends concurrently.
                    log_md = "\n".join(reversed(list(attendance_system.session_log))) or "Processing..."
                    try:
                        frame = attendance_system.frame_queue.get_nowait()
                    except queue.Empty:
                        frame = None

                    if frame is None:
                        # No new frame yet: leave the current image in place
                        # instead of blanking it.
                        yield gr.update(), log_md
                    else:
                        yield cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), log_md
                else:
                    # Read each shared field once; stop_processing may reset
                    # them to None from another thread at any time.
                    last_frame = attendance_system.last_processed_frame
                    final_log = attendance_system.final_log or []
                    if last_frame is not None:
                        final_frame = cv2.cvtColor(last_frame, cv2.COLOR_BGR2RGB)
                        final_log_md = "\n".join(reversed(final_log)) or "Processing complete. No log entries."
                        yield final_frame, final_log_md
                    else:
                        yield None, "System stopped. Go to 'Controls & Status' to start."

                time.sleep(0.1)

        demo.load(
            fn=update_ui_generator,
            outputs=[video_output, session_log_display]
        )

    return demo


if __name__ == "__main__":
    app = create_interface()
    app.queue()
    app.launch(server_name="0.0.0.0", server_port=7860, show_error=True, debug=True)