Spaces:
Sleeping
Sleeping
File size: 9,512 Bytes
9e9b090 a3fcb74 5c40b77 9e9b090 a3fcb74 9e9b090 a3fcb74 2d43a8f a3fcb74 1890212 5c40b77 a3fcb74 1890212 5c40b77 a3fcb74 1890212 a3fcb74 5c40b77 a3fcb74 5c40b77 a3fcb74 1890212 a3fcb74 a0be2c0 a3fcb74 5c40b77 a3fcb74 5c40b77 a3fcb74 5c40b77 a3fcb74 5c40b77 a3fcb74 5c40b77 a3fcb74 5c40b77 a3fcb74 5c40b77 a3fcb74 5c40b77 a3fcb74 5c40b77 a3fcb74 5c40b77 a3fcb74 5c40b77 a3fcb74 5c40b77 a3fcb74 5c40b77 a3fcb74 5c40b77 158ac93 5c40b77 158ac93 1890212 5c40b77 1890212 a3fcb74 5c40b77 1890212 5c40b77 a3fcb74 5c40b77 9e9b090 1890212 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 | import os
import cv2
import numpy as np
import gradio as gr
import json
import tempfile
from ultralytics import YOLO
from insightface.app import FaceAnalysis
from huggingface_hub import hf_hub_download
# ==========================================
# CONFIGURATION & STATE MANAGEMENT
# ==========================================
# JSON file holding the list of enrolled names; index-aligned with the
# rows of the embedding matrix stored in EMBEDDINGS_FILE.
DB_FILE = "face_db.json"
# NumPy binary file holding an (N, 512) array of normalized face embeddings.
EMBEDDINGS_FILE = "face_embeddings.npy"
class FaceSystem:
    """Face detection + recognition pipeline with a persistent identity DB.

    Detection uses a YOLOv8 face model pulled from the Hugging Face Hub;
    recognition uses InsightFace embeddings compared by cosine similarity
    (dot product of L2-normalized vectors). Enrolled identities persist in
    DB_FILE (names, JSON) and EMBEDDINGS_FILE (embeddings, NumPy binary),
    kept index-aligned.
    """

    def __init__(self):
        print("Initializing AI Models...")
        # 1. Load YOLOv8-Face weights from the Hugging Face Hub.
        model_path = hf_hub_download(repo_id="arnabdhar/YOLOv8-Face-Detection", filename="model.pt")
        self.detector = YOLO(model_path)
        # 2. Load InsightFace (CPU provider so the Space runs without a GPU).
        self.recognizer = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
        self.recognizer.prepare(ctx_id=0, det_size=(640, 640))
        # 3. Identity database: names list is index-aligned with the rows
        # of the (N, 512) embedding matrix.
        self.known_names = []
        self.known_embeddings = np.empty((0, 512))
        self.load_db()
        print("System Ready.")

    def load_db(self):
        """Restore names and embeddings from disk, if both files exist."""
        if os.path.exists(DB_FILE) and os.path.exists(EMBEDDINGS_FILE):
            with open(DB_FILE, 'r') as f:
                self.known_names = json.load(f)
            self.known_embeddings = np.load(EMBEDDINGS_FILE)
            print(f"Loaded {len(self.known_names)} identities.")
        else:
            print("Database empty. Starting fresh.")

    def save_db(self):
        """Persist names (JSON) and embeddings (NumPy binary) to disk."""
        with open(DB_FILE, 'w') as f:
            json.dump(self.known_names, f)
        np.save(EMBEDDINGS_FILE, self.known_embeddings)

    def enroll_user(self, name, image):
        """Enroll one identity from an RGB reference photo.

        Args:
            name: identifier to store (leading/trailing whitespace stripped).
            image: RGB image array from Gradio, or None.

        Returns:
            Human-readable status string for the UI.
        """
        # Tolerate name=None as well as blank names (original crashed on None).
        if image is None or not name or not name.strip():
            return "Error: Missing name or photo."
        name = name.strip()  # store the cleaned identifier, not the raw input
        img_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # InsightFace expects BGR
        faces = self.recognizer.get(img_bgr)
        if len(faces) == 0:
            return "Error: No face detected."
        # Pick the largest face by bounding-box area.
        face = max(faces, key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]))
        embedding = face.normed_embedding.reshape(1, -1)
        if self.known_embeddings.shape[0] == 0:
            self.known_embeddings = embedding
        else:
            self.known_embeddings = np.vstack([self.known_embeddings, embedding])
        self.known_names.append(name)
        self.save_db()
        return f"Success: '{name}' added to database."

    def recognize_and_process(self, frame, blur_intensity=20, threshold=0.5):
        """Detect, identify, pixelate, and annotate every face in one RGB frame.

        Args:
            frame: RGB image array (as delivered by Gradio), or None.
            blur_intensity: 1-100; higher values give coarser pixelation.
            threshold: minimum cosine similarity to accept an identity match.

        Returns:
            Annotated copy of the frame, or None when frame is None.
        """
        if frame is None:
            return None
        img_vis = frame.copy()
        h, w = img_vis.shape[:2]
        # Detect faces. Detector confidence is fixed at 0.5; the `threshold`
        # parameter only gates the recognition match below.
        results = self.detector(img_vis, conf=0.5, verbose=False)
        for result in results:
            for box in result.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                # Optional context margin around the box, clamped to the image
                # (currently 0, i.e. the raw detection box).
                margin = 0
                cx1 = max(0, x1 - margin); cy1 = max(0, y1 - margin)
                cx2 = min(w, x2 + margin); cy2 = min(h, y2 + margin)
                face_crop = img_vis[cy1:cy2, cx1:cx2]
                # Identify by cosine similarity against the enrolled matrix.
                name = "Unknown"
                color = (200, 0, 0)  # red (frame is RGB at this point)
                if self.known_embeddings.shape[0] > 0 and face_crop.size > 0:
                    face_crop_bgr = cv2.cvtColor(face_crop, cv2.COLOR_RGB2BGR)
                    analysis = self.recognizer.get(face_crop_bgr)
                    if len(analysis) > 0:
                        target_emb = analysis[0].normed_embedding
                        # Normalized embeddings: dot product == cosine similarity.
                        similarities = np.dot(self.known_embeddings, target_emb)
                        best_idx = np.argmax(similarities)
                        if similarities[best_idx] > threshold:
                            name = self.known_names[best_idx]
                            color = (0, 255, 0)  # green
                # Privacy pixelation: shrink then nearest-neighbor upscale.
                roi = img_vis[y1:y2, x1:x2]
                if roi.size > 0:
                    block_size = max(3, int(30 - (blur_intensity / 4)))
                    h_roi, w_roi = roi.shape[:2]
                    small = cv2.resize(roi, (max(1, w_roi // block_size), max(1, h_roi // block_size)), interpolation=cv2.INTER_LINEAR)
                    pixelated = cv2.resize(small, (w_roi, h_roi), interpolation=cv2.INTER_NEAREST)
                    img_vis[y1:y2, x1:x2] = pixelated
                # Overlay: box, filled label background, then the name text.
                cv2.rectangle(img_vis, (x1, y1), (x2, y2), color, 2)
                label_size, _ = cv2.getTextSize(name, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)
                cv2.rectangle(img_vis, (x1, y1 - label_size[1] - 10), (x1 + label_size[0] + 10, y1), color, -1)
                cv2.putText(img_vis, name, (x1 + 5, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        return img_vis
# Initialize
# Module-level singleton: constructing FaceSystem downloads/loads both models
# and reads the on-disk database once, at import time.
system = FaceSystem()
# ==========================================
# VIDEO PROCESSING HELPER
# ==========================================
def process_video_file(video_path, blur_intensity, threshold):
    """Process every frame of a video through the face pipeline.

    Args:
        video_path: path to the uploaded video, or None.
        blur_intensity: forwarded to FaceSystem.recognize_and_process.
        threshold: forwarded to FaceSystem.recognize_and_process.

    Returns:
        Path to the processed .mp4 file, or None if the input is missing
        or unreadable.
    """
    if video_path is None:
        return None
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None
    # Some containers report FPS as 0/NaN; fall back to a sane default so
    # the writer still produces a playable file.
    fps = cap.get(cv2.CAP_PROP_FPS)
    fps = int(fps) if fps and fps > 0 else 25
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Create the temp output path; close the handle immediately so the
    # descriptor isn't leaked (VideoWriter reopens the path itself).
    with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_out:
        output_path = temp_out.name
    # mp4v is usually safe for CPU-only environments.
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # OpenCV decodes BGR; the core function expects RGB, so convert
            # in and back out around the call.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            processed_rgb = system.recognize_and_process(frame_rgb, blur_intensity, threshold)
            writer.write(cv2.cvtColor(processed_rgb, cv2.COLOR_RGB2BGR))
    finally:
        # Always release capture/writer, even if a frame raises mid-stream.
        cap.release()
        writer.release()
    return output_path
# ==========================================
# GRADIO INTERFACE (Fixed UI)
# ==========================================
with gr.Blocks() as demo:
    gr.Markdown("# ποΈ SecureVision Pro")
    with gr.Tabs():
        # --- SURVEILLANCE TAB ---
        with gr.Tab("πΉ Surveillance"):
            # Global Settings for this tab
            # These two sliders are shared inputs for every handler below
            # (webcam stream, image analysis, and video processing).
            with gr.Accordion("βοΈ Settings", open=False):
                blur_slider = gr.Slider(1, 100, value=50, label="Privacy Level")
                conf_slider = gr.Slider(0.1, 0.9, value=0.5, label="Recognition Threshold")
            with gr.Tabs():
                # Sub-Tab 1: Live Webcam
                with gr.TabItem("Live Webcam"):
                    with gr.Row():
                        web_in = gr.Image(sources=["webcam"], streaming=True, label="Live Feed")
                        web_out = gr.Image(label="Live Output")
                    # Streams each webcam frame through the core pipeline.
                    web_in.stream(system.recognize_and_process, [web_in, blur_slider, conf_slider], web_out)
                # Sub-Tab 2: Upload Image
                with gr.TabItem("Upload Image"):
                    with gr.Row():
                        img_in = gr.Image(sources=["upload", "clipboard"], label="Upload Image")
                        img_out = gr.Image(label="Processed Image")
                    img_btn = gr.Button("Analyze Image", variant="primary")
                    img_btn.click(system.recognize_and_process, [img_in, blur_slider, conf_slider], img_out)
                # Sub-Tab 3: Upload Video
                with gr.TabItem("Upload Video"):
                    with gr.Row():
                        vid_in = gr.Video(label="Upload Video")
                        vid_out = gr.Video(label="Processed Output")
                    vid_btn = gr.Button("Process Video", variant="primary")
                    vid_btn.click(process_video_file, [vid_in, blur_slider, conf_slider], vid_out)
        # --- DATABASE TAB ---
        with gr.Tab("π€ Database"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Enroll New Personnel")
                    new_name = gr.Textbox(label="Full Name / ID")
                    new_photo = gr.Image(sources=["upload", "webcam"], label="Reference Photo")
                    add_btn = gr.Button("Enroll User", variant="primary")
                    status_msg = gr.Markdown("")
                with gr.Column():
                    gr.Markdown("### Database Status")

                    def get_user_list():
                        # Render the enrolled names as a Markdown bullet list.
                        if not system.known_names: return "No users enrolled."
                        return "\n".join([f"β’ {n}" for n in system.known_names])

                    # Passing the callable makes Gradio re-evaluate it on load.
                    user_list = gr.Markdown(get_user_list)
                    refresh_btn = gr.Button("Refresh List")
            add_btn.click(system.enroll_user, [new_name, new_photo], status_msg)
            refresh_btn.click(get_user_list, outputs=user_list)
            # Second handler on the same button: refresh the list after enrolling.
            add_btn.click(get_user_list, outputs=user_list)

if __name__ == "__main__":
    demo.launch()