# Konthee's picture
# Add Real-ESRGAN face enhancement and log preview updates
# 15f8874
from __future__ import annotations
import base64
import html
import json
import math
import os
import re
import shutil
import sys
import urllib.request
import uuid
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List, Tuple
import cv2
import gradio as gr
import numpy as np
from PIL import Image
from deepface import DeepFace
from deepface.modules import modeling
from retinaface import RetinaFace
# Optional Real-ESRGAN stack. Older basicsr releases import
# torchvision.transforms.functional_tensor, which newer torchvision renamed
# to _functional_tensor, so alias it before importing basicsr. If any of
# torch / basicsr / realesrgan is missing, fall back to None sentinels and
# the app runs without super-resolution support (see load_esrgan_model).
try:
    try:
        import torchvision.transforms._functional_tensor as tv_functional_tensor
        sys.modules.setdefault("torchvision.transforms.functional_tensor", tv_functional_tensor)
    except Exception:
        pass
    import torch
    from basicsr.archs.rrdbnet_arch import RRDBNet
    from realesrgan import RealESRGANer
except Exception:
    torch = None
    RRDBNet = None
    RealESRGANer = None
# Filesystem layout and recognition configuration.
BASE_DIR = Path(__file__).resolve().parent
DB_DIR = BASE_DIR / "face_db"  # registered face crops, one sub-folder per user
OUTPUT_DIR = BASE_DIR / "outputs"
EMBEDDINGS_FILE = BASE_DIR / "face_db_embeddings.json"  # persisted embedding DB
RECOGNITION_MODEL = "ArcFace"  # DeepFace recognition backbone
MAX_LOG_ITEMS = 120  # cap on retained face-log events
# Real-ESRGAN x4 super-resolution settings; weights are downloaded on demand.
ESRGAN_MODEL_URL = "https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.0/RealESRGAN_x4plus.pth"
ESRGAN_MODEL_FILENAME = "RealESRGAN_x4plus.pth"
ESRGAN_SCALE = 4
ESRGAN_MAX_DIMENSION = 1024  # inputs longer than this are downscaled first
_ESRGAN_MODEL = None  # lazily-initialized RealESRGANer singleton (see load_esrgan_model)
for folder in (DB_DIR, OUTPUT_DIR):
    folder.mkdir(parents=True, exist_ok=True)
# Custom CSS injected into the Gradio app: sky/blue theme variables, card
# panels, button variants, and the log-feed card layout used by
# render_log_feed / render_db_overview.
CUSTOM_CSS = """
:root {
--sky-1: #f6fbff;
--sky-2: #eef7ff;
--sky-3: #dbeeff;
--sky-4: #b9ddff;
--sky-5: #2563eb;
--sky-6: #0f172a;
}
.gradio-container {
background: linear-gradient(180deg, #f8fbff 0%, #f1f8ff 100%);
}
.app-shell {
max-width: 1480px;
margin: 0 auto;
}
.hero-card {
background: linear-gradient(135deg, rgba(37, 99, 235, 0.10), rgba(125, 211, 252, 0.16));
border: 1px solid rgba(37, 99, 235, 0.16);
border-radius: 24px;
padding: 20px 24px;
margin-bottom: 14px;
box-shadow: 0 12px 34px rgba(37, 99, 235, 0.08);
}
.panel {
background: rgba(255, 255, 255, 0.90);
border: 1px solid rgba(148, 163, 184, 0.18);
border-radius: 24px;
padding: 14px;
box-shadow: 0 14px 36px rgba(15, 23, 42, 0.06);
}
.soft-note {
background: linear-gradient(180deg, rgba(239, 246, 255, 0.95), rgba(255, 255, 255, 0.96));
border: 1px dashed rgba(37, 99, 235, 0.25);
border-radius: 18px;
padding: 12px 14px;
color: #1e3a8a;
}
.blue-btn button,
.blue-btn button:hover {
background: linear-gradient(90deg, #2563eb 0%, #38bdf8 100%) !important;
color: white !important;
border: none !important;
}
.neutral-btn button,
.neutral-btn button:hover {
background: white !important;
color: #0f172a !important;
border: 1px solid rgba(148, 163, 184, 0.35) !important;
}
.status-box {
background: white;
border: 1px solid rgba(37, 99, 235, 0.12);
border-radius: 18px;
padding: 12px 14px;
}
.log-feed {
display: flex;
flex-direction: column;
gap: 12px;
}
.log-card {
display: grid;
grid-template-columns: auto 1fr auto;
gap: 14px;
align-items: start;
background: linear-gradient(180deg, rgba(255,255,255,0.98), rgba(248,250,252,0.96));
border: 1px solid rgba(148, 163, 184, 0.22);
border-radius: 20px;
padding: 12px;
box-shadow: 0 10px 28px rgba(15, 23, 42, 0.06);
}
.log-thumb-wrap {
display: flex;
flex-direction: column;
gap: 6px;
}
.log-thumb {
display: block;
max-width: none;
height: auto;
border-radius: 14px;
background: #e2e8f0;
}
.log-thumb-label {
font-size: 12px;
font-weight: 700;
color: #475569;
text-transform: uppercase;
letter-spacing: 0.04em;
}
.log-main {
min-width: 0;
}
.log-topline {
display: flex;
flex-wrap: wrap;
gap: 10px;
align-items: center;
margin-bottom: 6px;
}
.log-name {
font-size: 28px;
font-weight: 800;
color: #1e293b;
line-height: 1.05;
}
.log-badge {
display: inline-flex;
align-items: center;
border-radius: 999px;
padding: 6px 14px;
font-size: 18px;
font-weight: 800;
color: white;
}
.log-badge.matched {
background: linear-gradient(90deg, #16a34a 0%, #22c55e 100%);
}
.log-badge.unknown {
background: linear-gradient(90deg, #dc2626 0%, #ef4444 100%);
}
.log-meta {
display: flex;
flex-wrap: wrap;
gap: 14px;
color: #334155;
font-size: 15px;
margin-bottom: 8px;
}
.log-reason {
color: #1e40af;
font-size: 15px;
line-height: 1.5;
}
.log-time {
text-align: right;
color: #64748b;
font-size: 16px;
font-weight: 700;
white-space: nowrap;
}
.log-empty {
padding: 22px;
border-radius: 18px;
border: 1px dashed rgba(148, 163, 184, 0.35);
color: #475569;
background: rgba(255,255,255,0.78);
}
"""
def sanitize_username(username: str) -> str:
cleaned = re.sub(r"[^a-zA-Z0-9ก-๙_-]+", "_", username.strip())
cleaned = re.sub(r"_+", "_", cleaned).strip("_")
return cleaned or "user"
def to_uint8_rgb(image: Any) -> np.ndarray:
    """Coerce an arbitrary image-like input into an HxWx3 uint8 RGB array."""
    arr = np.array(image)
    # Grayscale -> replicate to 3 channels; RGBA -> drop the alpha channel.
    if arr.ndim == 2:
        arr = cv2.cvtColor(arr, cv2.COLOR_GRAY2RGB)
    if arr.ndim == 3 and arr.shape[2] == 4:
        arr = cv2.cvtColor(arr, cv2.COLOR_RGBA2RGB)
    if arr.dtype == np.uint8:
        return arr
    # Float images in [0, 1] get rescaled to [0, 255]; everything else is
    # clipped into the uint8 range as-is.
    scale = 255.0 if np.max(arr) <= 1.0 else 1.0
    return (arr * scale).clip(0, 255).astype(np.uint8)
