| """Real-time face anti-spoofing demo (webcam or image).""" |
|
|
| import cv2 |
| import numpy as np |
| import sys |
| import time |
| import argparse |
| from pathlib import Path |
|
|
| from src.inference import ( |
| load_model, |
| infer, |
| process_with_logits, |
| crop, |
| get_cpu_info, |
| get_gpu_info, |
| get_execution_provider_name, |
| ) |
| from src.detection import load_detector, detect |
|
|
| MODELS_DIR = Path(__file__).parent / "models" |
| DETECTOR_MODEL = MODELS_DIR / "detector_quantized.onnx" |
| LIVENESS_MODEL = MODELS_DIR / "best_model_quantized.onnx" |
|
|
|
|
def spoof_default_result():
    """Return the fallback classification emitted when no face is detected.

    All numeric fields are zeroed and the verdict defaults to "spoof"
    (fail-closed: an undetected face is never treated as real).
    """
    result = {"is_real": False, "status": "spoof"}
    zeroed_fields = ("logit_diff", "real_logit", "spoof_logit", "confidence", "p_real")
    result.update({field: 0.0 for field in zeroed_fields})
    return result
|
|
def resize_for_detection(image_bgr, max_side=1280):
    """Downscale an image so its longest side is at most *max_side* pixels.

    Intended for the face-detection pass only; returns ``(image, scale)``
    where *scale* maps original coordinates to the returned image's
    coordinates. If no resize is needed the original array is returned
    unchanged with a scale of 1.0.
    """
    height, width = image_bgr.shape[:2]
    longest_side = max(height, width)
    if longest_side <= max_side:
        # Already small enough — hand back the same array, no copy.
        return image_bgr, 1.0

    scale = max_side / float(longest_side)
    target_size = (int(width * scale), int(height * scale))

    # INTER_AREA is the recommended interpolation for shrinking.
    shrunk = cv2.resize(image_bgr, target_size, interpolation=cv2.INTER_AREA)
    return shrunk, scale
|
|
|
|
def scale_bbox_to_original(bbox, inv_scale):
    """Map a bbox dict from resized-image coordinates back to the original image.

    Every field (x, y, width, height) is multiplied by *inv_scale*; the
    input dict is not mutated.
    """
    return {key: bbox[key] * inv_scale for key in ("x", "y", "width", "height")}
|
|
|
|
def _wrap_text(text, max_chars):
    """Greedy word-wrap: split *text* into lines of at most *max_chars* characters.

    Words longer than *max_chars* are emitted on their own line rather than
    split mid-word (same behavior as the original inline loops).
    """
    lines = []
    current_line = ""
    for word in text.split():
        if len(current_line + " " + word) <= max_chars:
            current_line += " " + word if current_line else word
        else:
            if current_line:
                lines.append(current_line)
            current_line = word
    if current_line:
        lines.append(current_line)
    return lines


def draw_info_overlay(display_frame, fps_history, cpu_info, gpu_info, provider_name):
    """Draw the diagnostics overlay (FPS, CPU/GPU, provider) onto *display_frame*.

    Mutates *display_frame* in place.

    Args:
        display_frame: BGR image to annotate.
        fps_history: recent per-frame FPS samples; averaged for display.
        cpu_info: CPU description string (wrapped, capped at two lines).
        gpu_info: GPU description string, or falsy if no GPU was detected.
        provider_name: ONNX Runtime execution provider name.
    """
    avg_fps = sum(fps_history) / len(fps_history) if fps_history else 0

    info_y = 25
    line_height = 20
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.5
    thickness = 1
    color_white = (255, 255, 255)
    color_cyan = (255, 255, 0)  # BGR, so (255, 255, 0) renders cyan

    cv2.putText(
        display_frame,
        f"FPS: {avg_fps:.1f}",
        (5, info_y),
        font,
        font_scale,
        color_cyan,
        thickness,
    )
    info_y += line_height

    max_chars_per_line = 55

    # CPU description: wrapped, at most two lines, "CPU:" prefix on the first.
    for i, cpu_line in enumerate(_wrap_text(cpu_info, max_chars_per_line)[:2]):
        cv2.putText(
            display_frame,
            f"CPU: {cpu_line}" if i == 0 else cpu_line,
            (5, info_y),
            font,
            font_scale,
            color_white,
            thickness,
        )
        info_y += line_height

    if gpu_info:
        # GPU description: same wrapping/cap as CPU.
        for i, gpu_line in enumerate(_wrap_text(gpu_info, max_chars_per_line)[:2]):
            cv2.putText(
                display_frame,
                f"GPU: {gpu_line}" if i == 0 else gpu_line,
                (5, info_y),
                font,
                font_scale,
                color_white,
                thickness,
            )
            info_y += line_height
    else:
        cv2.putText(
            display_frame,
            "GPU: No GPU detected",
            (5, info_y),
            font,
            font_scale,
            color_white,
            thickness,
        )
        info_y += line_height

    cv2.putText(
        display_frame,
        f"Provider: {provider_name}",
        (5, info_y),
        font,
        font_scale,
        color_white,
        thickness,
    )
    info_y += line_height
    cv2.putText(
        display_frame,
        "Press 'i' to toggle",
        (5, info_y),
        font,
        0.4,
        (200, 200, 200),
        1,
    )
