import gradio as gr import cv2 import numpy as np import pandas as pd from datetime import datetime, date, timedelta from deepface import DeepFace import pickle import os from io import BytesIO import base64 from PIL import Image import json import threading import time import queue class AttendanceSystem: def __init__(self): self.known_face_embeddings = [] self.known_face_names = [] self.known_face_ids = [] self.attendance_records = [] self.next_worker_id = 1 self.video_capture = None self.is_streaming = False self.frame_queue = queue.Queue(maxsize=2) self.recognition_thread = None self.last_recognition_time = {} self.recognition_cooldown = 5 # seconds between recognitions for same person # Create directories for data storage os.makedirs("data", exist_ok=True) os.makedirs("data/faces", exist_ok=True) self.load_data() def load_data(self): """Load all stored data""" try: # Load face embeddings and worker data if os.path.exists("data/workers.pkl"): with open("data/workers.pkl", "rb") as f: data = pickle.load(f) self.known_face_embeddings = data.get("embeddings", []) self.known_face_names = data.get("names", []) self.known_face_ids = data.get("ids", []) self.next_worker_id = data.get("next_id", 1) # Load attendance records if os.path.exists("data/attendance.json"): with open("data/attendance.json", "r") as f: self.attendance_records = json.load(f) except Exception as e: print(f"Error loading data: {e}") self.known_face_embeddings = [] self.known_face_names = [] self.known_face_ids = [] self.attendance_records = [] self.next_worker_id = 1 def save_data(self): """Save all data to files""" try: # Save worker data worker_data = { "embeddings": self.known_face_embeddings, "names": self.known_face_names, "ids": self.known_face_ids, "next_id": self.next_worker_id } with open("data/workers.pkl", "wb") as f: pickle.dump(worker_data, f) # Save attendance records with open("data/attendance.json", "w") as f: json.dump(self.attendance_records, f, indent=2) except Exception as e: print(f"Error saving data: {e}") def register_worker_manual(self, image, name): """Manual worker registration""" if image is None or not name.strip(): return "❌ Please provide both image and name!", self.get_registered_workers_info() # Convert PIL image to RGB array if isinstance(image, Image.Image): image = np.array(image) try: # Verify the image contains a face face_analysis = DeepFace.analyze(img_path=image, actions=['emotion'], enforce_detection=True, detector_backend='opencv') # Get face embedding embedding = DeepFace.represent(img_path=image, model_name='Facenet')[0]['embedding'] # Check if person already exists name = name.strip().title() if name in self.known_face_names: return f"❌ {name} is already registered!", self.get_registered_workers_info() # Generate new worker ID worker_id = f"W{self.next_worker_id:04d}" # Add the face embedding, name, and ID self.known_face_embeddings.append(embedding) self.known_face_names.append(name) self.known_face_ids.append(worker_id) self.next_worker_id += 1 # Save face image face_image = Image.fromarray(image) face_image.save(f"data/faces/{worker_id}_{name.replace(' ', '_')}.jpg") self.save_data() return f"✅ {name} has been successfully registered with ID: {worker_id}!", self.get_registered_workers_info() except ValueError as e: if "Face could not be detected" in str(e): return "❌ No face detected in the image! Please try again with a clear face image.", self.get_registered_workers_info() return f"❌ Error processing image: {str(e)}", self.get_registered_workers_info() except Exception as e: return f"❌ Error during registration: {str(e)}", self.get_registered_workers_info() def register_worker_auto(self, face_image): """Automatic worker registration for unrecognized faces""" try: # Generate new worker ID and name worker_id = f"W{self.next_worker_id:04d}" worker_name = f"Unknown_Worker_{self.next_worker_id}" # Get face embedding embedding = DeepFace.represent(img_path=face_image, model_name='Facenet')[0]['embedding'] # Add to database self.known_face_embeddings.append(embedding) self.known_face_names.append(worker_name) self.known_face_ids.append(worker_id) self.next_worker_id += 1 # Save face image face_pil = Image.fromarray(cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)) face_pil.save(f"data/faces/{worker_id}_{worker_name}.jpg") self.save_data() return worker_id, worker_name except Exception as e: print(f"Error in auto registration: {e}") return None, None def mark_attendance(self, worker_id, worker_name): """Mark attendance for a worker""" try: today = date.today().isoformat() current_time = datetime.now() # Check if already marked today already_marked = any( record["worker_id"] == worker_id and record["date"] == today for record in self.attendance_records ) if not already_marked: # Mark attendance self.attendance_records.append({ "worker_id": worker_id, "name": worker_name, "date": today, "time": current_time.strftime("%H:%M:%S"), "timestamp": current_time.isoformat(), "status": "Present", "method": "Auto" }) self.save_data() return True return False except Exception as e: print(f"Error marking attendance: {e}") return False def process_video_frame(self, frame): """Process a single video frame for face recognition""" try: rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # Find faces in the frame face_objs = DeepFace.extract_faces(img_path=rgb_frame, target_size=(160, 160), enforce_detection=False, detector_backend='opencv') current_time = time.time() for face_obj in face_objs: if face_obj['confidence'] > 0.9: # Only consider confident detections face_area = face_obj['facial_area'] x, y, w, h = face_area['x'], face_area['y'], face_area['w'], face_area['h'] # Extract face image face_image = frame[y:y+h, x:x+w] try: # Get face embedding embedding = DeepFace.represent(img_path=face_image, model_name='Facenet')[0]['embedding'] worker_id = None worker_name = "Unknown" color = (0, 0, 255) # Red for unknown # Compare with known faces if len(self.known_face_embeddings) > 0: # Calculate distances to known faces distances = [] for known_embedding in self.known_face_embeddings: distance = np.linalg.norm(np.array(embedding) - np.array(known_embedding)) distances.append(distance) min_distance = min(distances) best_match_index = distances.index(min_distance) if min_distance < 10: # Threshold for recognition worker_id = self.known_face_ids[best_match_index] worker_name = self.known_face_names[best_match_index] color = (0, 255, 0) # Green for known # Check cooldown period if worker_id not in self.last_recognition_time or \ current_time - self.last_recognition_time[worker_id] > self.recognition_cooldown: # Mark attendance if self.mark_attendance(worker_id, worker_name): print(f"✅ Attendance marked for {worker_name} ({worker_id})") self.last_recognition_time[worker_id] = current_time else: # Unknown face - auto register if face_image.size > 0: new_id, new_name = self.register_worker_auto(face_image) if new_id: worker_id = new_id worker_name = new_name color = (255, 165, 0) # Orange for newly registered print(f"🆕 New worker registered: {new_name} ({new_id})") # Mark attendance for new worker self.mark_attendance(worker_id, worker_name) # Draw rectangle and label cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2) cv2.rectangle(frame, (x, y+h - 35), (x+w, y+h), color, cv2.FILLED) label = f"{worker_name}" if worker_id: label += f" ({worker_id})" cv2.putText(frame, label, (x + 6, y+h - 6), cv2.FONT_HERSHEY_DUPLEX, 0.6, (255, 255, 255), 1) except Exception as e: print(f"Error processing face: {e}") continue return frame except Exception as e: print(f"Error processing frame: {e}") return frame def start_video_stream(self, camera_source=0): """Start video streaming and recognition""" try: if self.is_streaming: return "⚠️ Video stream is already running!" self.video_capture = cv2.VideoCapture(camera_source) if not self.video_capture.isOpened(): return "❌ Could not open camera/video source!" self.is_streaming = True def video_loop(): while self.is_streaming: ret, frame = self.video_capture.read() if not ret: break # Process frame for face recognition processed_frame = self.process_video_frame(frame) # Add to queue for display if not self.frame_queue.full(): try: self.frame_queue.put_nowait(processed_frame) except queue.Full: pass time.sleep(0.1) # Limit processing rate self.recognition_thread = threading.Thread(target=video_loop) self.recognition_thread.daemon = True self.recognition_thread.start() return "✅ Video stream started successfully!" except Exception as e: return f"❌ Error starting video stream: {e}" def stop_video_stream(self): """Stop video streaming""" try: self.is_streaming = False if self.video_capture: self.video_capture.release() self.video_capture = None if self.recognition_thread: self.recognition_thread.join(timeout=2) # Clear frame queue while not self.frame_queue.empty(): try: self.frame_queue.get_nowait() except queue.Empty: break return "✅ Video stream stopped successfully!" except Exception as e: return f"❌ Error stopping video stream: {e}" def get_current_frame(self): """Get current frame for display""" try: if not self.frame_queue.empty(): frame = self.frame_queue.get_nowait() return frame return None except queue.Empty: return None def get_registered_workers_info(self): """Get information about registered workers""" if not self.known_face_names: return "No workers registered yet." info = f"**Registered Workers ({len(self.known_face_names)}):**\n\n" for i, (worker_id, name) in enumerate(zip(self.known_face_ids, self.known_face_names), 1): info += f"{i}. **{name}** (ID: {worker_id})\n" return info def get_today_attendance(self): """Get today's attendance records""" today = date.today().isoformat() today_records = [r for r in self.attendance_records if r["date"] == today] if not today_records: return f"**Today's Attendance ({today}):**\n\nNo attendance marked yet." info = f"**Today's Attendance ({today}):**\n\n" for record in today_records: method_icon = "🤖" if record.get("method") == "Auto" else "👤" info += f"{method_icon} **{record['name']}** (ID: {record['worker_id']}) - {record['time']}\n" return info def get_attendance_report(self, start_date, end_date): """Generate attendance report for date range""" if not start_date or not end_date: return "Please select both start and end dates." try: # Validate date format datetime.strptime(start_date, '%Y-%m-%d') datetime.strptime(end_date, '%Y-%m-%d') except ValueError: return "Invalid date format. Please use YYYY-MM-DD." # Filter records by date range filtered_records = [ r for r in self.attendance_records if start_date <= r["date"] <= end_date ] if not filtered_records: return f"No attendance records found between {start_date} and {end_date}." # Create DataFrame for analysis df = pd.DataFrame(filtered_records) # Summary statistics total_days = (pd.to_datetime(end_date) - pd.to_datetime(start_date)).days + 1 unique_workers = df['worker_id'].nunique() total_attendances = len(df) auto_registrations = len(df[df['method'] == 'Auto']) report = f"**📊 Attendance Report ({start_date} to {end_date})**\n\n" report += f"**Summary:**\n" report += f"• Total Days: {total_days}\n" report += f"• Unique Workers: {unique_workers}\n" report += f"• Total Attendances: {total_attendances}\n" report += f"• Auto Detections: {auto_registrations}\n\n" # Individual attendance counts if not df.empty: attendance_counts = df.groupby(['worker_id', 'name']).size().reset_index(name='count') report += f"**👥 Individual Attendance:**\n" for _, row in attendance_counts.iterrows(): percentage = (row['count'] / total_days) * 100 report += f"• **{row['name']}** ({row['worker_id']}): {row['count']} days ({percentage:.1f}%)\n" return report def export_attendance_csv(self): """Export attendance records to CSV""" try: if not self.attendance_records: return None, "No attendance records to export." df = pd.DataFrame(self.attendance_records) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') csv_file = f"attendance_report_{timestamp}.csv" df.to_csv(csv_file, index=False) return csv_file, f"✅ Attendance exported to {csv_file}" except Exception as e: return None, f"❌ Error exporting data: {e}" # Initialize the attendance system attendance_system = AttendanceSystem() def create_interface(): with gr.Blocks( title="🎯 Advanced Attendance System with Live Recognition", theme=gr.themes.Soft(), css=""" .gradio-container { max-width: 1400px !important; } .tab-nav { font-weight: bold; } .status-box { padding: 10px; border-radius: 5px; margin: 5px 0; } """ ) as demo: gr.Markdown( """ # 🎯 Advanced Attendance System with Live Face Recognition **Comprehensive facial recognition system with automatic worker registration and attendance tracking** ## 🚀 **Key Features:** - **🎥 Live Video Stream Recognition** - Real-time face detection from camera/CCTV - **🤖 Automatic Worker Registration** - Auto-register unknown faces with unique IDs - **👤 Manual Registration** - Register workers manually with photos - **📅 24-Hour Attendance Rule** - One attendance mark per worker per day - **📊 Advanced Analytics** - Detailed reports and data export - **💾 Persistent Data Storage** - All data saved locally in `/data` folder ## 📁 **Data Storage Location:** - **Worker Database:** `/data/workers.pkl` - **Attendance Records:** `/data/attendance.json` - **Face Images:** `/data/faces/` folder """ ) with gr.Tabs(): # Live Recognition Tab with gr.Tab("🎥 Live Recognition", elem_classes="tab-nav"): gr.Markdown("### Real-time Face Recognition and Attendance") with gr.Row(): with gr.Column(scale=1): camera_source = gr.Number( label="Camera Source (0 for default camera, or RTSP URL)", value=0, precision=0 ) with gr.Row(): start_stream_btn = gr.Button( "🎥 Start Live Recognition", variant="primary", size="lg" ) stop_stream_btn = gr.Button( "⏹️ Stop Stream", variant="secondary", size="lg" ) stream_status = gr.Textbox( label="Stream Status", value="Ready to start...", interactive=False, lines=2 ) gr.Markdown( """ **📋 Instructions:** 1. Click "Start Live Recognition" to begin 2. System will automatically detect and register new faces 3. Known workers will be marked present (once per day) 4. New workers get auto-assigned IDs (W0001, W0002, etc.) **🎨 Color Coding:** - 🟢 **Green:** Known worker (attendance marked) - 🟠 **Orange:** New worker (auto-registered) - 🔴 **Red:** Face detected but processing """ ) with gr.Column(scale=1): live_attendance_display = gr.Markdown( value=attendance_system.get_today_attendance(), label="Live Attendance Updates" ) refresh_attendance_btn = gr.Button( "🔄 Refresh Attendance", variant="secondary" ) # Manual Registration Tab with gr.Tab("👤 Manual Registration", elem_classes="tab-nav"): gr.Markdown("### Register Workers Manually") with gr.Row(): with gr.Column(scale=1): register_image = gr.Image( label="Upload Worker's Photo", type="pil", height=300 ) register_name = gr.Textbox( label="Worker's Full Name", placeholder="Enter full name...", lines=1 ) register_btn = gr.Button( "👤 Register Worker", variant="primary", size="lg" ) with gr.Column(scale=1): register_output = gr.Textbox( label="Registration Status", lines=3, interactive=False ) registered_workers_info = gr.Markdown( value=attendance_system.get_registered_workers_info(), label="Registered Workers Database" ) # Reports & Analytics Tab with gr.Tab("📊 Reports & Analytics", elem_classes="tab-nav"): gr.Markdown("### Attendance Reports and Data Export") with gr.Row(): with gr.Column(): gr.Markdown("#### 📅 Generate Report") start_date = gr.Textbox( label="Start Date (YYYY-MM-DD)", value=date.today().replace(day=1).strftime('%Y-%m-%d') ) end_date = gr.Textbox( label="End Date (YYYY-MM-DD)", value=date.today().strftime('%Y-%m-%d') ) generate_report_btn = gr.Button( "📊 Generate Report", variant="primary" ) gr.Markdown("#### 💾 Export Data") export_btn = gr.Button( "📥 Export to CSV", variant="secondary" ) export_status = gr.Textbox( label="Export Status", lines=2, interactive=False ) export_file = gr.File( label="Download File", visible=False ) with gr.Column(): report_output = gr.Markdown( value="Select date range and click 'Generate Report' to view attendance analytics.", label="Attendance Report" ) # System Info Tab with gr.Tab("ℹ️ System Information", elem_classes="tab-nav"): gr.Markdown( """ ## 📖 System Guide ### 🎥 Live Recognition System - **Camera Setup:** Use camera index (0, 1, 2...) or RTSP URL for IP cameras - **Auto Registration:** Unknown faces automatically get worker IDs (W0001, W0002...) - **24-Hour Rule:** Each worker can only be marked present once per day - **Real-time Processing:** Continuous face detection and recognition ### 👤 Manual Registration - Upload clear, front-facing photos for best results - One face per image for registration - Workers get unique IDs automatically assigned ### 📁 Data Storage Structure ``` /data/ ├── workers.pkl # Worker database (embeddings, names, IDs) ├── attendance.json # All attendance records └── faces/ # Saved face images ├── W0001_John_Doe.jpg ├── W0002_Jane_Smith.jpg └── ... ``` ### 🔧 Technical Features - **Face Recognition:** Uses DeepFace with Facenet embeddings - **Distance Threshold:** 10 for face matching accuracy - **Threading:** Separate threads for video processing and UI - **Queue Management:** Efficient frame processing with queue system - **Error Handling:** Robust error handling and recovery ### 🚨 Troubleshooting - **Camera Issues:** Check camera permissions and connections - **Poor Recognition:** Ensure good lighting and clear face visibility - **Performance:** Reduce video resolution for better performance - **Storage:** Check disk space for face image storage ### 🔒 Privacy & Security - All data stored locally in `/data` folder - No external API calls or data transmission - Face images saved securely with worker IDs - Attendance records in JSON format for easy backup """ ) # Event handlers start_stream_btn.click( fn=attendance_system.start_video_stream, inputs=[camera_source], outputs=[stream_status] ) stop_stream_btn.click( fn=attendance_system.stop_video_stream, outputs=[stream_status] ) refresh_attendance_btn.click( fn=attendance_system.get_today_attendance, outputs=[live_attendance_display] ) register_btn.click( fn=attendance_system.register_worker_manual, inputs=[register_image, register_name], outputs=[register_output, registered_workers_info] ) generate_report_btn.click( fn=attendance_system.get_attendance_report, inputs=[start_date, end_date], outputs=[report_output] ) def export_and_show(): file_path, status = attendance_system.export_attendance_csv() if file_path: return status, gr.update(visible=True, value=file_path) else: return status, gr.update(visible=False) export_btn.click( fn=export_and_show, outputs=[export_status, export_file] ) # Remove the problematic auto-refresh implementation # Users will need to manually click the refresh button return demo # Create and launch the interface if __name__ == "__main__": demo = create_interface() demo.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True, debug=True )