def save_rgb_image(path: Path, rgb: np.ndarray) -> None:
    """Write an RGB array to *path* as an image file, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # OpenCV writes BGR, so flip the channel order before saving.
    cv2.imwrite(str(path), cv2.cvtColor(to_uint8_rgb(rgb), cv2.COLOR_RGB2BGR))
def expand_bbox(bbox: List[float], image_shape: Tuple[int, int, int], expand_pct: float = 12.0) -> List[int]:
    """Grow *bbox* by *expand_pct* percent per side, clamped to the image."""
    height, width = image_shape[:2]
    x1, y1, x2, y2 = (int(v) for v in bbox)
    # Pad relative to the box size (at least 1 px wide/tall to avoid zero pads).
    pad_x = int(max(1, x2 - x1) * (expand_pct / 100.0))
    pad_y = int(max(1, y2 - y1) * (expand_pct / 100.0))
    return [
        max(0, x1 - pad_x),
        max(0, y1 - pad_y),
        min(width, x2 + pad_x),
        min(height, y2 + pad_y),
    ]
def crop_rgb(rgb: np.ndarray, bbox: List[int]) -> np.ndarray:
    """Return an owned (copied) sub-image for the [x1, y1, x2, y2] box."""
    left, top, right, bottom = bbox
    return rgb[top:bottom, left:right].copy()
def looks_like_face_crop(rgb: np.ndarray) -> bool:
    """Heuristic: could this image plausibly already be a tight face crop?

    Requires at least 48 px on the short side and a width/height ratio in
    the roughly-portrait-to-slightly-landscape range [0.55, 1.8].
    """
    h, w = rgb.shape[:2]
    if min(h, w) < 48:
        return False
    return 0.55 <= w / max(h, 1) <= 1.8
def get_largest_face(detections: Any) -> Dict[str, Any] | None:
    """Pick the face with the largest bbox area from a RetinaFace result.

    Returns None when *detections* is not a dict or contains no usable
    4-element "facial_area" entries.
    """
    if not isinstance(detections, dict):
        return None
    best_face = None
    best_area = -1
    for face in detections.values():
        area_box = face.get("facial_area")
        if not area_box or len(area_box) != 4:
            continue
        x1, y1, x2, y2 = (int(v) for v in area_box)
        area = max(0, x2 - x1) * max(0, y2 - y1)
        if area > best_area:
            best_area = area
            best_face = face
    return best_face
def detect_face_with_fallback(rgb: np.ndarray) -> Dict[str, Any] | None:
    """Detect the largest face; retry on an upscaled copy for tiny images.

    Returns the RetinaFace face dict (with "facial_area" mapped back to the
    original resolution after a retry) or None when nothing is found.
    Detector exceptions are treated the same as "no faces".
    """
    try:
        detections = RetinaFace.detect_faces(rgb)
    except Exception:
        detections = {}
    face = get_largest_face(detections)
    if face is not None:
        return face
    h, w = rgb.shape[:2]
    min_side = min(h, w)
    if min_side < 160:
        # Small frames often defeat the detector: enlarge so the short side
        # reaches at least ~160 px and try once more.
        scale = max(2, int(math.ceil(160 / max(1, min_side))))
        enlarged = cv2.resize(rgb, (w * scale, h * scale), interpolation=cv2.INTER_CUBIC)
        try:
            detections = RetinaFace.detect_faces(enlarged)
        except Exception:
            detections = {}
        face = get_largest_face(detections)
        if face is not None:
            # Map the bbox from the enlarged image back to source coordinates.
            x1, y1, x2, y2 = [int(v) for v in face["facial_area"]]
            face["facial_area"] = [
                max(0, x1 // scale),
                max(0, y1 // scale),
                min(w, x2 // scale),
                min(h, y2 // scale),
            ]
            return face
    return None
def align_face_with_retinaface(rgb: np.ndarray, bbox: List[int], expand_pct: float = 12.0) -> np.ndarray:
    """Crop *bbox* (expanded) and return an aligned face when possible.

    Alignment via RetinaFace.extract_faces is best-effort: on failure or an
    empty result the plain expanded crop is returned instead.
    Raises ValueError when the crop itself is empty.
    """
    expanded = expand_bbox(bbox, rgb.shape, expand_pct=expand_pct)
    face_crop = crop_rgb(rgb, expanded)
    if face_crop.size == 0:
        raise ValueError("ไม่สามารถ crop ใบหน้าจากภาพได้")
    try:
        aligned_faces = RetinaFace.extract_faces(img_path=face_crop, align=True, expand_face_area=0)
        if aligned_faces:
            return to_uint8_rgb(aligned_faces[0])
    except Exception:
        # Alignment is optional polish; fall through to the raw crop.
        pass
    return to_uint8_rgb(face_crop)
def load_esrgan_model():
    """Return the process-wide RealESRGANer instance, creating it on first use.

    Downloads the x4plus weights on demand. Raises RuntimeError when the
    optional torch / basicsr / realesrgan stack is not installed (see the
    guarded imports at the top of the file).
    """
    global _ESRGAN_MODEL
    if _ESRGAN_MODEL is not None:
        return _ESRGAN_MODEL
    if torch is None or RRDBNet is None or RealESRGANer is None:
        raise RuntimeError("ยังไม่ได้ติดตั้ง torch / realesrgan / basicsr สำหรับ Real-ESRGAN")
    weights_dir = BASE_DIR / "weights"
    weights_dir.mkdir(parents=True, exist_ok=True)
    model_path = weights_dir / ESRGAN_MODEL_FILENAME
    if not model_path.exists():
        # Download to a temp file and move it into place atomically so an
        # interrupted download never leaves a corrupt weights file that would
        # block every future load.
        tmp_path = model_path.with_suffix(model_path.suffix + ".part")
        try:
            urllib.request.urlretrieve(ESRGAN_MODEL_URL, tmp_path)
            os.replace(tmp_path, model_path)
        finally:
            if tmp_path.exists() and not model_path.exists():
                tmp_path.unlink(missing_ok=True)
    # Standard RealESRGAN_x4plus architecture hyper-parameters.
    model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=ESRGAN_SCALE)
    use_cuda = torch.cuda.is_available()
    _ESRGAN_MODEL = RealESRGANer(
        scale=ESRGAN_SCALE,
        model_path=str(model_path),
        model=model,
        tile=0,
        tile_pad=10,
        pre_pad=0,
        half=use_cuda,  # fp16 inference only when running on GPU
        gpu_id=0 if use_cuda else None,
    )
    return _ESRGAN_MODEL
def trim_image_for_esrgan(rgb: np.ndarray) -> np.ndarray:
    """Crop height/width down to multiples of 4, as the SR model requires.

    Raises ValueError when the image is too small to trim (under 4 px on a
    side); returns the input unchanged when it is already aligned.
    """
    h, w = rgb.shape[:2]
    new_h, new_w = h - h % 4, w - w % 4
    if new_h <= 0 or new_w <= 0:
        raise ValueError("ภาพเล็กเกินไปสำหรับ ESRGAN")
    if (new_h, new_w) == (h, w):
        return rgb
    return rgb[:new_h, :new_w]
def maybe_downscale_for_esrgan(rgb: np.ndarray, max_dimension: int = ESRGAN_MAX_DIMENSION) -> np.ndarray:
    """Shrink *rgb* so its longest side fits *max_dimension*, keeping aspect.

    Images already within bounds are returned untouched. Resized outputs are
    re-trimmed to multiples of 4 for the SR model; a minimum of 4 px per
    side is enforced on the resize target.
    """
    h, w = rgb.shape[:2]
    longest = max(h, w)
    if longest <= max_dimension:
        return rgb
    factor = max_dimension / float(longest)
    target_size = (max(4, int(round(w * factor))), max(4, int(round(h * factor))))
    shrunk = cv2.resize(rgb, target_size, interpolation=cv2.INTER_AREA)
    return trim_image_for_esrgan(shrunk)
def enhance_with_esrgan(rgb: np.ndarray) -> np.ndarray:
    """Run Real-ESRGAN x4 super-resolution over *rgb*, returning uint8 RGB."""
    upsampler = load_esrgan_model()
    # Trim to multiples of 4 and bound the size before feeding the model.
    model_input = maybe_downscale_for_esrgan(trim_image_for_esrgan(to_uint8_rgb(rgb)))
    enhanced, _ = upsampler.enhance(model_input, outscale=ESRGAN_SCALE)
    return to_uint8_rgb(enhanced)
def maybe_enhance_for_embedding(rgb: np.ndarray, use_esrgan: bool) -> np.ndarray:
    """Optionally super-resolve the face crop before embedding extraction."""
    return enhance_with_esrgan(rgb) if use_esrgan else to_uint8_rgb(rgb)
def seconds_to_hhmmss(seconds: float) -> str:
    """Format a duration (clamped at zero) as HH:MM:SS.mmm."""
    total_ms = int(max(0, seconds) * 1000)
    total_sec, ms = divmod(total_ms, 1000)
    total_min, secs = divmod(total_sec, 60)
    hours, minutes = divmod(total_min, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}.{ms:03d}"
def cosine_distance(vec1: List[float], vec2: List[float]) -> float:
    """Cosine distance in [0, 2]; returns 1.0 when either vector is zero."""
    a = np.asarray(vec1, dtype=np.float32)
    b = np.asarray(vec2, dtype=np.float32)
    norm_product = float(np.linalg.norm(a) * np.linalg.norm(b))
    if norm_product == 0:
        return 1.0
    # Clamp against float round-off before converting similarity to distance.
    cos_sim = min(1.0, max(-1.0, float(np.dot(a, b) / norm_product)))
    return 1.0 - cos_sim
def embeddings_template() -> Dict[str, Any]:
    """Return a fresh, empty embedding-database document."""
    return {"model_name": RECOGNITION_MODEL, "entries": []}
def reset_face_database_on_startup() -> None:
    """Wipe every registered face image and reset the embeddings file."""
    if DB_DIR.exists():
        for entry in DB_DIR.iterdir():
            if entry.is_dir():
                shutil.rmtree(entry, ignore_errors=True)
            elif entry.is_file():
                entry.unlink(missing_ok=True)
    save_embeddings(embeddings_template())
def load_embeddings() -> Dict[str, Any]:
    """Read the embeddings JSON; return an empty template on any problem.

    Any read/parse error or a document missing the "entries" key yields a
    fresh template rather than raising.
    """
    if not EMBEDDINGS_FILE.exists():
        return embeddings_template()
    try:
        with open(EMBEDDINGS_FILE, "r", encoding="utf-8") as fh:
            payload = json.load(fh)
        if "entries" in payload:
            return payload
        return embeddings_template()
    except Exception:
        return embeddings_template()
def save_embeddings(data: Dict[str, Any]) -> None:
    """Persist the embedding database as pretty-printed UTF-8 JSON."""
    with open(EMBEDDINGS_FILE, "w", encoding="utf-8") as fh:
        json.dump(data, fh, ensure_ascii=False, indent=2)
# Clear all registered faces every time the script starts (fresh demo state).
reset_face_database_on_startup()
def represent_image(image: Path | np.ndarray) -> List[float]:
    """Compute an ArcFace embedding for an already-cropped/aligned face.

    Accepts a file path or an in-memory image. Detection and alignment are
    skipped (detector_backend="skip") because callers pass pre-processed
    crops. Raises ValueError when DeepFace yields no embedding.
    """
    image_input: str | np.ndarray
    if isinstance(image, Path):
        image_input = str(image)
    else:
        image_input = to_uint8_rgb(image)
    objs = DeepFace.represent(
        img_path=image_input,
        model_name=RECOGNITION_MODEL,
        detector_backend="skip",
        align=False,
        enforce_detection=False,
        normalization="ArcFace",
    )
    # DeepFace returns either a list of face dicts or a single dict,
    # depending on version; accept both shapes.
    if isinstance(objs, list) and objs:
        first = objs[0]
        if isinstance(first, dict) and "embedding" in first:
            return first["embedding"]
    if isinstance(objs, dict) and "embedding" in objs:
        return objs["embedding"]
    raise ValueError("ไม่สามารถสร้าง embedding จากภาพใบหน้าได้")
def represent_images_batch(images: List[np.ndarray]) -> List[List[float]]:
    """Compute ArcFace embeddings for several face crops in one DeepFace call.

    Returns one embedding per input image, in order. Raises ValueError when
    the batched result has an unexpected shape or an image yields no
    embedding.
    """
    if not images:
        return []
    batch_input = [to_uint8_rgb(img) for img in images]
    objs = DeepFace.represent(
        img_path=batch_input,
        model_name=RECOGNITION_MODEL,
        detector_backend="skip",
        align=False,
        enforce_detection=False,
        normalization="ArcFace",
    )
    if not isinstance(objs, list):
        raise ValueError("รูปแบบผลลัพธ์จาก batch recognition ไม่ถูกต้อง")
    embeddings: List[List[float]] = []
    for item in objs:
        # Per-image results may be a list of face dicts or a single dict,
        # depending on DeepFace version; accept both.
        if isinstance(item, list) and item:
            first = item[0]
            if isinstance(first, dict) and "embedding" in first:
                embeddings.append(first["embedding"])
                continue
        if isinstance(item, dict) and "embedding" in item:
            embeddings.append(item["embedding"])
            continue
        raise ValueError("ไม่สามารถสร้าง embedding แบบ batch จากภาพใบหน้าได้")
    return embeddings
def prepare_reference_embeddings(entries: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], np.ndarray, np.ndarray]:
    """Stack usable reference embeddings into a matrix for batch matching.

    Returns (entries that carried a valid non-empty 1-D embedding, float32
    matrix of those embeddings row-aligned with the entries, per-row L2
    norms). Empty arrays are returned when nothing is usable.
    """
    usable: List[Dict[str, Any]] = []
    rows: List[np.ndarray] = []
    for entry in entries:
        raw = entry.get("embedding")
        if not raw:
            continue
        row = np.asarray(raw, dtype=np.float32)
        if row.ndim == 1 and row.size > 0:
            usable.append(entry)
            rows.append(row)
    if not rows:
        return usable, np.empty((0, 0), dtype=np.float32), np.empty((0,), dtype=np.float32)
    matrix = np.stack(rows, axis=0)
    return usable, matrix, np.linalg.norm(matrix, axis=1)
def render_db_overview() -> str:
    """Render the registration-database summary panel as HTML.

    Counts stored face images per user folder and lists the top 12 users by
    image count. Display names originate from user input, so they are
    HTML-escaped before being embedded in the markup (XSS hardening).
    """
    data = load_embeddings()
    counts: Dict[str, int] = {}
    display_names: Dict[str, str] = {}
    for entry in data.get("entries", []):
        folder = entry.get("user_folder", "user")
        counts[folder] = counts.get(folder, 0) + 1
        display_names[folder] = entry.get("user_display", folder)
    total_people = len(counts)
    total_faces = sum(counts.values())
    if not counts:
        list_items = "<li>ยังไม่มีข้อมูลลงทะเบียน</li>"
    else:
        ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)[:12]
        # Escape user-provided names so they cannot inject markup/script.
        list_items = "".join(
            f"<li><b>{html.escape(str(display_names.get(folder, folder)))}</b> — {count} ภาพ</li>"
            for folder, count in ranked
        )
    return f"""
