Attendance / app.py
PrashanthB461's picture
Update app.py
9fe0485 verified
import gradio as gr
import cv2
import numpy as np
import pandas as pd
from datetime import datetime, date, timedelta
from deepface import DeepFace
import pickle
import os
from io import BytesIO
import base64
from PIL import Image
import json
import threading
import time
import queue
class AttendanceSystem:
def __init__(self):
self.known_face_embeddings = []
self.known_face_names = []
self.known_face_ids = []
self.attendance_records = []
self.next_worker_id = 1
self.video_capture = None
self.is_streaming = False
self.frame_queue = queue.Queue(maxsize=2)
self.recognition_thread = None
self.last_recognition_time = {}
self.recognition_cooldown = 5 # seconds between recognitions for same person
# Create directories for data storage
os.makedirs("data", exist_ok=True)
os.makedirs("data/faces", exist_ok=True)
self.load_data()
def load_data(self):
"""Load all stored data"""
try:
# Load face embeddings and worker data
if os.path.exists("data/workers.pkl"):
with open("data/workers.pkl", "rb") as f:
data = pickle.load(f)
self.known_face_embeddings = data.get("embeddings", [])
self.known_face_names = data.get("names", [])
self.known_face_ids = data.get("ids", [])
self.next_worker_id = data.get("next_id", 1)
# Load attendance records
if os.path.exists("data/attendance.json"):
with open("data/attendance.json", "r") as f:
self.attendance_records = json.load(f)
except Exception as e:
print(f"Error loading data: {e}")
self.known_face_embeddings = []
self.known_face_names = []
self.known_face_ids = []
self.attendance_records = []
self.next_worker_id = 1
def save_data(self):
"""Save all data to files"""
try:
# Save worker data
worker_data = {
"embeddings": self.known_face_embeddings,
"names": self.known_face_names,
"ids": self.known_face_ids,
"next_id": self.next_worker_id
}
with open("data/workers.pkl", "wb") as f:
pickle.dump(worker_data, f)
# Save attendance records
with open("data/attendance.json", "w") as f:
json.dump(self.attendance_records, f, indent=2)
except Exception as e:
print(f"Error saving data: {e}")
def register_worker_manual(self, image, name):
"""Manual worker registration"""
if image is None or not name.strip():
return "❌ Please provide both image and name!", self.get_registered_workers_info()
# Convert PIL image to RGB array
if isinstance(image, Image.Image):
image = np.array(image)
try:
# Verify the image contains a face
face_analysis = DeepFace.analyze(img_path=image, actions=['emotion'], enforce_detection=True, detector_backend='opencv')
# Get face embedding
embedding = DeepFace.represent(img_path=image, model_name='Facenet')[0]['embedding']
# Check if person already exists
name = name.strip().title()
if name in self.known_face_names:
return f"❌ {name} is already registered!", self.get_registered_workers_info()
# Generate new worker ID
worker_id = f"W{self.next_worker_id:04d}"
# Add the face embedding, name, and ID
self.known_face_embeddings.append(embedding)
self.known_face_names.append(name)
self.known_face_ids.append(worker_id)
self.next_worker_id += 1
# Save face image
face_image = Image.fromarray(image)
face_image.save(f"data/faces/{worker_id}_{name.replace(' ', '_')}.jpg")
self.save_data()
return f"βœ… {name} has been successfully registered with ID: {worker_id}!", self.get_registered_workers_info()
except ValueError as e:
if "Face could not be detected" in str(e):
return "❌ No face detected in the image! Please try again with a clear face image.", self.get_registered_workers_info()
return f"❌ Error processing image: {str(e)}", self.get_registered_workers_info()
except Exception as e:
return f"❌ Error during registration: {str(e)}", self.get_registered_workers_info()
def register_worker_auto(self, face_image):
"""Automatic worker registration for unrecognized faces"""
try:
# Generate new worker ID and name
worker_id = f"W{self.next_worker_id:04d}"
worker_name = f"Unknown_Worker_{self.next_worker_id}"
# Get face embedding
embedding = DeepFace.represent(img_path=face_image, model_name='Facenet')[0]['embedding']
# Add to database
self.known_face_embeddings.append(embedding)
self.known_face_names.append(worker_name)
self.known_face_ids.append(worker_id)
self.next_worker_id += 1
# Save face image
face_pil = Image.fromarray(cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB))
face_pil.save(f"data/faces/{worker_id}_{worker_name}.jpg")
self.save_data()
return worker_id, worker_name
except Exception as e:
print(f"Error in auto registration: {e}")
return None, None
def mark_attendance(self, worker_id, worker_name):
"""Mark attendance for a worker"""
try:
today = date.today().isoformat()
current_time = datetime.now()
# Check if already marked today
already_marked = any(
record["worker_id"] == worker_id and record["date"] == today
for record in self.attendance_records
)
if not already_marked:
# Mark attendance
self.attendance_records.append({
"worker_id": worker_id,
"name": worker_name,
"date": today,
"time": current_time.strftime("%H:%M:%S"),
"timestamp": current_time.isoformat(),
"status": "Present",
"method": "Auto"
})
self.save_data()
return True
return False
except Exception as e:
print(f"Error marking attendance: {e}")
return False
def process_video_frame(self, frame):
"""Process a single video frame for face recognition"""
try:
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Find faces in the frame
face_objs = DeepFace.extract_faces(img_path=rgb_frame, target_size=(160, 160), enforce_detection=False, detector_backend='opencv')
current_time = time.time()
for face_obj in face_objs:
if face_obj['confidence'] > 0.9: # Only consider confident detections
face_area = face_obj['facial_area']
x, y, w, h = face_area['x'], face_area['y'], face_area['w'], face_area['h']
# Extract face image
face_image = frame[y:y+h, x:x+w]
try:
# Get face embedding
embedding = DeepFace.represent(img_path=face_image, model_name='Facenet')[0]['embedding']
worker_id = None
worker_name = "Unknown"
color = (0, 0, 255) # Red for unknown
# Compare with known faces
if len(self.known_face_embeddings) > 0:
# Calculate distances to known faces
distances = []
for known_embedding in self.known_face_embeddings:
distance = np.linalg.norm(np.array(embedding) - np.array(known_embedding))
distances.append(distance)
min_distance = min(distances)
best_match_index = distances.index(min_distance)
if min_distance < 10: # Threshold for recognition
worker_id = self.known_face_ids[best_match_index]
worker_name = self.known_face_names[best_match_index]
color = (0, 255, 0) # Green for known
# Check cooldown period
if worker_id not in self.last_recognition_time or \
current_time - self.last_recognition_time[worker_id] > self.recognition_cooldown:
# Mark attendance
if self.mark_attendance(worker_id, worker_name):
print(f"βœ… Attendance marked for {worker_name} ({worker_id})")
self.last_recognition_time[worker_id] = current_time
else:
# Unknown face - auto register
if face_image.size > 0:
new_id, new_name = self.register_worker_auto(face_image)
if new_id:
worker_id = new_id
worker_name = new_name
color = (255, 165, 0) # Orange for newly registered
print(f"πŸ†• New worker registered: {new_name} ({new_id})")
# Mark attendance for new worker
self.mark_attendance(worker_id, worker_name)
# Draw rectangle and label
cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2)
cv2.rectangle(frame, (x, y+h - 35), (x+w, y+h), color, cv2.FILLED)
label = f"{worker_name}"
if worker_id:
label += f" ({worker_id})"
cv2.putText(frame, label, (x + 6, y+h - 6),
cv2.FONT_HERSHEY_DUPLEX, 0.6, (255, 255, 255), 1)
except Exception as e:
print(f"Error processing face: {e}")
continue
return frame
except Exception as e:
print(f"Error processing frame: {e}")
return frame
def start_video_stream(self, camera_source=0):
"""Start video streaming and recognition"""
try:
if self.is_streaming:
return "⚠️ Video stream is already running!"
self.video_capture = cv2.VideoCapture(camera_source)
if not self.video_capture.isOpened():
return "❌ Could not open camera/video source!"
self.is_streaming = True
def video_loop():
while self.is_streaming:
ret, frame = self.video_capture.read()
if not ret:
break
# Process frame for face recognition
processed_frame = self.process_video_frame(frame)
# Add to queue for display
if not self.frame_queue.full():
try:
self.frame_queue.put_nowait(processed_frame)
except queue.Full:
pass
time.sleep(0.1) # Limit processing rate
self.recognition_thread = threading.Thread(target=video_loop)
self.recognition_thread.daemon = True
self.recognition_thread.start()
return "βœ… Video stream started successfully!"
except Exception as e:
return f"❌ Error starting video stream: {e}"
def stop_video_stream(self):
"""Stop video streaming"""
try:
self.is_streaming = False
if self.video_capture:
self.video_capture.release()
self.video_capture = None
if self.recognition_thread:
self.recognition_thread.join(timeout=2)
# Clear frame queue
while not self.frame_queue.empty():
try:
self.frame_queue.get_nowait()
except queue.Empty:
break
return "βœ… Video stream stopped successfully!"
except Exception as e:
return f"❌ Error stopping video stream: {e}"
def get_current_frame(self):
"""Get current frame for display"""
try:
if not self.frame_queue.empty():
frame = self.frame_queue.get_nowait()
return frame
return None
except queue.Empty:
return None
def get_registered_workers_info(self):
"""Get information about registered workers"""
if not self.known_face_names:
return "No workers registered yet."
info = f"**Registered Workers ({len(self.known_face_names)}):**\n\n"
for i, (worker_id, name) in enumerate(zip(self.known_face_ids, self.known_face_names), 1):
info += f"{i}. **{name}** (ID: {worker_id})\n"
return info
def get_today_attendance(self):
"""Get today's attendance records"""
today = date.today().isoformat()
today_records = [r for r in self.attendance_records if r["date"] == today]
if not today_records:
return f"**Today's Attendance ({today}):**\n\nNo attendance marked yet."
info = f"**Today's Attendance ({today}):**\n\n"
for record in today_records:
method_icon = "πŸ€–" if record.get("method") == "Auto" else "πŸ‘€"
info += f"{method_icon} **{record['name']}** (ID: {record['worker_id']}) - {record['time']}\n"
return info
def get_attendance_report(self, start_date, end_date):
"""Generate attendance report for date range"""
if not start_date or not end_date:
return "Please select both start and end dates."
try:
# Validate date format
datetime.strptime(start_date, '%Y-%m-%d')
datetime.strptime(end_date, '%Y-%m-%d')
except ValueError:
return "Invalid date format. Please use YYYY-MM-DD."
# Filter records by date range
filtered_records = [
r for r in self.attendance_records
if start_date <= r["date"] <= end_date
]
if not filtered_records:
return f"No attendance records found between {start_date} and {end_date}."
# Create DataFrame for analysis
df = pd.DataFrame(filtered_records)
# Summary statistics
total_days = (pd.to_datetime(end_date) - pd.to_datetime(start_date)).days + 1
unique_workers = df['worker_id'].nunique()
total_attendances = len(df)
auto_registrations = len(df[df['method'] == 'Auto'])
report = f"**πŸ“Š Attendance Report ({start_date} to {end_date})**\n\n"
report += f"**Summary:**\n"
report += f"β€’ Total Days: {total_days}\n"
report += f"β€’ Unique Workers: {unique_workers}\n"
report += f"β€’ Total Attendances: {total_attendances}\n"
report += f"β€’ Auto Detections: {auto_registrations}\n\n"
# Individual attendance counts
if not df.empty:
attendance_counts = df.groupby(['worker_id', 'name']).size().reset_index(name='count')
report += f"**πŸ‘₯ Individual Attendance:**\n"
for _, row in attendance_counts.iterrows():
percentage = (row['count'] / total_days) * 100
report += f"β€’ **{row['name']}** ({row['worker_id']}): {row['count']} days ({percentage:.1f}%)\n"
return report
def export_attendance_csv(self):
"""Export attendance records to CSV"""
try:
if not self.attendance_records:
return None, "No attendance records to export."
df = pd.DataFrame(self.attendance_records)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
csv_file = f"attendance_report_{timestamp}.csv"
df.to_csv(csv_file, index=False)
return csv_file, f"βœ… Attendance exported to {csv_file}"
except Exception as e:
return None, f"❌ Error exporting data: {e}"
# Initialize the attendance system
attendance_system = AttendanceSystem()
def create_interface():
with gr.Blocks(
title="🎯 Advanced Attendance System with Live Recognition",
theme=gr.themes.Soft(),
css="""
.gradio-container {
max-width: 1400px !important;
}
.tab-nav {
font-weight: bold;
}
.status-box {
padding: 10px;
border-radius: 5px;
margin: 5px 0;
}
"""
) as demo:
gr.Markdown(
"""
# 🎯 Advanced Attendance System with Live Face Recognition
**Comprehensive facial recognition system with automatic worker registration and attendance tracking**
## πŸš€ **Key Features:**
- **πŸŽ₯ Live Video Stream Recognition** - Real-time face detection from camera/CCTV
- **πŸ€– Automatic Worker Registration** - Auto-register unknown faces with unique IDs
- **πŸ‘€ Manual Registration** - Register workers manually with photos
- **πŸ“… 24-Hour Attendance Rule** - One attendance mark per worker per day
- **πŸ“Š Advanced Analytics** - Detailed reports and data export
- **πŸ’Ύ Persistent Data Storage** - All data saved locally in `/data` folder
## πŸ“ **Data Storage Location:**
- **Worker Database:** `/data/workers.pkl`
- **Attendance Records:** `/data/attendance.json`
- **Face Images:** `/data/faces/` folder
"""
)
with gr.Tabs():
# Live Recognition Tab
with gr.Tab("πŸŽ₯ Live Recognition", elem_classes="tab-nav"):
gr.Markdown("### Real-time Face Recognition and Attendance")
with gr.Row():
with gr.Column(scale=1):
camera_source = gr.Number(
label="Camera Source (0 for default camera, or RTSP URL)",
value=0,
precision=0
)
with gr.Row():
start_stream_btn = gr.Button(
"πŸŽ₯ Start Live Recognition",
variant="primary",
size="lg"
)
stop_stream_btn = gr.Button(
"⏹️ Stop Stream",
variant="secondary",
size="lg"
)
stream_status = gr.Textbox(
label="Stream Status",
value="Ready to start...",
interactive=False,
lines=2
)
gr.Markdown(
"""
**πŸ“‹ Instructions:**
1. Click "Start Live Recognition" to begin
2. System will automatically detect and register new faces
3. Known workers will be marked present (once per day)
4. New workers get auto-assigned IDs (W0001, W0002, etc.)
**🎨 Color Coding:**
- 🟒 **Green:** Known worker (attendance marked)
- 🟠 **Orange:** New worker (auto-registered)
- πŸ”΄ **Red:** Face detected but processing
"""
)
with gr.Column(scale=1):
live_attendance_display = gr.Markdown(
value=attendance_system.get_today_attendance(),
label="Live Attendance Updates"
)
refresh_attendance_btn = gr.Button(
"πŸ”„ Refresh Attendance",
variant="secondary"
)
# Manual Registration Tab
with gr.Tab("πŸ‘€ Manual Registration", elem_classes="tab-nav"):
gr.Markdown("### Register Workers Manually")
with gr.Row():
with gr.Column(scale=1):
register_image = gr.Image(
label="Upload Worker's Photo",
type="pil",
height=300
)
register_name = gr.Textbox(
label="Worker's Full Name",
placeholder="Enter full name...",
lines=1
)
register_btn = gr.Button(
"πŸ‘€ Register Worker",
variant="primary",
size="lg"
)
with gr.Column(scale=1):
register_output = gr.Textbox(
label="Registration Status",
lines=3,
interactive=False
)
registered_workers_info = gr.Markdown(
value=attendance_system.get_registered_workers_info(),
label="Registered Workers Database"
)
# Reports & Analytics Tab
with gr.Tab("πŸ“Š Reports & Analytics", elem_classes="tab-nav"):
gr.Markdown("### Attendance Reports and Data Export")
with gr.Row():
with gr.Column():
gr.Markdown("#### πŸ“… Generate Report")
start_date = gr.Textbox(
label="Start Date (YYYY-MM-DD)",
value=date.today().replace(day=1).strftime('%Y-%m-%d')
)
end_date = gr.Textbox(
label="End Date (YYYY-MM-DD)",
value=date.today().strftime('%Y-%m-%d')
)
generate_report_btn = gr.Button(
"πŸ“Š Generate Report",
variant="primary"
)
gr.Markdown("#### πŸ’Ύ Export Data")
export_btn = gr.Button(
"πŸ“₯ Export to CSV",
variant="secondary"
)
export_status = gr.Textbox(
label="Export Status",
lines=2,
interactive=False
)
export_file = gr.File(
label="Download File",
visible=False
)
with gr.Column():
report_output = gr.Markdown(
value="Select date range and click 'Generate Report' to view attendance analytics.",
label="Attendance Report"
)
# System Info Tab
with gr.Tab("ℹ️ System Information", elem_classes="tab-nav"):
gr.Markdown(
"""
## πŸ“– System Guide
### πŸŽ₯ Live Recognition System
- **Camera Setup:** Use camera index (0, 1, 2...) or RTSP URL for IP cameras
- **Auto Registration:** Unknown faces automatically get worker IDs (W0001, W0002...)
- **24-Hour Rule:** Each worker can only be marked present once per day
- **Real-time Processing:** Continuous face detection and recognition
### πŸ‘€ Manual Registration
- Upload clear, front-facing photos for best results
- One face per image for registration
- Workers get unique IDs automatically assigned
### πŸ“ Data Storage Structure
```
/data/
β”œβ”€β”€ workers.pkl # Worker database (embeddings, names, IDs)
β”œβ”€β”€ attendance.json # All attendance records
└── faces/ # Saved face images
β”œβ”€β”€ W0001_John_Doe.jpg
β”œβ”€β”€ W0002_Jane_Smith.jpg
└── ...
```
### πŸ”§ Technical Features
- **Face Recognition:** Uses DeepFace with Facenet embeddings
- **Distance Threshold:** 10 for face matching accuracy
- **Threading:** Separate threads for video processing and UI
- **Queue Management:** Efficient frame processing with queue system
- **Error Handling:** Robust error handling and recovery
### 🚨 Troubleshooting
- **Camera Issues:** Check camera permissions and connections
- **Poor Recognition:** Ensure good lighting and clear face visibility
- **Performance:** Reduce video resolution for better performance
- **Storage:** Check disk space for face image storage
### πŸ”’ Privacy & Security
- All data stored locally in `/data` folder
- No external API calls or data transmission
- Face images saved securely with worker IDs
- Attendance records in JSON format for easy backup
"""
)
# Event handlers
start_stream_btn.click(
fn=attendance_system.start_video_stream,
inputs=[camera_source],
outputs=[stream_status]
)
stop_stream_btn.click(
fn=attendance_system.stop_video_stream,
outputs=[stream_status]
)
refresh_attendance_btn.click(
fn=attendance_system.get_today_attendance,
outputs=[live_attendance_display]
)
register_btn.click(
fn=attendance_system.register_worker_manual,
inputs=[register_image, register_name],
outputs=[register_output, registered_workers_info]
)
generate_report_btn.click(
fn=attendance_system.get_attendance_report,
inputs=[start_date, end_date],
outputs=[report_output]
)
def export_and_show():
file_path, status = attendance_system.export_attendance_csv()
if file_path:
return status, gr.update(visible=True, value=file_path)
else:
return status, gr.update(visible=False)
export_btn.click(
fn=export_and_show,
outputs=[export_status, export_file]
)
# Remove the problematic auto-refresh implementation
# Users will need to manually click the refresh button
return demo
# Create and launch the interface
if __name__ == "__main__":
demo = create_interface()
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True,
debug=True
)