# app.py - SLAV System: Smart Labour Attendance & Verification (Gradio app)
import os
import cv2
import numpy as np
import pandas as pd
from datetime import datetime
import gradio as gr
from PIL import Image, ImageDraw, ImageFont
from transformers import AutoModel, AutoFeatureExtractor
import torch
import faiss
import logging
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import base64
import re
import pickle
import uuid
# --- Setup logging ---
# Module-level logger used by every function below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- AES Encryption Setup ---
# NOTE(review): the key is regenerated on every process start, so embeddings
# pickled by a previous run can no longer be decrypted after a restart —
# confirm whether this is intended; production should load a persistent key.
ENCRYPTION_KEY = os.urandom(32) # 256-bit key for AES, should be securely stored in production
IV_LENGTH = 16 # AES block size in bytes
# --- Directory for saving reports ---
# generate_daily_report() writes downloadable .txt files here.
REPORT_DIR = "reports"
if not os.path.exists(REPORT_DIR):
    try:
        os.makedirs(REPORT_DIR)
    except OSError as e:
        # Fatal: the report tab cannot work without this directory.
        logger.error(f"Failed to create reports directory: {e}")
        raise
def encrypt_embedding(embedding):
    """Encrypt a face embedding with AES-256-CBC and return it base64-encoded.

    A fresh random IV is generated per call and prepended to the ciphertext;
    the raw bytes are padded PKCS#7-style up to the 16-byte AES block size.

    Args:
        embedding: numpy array holding the face descriptor.

    Returns:
        Base64 string of IV + ciphertext, or None if encryption fails.
    """
    try:
        iv = os.urandom(IV_LENGTH)
        aes_cbc = Cipher(algorithms.AES(ENCRYPTION_KEY), modes.CBC(iv), backend=default_backend())
        enc = aes_cbc.encryptor()
        payload = embedding.tobytes()
        # PKCS#7: always add 1..16 padding bytes, each equal to the pad length.
        pad_len = 16 - (len(payload) % 16)
        payload += bytes([pad_len]) * pad_len
        ciphertext = enc.update(payload) + enc.finalize()
        return base64.b64encode(iv + ciphertext).decode('utf-8')
    except Exception as e:
        logger.error(f"Error encrypting embedding: {e}")
        return None
def decrypt_embedding(encrypted_embedding):
    """Decrypt a base64-encoded AES-CBC payload produced by encrypt_embedding().

    The payload layout is IV (IV_LENGTH bytes) followed by the ciphertext;
    PKCS#7 padding is validated before being stripped.

    Args:
        encrypted_embedding: base64 string as returned by encrypt_embedding().

    Returns:
        1-D numpy float32 array, or None on any decryption/validation failure.
    """
    try:
        encrypted_data = base64.b64decode(encrypted_embedding)
        # Guard against truncated payloads before slicing out the IV.
        if len(encrypted_data) <= IV_LENGTH:
            raise ValueError("Payload too short to contain an IV and ciphertext.")
        iv = encrypted_data[:IV_LENGTH]
        encrypted = encrypted_data[IV_LENGTH:]
        cipher = Cipher(algorithms.AES(ENCRYPTION_KEY), modes.CBC(iv), backend=default_backend())
        decryptor = cipher.decryptor()
        decrypted_padded = decryptor.update(encrypted) + decryptor.finalize()
        padding_length = decrypted_padded[-1]
        # Validate PKCS#7 padding: the original trusted the last byte blindly,
        # so a corrupt payload (or wrong key) with last byte 0 produced an
        # empty slice via [:-0], and other garbage silently truncated data.
        if not 1 <= padding_length <= 16 or decrypted_padded[-padding_length:] != bytes([padding_length]) * padding_length:
            raise ValueError("Invalid PKCS#7 padding.")
        decrypted = decrypted_padded[:-padding_length]
        # dtype must match encrypt_embedding's input; embeddings here are float32.
        return np.frombuffer(decrypted, dtype=np.float32)
    except Exception as e:
        logger.error(f"Error decrypting embedding: {e}")
        return None
