# Test / app.py — Hugging Face Spaces page header (scrape artifact, kept as comments)
# AhmedAdamu's picture
# Update app.py
# 5c40b77 verified
import os
import cv2
import numpy as np
import gradio as gr
import json
import tempfile
from ultralytics import YOLO
from insightface.app import FaceAnalysis
from huggingface_hub import hf_hub_download
# ==========================================
# CONFIGURATION & STATE MANAGEMENT
# ==========================================
# Two parallel stores persist the enrolled identities:
DB_FILE = "face_db.json"  # JSON list of enrolled names, index-aligned with the embeddings
EMBEDDINGS_FILE = "face_embeddings.npy"  # (N, 512) array of face embeddings, same order as DB_FILE
class FaceSystem:
    """Face detection, recognition and privacy blurring backed by a small on-disk DB.

    Detection uses a YOLOv8 face model; recognition uses InsightFace
    ('buffalo_l', CPU provider). Enrolled identities live in two parallel
    stores: ``known_names`` (list of labels) and ``known_embeddings`` (an
    (N, 512) array of L2-normalised embeddings), persisted to DB_FILE and
    EMBEDDINGS_FILE respectively.
    """

    def __init__(self):
        print("🚀 Initializing AI Models...")
        # 1. YOLOv8-Face detector weights fetched from the Hugging Face Hub.
        model_path = hf_hub_download(repo_id="arnabdhar/YOLOv8-Face-Detection", filename="model.pt")
        self.detector = YOLO(model_path)
        # 2. InsightFace recognizer (CPU-only execution provider).
        self.recognizer = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
        self.recognizer.prepare(ctx_id=0, det_size=(640, 640))
        # 3. Identity database: names + 512-d embeddings (loaded from disk if present).
        self.known_names = []
        self.known_embeddings = np.empty((0, 512))
        self.load_db()
        print("✅ System Ready.")

    def load_db(self):
        """Restore names/embeddings from disk; start fresh if either file is missing."""
        if os.path.exists(DB_FILE) and os.path.exists(EMBEDDINGS_FILE):
            with open(DB_FILE, 'r') as f:
                self.known_names = json.load(f)
            self.known_embeddings = np.load(EMBEDDINGS_FILE)
            print(f"📂 Loaded {len(self.known_names)} identities.")
        else:
            print("📂 Database empty. Starting fresh.")

    def save_db(self):
        """Persist the current names/embeddings pair to disk."""
        with open(DB_FILE, 'w') as f:
            json.dump(self.known_names, f)
        np.save(EMBEDDINGS_FILE, self.known_embeddings)

    def enroll_user(self, name, image):
        """Enroll one identity from an RGB reference photo.

        Uses the largest detected face in the image. Returns a user-facing
        status string (never raises on bad input).
        """
        if image is None or name.strip() == "":
            return "⚠️ Error: Missing name or photo."
        img_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # InsightFace expects BGR
        faces = self.recognizer.get(img_bgr)
        if len(faces) == 0:
            return "⚠️ Error: No face detected."
        # Largest face by bounding-box area (max() instead of sort-and-take-last).
        face = max(faces, key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]))
        embedding = face.normed_embedding.reshape(1, -1)
        if self.known_embeddings.shape[0] == 0:
            self.known_embeddings = embedding
        else:
            self.known_embeddings = np.vstack([self.known_embeddings, embedding])
        # Store the stripped label: validation above uses strip(), so persist
        # the same form to keep lookups/display free of stray whitespace.
        name = name.strip()
        self.known_names.append(name)
        self.save_db()
        return f"✅ Success: '{name}' added to database."

    def recognize_and_process(self, frame, blur_intensity=20, threshold=0.5):
        """Detect faces in an RGB frame, identify them, pixelate, and annotate.

        Args:
            frame: RGB image (H, W, 3) as delivered by Gradio, or None.
            blur_intensity: 1-100 privacy slider; higher means coarser pixelation.
            threshold: minimum cosine similarity to accept a known identity.

        Returns:
            Annotated RGB frame, or None when ``frame`` is None.
        """
        if frame is None:
            return None
        img_vis = frame.copy()
        # Pristine copy for recognition: img_vis is pixelated in place below,
        # so cropping from it could feed already-blurred pixels of an earlier,
        # overlapping face into the recognizer.
        img_clean = frame.copy()
        h, w = img_vis.shape[:2]
        results = self.detector(img_vis, conf=0.5, verbose=False)
        for result in results:
            for box in result.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                # Optional context margin around the detector box (clamped to frame).
                margin = 0
                cx1 = max(0, x1 - margin); cy1 = max(0, y1 - margin)
                cx2 = min(w, x2 + margin); cy2 = min(h, y2 + margin)
                face_crop = img_clean[cy1:cy2, cx1:cx2]
                # Identify against the enrolled database.
                name = "Unknown"
                color = (200, 0, 0)  # red (frame is RGB)
                if self.known_embeddings.shape[0] > 0 and face_crop.size > 0:
                    face_crop_bgr = cv2.cvtColor(face_crop, cv2.COLOR_RGB2BGR)
                    analysis = self.recognizer.get(face_crop_bgr)
                    if len(analysis) > 0:
                        # Embeddings are L2-normalised, so a dot product is cosine similarity.
                        target_emb = analysis[0].normed_embedding
                        similarities = np.dot(self.known_embeddings, target_emb)
                        best_idx = np.argmax(similarities)
                        if similarities[best_idx] > threshold:
                            name = self.known_names[best_idx]
                            color = (0, 255, 0)  # green
                # Privacy blur (pixelation). BUGFIX: the old mapping
                # (30 - intensity/4) shrank the downscale factor as the slider
                # rose, so "Privacy Level" 100 gave the LEAST privacy. Map the
                # slider monotonically to the block size instead (~3..28).
                roi = img_vis[y1:y2, x1:x2]
                if roi.size > 0:
                    block_size = max(3, int(3 + blur_intensity / 4))
                    h_roi, w_roi = roi.shape[:2]
                    small = cv2.resize(roi, (max(1, w_roi // block_size), max(1, h_roi // block_size)),
                                       interpolation=cv2.INTER_LINEAR)
                    pixelated = cv2.resize(small, (w_roi, h_roi), interpolation=cv2.INTER_NEAREST)
                    img_vis[y1:y2, x1:x2] = pixelated
                # Overlay: box, filled label background, then the name.
                cv2.rectangle(img_vis, (x1, y1), (x2, y2), color, 2)
                label_size, _ = cv2.getTextSize(name, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)
                cv2.rectangle(img_vis, (x1, y1 - label_size[1] - 10), (x1 + label_size[0] + 10, y1), color, -1)
                cv2.putText(img_vis, name, (x1 + 5, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        return img_vis
# Initialize
# Single shared model/database instance; all Gradio callbacks below close over it.
system = FaceSystem()
# ==========================================
# VIDEO PROCESSING HELPER
# ==========================================
def process_video_file(video_path, blur_intensity, threshold):
    """Run recognition + privacy blur over every frame of a video file.

    Args:
        video_path: path to the input video, or None (returns None).
        blur_intensity: privacy slider value forwarded to the frame processor.
        threshold: similarity threshold forwarded to the frame processor.

    Returns:
        Path to the processed .mp4 temp file, or None when no input was given.
    """
    if video_path is None:
        return None

    cap = cv2.VideoCapture(video_path)
    # BUGFIX: CAP_PROP_FPS can come back 0 (or NaN) for some containers,
    # which produces a broken/unplayable output; fall back to 30 fps.
    fps = cap.get(cv2.CAP_PROP_FPS)
    fps = int(fps) if fps and fps > 0 else 30
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Reserve a temp output path and close the handle immediately so
    # VideoWriter can open the file itself (required on Windows).
    with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_out:
        output_path = temp_out.name

    # Setup writer (mp4v is usually safe for CPU)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # OpenCV decodes BGR; the core processor expects RGB (Gradio
            # convention), so convert in and back out around the call.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            processed_rgb = system.recognize_and_process(frame_rgb, blur_intensity, threshold)
            processed_bgr = cv2.cvtColor(processed_rgb, cv2.COLOR_RGB2BGR)
            writer.write(processed_bgr)
    finally:
        # Release capture and writer even if a frame fails mid-stream.
        cap.release()
        writer.release()
    return output_path
# ==========================================
# GRADIO INTERFACE (Fixed UI)
# ==========================================
# NOTE: component creation order inside gr.Blocks defines the page layout,
# so the statement order below is load-bearing.
with gr.Blocks() as demo:
    gr.Markdown("# 👁️ SecureVision Pro")
    with gr.Tabs():
        # --- SURVEILLANCE TAB ---
        with gr.Tab("📹 Surveillance"):
            # Global Settings for this tab
            # These two sliders are shared inputs for all three sub-tabs below.
            with gr.Accordion("⚙️ Settings", open=False):
                blur_slider = gr.Slider(1, 100, value=50, label="Privacy Level")
                conf_slider = gr.Slider(0.1, 0.9, value=0.5, label="Recognition Threshold")
            with gr.Tabs():
                # Sub-Tab 1: Live Webcam — streams each frame through the core processor.
                with gr.TabItem("Live Webcam"):
                    with gr.Row():
                        web_in = gr.Image(sources=["webcam"], streaming=True, label="Live Feed")
                        web_out = gr.Image(label="Live Output")
                    web_in.stream(system.recognize_and_process, [web_in, blur_slider, conf_slider], web_out)
                # Sub-Tab 2: Upload Image — one-shot processing on button click.
                with gr.TabItem("Upload Image"):
                    with gr.Row():
                        img_in = gr.Image(sources=["upload", "clipboard"], label="Upload Image")
                        img_out = gr.Image(label="Processed Image")
                    img_btn = gr.Button("Analyze Image", variant="primary")
                    img_btn.click(system.recognize_and_process, [img_in, blur_slider, conf_slider], img_out)
                # Sub-Tab 3: Upload Video — full-file processing via the helper above.
                with gr.TabItem("Upload Video"):
                    with gr.Row():
                        vid_in = gr.Video(label="Upload Video")
                        vid_out = gr.Video(label="Processed Output")
                    vid_btn = gr.Button("Process Video", variant="primary")
                    vid_btn.click(process_video_file, [vid_in, blur_slider, conf_slider], vid_out)
        # --- DATABASE TAB ---
        with gr.Tab("👤 Database"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Enroll New Personnel")
                    new_name = gr.Textbox(label="Full Name / ID")
                    new_photo = gr.Image(sources=["upload", "webcam"], label="Reference Photo")
                    add_btn = gr.Button("Enroll User", variant="primary")
                    status_msg = gr.Markdown("")
                with gr.Column():
                    gr.Markdown("### Database Status")
                    def get_user_list():
                        """Render the enrolled names as a Markdown bullet list."""
                        if not system.known_names: return "No users enrolled."
                        return "\n".join([f"• {n}" for n in system.known_names])
                    # Passing the callable makes Gradio evaluate it on page load.
                    user_list = gr.Markdown(get_user_list)
                    refresh_btn = gr.Button("Refresh List")
            # Enrolling triggers both the status message and a list refresh.
            add_btn.click(system.enroll_user, [new_name, new_photo], status_msg)
            refresh_btn.click(get_user_list, outputs=user_list)
            add_btn.click(get_user_list, outputs=user_list)
if __name__ == "__main__":
    demo.launch()