Spaces:
Sleeping
Sleeping
| import os | |
| import cv2 | |
| import numpy as np | |
| import gradio as gr | |
| import json | |
| import tempfile | |
| from ultralytics import YOLO | |
| from insightface.app import FaceAnalysis | |
| from huggingface_hub import hf_hub_download | |
| # ========================================== | |
| # CONFIGURATION & STATE MANAGEMENT | |
| # ========================================== | |
| DB_FILE = "face_db.json" | |
| EMBEDDINGS_FILE = "face_embeddings.npy" | |
| class FaceSystem: | |
| def __init__(self): | |
| print("π Initializing AI Models...") | |
| # 1. Load YOLOv8-Face | |
| model_path = hf_hub_download(repo_id="arnabdhar/YOLOv8-Face-Detection", filename="model.pt") | |
| self.detector = YOLO(model_path) | |
| # 2. Load InsightFace | |
| self.recognizer = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider']) | |
| self.recognizer.prepare(ctx_id=0, det_size=(640, 640)) | |
| # 3. Load Database | |
| self.known_names = [] | |
| self.known_embeddings = np.empty((0, 512)) | |
| self.load_db() | |
| print("β System Ready.") | |
| def load_db(self): | |
| if os.path.exists(DB_FILE) and os.path.exists(EMBEDDINGS_FILE): | |
| with open(DB_FILE, 'r') as f: | |
| self.known_names = json.load(f) | |
| self.known_embeddings = np.load(EMBEDDINGS_FILE) | |
| print(f"π Loaded {len(self.known_names)} identities.") | |
| else: | |
| print("π Database empty. Starting fresh.") | |
| def save_db(self): | |
| with open(DB_FILE, 'w') as f: | |
| json.dump(self.known_names, f) | |
| np.save(EMBEDDINGS_FILE, self.known_embeddings) | |
| def enroll_user(self, name, image): | |
| if image is None or name.strip() == "": | |
| return "β οΈ Error: Missing name or photo." | |
| img_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) | |
| faces = self.recognizer.get(img_bgr) | |
| if len(faces) == 0: | |
| return "β οΈ Error: No face detected." | |
| # Get largest face | |
| face = sorted(faces, key=lambda x: (x.bbox[2]-x.bbox[0]) * (x.bbox[3]-x.bbox[1]))[-1] | |
| embedding = face.normed_embedding.reshape(1, -1) | |
| if self.known_embeddings.shape[0] == 0: | |
| self.known_embeddings = embedding | |
| else: | |
| self.known_embeddings = np.vstack([self.known_embeddings, embedding]) | |
| self.known_names.append(name) | |
| self.save_db() | |
| return f"β Success: '{name}' added to database." | |
| def recognize_and_process(self, frame, blur_intensity=20, threshold=0.5): | |
| """Core processing logic for a single frame""" | |
| if frame is None: return None | |
| img_vis = frame.copy() | |
| h, w = img_vis.shape[:2] | |
| # Detect | |
| results = self.detector(img_vis, conf=0.5, verbose=False) | |
| for result in results: | |
| boxes = result.boxes | |
| for box in boxes: | |
| x1, y1, x2, y2 = map(int, box.xyxy[0]) | |
| # Add context margin | |
| margin = 0 | |
| cx1 = max(0, x1 - margin); cy1 = max(0, y1 - margin) | |
| cx2 = min(w, x2 + margin); cy2 = min(h, y2 + margin) | |
| face_crop = img_vis[cy1:cy2, cx1:cx2] | |
| # Identify | |
| name = "Unknown" | |
| color = (200, 0, 0) # Red | |
| if self.known_embeddings.shape[0] > 0 and face_crop.size > 0: | |
| face_crop_bgr = cv2.cvtColor(face_crop, cv2.COLOR_RGB2BGR) | |
| analysis = self.recognizer.get(face_crop_bgr) | |
| if len(analysis) > 0: | |
| target_emb = analysis[0].normed_embedding | |
| similarities = np.dot(self.known_embeddings, target_emb) | |
| best_idx = np.argmax(similarities) | |
| if similarities[best_idx] > threshold: | |
| name = self.known_names[best_idx] | |
| color = (0, 255, 0) # Green | |
| # Privacy Blur (Pixelate) | |
| roi = img_vis[y1:y2, x1:x2] | |
| if roi.size > 0: | |
| block_size = max(3, int(30 - (blur_intensity / 4))) | |
| h_roi, w_roi = roi.shape[:2] | |
| small = cv2.resize(roi, (max(1, w_roi // block_size), max(1, h_roi // block_size)), interpolation=cv2.INTER_LINEAR) | |
| pixelated = cv2.resize(small, (w_roi, h_roi), interpolation=cv2.INTER_NEAREST) | |
| img_vis[y1:y2, x1:x2] = pixelated | |
| # Overlay | |
| cv2.rectangle(img_vis, (x1, y1), (x2, y2), color, 2) | |
| label_size, _ = cv2.getTextSize(name, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2) | |
| cv2.rectangle(img_vis, (x1, y1 - label_size[1] - 10), (x1 + label_size[0] + 10, y1), color, -1) | |
| cv2.putText(img_vis, name, (x1 + 5, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) | |
| return img_vis | |
| # Initialize | |
| system = FaceSystem() | |
| # ========================================== | |
| # VIDEO PROCESSING HELPER | |
| # ========================================== | |
| def process_video_file(video_path, blur_intensity, threshold): | |
| """Reads a video file, processes every frame, saves it, and returns path.""" | |
| if video_path is None: return None | |
| cap = cv2.VideoCapture(video_path) | |
| fps = int(cap.get(cv2.CAP_PROP_FPS)) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| # Create temp output file | |
| temp_out = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) | |
| output_path = temp_out.name | |
| # Setup writer (mp4v is usually safe for CPU) | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) | |
| while cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: break | |
| # Process frame using our existing core function | |
| # We convert BGR (OpenCV) to RGB (needed for our function) then back to BGR | |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| processed_rgb = system.recognize_and_process(frame_rgb, blur_intensity, threshold) | |
| processed_bgr = cv2.cvtColor(processed_rgb, cv2.COLOR_RGB2BGR) | |
| writer.write(processed_bgr) | |
| cap.release() | |
| writer.release() | |
| return output_path | |
| # ========================================== | |
| # GRADIO INTERFACE (Fixed UI) | |
| # ========================================== | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# ποΈ SecureVision Pro") | |
| with gr.Tabs(): | |
| # --- SURVEILLANCE TAB --- | |
| with gr.Tab("πΉ Surveillance"): | |
| # Global Settings for this tab | |
| with gr.Accordion("βοΈ Settings", open=False): | |
| blur_slider = gr.Slider(1, 100, value=50, label="Privacy Level") | |
| conf_slider = gr.Slider(0.1, 0.9, value=0.5, label="Recognition Threshold") | |
| with gr.Tabs(): | |
| # Sub-Tab 1: Live Webcam | |
| with gr.TabItem("Live Webcam"): | |
| with gr.Row(): | |
| web_in = gr.Image(sources=["webcam"], streaming=True, label="Live Feed") | |
| web_out = gr.Image(label="Live Output") | |
| web_in.stream(system.recognize_and_process, [web_in, blur_slider, conf_slider], web_out) | |
| # Sub-Tab 2: Upload Image | |
| with gr.TabItem("Upload Image"): | |
| with gr.Row(): | |
| img_in = gr.Image(sources=["upload", "clipboard"], label="Upload Image") | |
| img_out = gr.Image(label="Processed Image") | |
| img_btn = gr.Button("Analyze Image", variant="primary") | |
| img_btn.click(system.recognize_and_process, [img_in, blur_slider, conf_slider], img_out) | |
| # Sub-Tab 3: Upload Video | |
| with gr.TabItem("Upload Video"): | |
| with gr.Row(): | |
| vid_in = gr.Video(label="Upload Video") | |
| vid_out = gr.Video(label="Processed Output") | |
| vid_btn = gr.Button("Process Video", variant="primary") | |
| vid_btn.click(process_video_file, [vid_in, blur_slider, conf_slider], vid_out) | |
| # --- DATABASE TAB --- | |
| with gr.Tab("π€ Database"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### Enroll New Personnel") | |
| new_name = gr.Textbox(label="Full Name / ID") | |
| new_photo = gr.Image(sources=["upload", "webcam"], label="Reference Photo") | |
| add_btn = gr.Button("Enroll User", variant="primary") | |
| status_msg = gr.Markdown("") | |
| with gr.Column(): | |
| gr.Markdown("### Database Status") | |
| def get_user_list(): | |
| if not system.known_names: return "No users enrolled." | |
| return "\n".join([f"β’ {n}" for n in system.known_names]) | |
| user_list = gr.Markdown(get_user_list) | |
| refresh_btn = gr.Button("Refresh List") | |
| add_btn.click(system.enroll_user, [new_name, new_photo], status_msg) | |
| refresh_btn.click(get_user_list, outputs=user_list) | |
| add_btn.click(get_user_list, outputs=user_list) | |
| if __name__ == "__main__": | |
| demo.launch() |