# --- Load Face Recognition Model (FaceNet-like) ---
# NOTE(review): "google/vit-base-patch16-224" is a generic ViT image backbone,
# not a network trained for face recognition; its [CLS] embedding is merely
# repurposed as a face descriptor by extract_embedding() below — confirm
# recognition quality is acceptable, or swap in a real face-embedding model.
try:
    feature_extractor = AutoFeatureExtractor.from_pretrained("google/vit-base-patch16-224")
    model = AutoModel.from_pretrained("google/vit-base-patch16-224")
    # Prefer GPU when available; extract_embedding() moves inputs to this device.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()
    logger.info(f"Face recognition model loaded on {device}")
except Exception as e:
    # Without the model nothing else can work, so abort startup.
    logger.error(f"Fatal error loading model: {str(e)}")
    raise
# --- Initialize FAISS Index and Worker Database ---
DIMENSION = 128  # extract_embedding() truncates/pads every descriptor to 128 floats
index = faiss.IndexFlatL2(DIMENSION)
worker_db = {}
DB_FILE = "worker_db.pkl"
LOG_FILE = "attendance_logs.csv"
AUDIT_LOG_FILE = "audit_logs.csv"
# --- Load existing worker database ---
# NOTE(review): pickle is only safe because DB_FILE is written by this app
# itself; never point DB_FILE at an untrusted file.
if os.path.exists(DB_FILE):
    try:
        with open(DB_FILE, "rb") as f:
            worker_db = pickle.load(f)
        if worker_db:
            # Decrypt each stored embedding exactly once (the original list
            # comprehension decrypted every entry twice: once in the filter,
            # once for the value).
            decrypted = [decrypt_embedding(data["embedding"]) for data in worker_db.values()]
            valid = [emb for emb in decrypted if emb is not None]
            # NOTE(review): if any entry fails to decrypt, FAISS row order no
            # longer matches worker_db key order, which the recognition code
            # relies on — confirm failed entries should not instead abort.
            if valid:
                embeddings = np.asarray(valid, dtype=np.float32).reshape(-1, DIMENSION)
                index.add(embeddings)
            logger.info(f"Loaded and indexed {len(worker_db)} workers from database.")
    except Exception as e:
        logger.error(f"Failed to load worker database: {e}")
# --- Initialize attendance and audit logs ---
# Create empty CSVs with headers so later appends align with known columns.
if not os.path.exists(LOG_FILE):
    pd.DataFrame(columns=["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"]).to_csv(LOG_FILE, index=False)
if not os.path.exists(AUDIT_LOG_FILE):
    pd.DataFrame(columns=["timestamp", "action", "worker_id", "details"]).to_csv(AUDIT_LOG_FILE, index=False)
def log_audit(action, worker_id, details):
    """Append one audit-trail row to AUDIT_LOG_FILE.

    Args:
        action: short action name (e.g. "Worker Added").
        worker_id: worker the action concerns, or "N/A".
        details: free-text description of what happened.

    Failures are logged and swallowed so auditing never breaks the caller.
    """
    row = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "action": action,
        "worker_id": worker_id,
        "details": details,
    }
    try:
        # Write the header only when the file does not exist yet.
        needs_header = not os.path.exists(AUDIT_LOG_FILE)
        pd.DataFrame([row]).to_csv(AUDIT_LOG_FILE, mode="a", header=needs_header, index=False)
        logger.info(f"Audit log: {action} for worker {worker_id}")
    except Exception as e:
        logger.error(f"Failed to log audit entry: {e}")
def extract_embedding(face_image_pil):
    """Compute a fixed-size 128-float face descriptor from a PIL image.

    Runs the ViT backbone, takes the [CLS] token vector, and truncates or
    zero-pads it to exactly 128 dimensions so it fits the FAISS index.

    Args:
        face_image_pil: cropped face as a PIL image.

    Returns:
        1-D numpy array of length 128, or None on failure.
    """
    try:
        inputs = feature_extractor(images=face_image_pil, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model(**inputs)
        vec = outputs.last_hidden_state[:, 0, :].cpu().numpy().flatten()
        # Force exactly 128 dims regardless of the backbone's hidden width.
        if vec.shape[0] >= 128:
            return vec[:128]
        return np.pad(vec, (0, 128 - vec.shape[0]), mode='constant')
    except Exception as e:
        logger.error(f"Error extracting embedding: {e}")
        return None
def detect_faces(image_pil):
    """Detect frontal faces in a PIL image with OpenCV's Haar cascade.

    Args:
        image_pil: RGB PIL image.

    Returns:
        Sequence of (x, y, w, h) boxes (numpy array, or empty tuple/list when
        nothing is found or detection fails).
    """
    try:
        frame = np.array(image_pil)
        frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
        # Load the cascade once and cache it on the function: the original
        # re-parsed the XML model from disk on every single call.
        cascade = getattr(detect_faces, "_cascade", None)
        if cascade is None:
            cascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")
            detect_faces._cascade = cascade
        faces = cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(40, 40))
        return faces
    except Exception as e:
        logger.error(f"Error detecting faces: {e}")
        return []
