Attendance / app.py
Sivainti's picture
Update app.py
f16032b verified
# Suppress TensorFlow oneDNN warning before any imports
import os
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
import gradio as gr
import cv2
import numpy as np
import pandas as pd
from datetime import datetime, date, timedelta
from deepface import DeepFace
import pickle
from io import BytesIO
import base64
from PIL import Image
import json
import threading
import time
import queue
import requests
from simple_salesforce import Salesforce
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Hugging Face API configuration
HF_API_URL = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base"
HF_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN")
# Salesforce configuration
SF_USERNAME = os.getenv("smartlabour@attendance.system")
SF_PASSWORD = os.getenv("#Prashanth@123")
SF_SECURITY_TOKEN = os.getenv("pasQDqmWApzD0skgbv76gVgIs")
class AttendanceSystem:
def __init__(self):
self.known_face_embeddings = []
self.known_face_names = []
self.known_face_ids = []
self.attendance_records = []
self.next_worker_id = 1
self.video_capture = None
self.is_streaming = False
self.frame_queue = queue.Queue(maxsize=2)
self.recognition_thread = None
self.last_recognition_time = {}
self.recognition_cooldown = 5 # seconds between recognitions for same person
self.video_file_path = None
self.video_processing = False
# Initialize Salesforce connection
try:
self.sf = Salesforce(
username=SF_USERNAME,
password=SF_PASSWORD,
security_token=SF_SECURITY_TOKEN
)
except Exception as e:
print(f"Error connecting to Salesforce: {e}")
self.sf = None
# Create directories for data storage
os.makedirs("data", exist_ok=True)
os.makedirs("data/faces", exist_ok=True)
self.load_data()
def load_data(self):
"""Load all stored data"""
try:
# Load face embeddings and worker data
if os.path.exists("data/workers.pkl"):
with open("data/workers.pkl", "rb") as f:
data = pickle.load(f)
self.known_face_embeddings = data.get("embeddings", [])
self.known_face_names = data.get("names", [])
self.known_face_ids = data.get("ids", [])
self.next_worker_id = data.get("next_id", 1)
# Load attendance records
if os.path.exists("data/attendance.json"):
with open("data/attendance.json", "r") as f:
self.attendance_records = json.load(f)
except Exception as e:
print(f"Error loading data: {e}")
self.known_face_embeddings = []
self.known_face_names = []
self.known_face_ids = []
self.attendance_records = []
self.next_worker_id = 1
def save_data(self):
"""Save all data to files"""
try:
# Save worker data
worker_data = {
"embeddings": self.known_face_embeddings,
"names": self.known_face_names,
"ids": self.known_face_ids,
"next_id": self.next_worker_id
}
with open("data/workers.pkl", "wb") as f:
pickle.dump(worker_data, f)
# Save attendance records
with open("data/attendance.json", "w") as f:
json.dump(self.attendance_records, f, indent=2)
except Exception as e:
print(f"Error saving data: {e}")
def get_image_caption(self, image):
"""Generate image caption using Hugging Face API"""
try:
# Convert PIL image to bytes
if isinstance(image, Image.Image):
img_byte_arr = BytesIO()
image.save(img_byte_arr, format='JPEG')
img_data = img_byte_arr.getvalue()
# Make API request to Hugging Face
headers = {"Authorization": f"Bearer {HF_API_TOKEN}"}
response = requests.post(HF_API_URL, headers=headers, data=img_data)
if response.status_code == 200:
result = response.json()
if isinstance(result, list) and len(result) > 0:
return result[0].get("generated_text", "No caption generated")
return "No caption generated"
else:
print(f"Hugging Face API error: {response.status_code} - {response.text}")
return "Error generating caption"
except Exception as e:
print(f"Error in Hugging Face API call: {e}")
return "Error generating caption"
def register_worker_manual(self, image, name):
"""Manual worker registration with Hugging Face and Salesforce integration"""
if image is None or not name.strip():
return "❌ Please provide both image and name!", self.get_registered_workers_info()
# Convert PIL image to RGB array
if isinstance(image, Image.Image):
image_array = np.array(image)
try:
# Verify the image contains a face
face_analysis = DeepFace.analyze(img_path=image_array, actions=['emotion'], enforce_detection=True, detector_backend='opencv')
# Get face embedding
embedding = DeepFace.represent(img_path=image_array, model_name='Facenet')[0]['embedding']
# Check if person already exists
name = name.strip().title()
if name in self.known_face_names:
return f"❌ {name} is already registered!", self.get_registered_workers_info()
# Generate new worker ID
worker_id = f"W{self.next_worker_id:04d}"
# Generate image caption using Hugging Face
caption = self.get_image_caption(image)
# Add the face embedding, name, and ID
self.known_face_embeddings.append(embedding)
self.known_face_names.append(name)
self.known_face_ids.append(worker_id)
self.next_worker_id += 1
# Save face image
face_image = Image.fromarray(image_array)
face_image.save(f"data/faces/{worker_id}_{name.replace(' ', '_')}.jpg")
# Save to Salesforce
if self.sf:
try:
self.sf.Worker__c.create({
'Name': name,
'Worker_ID__c': worker_id,
'Face_Embedding__c': json.dumps(embedding),
'Image_Caption__c': caption
})
print(f"βœ… Worker {name} ({worker_id}) saved to Salesforce with caption: {caption}")
except Exception as e:
print(f"Error saving to Salesforce: {e}")
self.save_data()
return f"βœ… {name} has been successfully registered with ID: {worker_id}! Caption: {caption}", self.get_registered_workers_info()
except ValueError as e:
if "Face could not be detected" in str(e):
return "❌ No face detected in the image! Please try again with a clear face image.", self.get_registered_workers_info()
return f"❌ Error processing image: {str(e)}", self.get_registered_workers_info()
except Exception as e:
return f"❌ Error during registration: {str(e)}", self.get_registered_workers_info()
def register_worker_auto(self, face_image):
"""Automatic worker registration for unrecognized faces"""
try:
# Generate new worker ID and name
worker_id = f"W{self.next_worker_id:04d}"
worker_name = f"Unknown_Worker_{self.next_worker_id}"
# Get face embedding
embedding = DeepFace.represent(img_path=face_image, model_name='Facenet')[0]['embedding']
# Generate image caption
face_pil = Image.fromarray(cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB))
caption = self.get_image_caption(face_pil)
# Add to database
self.known_face_embeddings.append(embedding)
self.known_face_names.append(worker_name)
self.known_face_ids.append(worker_id)
self.next_worker_id += 1
# Save face image
face_pil.save(f"data/faces/{worker_id}_{worker_name}.jpg")
# Save to Salesforce
if self.sf:
try:
self.sf.Worker__c.create({
'Name': worker_name,
'Worker_ID__c': worker_id,
'Face_Embedding__c': json.dumps(embedding),
'Image_Caption__c': caption
})
print(f"βœ… Worker {worker_name} ({worker_id}) saved to Salesforce with caption: {caption}")
except Exception as e:
print(f"Error saving to Salesforce: {e}")
self.save_data()
return worker_id, worker_name
except Exception as e:
print(f"Error in auto registration: {e}")
return None, None
def mark_attendance(self, worker_id, worker_name):
"""Mark attendance for a worker and save to Salesforce"""
try:
today = date.today().isoformat()
current_time = datetime.now()
# Check if already marked today
already_marked = any(
record["worker_id"] == worker_id and record["date"] == today
for record in self.attendance_records
)
if not already_marked:
# Create attendance record
attendance_record = {
"worker_id": worker_id,
"name": worker_name,
"date": today,
"time": current_time.strftime("%H:%M:%S"),
"timestamp": current_time.isoformat(),
"status": "Present",
"method": "Auto"
}
self.attendance_records.append(attendance_record)
# Save to Salesforce
if self.sf:
try:
self.sf.Attendance__c.create({
'Worker_ID__c': worker_id,
'Name__c': worker_name,
'Date__c': today,
'Time__c': current_time.strftime("%H:%M:%S"),
'Timestamp__c': current_time.isoformat(),
'Status__c': "Present",
'Method__c': "Auto"
})
print(f"βœ… Attendance for {worker_name} ({worker_id}) saved to Salesforce")
except Exception as e:
print(f"Error saving attendance to Salesforce: {e}")
self.save_data()
return True
return False
except Exception as e:
print(f"Error marking attendance: {e}")
return False
def process_video_frame(self, frame):
"""Process a single video frame for face recognition"""
try:
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Find faces in the frame
face_objs = DeepFace.extract_faces(img_path=rgb_frame, target_size=(160, 160), enforce_detection=False, detector_backend='opencv')
current_time = time.time()
for face_obj in face_objs:
if face_obj['confidence'] > 0.9: # Only consider confident detections
face_area = face_obj['facial_area']
x, y, w, h = face_area['x'], face_area['y'], face_area['w'], face_area['h']
# Extract face image
face_image = frame[y:y+h, x:x+w]
try:
# Get face embedding
embedding = DeepFace.represent(img_path=face_image, model_name='Facenet')[0]['embedding']
worker_id = None
worker_name = "Unknown"
color = (0, 0, 255) # Red for unknown
# Compare with known faces
if len(self.known_face_embeddings) > 0:
# Calculate distances to known faces
distances = []
for known_embedding in self.known_face_embeddings:
distance = np.linalg.norm(np.array(embedding) - np.array(known_embedding))
distances.append(distance)
min_distance = min(distances)
best_match_index = distances.index(min_distance)
if min_distance < 10: # Threshold for recognition
worker_id = self.known_face_ids[best_match_index]
worker_name = self.known_face_names[best_match_index]
color = (0, 255, 0) # Green for known
# Check cooldown period
if worker_id not in self.last_recognition_time or \
current_time - self.last_recognition_time[worker_id] > self.recognition_cooldown:
# Mark attendance
if self.mark_attendance(worker_id, worker_name):
print(f"βœ… Attendance marked for {worker_name} ({worker_id})")
self.last_recognition_time[worker_id] = current_time
else:
# Unknown face - auto register
if face_image.size > 0:
new_id, new_name = self.register_worker_auto(face_image)
if new_id:
worker_id = new_id
worker_name = new_name
color = (255, 165, 0) # Orange for newly registered
print(f"πŸ†• New worker registered: {new_name} ({new_id})")
# Mark attendance for new worker
self.mark_attendance(worker_id, worker_name)
# Draw rectangle and label
cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2)
cv2.rectangle(frame, (x, y+h - 35), (x+w, y+h), color, cv2.FILLED)
label = f"{worker_name}"
if worker_id:
label += f" ({worker_id})"
cv2.putText(frame, label, (x + 6, y+h - 6),
cv2.FONT_HERSHEY_DUPLEX, 0.6, (255, 255, 255), 1)
except Exception as e:
print(f"Error processing face: {e}")
continue
return frame
except Exception as e:
print(f"Error processing frame: {e}")
return frame
def start_video_stream(self, camera_source=0):
"""Start video streaming and recognition"""
try:
if self.is_streaming:
return "⚠️ Video stream is already running!"
# Clear previous video file if switching from file to camera
self.video_file_path = None
self.video_capture = cv2.VideoCapture(camera_source)
if not self.video_capture.isOpened():
return "❌ Could not open camera/video source!"
self.is_streaming = True
def video_loop():
while self.is_streaming:
ret, frame = self.video_capture.read()
if not ret:
break
# Process frame for face recognition
processed_frame = self.process_video_frame(frame)
# Add to queue for display
if not self.frame_queue.full():
try:
self.frame_queue.put_nowait(processed_frame)
except queue.Full:
pass
time.sleep(0.1) # Limit processing rate
self.recognition_thread = threading.Thread(target=video_loop)
self.recognition_thread.daemon = True
self.recognition_thread.start()
return "βœ… Live camera stream started successfully!"
except Exception as e:
return f"❌ Error starting video stream: {e}"
def process_uploaded_video(self, video_path):
"""Process an uploaded video file for face recognition"""
try:
if self.is_streaming:
return "⚠️ Please stop current stream before processing a video file!"
if not os.path.exists(video_path):
return "❌ Video file not found!"
self.video_file_path = video_path
self.video_processing = True
def video_processing_loop():
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
frame_delay = 1.0 / fps if fps > 0 else 0.03
while self.video_processing and cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# Process frame for face recognition
processed_frame = self.process_video_frame(frame)
# Add to queue for display
if not self.frame_queue.full():
try:
self.frame_queue.put_nowait(processed_frame)
except queue.Full:
pass
time.sleep(frame_delay)
cap.release()
self.video_processing = False
self.recognition_thread = threading.Thread(target=video_processing_loop)
self.recognition_thread.daemon = True
self.recognition_thread.start()
return f"βœ… Video processing started successfully! ({os.path.basename(video_path)})"
except Exception as e:
return f"❌ Error processing video: {e}"
def stop_video_stream(self):
"""Stop video streaming or processing"""
try:
self.is_streaming = False
self.video_processing = False
if self.video_capture:
self.video_capture.release()
self.video_capture = None
if self.recognition_thread:
self.recognition_thread.join(timeout=2)
# Clear frame queue
while not self.frame_queue.empty():
try:
self.frame_queue.get_nowait()
except queue.Empty:
break
return "βœ… Video stream/processing stopped successfully!"
except Exception as e:
return f"❌ Error stopping video: {e}"
def get_current_frame(self):
"""Get current frame for display"""
try:
if not self.frame_queue.empty():
frame = self.frame_queue.get_nowait()
return frame
return None
except queue.Empty:
return None
def get_registered_workers_info(self):
"""Get information about registered workers from Salesforce"""
if not self.sf:
return "❌ Salesforce connection not established."
try:
workers = self.sf.query_all("SELECT Name, Worker_ID__c, Image_Caption__c FROM Worker__c")['records']
if not workers:
return "No workers registered yet."
info = f"**Registered Workers ({len(workers)}):**\n\n"
for i, worker in enumerate(workers, 1):
info += f"{i}. **{worker['Name']}** (ID: {worker['Worker_ID__c']}) - Caption: {worker['Image_Caption__c'] or 'N/A'}\n"
return info
except Exception as e:
print(f"Error fetching workers from Salesforce: {e}")
return self._get_local_workers_info()
def _get_local_workers_info(self):
"""Fallback to local worker info if Salesforce query fails"""
if not self.known_face_names:
return "No workers registered yet."
info = f"**Registered Workers ({len(self.known_face_names)}):**\n\n"
for i, (worker_id, name) in enumerate(zip(self.known_face_ids, self.known_face_names), 1):
info += f"{i}. **{name}** (ID: {worker_id})\n"
return info
def get_today_attendance(self):
"""Get today's attendance records from Salesforce"""
if not self.sf:
return "❌ Salesforce connection not established."
today = date.today().isoformat()
try:
records = self.sf.query_all(
f"SELECT Name__c, Worker_ID__c, Time__c, Method__c FROM Attendance__c WHERE Date__c = '{today}'"
)['records']
if not records:
return f"**Today's Attendance ({today}):**\n\nNo attendance marked yet."
info = f"**Today's Attendance ({today}):**\n\n"
for record in records:
method_icon = "πŸ€–" if record['Method__c'] == "Auto" else "πŸ‘€"
info += f"{method_icon} **{record['Name__c']}** (ID: {record['Worker_ID__c']}) - {record['Time__c']}\n"
return info
except Exception as e:
print(f"Error fetching attendance from Salesforce: {e}")
return self._get_local_today_attendance()
def _get_local_today_attendance(self):
"""Fallback to local attendance records if Salesforce query fails"""
today = date.today().isoformat()
today_records = [r for r in self.attendance_records if r["date"] == today]
if not today_records:
return f"**Today's Attendance ({today}):**\n\nNo attendance marked yet."
info = f"**Today's Attendance ({today}):**\n\n"
for record in today_records:
method_icon = "πŸ€–" if record.get("method") == "Auto" else "πŸ‘€"
info += f"{method_icon} **{record['name']}** (ID: {record['worker_id']}) - {record['time']}\n"
return info
def get_attendance_report(self, start_date, end_date):
"""Generate attendance report for date range from Salesforce"""
if not start_date or not end_date:
return "Please select both start and end dates."
try:
# Validate date format
datetime.strptime(start_date, '%Y-%m-%d')
datetime.strptime(end_date, '%Y-%m-%d')
except ValueError:
return "Invalid date format. Please use YYYY-MM-DD."
if not self.sf:
return "❌ Salesforce connection not established."
try:
# Query Salesforce for attendance records
records = self.sf.query_all(
f"SELECT Worker_ID__c, Name__c, Date__c, Time__c, Method__c FROM Attendance__c "
f"WHERE Date__c >= '{start_date}' AND Date__c <= '{end_date}'"
)['records']
if not records:
return f"No attendance records found between {start_date} and {end_date}."
# Create DataFrame for analysis
df = pd.DataFrame(records)
# Summary statistics
total_days = (pd.to_datetime(end_date) - pd.to_datetime(start_date)).days + 1
unique_workers = df['Worker_ID__c'].nunique()
total_attendances = len(df)
auto_registrations = len(df[df['Method__c'] == 'Auto'])
report = f"**πŸ“Š Attendance Report ({start_date} to {end_date})**\n\n"
report += f"**Summary:**\n"
report += f"β€’ Total Days: {total_days}\n"
report += f"β€’ Unique Workers: {unique_workers}\n"
report += f"β€’ Total Attendances: {total_attendances}\n"
report += f"β€’ Auto Detections: {auto_registrations}\n\n"
# Individual attendance counts
if not df.empty:
attendance_counts = df.groupby(['Worker_ID__c', 'Name__c']).size().reset_index(name='count')
report += f"**πŸ‘₯ Individual Attendance:**\n"
for _, row in attendance_counts.iterrows():
percentage = (row['count'] / total_days) * 100
report += f"β€’ **{row['Name__c']}** ({row['Worker_ID__c']}): {row['count']} days ({percentage:.1f}%)\n"
return report
except Exception as e:
print(f"Error generating report from Salesforce: {e}")
return self._get_local_attendance_report(start_date, end_date)
def _get_local_attendance_report(self, start_date, end_date):
"""Fallback to local attendance report if Salesforce query fails"""
# Filter records by date range
filtered_records = [
r for r in self.attendance_records
if start_date <= r["date"] <= end_date
]
if not filtered_records:
return f"No attendance records found between {start_date} and {end_date}."
# Create DataFrame for analysis
df = pd.DataFrame(filtered_records)
# Summary statistics
total_days = (pd.to_datetime(end_date) - pd.to_datetime(start_date)).days + 1
unique_workers = df['worker_id'].nunique()
total_attendances = len(df)
auto_registrations = len(df[df['method'] == 'Auto'])
report = f"**πŸ“Š Attendance Report ({start_date} to {end_date})**\n\n"
report += f"**Summary:**\n"
report += f"β€’ Total Days: {total_days}\n"
report += f"β€’ Unique Workers: {unique_workers}\n"
report += f"β€’ Total Attendances: {total_attendances}\n"
report += f"β€’ Auto Detections: {auto_registrations}\n\n"
# Individual attendance counts
if not df.empty:
attendance_counts = df.groupby(['worker_id', 'name']).size().reset_index(name='count')
report += f"**πŸ‘₯ Individual Attendance:**\n"
for _, row in attendance_counts.iterrows():
percentage = (row['count'] / total_days) * 100
report += f"β€’ **{row['name']}** ({row['worker_id']}): {row['count']} days ({percentage:.1f}%)\n"
return report
def export_attendance_csv(self):
"""Export attendance records to CSV"""
try:
if not self.attendance_records:
return None, "No attendance records to export."
df = pd.DataFrame(self.attendance_records)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
csv_file = f"attendance_report_{timestamp}.csv"
df.to_csv(csv_file, index=False)
return csv_file, f"βœ… Attendance exported to {csv_file}"
except Exception as e:
return None, f"❌ Error exporting data: {e}"
# Initialize the attendance system
attendance_system = AttendanceSystem()
def create_interface():
with gr.Blocks(
title="🎯 Advanced Attendance System with Video Recognition",
theme=gr.themes.Soft(),
css="""
.gradio-container {
max-width: 1400px !important;
}
.tab-nav {
font-weight: bold;
}
.status-box {
padding: 10px;
border-radius: 5px;
margin: 5px 0;
}
.video-option-tabs {
margin-bottom: 15px;
}
"""
) as demo:
gr.Markdown(
"""
# 🎯 Advanced Attendance System with Face Recognition
**Comprehensive facial recognition system with live camera and video file processing, integrated with Hugging Face and Salesforce**
## πŸš€ **Key Features:**
- **πŸŽ₯ Live Camera Recognition** - Real-time face detection from camera/CCTV
- **πŸ“Ή Video File Processing** - Process pre-recorded videos for attendance
- **πŸ€– Automatic Worker Registration** - Auto-register unknown faces with unique IDs
- **πŸ‘€ Manual Registration** - Register workers manually with photos and AI-generated captions
- **πŸ“… 24-Hour Attendance Rule** - One attendance mark per worker per day
- **πŸ“Š Advanced Analytics** - Detailed reports and data export
- **πŸ€— Hugging Face Integration** - AI-powered image captioning
- **☁️ Salesforce Integration** - Store worker and attendance data in Salesforce
"""
)
with gr.Tabs():
# Video Recognition Tab
with gr.Tab("πŸŽ₯ Video Recognition", elem_classes="tab-nav"):
gr.Markdown("### Face Recognition from Live Camera or Video File")
with gr.Row():
with gr.Column(scale=1):
with gr.Tabs(selected="live", elem_classes="video-option-tabs") as video_tabs:
with gr.Tab("Live Camera", id="live"):
camera_source = gr.Number(
label="Camera Source (0 for default camera, or RTSP URL)",
value=0,
precision=0
)
with gr.Row():
start_stream_btn = gr.Button(
"πŸŽ₯ Start Live Recognition",
variant="primary",
size="lg"
)
with gr.Tab("Upload Video", id="upload"):
video_file = gr.Video(
label="Upload Video File",
sources=["upload"],
format="mp4"
)
with gr.Row():
process_video_btn = gr.Button(
"πŸ“Ή Process Video File",
variant="primary",
size="lg"
)
stop_stream_btn = gr.Button(
"⏹️ Stop Processing",
variant="stop",
size="lg"
)
stream_status = gr.Textbox(
label="Processing Status",
value="Ready to start...",
interactive=False,
lines=2
)
gr.Markdown(
"""
**πŸ“‹ Instructions:**
- **Live Camera:** Select camera source and click "Start Live Recognition"
- **Video File:** Upload a video file and click "Process Video File"
- Click "Stop Processing" to stop current session
**🎨 Color Coding:**
- 🟒 **Green:** Known worker (attendance marked)
- 🟠 **Orange:** New worker (auto-registered)
- πŸ”΄ **Red:** Face detected but processing
"""
)
with gr.Column(scale=1):
video_output = gr.Image(
label="Recognition Output",
streaming=True,
interactive=False
)
live_attendance_display = gr.Markdown(
value=attendance_system.get_today_attendance(),
label="Live Attendance Updates"
)
refresh_attendance_btn = gr.Button(
"πŸ”„ Refresh Attendance",
variant="secondary"
)
# Manual Registration Tab
with gr.Tab("πŸ‘€ Manual Registration", elem_classes="tab-nav"):
gr.Markdown("### Register Workers Manually")
with gr.Row():
with gr.Column(scale=1):
register_image = gr.Image(
label="Upload Worker's Photo",
type="pil",
height=300
)
register_name = gr.Textbox(
label="Worker's Full Name",
placeholder="Enter full name...",
lines=1
)
register_btn = gr.Button(
"πŸ‘€ Register Worker",
variant="primary",
size="lg"
)
with gr.Column(scale=1):
register_output = gr.Textbox(
label="Registration Status",
lines=3,
interactive=False
)
registered_workers_info = gr.Markdown(
value=attendance_system.get_registered_workers_info(),
label="Registered Workers Database"
)
# Reports & Analytics Tab
with gr.Tab("πŸ“Š Reports & Analytics", elem_classes="tab-nav"):
gr.Markdown("### Attendance Reports and Data Export")
with gr.Row():
with gr.Column():
gr.Markdown("#### πŸ“… Generate Report")
start_date = gr.Textbox(
label="Start Date (YYYY-MM-DD)",
value=date.today().replace(day=1).strftime('%Y-%m-%d')
)
end_date = gr.Textbox(
label="End Date (YYYY-MM-DD)",
value=date.today().strftime('%Y-%m-%d')
)
generate_report_btn = gr.Button(
"πŸ“Š Generate Report",
variant="primary"
)
gr.Markdown("#### πŸ’Ύ Export Data")
export_btn = gr.Button(
"πŸ“₯ Export to CSV",
variant="secondary"
)
export_status = gr.Textbox(
label="Export Status",
lines=2,
interactive=False
)
export_file = gr.File(
label="Download File",
visible=False
)
with gr.Column():
report_output = gr.Markdown(
value="Select date range and click 'Generate Report' to view attendance analytics.",
label="Attendance Report"
)
# Event handlers
start_stream_btn.click(
fn=attendance_system.start_video_stream,
inputs=[camera_source],
outputs=[stream_status]
)
process_video_btn.click(
fn=attendance_system.process_uploaded_video,
inputs=[video_file],
outputs=[stream_status]
)
stop_stream_btn.click(
fn=attendance_system.stop_video_stream,
outputs=[stream_status]
)
refresh_attendance_btn.click(
fn=attendance_system.get_today_attendance,
outputs=[live_attendance_display]
)
register_btn.click(
fn=attendance_system.register_worker_manual,
inputs=[register_image, register_name],
outputs=[register_output, registered_workers_info]
)
generate_report_btn.click(
fn=attendance_system.get_attendance_report,
inputs=[start_date, end_date],
outputs=[report_output]
)
def export_and_show():
file_path, status = attendance_system.export_attendance_csv()
if file_path:
return status, gr.update(visible=True, value=file_path)
else:
return status, gr.update(visible=False)
export_btn.click(
fn=export_and_show,
outputs=[export_status, export_file]
)
# Video frame update
def update_video_frame():
start_time = time.time()
while True:
current_time = time.time()
if current_time - start_time >= 0.03: # Update every 0.03 seconds
frame = attendance_system.get_current_frame()
if frame is not None:
# Convert BGR to RGB
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
return frame
start_time = current_time
time.sleep(0.01) # Small sleep to prevent busy-waiting
# Start the video frame update as a background thread
video_thread = threading.Thread(target=lambda: demo.queue()(update_video_frame)())
video_thread.daemon = True
video_thread.start()
return demo
# Create and launch the interface
if __name__ == "__main__":
demo = create_interface()
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True,
debug=True
)