import os
import json
import tempfile

import cv2
import numpy as np
import gradio as gr
from ultralytics import YOLO
from insightface.app import FaceAnalysis
from huggingface_hub import hf_hub_download

# ==========================================
# CONFIGURATION & STATE MANAGEMENT
# ==========================================
DB_FILE = "face_db.json"                 # JSON list of enrolled names
EMBEDDINGS_FILE = "face_embeddings.npy"  # (N, 512) array, row i pairs with names[i]


class FaceSystem:
    """Face detection (YOLOv8-Face) + recognition (InsightFace buffalo_l).

    Holds a parallel name list / embedding matrix persisted to DB_FILE and
    EMBEDDINGS_FILE. Embeddings are L2-normalized, so a plain dot product
    against the stored matrix is cosine similarity.
    """

    def __init__(self):
        print("🚀 Initializing AI Models...")
        # 1. Load YOLOv8-Face (weights fetched from the HF Hub on first run)
        model_path = hf_hub_download(repo_id="arnabdhar/YOLOv8-Face-Detection",
                                     filename="model.pt")
        self.detector = YOLO(model_path)
        # 2. Load InsightFace (CPU-only provider keeps it portable)
        self.recognizer = FaceAnalysis(name='buffalo_l',
                                       providers=['CPUExecutionProvider'])
        self.recognizer.prepare(ctx_id=0, det_size=(640, 640))
        # 3. Load Database (buffalo_l emits 512-d embeddings)
        self.known_names = []
        self.known_embeddings = np.empty((0, 512))
        self.load_db()
        print("✅ System Ready.")

    def load_db(self):
        """Load names + embeddings from disk; start empty if either file is missing."""
        if os.path.exists(DB_FILE) and os.path.exists(EMBEDDINGS_FILE):
            with open(DB_FILE, 'r') as f:
                self.known_names = json.load(f)
            self.known_embeddings = np.load(EMBEDDINGS_FILE)
            print(f"📂 Loaded {len(self.known_names)} identities.")
        else:
            print("📂 Database empty. Starting fresh.")

    def save_db(self):
        """Persist the current name list and embedding matrix to disk."""
        with open(DB_FILE, 'w') as f:
            json.dump(self.known_names, f)
        np.save(EMBEDDINGS_FILE, self.known_embeddings)

    def enroll_user(self, name, image):
        """Enroll one identity from an RGB photo.

        Args:
            name: display name / ID for the person (whitespace trimmed).
            image: RGB numpy image from Gradio, or None.

        Returns:
            A human-readable status string for the UI.
        """
        if image is None or name.strip() == "":
            return "⚠️ Error: Missing name or photo."
        # Gradio delivers RGB; InsightFace expects BGR.
        img_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        faces = self.recognizer.get(img_bgr)
        if len(faces) == 0:
            return "⚠️ Error: No face detected."
        # Use the largest detected face (by bbox area) as the reference.
        face = max(faces,
                   key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]))
        embedding = face.normed_embedding.reshape(1, -1)
        if self.known_embeddings.shape[0] == 0:
            self.known_embeddings = embedding
        else:
            self.known_embeddings = np.vstack([self.known_embeddings, embedding])
        # Store the trimmed name so it matches the validation above.
        self.known_names.append(name.strip())
        self.save_db()
        return f"✅ Success: '{name}' added to database."

    def recognize_and_process(self, frame, blur_intensity=20, threshold=0.5):
        """Core processing logic for a single frame.

        Detects faces, matches each crop against the enrolled embeddings
        (dot product of normalized vectors = cosine similarity), pixelates
        every face region, and draws a labelled box.

        Args:
            frame: RGB numpy image, or None (returns None).
            blur_intensity: 1-100 slider; higher = coarser pixelation.
            threshold: minimum cosine similarity to accept a match.

        Returns:
            The annotated RGB frame, or None for a None input.
        """
        if frame is None:
            return None
        img_vis = frame.copy()
        h, w = img_vis.shape[:2]

        # Detect faces with YOLO at a fixed 0.5 confidence.
        results = self.detector(img_vis, conf=0.5, verbose=False)
        for result in results:
            for box in result.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                # Context margin around the crop (currently disabled: margin=0).
                margin = 0
                cx1 = max(0, x1 - margin); cy1 = max(0, y1 - margin)
                cx2 = min(w, x2 + margin); cy2 = min(h, y2 + margin)
                face_crop = img_vis[cy1:cy2, cx1:cx2]

                # Identify the crop against the enrolled database.
                name = "Unknown"
                color = (200, 0, 0)  # Red (unknown)
                if self.known_embeddings.shape[0] > 0 and face_crop.size > 0:
                    face_crop_bgr = cv2.cvtColor(face_crop, cv2.COLOR_RGB2BGR)
                    analysis = self.recognizer.get(face_crop_bgr)
                    if len(analysis) > 0:
                        target_emb = analysis[0].normed_embedding
                        similarities = np.dot(self.known_embeddings, target_emb)
                        best_idx = int(np.argmax(similarities))
                        if similarities[best_idx] > threshold:
                            name = self.known_names[best_idx]
                            color = (0, 255, 0)  # Green (recognized)

                # Privacy Blur (pixelate) — applied to every face, known or not.
                roi = img_vis[y1:y2, x1:x2]
                if roi.size > 0:
                    # Map slider (1-100) to a pixel block size, clamped to >= 3.
                    block_size = max(3, int(30 - (blur_intensity / 4)))
                    h_roi, w_roi = roi.shape[:2]
                    small = cv2.resize(
                        roi,
                        (max(1, w_roi // block_size), max(1, h_roi // block_size)),
                        interpolation=cv2.INTER_LINEAR)
                    pixelated = cv2.resize(small, (w_roi, h_roi),
                                           interpolation=cv2.INTER_NEAREST)
                    img_vis[y1:y2, x1:x2] = pixelated

                # Overlay: box + filled label background + name text.
                cv2.rectangle(img_vis, (x1, y1), (x2, y2), color, 2)
                label_size, _ = cv2.getTextSize(name, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)
                cv2.rectangle(img_vis, (x1, y1 - label_size[1] - 10),
                              (x1 + label_size[0] + 10, y1), color, -1)
                cv2.putText(img_vis, name, (x1 + 5, y1 - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        return img_vis


# Initialize
system = FaceSystem()


# ==========================================
# VIDEO PROCESSING HELPER
# ==========================================
def process_video_file(video_path, blur_intensity, threshold):
    """Reads a video file, processes every frame, saves it, and returns path."""
    if video_path is None:
        return None

    cap = cv2.VideoCapture(video_path)
    # Some containers report 0 fps; fall back to 30 so the writer stays valid.
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Create temp output file; close the handle immediately so VideoWriter
    # owns the path (an open handle breaks the writer on Windows).
    with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_out:
        output_path = temp_out.name

    # Setup writer (mp4v is usually safe for CPU)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # Process frame using our existing core function.
            # Convert BGR (OpenCV) to RGB (needed for our function) then back.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            processed_rgb = system.recognize_and_process(frame_rgb, blur_intensity, threshold)
            processed_bgr = cv2.cvtColor(processed_rgb, cv2.COLOR_RGB2BGR)
            writer.write(processed_bgr)
    finally:
        # Always release capture/writer, even if a frame raises mid-stream.
        cap.release()
        writer.release()
    return output_path


# ==========================================
# GRADIO INTERFACE (Fixed UI)
# ==========================================
with gr.Blocks() as demo:
    gr.Markdown("# 👁️ SecureVision Pro")

    with gr.Tabs():
        # --- SURVEILLANCE TAB ---
        with gr.Tab("📹 Surveillance"):
            # Global Settings for this tab
            with gr.Accordion("⚙️ Settings", open=False):
                blur_slider = gr.Slider(1, 100, value=50, label="Privacy Level")
                conf_slider = gr.Slider(0.1, 0.9, value=0.5, label="Recognition Threshold")

            with gr.Tabs():
                # Sub-Tab 1: Live Webcam
                with gr.TabItem("Live Webcam"):
                    with gr.Row():
                        web_in = gr.Image(sources=["webcam"], streaming=True, label="Live Feed")
                        web_out = gr.Image(label="Live Output")
                    web_in.stream(system.recognize_and_process,
                                  [web_in, blur_slider, conf_slider], web_out)

                # Sub-Tab 2: Upload Image
                with gr.TabItem("Upload Image"):
                    with gr.Row():
                        img_in = gr.Image(sources=["upload", "clipboard"], label="Upload Image")
                        img_out = gr.Image(label="Processed Image")
                    img_btn = gr.Button("Analyze Image", variant="primary")
                    img_btn.click(system.recognize_and_process,
                                  [img_in, blur_slider, conf_slider], img_out)

                # Sub-Tab 3: Upload Video
                with gr.TabItem("Upload Video"):
                    with gr.Row():
                        vid_in = gr.Video(label="Upload Video")
                        vid_out = gr.Video(label="Processed Output")
                    vid_btn = gr.Button("Process Video", variant="primary")
                    vid_btn.click(process_video_file,
                                  [vid_in, blur_slider, conf_slider], vid_out)

        # --- DATABASE TAB ---
        with gr.Tab("👤 Database"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Enroll New Personnel")
                    new_name = gr.Textbox(label="Full Name / ID")
                    new_photo = gr.Image(sources=["upload", "webcam"], label="Reference Photo")
                    add_btn = gr.Button("Enroll User", variant="primary")
                    status_msg = gr.Markdown("")
                with gr.Column():
                    gr.Markdown("### Database Status")

                    def get_user_list():
                        """Render the enrolled names as a Markdown bullet list."""
                        if not system.known_names:
                            return "No users enrolled."
                        return "\n".join([f"• {n}" for n in system.known_names])

                    # Passing the callable lets Gradio re-evaluate it on load.
                    user_list = gr.Markdown(get_user_list)
                    refresh_btn = gr.Button("Refresh List")

            add_btn.click(system.enroll_user, [new_name, new_photo], status_msg)
            refresh_btn.click(get_user_list, outputs=user_list)
            # Second handler on the same button keeps the list in sync after enroll.
            add_btn.click(get_user_list, outputs=user_list)

if __name__ == "__main__":
    demo.launch()