def log_attendance(worker_id, camera_id="Gate_1"):
    """Log attendance for a registered worker, at most once per day.

    Args:
        worker_id: ID that must already exist in worker_db.
        camera_id: camera/gate recorded with the entry.

    Returns:
        Human-readable status string for the UI.
    """
    # Check registration FIRST: the attendance CSV can contain IDs that are
    # not in worker_db (the "U…" pending rows queued by
    # process_image_for_recognition), and the original code indexed
    # worker_db[worker_id] inside the duplicate check before this guard,
    # raising a KeyError for such rows.
    if worker_id not in worker_db:
        logger.error(f"Worker ID {worker_id} not found in database.")
        return "Error: Worker not registered."
    worker_info = worker_db[worker_id]
    today_str = datetime.now().strftime("%Y-%m-%d")
    try:
        logs_df = pd.read_csv(LOG_FILE)
        # errors='coerce' keeps malformed timestamp cells from raising an
        # uncaught exception here; coerced NaT rows simply never match today.
        timestamps = pd.to_datetime(logs_df['timestamp'], errors='coerce')
        daily_log = logs_df[(logs_df['worker_id'] == worker_id) & (timestamps.dt.strftime('%Y-%m-%d') == today_str)]
        if not daily_log.empty:
            logger.info(f"Attendance already logged for worker {worker_id} today.")
            return f"Attendance already logged for {worker_info['first_name']} {worker_info['last_name']} today."
    except (FileNotFoundError, pd.errors.EmptyDataError):
        # No log yet — fall through and create the first entry.
        pass
    log_entry = {
        "worker_id": worker_id,
        "first_name": worker_info['first_name'],
        "last_name": worker_info['last_name'],
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "camera_id": camera_id,
        "verification_status": worker_info['verification_status']
    }
    try:
        pd.DataFrame([log_entry]).to_csv(LOG_FILE, mode="a", header=not os.path.exists(LOG_FILE), index=False)
        log_audit("Attendance Logged", worker_id, f"Logged for {worker_info['first_name']} {worker_info['last_name']} at {camera_id}")
        logger.info(f"Attendance logged for worker {worker_id}.")
        return f"Attendance logged for {worker_info['first_name']} {worker_info['last_name']}."
    except Exception as e:
        logger.error(f"Failed to log attendance: {e}")
        return "Error: Failed to log attendance."