<div class='soft-note'>
  <div style='display:flex;gap:18px;flex-wrap:wrap;margin-bottom:10px'>
    <div><b>{total_people}</b><br/>บุคคลที่ลงทะเบียน</div>
    <div><b>{total_faces}</b><br/>ภาพใบหน้าในระบบ</div>
    <div><b>{RECOGNITION_MODEL}</b><br/>โมเดล Recognition</div>
  </div>
  <div><b>ฐานข้อมูลปัจจุบัน</b></div>
  <ul style='margin:8px 0 0 18px;padding:0'>{list_items}</ul>
</div>
"""
def load_user_gallery(user_folder: str) -> List[Tuple[str, str]]:
    """List up to 24 stored face images for a user as (path, caption) pairs.

    Sorted descending by filename, which starts with a timestamp, so the
    newest registrations appear first. Returns [] for unknown users.
    """
    user_dir = DB_DIR / user_folder
    if not user_dir.exists():
        return []
    image_files = sorted(
        (p for p in user_dir.iterdir() if p.suffix.lower() in {".jpg", ".jpeg", ".png"}),
        reverse=True,
    )
    return [(str(p), p.stem) for p in image_files[:24]]
def rebuild_embeddings() -> Tuple[str, str]:
    """Recompute the embeddings file from every image stored under DB_DIR.

    Walks the per-user folders, embeds each jpg/jpeg/png, and rewrites the
    JSON database from scratch. Images that fail embedding are skipped and
    counted. Returns (status message, refreshed DB-overview HTML).
    """
    data = embeddings_template()
    image_paths = sorted(
        [
            p
            for p in DB_DIR.rglob("*")
            if p.is_file() and p.suffix.lower() in {".jpg", ".jpeg", ".png"}
        ]
    )
    success = 0
    failed = 0
    for img_path in image_paths:
        # The folder name serves as both the stable id and the display name
        # here (original display names are not recoverable from disk).
        user_folder = img_path.parent.name
        try:
            embedding = represent_image(img_path)
            data["entries"].append(
                {
                    "user_folder": user_folder,
                    "user_display": user_folder,
                    "image_path": str(img_path),
                    "embedding": embedding,
                    "created_at": datetime.now().isoformat(timespec="seconds"),
                }
            )
            success += 1
        except Exception:
            failed += 1
    save_embeddings(data)
    message = f"✅ รีเฟรชฐานข้อมูลเรียบร้อย: {success} ภาพ | ข้าม {failed} ภาพ"
    return message, render_db_overview()
def register_face(image: np.ndarray, username: str, use_esrgan: bool):
    """Gradio handler: register one face image for *username*.

    Detects a face (with a whole-image fallback for small tight crops),
    aligns it, optionally super-resolves it, saves the crop under
    DB_DIR/<user>/, and appends its embedding to the JSON database.
    Returns (status markdown, user gallery, DB-overview HTML).
    """
    if image is None:
        return "⚠️ กรุณาอัปโหลดรูปสำหรับลงทะเบียน", [], render_db_overview()
    display_name = username.strip()
    if not display_name:
        return "⚠️ กรุณากรอกชื่อ user ก่อนบันทึก", [], render_db_overview()
    user_folder = sanitize_username(display_name)
    rgb = to_uint8_rgb(image)
    try:
        face = detect_face_with_fallback(rgb)
        used_full_image_fallback = False
        if face is not None:
            aligned_face = align_face_with_retinaface(rgb, face["facial_area"], expand_pct=14)
        elif looks_like_face_crop(rgb):
            # Nothing detected, but the upload already looks like a tight
            # face crop — accept the whole image as the face.
            aligned_face = to_uint8_rgb(rgb)
            used_full_image_fallback = True
        else:
            return "⚠️ ไม่พบใบหน้าในภาพลงทะเบียน", [], render_db_overview()
        embedding_input = maybe_enhance_for_embedding(aligned_face, use_esrgan=use_esrgan)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Timestamp + short uuid keeps filenames unique and sortable.
        save_path = DB_DIR / user_folder / f"{timestamp}_{uuid.uuid4().hex[:6]}.jpg"
        save_rgb_image(save_path, embedding_input)
        embedding = represent_image(embedding_input)
        data = load_embeddings()
        data.setdefault("entries", [])
        data["entries"].append(
            {
                "user_folder": user_folder,
                "user_display": display_name,
                "image_path": str(save_path),
                "embedding": embedding,
                "created_at": datetime.now().isoformat(timespec="seconds"),
            }
        )
        save_embeddings(data)
        gallery = load_user_gallery(user_folder)
        message = (
            f"✅ บันทึกใบหน้าสำเร็จสำหรับ user: **{display_name}** \n"
            f"- เก็บไฟล์ที่: `{save_path}` \n"
            f"- ใช้โมเดล recognition: `{RECOGNITION_MODEL}` \n"
            f"- ESRGAN ก่อน embedding: `{'on' if use_esrgan else 'off'}`"
        )
        if used_full_image_fallback:
            message += "\n- หมายเหตุ: ใช้ทั้งภาพเป็น face crop สำหรับลงทะเบียน เพราะ detector ไม่พบใบหน้าจากภาพขนาดเล็ก"
        return message, gallery, render_db_overview()
    except Exception as exc:
        # Surface any pipeline failure to the UI rather than crashing Gradio.
        return f"❌ ลงทะเบียนไม่สำเร็จ: {exc}", [], render_db_overview()
def draw_face_annotations(rgb: np.ndarray, faces: List[Dict[str, Any]]) -> np.ndarray:
    """Draw bounding boxes plus identity labels onto a copy of *rgb*.

    Matched identities get a blue box; "Unknown" a light-blue one. The label
    shows the cosine distance when finite, otherwise the detector confidence.
    """
    canvas = to_uint8_rgb(rgb).copy()
    for item in faces:
        x1, y1, x2, y2 = item["bbox"]
        identity = item["identity"]
        distance = item["distance"]
        score = item.get("score", 0.0)
        color = (37, 99, 235) if identity != "Unknown" else (14, 165, 233)
        cv2.rectangle(canvas, (x1, y1), (x2, y2), color, 2)
        if math.isfinite(distance):
            label = f"{identity} | d={distance:.3f}"
        else:
            # No usable distance (e.g. empty reference DB): show detector score.
            label = f"{identity} | conf={score:.2f}"
        (tw, th), baseline = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.52, 2)
        # Filled label background just above the box, clamped to the frame.
        label_y1 = max(0, y1 - th - baseline - 10)
        label_y2 = label_y1 + th + baseline + 8
        label_x2 = min(canvas.shape[1], x1 + tw + 12)
        cv2.rectangle(canvas, (x1, label_y1), (label_x2, label_y2), color, -1)
        cv2.putText(
            canvas,
            label,
            (x1 + 6, label_y2 - 6),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.52,
            (255, 255, 255),
            2,
            cv2.LINE_AA,
        )
    return canvas
def face_to_data_uri(rgb: np.ndarray) -> str:
    """Encode an RGB array as a base64 JPEG data URI for inline HTML <img>."""
    buffer = BytesIO()
    Image.fromarray(to_uint8_rgb(rgb)).save(buffer, format="JPEG", quality=88)
    payload = base64.b64encode(buffer.getvalue()).decode("ascii")
    return f"data:image/jpeg;base64,{payload}"
def render_log_feed(log_items: List[Dict[str, Any]]) -> str:
    """Render the face-event log as an HTML card feed (items come newest-first).

    Each item carries identity/match metadata plus a data-URI thumbnail of
    the processed face (from face_to_data_uri). Text fields are HTML-escaped
    before interpolation. Returns a placeholder panel when the log is empty.
    """
    if not log_items:
        return """
