Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import cv2 | |
| import numpy as np | |
| import pandas as pd | |
| from datetime import datetime, date, timedelta | |
| from deepface import DeepFace | |
| import pickle | |
| import os | |
| from io import BytesIO | |
| import base64 | |
| from PIL import Image | |
| import json | |
| import threading | |
| import time | |
| import queue | |
| class AttendanceSystem: | |
| def __init__(self): | |
| self.known_face_embeddings = [] | |
| self.known_face_names = [] | |
| self.known_face_ids = [] | |
| self.attendance_records = [] | |
| self.next_worker_id = 1 | |
| self.video_capture = None | |
| self.is_streaming = False | |
| self.frame_queue = queue.Queue(maxsize=2) | |
| self.recognition_thread = None | |
| self.last_recognition_time = {} | |
| self.recognition_cooldown = 5 # seconds between recognitions for same person | |
| # Create directories for data storage | |
| os.makedirs("data", exist_ok=True) | |
| os.makedirs("data/faces", exist_ok=True) | |
| self.load_data() | |
| def load_data(self): | |
| """Load all stored data""" | |
| try: | |
| # Load face embeddings and worker data | |
| if os.path.exists("data/workers.pkl"): | |
| with open("data/workers.pkl", "rb") as f: | |
| data = pickle.load(f) | |
| self.known_face_embeddings = data.get("embeddings", []) | |
| self.known_face_names = data.get("names", []) | |
| self.known_face_ids = data.get("ids", []) | |
| self.next_worker_id = data.get("next_id", 1) | |
| # Load attendance records | |
| if os.path.exists("data/attendance.json"): | |
| with open("data/attendance.json", "r") as f: | |
| self.attendance_records = json.load(f) | |
| except Exception as e: | |
| print(f"Error loading data: {e}") | |
| self.known_face_embeddings = [] | |
| self.known_face_names = [] | |
| self.known_face_ids = [] | |
| self.attendance_records = [] | |
| self.next_worker_id = 1 | |
| def save_data(self): | |
| """Save all data to files""" | |
| try: | |
| # Save worker data | |
| worker_data = { | |
| "embeddings": self.known_face_embeddings, | |
| "names": self.known_face_names, | |
| "ids": self.known_face_ids, | |
| "next_id": self.next_worker_id | |
| } | |
| with open("data/workers.pkl", "wb") as f: | |
| pickle.dump(worker_data, f) | |
| # Save attendance records | |
| with open("data/attendance.json", "w") as f: | |
| json.dump(self.attendance_records, f, indent=2) | |
| except Exception as e: | |
| print(f"Error saving data: {e}") | |
| def register_worker_manual(self, image, name): | |
| """Manual worker registration""" | |
| if image is None or not name.strip(): | |
| return "β Please provide both image and name!", self.get_registered_workers_info() | |
| # Convert PIL image to RGB array | |
| if isinstance(image, Image.Image): | |
| image = np.array(image) | |
| try: | |
| # Verify the image contains a face | |
| face_analysis = DeepFace.analyze(img_path=image, actions=['emotion'], enforce_detection=True, detector_backend='opencv') | |
| # Get face embedding | |
| embedding = DeepFace.represent(img_path=image, model_name='Facenet')[0]['embedding'] | |
| # Check if person already exists | |
| name = name.strip().title() | |
| if name in self.known_face_names: | |
| return f"β {name} is already registered!", self.get_registered_workers_info() | |
| # Generate new worker ID | |
| worker_id = f"W{self.next_worker_id:04d}" | |
| # Add the face embedding, name, and ID | |
| self.known_face_embeddings.append(embedding) | |
| self.known_face_names.append(name) | |
| self.known_face_ids.append(worker_id) | |
| self.next_worker_id += 1 | |
| # Save face image | |
| face_image = Image.fromarray(image) | |
| face_image.save(f"data/faces/{worker_id}_{name.replace(' ', '_')}.jpg") | |
| self.save_data() | |
| return f"β {name} has been successfully registered with ID: {worker_id}!", self.get_registered_workers_info() | |
| except ValueError as e: | |
| if "Face could not be detected" in str(e): | |
| return "β No face detected in the image! Please try again with a clear face image.", self.get_registered_workers_info() | |
| return f"β Error processing image: {str(e)}", self.get_registered_workers_info() | |
| except Exception as e: | |
| return f"β Error during registration: {str(e)}", self.get_registered_workers_info() | |
| def register_worker_auto(self, face_image): | |
| """Automatic worker registration for unrecognized faces""" | |
| try: | |
| # Generate new worker ID and name | |
| worker_id = f"W{self.next_worker_id:04d}" | |
| worker_name = f"Unknown_Worker_{self.next_worker_id}" | |
| # Get face embedding | |
| embedding = DeepFace.represent(img_path=face_image, model_name='Facenet')[0]['embedding'] | |
| # Add to database | |
| self.known_face_embeddings.append(embedding) | |
| self.known_face_names.append(worker_name) | |
| self.known_face_ids.append(worker_id) | |
| self.next_worker_id += 1 | |
| # Save face image | |
| face_pil = Image.fromarray(cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)) | |
| face_pil.save(f"data/faces/{worker_id}_{worker_name}.jpg") | |
| self.save_data() | |
| return worker_id, worker_name | |
| except Exception as e: | |
| print(f"Error in auto registration: {e}") | |
| return None, None | |
| def mark_attendance(self, worker_id, worker_name): | |
| """Mark attendance for a worker""" | |
| try: | |
| today = date.today().isoformat() | |
| current_time = datetime.now() | |
| # Check if already marked today | |
| already_marked = any( | |
| record["worker_id"] == worker_id and record["date"] == today | |
| for record in self.attendance_records | |
| ) | |
| if not already_marked: | |
| # Mark attendance | |
| self.attendance_records.append({ | |
| "worker_id": worker_id, | |
| "name": worker_name, | |
| "date": today, | |
| "time": current_time.strftime("%H:%M:%S"), | |
| "timestamp": current_time.isoformat(), | |
| "status": "Present", | |
| "method": "Auto" | |
| }) | |
| self.save_data() | |
| return True | |
| return False | |
| except Exception as e: | |
| print(f"Error marking attendance: {e}") | |
| return False | |
| def process_video_frame(self, frame): | |
| """Process a single video frame for face recognition""" | |
| try: | |
| rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| # Find faces in the frame | |
| face_objs = DeepFace.extract_faces(img_path=rgb_frame, target_size=(160, 160), enforce_detection=False, detector_backend='opencv') | |
| current_time = time.time() | |
| for face_obj in face_objs: | |
| if face_obj['confidence'] > 0.9: # Only consider confident detections | |
| face_area = face_obj['facial_area'] | |
| x, y, w, h = face_area['x'], face_area['y'], face_area['w'], face_area['h'] | |
| # Extract face image | |
| face_image = frame[y:y+h, x:x+w] | |
| try: | |
| # Get face embedding | |
| embedding = DeepFace.represent(img_path=face_image, model_name='Facenet')[0]['embedding'] | |
| worker_id = None | |
| worker_name = "Unknown" | |
| color = (0, 0, 255) # Red for unknown | |
| # Compare with known faces | |
| if len(self.known_face_embeddings) > 0: | |
| # Calculate distances to known faces | |
| distances = [] | |
| for known_embedding in self.known_face_embeddings: | |
| distance = np.linalg.norm(np.array(embedding) - np.array(known_embedding)) | |
| distances.append(distance) | |
| min_distance = min(distances) | |
| best_match_index = distances.index(min_distance) | |
| if min_distance < 10: # Threshold for recognition | |
| worker_id = self.known_face_ids[best_match_index] | |
| worker_name = self.known_face_names[best_match_index] | |
| color = (0, 255, 0) # Green for known | |
| # Check cooldown period | |
| if worker_id not in self.last_recognition_time or \ | |
| current_time - self.last_recognition_time[worker_id] > self.recognition_cooldown: | |
| # Mark attendance | |
| if self.mark_attendance(worker_id, worker_name): | |
| print(f"β Attendance marked for {worker_name} ({worker_id})") | |
| self.last_recognition_time[worker_id] = current_time | |
| else: | |
| # Unknown face - auto register | |
| if face_image.size > 0: | |
| new_id, new_name = self.register_worker_auto(face_image) | |
| if new_id: | |
| worker_id = new_id | |
| worker_name = new_name | |
| color = (255, 165, 0) # Orange for newly registered | |
| print(f"π New worker registered: {new_name} ({new_id})") | |
| # Mark attendance for new worker | |
| self.mark_attendance(worker_id, worker_name) | |
| # Draw rectangle and label | |
| cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2) | |
| cv2.rectangle(frame, (x, y+h - 35), (x+w, y+h), color, cv2.FILLED) | |
| label = f"{worker_name}" | |
| if worker_id: | |
| label += f" ({worker_id})" | |
| cv2.putText(frame, label, (x + 6, y+h - 6), | |
| cv2.FONT_HERSHEY_DUPLEX, 0.6, (255, 255, 255), 1) | |
| except Exception as e: | |
| print(f"Error processing face: {e}") | |
| continue | |
| return frame | |
| except Exception as e: | |
| print(f"Error processing frame: {e}") | |
| return frame | |
| def start_video_stream(self, camera_source=0): | |
| """Start video streaming and recognition""" | |
| try: | |
| if self.is_streaming: | |
| return "β οΈ Video stream is already running!" | |
| self.video_capture = cv2.VideoCapture(camera_source) | |
| if not self.video_capture.isOpened(): | |
| return "β Could not open camera/video source!" | |
| self.is_streaming = True | |
| def video_loop(): | |
| while self.is_streaming: | |
| ret, frame = self.video_capture.read() | |
| if not ret: | |
| break | |
| # Process frame for face recognition | |
| processed_frame = self.process_video_frame(frame) | |
| # Add to queue for display | |
| if not self.frame_queue.full(): | |
| try: | |
| self.frame_queue.put_nowait(processed_frame) | |
| except queue.Full: | |
| pass | |
| time.sleep(0.1) # Limit processing rate | |
| self.recognition_thread = threading.Thread(target=video_loop) | |
| self.recognition_thread.daemon = True | |
| self.recognition_thread.start() | |
| return "β Video stream started successfully!" | |
| except Exception as e: | |
| return f"β Error starting video stream: {e}" | |
| def stop_video_stream(self): | |
| """Stop video streaming""" | |
| try: | |
| self.is_streaming = False | |
| if self.video_capture: | |
| self.video_capture.release() | |
| self.video_capture = None | |
| if self.recognition_thread: | |
| self.recognition_thread.join(timeout=2) | |
| # Clear frame queue | |
| while not self.frame_queue.empty(): | |
| try: | |
| self.frame_queue.get_nowait() | |
| except queue.Empty: | |
| break | |
| return "β Video stream stopped successfully!" | |
| except Exception as e: | |
| return f"β Error stopping video stream: {e}" | |
| def get_current_frame(self): | |
| """Get current frame for display""" | |
| try: | |
| if not self.frame_queue.empty(): | |
| frame = self.frame_queue.get_nowait() | |
| return frame | |
| return None | |
| except queue.Empty: | |
| return None | |
| def get_registered_workers_info(self): | |
| """Get information about registered workers""" | |
| if not self.known_face_names: | |
| return "No workers registered yet." | |
| info = f"**Registered Workers ({len(self.known_face_names)}):**\n\n" | |
| for i, (worker_id, name) in enumerate(zip(self.known_face_ids, self.known_face_names), 1): | |
| info += f"{i}. **{name}** (ID: {worker_id})\n" | |
| return info | |
| def get_today_attendance(self): | |
| """Get today's attendance records""" | |
| today = date.today().isoformat() | |
| today_records = [r for r in self.attendance_records if r["date"] == today] | |
| if not today_records: | |
| return f"**Today's Attendance ({today}):**\n\nNo attendance marked yet." | |
| info = f"**Today's Attendance ({today}):**\n\n" | |
| for record in today_records: | |
| method_icon = "π€" if record.get("method") == "Auto" else "π€" | |
| info += f"{method_icon} **{record['name']}** (ID: {record['worker_id']}) - {record['time']}\n" | |
| return info | |
| def get_attendance_report(self, start_date, end_date): | |
| """Generate attendance report for date range""" | |
| if not start_date or not end_date: | |
| return "Please select both start and end dates." | |
| try: | |
| # Validate date format | |
| datetime.strptime(start_date, '%Y-%m-%d') | |
| datetime.strptime(end_date, '%Y-%m-%d') | |
| except ValueError: | |
| return "Invalid date format. Please use YYYY-MM-DD." | |
| # Filter records by date range | |
| filtered_records = [ | |
| r for r in self.attendance_records | |
| if start_date <= r["date"] <= end_date | |
| ] | |
| if not filtered_records: | |
| return f"No attendance records found between {start_date} and {end_date}." | |
| # Create DataFrame for analysis | |
| df = pd.DataFrame(filtered_records) | |
| # Summary statistics | |
| total_days = (pd.to_datetime(end_date) - pd.to_datetime(start_date)).days + 1 | |
| unique_workers = df['worker_id'].nunique() | |
| total_attendances = len(df) | |
| auto_registrations = len(df[df['method'] == 'Auto']) | |
| report = f"**π Attendance Report ({start_date} to {end_date})**\n\n" | |
| report += f"**Summary:**\n" | |
| report += f"β’ Total Days: {total_days}\n" | |
| report += f"β’ Unique Workers: {unique_workers}\n" | |
| report += f"β’ Total Attendances: {total_attendances}\n" | |
| report += f"β’ Auto Detections: {auto_registrations}\n\n" | |
| # Individual attendance counts | |
| if not df.empty: | |
| attendance_counts = df.groupby(['worker_id', 'name']).size().reset_index(name='count') | |
| report += f"**π₯ Individual Attendance:**\n" | |
| for _, row in attendance_counts.iterrows(): | |
| percentage = (row['count'] / total_days) * 100 | |
| report += f"β’ **{row['name']}** ({row['worker_id']}): {row['count']} days ({percentage:.1f}%)\n" | |
| return report | |
| def export_attendance_csv(self): | |
| """Export attendance records to CSV""" | |
| try: | |
| if not self.attendance_records: | |
| return None, "No attendance records to export." | |
| df = pd.DataFrame(self.attendance_records) | |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| csv_file = f"attendance_report_{timestamp}.csv" | |
| df.to_csv(csv_file, index=False) | |
| return csv_file, f"β Attendance exported to {csv_file}" | |
| except Exception as e: | |
| return None, f"β Error exporting data: {e}" | |
| # Initialize the attendance system | |
| attendance_system = AttendanceSystem() | |
| def create_interface(): | |
| with gr.Blocks( | |
| title="π― Advanced Attendance System with Live Recognition", | |
| theme=gr.themes.Soft(), | |
| css=""" | |
| .gradio-container { | |
| max-width: 1400px !important; | |
| } | |
| .tab-nav { | |
| font-weight: bold; | |
| } | |
| .status-box { | |
| padding: 10px; | |
| border-radius: 5px; | |
| margin: 5px 0; | |
| } | |
| """ | |
| ) as demo: | |
| gr.Markdown( | |
| """ | |
| # π― Advanced Attendance System with Live Face Recognition | |
| **Comprehensive facial recognition system with automatic worker registration and attendance tracking** | |
| ## π **Key Features:** | |
| - **π₯ Live Video Stream Recognition** - Real-time face detection from camera/CCTV | |
| - **π€ Automatic Worker Registration** - Auto-register unknown faces with unique IDs | |
| - **π€ Manual Registration** - Register workers manually with photos | |
| - **π 24-Hour Attendance Rule** - One attendance mark per worker per day | |
| - **π Advanced Analytics** - Detailed reports and data export | |
| - **πΎ Persistent Data Storage** - All data saved locally in `/data` folder | |
| ## π **Data Storage Location:** | |
| - **Worker Database:** `/data/workers.pkl` | |
| - **Attendance Records:** `/data/attendance.json` | |
| - **Face Images:** `/data/faces/` folder | |
| """ | |
| ) | |
| with gr.Tabs(): | |
| # Live Recognition Tab | |
| with gr.Tab("π₯ Live Recognition", elem_classes="tab-nav"): | |
| gr.Markdown("### Real-time Face Recognition and Attendance") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| camera_source = gr.Number( | |
| label="Camera Source (0 for default camera, or RTSP URL)", | |
| value=0, | |
| precision=0 | |
| ) | |
| with gr.Row(): | |
| start_stream_btn = gr.Button( | |
| "π₯ Start Live Recognition", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| stop_stream_btn = gr.Button( | |
| "βΉοΈ Stop Stream", | |
| variant="secondary", | |
| size="lg" | |
| ) | |
| stream_status = gr.Textbox( | |
| label="Stream Status", | |
| value="Ready to start...", | |
| interactive=False, | |
| lines=2 | |
| ) | |
| gr.Markdown( | |
| """ | |
| **π Instructions:** | |
| 1. Click "Start Live Recognition" to begin | |
| 2. System will automatically detect and register new faces | |
| 3. Known workers will be marked present (once per day) | |
| 4. New workers get auto-assigned IDs (W0001, W0002, etc.) | |
| **π¨ Color Coding:** | |
| - π’ **Green:** Known worker (attendance marked) | |
| - π **Orange:** New worker (auto-registered) | |
| - π΄ **Red:** Face detected but processing | |
| """ | |
| ) | |
| with gr.Column(scale=1): | |
| live_attendance_display = gr.Markdown( | |
| value=attendance_system.get_today_attendance(), | |
| label="Live Attendance Updates" | |
| ) | |
| refresh_attendance_btn = gr.Button( | |
| "π Refresh Attendance", | |
| variant="secondary" | |
| ) | |
| # Manual Registration Tab | |
| with gr.Tab("π€ Manual Registration", elem_classes="tab-nav"): | |
| gr.Markdown("### Register Workers Manually") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| register_image = gr.Image( | |
| label="Upload Worker's Photo", | |
| type="pil", | |
| height=300 | |
| ) | |
| register_name = gr.Textbox( | |
| label="Worker's Full Name", | |
| placeholder="Enter full name...", | |
| lines=1 | |
| ) | |
| register_btn = gr.Button( | |
| "π€ Register Worker", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| with gr.Column(scale=1): | |
| register_output = gr.Textbox( | |
| label="Registration Status", | |
| lines=3, | |
| interactive=False | |
| ) | |
| registered_workers_info = gr.Markdown( | |
| value=attendance_system.get_registered_workers_info(), | |
| label="Registered Workers Database" | |
| ) | |
| # Reports & Analytics Tab | |
| with gr.Tab("π Reports & Analytics", elem_classes="tab-nav"): | |
| gr.Markdown("### Attendance Reports and Data Export") | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("#### π Generate Report") | |
| start_date = gr.Textbox( | |
| label="Start Date (YYYY-MM-DD)", | |
| value=date.today().replace(day=1).strftime('%Y-%m-%d') | |
| ) | |
| end_date = gr.Textbox( | |
| label="End Date (YYYY-MM-DD)", | |
| value=date.today().strftime('%Y-%m-%d') | |
| ) | |
| generate_report_btn = gr.Button( | |
| "π Generate Report", | |
| variant="primary" | |
| ) | |
| gr.Markdown("#### πΎ Export Data") | |
| export_btn = gr.Button( | |
| "π₯ Export to CSV", | |
| variant="secondary" | |
| ) | |
| export_status = gr.Textbox( | |
| label="Export Status", | |
| lines=2, | |
| interactive=False | |
| ) | |
| export_file = gr.File( | |
| label="Download File", | |
| visible=False | |
| ) | |
| with gr.Column(): | |
| report_output = gr.Markdown( | |
| value="Select date range and click 'Generate Report' to view attendance analytics.", | |
| label="Attendance Report" | |
| ) | |
| # System Info Tab | |
| with gr.Tab("βΉοΈ System Information", elem_classes="tab-nav"): | |
| gr.Markdown( | |
| """ | |
| ## π System Guide | |
| ### π₯ Live Recognition System | |
| - **Camera Setup:** Use camera index (0, 1, 2...) or RTSP URL for IP cameras | |
| - **Auto Registration:** Unknown faces automatically get worker IDs (W0001, W0002...) | |
| - **24-Hour Rule:** Each worker can only be marked present once per day | |
| - **Real-time Processing:** Continuous face detection and recognition | |
| ### π€ Manual Registration | |
| - Upload clear, front-facing photos for best results | |
| - One face per image for registration | |
| - Workers get unique IDs automatically assigned | |
| ### π Data Storage Structure | |
| ``` | |
| /data/ | |
| βββ workers.pkl # Worker database (embeddings, names, IDs) | |
| βββ attendance.json # All attendance records | |
| βββ faces/ # Saved face images | |
| βββ W0001_John_Doe.jpg | |
| βββ W0002_Jane_Smith.jpg | |
| βββ ... | |
| ``` | |
| ### π§ Technical Features | |
| - **Face Recognition:** Uses DeepFace with Facenet embeddings | |
| - **Distance Threshold:** 10 for face matching accuracy | |
| - **Threading:** Separate threads for video processing and UI | |
| - **Queue Management:** Efficient frame processing with queue system | |
| - **Error Handling:** Robust error handling and recovery | |
| ### π¨ Troubleshooting | |
| - **Camera Issues:** Check camera permissions and connections | |
| - **Poor Recognition:** Ensure good lighting and clear face visibility | |
| - **Performance:** Reduce video resolution for better performance | |
| - **Storage:** Check disk space for face image storage | |
| ### π Privacy & Security | |
| - All data stored locally in `/data` folder | |
| - No external API calls or data transmission | |
| - Face images saved securely with worker IDs | |
| - Attendance records in JSON format for easy backup | |
| """ | |
| ) | |
| # Event handlers | |
| start_stream_btn.click( | |
| fn=attendance_system.start_video_stream, | |
| inputs=[camera_source], | |
| outputs=[stream_status] | |
| ) | |
| stop_stream_btn.click( | |
| fn=attendance_system.stop_video_stream, | |
| outputs=[stream_status] | |
| ) | |
| refresh_attendance_btn.click( | |
| fn=attendance_system.get_today_attendance, | |
| outputs=[live_attendance_display] | |
| ) | |
| register_btn.click( | |
| fn=attendance_system.register_worker_manual, | |
| inputs=[register_image, register_name], | |
| outputs=[register_output, registered_workers_info] | |
| ) | |
| generate_report_btn.click( | |
| fn=attendance_system.get_attendance_report, | |
| inputs=[start_date, end_date], | |
| outputs=[report_output] | |
| ) | |
| def export_and_show(): | |
| file_path, status = attendance_system.export_attendance_csv() | |
| if file_path: | |
| return status, gr.update(visible=True, value=file_path) | |
| else: | |
| return status, gr.update(visible=False) | |
| export_btn.click( | |
| fn=export_and_show, | |
| outputs=[export_status, export_file] | |
| ) | |
| # Remove the problematic auto-refresh implementation | |
| # Users will need to manually click the refresh button | |
| return demo | |
| # Create and launch the interface | |
| if __name__ == "__main__": | |
| demo = create_interface() | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, | |
| show_error=True, | |
| debug=True | |
| ) |