# Suppress TensorFlow oneDNN warnings
import os
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
# Standard Library Imports
import base64
import json
import logging
import queue
import threading
import time
from datetime import datetime, date
from io import BytesIO
from typing import Tuple, Optional, List, Dict
import pickle
from collections import OrderedDict
# Third-Party Imports
import cv2
import gradio as gr
import numpy as np
from PIL import Image
import requests
from dotenv import load_dotenv
from deepface import DeepFace
from retrying import retry
from simple_salesforce import Salesforce
from scipy.spatial import distance as dist
# --- CONFIGURATION ---
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
load_dotenv()  # pull credentials from a local .env file into the environment
# --- API & SALESFORCE CREDENTIALS ---
HF_API_URL = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base"
HF_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN")
# SECURITY(review): real-looking fallback credentials are committed to source
# below. Anyone with repo access can read them. Rotate these secrets and drop
# the hard-coded defaults so values come only from the environment / .env.
SF_CREDENTIALS = {
    "username": os.getenv("SF_USERNAME", "smartlabour@attendance.system"),
    "password": os.getenv("SF_PASSWORD", "#Prashanth@1234"),
    "security_token": os.getenv("SF_SECURITY_TOKEN", "7xPmtDFoWlZUGK0V2QSwFZJ6c"),
    "domain": os.getenv("SF_DOMAIN", "login")
}
# --- PERFORMANCE & ACCURACY TUNING ---
FACE_MATCH_THRESHOLD = 0.6  # Euclidean distance on Facenet embeddings; lower = stricter match
AUTO_REGISTER_CONFIDENCE = 0.98  # Confidence to register a new face
MAX_DISAPPEARED_FRAMES = 20  # Frames a lost face is kept before its track is dropped
# --- FACE TRACKER CLASS ---
class FaceTracker:
def __init__(self, attendance_system):
self.attendance_system = attendance_system
self.next_object_id = 0
self.objects = OrderedDict()
self.disappeared = OrderedDict()
self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
def register(self, centroid, bbox):
object_id = self.next_object_id
self.objects[object_id] = {'centroid': centroid, 'bbox': bbox, 'name': "Identifying...", 'worker_id': None, 'color': (255, 255, 0)} # Cyan for identifying
self.disappeared[object_id] = 0
self.next_object_id += 1
# Start recognition in a separate thread to not block the video stream
threading.Thread(target=self.recognize_face, args=(object_id, bbox)).start()
def recognize_face(self, object_id, bbox):
"""Runs expensive recognition and updates the tracker object."""
(startX, startY, endX, endY) = bbox
face_image = self.attendance_system.current_frame[startY:endY, startX:endX]
if face_image.size == 0: return
result = self.attendance_system.identify_and_log_face(face_image)
if result and object_id in self.objects:
self.objects[object_id].update({
'name': result['name'],
'worker_id': result['worker_id'],
'color': result['color']
})
def deregister(self, object_id):
del self.objects[object_id]
del self.disappeared[object_id]
def update(self, frame):
# Use a fast detector to find potential face locations
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
rects = self.face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
input_centroids = np.zeros((len(rects), 2), dtype="int")
bboxes = []
for (i, (x, y, w, h)) in enumerate(rects):
cX = int(x + w / 2.0)
cY = int(y + h / 2.0)
input_centroids[i] = (cX, cY)
bboxes.append((x, y, x + w, y + h))
if len(self.objects) == 0:
for i in range(len(input_centroids)):
self.register(input_centroids[i], bboxes[i])
else:
object_ids = list(self.objects.keys())
object_centroids = [obj['centroid'] for obj in self.objects.values()]
D = dist.cdist(np.array(object_centroids), input_centroids)
rows = D.min(axis=1).argsort()
cols = D.argmin(axis=1)[rows]
used_rows, used_cols = set(), set()
for (row, col) in zip(rows, cols):
if row in used_rows or col in used_cols: continue
object_id = object_ids[row]
self.objects[object_id]['centroid'] = input_centroids[col]
self.objects[object_id]['bbox'] = bboxes[col]
self.disappeared[object_id] = 0
used_rows.add(row)
used_cols.add(col)
unused_rows = set(range(D.shape[0])).difference(used_rows)
unused_cols = set(range(D.shape[1])).difference(used_cols)
for row in unused_rows:
object_id = object_ids[row]
self.disappeared[object_id] += 1
if self.disappeared[object_id] > MAX_DISAPPEARED_FRAMES:
self.deregister(object_id)
for col in unused_cols:
self.register(input_centroids[col], bboxes[col])
# Draw results on the frame
for object_id, data in self.objects.items():
(startX, startY, endX, endY) = data['bbox']
label = data['name']
cv2.rectangle(frame, (startX, startY), (endX, endY), data['color'], 2)
cv2.putText(frame, label, (startX, startY - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, data['color'], 2)
return frame
# --- SALESFORCE CONNECTION ---
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def connect_to_salesforce() -> Optional[Salesforce]:
    """Open a Salesforce session, retrying up to 3 times (2 s apart) on failure.

    Returns the live connection; re-raises the underlying error after logging
    so the @retry decorator can attempt again.
    """
    try:
        connection = Salesforce(**SF_CREDENTIALS)
        connection.describe()  # cheap round-trip that validates the session
    except Exception as e:
        logger.error(f"❌ Salesforce connection failed: {e}")
        raise
    logger.info("✅ Successfully connected to Salesforce.")
    return connection
# --- CORE LOGIC ---
class AttendanceSystem:
    """Backend for the attendance app.

    Keeps the worker roster in three parallel lists (embedding / name / id),
    mirrors it between Salesforce and a local pickle cache
    ("data/workers.pkl"), and runs a background capture loop that pushes
    annotated frames to the UI through a bounded queue.
    """
    def __init__(self):
        self.processing_thread = None
        self.is_processing = threading.Event()  # set => capture loop keeps running
        self.frame_queue = queue.Queue(maxsize=30)  # annotated frames for the UI
        self.error_message = None
        self.current_frame = None  # latest raw frame; read by FaceTracker recognition threads
        # Parallel lists: index i describes the same worker in all three.
        self.known_face_embeddings: List[np.ndarray] = []
        self.known_face_names: List[str] = []
        self.known_face_ids: List[str] = []
        self.next_worker_id: int = 1
        self.session_log: List[str] = []  # newest entries first
        self.session_attended_ids = set()  # worker ids already marked this session
        self.sf = connect_to_salesforce()
        self._create_directories()
        self.load_worker_data()
    def _create_directories(self):
        """Ensure the on-disk directory for cached face images exists."""
        os.makedirs("data/faces", exist_ok=True)
    def load_worker_data(self):
        """Reload the roster from Salesforce, falling back to the local pickle cache."""
        logger.info("Loading worker data...")
        if self.sf:
            try:
                workers = self.sf.query_all("SELECT Worker_ID__c, Name, Face_Embedding__c FROM Worker__c")['records']
                if not workers:
                    # Empty org: whatever the local cache has is the best we know.
                    self._load_local_worker_data()
                    return
                temp_embeddings, temp_names, temp_ids, max_id = [], [], [], 0
                for worker in workers:
                    if worker.get('Face_Embedding__c'):
                        temp_embeddings.append(np.array(json.loads(worker['Face_Embedding__c'])))
                        temp_names.append(worker['Name'])
                        temp_ids.append(worker['Worker_ID__c'])
                        try:
                            # IDs look like "W0012" — track the highest numeric
                            # part so new registrations continue the sequence.
                            worker_num = int(''.join(filter(str.isdigit, worker['Worker_ID__c'])))
                            if worker_num > max_id:
                                max_id = worker_num
                        except (ValueError, TypeError):
                            continue
                # Swap the freshly-built lists in only after the whole query parsed.
                self.known_face_embeddings = temp_embeddings
                self.known_face_names = temp_names
                self.known_face_ids = temp_ids
                self.next_worker_id = max_id + 1
                self.save_local_worker_data()  # keep the local cache in sync
                logger.info(f"✅ Loaded {len(self.known_face_ids)} workers from Salesforce. Next ID: {self.next_worker_id}")
            except Exception as e:
                logger.error(f"❌ Error loading from Salesforce: {e}. Attempting local load.")
                self._load_local_worker_data()
        else:
            logger.warning("Salesforce not connected. Loading from local cache.")
            self._load_local_worker_data()
    def _load_local_worker_data(self):
        """Load the roster from data/workers.pkl if present (best-effort)."""
        try:
            if os.path.exists("data/workers.pkl"):
                # NOTE(review): pickle is only safe because this file is written
                # by save_local_worker_data below — never load untrusted pickles.
                with open("data/workers.pkl", "rb") as f: data = pickle.load(f)
                self.known_face_embeddings = data.get("embeddings", [])
                self.known_face_names = data.get("names", [])
                self.known_face_ids = data.get("ids", [])
                self.next_worker_id = data.get("next_id", 1)
                logger.info(f"✅ Loaded {len(self.known_face_ids)} workers from local cache. Next ID: {self.next_worker_id}")
        except Exception as e:
            logger.error(f"❌ Error loading local data: {e}")
    def save_local_worker_data(self):
        """Persist the current roster to data/workers.pkl (best-effort)."""
        try:
            worker_data = {"embeddings": self.known_face_embeddings, "names": self.known_face_names, "ids": self.known_face_ids, "next_id": self.next_worker_id}
            with open("data/workers.pkl", "wb") as f: pickle.dump(worker_data, f)
        except Exception as e:
            logger.error(f"❌ Error saving local worker data: {e}")
    def register_worker_manual(self, image: Image.Image, name: str) -> Tuple[str, str]:
        """Register a worker from an uploaded photo.

        Returns a (status message, roster markdown) pair for the UI.
        """
        if image is None or not name.strip():
            return "❌ Please provide both image and name!", self.get_registered_workers_info()
        try:
            image_array = np.array(image)
            # analyze() with enforce_detection=True is used purely as an
            # "is there a face?" check — it raises ValueError when none found.
            DeepFace.analyze(img_path=image_array, actions=['emotion'], enforce_detection=True)
            embedding = DeepFace.represent(img_path=image_array, model_name='Facenet', enforce_detection=False)[0]['embedding']
            if self._is_duplicate_face(embedding):
                return f"❌ Face matches an existing worker!", self.get_registered_workers_info()
            worker_id = f"W{self.next_worker_id:04d}"
            name = name.strip().title()
            self._add_worker_to_system(worker_id, name, embedding, image_array)
            self.save_local_worker_data()
            return f"✅ {name} registered with ID: {worker_id}!", self.get_registered_workers_info()
        except ValueError:
            # Raised by DeepFace.analyze when no face is detected.
            return "❌ No face detected in the image!", self.get_registered_workers_info()
        except Exception as e:
            logger.error(f"Manual registration error: {e}")
            return f"❌ Registration error: {e}", self.get_registered_workers_info()
    def _register_worker_auto(self, face_image: np.ndarray) -> Optional[Dict]:
        """Auto-register an unrecognized face under a placeholder name.

        Returns {'name', 'worker_id'} on success; None when the face duplicates
        a known worker or embedding fails.
        """
        try:
            embedding = DeepFace.represent(img_path=face_image, model_name='Facenet', enforce_detection=False)[0]['embedding']
            if self._is_duplicate_face(embedding): return None
            worker_id = f"W{self.next_worker_id:04d}"
            worker_name = f"New Worker {self.next_worker_id}"
            self._add_worker_to_system(worker_id, worker_name, embedding, face_image)
            self.save_local_worker_data()
            self.mark_attendance(worker_id, worker_name, is_new=True)
            return {'name': worker_name, 'worker_id': worker_id}
        except Exception as e:
            logger.error(f"❌ Auto-registration error: {e}")
            return None
    def _add_worker_to_system(self, worker_id: str, name: str, embedding: List[float], image_array: np.ndarray):
        """Append a worker to the in-memory roster and bump the id counter."""
        self.known_face_embeddings.append(np.array(embedding))
        self.known_face_names.append(name)
        self.known_face_ids.append(worker_id)
        self.next_worker_id += 1
        # ... (Salesforce sync logic remains the same)
    def _is_duplicate_face(self, embedding: List[float]) -> bool:
        """True when *embedding* is within FACE_MATCH_THRESHOLD of any known face."""
        if not self.known_face_embeddings: return False
        distances = [np.linalg.norm(np.array(embedding) - known) for known in self.known_face_embeddings]
        return min(distances) < FACE_MATCH_THRESHOLD
    def mark_attendance(self, worker_id: str, worker_name: str, is_new: bool = False):
        """Record attendance for a worker at most once per session (idempotent)."""
        if worker_id in self.session_attended_ids:
            return
        self.session_attended_ids.add(worker_id)
        current_time_str = datetime.now().strftime('%H:%M:%S')
        log_msg = ""
        if is_new:
            log_msg = f"🆕 [{current_time_str}] Registered: {worker_name} ({worker_id})"
        else:
            log_msg = f"✅ [{current_time_str}] Present: {worker_name} ({worker_id})"
        self.session_log.insert(0, log_msg)  # Add to top of the log
        # ... (Salesforce attendance marking logic remains the same)
    def identify_and_log_face(self, face_image: np.ndarray) -> Optional[Dict]:
        """Core recognition logic called by the tracker.

        Returns {'name', 'worker_id', 'color'} — green for a known worker,
        orange for a freshly auto-registered one, red for unknown/failure.
        """
        try:
            embedding = DeepFace.represent(img_path=face_image, model_name='Facenet', enforce_detection=False)[0]['embedding']
            if self.known_face_embeddings:
                distances = [np.linalg.norm(np.array(embedding) - known) for known in self.known_face_embeddings]
                min_dist = min(distances)
                if min_dist < FACE_MATCH_THRESHOLD:
                    match_index = np.argmin(distances)
                    worker_id = self.known_face_ids[match_index]
                    worker_name = self.known_face_names[match_index]
                    self.mark_attendance(worker_id, worker_name)
                    return {'name': worker_name, 'worker_id': worker_id, 'color': (0, 255, 0)}  # Green
            # If no match or no known faces, try to auto-register
            new_worker = self._register_worker_auto(face_image)
            if new_worker:
                return {'name': new_worker['name'], 'worker_id': new_worker['worker_id'], 'color': (0, 165, 255)}  # Orange
        except Exception as e:
            logger.error(f"Face identification failed: {e}")
        return {'name': 'Unknown', 'worker_id': None, 'color': (0, 0, 255)}  # Red
    def _processing_loop(self, source):
        """Capture-thread body: read frames, run the tracker, queue annotated frames."""
        video_capture = cv2.VideoCapture(source)
        if not video_capture.isOpened():
            self.error_message = "❌ Error: Could not open video source."
            self.is_processing.clear()
            return
        tracker = FaceTracker(self)
        while self.is_processing.is_set():
            ret, frame = video_capture.read()
            if not ret:
                break
            self.current_frame = frame.copy()
            annotated_frame = tracker.update(self.current_frame)
            if not self.frame_queue.full():
                self.frame_queue.put(annotated_frame)
            else:
                # If queue is full, drop the oldest frame to prevent lag
                try: self.frame_queue.get_nowait()
                except queue.Empty: pass
                self.frame_queue.put(annotated_frame)
            time.sleep(0.01)  # Yield CPU
        video_capture.release()
        self.is_processing.clear()
        logger.info("Processing finished.")
    def start_processing(self, source) -> str:
        """Reset session state and launch the background capture thread."""
        if self.is_processing.is_set(): return "⚠️ Processing is already active."
        self.session_log.clear()
        self.session_attended_ids.clear()
        self.error_message = None
        while not self.frame_queue.empty(): self.frame_queue.get() # Clear queue
        self.is_processing.set()
        self.processing_thread = threading.Thread(target=self._processing_loop, args=(source,))
        self.processing_thread.daemon = True
        self.processing_thread.start()
        return "✅ Started processing..."
    def stop_processing(self) -> str:
        """Signal the capture thread to stop and wait briefly for it to exit."""
        if not self.is_processing.is_set():
            return "⚠️ Processing is not currently active."
        self.is_processing.clear()
        if self.processing_thread:
            self.processing_thread.join(timeout=2)
        return "✅ Processing stopped by user."
    def get_registered_workers_info(self) -> str:
        """Return the roster as a Markdown list (reloads from storage first)."""
        self.load_worker_data()
        if not self.known_face_ids: return "No workers registered."
        info_list = [f"- **{name}** (ID: {id})" for name, id in sorted(zip(self.known_face_names, self.known_face_ids))]
        return f"**👥 Registered Workers ({len(info_list)})**\n" + "\n".join(info_list)
# --- GRADIO UI ---
# NOTE: instantiated at import time, so the Salesforce connection attempt
# (with retries) and local-cache load happen as soon as this module loads.
attendance_system = AttendanceSystem()
def create_interface():
    """Build and return the Gradio Blocks UI wired to the global attendance_system."""
    with gr.Blocks(theme=gr.themes.Soft(), title="Attendance System") as demo:
        gr.Markdown("# 🚀 High-Performance Face Recognition Attendance")
        with gr.Tabs():
            with gr.Tab("⚙️ Controls & Status"):
                gr.Markdown("### 1. Choose Input Source & Start Processing")
                with gr.Row():
                    with gr.Column(scale=1):
                        # Hidden state mirroring the selected inner tab
                        # (0 = live camera, 1 = uploaded video file).
                        selected_tab_index = gr.Number(value=0, visible=False)
                        with gr.Tabs() as video_tabs:
                            with gr.Tab("Live Camera", id=0):
                                camera_source = gr.Number(label="Camera Source Index", value=0, precision=0)
                            with gr.Tab("Upload Video", id=1):
                                video_file = gr.Video(label="Upload Video File", sources=["upload"])
                    with gr.Column(scale=1):
                        start_btn = gr.Button("▶️ Start Processing", variant="primary")
                        stop_btn = gr.Button("⏹️ Stop Processing", variant="stop")
                        status_box = gr.Textbox(label="Status", interactive=False, value="System Ready.")
                gr.Markdown("### 2. View Results in the 'Output & Log' Tab")
                gr.Markdown("**🎨 Color Coding:** Identifying, Present, New, Unknown")
            with gr.Tab("📊 Output & Log"):
                with gr.Row(equal_height=True):
                    with gr.Column(scale=2):
                        video_output = gr.Image(label="Recognition Output", interactive=False, type="numpy")
                    with gr.Column(scale=1):
                        session_log_display = gr.Markdown(label="📋 Session Log", value="System is ready.")
            with gr.Tab("👤 Worker Management"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Register New Worker")
                        register_image = gr.Image(label="Upload Worker's Photo", type="pil", sources=["upload"])
                        register_name = gr.Textbox(label="Worker's Full Name")
                        register_btn = gr.Button("Register Worker", variant="primary")
                        register_output = gr.Textbox(label="Registration Status", interactive=False)
                    with gr.Column():
                        gr.Markdown("### Current Worker Roster")
                        registered_workers_info = gr.Markdown(value=attendance_system.get_registered_workers_info())
                        refresh_workers_btn = gr.Button("🔄 Refresh List")
        # --- Event Handlers ---
        def on_tab_select(evt: gr.SelectData): return evt.index
        video_tabs.select(fn=on_tab_select, inputs=None, outputs=[selected_tab_index])
        def start_wrapper(tab_index, cam_src, vid_path):
            # Camera tab wants an integer device index; video tab wants a file path.
            source = int(cam_src) if tab_index == 0 else vid_path
            return "Please provide an input source." if source is None else attendance_system.start_processing(source)
        start_btn.click(fn=start_wrapper, inputs=[selected_tab_index, camera_source, video_file], outputs=[status_box])
        stop_btn.click(fn=attendance_system.stop_processing, inputs=None, outputs=[status_box])
        register_btn.click(fn=attendance_system.register_worker_manual, inputs=[register_image, register_name], outputs=[register_output, registered_workers_info])
        refresh_workers_btn.click(fn=attendance_system.get_registered_workers_info, outputs=[registered_workers_info])
        def update_ui_generator():
            """Endless generator streaming (frame, log, status) tuples to the UI."""
            last_frame = None
            while True:
                status_text = "Status: Processing..."
                log_md = "\n".join(attendance_system.session_log) or "Awaiting detections..."
                frame = None
                if attendance_system.is_processing.is_set():
                    try:
                        frame = attendance_system.frame_queue.get_nowait()
                        last_frame = frame
                    except queue.Empty:
                        frame = last_frame # Show last frame if no new one
                else:
                    status_text = "Status: Stopped."
                    log_md = "Processing stopped. Final Log:\n\n" + log_md
                if frame is not None:
                    # OpenCV frames are BGR; Gradio's Image component expects RGB.
                    yield cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), log_md, status_text
                else:
                    yield None, log_md, status_text
                time.sleep(1/30) # Aim for ~30 FPS UI updates
        demo.load(fn=update_ui_generator, outputs=[video_output, session_log_display, status_box])
    return demo
if __name__ == "__main__":
    # Helper functions for Salesforce sync, monkey-patched onto AttendanceSystem below.
    def _get_image_caption(self, image: Image.Image) -> str:
        """Return a BLIP caption for *image* via the Hugging Face inference API.

        BUG FIX: the original omitted ``self`` even though the function is
        attached to AttendanceSystem below, so any bound call received the
        instance as ``image`` and always failed.
        """
        if not HF_API_TOKEN: return "Hugging Face API token not configured."
        try:
            buffered = BytesIO()
            image.save(buffered, format="JPEG")
            img_data = buffered.getvalue()
            headers = {"Authorization": f"Bearer {HF_API_TOKEN}"}
            response = requests.post(HF_API_URL, headers=headers, data=img_data, timeout=10)
            response.raise_for_status()
            result = response.json()
            return result[0].get("generated_text", "No caption found.")
        except Exception as e:
            logger.error(f"Hugging Face API error: {e}")
            return "Caption generation failed."

    def _upload_image_to_salesforce(self, image: Image.Image, record_id: str, worker_id: str) -> Optional[str]:
        """Upload *image* as a ContentVersion linked to *record_id*.

        Returns the shepherd download path for the created document, or None
        when Salesforce is unavailable or the upload fails.
        """
        if not self.sf: return None
        try:
            buffered = BytesIO()
            image.save(buffered, format="JPEG")
            encoded_image = base64.b64encode(buffered.getvalue()).decode('utf-8')
            # Return value unused — the follow-up query finds the linked document.
            self.sf.ContentVersion.create({'Title': f'Image_{worker_id}', 'PathOnClient': f'{worker_id}.jpg', 'VersionData': encoded_image, 'FirstPublishLocationId': record_id})
            # record_id comes from Salesforce itself, not user input, so the
            # inline SOQL interpolation is acceptable here.
            content_doc_link = self.sf.query(f"SELECT ContentDocumentId FROM ContentDocumentLink WHERE LinkedEntityId = '{record_id}'")['records'][0]
            content_doc_id = content_doc_link['ContentDocumentId']
            return f"/sfc/servlet.shepherd/document/download/{content_doc_id}"
        except Exception as e:
            logger.error(f"Salesforce image upload error: {e}")
            return None

    # Monkey-patch helper methods into the class for simplicity; both now take
    # ``self`` so bound calls behave like normal instance methods.
    AttendanceSystem._get_image_caption = _get_image_caption
    AttendanceSystem._upload_image_to_salesforce = _upload_image_to_salesforce
    app = create_interface()
    app.queue()
    app.launch(server_name="0.0.0.0", server_port=7860, show_error=True, debug=True)