# NOTE: recovered from a Hugging Face Spaces deployment log that reported a
# runtime error at startup; header lines converted to comments so the module parses.
# Standard library
import os
from contextlib import asynccontextmanager
from typing import List

# Third-party
import cv2
import numpy as np
import torch
import torchvision.transforms as transforms
from fastapi import FastAPI, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image

# Local
from Antispoofing.inference import DeepFakeModel
| # --- GLOBAL BRAIN LOGIC --- | |
# --- GLOBAL BRAIN LOGIC --- #
class LivenessSDK_Brain:
    """Liveness ("anti-spoofing") engine.

    Combines cheap per-frame image heuristics (histogram entropy, specular
    peakiness, bright-area ratio, colour diversity, texture, inter-frame
    motion) with a deep-fake CNN to decide whether uploaded frames show a
    live person or a replayed screen/photo.
    """

    def __init__(self):
        # All inference runs on GPU when one is visible, otherwise CPU.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = DeepFakeModel(backbone_ckpt=None).to(self.device)
        self.load_weights()
        # 224x224, scaled to [-1, 1] (mean/std 0.5 per channel) -- assumed to
        # match the model's training preprocessing; TODO confirm against training code.
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
        ])

    def load_weights(self):
        """Load `Antispoofing/antispoofing_full.pth` relative to the project root."""
        parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        model_path = os.path.join(parent_dir, "Antispoofing", "antispoofing_full.pth")
        # FIX: PyTorch >= 2.6 changed torch.load's default to weights_only=True,
        # which raises UnpicklingError on full checkpoints like this one.
        # The file is a trusted local artifact, so a full pickle load is acceptable.
        checkpoint = torch.load(model_path, map_location=self.device, weights_only=False)
        # Accept either a raw state-dict or a {'model_state_dict': ...} wrapper.
        state_dict = checkpoint['model_state_dict'] if 'model_state_dict' in checkpoint else checkpoint
        self.model.load_state_dict(state_dict)
        self.model.eval()

    def check_physical_liveness(self, frame_rgb):
        """Score screen-replay artefacts on a single RGB frame.

        Returns (True, "OK") for a plausibly real scene, otherwise
        (False, reason). All thresholds are empirically tuned magic numbers.
        """
        hsv = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2HSV)
        v_channel = hsv[:, :, 2]
        h_channel = hsv[:, :, 0]
        s_channel = hsv[:, :, 1]
        # 1. Structural entropy of the brightness histogram (low for flat screens).
        hist = cv2.calcHist([v_channel], [0], None, [256], [0, 256])
        hist = hist / (hist.sum() + 1e-7)
        entropy = -np.sum(hist * np.log2(hist + 1e-7))
        # 2. Specular peakiness: tallest normalised histogram bin.
        max_peak = np.max(hist)
        # 3. Intensity area: screens have 'broad' bright areas, faces 'tiny' glints.
        #    Count pixels that are near-maximum brightness.
        _, bright_mask = cv2.threshold(v_channel, 240, 255, cv2.THRESH_BINARY)
        bright_area_ratio = (cv2.countNonZero(bright_mask) / v_channel.size) * 100
        # 4. Colour diversity (digital palettes occupy narrow bands).
        hue_std = np.std(h_channel)
        sat_std = np.std(s_channel)
        # --- Scoring: each failed heuristic adds one spoof point. ---
        spoof_points = 0
        if entropy < 6.2:                        # flat brightness distribution
            spoof_points += 1
        if max_peak < 0.025 or max_peak > 0.15:  # too uniform or too spiked
            spoof_points += 1
        if hue_std < 3.2 or sat_std < 6.0:       # narrow digital colour bands
            spoof_points += 1
        if bright_area_ratio > 4.0:              # >4% near-max bright => glowing screen
            spoof_points += 1
        # Override: a strong peak with a small bright area is almost certainly
        # a real 3D specular reflection on skin, so clear all points.
        if 0.045 < max_peak < 0.15 and bright_area_ratio < 3.0:
            spoof_points = 0
        print(f"DEBUG -> Entropy: {entropy:.2f}, Peak: {max_peak:.4f}, BrightArea: {bright_area_ratio:.1f}% | Points: {spoof_points}")
        if spoof_points > 1:
            return False, f"SCREEN_SPOOF_DETECTED (Score: {spoof_points})"
        if spoof_points == 1:
            return False, f"SPOOF_BLUR_OR_LOW_TEXTURE_DETECTED (Score: {spoof_points})"
        return True, "OK"

    def check_glare_and_texture(self, frame_rgb):
        """Reject blurry / low-texture frames (printed photos, defocused replays).

        The Laplacian-variance sharpness threshold ramps with average
        brightness so darker captures are not penalised with the bright-light
        threshold (skin-tone fairness).
        """
        hsv = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2HSV)
        v_channel = hsv[:, :, 2]
        avg_brightness = np.mean(v_channel)
        laplacian_var = cv2.Laplacian(v_channel, cv2.CV_64F).var()
        if avg_brightness <= 70:
            min_tex = 18
        elif avg_brightness >= 120:
            min_tex = 30
        else:
            # FIX: the ramp previously interpolated 18 -> 40 over [70, 120],
            # overshooting the bright-branch value of 30 and creating a
            # discontinuity at brightness 120 (threshold jumped ~40 -> 30).
            # Interpolate 18 -> 30 so the threshold is continuous at both ends.
            min_tex = 18 + (30 - 18) * ((avg_brightness - 70) / (120 - 70))
        print(f"DEBUG -> Texture: {laplacian_var:.2f}, MinThresh: {min_tex:.1f} brightness: {avg_brightness:.1f}")
        if laplacian_var < min_tex:
            return False, f"SPOOF_BLUR_OR_LOW_TEXTURE_DETECTED ({laplacian_var:.1f})"
        return True, "OK"

    def check_motion_coherence(self, frames_rgb):
        """Require visible inter-frame motion; static feeds indicate a photo or paused screen."""
        # Guard: with fewer than 2 frames motion cannot be measured -- fail closed.
        if len(frames_rgb) < 2:
            return False, "INSUFFICIENT_FRAMES_FOR_MOTION_CHECK"
        diffs = []
        for i in range(len(frames_rgb) - 1):
            d = cv2.absdiff(frames_rgb[i], frames_rgb[i + 1])
            diffs.append(np.mean(d))
        avg_motion = np.mean(diffs)
        print(f"DEBUG -> Avg Motion: {avg_motion:.2f}")
        # 0.8 mean absolute difference is the empirical floor for a live feed.
        if np.isnan(avg_motion) or avg_motion < 0.8:
            return False, "STATIC_IMAGE_OR_SCREEN_DETECTION"
        return True, "OK"
