| import os |
| import cv2 |
| import numpy as np |
| import pandas as pd |
| from datetime import datetime |
| import gradio as gr |
| from PIL import Image, ImageDraw, ImageFont |
| from transformers import TFAutoModel, AutoFeatureExtractor |
| import tensorflow as tf |
| import faiss |
| import logging |
| from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes |
| from cryptography.hazmat.backends import default_backend |
| import base64 |
| import re |
| import pickle |
| import uuid |
|
|
| |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
| logger = logging.getLogger(__name__) |
|
|
| |
| ENCRYPTION_KEY = os.urandom(32) |
| IV_LENGTH = 16 |
|
|
| |
| REPORT_DIR = "reports" |
| if not os.path.exists(REPORT_DIR): |
| try: |
| os.makedirs(REPORT_DIR) |
| except OSError as e: |
| logger.error(f"Failed to create reports directory: {e}") |
| raise |
|
|
| def encrypt_embedding(embedding): |
| """Encrypts a face embedding using AES.""" |
| try: |
| iv = os.urandom(IV_LENGTH) |
| cipher = Cipher(algorithms.AES(ENCRYPTION_KEY), modes.CBC(iv), backend=default_backend()) |
| encryptor = cipher.encryptor() |
| embedding_bytes = embedding.tobytes() |
| padding_length = 16 - (len(embedding_bytes) % 16) |
| embedding_bytes += bytes([padding_length] * padding_length) |
| encrypted = encryptor.update(embedding_bytes) + encryptor.finalize() |
| return base64.b64encode(iv + encrypted).decode('utf-8') |
| except Exception as e: |
| logger.error(f"Error encrypting embedding: {e}") |
| return None |
|
|
| def decrypt_embedding(encrypted_embedding): |
| """Decrypts a face embedding using AES.""" |
| try: |
| encrypted_data = base64.b64decode(encrypted_embedding) |
| iv = encrypted_data[:IV_LENGTH] |
| encrypted = encrypted_data[IV_LENGTH:] |
| cipher = Cipher(algorithms.AES(ENCRYPTION_KEY), modes.CBC(iv), backend=default_backend()) |
| decryptor = cipher.decryptor() |
| decrypted_padded = decryptor.update(encrypted) + decryptor.finalize() |
| padding_length = decrypted_padded[-1] |
| decrypted = decrypted_padded[:-padding_length] |
| return np.frombuffer(decrypted, dtype=np.float32) |
| except Exception as e: |
| logger.error(f"Error decrypting embedding: {e}") |
| return None |
|
|
| |
| try: |
| feature_extractor = AutoFeatureExtractor.from_pretrained("google/vit-base-patch16-224") |
| model = TFAutoModel.from_pretrained("google/vit-base-patch16-224") |
| device = "/cpu:0" |
| logger.info(f"Face recognition model loaded on {device}") |
| except Exception as e: |
| logger.error(f"Fatal error loading model: {str(e)}") |
| raise |
|
|
| |
| DIMENSION = 128 |
| index = faiss.IndexFlatL2(DIMENSION) |
| worker_db = {} |
| DB_FILE = "worker_db.pkl" |
| LOG_FILE = "attendance_logs.csv" |
| AUDIT_LOG_FILE = "audit_logs.csv" |
|
|
| |
| if os.path.exists(DB_FILE): |
| try: |
| with open(DB_FILE, "rb") as f: |
| worker_db = pickle.load(f) |
| if worker_db: |
| embeddings = np.array([decrypt_embedding(data["embedding"]) for data in worker_db.values() if decrypt_embedding(data["embedding"]) is not None]).astype(np.float32).reshape(-1, DIMENSION) |
| if embeddings.size > 0: |
| index.add(embeddings) |
| logger.info(f"Loaded and indexed {len(worker_db)} workers from database.") |
| except Exception as e: |
| logger.error(f"Failed to load worker database: {e}") |
|
|
| |
| if not os.path.exists(LOG_FILE): |
| pd.DataFrame(columns=["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"]).to_csv(LOG_FILE, index=False) |
|
|
| if not os.path.exists(AUDIT_LOG_FILE): |
| pd.DataFrame(columns=["timestamp", "action", "worker_id", "details"]).to_csv(AUDIT_LOG_FILE, index=False) |
|
|
| def log_audit(action, worker_id, details): |
| """Logs audit trail for actions.""" |
| audit_entry = { |
| "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), |
| "action": action, |
| "worker_id": worker_id, |
| "details": details |
| } |
| try: |
| pd.DataFrame([audit_entry]).to_csv(AUDIT_LOG_FILE, mode="a", header=not os.path.exists(AUDIT_LOG_FILE), index=False) |
| logger.info(f"Audit log: {action} for worker {worker_id}") |
| except Exception as e: |
| logger.error(f"Failed to log audit entry: {e}") |
|
|
| def extract_embedding(face_image_pil): |
| """Extracts facial embedding from a PIL image using TensorFlow ViT.""" |
| try: |
| inputs = feature_extractor(images=face_image_pil, return_tensors="tf") |
| outputs = model(inputs, training=False) |
| embedding = outputs.last_hidden_state[:, 0, :].numpy().flatten() |
| if len(embedding) > 128: |
| embedding = embedding[:128] |
| elif len(embedding) < 128: |
| embedding = np.pad(embedding, (0, 128 - len(embedding)), mode='constant') |
| return embedding |
| except Exception as e: |
| logger.error(f"Error extracting embedding: {e}") |
| return None |
|
|
| def detect_faces(image_pil): |
| """Detects faces in a PIL image and returns their coordinates.""" |
| try: |
| frame = np.array(image_pil) |
| frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) |
| gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY) |
| face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml") |
| faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(40, 40)) |
| return faces |
| except Exception as e: |
| logger.error(f"Error detecting faces: {e}") |
| return [] |
|
|
| def log_attendance(worker_id, camera_id="Gate_1"): |
| """Logs attendance, preventing duplicate entries for the same day.""" |
| today_str = datetime.now().strftime("%Y-%m-%d") |
| try: |
| logs_df = pd.read_csv(LOG_FILE) |
| daily_log = logs_df[(logs_df['worker_id'] == worker_id) & (pd.to_datetime(logs_df['timestamp']).dt.strftime('%Y-%m-%d') == today_str)] |
| if not daily_log.empty: |
| logger.info(f"Attendance already logged for worker {worker_id} today.") |
| return f"Attendance already logged for {worker_db[worker_id]['first_name']} {worker_db[worker_id]['last_name']} today." |
| except (FileNotFoundError, pd.errors.EmptyDataError): |
| pass |
|
|
| if worker_id not in worker_db: |
| logger.error(f"Worker ID {worker_id} not found in database.") |
| return "Error: Worker not registered." |
|
|
| worker_info = worker_db[worker_id] |
| log_entry = { |
| "worker_id": worker_id, |
| "first_name": worker_info['first_name'], |
| "last_name": worker_info['last_name'], |
| "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), |
| "camera_id": camera_id, |
| "verification_status": worker_info['verification_status'] |
| } |
| try: |
| pd.DataFrame([log_entry]).to_csv(LOG_FILE, mode="a", header=not os.path.exists(LOG_FILE), index=False) |
| log_audit("Attendance Logged", worker_id, f"Logged for {worker_info['first_name']} {worker_info['last_name']} at {camera_id}") |
| logger.info(f"Attendance logged for worker {worker_id}.") |
| return f"Attendance logged for {worker_info['first_name']} {worker_info['last_name']}." |
| except Exception as e: |
| logger.error(f"Failed to log attendance: {e}") |
| return "Error: Failed to log attendance." |
|
|
| def generate_daily_report(): |
| """Generates a daily summary report with full worker details, including in-time, and saves it as a downloadable file.""" |
| today_str = datetime.now().strftime("%Y-%m-%d") |
| try: |
| logs_df = pd.read_csv(LOG_FILE) |
| daily_logs = logs_df[pd.to_datetime(logs_df['timestamp']).dt.strftime('%Y-%m-%d') == today_str] |
| if daily_logs.empty: |
| return "No attendance data available for today.", None |
|
|
| total_workers = len(daily_logs['worker_id'].unique()) |
| verified_workers = len(daily_logs[daily_logs['verification_status'] == 'Verified']['worker_id'].unique()) |
| unverified_workers = len(daily_logs[daily_logs['verification_status'] == 'Pending']['worker_id'].unique()) |
| |
| daily_wage = 500 |
| total_wages = total_workers * daily_wage |
|
|
| |
| worker_details = [] |
| for _, row in daily_logs.iterrows(): |
| worker_details.append( |
| f"{row['worker_id']} | " |
| f"{row['first_name']} {row['last_name']} | " |
| f"{row['timestamp']} | " |
| f"{row['verification_status']}" |
| ) |
| |
| details_section = "\n".join(worker_details) if worker_details else "No workers logged today." |
| |
| report = f""" |
| Daily Attendance Report - {today_str} |
| =============================== |
| Summary |
| ------------------------------- |
| Total Unique Workers: {total_workers} |
| Verified Workers: {verified_workers} |
| Unverified Workers: {unverified_workers} |
| Total Wages (INR): {total_wages} |
| =============================== |
| Worker Details |
| ------------------------------- |
| Worker ID | Name | In-Time | Verification Status |
| ------------------------------- |
| {details_section} |
| =============================== |
| """ |
| if unverified_workers > 0: |
| report += "\nCompliance Risk: Unverified workers detected. Please review and verify." |
|
|
| |
| report_filename = f"daily_report_{today_str}_{uuid.uuid4().hex[:12]}.txt" |
| report_filepath = os.path.join(REPORT_DIR, report_filename) |
| try: |
| with open(report_filepath, "w", encoding='utf-8') as f: |
| f.write(report) |
| logger.info(f"Report saved to {report_filepath}") |
| return report, report_filepath |
| except Exception as e: |
| logger.error(f"Failed to save report: {e}") |
| return report, None |
| except (FileNotFoundError, pd.errors.EmptyDataError) as e: |
| logger.error(f"Error generating report: {e}") |
| return "No attendance data available for today.", None |
|
|
| def process_image_for_recognition(image_pil, camera_id="Gate_1"): |
| """ |
| Recognizes known workers in an image, logs their attendance, and draws bounding boxes. |
| Queues unrecognized faces for verification. |
| """ |
| if image_pil is None: |
| return None, "Please upload an image." |
|
|
| faces = detect_faces(image_pil) |
| if not len(faces): |
| return image_pil, "No faces were detected." |
|
|
| draw = ImageDraw.Draw(image_pil) |
| try: |
| font = ImageFont.truetype("arial.ttf", 15) |
| except IOError: |
| font = ImageFont.load_default() |
| |
| recognition_results = [] |
| processed_ids_this_run = set() |
| unverified_faces = [] |
|
|
| for (x, y, w, h) in faces: |
| face_pil = image_pil.crop((x, y, x + w, y + h)) |
| embedding = extract_embedding(face_pil) |
| if embedding is None: |
| continue |
|
|
| embedding_np = embedding.astype(np.float32).reshape(1, -1) |
| |
| is_known_worker = False |
| if index.ntotal > 0: |
| distances, indices = index.search(embedding_np, 1) |
| similarity_score = 1 - (distances[0][0] / 2) |
|
|
| if similarity_score > 0.78: |
| worker_id = list(worker_db.keys())[indices[0][0]] |
| |
| if worker_id in processed_ids_this_run: |
| continue |
| |
| is_known_worker = True |
| processed_ids_this_run.add(worker_id) |
| |
| worker_name = f"{worker_db[worker_id]['first_name']} {worker_db[worker_id]['last_name']}" |
| label = f"{worker_name} ({worker_id})" |
| |
| draw.rectangle([(x, y), (x + w, y + h)], outline="green", width=3) |
| draw.text((x, y - 20), label, fill="green", font=font) |
| |
| log_message = log_attendance(worker_id, camera_id) |
| recognition_results.append(f"Recognized: {label}. {log_message}") |
| |
| if not is_known_worker: |
| unverified_id = f"U{len(unverified_faces) + 1:04d}" |
| unverified_faces.append({ |
| "worker_id": unverified_id, |
| "first_name": "Unknown", |
| "last_name": "Person", |
| "embedding": encrypt_embedding(embedding), |
| "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), |
| "camera_id": camera_id, |
| "verification_status": "Pending" |
| }) |
| draw.rectangle([(x, y), (x + w, y + h)], outline="red", width=3) |
| draw.text((x, y - 20), "Unknown Person", fill="red", font=font) |
| recognition_results.append(f"Unknown Person Detected (ID: {unverified_id}). Please register via 'Manual Worker Registration' tab.") |
|
|
| if unverified_faces: |
| try: |
| pd.DataFrame(unverified_faces).to_csv(LOG_FILE, mode="a", header=not os.path.exists(LOG_FILE), index=False) |
| log_audit("Unverified Face Detected", "N/A", f"Detected {len(unverified_faces)} unverified faces") |
| except Exception as e: |
| logger.error(f"Failed to log unverified faces: {e}") |
|
|
| return image_pil, "\n".join(recognition_results) |
|
|
| def add_worker_manually(worker_id, first_name, last_name, image_pil, camera_id="Gate_1"): |
| """Manually adds a verified worker with Worker ID, First Name, Last Name, and encrypted embedding.""" |
| if not worker_id or not first_name or not last_name or image_pil is None: |
| return "Error: Worker ID, First Name, Last Name, and image cannot be empty." |
| if not worker_id.strip() or not first_name.strip() or not last_name.strip(): |
| return "Error: Worker ID, First Name, and Last Name cannot be blank." |
|
|
| |
| if not re.match(r'^[A-Za-z0-9]+$', worker_id): |
| return "Error: Worker ID must be alphanumeric (e.g., W0001)." |
|
|
| if worker_id in worker_db: |
| return f"Error: Worker ID {worker_id} already exists." |
|
|
| faces = detect_faces(image_pil) |
| if len(faces) == 0: |
| return "No face detected in the image. Please use a clear, frontal photo." |
| if len(faces) > 1: |
| return "Multiple faces detected. Please upload an image with only one person." |
|
|
| x, y, w, h = faces[0] |
| face_pil = image_pil.crop((x, y, x + w, y + h)) |
| embedding = extract_embedding(face_pil) |
| if embedding is None: |
| return "Failed to process face embedding. Please try another image." |
|
|
| if index.ntotal > 0: |
| embedding_np = embedding.astype(np.float32).reshape(1, -1) |
| distances, indices = index.search(embedding_np, 1) |
| similarity_score = 1 - (distances[0][0] / 2) |
| if similarity_score > 0.80: |
| existing_worker_id = list(worker_db.keys())[indices[0][0]] |
| existing_name = f"{worker_db[existing_worker_id]['first_name']} {worker_db[existing_worker_id]['last_name']}" |
| return f"This person seems to be already registered as '{existing_name}' (ID: {existing_worker_id}) with a similarity score of {similarity_score:.2f}." |
|
|
| encrypted_embedding = encrypt_embedding(embedding) |
| if encrypted_embedding is None: |
| return "Failed to encrypt embedding. Please try again." |
| |
| worker_db[worker_id] = { |
| "first_name": first_name.strip(), |
| "last_name": last_name.strip(), |
| "embedding": encrypted_embedding, |
| "verification_status": "Verified", |
| "last_seen": datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
| } |
| try: |
| index.add(embedding.astype(np.float32).reshape(1, -1)) |
| with open(DB_FILE, "wb") as f: |
| pickle.dump(worker_db, f) |
| log_audit("Worker Added", worker_id, f"Manually added worker {first_name} {last_name}") |
| logger.info(f"Manually added new worker {worker_id}: {first_name} {last_name}") |
| return f"Successfully added worker '{first_name} {last_name}' with ID: {worker_id}" |
| except Exception as e: |
| logger.error(f"Failed to add worker: {e}") |
| return "Error: Failed to add worker to system." |
|
|
| |
| def start_cctv_stream(stream_source, camera_id="CCTV_1"): |
| """Start processing frames from a CCTV stream (RTSP or webcam).""" |
| cap = cv2.VideoCapture(stream_source) |
| if not cap.isOpened(): |
| return None, "Error: Could not open video stream. Check the stream source (e.g., RTSP URL or camera index)." |
|
|
| st_frame = st.empty() |
| results_text = st.empty() |
|
|
| while True: |
| ret, frame = cap.read() |
| if not ret: |
| logger.error("Failed to grab frame from stream.") |
| break |
|
|
| |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
| image_pil = Image.fromarray(frame_rgb) |
|
|
| |
| processed_image, results = process_image_for_recognition(image_pil, camera_id) |
| if processed_image is not None: |
| st_frame.image(processed_image, channels="RGB", caption="Live CCTV Feed") |
|
|
| results_text.text_area("Recognition Results", value=results, height=200) |
|
|
| |
| cv2.waitKey(30) |
|
|
| cap.release() |
| return None, "Stream stopped." |
|
|
| def stop_cctv_stream(): |
| """Stop the CCTV stream.""" |
| return None, "Stream stopped manually." |
|
|
| |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: |
| gr.Markdown( |
| """ |
| # SLAV System - Smart Labour Attendance & Verification |
| **Workflow:** |
| 1. **Attendance:** Use the "Face Recognition" tab. Upload an image to mark attendance for **known** workers. |
| 2. **New Workers:** If a person is marked as "Unknown", go to the "Manual Worker Registration" tab to add them with their Worker ID, First Name, and Last Name. |
| 3. **Reports:** View daily summaries and detailed worker logs in the "Daily Report" tab, with downloadable report files. |
| 4. **CCTV Streaming:** Use the "CCTV Live Feed" tab to process real-time video streams. |
| """ |
| ) |
| with gr.Tabs(): |
| with gr.TabItem("Face Recognition"): |
| with gr.Row(): |
| with gr.Column(scale=1): |
| image_input = gr.Image(type="pil", label="Upload Image for Attendance", sources=["upload"]) |
| camera_id_input = gr.Textbox(label="Camera ID", placeholder="e.g., Gate_1") |
| submit_button = gr.Button("Process Image", variant="primary") |
| with gr.Column(scale=2): |
| image_output = gr.Image(type="pil", label="Processed Image", interactive=False) |
| text_output = gr.Textbox(label="Recognition Results", lines=6, interactive=False) |
| submit_button.click(process_image_for_recognition, inputs=[image_input, camera_id_input], outputs=[image_output, text_output]) |
|
|
| with gr.TabItem("Manual Worker Registration"): |
| gr.Markdown("## Add a New Verified Worker") |
| with gr.Row(): |
| with gr.Column(): |
| worker_id_input = gr.Textbox(label="Worker ID", placeholder="e.g., W0001") |
| worker_first_name_input = gr.Textbox(label="First Name", placeholder="e.g., John") |
| worker_last_name_input = gr.Textbox(label="Last Name", placeholder="e.g., Doe") |
| worker_image_input = gr.Image(type="pil", label="Upload Clear Face Image of Worker", sources=["upload"]) |
| worker_camera_id_input = gr.Textbox(label="Camera ID", placeholder="e.g., Gate_1") |
| add_worker_button = gr.Button("Add Worker to System", variant="primary") |
| with gr.Column(): |
| add_worker_output = gr.Textbox(label="Result", interactive=False) |
| add_worker_button.click(add_worker_manually, inputs=[worker_id_input, worker_first_name_input, worker_last_name_input, worker_image_input, worker_camera_id_input], outputs=add_worker_output) |
| |
| with gr.TabItem("View Attendance Log"): |
| gr.Markdown("## Attendance Log") |
| log_display = gr.Dataframe(headers=["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"], interactive=False, wrap=True) |
| |
| def update_log_display(): |
| if os.path.exists(LOG_FILE): |
| try: |
| df = pd.read_csv(LOG_FILE) |
| if not df.empty: |
| df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') |
| df = df.sort_values(by="timestamp", ascending=False).fillna('') |
| return df[["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"]] |
| return pd.DataFrame(columns=["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"]) |
| except pd.errors.EmptyDataError: |
| return pd.DataFrame(columns=["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"]) |
| return pd.DataFrame(columns=["worker_id", "first_name", "last_name", "timestamp", "camera_id", "verification_status"]) |
| |
| refresh_button = gr.Button("Refresh Log") |
| refresh_button.click(update_log_display, None, log_display) |
| demo.load(update_log_display, None, log_display) |
|
|
| with gr.TabItem("Daily Report"): |
| gr.Markdown("## Daily Attendance Report") |
| report_output = gr.Textbox(label="Daily Summary", lines=20, interactive=False) |
| report_download = gr.File(label="Download Report", interactive=False) |
| report_button = gr.Button("Generate Daily Report") |
| report_button.click(generate_daily_report, None, [report_output, report_download]) |
|
|
| with gr.TabItem("CCTV Live Feed"): |
| gr.Markdown("## Real-Time CCTV Feed") |
| stream_source = gr.Textbox(label="Stream Source (e.g., RTSP URL or Webcam Index 0)", placeholder="rtsp://username:password@ip:port/stream or 0") |
| camera_id_input = gr.Textbox(label="Camera ID", value="CCTV_1", placeholder="e.g., CCTV_1") |
| start_button = gr.Button("Start Stream", variant="primary") |
| stop_button = gr.Button("Stop Stream", variant="secondary") |
| video_output = gr.Image(label="Live Feed", interactive=False) |
| results_output = gr.Textbox(label="Recognition Results", lines=6, interactive=False) |
|
|
| def stream_wrapper(source, camera_id): |
| return start_cctv_stream(source, camera_id) |
|
|
| start_button.click( |
| stream_wrapper, |
| inputs=[stream_source, camera_id_input], |
| outputs=[video_output, results_output] |
| ) |
| stop_button.click( |
| stop_cctv_stream, |
| inputs=None, |
| outputs=[video_output, results_output] |
| ) |
|
|
| if __name__ == "__main__": |
| demo.launch(server_name="0.0.0.0", server_port=7860, debug=True) |