import os
import re
import uuid
import base64
import pickle
import logging
from datetime import datetime

import cv2
import faiss
import numpy as np
import pandas as pd
import torch
import gradio as gr
from PIL import Image, ImageDraw, ImageFont
from transformers import AutoModel, AutoFeatureExtractor
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# AES key for embedding encryption. The key must be stable across restarts, or
# embeddings persisted in worker_db.pkl can never be decrypted again. Prefer an
# externally supplied key (SLAV_AES_KEY is an illustrative variable name,
# expected to hold 32 base64-encoded bytes); fall back to an ephemeral
# per-process key with a warning.
_key_b64 = os.environ.get("SLAV_AES_KEY")
if _key_b64:
    ENCRYPTION_KEY = base64.b64decode(_key_b64)
else:
    ENCRYPTION_KEY = os.urandom(32)
    logger.warning(
        "SLAV_AES_KEY not set; using an ephemeral encryption key. "
        "Embeddings stored in a previous run will fail to decrypt."
    )
IV_LENGTH = 16

REPORT_DIR = "reports"
if not os.path.exists(REPORT_DIR):
    try:
        os.makedirs(REPORT_DIR)
    except OSError as e:
        logger.error(f"Failed to create reports directory: {e}")
        raise

def encrypt_embedding(embedding):
    """Encrypts a face embedding using AES-CBC with PKCS#7-style padding."""
    try:
        iv = os.urandom(IV_LENGTH)
        cipher = Cipher(algorithms.AES(ENCRYPTION_KEY), modes.CBC(iv), backend=default_backend())
        encryptor = cipher.encryptor()
        embedding_bytes = embedding.tobytes()
        # Pad to the 16-byte AES block size; each pad byte encodes the pad length.
        padding_length = 16 - (len(embedding_bytes) % 16)
        embedding_bytes += bytes([padding_length] * padding_length)
        encrypted = encryptor.update(embedding_bytes) + encryptor.finalize()
        # Prepend the IV so decryption is self-contained, then base64-encode.
        return base64.b64encode(iv + encrypted).decode('utf-8')
    except Exception as e:
        logger.error(f"Error encrypting embedding: {e}")
        return None

def decrypt_embedding(encrypted_embedding):
    """Decrypts a base64-encoded AES ciphertext back into a float32 embedding."""
    try:
        encrypted_data = base64.b64decode(encrypted_embedding)
        iv = encrypted_data[:IV_LENGTH]
        encrypted = encrypted_data[IV_LENGTH:]
        cipher = Cipher(algorithms.AES(ENCRYPTION_KEY), modes.CBC(iv), backend=default_backend())
        decryptor = cipher.decryptor()
        decrypted_padded = decryptor.update(encrypted) + decryptor.finalize()
        # The final byte encodes how many padding bytes to strip.
        padding_length = decrypted_padded[-1]
        decrypted = decrypted_padded[:-padding_length]
        return np.frombuffer(decrypted, dtype=np.float32)
    except Exception as e:
        logger.error(f"Error decrypting embedding: {e}")
        return None
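
# A minimal round-trip sanity check (a sketch for manual testing only; a random
# vector stands in for a real embedding):
#   vec = np.random.rand(128).astype(np.float32)
#   token = encrypt_embedding(vec)
#   assert np.allclose(decrypt_embedding(token), vec)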

try:
    feature_extractor = AutoFeatureExtractor.from_pretrained("google/vit-base-patch16-224")
    model = AutoModel.from_pretrained("google/vit-base-patch16-224")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()
    logger.info(f"Face recognition model loaded on {device}")
except Exception as e:
    logger.error(f"Fatal error loading model: {str(e)}")
    raise
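
# NOTE: google/vit-base-patch16-224 is a general-purpose vision backbone, not a
# model trained for face verification; extract_embedding() below reduces its
# 768-D [CLS] output to 128 dimensions to mimic FaceNet. A dedicated face model
# (e.g. a FaceNet or ArcFace checkpoint) would likely separate identities better.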

DIMENSION = 128
index = faiss.IndexFlatL2(DIMENSION)
worker_db = {}
DB_FILE = "worker_db.pkl"
LOG_FILE = "attendance_logs.csv"
AUDIT_LOG_FILE = "audit_logs.csv"

# Restore persisted workers and rebuild the FAISS index.
if os.path.exists(DB_FILE):
    try:
        with open(DB_FILE, "rb") as f:
            worker_db = pickle.load(f)
        if worker_db:
            # Decrypt each embedding once. Row order must match
            # list(worker_db.keys()), which the recognition code uses to map
            # FAISS hits back to worker IDs, so a failed decryption skips the
            # whole rebuild rather than silently misaligning the index.
            decrypted = [decrypt_embedding(data["embedding"]) for data in worker_db.values()]
            if any(e is None for e in decrypted):
                logger.error("Some embeddings failed to decrypt; skipping index rebuild to avoid ID misalignment.")
            elif decrypted:
                embeddings = np.array(decrypted, dtype=np.float32).reshape(-1, DIMENSION)
                index.add(embeddings)
                logger.info(f"Loaded and indexed {len(worker_db)} workers from database.")
    except Exception as e:
        logger.error(f"Failed to load worker database: {e}")

if not os.path.exists(LOG_FILE):
    pd.DataFrame(columns=["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"]).to_csv(LOG_FILE, index=False)

if not os.path.exists(AUDIT_LOG_FILE):
    pd.DataFrame(columns=["timestamp", "action", "worker_id", "details"]).to_csv(AUDIT_LOG_FILE, index=False)

def log_audit(action, worker_id, details):
    """Logs audit trail for actions."""
    audit_entry = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "action": action,
        "worker_id": worker_id,
        "details": details
    }
    try:
        pd.DataFrame([audit_entry]).to_csv(AUDIT_LOG_FILE, mode="a", header=not os.path.exists(AUDIT_LOG_FILE), index=False)
        logger.info(f"Audit log: {action} for worker {worker_id}")
    except Exception as e:
        logger.error(f"Failed to log audit entry: {e}")

def extract_embedding(face_image_pil):
    """Extracts a 128-D facial embedding from a PIL image, mimicking FaceNet's output size."""
    try:
        inputs = feature_extractor(images=face_image_pil, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model(**inputs)
        # Use the ViT [CLS] token (768-D) as the face descriptor.
        embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy().flatten()
        # Truncate or zero-pad to the fixed index dimension.
        if len(embedding) > DIMENSION:
            embedding = embedding[:DIMENSION]
        elif len(embedding) < DIMENSION:
            embedding = np.pad(embedding, (0, DIMENSION - len(embedding)), mode='constant')
        # L2-normalize so that the squared-L2 scores returned by IndexFlatL2
        # map onto the cosine-style similarity used by the callers.
        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding = embedding / norm
        return embedding.astype(np.float32)
    except Exception as e:
        logger.error(f"Error extracting embedding: {e}")
        return None
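
# With unit-norm embeddings, faiss.IndexFlatL2 returns squared L2 distances
# d^2 = 2 - 2*cos(theta), so the score 1 - d^2/2 used by the recognition and
# registration code equals cosine similarity; the 0.78 and 0.80 thresholds
# below are heuristic cutoffs on that scale.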

def detect_faces(image_pil):
    """Detects faces in a PIL image and returns their (x, y, w, h) coordinates."""
    try:
        frame = np.array(image_pil)
        frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
        face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")
        faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(40, 40))
        return faces
    except Exception as e:
        logger.error(f"Error detecting faces: {e}")
        return []
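
# NOTE: Haar cascades only find roughly frontal, unoccluded faces; profile
# views and heavy occlusion will be missed. A CNN detector (e.g. MTCNN or
# RetinaFace) is a common drop-in upgrade if recall matters.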

def log_attendance(worker_id, camera_id="Gate_1"):
    """Logs attendance, preventing duplicate entries for the same day."""
    # Check registration first: the duplicate check below reads worker_db and
    # would otherwise raise KeyError for an unknown ID.
    if worker_id not in worker_db:
        logger.error(f"Worker ID {worker_id} not found in database.")
        return "Error: Worker not registered."

    today_str = datetime.now().strftime("%Y-%m-%d")
    try:
        logs_df = pd.read_csv(LOG_FILE)
        daily_log = logs_df[(logs_df['worker_id'] == worker_id) & (pd.to_datetime(logs_df['timestamp']).dt.strftime('%Y-%m-%d') == today_str)]
        if not daily_log.empty:
            logger.info(f"Attendance already logged for worker {worker_id} today.")
            return f"Attendance already logged for {worker_db[worker_id]['first_name']} {worker_db[worker_id]['last_name']} today."
    except (FileNotFoundError, pd.errors.EmptyDataError):
        pass

    worker_info = worker_db[worker_id]
    log_entry = {
        "worker_id": worker_id,
        "first_name": worker_info['first_name'],
        "last_name": worker_info['last_name'],
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "camera_id": camera_id,
        "verification_status": worker_info['verification_status']
    }
    try:
        pd.DataFrame([log_entry]).to_csv(LOG_FILE, mode="a", header=not os.path.exists(LOG_FILE), index=False)
        log_audit("Attendance Logged", worker_id, f"Logged for {worker_info['first_name']} {worker_info['last_name']} at {camera_id}")
        logger.info(f"Attendance logged for worker {worker_id}.")
        return f"Attendance logged for {worker_info['first_name']} {worker_info['last_name']}."
    except Exception as e:
        logger.error(f"Failed to log attendance: {e}")
        return "Error: Failed to log attendance."

def generate_daily_report():
    """Generates a daily summary report with full worker details, including in-time, and saves it as a downloadable file."""
    today_str = datetime.now().strftime("%Y-%m-%d")
    try:
        logs_df = pd.read_csv(LOG_FILE)
        daily_logs = logs_df[pd.to_datetime(logs_df['timestamp']).dt.strftime('%Y-%m-%d') == today_str]
        if daily_logs.empty:
            return "No attendance data available for today.", None

        total_workers = len(daily_logs['worker_id'].unique())
        verified_workers = len(daily_logs[daily_logs['verification_status'] == 'Verified']['worker_id'].unique())
        unverified_workers = len(daily_logs[daily_logs['verification_status'] == 'Pending']['worker_id'].unique())

        daily_wage = 500  # flat per-worker daily wage in INR
        total_wages = total_workers * daily_wage

        worker_details = []
        for _, row in daily_logs.iterrows():
            worker_details.append(
                f"{row['worker_id']} | "
                f"{row['first_name']} {row['last_name']} | "
                f"{row['timestamp']} | "
                f"{row['verification_status']}"
            )

        details_section = "\n".join(worker_details) if worker_details else "No workers logged today."

        report = f"""
Daily Attendance Report - {today_str}
===============================
Summary
-------------------------------
Total Unique Workers: {total_workers}
Verified Workers: {verified_workers}
Unverified Workers: {unverified_workers}
Total Wages (INR): {total_wages}
===============================
Worker Details
-------------------------------
Worker ID | Name | In-Time | Verification Status
-------------------------------
{details_section}
===============================
"""
        if unverified_workers > 0:
            report += "\nCompliance Risk: Unverified workers detected. Please review and verify."

        report_filename = f"daily_report_{today_str}_{uuid.uuid4().hex[:12]}.txt"
        report_filepath = os.path.join(REPORT_DIR, report_filename)
        try:
            with open(report_filepath, "w", encoding='utf-8') as f:
                f.write(report)
            logger.info(f"Report saved to {report_filepath}")
            return report, report_filepath
        except Exception as e:
            logger.error(f"Failed to save report: {e}")
            return report, None
    except (FileNotFoundError, pd.errors.EmptyDataError) as e:
        logger.error(f"Error generating report: {e}")
        return "No attendance data available for today.", None

def process_image_for_recognition(image_pil, camera_id="Gate_1"):
    """
    Recognizes known workers in an image, logs their attendance, and draws bounding boxes.
    Queues unrecognized faces for verification.
    """
    if image_pil is None:
        return None, "Please upload an image."

    faces = detect_faces(image_pil)
    if not len(faces):
        return image_pil, "No faces were detected."

    draw = ImageDraw.Draw(image_pil)
    try:
        font = ImageFont.truetype("arial.ttf", 15)
    except IOError:
        font = ImageFont.load_default()

    recognition_results = []
    processed_ids_this_run = set()
    unverified_faces = []

    for (x, y, w, h) in faces:
        face_pil = image_pil.crop((x, y, x + w, y + h))
        embedding = extract_embedding(face_pil)
        if embedding is None:
            continue

        embedding_np = embedding.astype(np.float32).reshape(1, -1)

        is_known_worker = False
        if index.ntotal > 0:
            distances, indices = index.search(embedding_np, 1)
            # Squared L2 on unit vectors -> cosine similarity (see note above).
            similarity_score = 1 - (distances[0][0] / 2)

            if similarity_score > 0.78:
                # FAISS row order mirrors worker_db insertion order.
                worker_id = list(worker_db.keys())[indices[0][0]]

                if worker_id in processed_ids_this_run:
                    continue

                is_known_worker = True
                processed_ids_this_run.add(worker_id)

                worker_name = f"{worker_db[worker_id]['first_name']} {worker_db[worker_id]['last_name']}"
                label = f"{worker_name} ({worker_id})"

                draw.rectangle([(x, y), (x + w, y + h)], outline="green", width=3)
                draw.text((x, y - 20), label, fill="green", font=font)

                log_message = log_attendance(worker_id, camera_id)
                recognition_results.append(f"Recognized: {label}. {log_message}")

        if not is_known_worker:
            unverified_id = f"U{len(unverified_faces) + 1:04d}"
            unverified_faces.append({
                "worker_id": unverified_id,
                "first_name": "Unknown",
                "last_name": "Person",
                "embedding": encrypt_embedding(embedding),
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "camera_id": camera_id,
                "verification_status": "Pending"
            })
            draw.rectangle([(x, y), (x + w, y + h)], outline="red", width=3)
            draw.text((x, y - 20), "Unknown Person", fill="red", font=font)
            recognition_results.append(f"Unknown Person Detected (ID: {unverified_id}). Please register via 'Manual Worker Registration' tab.")

    if unverified_faces:
        try:
            # Write only the attendance-log columns; the encrypted embedding is
            # dropped here so the appended rows stay aligned with the CSV header.
            log_columns = ["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"]
            pd.DataFrame(unverified_faces)[log_columns].to_csv(LOG_FILE, mode="a", header=not os.path.exists(LOG_FILE), index=False)
            log_audit("Unverified Face Detected", "N/A", f"Detected {len(unverified_faces)} unverified faces")
        except Exception as e:
            logger.error(f"Failed to log unverified faces: {e}")

    return image_pil, "\n".join(recognition_results)

def add_worker_manually(worker_id, first_name, last_name, image_pil, camera_id="Gate_1"):
    """Manually adds a verified worker with Worker ID, First Name, Last Name, and encrypted embedding."""
    if image_pil is None:
        return "Error: Worker ID, First Name, Last Name, and image cannot be empty."
    if not worker_id or not worker_id.strip() or not first_name or not first_name.strip() or not last_name or not last_name.strip():
        return "Error: Worker ID, First Name, and Last Name cannot be blank."

    if not re.match(r'^[A-Za-z0-9]+$', worker_id):
        return "Error: Worker ID must be alphanumeric (e.g., W0001)."

    if worker_id in worker_db:
        return f"Error: Worker ID {worker_id} already exists."

    faces = detect_faces(image_pil)
    if len(faces) == 0:
        return "No face detected in the image. Please use a clear, frontal photo."
    if len(faces) > 1:
        return "Multiple faces detected. Please upload an image with only one person."

    x, y, w, h = faces[0]
    face_pil = image_pil.crop((x, y, x + w, y + h))
    embedding = extract_embedding(face_pil)
    if embedding is None:
        return "Failed to process face embedding. Please try another image."

    # Reject near-duplicates already present in the index.
    if index.ntotal > 0:
        embedding_np = embedding.astype(np.float32).reshape(1, -1)
        distances, indices = index.search(embedding_np, 1)
        similarity_score = 1 - (distances[0][0] / 2)
        if similarity_score > 0.80:
            existing_worker_id = list(worker_db.keys())[indices[0][0]]
            existing_name = f"{worker_db[existing_worker_id]['first_name']} {worker_db[existing_worker_id]['last_name']}"
            return f"This person seems to be already registered as '{existing_name}' (ID: {existing_worker_id}) with a similarity score of {similarity_score:.2f}."

    encrypted_embedding = encrypt_embedding(embedding)
    if encrypted_embedding is None:
        return "Failed to encrypt embedding. Please try again."

    worker_db[worker_id] = {
        "first_name": first_name.strip(),
        "last_name": last_name.strip(),
        "embedding": encrypted_embedding,
        "verification_status": "Verified",
        "last_seen": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    try:
        index.add(embedding.astype(np.float32).reshape(1, -1))
        with open(DB_FILE, "wb") as f:
            pickle.dump(worker_db, f)
        log_audit("Worker Added", worker_id, f"Manually added worker {first_name} {last_name}")
        logger.info(f"Manually added new worker {worker_id}: {first_name} {last_name}")
        return f"Successfully added worker '{first_name} {last_name}' with ID: {worker_id}"
    except Exception as e:
        logger.error(f"Failed to add worker: {e}")
        return "Error: Failed to add worker to system."

with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
# SLAV System - Smart Labour Attendance & Verification
**Workflow:**
1. **Attendance:** Use the "Face Recognition" tab. Upload an image to mark attendance for **known** workers.
2. **New Workers:** If a person is marked as "Unknown", go to the "Manual Worker Registration" tab to add them with their Worker ID, First Name, and Last Name.
3. **Reports:** View daily summaries and detailed worker logs in the "Daily Report" tab, with downloadable report files.
"""
    )
    with gr.Tabs():
        with gr.TabItem("Face Recognition"):
            with gr.Row():
                with gr.Column(scale=1):
                    image_input = gr.Image(type="pil", label="Upload Image for Attendance", sources=["upload"])
                    camera_id_input = gr.Textbox(label="Camera ID", placeholder="e.g., Gate_1")
                    submit_button = gr.Button("Process Image", variant="primary")
                with gr.Column(scale=2):
                    image_output = gr.Image(type="pil", label="Processed Image", interactive=False)
                    text_output = gr.Textbox(label="Recognition Results", lines=6, interactive=False)
            submit_button.click(process_image_for_recognition, inputs=[image_input, camera_id_input], outputs=[image_output, text_output])

        with gr.TabItem("Manual Worker Registration"):
            gr.Markdown("## Add a New Verified Worker")
            with gr.Row():
                with gr.Column():
                    worker_id_input = gr.Textbox(label="Worker ID", placeholder="e.g., W0001")
                    worker_first_name_input = gr.Textbox(label="First Name", placeholder="e.g., John")
                    worker_last_name_input = gr.Textbox(label="Last Name", placeholder="e.g., Doe")
                    worker_image_input = gr.Image(type="pil", label="Upload Clear Face Image of Worker", sources=["upload"])
                    worker_camera_id_input = gr.Textbox(label="Camera ID", placeholder="e.g., Gate_1")
                    add_worker_button = gr.Button("Add Worker to System", variant="primary")
                with gr.Column():
                    add_worker_output = gr.Textbox(label="Result", interactive=False)
            add_worker_button.click(add_worker_manually, inputs=[worker_id_input, worker_first_name_input, worker_last_name_input, worker_image_input, worker_camera_id_input], outputs=add_worker_output)

        with gr.TabItem("View Attendance Log"):
            gr.Markdown("## Attendance Log")
            log_display = gr.Dataframe(headers=["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"], interactive=False, wrap=True)

            def update_log_display():
                log_columns = ["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"]
                if os.path.exists(LOG_FILE):
                    try:
                        df = pd.read_csv(LOG_FILE)
                        if not df.empty:
                            df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
                            df = df.sort_values(by="timestamp", ascending=False).fillna('')
                            return df[log_columns]
                        return pd.DataFrame(columns=log_columns)
                    except pd.errors.EmptyDataError:
                        return pd.DataFrame(columns=log_columns)
                return pd.DataFrame(columns=log_columns)

            refresh_button = gr.Button("Refresh Log")
            refresh_button.click(update_log_display, None, log_display)
            demo.load(update_log_display, None, log_display)

        with gr.TabItem("Daily Report"):
            gr.Markdown("## Daily Attendance Report")
            report_output = gr.Textbox(label="Daily Summary", lines=20, interactive=False)
            report_download = gr.File(label="Download Report", interactive=False)
            report_button = gr.Button("Generate Daily Report")
            report_button.click(generate_daily_report, None, [report_output, report_download])
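
# NOTE: server_name="0.0.0.0" below exposes the app on all network interfaces;
# use "127.0.0.1" if it should only be reachable from this machine.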

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, debug=True)