from __future__ import annotations

import base64
import html
import json
import math
import os
import re
import shutil
import sys
import urllib.request
import uuid
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List, Tuple

import cv2
import gradio as gr
import numpy as np
from PIL import Image

from deepface import DeepFace
from deepface.modules import modeling
from retinaface import RetinaFace

# Real-ESRGAN is optional: if torch / basicsr / realesrgan are missing, the app
# still runs, but the "Use ESRGAN before embedding" options raise at call time.
try:
    try:
        # basicsr still imports torchvision.transforms.functional_tensor, which
        # newer torchvision releases renamed; alias it before basicsr loads.
        import torchvision.transforms._functional_tensor as tv_functional_tensor

        sys.modules.setdefault(
            "torchvision.transforms.functional_tensor", tv_functional_tensor
        )
    except Exception:
        pass

    import torch
    from basicsr.archs.rrdbnet_arch import RRDBNet
    from realesrgan import RealESRGANer
except Exception:
    torch = None
    RRDBNet = None
    RealESRGANer = None

BASE_DIR = Path(__file__).resolve().parent
DB_DIR = BASE_DIR / "face_db"
OUTPUT_DIR = BASE_DIR / "outputs"
EMBEDDINGS_FILE = BASE_DIR / "face_db_embeddings.json"

RECOGNITION_MODEL = "ArcFace"
MAX_LOG_ITEMS = 120

ESRGAN_MODEL_URL = "https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.0/RealESRGAN_x4plus.pth"
ESRGAN_MODEL_FILENAME = "RealESRGAN_x4plus.pth"
ESRGAN_SCALE = 4
ESRGAN_MAX_DIMENSION = 1024
_ESRGAN_MODEL = None

for folder in (DB_DIR, OUTPUT_DIR):
    folder.mkdir(parents=True, exist_ok=True)

CUSTOM_CSS = """
:root {
  --sky-1: #f6fbff;
  --sky-2: #eef7ff;
  --sky-3: #dbeeff;
  --sky-4: #b9ddff;
  --sky-5: #2563eb;
  --sky-6: #0f172a;
}
.gradio-container { background: linear-gradient(180deg, #f8fbff 0%, #f1f8ff 100%); }
.app-shell { max-width: 1480px; margin: 0 auto; }
.hero-card {
  background: linear-gradient(135deg, rgba(37, 99, 235, 0.10), rgba(125, 211, 252, 0.16));
  border: 1px solid rgba(37, 99, 235, 0.16);
  border-radius: 24px;
  padding: 20px 24px;
  margin-bottom: 14px;
  box-shadow: 0 12px 34px rgba(37, 99, 235, 0.08);
}
.panel {
  background: rgba(255, 255, 255, 0.90);
  border: 1px solid rgba(148, 163, 184, 0.18);
  border-radius: 24px;
  padding: 14px;
  box-shadow: 0 14px 36px rgba(15, 23, 42, 0.06);
}
.soft-note {
  background: linear-gradient(180deg, rgba(239, 246, 255, 0.95), rgba(255, 255, 255, 0.96));
  border: 1px dashed rgba(37, 99, 235, 0.25);
  border-radius: 18px;
  padding: 12px 14px;
  color: #1e3a8a;
}
.blue-btn button, .blue-btn button:hover {
  background: linear-gradient(90deg, #2563eb 0%, #38bdf8 100%) !important;
  color: white !important;
  border: none !important;
}
.neutral-btn button, .neutral-btn button:hover {
  background: white !important;
  color: #0f172a !important;
  border: 1px solid rgba(148, 163, 184, 0.35) !important;
}
.status-box {
  background: white;
  border: 1px solid rgba(37, 99, 235, 0.12);
  border-radius: 18px;
  padding: 12px 14px;
}
.log-feed { display: flex; flex-direction: column; gap: 12px; }
.log-card {
  display: grid;
  grid-template-columns: auto 1fr auto;
  gap: 14px;
  align-items: start;
  background: linear-gradient(180deg, rgba(255,255,255,0.98), rgba(248,250,252,0.96));
  border: 1px solid rgba(148, 163, 184, 0.22);
  border-radius: 20px;
  padding: 12px;
  box-shadow: 0 10px 28px rgba(15, 23, 42, 0.06);
}
.log-thumb-wrap { display: flex; flex-direction: column; gap: 6px; }
.log-thumb {
  display: block;
  max-width: none;
  height: auto;
  border-radius: 14px;
  background: #e2e8f0;
}
.log-thumb-label {
  font-size: 12px;
  font-weight: 700;
  color: #475569;
  text-transform: uppercase;
  letter-spacing: 0.04em;
}
.log-main { min-width: 0; }
.log-topline { display: flex; flex-wrap: wrap; gap: 10px; align-items: center; margin-bottom: 6px; }
.log-name { font-size: 28px; font-weight: 800; color: #1e293b; line-height: 1.05; }
.log-badge {
  display: inline-flex;
  align-items: center;
  border-radius: 999px;
  padding: 6px 14px;
  font-size: 18px;
  font-weight: 800;
  color: white;
}
.log-badge.matched { background: linear-gradient(90deg, #16a34a 0%, #22c55e 100%); }
.log-badge.unknown { background: linear-gradient(90deg, #dc2626 0%, #ef4444 100%); }
.log-meta { display: flex; flex-wrap: wrap; gap: 14px; color: #334155; font-size: 15px; margin-bottom: 8px; }
.log-reason { color: #1e40af; font-size: 15px; line-height: 1.5; }
.log-time { text-align: right; color: #64748b; font-size: 16px; font-weight: 700; white-space: nowrap; }
.log-empty {
  padding: 22px;
  border-radius: 18px;
  border: 1px dashed rgba(148, 163, 184, 0.35);
  color: #475569;
  background: rgba(255,255,255,0.78);
}
"""


def sanitize_username(username: str) -> str:
    """Collapse unsafe characters into underscores to get a filesystem-safe folder name."""
    cleaned = re.sub(r"[^a-zA-Z0-9ก-๙_-]+", "_", username.strip())
    cleaned = re.sub(r"_+", "_", cleaned).strip("_")
    return cleaned or "user"


def to_uint8_rgb(image: Any) -> np.ndarray:
    """Normalize any image-like input to an HxWx3 uint8 RGB array."""
    arr = np.array(image)
    if arr.ndim == 2:
        arr = cv2.cvtColor(arr, cv2.COLOR_GRAY2RGB)
    if arr.ndim == 3 and arr.shape[2] == 4:
        arr = cv2.cvtColor(arr, cv2.COLOR_RGBA2RGB)
    if arr.dtype != np.uint8:
        if np.max(arr) <= 1.0:
            arr = (arr * 255.0).clip(0, 255).astype(np.uint8)
        else:
            arr = arr.clip(0, 255).astype(np.uint8)
    return arr


def save_rgb_image(path: Path, rgb: np.ndarray) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    bgr = cv2.cvtColor(to_uint8_rgb(rgb), cv2.COLOR_RGB2BGR)
    cv2.imwrite(str(path), bgr)


def expand_bbox(bbox: List[float], image_shape: Tuple[int, int, int], expand_pct: float = 12.0) -> List[int]:
    """Grow a bbox by expand_pct% of its size on every side, clamped to the image bounds."""
    h, w = image_shape[:2]
    x1, y1, x2, y2 = [int(v) for v in bbox]
    bw, bh = max(1, x2 - x1), max(1, y2 - y1)
    pad_x = int(bw * (expand_pct / 100.0))
    pad_y = int(bh * (expand_pct / 100.0))
    x1 = max(0, x1 - pad_x)
    y1 = max(0, y1 - pad_y)
    x2 = min(w, x2 + pad_x)
    y2 = min(h, y2 + pad_y)
    return [x1, y1, x2, y2]


def crop_rgb(rgb: np.ndarray, bbox: List[int]) -> np.ndarray:
    x1, y1, x2, y2 = bbox
    return rgb[y1:y2, x1:x2].copy()


def looks_like_face_crop(rgb: np.ndarray) -> bool:
    """Heuristic: the whole image is plausibly an already-cropped face (registration fallback)."""
    h, w = rgb.shape[:2]
    if min(h, w) < 48:
        return False
    ratio = w / max(h, 1)
    return 0.55 <= ratio <= 1.8


def get_largest_face(detections: Any) -> Dict[str, Any] | None:
    if not isinstance(detections, dict):
        return None
    candidates = []
    for _, face in detections.items():
        facial_area = face.get("facial_area")
        if not facial_area or len(facial_area) != 4:
            continue
        x1, y1, x2, y2 = [int(v) for v in facial_area]
        area = max(0, x2 - x1) * max(0, y2 - y1)
        candidates.append((area, face))
    if not candidates:
        return None
    return sorted(candidates, key=lambda x: x[0], reverse=True)[0][1]


def detect_face_with_fallback(rgb: np.ndarray) -> Dict[str, Any] | None:
    """Detect the largest face; retry on an upscaled copy when the image is small."""
    try:
        detections = RetinaFace.detect_faces(rgb)
    except Exception:
        detections = {}
    face = get_largest_face(detections)
    if face is not None:
        return face

    h, w = rgb.shape[:2]
    min_side = min(h, w)
    if min_side < 160:
        scale = max(2, int(math.ceil(160 / max(1, min_side))))
        enlarged = cv2.resize(rgb, (w * scale, h * scale), interpolation=cv2.INTER_CUBIC)
        try:
            detections = RetinaFace.detect_faces(enlarged)
        except Exception:
            detections = {}
        face = get_largest_face(detections)
        if face is not None:
            # Map the bbox back to the original resolution.
            x1, y1, x2, y2 = [int(v) for v in face["facial_area"]]
            face["facial_area"] = [
                max(0, x1 // scale),
                max(0, y1 // scale),
                min(w, x2 // scale),
                min(h, y2 // scale),
            ]
            return face
    return None
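# A minimal sanity-check sketch for expand_bbox(): the default 12% margin on a
# 100x120 box pads 12 px horizontally and 14 px vertically on each side, then
# clamps to the frame. _demo_expand_bbox is a hypothetical helper added for
# illustration only; the app never calls it.
def _demo_expand_bbox() -> None:
    # Frame shape is (height=720, width=1280, channels=3).
    assert expand_bbox([100, 100, 200, 220], (720, 1280, 3)) == [88, 86, 212, 234]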
def align_face_with_retinaface(rgb: np.ndarray, bbox: List[int], expand_pct: float = 12.0) -> np.ndarray:
    """Crop with margin, then let RetinaFace align the face; fall back to the raw crop."""
    expanded = expand_bbox(bbox, rgb.shape, expand_pct=expand_pct)
    face_crop = crop_rgb(rgb, expanded)
    if face_crop.size == 0:
        raise ValueError("Could not crop a face from the image")
    try:
        aligned_faces = RetinaFace.extract_faces(img_path=face_crop, align=True, expand_face_area=0)
        if aligned_faces:
            return to_uint8_rgb(aligned_faces[0])
    except Exception:
        pass
    return to_uint8_rgb(face_crop)


def load_esrgan_model():
    """Lazily build (and cache) the Real-ESRGAN x4 upscaler, downloading weights on first use."""
    global _ESRGAN_MODEL
    if _ESRGAN_MODEL is not None:
        return _ESRGAN_MODEL
    if torch is None or RRDBNet is None or RealESRGANer is None:
        raise RuntimeError("torch / realesrgan / basicsr are not installed for Real-ESRGAN")

    weights_dir = BASE_DIR / "weights"
    weights_dir.mkdir(parents=True, exist_ok=True)
    model_path = weights_dir / ESRGAN_MODEL_FILENAME
    if not model_path.exists():
        urllib.request.urlretrieve(ESRGAN_MODEL_URL, model_path)

    model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=ESRGAN_SCALE)
    gpu_id = 0 if torch.cuda.is_available() else None
    _ESRGAN_MODEL = RealESRGANer(
        scale=ESRGAN_SCALE,
        model_path=str(model_path),
        model=model,
        tile=0,
        tile_pad=10,
        pre_pad=0,
        half=bool(torch.cuda.is_available()),
        gpu_id=gpu_id,
    )
    return _ESRGAN_MODEL


def trim_image_for_esrgan(rgb: np.ndarray) -> np.ndarray:
    """Trim height/width down to multiples of 4, as the x4 upscaler expects."""
    h, w = rgb.shape[:2]
    trimmed_h = h - (h % 4)
    trimmed_w = w - (w % 4)
    if trimmed_h <= 0 or trimmed_w <= 0:
        raise ValueError("Image is too small for ESRGAN")
    if trimmed_h == h and trimmed_w == w:
        return rgb
    return rgb[:trimmed_h, :trimmed_w]


def maybe_downscale_for_esrgan(rgb: np.ndarray, max_dimension: int = ESRGAN_MAX_DIMENSION) -> np.ndarray:
    h, w = rgb.shape[:2]
    longest_side = max(h, w)
    if longest_side <= max_dimension:
        return rgb
    scale = max_dimension / float(longest_side)
    new_w = max(4, int(round(w * scale)))
    new_h = max(4, int(round(h * scale)))
    resized = cv2.resize(rgb, (new_w, new_h), interpolation=cv2.INTER_AREA)
    return trim_image_for_esrgan(resized)


def enhance_with_esrgan(rgb: np.ndarray) -> np.ndarray:
    model = load_esrgan_model()
    prepared = maybe_downscale_for_esrgan(trim_image_for_esrgan(to_uint8_rgb(rgb)))
    sr_rgb, _ = model.enhance(prepared, outscale=ESRGAN_SCALE)
    return to_uint8_rgb(sr_rgb)


def maybe_enhance_for_embedding(rgb: np.ndarray, use_esrgan: bool) -> np.ndarray:
    if not use_esrgan:
        return to_uint8_rgb(rgb)
    return enhance_with_esrgan(rgb)


def seconds_to_hhmmss(seconds: float) -> str:
    total_ms = int(max(0, seconds) * 1000)
    hours = total_ms // 3600000
    minutes = (total_ms % 3600000) // 60000
    secs = (total_ms % 60000) // 1000
    ms = total_ms % 1000
    return f"{hours:02d}:{minutes:02d}:{secs:02d}.{ms:03d}"


def cosine_distance(vec1: List[float], vec2: List[float]) -> float:
    """Return 1 - cosine similarity, with similarity clamped to [-1, 1]."""
    a = np.asarray(vec1, dtype=np.float32)
    b = np.asarray(vec2, dtype=np.float32)
    denom = float(np.linalg.norm(a) * np.linalg.norm(b))
    if denom == 0:
        return 1.0
    similarity = float(np.dot(a, b) / denom)
    similarity = max(-1.0, min(1.0, similarity))
    return 1.0 - similarity


def embeddings_template() -> Dict[str, Any]:
    return {"model_name": RECOGNITION_MODEL, "entries": []}


def reset_face_database_on_startup() -> None:
    """Wipe stored face images and embeddings; the DB starts empty on every run."""
    if DB_DIR.exists():
        for child in DB_DIR.iterdir():
            if child.is_dir():
                shutil.rmtree(child, ignore_errors=True)
            elif child.is_file():
                child.unlink(missing_ok=True)
    save_embeddings(embeddings_template())


def load_embeddings() -> Dict[str, Any]:
    if not EMBEDDINGS_FILE.exists():
        return embeddings_template()
    try:
        with open(EMBEDDINGS_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        if "entries" not in data:
            return embeddings_template()
        return data
    except Exception:
        return embeddings_template()


def save_embeddings(data: Dict[str, Any]) -> None:
    with open(EMBEDDINGS_FILE, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


reset_face_database_on_startup()


def represent_image(image: Path | np.ndarray) -> List[float]:
    """Compute an ArcFace embedding for one already-cropped face image."""
    image_input: str | np.ndarray
    if isinstance(image, Path):
        image_input = str(image)
    else:
        image_input = to_uint8_rgb(image)
    objs = DeepFace.represent(
        img_path=image_input,
        model_name=RECOGNITION_MODEL,
        detector_backend="skip",
        align=False,
        enforce_detection=False,
        normalization="ArcFace",
    )
    if isinstance(objs, list) and objs:
        first = objs[0]
        if isinstance(first, dict) and "embedding" in first:
            return first["embedding"]
    if isinstance(objs, dict) and "embedding" in objs:
        return objs["embedding"]
    raise ValueError("Could not create an embedding from the face image")


def represent_images_batch(images: List[np.ndarray]) -> List[List[float]]:
    """Compute ArcFace embeddings for several face crops in one DeepFace call."""
    if not images:
        return []
    batch_input = [to_uint8_rgb(img) for img in images]
    objs = DeepFace.represent(
        img_path=batch_input,
        model_name=RECOGNITION_MODEL,
        detector_backend="skip",
        align=False,
        enforce_detection=False,
        normalization="ArcFace",
    )
    if not isinstance(objs, list):
        raise ValueError("Unexpected result format from batch recognition")
    embeddings: List[List[float]] = []
    for item in objs:
        if isinstance(item, list) and item:
            first = item[0]
            if isinstance(first, dict) and "embedding" in first:
                embeddings.append(first["embedding"])
                continue
        if isinstance(item, dict) and "embedding" in item:
            embeddings.append(item["embedding"])
            continue
        raise ValueError("Could not create batch embeddings from the face images")
    return embeddings


def prepare_reference_embeddings(entries: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], np.ndarray, np.ndarray]:
    """Stack valid DB embeddings into a matrix (plus norms) for vectorized matching."""
    valid_entries: List[Dict[str, Any]] = []
    vectors: List[np.ndarray] = []
    for entry in entries:
        ref_embedding = entry.get("embedding")
        if not ref_embedding:
            continue
        vec = np.asarray(ref_embedding, dtype=np.float32)
        if vec.ndim != 1 or vec.size == 0:
            continue
        valid_entries.append(entry)
        vectors.append(vec)
    if not vectors:
        return valid_entries, np.empty((0, 0), dtype=np.float32), np.empty((0,), dtype=np.float32)
    matrix = np.stack(vectors, axis=0)
    norms = np.linalg.norm(matrix, axis=1)
    return valid_entries, matrix, norms
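# A minimal sanity-check sketch for cosine_distance(): it returns
# 1 - cosine similarity, so vectors pointing the same way score 0.0 and
# orthogonal vectors score 1.0, regardless of magnitude. _demo_cosine_distance
# is a hypothetical helper added for illustration only; the app never calls it.
def _demo_cosine_distance() -> None:
    assert cosine_distance([1.0, 0.0], [2.0, 0.0]) < 1e-6          # same direction
    assert abs(cosine_distance([1.0, 0.0], [0.0, 5.0]) - 1.0) < 1e-6  # orthogonal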
def render_db_overview() -> str:
    # Minimal markup; the .soft-note rule in CUSTOM_CSS styles the block.
    data = load_embeddings()
    counts: Dict[str, int] = {}
    display_names: Dict[str, str] = {}
    for entry in data.get("entries", []):
        folder = entry.get("user_folder", "user")
        counts[folder] = counts.get(folder, 0) + 1
        display_names[folder] = entry.get("user_display", folder)
    total_people = len(counts)
    total_faces = sum(counts.values())
    if not counts:
        list_items = "<div>• No registered data yet</div>"
    else:
        ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)[:12]
        list_items = "".join(
            f"<div>• {display_names.get(folder, folder)} — {count} images</div>"
            for folder, count in ranked
        )
    return f"""
<div class="soft-note">
  <div><b>{total_people}</b> registered people</div>
  <div><b>{total_faces}</b> face images in the system</div>
  <div><b>{RECOGNITION_MODEL}</b> recognition model</div>
  <div><b>Current database</b></div>
  {list_items}
</div>
"""


def load_user_gallery(user_folder: str) -> List[Tuple[str, str]]:
    user_dir = DB_DIR / user_folder
    if not user_dir.exists():
        return []
    images = sorted(
        [p for p in user_dir.iterdir() if p.suffix.lower() in {".jpg", ".jpeg", ".png"}],
        reverse=True,
    )
    gallery = []
    for img_path in images[:24]:
        gallery.append((str(img_path), img_path.stem))
    return gallery


def rebuild_embeddings() -> Tuple[str, str]:
    """Re-embed every image under DB_DIR; display names fall back to folder names."""
    data = embeddings_template()
    image_paths = sorted(
        [
            p
            for p in DB_DIR.rglob("*")
            if p.is_file() and p.suffix.lower() in {".jpg", ".jpeg", ".png"}
        ]
    )
    success = 0
    failed = 0
    for img_path in image_paths:
        user_folder = img_path.parent.name
        try:
            embedding = represent_image(img_path)
            data["entries"].append(
                {
                    "user_folder": user_folder,
                    "user_display": user_folder,
                    "image_path": str(img_path),
                    "embedding": embedding,
                    "created_at": datetime.now().isoformat(timespec="seconds"),
                }
            )
            success += 1
        except Exception:
            failed += 1
    save_embeddings(data)
    message = f"✅ Database refreshed: {success} images | skipped {failed} images"
    return message, render_db_overview()


def register_face(image: np.ndarray, username: str, use_esrgan: bool):
    if image is None:
        return "⚠️ Please upload an image to register", [], render_db_overview()
    display_name = username.strip()
    if not display_name:
        return "⚠️ Please enter a user name before saving", [], render_db_overview()
    user_folder = sanitize_username(display_name)
    rgb = to_uint8_rgb(image)
    try:
        face = detect_face_with_fallback(rgb)
        used_full_image_fallback = False
        if face is not None:
            aligned_face = align_face_with_retinaface(rgb, face["facial_area"], expand_pct=14)
        elif looks_like_face_crop(rgb):
            # No detection, but the upload itself looks like a face crop.
            aligned_face = to_uint8_rgb(rgb)
            used_full_image_fallback = True
        else:
            return "⚠️ No face found in the registration image", [], render_db_overview()

        embedding_input = maybe_enhance_for_embedding(aligned_face, use_esrgan=use_esrgan)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_path = DB_DIR / user_folder / f"{timestamp}_{uuid.uuid4().hex[:6]}.jpg"
        save_rgb_image(save_path, embedding_input)

        embedding = represent_image(embedding_input)
        data = load_embeddings()
        data.setdefault("entries", [])
        data["entries"].append(
            {
                "user_folder": user_folder,
                "user_display": display_name,
                "image_path": str(save_path),
                "embedding": embedding,
                "created_at": datetime.now().isoformat(timespec="seconds"),
            }
        )
        save_embeddings(data)

        gallery = load_user_gallery(user_folder)
        message = (
            f"✅ Face saved for user: **{display_name}**  \n"
            f"- Stored at: `{save_path}`  \n"
            f"- Recognition model: `{RECOGNITION_MODEL}`  \n"
            f"- ESRGAN before embedding: `{'on' if use_esrgan else 'off'}`"
        )
        if used_full_image_fallback:
            message += (
                "\n- Note: the whole image was used as the face crop for registration "
                "because the detector found no face in the small image"
            )
        return message, gallery, render_db_overview()
    except Exception as exc:
        return f"❌ Registration failed: {exc}", [], render_db_overview()
def draw_face_annotations(rgb: np.ndarray, faces: List[Dict[str, Any]]) -> np.ndarray:
    """Draw a bbox plus a label (name and distance, or detector confidence) per face."""
    canvas = to_uint8_rgb(rgb).copy()
    for item in faces:
        x1, y1, x2, y2 = item["bbox"]
        identity = item["identity"]
        distance = item["distance"]
        score = item.get("score", 0.0)
        # RGB colors: blue for matched identities, sky-blue for Unknown.
        color = (37, 99, 235) if identity != "Unknown" else (14, 165, 233)
        cv2.rectangle(canvas, (x1, y1), (x2, y2), color, 2)
        if math.isfinite(distance):
            label = f"{identity} | d={distance:.3f}"
        else:
            label = f"{identity} | conf={score:.2f}"
        (tw, th), baseline = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.52, 2)
        label_y1 = max(0, y1 - th - baseline - 10)
        label_y2 = label_y1 + th + baseline + 8
        label_x2 = min(canvas.shape[1], x1 + tw + 12)
        cv2.rectangle(canvas, (x1, label_y1), (label_x2, label_y2), color, -1)
        cv2.putText(
            canvas,
            label,
            (x1 + 6, label_y2 - 6),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.52,
            (255, 255, 255),
            2,
            cv2.LINE_AA,
        )
    return canvas


def face_to_data_uri(rgb: np.ndarray) -> str:
    """Encode a face crop as a base64 JPEG data URI for inline <img> embedding."""
    image = Image.fromarray(to_uint8_rgb(rgb))
    buffer = BytesIO()
    image.save(buffer, format="JPEG", quality=88)
    encoded = base64.b64encode(buffer.getvalue()).decode("ascii")
    return f"data:image/jpeg;base64,{encoded}"


def render_log_feed(log_items: List[Dict[str, Any]]) -> str:
    # Log cards are styled by the .log-* rules in CUSTOM_CSS.
    if not log_items:
        return """
<div class="log-empty">No face logs yet. Detected names, distance, threshold,
and the reason behind Unknown results will appear here.</div>
"""
    cards: List[str] = []
    for item in log_items:
        processed_thumb = item.get("processed_thumbnail", "")
        identity = html.escape(str(item.get("identity", "Unknown")))
        closest = html.escape(str(item.get("best_match_name", "Unknown")))
        status = item.get("status", "Unknown")
        badge_class = "matched" if status == "Matched" else "unknown"
        event_time = html.escape(str(item.get("event_time", "-")))
        video_time = html.escape(str(item.get("video_time", "-")))
        reason = html.escape(str(item.get("reason", "")))
        distance = item.get("distance")
        threshold = item.get("threshold")
        bbox = html.escape(str(item.get("bbox", "-")))
        processed_label = html.escape(str(item.get("processed_label", "Processed preview")))
        processed_width = int(item.get("processed_width", 0) or 0)
        processed_height = int(item.get("processed_height", 0) or 0)
        distance_text = (
            f"{distance:.4f}"
            if isinstance(distance, (int, float)) and math.isfinite(distance)
            else "-"
        )
        threshold_text = f"{threshold:.2f}" if isinstance(threshold, (int, float)) else "-"
        size_text = (
            f"{processed_width}x{processed_height}"
            if processed_width > 0 and processed_height > 0
            else "-"
        )
        img_size_attrs = (
            f" width='{processed_width}' height='{processed_height}'"
            if processed_width > 0 and processed_height > 0
            else ""
        )
        cards.append(
            f"""
<div class="log-card">
  <div class="log-thumb-wrap">
    <span class="log-thumb-label">{processed_label}</span>
    <img class="log-thumb" src="{processed_thumb}" alt="processed face thumbnail"{img_size_attrs}/>
  </div>
  <div class="log-main">
    <div class="log-topline">
      <span class="log-name">{identity}</span>
      <span class="log-badge {badge_class}">{status}</span>
    </div>
    <div class="log-meta">
      <span>video_time: {video_time}</span>
      <span>distance: {distance_text}</span>
      <span>threshold: {threshold_text}</span>
      <span>closest match: {closest}</span>
      <span>preview: {processed_label}</span>
      <span>size: {size_text}</span>
    </div>
    <div class="log-reason">{reason}</div>
    <div class="log-meta">bbox: {bbox}</div>
  </div>
  <div class="log-time">{event_time}</div>
</div>
"""
        )
    return f"<div class='log-feed'>{''.join(cards)}</div>"
def find_best_match(embedding: List[float], entries: List[Dict[str, Any]], threshold: float) -> Dict[str, Any]:
    """Single-probe variant: linear scan over DB entries (kept alongside the batch version)."""
    best_identity = "Unknown"
    best_distance = float("inf")
    for entry in entries:
        ref_embedding = entry.get("embedding")
        if not ref_embedding:
            continue
        dist = cosine_distance(embedding, ref_embedding)
        if dist < best_distance:
            best_distance = dist
            best_identity = entry.get("user_display") or entry.get("user_folder") or "Unknown"
    matched = best_distance <= threshold
    identity = best_identity if matched else "Unknown"
    if math.isfinite(best_distance):
        if matched:
            reason = (
                f"Matched {best_identity} because cosine distance "
                f"{best_distance:.4f} <= threshold {threshold:.2f}"
            )
        else:
            reason = (
                f"Unknown because cosine distance {best_distance:.4f} > threshold {threshold:.2f}; "
                f"closest match is {best_identity}"
            )
    else:
        reason = "Could not compute a usable distance from this embedding set"
    return {
        "identity": identity,
        "best_match_name": best_identity,
        "distance": best_distance,
        "matched": matched,
        "threshold": threshold,
        "reason": reason,
    }


def find_best_matches_batch(
    embeddings: List[List[float]],
    entries: List[Dict[str, Any]],
    threshold: float,
    ref_matrix: np.ndarray,
    ref_norms: np.ndarray,
) -> List[Dict[str, Any]]:
    """Vectorized matching: one matmul scores every probe against every DB entry."""
    if not embeddings:
        return []
    if ref_matrix.size == 0 or not entries:
        return [
            {
                "identity": "Unknown",
                "best_match_name": "Unknown",
                "distance": float("inf"),
                "matched": False,
                "threshold": threshold,
                "reason": "No usable embedding database available for comparison",
            }
            for _ in embeddings
        ]
    emb_matrix = np.asarray(embeddings, dtype=np.float32)
    if emb_matrix.ndim == 1:
        emb_matrix = np.expand_dims(emb_matrix, axis=0)
    emb_norms = np.linalg.norm(emb_matrix, axis=1)
    denom = emb_norms[:, None] * ref_norms[None, :]
    safe_denom = np.where(denom == 0, 1e-12, denom)
    similarities = np.matmul(emb_matrix, ref_matrix.T) / safe_denom
    similarities = np.clip(similarities, -1.0, 1.0)
    distances = 1.0 - similarities

    results: List[Dict[str, Any]] = []
    for row in distances:
        best_idx = int(np.argmin(row))
        best_distance = float(row[best_idx])
        best_identity = (
            entries[best_idx].get("user_display")
            or entries[best_idx].get("user_folder")
            or "Unknown"
        )
        matched = best_distance <= threshold
        identity = best_identity if matched else "Unknown"
        if matched:
            reason = (
                f"Matched {best_identity} because cosine distance "
                f"{best_distance:.4f} <= threshold {threshold:.2f}"
            )
        else:
            reason = (
                f"Unknown because cosine distance {best_distance:.4f} > threshold {threshold:.2f}; "
                f"closest match is {best_identity}"
            )
        results.append(
            {
                "identity": identity,
                "best_match_name": best_identity,
                "distance": best_distance,
                "matched": matched,
                "threshold": threshold,
                "reason": reason,
            }
        )
    return results


def reset_runtime_panel():
    return (
        "🟦 Ready to process video — upload a video in the left tab and press start",
        None,
        render_log_feed([]),
    )


def resolve_video_path(video_value):
    """Accept the various shapes Gradio may pass for a video value and return a path."""
    if video_value is None:
        return None
    if isinstance(video_value, str):
        return video_value
    if isinstance(video_value, dict):
        return video_value.get("video") or video_value.get("path") or video_value.get("name")
    if isinstance(video_value, (list, tuple)) and len(video_value) > 0:
        return video_value[0]
    return str(video_value)
def process_video_stream(
    video_value,
    process_fps: float,
    detector_score_threshold: float,
    recognition_threshold: float,
    expand_pct: int,
    log_cooldown_sec: float,
    use_align: bool,
    use_esrgan: bool,
):
    """Generator: stream status text, an annotated preview frame, and the log feed HTML."""
    video_path = resolve_video_path(video_value)
    if not video_path:
        yield "⚠️ Please upload a video before starting", None, render_log_feed([])
        return

    data = load_embeddings()
    entries = data.get("entries", [])
    if not entries:
        # Try to rebuild from images on disk before giving up.
        rebuild_message, _ = rebuild_embeddings()
        data = load_embeddings()
        entries = data.get("entries", [])
        if not entries:
            yield (
                "⚠️ No usable face database yet. Please register a person first.\n\n"
                + rebuild_message,
                None,
                render_log_feed([]),
            )
            return

    valid_entries, ref_matrix, ref_norms = prepare_reference_embeddings(entries)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        yield "❌ Could not open the video", None, render_log_feed([])
        return

    src_fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
    detect_every = max(1, int(round(src_fps / max(process_fps, 0.1))))

    last_faces: List[Dict[str, Any]] = []
    log_items: List[Dict[str, Any]] = []
    last_logged_second: Dict[str, float] = {}
    frame_index = 0
    preview_frame = None

    try:
        while True:
            ok, bgr_frame = cap.read()
            if not ok:
                break
            rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)

            if frame_index % detect_every == 0:
                current_faces: List[Dict[str, Any]] = []
                try:
                    detections = RetinaFace.detect_faces(rgb_frame, threshold=detector_score_threshold)
                except Exception:
                    detections = {}
                if isinstance(detections, dict):
                    # Largest faces first.
                    faces_sorted = sorted(
                        detections.values(),
                        key=lambda face: max(0, int(face["facial_area"][2]) - int(face["facial_area"][0]))
                        * max(0, int(face["facial_area"][3]) - int(face["facial_area"][1])),
                        reverse=True,
                    )
                    pending_faces: List[Dict[str, Any]] = []
                    for face in faces_sorted:
                        raw_bbox = [int(v) for v in face["facial_area"]]
                        bbox = expand_bbox(raw_bbox, rgb_frame.shape, expand_pct=float(expand_pct))
                        raw_face_rgb = crop_rgb(rgb_frame, bbox)
                        aligned_face_rgb = (
                            align_face_with_retinaface(rgb_frame, raw_bbox, expand_pct=float(expand_pct))
                            if use_align
                            else None
                        )
                        preview_face_rgb = (
                            aligned_face_rgb if aligned_face_rgb is not None else crop_rgb(rgb_frame, bbox)
                        )
                        if preview_face_rgb.size == 0 or raw_face_rgb.size == 0:
                            continue
                        video_second = frame_index / src_fps if src_fps > 0 else 0.0
                        pending_faces.append(
                            {
                                "bbox": bbox,
                                "raw_face_rgb": raw_face_rgb,
                                "preview_face_rgb": preview_face_rgb,
                                "score": float(face.get("score", 0.0)),
                                "video_second": video_second,
                            }
                        )

                    if pending_faces:
                        # Micro-batch: embed every face found in this step at once.
                        embedding_inputs = [
                            maybe_enhance_for_embedding(item["preview_face_rgb"], use_esrgan=use_esrgan)
                            for item in pending_faces
                        ]
                        batch_embeddings = represent_images_batch(embedding_inputs)
                        matches = find_best_matches_batch(
                            embeddings=batch_embeddings,
                            entries=valid_entries,
                            threshold=recognition_threshold,
                            ref_matrix=ref_matrix,
                            ref_norms=ref_norms,
                        )
                        for item, match, processed_face_rgb in zip(pending_faces, matches, embedding_inputs):
                            bbox = item["bbox"]
                            current_faces.append(
                                {
                                    "bbox": bbox,
                                    "identity": match["identity"],
                                    "distance": match["distance"],
                                    "score": item["score"],
                                }
                            )
                            # Dedupe: known people by name, Unknowns by a coarse 120-px grid cell.
                            bucket_x = bbox[0] // 120
                            bucket_y = bbox[1] // 120
                            dedupe_key = (
                                match["best_match_name"]
                                if match["matched"]
                                else f"Unknown_{bucket_x}_{bucket_y}"
                            )
                            if item["video_second"] - last_logged_second.get(dedupe_key, -999.0) >= log_cooldown_sec:
                                last_logged_second[dedupe_key] = item["video_second"]
                                event_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                                log_items.insert(
                                    0,
                                    {
                                        "event_time": event_time,
                                        "video_time": seconds_to_hhmmss(item["video_second"]),
                                        "identity": match["identity"],
                                        "best_match_name": match["best_match_name"],
                                        "distance": match["distance"],
                                        "bbox": bbox,
                                        "status": "Matched" if match["matched"] else "Unknown",
                                        "threshold": match["threshold"],
                                        "reason": match["reason"],
                                        "processed_thumbnail": face_to_data_uri(processed_face_rgb),
                                        "processed_label": (
                                            ("Aligned face" if use_align else "Raw crop")
                                            + (" + ESRGAN" if use_esrgan else "")
                                        ),
                                        "processed_width": int(processed_face_rgb.shape[1]),
                                        "processed_height": int(processed_face_rgb.shape[0]),
                                    },
                                )
                                log_items = log_items[:MAX_LOG_ITEMS]
                last_faces = current_faces

            preview_frame = draw_face_annotations(rgb_frame, last_faces)
            progress = (frame_index + 1) / total_frames * 100.0 if total_frames > 0 else 0.0
            if frame_index % max(1, detect_every) == 0:
                status = (
                    f"🔄 Processing video... {progress:.1f}%  \n"
                    f"- Source FPS: `{src_fps:.2f}`  \n"
                    f"- Processing every `{detect_every}` frames  \n"
                    f"- Recognition mode: `micro-batch per detection step`  \n"
                    f"- Alignment during video: `{'on' if use_align else 'off'}`  \n"
                    f"- ESRGAN before embedding: `{'on' if use_esrgan else 'off'}`  \n"
                    f"- Registered identities: `{len({e.get('user_folder', 'u') for e in entries})}`  \n"
                    f"- Threshold rule: `distance <= {recognition_threshold:.2f}` counts as the same person  \n"
                    f"- Log events: `{len(log_items)}`"
                )
                yield status, preview_frame, render_log_feed(log_items)
            frame_index += 1
    except Exception as exc:
        yield f"❌ Error during processing: {exc}", preview_frame, render_log_feed(log_items)
        return
    finally:
        cap.release()

    final_status = (
        f"✅ Processing finished  \n"
        f"- Log events: `{len(log_items)}`  \n"
        f"- Match rule: `distance <= {recognition_threshold:.2f}`  \n"
        f"- ESRGAN before embedding: `{'on' if use_esrgan else 'off'}`  \n"
        f"- Recognition model: `{RECOGNITION_MODEL}`"
    )
    yield final_status, preview_frame, render_log_feed(log_items)
with gr.Blocks(
    theme=gr.themes.Soft(primary_hue="sky", secondary_hue="blue", neutral_hue="slate"),
    css=CUSTOM_CSS,
    title="CCTV Face Recognition with RetinaFace + DeepFace",
) as demo:
    gr.HTML(
        """
<div class="hero-card app-shell">
  <h2>CCTV Face Recognition Dashboard</h2>
  <p>A Gradio app for face detection + alignment + recognition with RetinaFace and DeepFace (ArcFace).</p>
  <p>The left side handles face registration, video upload, live preview, and the speed/accuracy
  settings; the right side shows summarized face logs with the reasoning behind each recognition
  result. The face database is wiped every time the script is restarted.</p>
</div>
"""
    )
    with gr.Row(equal_height=False):
        with gr.Column(scale=4, elem_classes=["panel"]):
            gr.Markdown("### Left side • Upload & settings")
            with gr.Tabs():
                with gr.Tab("1) Upload registration photo"):
                    reg_image = gr.Image(
                        type="numpy",
                        label="Upload a person's photo",
                        sources=["upload", "webcam"],
                        height=330,
                    )
                    reg_user = gr.Textbox(
                        label="User",
                        placeholder="e.g. admin01",
                    )
                    reg_use_esrgan = gr.Checkbox(
                        label="Use ESRGAN before embedding",
                        value=False,
                        info="Good for blurry or low-detail camera shots, but slower than usual, and the first model load may take a while",
                    )
                    with gr.Row():
                        reg_save_btn = gr.Button("Save to system", elem_classes=["blue-btn"])
                        reg_refresh_btn = gr.Button("Refresh database", elem_classes=["neutral-btn"])
                    reg_status = gr.Markdown(value="Ready to register")
                    reg_gallery = gr.Gallery(
                        label="Stored face images for this user",
                        columns=4,
                        height=260,
                        object_fit="cover",
                    )
                    db_summary = gr.HTML(render_db_overview())
                with gr.Tab("2) Upload video + Config"):
                    video_input = gr.Video(
                        sources=["upload"],
                        label="Upload CCTV footage",
                        height=330,
                    )
                    live_preview = gr.Image(
                        label="Live Preview (bounding boxes drawn while processing)",
                        type="numpy",
                        interactive=False,
                        height=330,
                    )
                    process_fps = gr.Slider(
                        minimum=1,
                        maximum=12,
                        value=3,
                        step=1,
                        label="Process FPS",
                        info="Frames per second used for face detection and recognition",
                    )
                    detector_score_threshold = gr.Slider(
                        minimum=0.10,
                        maximum=0.99,
                        value=0.85,
                        step=0.01,
                        label="Detection Score Threshold",
                    )
                    recognition_threshold = gr.Slider(
                        minimum=0.15,
                        maximum=1.00,
                        value=0.85,
                        step=0.01,
                        label="Recognition Threshold (Cosine Distance)",
                        info="A face matches when distance <= this value; the 0.85 default is looser than the previous setting",
                    )
                    expand_pct = gr.Slider(
                        minimum=0,
                        maximum=30,
                        value=10,
                        step=1,
                        label="Expand Face Area (%)",
                    )
                    log_cooldown = gr.Slider(
                        minimum=0.0,
                        maximum=10.0,
                        value=0,
                        step=0.5,
                        label="Log Cooldown (seconds)",
                        info="Suppresses duplicate logs of the same person within a short window",
                    )
                    use_align = gr.Checkbox(
                        label="Use face alignment",
                        value=True,
                        info="Leaving this off is fastest for video; turn it on to stabilize faces before creating embeddings",
                    )
                    use_esrgan = gr.Checkbox(
                        label="Use ESRGAN before embedding",
                        value=False,
                        info="Sharpens camera face crops before embedding; helps with small or blocky images but is noticeably slower",
                    )
                    with gr.Row():
                        start_btn = gr.Button("Start processing", elem_classes=["blue-btn"])
                        clear_btn = gr.Button("Clear results", elem_classes=["neutral-btn"])
                    gr.HTML(
                        """
<div class="soft-note">
  RetinaFace.detect_faces handles detection. Enable <b>Use face alignment</b> to align
  faces before recognition, and <b>Use ESRGAN before embedding</b> to add detail to camera
  face crops before creating embeddings. Recognition uses DeepFace.represent with the
  ArcFace model, computed on in-memory images to reduce I/O time.
</div>
"""
                    )
        with gr.Column(scale=6, elem_classes=["panel"]):
            gr.Markdown("### Right side • Face Logs")
            runtime_status = gr.Markdown(
                value="🟦 Ready to process video — upload a video in the left tab and press start",
                elem_classes=["status-box"],
            )
            face_log_feed = gr.HTML(value=render_log_feed([]))

    reg_save_btn.click(
        fn=register_face,
        inputs=[reg_image, reg_user, reg_use_esrgan],
        outputs=[reg_status, reg_gallery, db_summary],
    )
    reg_refresh_btn.click(
        fn=rebuild_embeddings,
        inputs=[],
        outputs=[reg_status, db_summary],
    )
    start_btn.click(
        fn=process_video_stream,
        inputs=[
            video_input,
            process_fps,
            detector_score_threshold,
            recognition_threshold,
            expand_pct,
            log_cooldown,
            use_align,
            use_esrgan,
        ],
        outputs=[runtime_status, live_preview, face_log_feed],
    )
    clear_btn.click(
        fn=reset_runtime_panel,
        inputs=[],
        outputs=[runtime_status, live_preview, face_log_feed],
    )

demo.queue(default_concurrency_limit=1)

if __name__ == "__main__":
    # Pre-build the ArcFace model so the first request does not pay the load cost.
    modeling.build_model(task="facial_recognition", model_name=RECOGNITION_MODEL)
    demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")), share=False)