<div class='log-empty'>
ยังไม่มี face logs ตอนนี้ ระบบจะแสดงชื่อคนที่พบ, ค่า distance, threshold และเหตุผลที่เป็น Unknown ตรงนี้
</div>
"""
    cards: List[str] = []
    for item in log_items:
        processed_thumb = item.get("processed_thumbnail", "")
        identity = html.escape(str(item.get("identity", "Unknown")))
        closest = html.escape(str(item.get("best_match_name", "Unknown")))
        status = item.get("status", "Unknown")
        badge_class = "matched" if status == "Matched" else "unknown"
        event_time = html.escape(str(item.get("event_time", "-")))
        video_time = html.escape(str(item.get("video_time", "-")))
        reason = html.escape(str(item.get("reason", "")))
        distance = item.get("distance")
        threshold = item.get("threshold")
        bbox = html.escape(str(item.get("bbox", "-")))
        processed_label = html.escape(str(item.get("processed_label", "Processed preview")))
        processed_width = int(item.get("processed_width", 0) or 0)
        processed_height = int(item.get("processed_height", 0) or 0)
        # Non-finite / missing numbers render as "-" instead of breaking format.
        distance_text = f"{distance:.4f}" if isinstance(distance, (int, float)) and math.isfinite(distance) else "-"
        threshold_text = f"{threshold:.2f}" if isinstance(threshold, (int, float)) else "-"
        size_text = (
            f"{processed_width}x{processed_height}"
            if processed_width > 0 and processed_height > 0
            else "-"
        )
        # Explicit width/height attributes keep the thumbnail at native size.
        img_size_attrs = (
            f" width='{processed_width}' height='{processed_height}'"
            if processed_width > 0 and processed_height > 0
            else ""
        )
        cards.append(
            f"""