| # --- FASTAPI SETUP --- | |
# Single shared model instance, created once at application startup.
brain = None

# FIX: FastAPI's `lifespan=` parameter expects an async context manager
# factory; the original bare async generator function raised at startup.
# `asynccontextmanager` was already imported but never applied.
@asynccontextmanager
async def lifespan(app: "FastAPI"):
    """Load the heavy model exactly once at startup; yields for the app's lifetime."""
    global brain
    brain = LivenessSDK_Brain()
    yield
# --- FASTAPI SETUP --- #
# Application instance; the model is loaded via the lifespan handler.
app = FastAPI(lifespan=lifespan)
# NOTE(review): CORS is wide open (any origin/method/header). Acceptable for a
# demo; restrict allow_origins before production deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# FIX: the handler had no route decorator, so it was never registered and the
# API exposed no endpoint. "/verify" is the assumed path --
# NOTE(review): confirm the route path against the frontend client.
@app.post("/verify")
async def verify(files: List[UploadFile] = File(...)):
    """Full liveness decision over an ordered burst of uploaded frames.

    Defense pipeline (first failure short-circuits with a reason code):
      1. physical environment heuristics on the last frame,
      2. adaptive texture/blur check on the last frame,
      3. inter-frame motion coherence over the whole burst,
      4. deep-fake CNN score averaged over all frames (live if > 0.75).
    """
    if brain is None:
        # Lifespan startup has not finished loading the model yet.
        return {"is_live": False, "reason": "SERVER_STARTING"}
    cv_frames = []
    for file in files:
        content = await file.read()
        nparr = np.frombuffer(content, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if img is not None:  # silently skip undecodable uploads
            cv_frames.append(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    if not cv_frames:
        return {"is_live": False, "reason": "NO_IMAGE_DATA"}
    last_frame = cv_frames[-1]
    # 1. Physical environment check (entropy / peak / colour).
    phys_ok, phys_msg = brain.check_physical_liveness(last_frame)
    if not phys_ok:
        return {"is_live": False, "reason": phys_msg}
    # 2. Adaptive texture check.
    tex_ok, tex_msg = brain.check_glare_and_texture(last_frame)
    if not tex_ok:
        return {"is_live": False, "reason": tex_msg}
    # 3. Motion consistency check over the whole burst.
    motion_ok, motion_msg = brain.check_motion_coherence(cv_frames)
    if not motion_ok:
        return {"is_live": False, "reason": motion_msg}
    # 4. Neural-network inference: frames stacked into a single batch of
    #    shape (1, T, C, H, W) -- assumed to match DeepFakeModel's input; TODO confirm.
    tensors = [brain.transform(Image.fromarray(f)) for f in cv_frames]
    batch = torch.stack(tensors).unsqueeze(0).to(brain.device)
    with torch.no_grad():
        logits = brain.model(batch)
        scores = torch.sigmoid(logits)
        final_score = torch.mean(scores).item()
    print(f"--- FINAL DECISION --- Score: {final_score:.4f}")
    is_live = final_score > 0.75
    return {
        "is_live": is_live,
        "skin_confidence": round(final_score, 4),
        "status": "success",
        "reason": "OK" if is_live else "AI_SPOOF_DETECTION",
    }
if __name__ == "__main__":
    # Dev entry point: serve on all interfaces, port 8000 (no reload/workers).
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)