# NOTE: hosting-page header captured together with the file (not part of the program):
#   Sivainti's picture / Update app.py / fcd2674 verified
# Suppress TensorFlow oneDNN warnings
import os
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
# Standard Library Imports
import base64
import json
import logging
import queue
import threading
import time
from datetime import datetime, date, timedelta
from io import BytesIO
from typing import Tuple, Optional, List, Dict
import pickle
from collections import OrderedDict
# Third-Party Imports
import cv2
import gradio as gr
import numpy as np
from PIL import Image
import requests
from dotenv import load_dotenv
from deepface import DeepFace
from retrying import retry
from simple_salesforce import Salesforce
from scipy.spatial import distance as dist
# --- CONFIGURATION ---
# Root logging is configured once here; the rest of the file logs through `logger`.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Load variables from a .env file (python-dotenv) before the os.getenv() calls below read them.
load_dotenv()
# --- API & SALESFORCE CREDENTIALS ---
# Hugging Face Inference API endpoint used for image captioning; the token is optional
# (captioning is skipped when HUGGINGFACE_API_TOKEN is unset).
HF_API_URL = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base"
HF_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN")
# Keyword arguments passed verbatim to simple_salesforce.Salesforce(**SF_CREDENTIALS).
# SECURITY NOTE(review): the os.getenv() fallbacks below embed a literal username,
# password and security token in source. Anyone who can read this file can read them.
# They should be rotated and the defaults removed so the values come only from the
# environment / .env file.
SF_CREDENTIALS = {
"username": os.getenv("SF_USERNAME", "smartlabour@attendance.system"),
"password": os.getenv("SF_PASSWORD", "#Prashanth@1234"),
"security_token": os.getenv("SF_SECURITY_TOKEN", "7xPmtDFoWlZUGK0V2QSwFZJ6c"),
"domain": os.getenv("SF_DOMAIN", "login")
}
# --- PERFORMANCE & ACCURACY TUNING ---
# FACE_MATCH_THRESHOLD is compared against the Euclidean (np.linalg.norm) distance between
# a probe embedding and each stored embedding; smaller distance = closer match.
FACE_MATCH_THRESHOLD = 0.6 # Stricter threshold for accurate matching
# Minimum DeepFace `face_confidence` required before an unknown face is auto-registered.
AUTO_REGISTER_CONFIDENCE = 0.98 # Confidence to register a new face
# A tracked face is dropped once it has been missing for more than this many frames.
MAX_DISAPPEARED_FRAMES = 20 # Frames to keep tracking a lost face
# NOTE(review): BATCH_SIZE is not referenced anywhere else in the visible file.
BATCH_SIZE = 5 # Process faces in batches for efficiency
# --- FACE TRACKER CLASS ---
class FaceTracker:
    """Centroid-based face tracker that hands newly seen faces to the attendance system.

    Faces are detected per frame with OpenCV's frontal-face Haar cascade. Each
    detection is matched to the nearest existing track by centroid distance;
    unmatched detections start a new track and a background recognition thread.
    Tracks that stay unmatched for more than ``MAX_DISAPPEARED_FRAMES`` frames
    are dropped.

    Attributes:
        attendance_system: Object providing ``current_frame`` and
            ``identify_and_log_face(face_image)``.
        next_object_id: Id that will be given to the next registered track.
        objects: ``OrderedDict`` mapping track id to a dict with the keys
            ``centroid``, ``bbox``, ``name``, ``worker_id`` and ``color`` (BGR).
        disappeared: ``OrderedDict`` mapping track id to the number of
            consecutive frames in which the track was not matched.
    """

    def __init__(self, attendance_system):
        self.attendance_system = attendance_system
        self.next_object_id = 0
        self.objects = OrderedDict()
        self.disappeared = OrderedDict()
        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    def register(self, centroid, bbox):
        """Start tracking a new face and kick off recognition on a worker thread.

        Args:
            centroid: ``(x, y)`` centre of the detection.
            bbox: ``(startX, startY, endX, endY)`` box of the detection.
        """
        object_id = self.next_object_id
        self.objects[object_id] = {'centroid': centroid, 'bbox': bbox, 'name': "Identifying...", 'worker_id': None, 'color': (255, 255, 0)}  # Cyan
        self.disappeared[object_id] = 0
        self.next_object_id += 1
        # Recognition is slow (embedding + possible network sync), so it must not block the frame loop.
        threading.Thread(target=self.recognize_face, args=(object_id, bbox)).start()

    def recognize_face(self, object_id, bbox):
        """Identify the face inside ``bbox`` and update the track's label and colour.

        Runs on a background thread. Does nothing when no frame is available,
        the crop is empty, or the track was dropped before recognition finished.
        """
        (startX, startY, endX, endY) = bbox
        frame = self.attendance_system.current_frame
        if frame is None:
            return
        face_image = frame[startY:endY, startX:endX]
        if face_image.size == 0:
            return
        result = self.attendance_system.identify_and_log_face(face_image)
        # A single .get() avoids a KeyError if the track is deregistered between
        # a membership test and the lookup (the tracker runs on another thread).
        track = self.objects.get(object_id)
        if result and track is not None:
            track.update({
                'name': result['name'],
                'worker_id': result['worker_id'],
                'color': result['color']
            })

    def deregister(self, object_id):
        """Forget a track entirely."""
        del self.objects[object_id]
        del self.disappeared[object_id]

    def _mark_disappeared(self, object_id):
        """Count one missed frame for a track and drop it once it exceeds the limit."""
        self.disappeared[object_id] += 1
        if self.disappeared[object_id] > MAX_DISAPPEARED_FRAMES:
            self.deregister(object_id)

    def update(self, frame):
        """Detect faces in ``frame``, update tracks and draw the annotations.

        Args:
            frame: BGR image; it is annotated in place.

        Returns:
            The same ``frame`` object with boxes and labels drawn on it.
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        rects = self.face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
        input_centroids = np.zeros((len(rects), 2), dtype="int")
        bboxes = []
        for i, (x, y, w, h) in enumerate(rects):
            input_centroids[i] = (int(x + w / 2.0), int(y + h / 2.0))
            # Plain Python ints work for both array slicing and OpenCV drawing calls.
            bboxes.append((int(x), int(y), int(x + w), int(y + h)))

        if len(rects) == 0:
            # No detections: the distance matrix below would have zero columns and
            # D.min(axis=1) would raise, so just age every existing track.
            for object_id in list(self.disappeared.keys()):
                self._mark_disappeared(object_id)
        elif not self.objects:
            for centroid, bbox in zip(input_centroids, bboxes):
                self.register(centroid, bbox)
        else:
            object_ids = list(self.objects.keys())
            object_centroids = [obj['centroid'] for obj in self.objects.values()]
            D = dist.cdist(np.array(object_centroids), input_centroids)
            # Greedy matching: visit tracks in order of their closest detection.
            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]
            used_rows, used_cols = set(), set()
            for (row, col) in zip(rows, cols):
                if row in used_rows or col in used_cols:
                    continue
                object_id = object_ids[row]
                self.objects[object_id]['centroid'] = input_centroids[col]
                self.objects[object_id]['bbox'] = bboxes[col]
                self.disappeared[object_id] = 0
                used_rows.add(row)
                used_cols.add(col)
            unused_rows = set(range(D.shape[0])).difference(used_rows)
            unused_cols = set(range(D.shape[1])).difference(used_cols)
            for row in unused_rows:
                self._mark_disappeared(object_ids[row])
            for col in unused_cols:
                self.register(input_centroids[col], bboxes[col])

        for data in self.objects.values():
            (startX, startY, endX, endY) = data['bbox']
            cv2.rectangle(frame, (startX, startY), (endX, endY), data['color'], 2)
            cv2.putText(frame, data['name'], (startX, startY - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, data['color'], 2)
        return frame
# --- SALESFORCE CONNECTION ---
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def connect_to_salesforce() -> Optional[Salesforce]:
    """Open a Salesforce session using ``SF_CREDENTIALS`` and verify it works.

    The session is validated with a ``describe()`` call. Any failure is logged
    and re-raised so that the ``retry`` decorator can try again (up to three
    attempts, two seconds apart); the last failure propagates to the caller.

    Returns:
        The connected ``Salesforce`` client.

    Raises:
        Exception: Whatever the final connection attempt raised.
    """
    try:
        client = Salesforce(**SF_CREDENTIALS)
        client.describe()
    except Exception as e:
        logger.error(f"❌ Salesforce connection failed: {e}")
        raise
    else:
        logger.info("βœ… Successfully connected to Salesforce.")
        return client
# --- CORE LOGIC ---
class AttendanceSystem:
    """Face-recognition attendance engine with Salesforce sync and a local pickle cache.

    Worker data lives in three parallel lists (``known_face_embeddings``,
    ``known_face_names``, ``known_face_ids``) where index ``i`` describes one
    worker. Video is processed on a background thread that publishes annotated
    frames through ``frame_queue``; attendance is recorded in per-session logs
    and, when a Salesforce connection exists, in ``Attendance__c`` records.

    NOTE(review): ``_get_image_caption`` and ``_upload_image_to_salesforce`` are
    not defined in this class body; ``_add_worker_to_system`` relies on them
    having been attached to the class elsewhere in this file.
    """

    def __init__(self):
        self.processing_thread = None
        self.is_processing = threading.Event()
        self.frame_queue = queue.Queue(maxsize=30)
        self.error_message = None
        self.current_frame = None
        self.known_face_embeddings: List[List[np.ndarray]] = []  # Store multiple embeddings per worker
        self.known_face_names: List[str] = []
        self.known_face_ids: List[str] = []
        self.next_worker_id: int = 1
        self.session_log_present: List[str] = []  # Workers marked present
        self.session_log_new: List[str] = []  # Newly registered workers
        self.session_attended_ids: Dict[str, datetime] = {}  # Track attendance with timestamps
        # connect_to_salesforce() re-raises after its retries are exhausted. Every
        # other method already copes with ``self.sf`` being falsy, so fall back to
        # offline mode instead of failing to construct the system.
        try:
            self.sf = connect_to_salesforce()
        except Exception as e:
            logger.error(f"❌ Salesforce unavailable, continuing with local cache only: {e}")
            self.sf = None
        self._create_directories()
        self.load_worker_data()

    def _create_directories(self):
        """Create the on-disk data directory tree if it does not exist."""
        os.makedirs("data/faces", exist_ok=True)

    def load_worker_data(self):
        """Refresh the in-memory worker lists from Salesforce, else from the local cache.

        Salesforce is authoritative when connected and non-empty; a successful
        load is mirrored to the local cache. An empty result, a query error, or
        a missing connection falls back to ``_load_local_worker_data``.
        """
        logger.info("Loading worker data...")
        if not self.sf:
            logger.warning("Salesforce not connected. Loading from local cache.")
            self._load_local_worker_data()
            return
        try:
            workers = self.sf.query_all("SELECT Worker_ID__c, Name, Face_Embeddings__c FROM Worker__c")['records']
            if not workers:
                self._load_local_worker_data()
                return
            temp_embeddings, temp_names, temp_ids, max_id = [], [], [], 0
            for worker in workers:
                # Every existing ID counts towards the next free number, even for
                # records without embeddings, so new IDs never collide with them.
                try:
                    worker_num = int(''.join(filter(str.isdigit, worker['Worker_ID__c'])))
                    max_id = max(max_id, worker_num)
                except (ValueError, TypeError):
                    pass
                if not worker.get('Face_Embeddings__c'):
                    continue
                embeddings = [np.array(emb) for emb in json.loads(worker['Face_Embeddings__c'])]
                temp_embeddings.append(embeddings)
                temp_names.append(worker['Name'])
                temp_ids.append(worker['Worker_ID__c'])
            self.known_face_embeddings = temp_embeddings
            self.known_face_names = temp_names
            self.known_face_ids = temp_ids
            self.next_worker_id = max_id + 1
            self.save_local_worker_data()
            logger.info(f"βœ… Loaded {len(self.known_face_ids)} workers from Salesforce. Next ID: {self.next_worker_id}")
        except Exception as e:
            logger.error(f"❌ Error loading from Salesforce: {e}. Attempting local load.")
            self._load_local_worker_data()

    def _load_local_worker_data(self):
        """Load worker lists from ``data/workers.pkl`` if that file exists."""
        try:
            if os.path.exists("data/workers.pkl"):
                # SECURITY NOTE(review): pickle.load executes arbitrary code from the
                # file. Only safe while data/workers.pkl is written exclusively by
                # save_local_worker_data and the directory is not writable by others.
                with open("data/workers.pkl", "rb") as f:
                    data = pickle.load(f)
                self.known_face_embeddings = data.get("embeddings", [])
                self.known_face_names = data.get("names", [])
                self.known_face_ids = data.get("ids", [])
                self.next_worker_id = data.get("next_id", 1)
                logger.info(f"βœ… Loaded {len(self.known_face_ids)} workers from local cache. Next ID: {self.next_worker_id}")
        except Exception as e:
            logger.error(f"❌ Error loading local data: {e}")

    def save_local_worker_data(self):
        """Persist the worker lists and next ID to ``data/workers.pkl`` (best effort)."""
        try:
            worker_data = {
                "embeddings": self.known_face_embeddings,
                "names": self.known_face_names,
                "ids": self.known_face_ids,
                "next_id": self.next_worker_id
            }
            with open("data/workers.pkl", "wb") as f:
                pickle.dump(worker_data, f)
        except Exception as e:
            logger.error(f"❌ Error saving local worker data: {e}")

    def register_worker_manual(self, image: Image.Image, name: str) -> Tuple[str, str]:
        """Register a worker from an uploaded photo.

        Args:
            image: Uploaded PIL image (may be ``None`` when nothing was uploaded).
            name: Worker's name as typed by the user.

        Returns:
            ``(status message, roster markdown)`` for the UI.
        """
        if image is None or not name or not name.strip():
            return "❌ Please provide both image and name!", self.get_registered_workers_info()
        try:
            # PIL delivers RGB; the rest of the pipeline (DeepFace numpy input and the
            # BGR->RGB conversion in _add_worker_to_system) works on BGR arrays.
            image_array = cv2.cvtColor(np.array(image.convert("RGB")), cv2.COLOR_RGB2BGR)
            # enforce_detection=True raises ValueError when the photo contains no face.
            DeepFace.analyze(img_path=image_array, actions=['emotion'], enforce_detection=True)
            embedding = DeepFace.represent(img_path=image_array, model_name='Facenet', enforce_detection=False)[0]['embedding']
            if self._is_duplicate_face(embedding):
                return "❌ Face matches an existing worker!", self.get_registered_workers_info()
            worker_id = f"W{self.next_worker_id:04d}"
            name = name.strip().title()
            self._add_worker_to_system(worker_id, name, [embedding], image_array)
            self.save_local_worker_data()
            return f"βœ… {name} registered with ID: {worker_id}!", self.get_registered_workers_info()
        except ValueError:
            return "❌ No face detected in the image!", self.get_registered_workers_info()
        except Exception as e:
            logger.error(f"Manual registration error: {e}")
            return f"❌ Registration error: {e}", self.get_registered_workers_info()

    def _register_worker_auto(self, face_image: np.ndarray) -> Optional[Dict]:
        """Auto-register an unrecognised face seen in the video stream.

        The face is registered only when DeepFace's ``face_confidence`` reaches
        ``AUTO_REGISTER_CONFIDENCE`` and it does not match an existing worker.

        Returns:
            ``{'name', 'worker_id'}`` for the new worker, or ``None`` when the
            face was not registered (low confidence, duplicate, or error).
        """
        try:
            analysis = DeepFace.analyze(img_path=face_image, actions=['emotion'], enforce_detection=False)
            if analysis[0]['face_confidence'] < AUTO_REGISTER_CONFIDENCE:
                return None
            embedding = DeepFace.represent(img_path=face_image, model_name='Facenet', enforce_detection=False)[0]['embedding']
            if self._is_duplicate_face(embedding):
                return None
            worker_id = f"W{self.next_worker_id:04d}"
            worker_name = f"New Worker {self.next_worker_id}"
            self._add_worker_to_system(worker_id, worker_name, [embedding], face_image)
            self.save_local_worker_data()
            self.mark_attendance(worker_id, worker_name, is_new=True)
            return {'name': worker_name, 'worker_id': worker_id}
        except Exception as e:
            logger.error(f"❌ Auto-registration error: {e}")
            return None

    def _add_worker_to_system(self, worker_id: str, name: str, embeddings: List[np.ndarray], image_array: np.ndarray):
        """Add a worker to the in-memory lists and, when connected, to Salesforce.

        Args:
            worker_id: New worker identifier (e.g. ``"W0001"``).
            name: Display name.
            embeddings: One or more face embeddings (arrays or plain lists of floats).
            image_array: BGR face image used for the caption and the uploaded photo.

        Salesforce failures are logged and do not undo the in-memory registration.
        """
        self.known_face_embeddings.append(embeddings)
        self.known_face_names.append(name)
        self.known_face_ids.append(worker_id)
        self.next_worker_id += 1
        if not self.sf:
            return
        try:
            image = Image.fromarray(cv2.cvtColor(image_array, cv2.COLOR_BGR2RGB))
            caption = self._get_image_caption(image)
            worker_record = self.sf.Worker__c.create({
                'Worker_ID__c': worker_id,
                'Name': name,
                # DeepFace returns embeddings as plain lists, which have no .tolist();
                # np.asarray() accepts both lists and arrays.
                'Face_Embeddings__c': json.dumps([np.asarray(emb).tolist() for emb in embeddings]),
                'Image_Caption__c': caption
            })
            content_doc_id = self._upload_image_to_salesforce(image, worker_record['id'], worker_id)
            if content_doc_id:
                self.sf.Worker__c.update(worker_record['id'], {'Image_URL__c': content_doc_id})
        except Exception as e:
            logger.error(f"❌ Salesforce worker sync error: {e}")

    def _find_best_match(self, embedding) -> Tuple[int, float]:
        """Find the stored embedding closest to ``embedding`` within the match threshold.

        Returns:
            ``(worker index, distance)`` of the closest stored embedding whose
            Euclidean distance is below ``FACE_MATCH_THRESHOLD``, or
            ``(-1, inf)`` when no stored embedding is that close.
        """
        probe = np.asarray(embedding)
        best_index, best_distance = -1, float('inf')
        for index, worker_embeddings in enumerate(self.known_face_embeddings):
            for known_emb in worker_embeddings:
                distance = np.linalg.norm(probe - np.asarray(known_emb))
                if distance < best_distance and distance < FACE_MATCH_THRESHOLD:
                    best_index, best_distance = index, distance
        return best_index, best_distance

    def _is_duplicate_face(self, embedding: List[float]) -> bool:
        """Return True when ``embedding`` is within the match threshold of any known face."""
        best_index, _ = self._find_best_match(embedding)
        return best_index >= 0

    def mark_attendance(self, worker_id: str, worker_name: str, is_new: bool = False):
        """Record attendance once per worker per 24 hours within this session.

        Args:
            worker_id: Worker identifier.
            worker_name: Display name used in the log line and Salesforce record.
            is_new: When True the entry goes to the "new workers" log instead of
                the "present" log.
        """
        now = datetime.now()
        last_attendance = self.session_attended_ids.get(worker_id)
        if last_attendance is not None and now - last_attendance < timedelta(hours=24):
            return
        self.session_attended_ids[worker_id] = now
        current_time_str = now.strftime('%H:%M:%S')
        log_msg = f"πŸ†• [{current_time_str}] Registered: {worker_name} ({worker_id})" if is_new else f"βœ… [{current_time_str}] Present: {worker_name} ({worker_id})"
        target_log = self.session_log_new if is_new else self.session_log_present
        target_log.insert(0, log_msg)  # newest entry first
        if self.sf:
            try:
                self.sf.Attendance__c.create({
                    'Worker_ID__c': worker_id,
                    'Name': worker_name,
                    'Attendance_Date__c': now.date().isoformat(),
                    'Attendance_Time__c': current_time_str,
                    'Status__c': 'Present'
                })
            except Exception as e:
                logger.error(f"❌ Salesforce attendance sync error: {e}")

    def identify_and_log_face(self, face_image: np.ndarray) -> Optional[Dict]:
        """Identify a cropped face, mark attendance, or auto-register it.

        Args:
            face_image: BGR crop containing one face.

        Returns:
            Dict with ``name``, ``worker_id`` and a BGR ``color``: green for a
            known worker, orange for a newly registered one, red for unknown.
        """
        try:
            embedding = DeepFace.represent(img_path=face_image, model_name='Facenet', enforce_detection=False)[0]['embedding']
            match_index, _ = self._find_best_match(embedding)
            if match_index >= 0:
                worker_id = self.known_face_ids[match_index]
                worker_name = self.known_face_names[match_index]
                self.mark_attendance(worker_id, worker_name)
                return {'name': worker_name, 'worker_id': worker_id, 'color': (0, 255, 0)}  # Green
            new_worker = self._register_worker_auto(face_image)
            if new_worker:
                return {'name': new_worker['name'], 'worker_id': new_worker['worker_id'], 'color': (0, 165, 255)}  # Orange
        except Exception as e:
            logger.error(f"Face identification failed: {e}")
        return {'name': 'Unknown', 'worker_id': None, 'color': (0, 0, 255)}  # Red

    def _processing_loop(self, source):
        """Read frames from ``source``, track faces and publish annotated frames.

        Runs on the processing thread until ``is_processing`` is cleared, the
        source ends, or an error occurs. The capture is always released and
        ``is_processing`` always cleared on exit.
        """
        video_capture = cv2.VideoCapture(source)
        try:
            if not video_capture.isOpened():
                self.error_message = f"❌ Error: Could not open video source: {source}"
                logger.error(self.error_message)
                return
            tracker = FaceTracker(self)
            frame_count = 0
            while self.is_processing.is_set():
                ret, frame = video_capture.read()
                if not ret:
                    logger.info("Video ended or failed to read frame.")
                    break
                self.current_frame = frame.copy()
                annotated_frame = tracker.update(self.current_frame)
                logger.debug(f"Processed frame {frame_count}, queue size: {self.frame_queue.qsize()}")
                if self.frame_queue.full():
                    # Drop the oldest frame so the UI never falls far behind.
                    try:
                        self.frame_queue.get_nowait()
                    except queue.Empty:
                        pass
                self.frame_queue.put(annotated_frame)
                frame_count += 1
                time.sleep(0.01)
            logger.info("Processing finished.")
        except Exception as e:
            self.error_message = f"❌ Processing error: {e}"
            logger.exception(self.error_message)
        finally:
            # Without this, an exception would leak the capture device and leave
            # is_processing set, blocking every later start_processing() call.
            video_capture.release()
            self.is_processing.clear()

    def start_processing(self, source) -> str:
        """Reset session state and start the processing thread on ``source``.

        Args:
            source: Camera index or video file path accepted by ``cv2.VideoCapture``.

        Returns:
            Status message for the UI.
        """
        if self.is_processing.is_set():
            return "⚠️ Processing is already active."
        self.session_log_present.clear()
        self.session_log_new.clear()
        self.session_attended_ids.clear()
        self.error_message = None
        # Discard frames left over from a previous run without ever blocking.
        while True:
            try:
                self.frame_queue.get_nowait()
            except queue.Empty:
                break
        self.is_processing.set()
        self.processing_thread = threading.Thread(target=self._processing_loop, args=(source,))
        self.processing_thread.daemon = True
        self.processing_thread.start()
        return "βœ… Started processing..."

    def stop_processing(self) -> str:
        """Signal the processing thread to stop and wait briefly for it to exit."""
        if not self.is_processing.is_set():
            return "⚠️ Processing is not currently active."
        self.is_processing.clear()
        if self.processing_thread:
            self.processing_thread.join(timeout=2)
        return "βœ… Processing stopped by user."

    def get_registered_workers_info(self) -> str:
        """Return the worker roster as markdown, sorted by name.

        Note that this reloads worker data (Salesforce or local cache) on every
        call, replacing the in-memory lists.
        """
        self.load_worker_data()
        if not self.known_face_ids:
            return "No workers registered."
        info_list = [
            f"- **{name}** (ID: {worker_id})"
            for name, worker_id in sorted(zip(self.known_face_names, self.known_face_ids))
        ]
        return f"**πŸ‘₯ Registered Workers ({len(info_list)})**\n" + "\n".join(info_list)
# --- GRADIO UI ---
# Single shared engine instance used by every UI callback below. Constructing it here
# runs AttendanceSystem.__init__ at import time, which attempts the Salesforce
# connection and loads worker data.
attendance_system = AttendanceSystem()
def create_interface():
    """Build the Gradio Blocks UI and wire it to the shared ``attendance_system``.

    The UI has three tabs: input selection and start/stop controls, the
    annotated video with the session logs, and worker registration/roster.
    A generator registered on page load streams frames and logs to the page.

    Returns:
        The configured ``gr.Blocks`` application (not yet launched).
    """
    with gr.Blocks(theme=gr.themes.Soft(), title="Attendance System") as demo:
        gr.Markdown("# πŸš€ High-Performance Face Recognition Attendance")
        with gr.Tabs():
            with gr.Tab("βš™οΈ Controls & Status"):
                gr.Markdown("### 1. Choose Input Source & Start Processing")
                with gr.Row():
                    with gr.Column(scale=1):
                        # Hidden field remembering which input tab is selected (0 = camera, 1 = upload).
                        selected_tab_index = gr.Number(value=0, visible=False)
                        with gr.Tabs() as video_tabs:
                            with gr.Tab("Live Camera", id=0):
                                camera_source = gr.Number(label="Camera Source Index", value=0, precision=0)
                            with gr.Tab("Upload Video", id=1):
                                video_file = gr.Video(label="Upload Video File", sources=["upload"])
                    with gr.Column(scale=1):
                        start_btn = gr.Button("▢️ Start Processing", variant="primary")
                        stop_btn = gr.Button("⏹️ Stop Processing", variant="stop")
                        status_box = gr.Textbox(label="Status", interactive=False, value="System Ready.")
                gr.Markdown("### 2. View Results in the 'Output & Log' Tab")
                gr.Markdown("**🎨 Color Coding:** <font color='cyan'>Identifying</font>, <font color='green'>Present</font>, <font color='orange'>New</font>, <font color='red'>Unknown</font>")
            with gr.Tab("πŸ“Š Output & Log"):
                with gr.Row(equal_height=True):
                    with gr.Column(scale=2):
                        video_output = gr.Image(label="Recognition Output", interactive=False, type="numpy")
                    with gr.Column(scale=1):
                        gr.Markdown("### πŸ“‹ Present Workers")
                        session_log_present = gr.Markdown(label="Present Workers Log", value="Awaiting detections...")
                        gr.Markdown("### πŸ†• Newly Registered Workers")
                        session_log_new = gr.Markdown(label="New Workers Log", value="Awaiting new registrations...")
            with gr.Tab("πŸ‘€ Worker Management"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Register New Worker")
                        register_image = gr.Image(label="Upload Worker's Photo", type="pil", sources=["upload"])
                        register_name = gr.Textbox(label="Worker's Full Name")
                        register_btn = gr.Button("Register Worker", variant="primary")
                        register_output = gr.Textbox(label="Registration Status", interactive=False)
                    with gr.Column():
                        gr.Markdown("### Current Worker Roster")
                        registered_workers_info = gr.Markdown(value=attendance_system.get_registered_workers_info())
                        refresh_workers_btn = gr.Button("πŸ”„ Refresh List")

        def on_tab_select(evt: gr.SelectData):
            """Store the index of the selected input tab in the hidden field."""
            return evt.index

        video_tabs.select(fn=on_tab_select, inputs=None, outputs=[selected_tab_index])

        def start_wrapper(tab_index, cam_src, vid_path):
            """Pick the input source for the active tab and start processing."""
            if tab_index == 1 and vid_path is None:
                return "❌ Please upload a video file."
            source = vid_path
            if tab_index == 0:
                # A cleared number field arrives as None; int(None) would raise.
                source = None if cam_src is None else int(cam_src)
            if source is None:
                return "Please provide an input source."
            return attendance_system.start_processing(source)

        start_btn.click(fn=start_wrapper, inputs=[selected_tab_index, camera_source, video_file], outputs=[status_box])
        stop_btn.click(fn=attendance_system.stop_processing, inputs=None, outputs=[status_box])
        register_btn.click(fn=attendance_system.register_worker_manual, inputs=[register_image, register_name], outputs=[register_output, registered_workers_info])
        refresh_workers_btn.click(fn=attendance_system.get_registered_workers_info, outputs=[registered_workers_info])

        def update_ui_generator():
            """Endlessly yield (frame, present log, new log, status) about 30 times a second."""
            last_frame = None
            while True:
                status_text = "Status: Processing..." if attendance_system.is_processing.is_set() else "Status: Stopped."
                present_md = "\n".join(attendance_system.session_log_present) or "Awaiting detections..."
                new_md = "\n".join(attendance_system.session_log_new) or "Awaiting new registrations..."
                frame = None
                if attendance_system.is_processing.is_set():
                    try:
                        frame = attendance_system.frame_queue.get_nowait()
                        last_frame = frame
                        logger.debug("Retrieved frame from queue for UI update")
                    except queue.Empty:
                        # Nothing new yet: keep showing the most recent frame.
                        frame = last_frame
                        logger.debug("Using last frame due to empty queue")
                else:
                    present_md = "Processing stopped. Final Present Log:\n\n" + present_md
                    new_md = "Processing stopped. Final New Workers Log:\n\n" + new_md
                if frame is not None:
                    # Frames are produced by OpenCV in BGR; the image component expects RGB.
                    yield cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), present_md, new_md, status_text
                else:
                    yield None, present_md, new_md, status_text
                time.sleep(1/30)

        demo.load(fn=update_ui_generator, outputs=[video_output, session_log_present, session_log_new, status_box])
    return demo
def _get_image_caption(image: Image.Image) -> str:
    """Caption ``image`` through the Hugging Face Inference API.

    Args:
        image: PIL image to describe; it is sent as JPEG bytes.

    Returns:
        The generated caption, or a fixed fallback message when no API token
        is configured or the request fails.
    """
    if not HF_API_TOKEN:
        return "Hugging Face API token not configured."
    try:
        buffered = BytesIO()
        image.save(buffered, format="JPEG")
        img_data = buffered.getvalue()
        headers = {"Authorization": f"Bearer {HF_API_TOKEN}"}
        response = requests.post(HF_API_URL, headers=headers, data=img_data, timeout=10)
        response.raise_for_status()
        result = response.json()
        return result[0].get("generated_text", "No caption found.")
    except Exception as e:
        logger.error(f"Hugging Face API error: {e}")
        return "Caption generation failed."


def _upload_image_to_salesforce(self, image: Image.Image, record_id: str, worker_id: str) -> Optional[str]:
    """Upload ``image`` as a ContentVersion attached to ``record_id``.

    Args:
        image: PIL image, stored as JPEG.
        record_id: Salesforce id of the record the file is published to.
        worker_id: Used for the file title and client path.

    Returns:
        The download path of the linked content document, or ``None`` when
        there is no Salesforce connection or the upload fails.
    """
    if not self.sf:
        return None
    try:
        buffered = BytesIO()
        image.save(buffered, format="JPEG")
        encoded_image = base64.b64encode(buffered.getvalue()).decode('utf-8')
        self.sf.ContentVersion.create({
            'Title': f'Image_{worker_id}',
            'PathOnClient': f'{worker_id}.jpg',
            'VersionData': encoded_image,
            'FirstPublishLocationId': record_id
        })
        # The create response is not needed; the document id is looked up through its link to the record.
        content_doc_link = self.sf.query(f"SELECT ContentDocumentId FROM ContentDocumentLink WHERE LinkedEntityId = '{record_id}'")['records'][0]
        content_doc_id = content_doc_link['ContentDocumentId']
        return f"/sfc/servlet.shepherd/document/download/{content_doc_id}"
    except Exception as e:
        logger.error(f"Salesforce image upload error: {e}")
        return None


# Attach the helpers to AttendanceSystem at import time, not only when run as a
# script, so the class is complete however the module is loaded.
# _get_image_caption takes no ``self``; it must be wrapped in staticmethod(),
# otherwise ``self._get_image_caption(image)`` passes two positional arguments
# and raises TypeError on every call.
AttendanceSystem._get_image_caption = staticmethod(_get_image_caption)
AttendanceSystem._upload_image_to_salesforce = _upload_image_to_salesforce

if __name__ == "__main__":
    app = create_interface()
    app.queue()
    app.launch(server_name="0.0.0.0", server_port=7860, show_error=True, debug=True)