<div class='log-card'>
<div class='log-thumb-wrap'>
<div class='log-thumb-label'>{processed_label}</div>
<img class='log-thumb' src='{processed_thumb}' alt='processed face thumbnail'{img_size_attrs} />
</div>
<div class='log-main'>
<div class='log-topline'>
<div class='log-name'>{identity}</div>
<div class='log-badge {badge_class}'>{status}</div>
</div>
<div class='log-meta'>
<span>video_time: <b>{video_time}</b></span>
<span>distance: <b>{distance_text}</b></span>
<span>threshold: <b>{threshold_text}</b></span>
<span>closest match: <b>{closest}</b></span>
<span>preview: <b>{processed_label}</b></span>
<span>size: <b>{size_text}</b></span>
</div>
<div class='log-reason'>{reason}</div>
<div class='log-meta'>
<span>bbox: <b>{bbox}</b></span>
</div>
</div>
<div class='log-time'>{event_time}</div>
</div>
"""
        )
    return f"<div class='log-feed'>{''.join(cards)}</div>"
def find_best_match(embedding: List[float], entries: List[Dict[str, Any]], threshold: float) -> Dict[str, Any]:
    """Compare one embedding against every DB entry by cosine distance.

    Returns a result dict with the matched identity (or "Unknown" when over
    threshold), the closest entry's name, the best distance, the threshold
    used, and a human-readable (Thai) reason string.
    """
    closest_name = "Unknown"
    closest_distance = float("inf")
    for entry in entries:
        ref_embedding = entry.get("embedding")
        if not ref_embedding:
            continue
        candidate = cosine_distance(embedding, ref_embedding)
        if candidate < closest_distance:
            closest_distance = candidate
            closest_name = entry.get("user_display") or entry.get("user_folder") or "Unknown"
    matched = closest_distance <= threshold
    if not math.isfinite(closest_distance):
        reason = "ไม่สามารถคำนวณ distance ที่ใช้งานได้จาก embedding ชุดนี้"
    elif matched:
        reason = (
            f"Matched {closest_name} เพราะ cosine distance {closest_distance:.4f} <= threshold {threshold:.2f}"
        )
    else:
        reason = (
            f"Unknown เพราะ cosine distance {closest_distance:.4f} > threshold {threshold:.2f}; "
            f"closest match คือ {closest_name}"
        )
    return {
        "identity": closest_name if matched else "Unknown",
        "best_match_name": closest_name,
        "distance": closest_distance,
        "matched": matched,
        "threshold": threshold,
        "reason": reason,
    }
def find_best_matches_batch(
    embeddings: List[List[float]],
    entries: List[Dict[str, Any]],
    threshold: float,
    ref_matrix: np.ndarray,
    ref_norms: np.ndarray,
) -> List[Dict[str, Any]]:
    """Vectorized cosine-distance matching of many embeddings at once.

    *ref_matrix*/*ref_norms* come from prepare_reference_embeddings and must
    be row-aligned with *entries*. Returns one result dict per query
    embedding, in the same shape as find_best_match's output.
    """
    if not embeddings:
        return []
    if ref_matrix.size == 0 or not entries:
        # No usable reference DB: every query is Unknown at infinite distance.
        return [
            {
                "identity": "Unknown",
                "best_match_name": "Unknown",
                "distance": float("inf"),
                "matched": False,
                "threshold": threshold,
                "reason": "ไม่มีฐานข้อมูล embedding ที่พร้อมใช้งานสำหรับการเปรียบเทียบ",
            }
            for _ in embeddings
        ]
    query = np.asarray(embeddings, dtype=np.float32)
    if query.ndim == 1:
        query = query[None, :]
    query_norms = np.linalg.norm(query, axis=1)
    # Guard zero norms with a tiny epsilon to keep the division finite.
    norm_products = query_norms[:, None] * ref_norms[None, :]
    norm_products = np.where(norm_products == 0, 1e-12, norm_products)
    cos_sim = np.clip(np.matmul(query, ref_matrix.T) / norm_products, -1.0, 1.0)
    distance_rows = 1.0 - cos_sim
    results: List[Dict[str, Any]] = []
    for row in distance_rows:
        best_idx = int(np.argmin(row))
        best_distance = float(row[best_idx])
        best_entry = entries[best_idx]
        best_name = best_entry.get("user_display") or best_entry.get("user_folder") or "Unknown"
        matched = best_distance <= threshold
        if matched:
            reason = f"Matched {best_name} เพราะ cosine distance {best_distance:.4f} <= threshold {threshold:.2f}"
        else:
            reason = (
                f"Unknown เพราะ cosine distance {best_distance:.4f} > threshold {threshold:.2f}; "
                f"closest match คือ {best_name}"
            )
        results.append(
            {
                "identity": best_name if matched else "Unknown",
                "best_match_name": best_name,
                "distance": best_distance,
                "matched": matched,
                "threshold": threshold,
                "reason": reason,
            }
        )
    return results
def reset_runtime_panel():
    """Return the idle values for the runtime status, preview, and log widgets."""
    idle_status = "🟦 พร้อมประมวลผลวิดีโอ — อัปโหลดวิดีโอที่แท็บซ้ายแล้วกดเริ่ม"
    return idle_status, None, render_log_feed([])
def resolve_video_path(video_value):
    """Extract a filesystem path from the various shapes gr.Video can return.

    Handles plain str paths, dicts (``video``/``path``/``name`` keys), and
    list/tuple wrappers. Returns None when no usable path is present —
    including empty sequences, which previously fell through to str() and
    produced a truthy nonsense path like "[]" that defeated the caller's
    "no video uploaded" check.
    """
    if video_value is None:
        return None
    if isinstance(video_value, str):
        return video_value
    if isinstance(video_value, dict):
        return video_value.get("video") or video_value.get("path") or video_value.get("name")
    if isinstance(video_value, (list, tuple)):
        return video_value[0] if video_value else None
    return str(video_value)
def process_video_stream(
    video_value,
    process_fps: float,
    detector_score_threshold: float,
    recognition_threshold: float,
    expand_pct: int,
    log_cooldown_sec: float,
    use_align: bool,
    use_esrgan: bool,
):
    """Generator Gradio handler: run detection + recognition over a video.

    Yields (status markdown, annotated preview frame, log-feed HTML) as
    processing progresses. Detection runs on every ``detect_every``-th
    source frame so the effective rate approximates *process_fps*;
    recognition is micro-batched once per detection step. Log entries are
    deduped within *log_cooldown_sec* of video time per matched identity
    (or per coarse spatial bucket for Unknowns).
    """
    video_path = resolve_video_path(video_value)
    if not video_path:
        yield "⚠️ กรุณาอัปโหลดวิดีโอก่อนเริ่มประมวลผล", None, render_log_feed([])
        return
    data = load_embeddings()
    entries = data.get("entries", [])
    if not entries:
        # Empty DB: try rebuilding from images on disk before giving up.
        rebuild_message, _ = rebuild_embeddings()
        data = load_embeddings()
        entries = data.get("entries", [])
        if not entries:
            yield (
                "⚠️ ยังไม่มีฐานข้อมูลใบหน้าที่พร้อมใช้งาน กรุณาลงทะเบียนรูปบุคคลก่อน\n\n"
                + rebuild_message,
                None,
                render_log_feed([]),
            )
            return
    valid_entries, ref_matrix, ref_norms = prepare_reference_embeddings(entries)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        yield "❌ เปิดวิดีโอไม่ได้", None, render_log_feed([])
        return
    src_fps = cap.get(cv2.CAP_PROP_FPS) or 25.0  # fallback when FPS metadata is missing
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
    # Detect on every Nth source frame so the effective rate ~= process_fps.
    detect_every = max(1, int(round(src_fps / max(process_fps, 0.1))))
    last_faces: List[Dict[str, Any]] = []  # latest detections, reused for preview on skipped frames
    log_items: List[Dict[str, Any]] = []  # newest-first event log
    last_logged_second: Dict[str, float] = {}  # dedupe key -> last logged video second
    frame_index = 0
    preview_frame = None
    try:
        while True:
            ok, bgr_frame = cap.read()
            if not ok:
                break
            rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
            if frame_index % detect_every == 0:
                current_faces: List[Dict[str, Any]] = []
                try:
                    detections = RetinaFace.detect_faces(rgb_frame, threshold=detector_score_threshold)
                except Exception:
                    detections = {}
                if isinstance(detections, dict):
                    # Largest faces first so prominent people dominate the log.
                    faces_sorted = sorted(
                        detections.values(),
                        key=lambda face: max(0, int(face["facial_area"][2]) - int(face["facial_area"][0]))
                        * max(0, int(face["facial_area"][3]) - int(face["facial_area"][1])),
                        reverse=True,
                    )
                    pending_faces: List[Dict[str, Any]] = []
                    for face in faces_sorted:
                        raw_bbox = [int(v) for v in face["facial_area"]]
                        bbox = expand_bbox(raw_bbox, rgb_frame.shape, expand_pct=float(expand_pct))
                        raw_face_rgb = crop_rgb(rgb_frame, bbox)
                        aligned_face_rgb = align_face_with_retinaface(
                            rgb_frame, raw_bbox, expand_pct=float(expand_pct)
                        ) if use_align else None
                        preview_face_rgb = aligned_face_rgb if aligned_face_rgb is not None else crop_rgb(rgb_frame, bbox)
                        if preview_face_rgb.size == 0 or raw_face_rgb.size == 0:
                            continue
                        video_second = frame_index / src_fps if src_fps > 0 else 0.0
                        pending_faces.append(
                            {
                                "bbox": bbox,
                                "raw_face_rgb": raw_face_rgb,
                                "preview_face_rgb": preview_face_rgb,
                                "score": float(face.get("score", 0.0)),
                                "video_second": video_second,
                            }
                        )
                    if pending_faces:
                        # One batched embedding + matching pass per detection step.
                        embedding_inputs = [
                            maybe_enhance_for_embedding(item["preview_face_rgb"], use_esrgan=use_esrgan)
                            for item in pending_faces
                        ]
                        batch_embeddings = represent_images_batch(embedding_inputs)
                        matches = find_best_matches_batch(
                            embeddings=batch_embeddings,
                            entries=valid_entries,
                            threshold=recognition_threshold,
                            ref_matrix=ref_matrix,
                            ref_norms=ref_norms,
                        )
                        for item, match, processed_face_rgb in zip(pending_faces, matches, embedding_inputs):
                            bbox = item["bbox"]
                            current_faces.append(
                                {
                                    "bbox": bbox,
                                    "identity": match["identity"],
                                    "distance": match["distance"],
                                    "score": item["score"],
                                }
                            )
                            # Matched people dedupe by name; Unknowns by a coarse
                            # 120-px spatial bucket so distinct strangers still log.
                            bucket_x = bbox[0] // 120
                            bucket_y = bbox[1] // 120
                            dedupe_key = (
                                match["best_match_name"]
                                if match["matched"]
                                else f"Unknown_{bucket_x}_{bucket_y}"
                            )
                            if item["video_second"] - last_logged_second.get(dedupe_key, -999.0) >= log_cooldown_sec:
                                last_logged_second[dedupe_key] = item["video_second"]
                                event_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                                log_items.insert(
                                    0,
                                    {
                                        "event_time": event_time,
                                        "video_time": seconds_to_hhmmss(item["video_second"]),
                                        "identity": match["identity"],
                                        "best_match_name": match["best_match_name"],
                                        "distance": match["distance"],
                                        "bbox": bbox,
                                        "status": "Matched" if match["matched"] else "Unknown",
                                        "threshold": match["threshold"],
                                        "reason": match["reason"],
                                        "processed_thumbnail": face_to_data_uri(processed_face_rgb),
                                        "processed_label": (
                                            ("Aligned face" if use_align else "Raw crop")
                                            + (" + ESRGAN" if use_esrgan else "")
                                        ),
                                        "processed_width": int(processed_face_rgb.shape[1]),
                                        "processed_height": int(processed_face_rgb.shape[0]),
                                    },
                                )
                    # Cap retained events to bound memory and DOM size.
                    log_items = log_items[:MAX_LOG_ITEMS]
                last_faces = current_faces
            # Redraw the preview every frame using the latest known faces.
            preview_frame = draw_face_annotations(rgb_frame, last_faces)
            progress = (frame_index + 1) / total_frames * 100.0 if total_frames > 0 else 0.0
            if frame_index % max(1, detect_every) == 0:
                status = (
                    f"🔄 กำลังประมวลผลวิดีโอ... {progress:.1f}% \n"
                    f"- FPS ต้นฉบับ: `{src_fps:.2f}` \n"
                    f"- ประมวลผลทุก ๆ `{detect_every}` เฟรม \n"
                    f"- Recognition mode: `micro-batch per detection step` \n"
                    f"- Align ระหว่างวิดีโอ: `{'on' if use_align else 'off'}` \n"
                    f"- ESRGAN ก่อน embedding: `{'on' if use_esrgan else 'off'}` \n"
                    f"- Registered identities: `{len({e.get('user_folder', 'u') for e in entries})}` \n"
                    f"- Threshold rule: `distance <= {recognition_threshold:.2f}` ถึงจะนับว่าเป็นคนเดิม \n"
                    f"- Log events: `{len(log_items)}`"
                )
                yield status, preview_frame, render_log_feed(log_items)
            frame_index += 1
    except Exception as exc:
        # Surface the failure to the UI along with whatever was processed so far.
        yield f"❌ เกิดข้อผิดพลาดระหว่างประมวลผล: {exc}", preview_frame, render_log_feed(log_items)
        return
    finally:
        cap.release()
    final_status = (
        f"✅ ประมวลผลเสร็จแล้ว \n"
        f"- จำนวน log event: `{len(log_items)}` \n"
        f"- กติกา match: `distance <= {recognition_threshold:.2f}` \n"
        f"- ESRGAN ก่อน embedding: `{'on' if use_esrgan else 'off'}` \n"
        f"- โมเดล recognition: `{RECOGNITION_MODEL}`"
    )
    yield final_status, preview_frame, render_log_feed(log_items)
# ---------------------------------------------------------------------------
# Gradio UI: left column = registration + video/config tabs, right column =
# streaming face-log feed. Handlers are wired at the bottom of the block.
# ---------------------------------------------------------------------------
with gr.Blocks(
    theme=gr.themes.Soft(primary_hue="sky", secondary_hue="blue", neutral_hue="slate"),
    css=CUSTOM_CSS,
    title="CCTV Face Recognition with RetinaFace + DeepFace",
) as demo:
    # Hero banner / app description.
    gr.HTML(
        """
<div class='app-shell'>
<div class='hero-card'>
<div style='font-size:30px;font-weight:800;color:#0f172a'>CCTV Face Recognition Dashboard</div>
<div style='margin-top:8px;color:#334155;font-size:15px;line-height:1.65'>
แอป Gradio สำหรับ <b>Face Detection + Alignment + Recognition</b> ด้วย
<b>RetinaFace</b> และ <b>DeepFace (ArcFace)</b><br/>
ฝั่งซ้ายใช้สำหรับลงทะเบียนใบหน้า, อัปโหลดวิดีโอ, live preview และการตั้งค่าความเร็ว/ความแม่นยำ
ส่วนฝั่งขวาแสดง face logs แบบสรุปเหตุการณ์พร้อมเหตุผลของผลรู้จำ
โดยระบบจะล้างฐานข้อมูลใบหน้าทุกครั้งที่เริ่มรันสคริปต์ใหม่
</div>
</div>
</div>
"""
    )
    with gr.Row(equal_height=False):
        with gr.Column(scale=4, elem_classes=["panel"]):
            gr.Markdown("### ฝั่งซ้าย • อัปโหลดและตั้งค่า")
            with gr.Tabs():
                # --- Tab 1: face registration ---
                with gr.Tab("1) Upload รูปลงทะเบียน"):
                    reg_image = gr.Image(
                        type="numpy",
                        label="อัปโหลดรูปบุคคล",
                        sources=["upload", "webcam"],
                        height=330,
                    )
                    reg_user = gr.Textbox(
                        label="User",
                        placeholder="เช่น admin01",
                    )
                    reg_use_esrgan = gr.Checkbox(
                        label="Use ESRGAN before embedding",
                        value=False,
                        info="เหมาะกับรูปจากกล้องที่เบลอหรือรายละเอียดน้อย แต่จะช้ากว่าปกติและโหลดโมเดลครั้งแรกอาจใช้เวลา",
                    )
                    with gr.Row():
                        reg_save_btn = gr.Button("บันทึกลงระบบ", elem_classes=["blue-btn"])
                        reg_refresh_btn = gr.Button("รีเฟรชฐานข้อมูล", elem_classes=["neutral-btn"])
                    reg_status = gr.Markdown(value="พร้อมลงทะเบียน")
                    reg_gallery = gr.Gallery(
                        label="รูปใบหน้าที่เก็บของ user นี้",
                        columns=4,
                        height=260,
                        object_fit="cover",
                    )
                    db_summary = gr.HTML(render_db_overview())
                # --- Tab 2: video upload + processing configuration ---
                with gr.Tab("2) Upload video + Config"):
                    video_input = gr.Video(
                        sources=["upload"],
                        label="อัปโหลดวิดีโอจากกล้องวงจรปิด",
                        height=330,
                    )
                    live_preview = gr.Image(
                        label="Live Preview (วาด BBox ขณะประมวลผล)",
                        type="numpy",
                        interactive=False,
                        height=330,
                    )
                    process_fps = gr.Slider(
                        minimum=1,
                        maximum=12,
                        value=3,
                        step=1,
                        label="Process FPS",
                        info="จำนวนเฟรมต่อวินาทีที่ใช้ตรวจจับและจดจำใบหน้า",
                    )
                    detector_score_threshold = gr.Slider(
                        minimum=0.10,
                        maximum=0.99,
                        value=0.85,
                        step=0.01,
                        label="Detection Score Threshold",
                    )
                    recognition_threshold = gr.Slider(
                        minimum=0.15,
                        maximum=1.00,
                        value=0.85,
                        step=0.01,
                        label="Recognition Threshold (Cosine Distance)",
                        info="กติกาคือ distance ต้องน้อยกว่าหรือเท่ากับค่านี้จึงจะ match; ค่าเริ่มต้น 0.85 จะผ่อนกว่าค่าเดิม",
                    )
                    expand_pct = gr.Slider(
                        minimum=0,
                        maximum=30,
                        value=10,
                        step=1,
                        label="Expand Face Area (%)",
                    )
                    log_cooldown = gr.Slider(
                        minimum=0.0,
                        maximum=10.0,
                        value=0,
                        step=0.5,
                        label="Log Cooldown (seconds)",
                        info="ลด log ซ้ำของคนเดิมในช่วงเวลาใกล้กัน",
                    )
                    use_align = gr.Checkbox(
                        label="Use face alignment",
                        value=True,
                        info="ปิดไว้จะเร็วสุดสำหรับวิดีโอ; เปิดเมื่ออยากเพิ่มความนิ่งของใบหน้าก่อนสร้าง embedding",
                    )
                    use_esrgan = gr.Checkbox(
                        label="Use ESRGAN before embedding",
                        value=False,
                        info="ปรับรายละเอียด face crop จากกล้องก่อนทำ embedding; ช่วยกับภาพเล็กหรือแตก แต่จะช้าลงชัดเจน",
                    )
                    with gr.Row():
                        start_btn = gr.Button("เริ่มประมวลผล", elem_classes=["blue-btn"])
                        clear_btn = gr.Button("ล้างผลลัพธ์", elem_classes=["neutral-btn"])
                    gr.HTML(
                        """
<div class='soft-note'>
ใช้ <b>RetinaFace.detect_faces</b> สำหรับตรวจจับ,
เปิด <b>Use face alignment</b> เมื่อต้องการจัดแนวใบหน้าก่อน recognition,
เปิด <b>Use ESRGAN before embedding</b> เมื่อต้องการเพิ่มรายละเอียด face crop จากภาพกล้องก่อนสร้าง embedding,
และใช้ <b>DeepFace.represent</b> ด้วยโมเดล <b>ArcFace</b> สำหรับ recognition โดยคำนวณจากภาพใน memory เพื่อลดเวลา I/O
</div>
"""
                    )
        # --- Right column: streaming status + face-log feed ---
        with gr.Column(scale=6, elem_classes=["panel"]):
            gr.Markdown("### ฝั่งขวา • Face Logs")
            runtime_status = gr.Markdown(
                value="🟦 พร้อมประมวลผลวิดีโอ — อัปโหลดวิดีโอที่แท็บซ้ายแล้วกดเริ่ม",
                elem_classes=["status-box"],
            )
            face_log_feed = gr.HTML(value=render_log_feed([]))
    # --- Event wiring ---
    reg_save_btn.click(
        fn=register_face,
        inputs=[reg_image, reg_user, reg_use_esrgan],
        outputs=[reg_status, reg_gallery, db_summary],
    )
    reg_refresh_btn.click(
        fn=rebuild_embeddings,
        inputs=[],
        outputs=[reg_status, db_summary],
    )
    # Generator handler: streams status/preview/log updates while processing.
    start_btn.click(
        fn=process_video_stream,
        inputs=[
            video_input,
            process_fps,
            detector_score_threshold,
            recognition_threshold,
            expand_pct,
            log_cooldown,
            use_align,
            use_esrgan,
        ],
        outputs=[runtime_status, live_preview, face_log_feed],
    )
    clear_btn.click(
        fn=reset_runtime_panel,
        inputs=[],
        outputs=[runtime_status, live_preview, face_log_feed],
    )
# Serialize video-processing jobs: one at a time.
demo.queue(default_concurrency_limit=1)
if __name__ == "__main__":
    # Pre-build the ArcFace model so the first request doesn't pay the load cost.
    modeling.build_model(task="facial_recognition", model_name=RECOGNITION_MODEL)
    # Bind on all interfaces; port is overridable via the PORT env var.
    demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")),share=False)