def generate_daily_report():
    """Build today's attendance summary and persist it as a text file.

    Returns:
        (report_text, file_path) — file_path is None when there is no data
        for today or the file could not be written.
    """
    today_str = datetime.now().strftime("%Y-%m-%d")
    try:
        all_logs = pd.read_csv(LOG_FILE)
        todays = all_logs[pd.to_datetime(all_logs['timestamp']).dt.strftime('%Y-%m-%d') == today_str]
        if todays.empty:
            return "No attendance data available for today.", None
        # Summary counts over unique worker IDs.
        total_workers = len(todays['worker_id'].unique())
        verified_workers = len(todays[todays['verification_status'] == 'Verified']['worker_id'].unique())
        unverified_workers = len(todays[todays['verification_status'] == 'Pending']['worker_id'].unique())
        daily_wage = 500  # Example wage in INR
        total_wages = total_workers * daily_wage
        # One detail line per logged row: id | name | in-time | status.
        detail_lines = [
            f"{row['worker_id']} | "
            f"{row['first_name']} {row['last_name']} | "
            f"{row['timestamp']} | "
            f"{row['verification_status']}"
            for _, row in todays.iterrows()
        ]
        details_section = "\n".join(detail_lines) if detail_lines else "No workers logged today."
        report = f"""
Daily Attendance Report - {today_str}
===============================
Summary
-------------------------------
Total Unique Workers: {total_workers}
Verified Workers: {verified_workers}
Unverified Workers: {unverified_workers}
Total Wages (INR): {total_wages}
===============================
Worker Details
-------------------------------
Worker ID | Name | In-Time | Verification Status
-------------------------------
{details_section}
===============================
"""
        if unverified_workers > 0:
            report += "\nCompliance Risk: Unverified workers detected. Please review and verify."
        # Persist the report so the UI can offer it for download; a random
        # suffix keeps repeated generations from clobbering each other.
        report_filename = f"daily_report_{today_str}_{uuid.uuid4().hex[:12]}.txt"
        report_filepath = os.path.join(REPORT_DIR, report_filename)
        try:
            with open(report_filepath, "w", encoding='utf-8') as f:
                f.write(report)
            logger.info(f"Report saved to {report_filepath}")
            return report, report_filepath
        except Exception as e:
            # Still return the text even if the file could not be written.
            logger.error(f"Failed to save report: {e}")
            return report, None
    except (FileNotFoundError, pd.errors.EmptyDataError) as e:
        logger.error(f"Error generating report: {e}")
        return "No attendance data available for today.", None
def process_image_for_recognition(image_pil, camera_id="Gate_1"):
    """Recognize known workers in an image and log their attendance.

    Known workers get a green box + label and an attendance entry; unknown
    faces get a red box and are queued as "Pending" rows in the attendance
    CSV for later manual registration. The input image is annotated in place.

    Args:
        image_pil: PIL image to process, or None.
        camera_id: camera/gate recorded with each entry.

    Returns:
        (annotated image or original input, newline-joined status messages)
    """
    if image_pil is None:
        return None, "Please upload an image."
    faces = detect_faces(image_pil)
    if not len(faces):
        return image_pil, "No faces were detected."
    draw = ImageDraw.Draw(image_pil)
    try:
        font = ImageFont.truetype("arial.ttf", 15)
    except IOError:
        font = ImageFont.load_default()
    recognition_results = []
    processed_ids_this_run = set()
    unverified_rows = []
    for (x, y, w, h) in faces:
        face_pil = image_pil.crop((x, y, x + w, y + h))
        embedding = extract_embedding(face_pil)
        if embedding is None:
            continue
        embedding_np = embedding.astype(np.float32).reshape(1, -1)
        is_known_worker = False
        if index.ntotal > 0:
            distances, indices = index.search(embedding_np, 1)
            # IndexFlatL2 returns squared L2 distance; this maps it to a rough
            # similarity score. NOTE(review): assumes roughly unit-scale
            # embeddings — confirm the threshold against real data.
            similarity_score = 1 - (distances[0][0] / 2)
            if similarity_score > 0.78:
                # Relies on worker_db insertion order matching FAISS row order
                # (both are only ever appended to together in add_worker_manually).
                worker_id = list(worker_db.keys())[indices[0][0]]
                if worker_id in processed_ids_this_run:
                    continue
                is_known_worker = True
                processed_ids_this_run.add(worker_id)
                worker_name = f"{worker_db[worker_id]['first_name']} {worker_db[worker_id]['last_name']}"
                label = f"{worker_name} ({worker_id})"
                draw.rectangle([(x, y), (x + w, y + h)], outline="green", width=3)
                draw.text((x, y - 20), label, fill="green", font=font)
                log_message = log_attendance(worker_id, camera_id)
                recognition_results.append(f"Recognized: {label}. {log_message}")
        if not is_known_worker:
            # uuid-based so IDs stay unique across runs (the original per-call
            # "U0001" counter restarted every invocation, producing colliding
            # IDs in the persistent log).
            unverified_id = f"U{uuid.uuid4().hex[:8].upper()}"
            # Build the row with ONLY the attendance-log columns, in header
            # order: the original dict also carried the encrypted embedding,
            # which shifted every subsequent CSV column on append and put
            # ciphertext into the timestamp field.
            unverified_rows.append({
                "worker_id": unverified_id,
                "first_name": "Unknown",
                "last_name": "Person",
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "camera_id": camera_id,
                "verification_status": "Pending"
            })
            draw.rectangle([(x, y), (x + w, y + h)], outline="red", width=3)
            draw.text((x, y - 20), "Unknown Person", fill="red", font=font)
            recognition_results.append(f"Unknown Person Detected (ID: {unverified_id}). Please register via 'Manual Worker Registration' tab.")
    if unverified_rows:
        try:
            columns = ["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"]
            pd.DataFrame(unverified_rows, columns=columns).to_csv(LOG_FILE, mode="a", header=not os.path.exists(LOG_FILE), index=False)
            log_audit("Unverified Face Detected", "N/A", f"Detected {len(unverified_rows)} unverified faces")
        except Exception as e:
            logger.error(f"Failed to log unverified faces: {e}")
    return image_pil, "\n".join(recognition_results)
