Spaces:
Sleeping
Sleeping
| # Suppress TensorFlow oneDNN warnings | |
| import os | |
| os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0' | |
| import gradio as gr | |
| import cv2 | |
| import numpy as np | |
| import pandas as pd | |
| from datetime import datetime, date | |
| from typing import Tuple, Optional | |
| import logging | |
| from deepface import DeepFace | |
| import pickle | |
| from io import BytesIO | |
| import base64 | |
| from PIL import Image | |
| import json | |
| import threading | |
| import time | |
| import queue | |
| import requests | |
| from simple_salesforce import Salesforce | |
| from dotenv import load_dotenv | |
| from retrying import retry | |
| # Setup logging | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
| logger = logging.getLogger(__name__) | |
| # Load environment variables | |
| load_dotenv() | |
| # Hugging Face API configuration | |
| HF_API_URL = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base" | |
| HF_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN") | |
| # Salesforce configuration | |
| SF_CREDENTIALS = { | |
| "username": "smartlabour@attendance.system", | |
| "password": "#Prashanth@1234", | |
| "security_token": "7xPmtDFoWlZUGK0V2QSwFZJ6c", | |
| "domain": "login" | |
| } | |
| def connect_to_salesforce(): | |
| try: | |
| sf = Salesforce(**SF_CREDENTIALS) | |
| logger.info("Connected to Salesforce") | |
| sf.describe() | |
| return sf | |
| except Exception as e: | |
| logger.error(f"Salesforce connection failed: {e}") | |
| raise | |
| class AttendanceSystem: | |
| def __init__(self): | |
| self.known_face_embeddings = [] | |
| self.known_face_names = [] | |
| self.known_face_ids = [] | |
| self.attendance_records = [] | |
| self.next_worker_id = 1 | |
| self.video_capture = None | |
| self.is_streaming = False | |
| self.frame_queue = queue.Queue(maxsize=2) | |
| self.recognition_thread = None | |
| self.last_recognition_time = {} | |
| self.recognition_cooldown = 5 # seconds | |
| self.video_file_path = None | |
| self.video_processing = False | |
| # Initialize Salesforce | |
| try: | |
| self.sf = connect_to_salesforce() | |
| except Exception as e: | |
| logger.error(f"Error connecting to Salesforce: {e}") | |
| self.sf = None | |
| # Create directories | |
| os.makedirs("data", exist_ok=True) | |
| os.makedirs("data/faces", exist_ok=True) | |
| self.load_data() | |
| def load_data(self): | |
| try: | |
| if os.path.exists("data/workers.pkl"): | |
| with open("data/workers.pkl", "rb") as f: | |
| data = pickle.load(f) | |
| self.known_face_embeddings = data.get("embeddings", []) | |
| self.known_face_names = data.get("names", []) | |
| self.known_face_ids = data.get("ids", []) | |
| self.next_worker_id = data.get("next_id", 1) | |
| if os.path.exists("data/attendance.json"): | |
| with open("data/attendance.json", "r") as f: | |
| self.attendance_records = json.load(f) | |
| # Load embeddings from Salesforce for duplicate checks | |
| if self.sf: | |
| try: | |
| workers = self.sf.query_all("SELECT Worker_ID__c, Name, Face_Embedding__c FROM Worker__c")['records'] | |
| for worker in workers: | |
| if worker['Face_Embedding__c']: | |
| embedding = json.loads(worker['Face_Embedding__c']) | |
| if worker['Worker_ID__c'] not in self.known_face_ids: | |
| self.known_face_embeddings.append(embedding) | |
| self.known_face_names.append(worker['Name']) | |
| self.known_face_ids.append(worker['Worker_ID__c']) | |
| self.next_worker_id = max(self.next_worker_id, int(worker['Worker_ID__c'][1:]) + 1) | |
| except Exception as e: | |
| logger.error(f"Error loading embeddings from Salesforce: {e}") | |
| except Exception as e: | |
| logger.error(f"Error loading data: {e}") | |
| self.known_face_embeddings = [] | |
| self.known_face_names = [] | |
| self.known_face_ids = [] | |
| self.attendance_records = [] | |
| self.next_worker_id = 1 | |
| def save_data(self): | |
| try: | |
| worker_data = { | |
| "embeddings": self.known_face_embeddings, | |
| "names": self.known_face_names, | |
| "ids": self.known_face_ids, | |
| "next_id": self.next_worker_id | |
| } | |
| with open("data/workers.pkl", "wb") as f: | |
| pickle.dump(worker_data, f) | |
| with open("data/attendance.json", "w") as f: | |
| json.dump(self.attendance_records, f, indent=2) | |
| except Exception as e: | |
| logger.error(f"Error saving data: {e}") | |
| def get_image_caption(self, image: Image.Image) -> str: | |
| """Generate image caption using Hugging Face API""" | |
| try: | |
| img_byte_arr = BytesIO() | |
| image.save(img_byte_arr, format='JPEG', quality=85) | |
| img_data = img_byte_arr.getvalue() | |
| headers = {"Authorization": f"Bearer {HF_API_TOKEN}"} | |
| response = requests.post(HF_API_URL, headers=headers, data=img_data) | |
| if response.status_code == 200: | |
| result = response.json() | |
| if isinstance(result, list) and len(result) > 0: | |
| return result[0].get("generated_text", "No caption generated") | |
| return "No caption generated" | |
| else: | |
| logger.error(f"Hugging Face API error: {response.status_code} - {response.text}") | |
| return "Error generating caption" | |
| except Exception as e: | |
| logger.error(f"Error in Hugging Face API call: {e}") | |
| return "Error generating caption" | |
| def upload_image_to_salesforce(self, image: Image.Image, worker_salesforce_id: str, worker_id: str, worker_name: str) -> Optional[str]: | |
| """Upload worker image to Salesforce as ContentVersion""" | |
| try: | |
| if not image: | |
| logger.error("No image provided for upload") | |
| return None | |
| img_byte_arr = BytesIO() | |
| image.save(img_byte_arr, format='JPEG', quality=85) | |
| img_data = img_byte_arr.getvalue() | |
| encoded_image = base64.b64encode(img_data).decode('utf-8') | |
| content_version_data = { | |
| "Title": f"Worker_Image_{worker_id}_{worker_name.replace(' ', '_')}", | |
| "PathOnClient": f"worker_{worker_id}.jpg", | |
| "VersionData": encoded_image, | |
| "FirstPublishLocationId": worker_salesforce_id | |
| } | |
| content_version = self.sf.ContentVersion.create(content_version_data) | |
| file_url = f"https://{self.sf.sf_instance}/sfc/servlet.shepherd/version/download/{content_version['id']}" | |
| logger.info(f"Image uploaded to Salesforce for worker {worker_id}: {file_url}") | |
| return file_url | |
| except Exception as e: | |
| logger.error(f"Error uploading image to Salesforce: {e}") | |
| return None | |
| def register_worker_manual(self, image: Image.Image, name: str) -> Tuple[str, str]: | |
| """Manual worker registration with Hugging Face and Salesforce""" | |
| if image is None or not name.strip(): | |
| return "β Please provide both image and name!", self.get_registered_workers_info() | |
| image_array = np.array(image) | |
| try: | |
| face_analysis = DeepFace.analyze(img_path=image_array, actions=['emotion'], enforce_detection=True, detector_backend='opencv') | |
| embedding = DeepFace.represent(img_path=image_array, model_name='Facenet')[0]['embedding'] | |
| name = name.strip().title() | |
| if name in self.known_face_names: | |
| return f"β {name} is already registered!", self.get_registered_workers_info() | |
| # Check for duplicate face | |
| if len(self.known_face_embeddings) > 0: | |
| distances = [np.linalg.norm(np.array(embedding) - np.array(known_embedding)) | |
| for known_embedding in self.known_face_embeddings] | |
| min_distance = min(distances) | |
| if min_distance < 10: | |
| best_match_index = distances.index(min_distance) | |
| matched_name = self.known_face_names[best_match_index] | |
| matched_id = self.known_face_ids[best_match_index] | |
| return f"β Face matches existing worker: {matched_name} ({matched_id})!", self.get_registered_workers_info() | |
| worker_id = f"W{self.next_worker_id:04d}" | |
| caption = self.get_image_caption(image) | |
| self.known_face_embeddings.append(embedding) | |
| self.known_face_names.append(name) | |
| self.known_face_ids.append(worker_id) | |
| self.next_worker_id += 1 | |
| face_image = Image.fromarray(image_array) | |
| local_path = f"data/faces/{worker_id}_{name.replace(' ', '_')}.jpg" | |
| face_image.save(local_path) | |
| image_url = None | |
| if self.sf: | |
| try: | |
| worker_record = self.sf.Worker__c.create({ | |
| 'Name': name, | |
| 'Worker_ID__c': worker_id, | |
| 'Face_Embedding__c': json.dumps(embedding), | |
| 'Image_Caption__c': caption | |
| }) | |
| image_url = self.upload_image_to_salesforce(face_image, worker_record['id'], worker_id, name) | |
| if image_url: | |
| self.sf.Worker__c.update(worker_record['id'], {'Image_URL__c': image_url}) | |
| else: | |
| logger.warning("Image URL not set due to upload failure") | |
| except Exception as e: | |
| logger.error(f"Error saving to Salesforce: {e}") | |
| self.save_data() | |
| return f"β {name} has been successfully registered with ID: {worker_id}! Caption: {caption}\nImage URL: {image_url or 'Not uploaded'}", self.get_registered_workers_info() | |
| except ValueError as e: | |
| if "Face could not be detected" in str(e): | |
| return "β No face detected in the image!", self.get_registered_workers_info() | |
| return f"β Error processing image: {str(e)}", self.get_registered_workers_info() | |
| except Exception as e: | |
| return f"β Error during registration: {str(e)}", self.get_registered_workers_info() | |
| def register_worker_auto(self, face_image: np.ndarray) -> Tuple[Optional[str], Optional[str]]: | |
| """Automatic worker registration for unrecognized faces""" | |
| try: | |
| embedding = DeepFace.represent(img_path=face_image, model_name='Facenet')[0]['embedding'] | |
| # Check for duplicate face | |
| if len(self.known_face_embeddings) > 0: | |
| distances = [np.linalg.norm(np.array(embedding) - np.array(known_embedding)) | |
| for known_embedding in self.known_face_embeddings] | |
| min_distance = min(distances) | |
| if min_distance < 10: | |
| best_match_index = distances.index(min_distance) | |
| return self.known_face_ids[best_match_index], self.known_face_names[best_match_index] | |
| worker_id = f"W{self.next_worker_id:04d}" | |
| worker_name = f"Unknown_Worker_{self.next_worker_id}" | |
| face_pil = Image.fromarray(cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)) | |
| caption = self.get_image_caption(face_pil) | |
| self.known_face_embeddings.append(embedding) | |
| self.known_face_names.append(worker_name) | |
| self.known_face_ids.append(worker_id) | |
| self.next_worker_id += 1 | |
| local_path = f"data/faces/{worker_id}_{worker_name}.jpg" | |
| face_pil.save(local_path) | |
| image_url = None | |
| if self.sf: | |
| try: | |
| worker_record = self.sf.Worker__c.create({ | |
| 'Name': worker_name, | |
| 'Worker_ID__c': worker_id, | |
| 'Face_Embedding__c': json.dumps(embedding), | |
| 'Image_Caption__c': caption | |
| }) | |
| image_url = self.upload_image_to_salesforce(face_pil, worker_record['id'], worker_id, worker_name) | |
| if image_url: | |
| self.sf.Worker__c.update(worker_record['id'], {'Image_URL__c': image_url}) | |
| except Exception as e: | |
| logger.error(f"Error saving to Salesforce: {e}") | |
| self.save_data() | |
| return worker_id, worker_name | |
| except Exception as e: | |
| logger.error(f"Error in auto registration: {e}") | |
| return None, None | |
| def mark_attendance(self, worker_id: str, worker_name: str) -> bool: | |
| try: | |
| today = date.today().isoformat() | |
| current_time = datetime.now() | |
| already_marked = any( | |
| record["worker_id"] == worker_id and record["date"] == today | |
| for record in self.attendance_records | |
| ) | |
| if not already_marked: | |
| attendance_record = { | |
| "worker_id": worker_id, | |
| "name": worker_name, | |
| "date": today, | |
| "time": current_time.strftime("%H:%M:%S"), | |
| "timestamp": current_time.isoformat(), | |
| "status": "Present", | |
| "method": "Auto" | |
| } | |
| self.attendance_records.append(attendance_record) | |
| if self.sf: | |
| try: | |
| self.sf.Attendance__c.create({ | |
| 'Worker_ID__c': worker_id, | |
| 'Name__c': worker_name, | |
| 'Date__c': today, | |
| 'Time__c': current_time.strftime("%H:%M:%S"), | |
| 'Timestamp__c': current_time.isoformat(), | |
| 'Status__c': "Present", | |
| 'Method__c': "Auto" | |
| }) | |
| logger.info(f"Attendance for {worker_name} ({worker_id}) saved to Salesforce") | |
| except Exception as e: | |
| logger.error(f"Error saving attendance to Salesforce: {e}") | |
| self.save_data() | |
| return True | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error marking attendance: {e}") | |
| return False | |
| def process_video_frame(self, frame: np.ndarray) -> np.ndarray: | |
| try: | |
| rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| face_objs = DeepFace.extract_faces(img_path=rgb_frame, target_size=(160, 160), enforce_detection=False, detector_backend='opencv') | |
| current_time = time.time() | |
| for face_obj in face_objs: | |
| if face_obj['confidence'] > 0.9: | |
| face_area = face_obj['facial_area'] | |
| x, y, w, h = face_area['x'], face_area['y'], face_area['w'], face_area['h'] | |
| face_image = frame[y:y+h, x:x+w] | |
| try: | |
| embedding = DeepFace.represent(img_path=face_image, model_name='Facenet')[0]['embedding'] | |
| worker_id = None | |
| worker_name = "Unknown" | |
| color = (0, 0, 255) | |
| if len(self.known_face_embeddings) > 0: | |
| distances = [np.linalg.norm(np.array(embedding) - np.array(known_embedding)) | |
| for known_embedding in self.known_face_embeddings] | |
| min_distance = min(distances) | |
| best_match_index = distances.index(min_distance) | |
| if min_distance < 10: | |
| worker_id = self.known_face_ids[best_match_index] | |
| worker_name = self.known_face_names[best_match_index] | |
| color = (0, 255, 0) | |
| if worker_id not in self.last_recognition_time or \ | |
| current_time - self.last_recognition_time[worker_id] > self.recognition_cooldown: | |
| if self.mark_attendance(worker_id, worker_name): | |
| logger.info(f"Attendance marked for {worker_name} ({worker_id})") | |
| self.last_recognition_time[worker_id] = current_time | |
| else: | |
| if face_image.size > 0: | |
| new_id, new_name = self.register_worker_auto(face_image) | |
| if new_id: | |
| worker_id = new_id | |
| worker_name = new_name | |
| color = (255, 165, 0) | |
| logger.info(f"New worker registered: {new_name} ({new_id})") | |
| if self.mark_attendance(worker_id, worker_name): | |
| logger.info(f"Attendance marked for new worker {worker_name} ({worker_id})") | |
| cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2) | |
| cv2.rectangle(frame, (x, y+h - 35), (x+w, y+h), color, cv2.FILLED) | |
| label = f"{worker_name} ({worker_id})" if worker_id else worker_name | |
| cv2.putText(frame, label, (x + 6, y+h - 6), | |
| cv2.FONT_HERSHEY_DUPLEX, 0.6, (255, 255, 255), 1) | |
| except Exception as e: | |
| logger.error(f"Error processing face: {e}") | |
| continue | |
| return frame | |
| except Exception as e: | |
| logger.error(f"Error processing frame: {e}") | |
| return frame | |
| def start_video_stream(self, camera_source: int = 0) -> str: | |
| try: | |
| if self.is_streaming: | |
| return "β οΈ Video stream is already running!" | |
| self.video_file_path = None | |
| self.video_capture = cv2.VideoCapture(camera_source) | |
| if not self.video_capture.isOpened(): | |
| return "β Could not open camera/video source!" | |
| self.is_streaming = True | |
| def video_loop(): | |
| while self.is_streaming: | |
| ret, frame = self.video_capture.read() | |
| if not ret: | |
| break | |
| processed_frame = self.process_video_frame(frame) | |
| if not self.frame_queue.full(): | |
| try: | |
| self.frame_queue.put_nowait(processed_frame) | |
| except queue.Full: | |
| pass | |
| time.sleep(0.1) | |
| self.recognition_thread = threading.Thread(target=video_loop) | |
| self.recognition_thread.daemon = True | |
| self.recognition_thread.start() | |
| return "β Live camera stream started successfully!" | |
| except Exception as e: | |
| return f"β Error starting video stream: {e}" | |
| def process_uploaded_video(self, video_path: str) -> str: | |
| try: | |
| if self.is_streaming: | |
| return "β οΈ Please stop current stream before processing a video file!" | |
| if not os.path.exists(video_path): | |
| return "β Video file not found!" | |
| self.video_file_path = video_path | |
| self.video_processing = True | |
| def video_processing_loop(): | |
| cap = cv2.VideoCapture(video_path) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| frame_delay = 1.0 / fps if fps > 0 else 0.03 | |
| while self.video_processing and cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| processed_frame = self.process_video_frame(frame) | |
| if not self.frame_queue.full(): | |
| try: | |
| self.frame_queue.put_nowait(processed_frame) | |
| except queue.Full: | |
| pass | |
| time.sleep(frame_delay) | |
| cap.release() | |
| self.video_processing = False | |
| self.recognition_thread = threading.Thread(target=video_processing_loop) | |
| self.recognition_thread.daemon = True | |
| self.recognition_thread.start() | |
| return f"β Video processing started successfully! ({os.path.basename(video_path)})" | |
| except Exception as e: | |
| return f"β Error processing video: {e}" | |
| def stop_video_stream(self) -> str: | |
| try: | |
| self.is_streaming = False | |
| self.video_processing = False | |
| if self.video_capture: | |
| self.video_capture.release() | |
| self.video_capture = None | |
| if self.recognition_thread: | |
| self.recognition_thread.join(timeout=2) | |
| while not self.frame_queue.empty(): | |
| try: | |
| self.frame_queue.get_nowait() | |
| except queue.Empty: | |
| break | |
| return "β Video stream/processing stopped successfully!" | |
| except Exception as e: | |
| return f"β Error stopping video: {e}" | |
| def get_current_frame(self) -> Optional[np.ndarray]: | |
| try: | |
| if not self.frame_queue.empty(): | |
| return self.frame_queue.get_nowait() | |
| return None | |
| except queue.Empty: | |
| return None | |
| def get_registered_workers_info(self) -> str: | |
| if not self.sf: | |
| return "β Salesforce connection not established." | |
| try: | |
| workers = self.sf.query_all("SELECT Name, Worker_ID__c, Image_Caption__c, Image_URL__c FROM Worker__c")['records'] | |
| if not workers: | |
| return "No workers registered yet." | |
| info = f"**Registered Workers ({len(workers)}):**\n\n" | |
| for i, worker in enumerate(workers, 1): | |
| info += f"{i}. **{worker['Name']}** (ID: {worker['Worker_ID__c']}) - Caption: {worker['Image_Caption__c'] or 'N/A'}\n" | |
| if worker['Image_URL__c']: | |
| info += f" Image: [View]({worker['Image_URL__c']})\n" | |
| return info | |
| except Exception as e: | |
| logger.error(f"Error fetching workers from Salesforce: {e}") | |
| return self._get_local_workers_info() | |
| def _get_local_workers_info(self) -> str: | |
| if not self.known_face_names: | |
| return "No workers registered yet." | |
| info = f"**Registered Workers ({len(self.known_face_names)}):**\n\n" | |
| for i, (worker_id, name) in enumerate(zip(self.known_face_ids, self.known_face_names), 1): | |
| info += f"{i}. **{name}** (ID: {worker_id})\n" | |
| return info | |
| def get_today_attendance(self) -> str: | |
| if not self.sf: | |
| return "β Salesforce connection not established." | |
| today = date.today().isoformat() | |
| try: | |
| records = self.sf.query_all( | |
| f"SELECT Name__c, Worker_ID__c, Time__c, Method__c FROM Attendance__c WHERE Date__c = '{today}'" | |
| )['records'] | |
| if not records: | |
| return f"**Today's Attendance ({today}):**\n\nNo attendance marked yet." | |
| info = f"**Today's Attendance ({today}):**\n\n" | |
| for record in records: | |
| method_icon = "π€" if record['Method__c'] == "Auto" else "π€" | |
| info += f"{method_icon} **{record['Name__c']}** (ID: {record['Worker_ID__c']}) - {record['Time__c']}\n" | |
| return info | |
| except Exception as e: | |
| logger.error(f"Error fetching attendance from Salesforce: {e}") | |
| return self._get_local_today_attendance() | |
| def _get_local_today_attendance(self) -> str: | |
| today = date.today().isoformat() | |
| today_records = [r for r in self.attendance_records if r["date"] == today] | |
| if not today_records: | |
| return f"**Today's Attendance ({today}):**\n\nNo attendance marked yet." | |
| info = f"**Today's Attendance ({today}):**\n\n" | |
| for record in today_records: | |
| method_icon = "π€" if record.get("method") == "Auto" else "π€" | |
| info += f"{method_icon} **{record['name']}** (ID: {record['worker_id']}) - {record['time']}\n" | |
| return info | |
| def get_attendance_report(self, start_date: str, end_date: str) -> str: | |
| if not start_date or not end_date: | |
| return "Please select both start and end dates." | |
| try: | |
| datetime.strptime(start_date, '%Y-%m-%d') | |
| datetime.strptime(end_date, '%Y-%m-%d') | |
| except ValueError: | |
| return "Invalid date format. Please use YYYY-MM-DD." | |
| if not self.sf: | |
| return "β Salesforce connection not established." | |
| try: | |
| records = self.sf.query_all( | |
| f"SELECT Worker_ID__c, Name__c, Date__c, Time__c, Method__c FROM Attendance__c " | |
| f"WHERE Date__c >= '{start_date}' AND Date__c <= '{end_date}'" | |
| )['records'] | |
| if not records: | |
| return f"No attendance records found between {start_date} and {end_date}." | |
| df = pd.DataFrame(records) | |
| total_days = (pd.to_datetime(end_date) - pd.to_datetime(start_date)).days + 1 | |
| unique_workers = df['Worker_ID__c'].nunique() | |
| total_attendances = len(df) | |
| auto_registrations = len(df[df['Method__c'] == 'Auto']) | |
| report = f"**π Attendance Report ({start_date} to {end_date})**\n\n" | |
| report += f"**Summary:**\n" | |
| report += f"β’ Total Days: {total_days}\n" | |
| report += f"β’ Unique Workers: {unique_workers}\n" | |
| report += f"β’ Total Attendances: {total_attendances}\n" | |
| report += f"β’ Auto Detections: {auto_registrations}\n\n" | |
| if not df.empty: | |
| attendance_counts = df.groupby(['Worker_ID__c', 'Name__c']).size().reset_index(name='count') | |
| report += f"**π₯ Individual Attendance:**\n" | |
| for _, row in attendance_counts.iterrows(): | |
| percentage = (row['count'] / total_days) * 100 | |
| report += f"β’ **{row['Name__c']}** ({row['Worker_ID__c']}): {row['count']} days ({percentage:.1f}%)\n" | |
| return report | |
| except Exception as e: | |
| logger.error(f"Error generating report from Salesforce: {e}") | |
| return self._get_local_attendance_report(start_date, end_date) | |
| def _get_local_attendance_report(self, start_date: str, end_date: str) -> str: | |
| filtered_records = [ | |
| r for r in self.attendance_records | |
| if start_date <= r["date"] <= end_date | |
| ] | |
| if not filtered_records: | |
| return f"No attendance records found between {start_date} and {end_date}." | |
| df = pd.DataFrame(filtered_records) | |
| total_days = (pd.to_datetime(end_date) - pd.to_datetime(start_date)).days + 1 | |
| unique_workers = df['worker_id'].nunique() | |
| total_attendances = len(df) | |
| auto_registrations = len(df[df['method'] == 'Auto']) | |
| report = f"**π Attendance Report ({start_date} to {end_date})**\n\n" | |
| report += f"**Summary:**\n" | |
| report += f"β’ Total Days: {total_days}\n" | |
| report += f"β’ Unique Workers: {unique_workers}\n" | |
| report += f"β’ Total Attendances: {total_attendances}\n" | |
| report += f"β’ Auto Detections: {auto_registrations}\n\n" | |
| if not df.empty: | |
| attendance_counts = df.groupby(['worker_id', 'name']).size().reset_index(name='count') | |
| report += f"**π₯ Individual Attendance:**\n" | |
| for _, row in attendance_counts.iterrows(): | |
| percentage = (row['count'] / total_days) * 100 | |
| report += f"β’ **{row['name']}** ({row['worker_id']}): {row['count']} days ({percentage:.1f}%)\n" | |
| return report | |
| def export_attendance_csv(self) -> Tuple[Optional[str], str]: | |
| try: | |
| if not self.attendance_records: | |
| return None, "No attendance records to export." | |
| df = pd.DataFrame(self.attendance_records) | |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| csv_file = f"attendance_report_{timestamp}.csv" | |
| df.to_csv(csv_file, index=False) | |
| return csv_file, f"β Attendance exported to {csv_file}" | |
| except Exception as e: | |
| return None, f"β Error exporting data: {e}" | |
| # Initialize system | |
| attendance_system = AttendanceSystem() | |
| def create_interface(): | |
| with gr.Blocks( | |
| title="π― Advanced Attendance System with Video Recognition", | |
| theme=gr.themes.Soft(), | |
| css=""" | |
| .gradio-container { max-width: 1400px !important; } | |
| .tab-nav { font-weight: bold; } | |
| .status-box { padding: 10px; border-radius: 5px; margin: 5px 0; } | |
| .video-option-tabs { margin-bottom: 15px; } | |
| """ | |
| ) as demo: | |
| gr.Markdown( | |
| """ | |
| # π― Advanced Attendance System with Face Recognition | |
| **Comprehensive facial recognition system with live camera and video file processing, integrated with Hugging Face and Salesforce** | |
| ## π **Key Features:** | |
| - **π₯ Live Camera Recognition** - Real-time face detection from camera/CCTV | |
| - **πΉ Video File Processing** - Process pre-recorded videos for attendance | |
| - **π€ Automatic Worker Registration** - Auto-register unknown faces with unique IDs | |
| - **π€ Manual Registration** - Register workers manually with photos and AI-generated captions | |
| - **π 24-Hour Attendance Rule** - One attendance mark per worker per day | |
| - **π Advanced Analytics** - Detailed reports and data export | |
| - **π€ Hugging Face Integration** - AI-powered image captioning | |
| - **βοΈ Salesforce Integration** - Store worker and attendance data in Salesforce | |
| """ | |
| ) | |
| with gr.Tabs(): | |
| with gr.Tab("π₯ Video Recognition", elem_classes="tab-nav"): | |
| gr.Markdown("### Face Recognition from Live Camera or Video File") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| with gr.Tabs(selected="live", elem_classes="video-option-tabs") as video_tabs: | |
| with gr.Tab("Live Camera", id="live"): | |
| camera_source = gr.Number( | |
| label="Camera Source (0 for default camera, or RTSP URL)", | |
| value=0, | |
| precision=0 | |
| ) | |
| with gr.Row(): | |
| start_stream_btn = gr.Button( | |
| "π₯ Start Live Recognition", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| with gr.Tab("Upload Video", id="upload"): | |
| video_file = gr.Video( | |
| label="Upload Video File", | |
| sources=["upload"], | |
| format="mp4" | |
| ) | |
| with gr.Row(): | |
| process_video_btn = gr.Button( | |
| "πΉ Process Video File", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| stop_stream_btn = gr.Button( | |
| "βΉοΈ Stop Processing", | |
| variant="stop", | |
| size="lg" | |
| ) | |
| stream_status = gr.Textbox( | |
| label="Processing Status", | |
| value="Ready to start...", | |
| interactive=False, | |
| lines=2 | |
| ) | |
| gr.Markdown( | |
| """ | |
| **π Instructions:** | |
| - **Live Camera:** Select camera source and click "Start Live Recognition" | |
| - **Video File:** Upload a video file and click "Process Video File" | |
| - Click "Stop Processing" to stop current session | |
| **π¨ Color Coding:** | |
| - π’ **Green:** Known worker (attendance marked) | |
| - π **Orange:** New worker (auto-registered) | |
| - π΄ **Red:** Face detected but processing | |
| """ | |
| ) | |
| with gr.Column(scale=1): | |
| video_output = gr.Image( | |
| label="Recognition Output", | |
| streaming=True, | |
| interactive=False | |
| ) | |
| live_attendance_display = gr.Markdown( | |
| value=attendance_system.get_today_attendance(), | |
| label="Live Attendance Updates" | |
| ) | |
| refresh_attendance_btn = gr.Button( | |
| "π Refresh Attendance", | |
| variant="secondary" | |
| ) | |
| with gr.Tab("π€ Manual Registration", elem_classes="tab-nav"): | |
| gr.Markdown("### Register Workers Manually") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| register_image = gr.Image( | |
| label="Upload Worker's Photo", | |
| type="pil", | |
| height=300 | |
| ) | |
| register_name = gr.Textbox( | |
| label="Worker's Full Name", | |
| placeholder="Enter full name...", | |
| lines=1 | |
| ) | |
| register_btn = gr.Button( | |
| "π€ Register Worker", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| with gr.Column(scale=1): | |
| register_output = gr.Textbox( | |
| label="Registration Status", | |
| lines=3, | |
| interactive=False | |
| ) | |
| registered_workers_info = gr.Markdown( | |
| value=attendance_system.get_registered_workers_info(), | |
| label="Registered Workers Database" | |
| ) | |
| with gr.Tab("π Reports & Analytics", elem_classes="tab-nav"): | |
| gr.Markdown("### Attendance Reports and Data Export") | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("#### π Generate Report") | |
| start_date = gr.Textbox( | |
| label="Start Date (YYYY-MM-DD)", | |
| value=date.today().replace(day=1).strftime('%Y-%m-%d') | |
| ) | |
| end_date = gr.Textbox( | |
| label="End Date (YYYY-MM-DD)", | |
| value=date.today().strftime('%Y-%m-%d') | |
| ) | |
| generate_report_btn = gr.Button( | |
| "π Generate Report", | |
| variant="primary" | |
| ) | |
| gr.Markdown("#### πΎ Export Data") | |
| export_btn = gr.Button( | |
| "π₯ Export to CSV", | |
| variant="secondary" | |
| ) | |
| export_status = gr.Textbox( | |
| label="Export Status", | |
| lines=2, | |
| interactive=False | |
| ) | |
| export_file = gr.File( | |
| label="Download File", | |
| visible=False | |
| ) | |
| with gr.Column(): | |
| report_output = gr.Markdown( | |
| value="Select date range and click 'Generate Report' to view attendance analytics.", | |
| label="Attendance Report" | |
| ) | |
| start_stream_btn.click( | |
| fn=attendance_system.start_video_stream, | |
| inputs=[camera_source], | |
| outputs=[stream_status] | |
| ) | |
| process_video_btn.click( | |
| fn=attendance_system.process_uploaded_video, | |
| inputs=[video_file], | |
| outputs=[stream_status] | |
| ) | |
| stop_stream_btn.click( | |
| fn=attendance_system.stop_video_stream, | |
| outputs=[stream_status] | |
| ) | |
| refresh_attendance_btn.click( | |
| fn=attendance_system.get_today_attendance, | |
| outputs=[live_attendance_display] | |
| ) | |
| register_btn.click( | |
| fn=attendance_system.register_worker_manual, | |
| inputs=[register_image, register_name], | |
| outputs=[register_output, registered_workers_info] | |
| ) | |
| generate_report_btn.click( | |
| fn=attendance_system.get_attendance_report, | |
| inputs=[start_date, end_date], | |
| outputs=[report_output] | |
| ) | |
| def export_and_show(): | |
| file_path, status = attendance_system.export_attendance_csv() | |
| if file_path: | |
| return status, gr.update(visible=True, value=file_path) | |
| else: | |
| return status, gr.update(visible=False) | |
| export_btn.click( | |
| fn=export_and_show, | |
| outputs=[export_status, export_file] | |
| ) | |
| def update_video_frame(): | |
| start_time = time.time() | |
| while True: | |
| current_time = time.time() | |
| if current_time - start_time >= 0.03: | |
| frame = attendance_system.get_current_frame() | |
| if frame is not None: | |
| frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| return frame | |
| start_time = current_time | |
| time.sleep(0.01) | |
| video_thread = threading.Thread(target=lambda: demo.queue()(update_video_frame)()) | |
| video_thread.daemon = True | |
| video_thread.start() | |
| return demo | |
| if __name__ == "__main__": | |
| demo = create_interface() | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, | |
| show_error=True, | |
| debug=True | |
| ) |