|
|
|
|
def process_camera(args, face_detector, liveness_session, input_name, logit_threshold):
    """Run the live webcam loop: detect faces, classify liveness, draw results.

    Opens camera ``args.camera``, processes every frame (face detection ->
    crop -> liveness inference -> overlay), and displays the annotated feed
    until 'q' is pressed. Exits the process with status 1 if the camera
    cannot be opened.
    """
    cap = cv2.VideoCapture(args.camera)
    if not cap.isOpened():
        print(f"Error: Could not open camera {args.camera}")
        exit(1)

    # Capture hints only — the driver may pick different actual values.
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    cap.set(cv2.CAP_PROP_FPS, 30)

    window_name = "Liveness Detection"
    cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
    cv2.resizeWindow(window_name, 640, 480)

    show_info = True
    fps_history = []  # rolling window of per-frame FPS samples (capped at 30 below)

    # Hardware/provider descriptions are static — query once, outside the loop.
    cpu_info = get_cpu_info()
    gpu_info = get_gpu_info()
    provider_name = get_execution_provider_name(liveness_session)

    print("Controls:")
    print(" 'q' - Quit")
    print(" 'i' - Toggle info display")

    while True:
        frame_start = time.time()
        ret, frame = cap.read()
        if not ret:
            break

        # The detector expects RGB; OpenCV capture delivers BGR.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        faces = detect(frame_rgb, face_detector, margin=args.margin)

        if faces:
            face_crops = []
            valid_faces = []  # (x, y, w, h, det_conf) for crops that succeeded
            for face in faces:
                bbox = face["bbox"]
                x, y, w, h = bbox["x"], bbox["y"], bbox["width"], bbox["height"]
                det_conf = face.get("confidence", None)
                if det_conf is not None:
                    det_conf = round(float(det_conf), 2)

                # Best-effort: skip faces whose crop fails (e.g. partially
                # off-frame boxes) instead of aborting the whole frame.
                try:
                    face_crop = crop(frame_rgb, (x, y, x + w, y + h), args.bbox_expansion_factor)
                    face_crops.append(face_crop)
                    valid_faces.append((int(x), int(y), int(w), int(h), det_conf))
                except Exception as e:
                    if args.verbose:
                        print(f"Warning: Failed to crop face at ({x},{y},{w},{h}): {e}", file=sys.stderr)
                    continue

            if face_crops:
                # One batched inference call for all face crops in this frame.
                predictions = infer(face_crops, liveness_session, input_name, args.model_img_size)

                for (x, y, w, h, det_conf), pred in zip(valid_faces, predictions):
                    try:
                        result = process_with_logits(pred, logit_threshold)
                        print(f"RESULT : {result}")
                    except Exception:
                        continue

                    # Green box for real, red for spoof (BGR color order).
                    color = (0, 255, 0) if result["is_real"] else (0, 0, 255)
                    cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)

                    if det_conf is not None:
                        label = f"{result['status'].upper()} p={result['realness_score']:.2f} det={det_conf:.2f}"
                    else:
                        label = f"{result['status'].upper()} p={result['realness_score']:.2f}"

                    # Label above the box; clamp to y=0 for faces at the frame top.
                    cv2.putText(frame, label, (x, max(0, y - 10)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        # FPS over the whole pipeline (capture + detect + infer + draw).
        frame_time = time.time() - frame_start
        current_fps = 1.0 / frame_time if frame_time > 0 else 0
        fps_history.append(current_fps)
        if len(fps_history) > 30:
            fps_history.pop(0)

        display_frame = cv2.resize(frame, (640, 480), interpolation=cv2.INTER_AREA)
        if show_info:
            draw_info_overlay(display_frame, fps_history, cpu_info, gpu_info, provider_name)

        cv2.imshow(window_name, display_frame)
        key = cv2.waitKey(1) & 0xFF
        if key == ord("q"):
            break
        elif key == ord("i"):
            show_info = not show_info

    cap.release()
    cv2.destroyAllWindows()
|
|
|
|
def _show_and_wait(image, window_name="Result"):
    """Display *image* until any key is pressed, then close all windows."""
    cv2.imshow(window_name, image)
    cv2.waitKey(0)
    cv2.destroyAllWindows()


def process_image(args, face_detector, liveness_session, input_name, logit_threshold):
    """Run face detection + liveness classification on a single image file.

    Loads ``args.image``, detects faces on a downscaled copy, classifies each
    face crop taken from the full-resolution image, draws annotated boxes, and
    displays the result until a key is pressed. Exits the process with status
    1 if the image cannot be loaded.
    """
    image = cv2.imread(args.image)
    if image is None:
        print(f"Error: Could not load image from '{args.image}'", file=sys.stderr)
        print("Please check that the file exists and is a valid image format.", file=sys.stderr)
        # sys.exit instead of the site-injected exit() builtin: reliable in scripts.
        sys.exit(1)

    # Detection runs on a downscaled copy for speed; crops are taken from
    # the full-resolution image using rescaled boxes.
    det_bgr, scale = resize_for_detection(image, max_side=1280)
    det_rgb = cv2.cvtColor(det_bgr, cv2.COLOR_BGR2RGB)
    faces = detect(det_rgb, face_detector, margin=args.margin)

    if not faces:
        print("No faces detected")
        result = spoof_default_result()
        print(result)
        _show_and_wait(image)
        return

    inv_scale = 1.0 / scale if scale != 0 else 1.0
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    face_crops = []
    valid_faces = []  # (x, y, w, h, det_conf) for crops that succeeded
    for face in faces:
        # Map the detection box back to full-resolution coordinates.
        bbox_orig = scale_bbox_to_original(face["bbox"], inv_scale)
        x, y, w, h = bbox_orig["x"], bbox_orig["y"], bbox_orig["width"], bbox_orig["height"]
        det_conf = face.get("confidence", None)
        if det_conf is not None:
            det_conf = round(float(det_conf), 2)

        # Best-effort: skip faces whose crop fails instead of aborting.
        try:
            face_crop = crop(image_rgb, (x, y, x + w, y + h), args.bbox_expansion_factor)
            face_crops.append(face_crop)
            valid_faces.append((int(x), int(y), int(w), int(h), det_conf))
        except Exception as e:
            if args.verbose:
                print(f"Warning: Failed to crop face at ({x},{y},{w},{h}): {e}", file=sys.stderr)
            continue

    if not face_crops:
        print("Faces were detected, but all crops failed")
        _show_and_wait(image)
        return

    # One batched inference call for all face crops.
    predictions = infer(face_crops, liveness_session, input_name, args.model_img_size)
    if not predictions:
        print("Inference returned no predictions")
        _show_and_wait(image)
        return

    for (x, y, w, h, det_conf), pred in zip(valid_faces, predictions):
        try:
            result = process_with_logits(pred, logit_threshold)
            print(f"RESULT : {result}")
        except Exception:
            continue

        # Green box for real, red for spoof (BGR color order).
        color = (0, 255, 0) if result["is_real"] else (0, 0, 255)
        cv2.rectangle(image, (x, y), (x + w, y + h), color, 2)

        if det_conf is not None:
            label = f"{result['status'].upper()} p={result['realness_score']:.2f} det={det_conf:.2f}"
        else:
            label = f"{result['status'].upper()} p={result['realness_score']:.2f}"

        # Label above the box; clamp to y=0 for faces at the image top.
        cv2.putText(image, label, (x, max(0, y - 10)),
            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    _show_and_wait(image)
|
|
|
|
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Real-time face anti-spoofing demo (webcam or image)."
    )
    parser.add_argument("--image", type=str, default=None, help="Path to image file (if not provided, uses camera)")
    parser.add_argument("--camera", type=int, default=0, help="Camera index to use (default: 0)")
    parser.add_argument("--model_img_size", type=int, default=128, help="Side length of the square model input (default: 128)")
    parser.add_argument("--bbox_expansion_factor", type=float, default=1.5, help="Factor by which to expand detected boxes before cropping (default: 1.5)")
    parser.add_argument("--threshold", type=float, default=0.5, help="Probability threshold for the real/spoof decision (default: 0.5)")
    parser.add_argument("--margin", type=int, default=5, help="Detection margin in pixels (default: 5)")
    parser.add_argument("--detector_model", type=str, default=str(DETECTOR_MODEL), help="Path to the face detector ONNX model")
    parser.add_argument("--liveness_model", type=str, default=str(LIVENESS_MODEL), help="Path to the liveness ONNX model")
    parser.add_argument("--verbose", action="store_true", help="Enable verbose error logging")
    args = parser.parse_args()

    # Convert the probability threshold to logit space; clamp to (0, 1)
    # exclusive so log() never sees 0 or a negative argument.
    p = max(1e-6, min(1 - 1e-6, args.threshold))
    logit_threshold = np.log(p / (1 - p))

    face_detector = load_detector(args.detector_model, (320, 320))
    liveness_session, input_name = load_model(args.liveness_model)

    # Loaders return None on failure and are expected to have reported why;
    # sys.exit instead of the site-injected exit() builtin: reliable in scripts.
    if liveness_session is None or face_detector is None:
        sys.exit(1)

    if args.image is None:
        process_camera(args, face_detector, liveness_session, input_name, logit_threshold)
    else:
        process_image(args, face_detector, liveness_session, input_name, logit_threshold)
|