def add_worker_manually(worker_id, first_name, last_name, image_pil, camera_id="Gate_1"):
    """Register a new verified worker from a single clear face photo.

    Validates inputs, requires exactly one detected face, rejects duplicate
    IDs and faces already enrolled, then stores the encrypted embedding in
    worker_db (pickled to disk) and adds the raw embedding to the FAISS index.

    Args:
        worker_id: alphanumeric ID (e.g. "W0001"), must be unused.
        first_name, last_name: worker's name fields.
        image_pil: PIL image containing exactly one face.
        camera_id: accepted for UI symmetry; not used during registration.

    Returns:
        Human-readable status string for the UI.
    """
    # --- Input validation, in the same order as the original checks ---
    if not worker_id or not first_name or not last_name or image_pil is None:
        return "Error: Worker ID, First Name, Last Name, and image cannot be empty."
    if not worker_id.strip() or not first_name.strip() or not last_name.strip():
        return "Error: Worker ID, First Name, and Last Name cannot be blank."
    if not re.match(r'^[A-Za-z0-9]+$', worker_id):
        return "Error: Worker ID must be alphanumeric (e.g., W0001)."
    if worker_id in worker_db:
        return f"Error: Worker ID {worker_id} already exists."
    # --- Face detection: require exactly one face ---
    detected = detect_faces(image_pil)
    if len(detected) == 0:
        return "No face detected in the image. Please use a clear, frontal photo."
    if len(detected) > 1:
        return "Multiple faces detected. Please upload an image with only one person."
    left, top, width, height = detected[0]
    face_crop = image_pil.crop((left, top, left + width, top + height))
    new_embedding = extract_embedding(face_crop)
    if new_embedding is None:
        return "Failed to process face embedding. Please try another image."
    # --- Duplicate-face check against already-enrolled workers ---
    if index.ntotal > 0:
        query = new_embedding.astype(np.float32).reshape(1, -1)
        distances, indices = index.search(query, 1)
        similarity_score = 1 - (distances[0][0] / 2)
        if similarity_score > 0.80:
            existing_worker_id = list(worker_db.keys())[indices[0][0]]
            existing_name = f"{worker_db[existing_worker_id]['first_name']} {worker_db[existing_worker_id]['last_name']}"
            return f"This person seems to be already registered as '{existing_name}' (ID: {existing_worker_id}) with a similarity score of {similarity_score:.2f}."
    encrypted = encrypt_embedding(new_embedding)
    if encrypted is None:
        return "Failed to encrypt embedding. Please try again."
    # --- Persist: in-memory db, FAISS index, then pickle on disk ---
    worker_db[worker_id] = {
        "first_name": first_name.strip(),
        "last_name": last_name.strip(),
        "embedding": encrypted,
        "verification_status": "Verified",
        "last_seen": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    try:
        index.add(new_embedding.astype(np.float32).reshape(1, -1))
        with open(DB_FILE, "wb") as f:
            pickle.dump(worker_db, f)
        log_audit("Worker Added", worker_id, f"Manually added worker {first_name} {last_name}")
        logger.info(f"Manually added new worker {worker_id}: {first_name} {last_name}")
        return f"Successfully added worker '{first_name} {last_name}' with ID: {worker_id}"
    except Exception as e:
        logger.error(f"Failed to add worker: {e}")
        return "Error: Failed to add worker to system."
# --- Gradio UI Layout ---
# Four tabs: recognition/attendance, manual registration, raw log viewer,
# and the downloadable daily report.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
# SLAV System - Smart Labour Attendance & Verification
**Workflow:**
1. **Attendance:** Use the "Face Recognition" tab. Upload an image to mark attendance for **known** workers.
2. **New Workers:** If a person is marked as "Unknown", go to the "Manual Worker Registration" tab to add them with their Worker ID, First Name, and Last Name.
3. **Reports:** View daily summaries and detailed worker logs in the "Daily Report" tab, with downloadable report files.
"""
    )
    with gr.Tabs():
        # Tab 1: upload a photo, recognize enrolled workers, mark attendance.
        with gr.TabItem("Face Recognition"):
            with gr.Row():
                with gr.Column(scale=1):
                    image_input = gr.Image(type="pil", label="Upload Image for Attendance", sources=["upload"])
                    camera_id_input = gr.Textbox(label="Camera ID", placeholder="e.g., Gate_1")
                    submit_button = gr.Button("Process Image", variant="primary")
                with gr.Column(scale=2):
                    image_output = gr.Image(type="pil", label="Processed Image", interactive=False)
                    text_output = gr.Textbox(label="Recognition Results", lines=6, interactive=False)
            submit_button.click(process_image_for_recognition, inputs=[image_input, camera_id_input], outputs=[image_output, text_output])
        # Tab 2: enroll a new worker from a single clear face photo.
        with gr.TabItem("Manual Worker Registration"):
            gr.Markdown("## Add a New Verified Worker")
            with gr.Row():
                with gr.Column():
                    worker_id_input = gr.Textbox(label="Worker ID", placeholder="e.g., W0001")
                    worker_first_name_input = gr.Textbox(label="First Name", placeholder="e.g., John")
                    worker_last_name_input = gr.Textbox(label="Last Name", placeholder="e.g., Doe")
                    worker_image_input = gr.Image(type="pil", label="Upload Clear Face Image of Worker", sources=["upload"])
                    worker_camera_id_input = gr.Textbox(label="Camera ID", placeholder="e.g., Gate_1")
                    add_worker_button = gr.Button("Add Worker to System", variant="primary")
                with gr.Column():
                    add_worker_output = gr.Textbox(label="Result", interactive=False)
            add_worker_button.click(add_worker_manually, inputs=[worker_id_input, worker_first_name_input, worker_last_name_input, worker_image_input, worker_camera_id_input], outputs=add_worker_output)
        # Tab 3: browse the raw attendance CSV, newest rows first.
        with gr.TabItem("View Attendance Log"):
            gr.Markdown("## Attendance Log")
            log_display = gr.Dataframe(headers=["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"], interactive=False, wrap=True)
            def update_log_display():
                """Read LOG_FILE and return it sorted newest-first; an empty
                DataFrame with the expected columns when missing or empty."""
                if os.path.exists(LOG_FILE):
                    try:
                        df = pd.read_csv(LOG_FILE)
                        if not df.empty:
                            # errors='coerce' turns malformed timestamps into
                            # NaT so sorting never raises.
                            df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
                            df = df.sort_values(by="timestamp", ascending=False).fillna('')
                            return df[["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"]]
                        return pd.DataFrame(columns=["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"])
                    except pd.errors.EmptyDataError:
                        return pd.DataFrame(columns=["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"])
                return pd.DataFrame(columns=["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"])
            refresh_button = gr.Button("Refresh Log")
            refresh_button.click(update_log_display, None, log_display)
            # Populate the table on page load, not only on manual refresh.
            demo.load(update_log_display, None, log_display)
        # Tab 4: generate and download the daily summary report.
        with gr.TabItem("Daily Report"):
            gr.Markdown("## Daily Attendance Report")
            report_output = gr.Textbox(label="Daily Summary", lines=20, interactive=False)
            report_download = gr.File(label="Download Report", interactive=False)
            report_button = gr.Button("Generate Daily Report")
            report_button.click(generate_daily_report, None, [report_output, report_download])
# Launch the Gradio app; 0.0.0.0 makes it reachable from outside the
# container/host (standard for Hugging Face Spaces deployments).
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, debug=True)