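# CCTV face recognition demo for Hugging Face Spaces: RetinaFace handles
# detection/alignment, DeepFace (ArcFace) produces embeddings, and an optional
# Real-ESRGAN pass can sharpen face crops before embedding. Served with Gradio.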
from __future__ import annotations

import base64
import html
import json
import math
import os
import re
import shutil
import sys
import urllib.request
import uuid
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List, Tuple

import cv2
import gradio as gr
import numpy as np
from PIL import Image

from deepface import DeepFace
from deepface.modules import modeling
from retinaface import RetinaFace
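# Real-ESRGAN support is optional. basicsr still imports the removed
# torchvision.transforms.functional_tensor path, so alias it to the private
# module on newer torchvision builds; if any of these packages are missing,
# the app falls back to running without super-resolution.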
try:
    try:
        import torchvision.transforms._functional_tensor as tv_functional_tensor

        sys.modules.setdefault("torchvision.transforms.functional_tensor", tv_functional_tensor)
    except Exception:
        pass
    import torch
    from basicsr.archs.rrdbnet_arch import RRDBNet
    from realesrgan import RealESRGANer
except Exception:
    torch = None
    RRDBNet = None
    RealESRGANer = None

BASE_DIR = Path(__file__).resolve().parent
DB_DIR = BASE_DIR / "face_db"
OUTPUT_DIR = BASE_DIR / "outputs"
EMBEDDINGS_FILE = BASE_DIR / "face_db_embeddings.json"
RECOGNITION_MODEL = "ArcFace"
MAX_LOG_ITEMS = 120
ESRGAN_MODEL_URL = "https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.0/RealESRGAN_x4plus.pth"
ESRGAN_MODEL_FILENAME = "RealESRGAN_x4plus.pth"
ESRGAN_SCALE = 4
ESRGAN_MAX_DIMENSION = 1024
_ESRGAN_MODEL = None

for folder in (DB_DIR, OUTPUT_DIR):
    folder.mkdir(parents=True, exist_ok=True)

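# Stylesheet injected into the Gradio app; the class names below are referenced
# via elem_classes and by the HTML snippets the helper functions render.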
CUSTOM_CSS = """
:root {
  --sky-1: #f6fbff;
  --sky-2: #eef7ff;
  --sky-3: #dbeeff;
  --sky-4: #b9ddff;
  --sky-5: #2563eb;
  --sky-6: #0f172a;
}
.gradio-container {
  background: linear-gradient(180deg, #f8fbff 0%, #f1f8ff 100%);
}
.app-shell {
  max-width: 1480px;
  margin: 0 auto;
}
.hero-card {
  background: linear-gradient(135deg, rgba(37, 99, 235, 0.10), rgba(125, 211, 252, 0.16));
  border: 1px solid rgba(37, 99, 235, 0.16);
  border-radius: 24px;
  padding: 20px 24px;
  margin-bottom: 14px;
  box-shadow: 0 12px 34px rgba(37, 99, 235, 0.08);
}
.panel {
  background: rgba(255, 255, 255, 0.90);
  border: 1px solid rgba(148, 163, 184, 0.18);
  border-radius: 24px;
  padding: 14px;
  box-shadow: 0 14px 36px rgba(15, 23, 42, 0.06);
}
.soft-note {
  background: linear-gradient(180deg, rgba(239, 246, 255, 0.95), rgba(255, 255, 255, 0.96));
  border: 1px dashed rgba(37, 99, 235, 0.25);
  border-radius: 18px;
  padding: 12px 14px;
  color: #1e3a8a;
}
.blue-btn button,
.blue-btn button:hover {
  background: linear-gradient(90deg, #2563eb 0%, #38bdf8 100%) !important;
  color: white !important;
  border: none !important;
}
.neutral-btn button,
.neutral-btn button:hover {
  background: white !important;
  color: #0f172a !important;
  border: 1px solid rgba(148, 163, 184, 0.35) !important;
}
.status-box {
  background: white;
  border: 1px solid rgba(37, 99, 235, 0.12);
  border-radius: 18px;
  padding: 12px 14px;
}
.log-feed {
  display: flex;
  flex-direction: column;
  gap: 12px;
}
.log-card {
  display: grid;
  grid-template-columns: auto 1fr auto;
  gap: 14px;
  align-items: start;
  background: linear-gradient(180deg, rgba(255,255,255,0.98), rgba(248,250,252,0.96));
  border: 1px solid rgba(148, 163, 184, 0.22);
  border-radius: 20px;
  padding: 12px;
  box-shadow: 0 10px 28px rgba(15, 23, 42, 0.06);
}
.log-thumb-wrap {
  display: flex;
  flex-direction: column;
  gap: 6px;
}
.log-thumb {
  display: block;
  max-width: none;
  height: auto;
  border-radius: 14px;
  background: #e2e8f0;
}
.log-thumb-label {
  font-size: 12px;
  font-weight: 700;
  color: #475569;
  text-transform: uppercase;
  letter-spacing: 0.04em;
}
.log-main {
  min-width: 0;
}
.log-topline {
  display: flex;
  flex-wrap: wrap;
  gap: 10px;
  align-items: center;
  margin-bottom: 6px;
}
.log-name {
  font-size: 28px;
  font-weight: 800;
  color: #1e293b;
  line-height: 1.05;
}
.log-badge {
  display: inline-flex;
  align-items: center;
  border-radius: 999px;
  padding: 6px 14px;
  font-size: 18px;
  font-weight: 800;
  color: white;
}
.log-badge.matched {
  background: linear-gradient(90deg, #16a34a 0%, #22c55e 100%);
}
.log-badge.unknown {
  background: linear-gradient(90deg, #dc2626 0%, #ef4444 100%);
}
.log-meta {
  display: flex;
  flex-wrap: wrap;
  gap: 14px;
  color: #334155;
  font-size: 15px;
  margin-bottom: 8px;
}
.log-reason {
  color: #1e40af;
  font-size: 15px;
  line-height: 1.5;
}
.log-time {
  text-align: right;
  color: #64748b;
  font-size: 16px;
  font-weight: 700;
  white-space: nowrap;
}
.log-empty {
  padding: 22px;
  border-radius: 18px;
  border: 1px dashed rgba(148, 163, 184, 0.35);
  color: #475569;
  background: rgba(255,255,255,0.78);
}
"""

def sanitize_username(username: str) -> str:
    cleaned = re.sub(r"[^a-zA-Z0-9ก-๙_-]+", "_", username.strip())
    cleaned = re.sub(r"_+", "_", cleaned).strip("_")
    return cleaned or "user"

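# Image helpers: everything downstream assumes uint8 RGB arrays, so grayscale,
# RGBA, and float inputs are normalised first.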
def to_uint8_rgb(image: Any) -> np.ndarray:
    arr = np.array(image)
    if arr.ndim == 2:
        arr = cv2.cvtColor(arr, cv2.COLOR_GRAY2RGB)
    if arr.ndim == 3 and arr.shape[2] == 4:
        arr = cv2.cvtColor(arr, cv2.COLOR_RGBA2RGB)
    if arr.dtype != np.uint8:
        if np.max(arr) <= 1.0:
            arr = (arr * 255.0).clip(0, 255).astype(np.uint8)
        else:
            arr = arr.clip(0, 255).astype(np.uint8)
    return arr


def save_rgb_image(path: Path, rgb: np.ndarray) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    bgr = cv2.cvtColor(to_uint8_rgb(rgb), cv2.COLOR_RGB2BGR)
    cv2.imwrite(str(path), bgr)


def expand_bbox(bbox: List[float], image_shape: Tuple[int, int, int], expand_pct: float = 12.0) -> List[int]:
    h, w = image_shape[:2]
    x1, y1, x2, y2 = [int(v) for v in bbox]
    bw, bh = max(1, x2 - x1), max(1, y2 - y1)
    pad_x = int(bw * (expand_pct / 100.0))
    pad_y = int(bh * (expand_pct / 100.0))
    x1 = max(0, x1 - pad_x)
    y1 = max(0, y1 - pad_y)
    x2 = min(w, x2 + pad_x)
    y2 = min(h, y2 + pad_y)
    return [x1, y1, x2, y2]


def crop_rgb(rgb: np.ndarray, bbox: List[int]) -> np.ndarray:
    x1, y1, x2, y2 = bbox
    return rgb[y1:y2, x1:x2].copy()


def looks_like_face_crop(rgb: np.ndarray) -> bool:
    h, w = rgb.shape[:2]
    if min(h, w) < 48:
        return False
    ratio = w / max(h, 1)
    return 0.55 <= ratio <= 1.8

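# Detection helpers: keep the largest RetinaFace detection; if nothing is found
# on a small image, retry on an upscaled copy and map the box back to the
# original coordinates.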
def get_largest_face(detections: Any) -> Dict[str, Any] | None:
    if not isinstance(detections, dict):
        return None
    candidates = []
    for _, face in detections.items():
        facial_area = face.get("facial_area")
        if not facial_area or len(facial_area) != 4:
            continue
        x1, y1, x2, y2 = [int(v) for v in facial_area]
        area = max(0, x2 - x1) * max(0, y2 - y1)
        candidates.append((area, face))
    if not candidates:
        return None
    return sorted(candidates, key=lambda x: x[0], reverse=True)[0][1]


def detect_face_with_fallback(rgb: np.ndarray) -> Dict[str, Any] | None:
    try:
        detections = RetinaFace.detect_faces(rgb)
    except Exception:
        detections = {}
    face = get_largest_face(detections)
    if face is not None:
        return face
    h, w = rgb.shape[:2]
    min_side = min(h, w)
    if min_side < 160:
        scale = max(2, int(math.ceil(160 / max(1, min_side))))
        enlarged = cv2.resize(rgb, (w * scale, h * scale), interpolation=cv2.INTER_CUBIC)
        try:
            detections = RetinaFace.detect_faces(enlarged)
        except Exception:
            detections = {}
        face = get_largest_face(detections)
        if face is not None:
            x1, y1, x2, y2 = [int(v) for v in face["facial_area"]]
            face["facial_area"] = [
                max(0, x1 // scale),
                max(0, y1 // scale),
                min(w, x2 // scale),
                min(h, y2 // scale),
            ]
            return face
    return None

def align_face_with_retinaface(rgb: np.ndarray, bbox: List[int], expand_pct: float = 12.0) -> np.ndarray:
    expanded = expand_bbox(bbox, rgb.shape, expand_pct=expand_pct)
    face_crop = crop_rgb(rgb, expanded)
    if face_crop.size == 0:
        raise ValueError("Could not crop a face from the image")
    try:
        aligned_faces = RetinaFace.extract_faces(img_path=face_crop, align=True, expand_face_area=0)
        if aligned_faces:
            return to_uint8_rgb(aligned_faces[0])
    except Exception:
        pass
    return to_uint8_rgb(face_crop)

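# Real-ESRGAN x4 upscaler, loaded lazily as a module-level singleton with the
# weights downloaded on first use. Inputs are trimmed to multiples of 4 and
# capped at ESRGAN_MAX_DIMENSION to keep memory and runtime bounded.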
def load_esrgan_model():
    global _ESRGAN_MODEL
    if _ESRGAN_MODEL is not None:
        return _ESRGAN_MODEL
    if torch is None or RRDBNet is None or RealESRGANer is None:
        raise RuntimeError("torch / realesrgan / basicsr are not installed, so Real-ESRGAN is unavailable")
    weights_dir = BASE_DIR / "weights"
    weights_dir.mkdir(parents=True, exist_ok=True)
    model_path = weights_dir / ESRGAN_MODEL_FILENAME
    if not model_path.exists():
        urllib.request.urlretrieve(ESRGAN_MODEL_URL, model_path)
    model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=ESRGAN_SCALE)
    gpu_id = 0 if torch.cuda.is_available() else None
    _ESRGAN_MODEL = RealESRGANer(
        scale=ESRGAN_SCALE,
        model_path=str(model_path),
        model=model,
        tile=0,
        tile_pad=10,
        pre_pad=0,
        half=bool(torch.cuda.is_available()),
        gpu_id=gpu_id,
    )
    return _ESRGAN_MODEL


def trim_image_for_esrgan(rgb: np.ndarray) -> np.ndarray:
    h, w = rgb.shape[:2]
    trimmed_h = h - (h % 4)
    trimmed_w = w - (w % 4)
    if trimmed_h <= 0 or trimmed_w <= 0:
        raise ValueError("Image is too small for ESRGAN")
    if trimmed_h == h and trimmed_w == w:
        return rgb
    return rgb[:trimmed_h, :trimmed_w]

def maybe_downscale_for_esrgan(rgb: np.ndarray, max_dimension: int = ESRGAN_MAX_DIMENSION) -> np.ndarray:
    h, w = rgb.shape[:2]
    longest_side = max(h, w)
    if longest_side <= max_dimension:
        return rgb
    scale = max_dimension / float(longest_side)
    new_w = max(4, int(round(w * scale)))
    new_h = max(4, int(round(h * scale)))
    resized = cv2.resize(rgb, (new_w, new_h), interpolation=cv2.INTER_AREA)
    return trim_image_for_esrgan(resized)


def enhance_with_esrgan(rgb: np.ndarray) -> np.ndarray:
    model = load_esrgan_model()
    prepared = maybe_downscale_for_esrgan(trim_image_for_esrgan(to_uint8_rgb(rgb)))
    sr_rgb, _ = model.enhance(prepared, outscale=ESRGAN_SCALE)
    return to_uint8_rgb(sr_rgb)


def maybe_enhance_for_embedding(rgb: np.ndarray, use_esrgan: bool) -> np.ndarray:
    if not use_esrgan:
        return to_uint8_rgb(rgb)
    return enhance_with_esrgan(rgb)


def seconds_to_hhmmss(seconds: float) -> str:
    total_ms = int(max(0, seconds) * 1000)
    hours = total_ms // 3600000
    minutes = (total_ms % 3600000) // 60000
    secs = (total_ms % 60000) // 1000
    ms = total_ms % 1000
    return f"{hours:02d}:{minutes:02d}:{secs:02d}.{ms:03d}"


def cosine_distance(vec1: List[float], vec2: List[float]) -> float:
    a = np.asarray(vec1, dtype=np.float32)
    b = np.asarray(vec2, dtype=np.float32)
    denom = float(np.linalg.norm(a) * np.linalg.norm(b))
    if denom == 0:
        return 1.0
    similarity = float(np.dot(a, b) / denom)
    similarity = max(-1.0, min(1.0, similarity))
    return 1.0 - similarity

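# Embedding database: a flat JSON file of {user, image path, embedding} entries,
# wiped on every startup by reset_face_database_on_startup() below.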
def embeddings_template() -> Dict[str, Any]:
    return {"model_name": RECOGNITION_MODEL, "entries": []}


def reset_face_database_on_startup() -> None:
    if DB_DIR.exists():
        for child in DB_DIR.iterdir():
            if child.is_dir():
                shutil.rmtree(child, ignore_errors=True)
            elif child.is_file():
                child.unlink(missing_ok=True)
    save_embeddings(embeddings_template())


def load_embeddings() -> Dict[str, Any]:
    if not EMBEDDINGS_FILE.exists():
        return embeddings_template()
    try:
        with open(EMBEDDINGS_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        if "entries" not in data:
            return embeddings_template()
        return data
    except Exception:
        return embeddings_template()


def save_embeddings(data: Dict[str, Any]) -> None:
    with open(EMBEDDINGS_FILE, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


# Start from an empty face database on every launch.
reset_face_database_on_startup()


def represent_image(image: Path | np.ndarray) -> List[float]:
    image_input: str | np.ndarray
    if isinstance(image, Path):
        image_input = str(image)
    else:
        image_input = to_uint8_rgb(image)
    objs = DeepFace.represent(
        img_path=image_input,
        model_name=RECOGNITION_MODEL,
        detector_backend="skip",
        align=False,
        enforce_detection=False,
        normalization="ArcFace",
    )
    if isinstance(objs, list) and objs:
        first = objs[0]
        if isinstance(first, dict) and "embedding" in first:
            return first["embedding"]
    if isinstance(objs, dict) and "embedding" in objs:
        return objs["embedding"]
    raise ValueError("Could not create an embedding from the face image")


def represent_images_batch(images: List[np.ndarray]) -> List[List[float]]:
    if not images:
        return []
    batch_input = [to_uint8_rgb(img) for img in images]
    objs = DeepFace.represent(
        img_path=batch_input,
        model_name=RECOGNITION_MODEL,
        detector_backend="skip",
        align=False,
        enforce_detection=False,
        normalization="ArcFace",
    )
    if not isinstance(objs, list):
        raise ValueError("Unexpected result format from batch recognition")
    embeddings: List[List[float]] = []
    for item in objs:
        if isinstance(item, list) and item:
            first = item[0]
            if isinstance(first, dict) and "embedding" in first:
                embeddings.append(first["embedding"])
                continue
        if isinstance(item, dict) and "embedding" in item:
            embeddings.append(item["embedding"])
            continue
        raise ValueError("Could not create batch embeddings from the face images")
    return embeddings

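# Stack the reference embeddings into a matrix (with cached norms) once, so the
# batch matcher can score every probe face against every reference in a single
# matrix product instead of looping over entries.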
def prepare_reference_embeddings(entries: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], np.ndarray, np.ndarray]:
    valid_entries: List[Dict[str, Any]] = []
    vectors: List[np.ndarray] = []
    for entry in entries:
        ref_embedding = entry.get("embedding")
        if not ref_embedding:
            continue
        vec = np.asarray(ref_embedding, dtype=np.float32)
        if vec.ndim != 1 or vec.size == 0:
            continue
        valid_entries.append(entry)
        vectors.append(vec)
    if not vectors:
        return valid_entries, np.empty((0, 0), dtype=np.float32), np.empty((0,), dtype=np.float32)
    matrix = np.stack(vectors, axis=0)
    norms = np.linalg.norm(matrix, axis=1)
    return valid_entries, matrix, norms


def render_db_overview() -> str:
    data = load_embeddings()
    counts: Dict[str, int] = {}
    display_names: Dict[str, str] = {}
    for entry in data.get("entries", []):
        folder = entry.get("user_folder", "user")
        counts[folder] = counts.get(folder, 0) + 1
        display_names[folder] = entry.get("user_display", folder)
    total_people = len(counts)
    total_faces = sum(counts.values())
    if not counts:
        list_items = "<li>No one is registered yet</li>"
    else:
        ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)[:12]
        list_items = "".join(
            f"<li><b>{display_names.get(folder, folder)}</b>: {count} images</li>" for folder, count in ranked
        )
    return f"""
    <div class='soft-note'>
        <div style='display:flex;gap:18px;flex-wrap:wrap;margin-bottom:10px'>
            <div><b>{total_people}</b><br/>Registered people</div>
            <div><b>{total_faces}</b><br/>Face images in the system</div>
            <div><b>{RECOGNITION_MODEL}</b><br/>Recognition model</div>
        </div>
        <div><b>Current database</b></div>
        <ul style='margin:8px 0 0 18px;padding:0'>{list_items}</ul>
    </div>
    """


def load_user_gallery(user_folder: str) -> List[Tuple[str, str]]:
    user_dir = DB_DIR / user_folder
    if not user_dir.exists():
        return []
    images = sorted(
        [p for p in user_dir.iterdir() if p.suffix.lower() in {".jpg", ".jpeg", ".png"}],
        reverse=True,
    )
    gallery = []
    for img_path in images[:24]:
        gallery.append((str(img_path), img_path.stem))
    return gallery


def rebuild_embeddings() -> Tuple[str, str]:
    data = embeddings_template()
    image_paths = sorted(
        [
            p
            for p in DB_DIR.rglob("*")
            if p.is_file() and p.suffix.lower() in {".jpg", ".jpeg", ".png"}
        ]
    )
    success = 0
    failed = 0
    for img_path in image_paths:
        user_folder = img_path.parent.name
        try:
            embedding = represent_image(img_path)
            data["entries"].append(
                {
                    "user_folder": user_folder,
                    "user_display": user_folder,
                    "image_path": str(img_path),
                    "embedding": embedding,
                    "created_at": datetime.now().isoformat(timespec="seconds"),
                }
            )
            success += 1
        except Exception:
            failed += 1
    save_embeddings(data)
    message = f"✅ Database refreshed: {success} images | skipped {failed} images"
    return message, render_db_overview()

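# Registration pipeline: detect -> align -> optional ESRGAN -> embed, then
# persist both the face crop and its embedding to the database.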
def register_face(image: np.ndarray, username: str, use_esrgan: bool):
    if image is None:
        return "⚠️ Please upload a photo to register", [], render_db_overview()
    display_name = username.strip()
    if not display_name:
        return "⚠️ Please enter a username before saving", [], render_db_overview()
    user_folder = sanitize_username(display_name)
    rgb = to_uint8_rgb(image)
    try:
        face = detect_face_with_fallback(rgb)
        used_full_image_fallback = False
        if face is not None:
            aligned_face = align_face_with_retinaface(rgb, face["facial_area"], expand_pct=14)
        elif looks_like_face_crop(rgb):
            # No detection, but the image itself looks like a tight face crop.
            aligned_face = to_uint8_rgb(rgb)
            used_full_image_fallback = True
        else:
            return "⚠️ No face found in the registration photo", [], render_db_overview()
        embedding_input = maybe_enhance_for_embedding(aligned_face, use_esrgan=use_esrgan)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_path = DB_DIR / user_folder / f"{timestamp}_{uuid.uuid4().hex[:6]}.jpg"
        save_rgb_image(save_path, embedding_input)
        embedding = represent_image(embedding_input)
        data = load_embeddings()
        data.setdefault("entries", [])
        data["entries"].append(
            {
                "user_folder": user_folder,
                "user_display": display_name,
                "image_path": str(save_path),
                "embedding": embedding,
                "created_at": datetime.now().isoformat(timespec="seconds"),
            }
        )
        save_embeddings(data)
        gallery = load_user_gallery(user_folder)
        message = (
            f"✅ Face saved for user: **{display_name}** \n"
            f"- File stored at: `{save_path}` \n"
            f"- Recognition model: `{RECOGNITION_MODEL}` \n"
            f"- ESRGAN before embedding: `{'on' if use_esrgan else 'off'}`"
        )
        if used_full_image_fallback:
            message += "\n- Note: the whole image was used as the face crop because the detector found no face in the small photo"
        return message, gallery, render_db_overview()
    except Exception as exc:
        return f"❌ Registration failed: {exc}", [], render_db_overview()


def draw_face_annotations(rgb: np.ndarray, faces: List[Dict[str, Any]]) -> np.ndarray:
    canvas = to_uint8_rgb(rgb).copy()
    for item in faces:
        x1, y1, x2, y2 = item["bbox"]
        identity = item["identity"]
        distance = item["distance"]
        score = item.get("score", 0.0)
        # The canvas is RGB, so these tuples are RGB values, not OpenCV's usual BGR.
        color = (37, 99, 235) if identity != "Unknown" else (14, 165, 233)
        cv2.rectangle(canvas, (x1, y1), (x2, y2), color, 2)
        if math.isfinite(distance):
            label = f"{identity} | d={distance:.3f}"
        else:
            label = f"{identity} | conf={score:.2f}"
        (tw, th), baseline = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.52, 2)
        label_y1 = max(0, y1 - th - baseline - 10)
        label_y2 = label_y1 + th + baseline + 8
        label_x2 = min(canvas.shape[1], x1 + tw + 12)
        cv2.rectangle(canvas, (x1, label_y1), (label_x2, label_y2), color, -1)
        cv2.putText(
            canvas,
            label,
            (x1 + 6, label_y2 - 6),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.52,
            (255, 255, 255),
            2,
            cv2.LINE_AA,
        )
    return canvas

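# Log-feed rendering: thumbnails are inlined as base64 data URIs so the HTML
# feed is self-contained and needs no extra file serving.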
def face_to_data_uri(rgb: np.ndarray) -> str:
    image = Image.fromarray(to_uint8_rgb(rgb))
    buffer = BytesIO()
    image.save(buffer, format="JPEG", quality=88)
    encoded = base64.b64encode(buffer.getvalue()).decode("ascii")
    return f"data:image/jpeg;base64,{encoded}"


def render_log_feed(log_items: List[Dict[str, Any]]) -> str:
    if not log_items:
        return """
        <div class='log-empty'>
            No face logs yet. Detected names, distance values, thresholds, and the reason behind Unknown results will appear here.
        </div>
        """
    cards: List[str] = []
    for item in log_items:
        processed_thumb = item.get("processed_thumbnail", "")
        identity = html.escape(str(item.get("identity", "Unknown")))
        closest = html.escape(str(item.get("best_match_name", "Unknown")))
        status = item.get("status", "Unknown")
        badge_class = "matched" if status == "Matched" else "unknown"
        event_time = html.escape(str(item.get("event_time", "-")))
        video_time = html.escape(str(item.get("video_time", "-")))
        reason = html.escape(str(item.get("reason", "")))
        distance = item.get("distance")
        threshold = item.get("threshold")
        bbox = html.escape(str(item.get("bbox", "-")))
        processed_label = html.escape(str(item.get("processed_label", "Processed preview")))
        processed_width = int(item.get("processed_width", 0) or 0)
        processed_height = int(item.get("processed_height", 0) or 0)
        distance_text = f"{distance:.4f}" if isinstance(distance, (int, float)) and math.isfinite(distance) else "-"
        threshold_text = f"{threshold:.2f}" if isinstance(threshold, (int, float)) else "-"
        size_text = (
            f"{processed_width}x{processed_height}"
            if processed_width > 0 and processed_height > 0
            else "-"
        )
        img_size_attrs = (
            f" width='{processed_width}' height='{processed_height}'"
            if processed_width > 0 and processed_height > 0
            else ""
        )
        cards.append(
            f"""
            <div class='log-card'>
                <div class='log-thumb-wrap'>
                    <div class='log-thumb-label'>{processed_label}</div>
                    <img class='log-thumb' src='{processed_thumb}' alt='processed face thumbnail'{img_size_attrs} />
                </div>
                <div class='log-main'>
                    <div class='log-topline'>
                        <div class='log-name'>{identity}</div>
                        <div class='log-badge {badge_class}'>{status}</div>
                    </div>
                    <div class='log-meta'>
                        <span>video_time: <b>{video_time}</b></span>
                        <span>distance: <b>{distance_text}</b></span>
                        <span>threshold: <b>{threshold_text}</b></span>
                        <span>closest match: <b>{closest}</b></span>
                        <span>preview: <b>{processed_label}</b></span>
                        <span>size: <b>{size_text}</b></span>
                    </div>
                    <div class='log-reason'>{reason}</div>
                    <div class='log-meta'>
                        <span>bbox: <b>{bbox}</b></span>
                    </div>
                </div>
                <div class='log-time'>{event_time}</div>
            </div>
            """
        )
    return f"<div class='log-feed'>{''.join(cards)}</div>"


def find_best_match(embedding: List[float], entries: List[Dict[str, Any]], threshold: float) -> Dict[str, Any]:
    best_identity = "Unknown"
    best_distance = float("inf")
    for entry in entries:
        ref_embedding = entry.get("embedding")
        if not ref_embedding:
            continue
        dist = cosine_distance(embedding, ref_embedding)
        if dist < best_distance:
            best_distance = dist
            best_identity = entry.get("user_display") or entry.get("user_folder") or "Unknown"
    matched = best_distance <= threshold
    identity = best_identity if matched else "Unknown"
    if math.isfinite(best_distance):
        if matched:
            reason = (
                f"Matched {best_identity} because cosine distance {best_distance:.4f} <= threshold {threshold:.2f}"
            )
        else:
            reason = (
                f"Unknown because cosine distance {best_distance:.4f} > threshold {threshold:.2f}; "
                f"closest match is {best_identity}"
            )
    else:
        reason = "Could not compute a usable distance from this embedding set"
    return {
        "identity": identity,
        "best_match_name": best_identity,
        "distance": best_distance,
        "matched": matched,
        "threshold": threshold,
        "reason": reason,
    }

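# Vectorised counterpart of find_best_match: computes cosine distances for all
# probe embeddings against the prepared reference matrix at once.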
def find_best_matches_batch(
    embeddings: List[List[float]],
    entries: List[Dict[str, Any]],
    threshold: float,
    ref_matrix: np.ndarray,
    ref_norms: np.ndarray,
) -> List[Dict[str, Any]]:
    if not embeddings:
        return []
    if ref_matrix.size == 0 or not entries:
        return [
            {
                "identity": "Unknown",
                "best_match_name": "Unknown",
                "distance": float("inf"),
                "matched": False,
                "threshold": threshold,
                "reason": "No embedding database is available for comparison",
            }
            for _ in embeddings
        ]
    emb_matrix = np.asarray(embeddings, dtype=np.float32)
    if emb_matrix.ndim == 1:
        emb_matrix = np.expand_dims(emb_matrix, axis=0)
    emb_norms = np.linalg.norm(emb_matrix, axis=1)
    denom = emb_norms[:, None] * ref_norms[None, :]
    safe_denom = np.where(denom == 0, 1e-12, denom)
    similarities = np.matmul(emb_matrix, ref_matrix.T) / safe_denom
    similarities = np.clip(similarities, -1.0, 1.0)
    distances = 1.0 - similarities
    results: List[Dict[str, Any]] = []
    for row in distances:
        best_idx = int(np.argmin(row))
        best_distance = float(row[best_idx])
        best_identity = entries[best_idx].get("user_display") or entries[best_idx].get("user_folder") or "Unknown"
        matched = best_distance <= threshold
        identity = best_identity if matched else "Unknown"
        if matched:
            reason = f"Matched {best_identity} because cosine distance {best_distance:.4f} <= threshold {threshold:.2f}"
        else:
            reason = (
                f"Unknown because cosine distance {best_distance:.4f} > threshold {threshold:.2f}; "
                f"closest match is {best_identity}"
            )
        results.append(
            {
                "identity": identity,
                "best_match_name": best_identity,
                "distance": best_distance,
                "matched": matched,
                "threshold": threshold,
                "reason": reason,
            }
        )
    return results


def reset_runtime_panel():
    return (
        "🟦 Ready to process video: upload a video in the left tab and press Start",
        None,
        render_log_feed([]),
    )


def resolve_video_path(video_value):
    if video_value is None:
        return None
    if isinstance(video_value, str):
        return video_value
    if isinstance(video_value, dict):
        return video_value.get("video") or video_value.get("path") or video_value.get("name")
    if isinstance(video_value, (list, tuple)) and len(video_value) > 0:
        return video_value[0]
    return str(video_value)

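# Main video loop, written as a generator so Gradio can stream status text, the
# annotated preview frame, and the HTML log feed while processing runs.
# Detection and recognition run every `detect_every` frames; log entries are
# deduplicated per identity (or per spatial bucket for Unknowns) using the
# cooldown window.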
def process_video_stream(
    video_value,
    process_fps: float,
    detector_score_threshold: float,
    recognition_threshold: float,
    expand_pct: int,
    log_cooldown_sec: float,
    use_align: bool,
    use_esrgan: bool,
):
    video_path = resolve_video_path(video_value)
    if not video_path:
        yield "⚠️ Please upload a video before starting", None, render_log_feed([])
        return
    data = load_embeddings()
    entries = data.get("entries", [])
    if not entries:
        rebuild_message, _ = rebuild_embeddings()
        data = load_embeddings()
        entries = data.get("entries", [])
        if not entries:
            yield (
                "⚠️ No usable face database yet. Please register a person first.\n\n"
                + rebuild_message,
                None,
                render_log_feed([]),
            )
            return
    valid_entries, ref_matrix, ref_norms = prepare_reference_embeddings(entries)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        yield "❌ Could not open the video", None, render_log_feed([])
        return
    src_fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
    detect_every = max(1, int(round(src_fps / max(process_fps, 0.1))))
    last_faces: List[Dict[str, Any]] = []
    log_items: List[Dict[str, Any]] = []
    last_logged_second: Dict[str, float] = {}
    frame_index = 0
    preview_frame = None
    try:
        while True:
            ok, bgr_frame = cap.read()
            if not ok:
                break
            rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
            if frame_index % detect_every == 0:
                current_faces: List[Dict[str, Any]] = []
                try:
                    detections = RetinaFace.detect_faces(rgb_frame, threshold=detector_score_threshold)
                except Exception:
                    detections = {}
                if isinstance(detections, dict):
                    faces_sorted = sorted(
                        detections.values(),
                        key=lambda face: max(0, int(face["facial_area"][2]) - int(face["facial_area"][0]))
                        * max(0, int(face["facial_area"][3]) - int(face["facial_area"][1])),
                        reverse=True,
                    )
                    pending_faces: List[Dict[str, Any]] = []
                    for face in faces_sorted:
                        raw_bbox = [int(v) for v in face["facial_area"]]
                        bbox = expand_bbox(raw_bbox, rgb_frame.shape, expand_pct=float(expand_pct))
                        raw_face_rgb = crop_rgb(rgb_frame, bbox)
                        aligned_face_rgb = (
                            align_face_with_retinaface(rgb_frame, raw_bbox, expand_pct=float(expand_pct))
                            if use_align
                            else None
                        )
                        preview_face_rgb = aligned_face_rgb if aligned_face_rgb is not None else crop_rgb(rgb_frame, bbox)
                        if preview_face_rgb.size == 0 or raw_face_rgb.size == 0:
                            continue
                        video_second = frame_index / src_fps if src_fps > 0 else 0.0
                        pending_faces.append(
                            {
                                "bbox": bbox,
                                "raw_face_rgb": raw_face_rgb,
                                "preview_face_rgb": preview_face_rgb,
                                "score": float(face.get("score", 0.0)),
                                "video_second": video_second,
                            }
                        )
                    if pending_faces:
                        embedding_inputs = [
                            maybe_enhance_for_embedding(item["preview_face_rgb"], use_esrgan=use_esrgan)
                            for item in pending_faces
                        ]
                        batch_embeddings = represent_images_batch(embedding_inputs)
                        matches = find_best_matches_batch(
                            embeddings=batch_embeddings,
                            entries=valid_entries,
                            threshold=recognition_threshold,
                            ref_matrix=ref_matrix,
                            ref_norms=ref_norms,
                        )
                        for item, match, processed_face_rgb in zip(pending_faces, matches, embedding_inputs):
                            bbox = item["bbox"]
                            current_faces.append(
                                {
                                    "bbox": bbox,
                                    "identity": match["identity"],
                                    "distance": match["distance"],
                                    "score": item["score"],
                                }
                            )
                            bucket_x = bbox[0] // 120
                            bucket_y = bbox[1] // 120
                            dedupe_key = (
                                match["best_match_name"]
                                if match["matched"]
                                else f"Unknown_{bucket_x}_{bucket_y}"
                            )
                            if item["video_second"] - last_logged_second.get(dedupe_key, -999.0) >= log_cooldown_sec:
                                last_logged_second[dedupe_key] = item["video_second"]
                                event_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                                log_items.insert(
                                    0,
                                    {
                                        "event_time": event_time,
                                        "video_time": seconds_to_hhmmss(item["video_second"]),
                                        "identity": match["identity"],
                                        "best_match_name": match["best_match_name"],
                                        "distance": match["distance"],
                                        "bbox": bbox,
                                        "status": "Matched" if match["matched"] else "Unknown",
                                        "threshold": match["threshold"],
                                        "reason": match["reason"],
                                        "processed_thumbnail": face_to_data_uri(processed_face_rgb),
                                        "processed_label": (
                                            ("Aligned face" if use_align else "Raw crop")
                                            + (" + ESRGAN" if use_esrgan else "")
                                        ),
                                        "processed_width": int(processed_face_rgb.shape[1]),
                                        "processed_height": int(processed_face_rgb.shape[0]),
                                    },
                                )
                                log_items = log_items[:MAX_LOG_ITEMS]
                last_faces = current_faces
            preview_frame = draw_face_annotations(rgb_frame, last_faces)
            progress = (frame_index + 1) / total_frames * 100.0 if total_frames > 0 else 0.0
            if frame_index % max(1, detect_every) == 0:
                status = (
                    f"🔄 Processing video... {progress:.1f}% \n"
                    f"- Source FPS: `{src_fps:.2f}` \n"
                    f"- Processing every `{detect_every}` frames \n"
                    f"- Recognition mode: `micro-batch per detection step` \n"
                    f"- Alignment during video: `{'on' if use_align else 'off'}` \n"
                    f"- ESRGAN before embedding: `{'on' if use_esrgan else 'off'}` \n"
                    f"- Registered identities: `{len({e.get('user_folder', 'u') for e in entries})}` \n"
                    f"- Threshold rule: `distance <= {recognition_threshold:.2f}` counts as the same person \n"
                    f"- Log events: `{len(log_items)}`"
                )
                yield status, preview_frame, render_log_feed(log_items)
            frame_index += 1
    except Exception as exc:
        yield f"❌ Error during processing: {exc}", preview_frame, render_log_feed(log_items)
        return
    finally:
        cap.release()
    final_status = (
        f"✅ Processing complete \n"
        f"- Log events: `{len(log_items)}` \n"
        f"- Match rule: `distance <= {recognition_threshold:.2f}` \n"
        f"- ESRGAN before embedding: `{'on' if use_esrgan else 'off'}` \n"
        f"- Recognition model: `{RECOGNITION_MODEL}`"
    )
    yield final_status, preview_frame, render_log_feed(log_items)

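# Gradio UI: registration and video/config tabs on the left, streaming status
# and the face-log feed on the right.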
with gr.Blocks(
    theme=gr.themes.Soft(primary_hue="sky", secondary_hue="blue", neutral_hue="slate"),
    css=CUSTOM_CSS,
    title="CCTV Face Recognition with RetinaFace + DeepFace",
) as demo:
    gr.HTML(
        """
        <div class='app-shell'>
            <div class='hero-card'>
                <div style='font-size:30px;font-weight:800;color:#0f172a'>CCTV Face Recognition Dashboard</div>
                <div style='margin-top:8px;color:#334155;font-size:15px;line-height:1.65'>
                    A Gradio app for <b>Face Detection + Alignment + Recognition</b> built on
                    <b>RetinaFace</b> and <b>DeepFace (ArcFace)</b><br/>
                    The left side handles face registration, video upload, live preview, and speed/accuracy settings,
                    while the right side shows face logs as event summaries with the reason behind each recognition result.
                    The face database is cleared every time the script is restarted.
                </div>
            </div>
        </div>
        """
    )
    with gr.Row(equal_height=False):
        with gr.Column(scale=4, elem_classes=["panel"]):
            gr.Markdown("### Left panel • Upload and settings")
            with gr.Tabs():
                with gr.Tab("1) Upload registration photo"):
                    reg_image = gr.Image(
                        type="numpy",
                        label="Upload a person's photo",
                        sources=["upload", "webcam"],
                        height=330,
                    )
                    reg_user = gr.Textbox(
                        label="User",
                        placeholder="e.g. admin01",
                    )
                    reg_use_esrgan = gr.Checkbox(
                        label="Use ESRGAN before embedding",
                        value=False,
                        info="Useful for blurry or low-detail camera photos, but slower than usual, and the first model load may take a while",
                    )
                    with gr.Row():
                        reg_save_btn = gr.Button("Save to database", elem_classes=["blue-btn"])
                        reg_refresh_btn = gr.Button("Refresh database", elem_classes=["neutral-btn"])
                    reg_status = gr.Markdown(value="Ready to register")
                    reg_gallery = gr.Gallery(
                        label="Stored face images for this user",
                        columns=4,
                        height=260,
                        object_fit="cover",
                    )
                    db_summary = gr.HTML(render_db_overview())
                with gr.Tab("2) Upload video + Config"):
                    video_input = gr.Video(
                        sources=["upload"],
                        label="Upload CCTV footage",
                        height=330,
                    )
                    live_preview = gr.Image(
                        label="Live Preview (bboxes drawn while processing)",
                        type="numpy",
                        interactive=False,
                        height=330,
                    )
                    process_fps = gr.Slider(
                        minimum=1,
                        maximum=12,
                        value=3,
                        step=1,
                        label="Process FPS",
                        info="Frames per second used for face detection and recognition",
                    )
                    detector_score_threshold = gr.Slider(
                        minimum=0.10,
                        maximum=0.99,
                        value=0.85,
                        step=0.01,
                        label="Detection Score Threshold",
                    )
                    recognition_threshold = gr.Slider(
                        minimum=0.15,
                        maximum=1.00,
                        value=0.85,
                        step=0.01,
                        label="Recognition Threshold (Cosine Distance)",
                        info="A face matches only when its distance is less than or equal to this value; the 0.85 default is more lenient than the previous setting",
                    )
                    expand_pct = gr.Slider(
                        minimum=0,
                        maximum=30,
                        value=10,
                        step=1,
                        label="Expand Face Area (%)",
                    )
                    log_cooldown = gr.Slider(
                        minimum=0.0,
                        maximum=10.0,
                        value=0,
                        step=0.5,
                        label="Log Cooldown (seconds)",
                        info="Reduces duplicate logs for the same person within a short time window",
                    )
                    use_align = gr.Checkbox(
                        label="Use face alignment",
                        value=True,
                        info="Off is fastest for video; turn on to stabilise faces before creating embeddings",
                    )
                    use_esrgan = gr.Checkbox(
                        label="Use ESRGAN before embedding",
                        value=False,
                        info="Enhances camera face crops before embedding; helps with small or pixelated faces but is noticeably slower",
                    )
                    with gr.Row():
                        start_btn = gr.Button("Start processing", elem_classes=["blue-btn"])
                        clear_btn = gr.Button("Clear results", elem_classes=["neutral-btn"])
                    gr.HTML(
                        """
                        <div class='soft-note'>
                            Uses <b>RetinaFace.detect_faces</b> for detection;
                            enable <b>Use face alignment</b> to align faces before recognition;
                            enable <b>Use ESRGAN before embedding</b> to add detail to camera face crops before embedding;
                            recognition uses <b>DeepFace.represent</b> with the <b>ArcFace</b> model, computed on in-memory images to cut I/O time.
                        </div>
                        """
                    )
        with gr.Column(scale=6, elem_classes=["panel"]):
            gr.Markdown("### Right panel • Face Logs")
            runtime_status = gr.Markdown(
                value="🟦 Ready to process video: upload a video in the left tab and press Start",
                elem_classes=["status-box"],
            )
            face_log_feed = gr.HTML(value=render_log_feed([]))

    reg_save_btn.click(
        fn=register_face,
        inputs=[reg_image, reg_user, reg_use_esrgan],
        outputs=[reg_status, reg_gallery, db_summary],
    )
    reg_refresh_btn.click(
        fn=rebuild_embeddings,
        inputs=[],
        outputs=[reg_status, db_summary],
    )
    start_btn.click(
        fn=process_video_stream,
        inputs=[
            video_input,
            process_fps,
            detector_score_threshold,
            recognition_threshold,
            expand_pct,
            log_cooldown,
            use_align,
            use_esrgan,
        ],
        outputs=[runtime_status, live_preview, face_log_feed],
    )
    clear_btn.click(
        fn=reset_runtime_panel,
        inputs=[],
        outputs=[runtime_status, live_preview, face_log_feed],
    )

demo.queue(default_concurrency_limit=1)

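# Warm up the ArcFace model once before serving so the first request does not
# pay the model-initialisation cost.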
if __name__ == "__main__":
    modeling.build_model(task="facial_recognition", model_name=RECOGNITION_MODEL)
    demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")), share=False)