Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, File, UploadFile, HTTPException | |
| from fastapi.responses import JSONResponse | |
| from fastapi import FastAPI, File, UploadFile, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import uvicorn | |
| import torch | |
| import torch.nn as nn | |
| from torchvision import models, transforms | |
| from facenet_pytorch import MTCNN | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| import shutil | |
| import os | |
| import uuid | |
| # --- CONFIG --- | |
| MODEL_PATH = "deepfake_detector.pth" # Path ที่เก็บโมเดล | |
| DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu' | |
| IMG_SIZE = 224 | |
| FRAME_INTERVAL = 10 # ตรวจทุกๆ 10 เฟรม (เพื่อความเร็ว) | |
| app = FastAPI(title="DeepGuard API", description="Deepfake Detection API") | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], # ใน Production ควรเปลี่ยนเป็น domain จริง แต่ตอนนี้ใส่ * (รับหมด) ไปก่อน | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # --- 1. Model Loader --- | |
| def build_model(): | |
| # ต้องสร้างโครงสร้างโมเดลให้เหมือนตอนเทรนเป๊ะๆ | |
| model = models.efficientnet_b0(weights=None) # ไม่ต้องโหลด weight เน็ต เพราะเราจะโหลดเอง | |
| num_ftrs = model.classifier[1].in_features | |
| # โครงสร้างต้องตรงกับที่เราแก้ล่าสุด (Dropout + Linear) | |
| model.classifier[1] = nn.Sequential( | |
| nn.Dropout(p=0.6), | |
| nn.Linear(num_ftrs, 2) | |
| ) | |
| return model | |
| print(f"Loading model from {MODEL_PATH}...") | |
| try: | |
| model = build_model() | |
| # map_location สำคัญมากถ้าเทรนบน GPU แต่รันบน CPU | |
| model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE)) | |
| model.to(DEVICE) | |
| model.eval() # สั่งให้เป็นโหมดทำนาย (ปิด Dropout) | |
| print("✅ Model loaded successfully!") | |
| except Exception as e: | |
| print(f"❌ Error loading model: {e}") | |
| # ถ้าโหลดไม่ผ่าน ให้รันต่อไม่ได้ | |
| raise e | |
| # --- 2. Helper Components --- | |
| # Face Detector | |
| mtcnn = MTCNN(keep_all=False, select_largest=True, device=DEVICE, margin=20) | |
| # Preprocessing (เหมือนตอน Validation) | |
| transform = transforms.Compose([ | |
| transforms.Resize((IMG_SIZE, IMG_SIZE)), | |
| transforms.ToTensor(), | |
| transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), | |
| ]) | |
| # --- 3. Processing Logic --- | |
| def process_video(video_path): | |
| cap = cv2.VideoCapture(video_path) | |
| frames_processed = 0 | |
| fake_probs = [] | |
| frame_count = 0 | |
| while cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| # ข้ามเฟรมเพื่อความเร็ว | |
| if frame_count % FRAME_INTERVAL != 0: | |
| frame_count += 1 | |
| continue | |
| # แปลงสี BGR -> RGB | |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| pil_img = Image.fromarray(frame_rgb) | |
| # Detect Face & Crop | |
| # MTCNN จะคืนค่ามาเป็น PIL Image ที่ Crop แล้ว (ถ้า detected) | |
| face = mtcnn(pil_img) | |
| if face is not None: | |
| # face ที่ได้มาเป็น Tensor อยู่แล้ว (เพราะ MTCNN ของ library นี้ทำให้) | |
| # แต่เราต้อง Normalize ให้เหมือนตอนเทรน (EfficientNet ต้องการ) | |
| # แปลงกลับเป็น PIL เพื่อเข้า Transform มาตรฐาน (หรือจะทำ manual ก็ได้) | |
| # เพื่อความชัวร์ ใช้ transform ที่เราประกาศไว้ดีกว่า | |
| # แต่ mtcnn คืนค่ามาเป็น tensor ที่ normalize มาแบบนึงแล้ว (0-1) | |
| # ซึ่งมักจะไม่ตรงกับ ImageNet mean/std | |
| # **แก้ไข Logic:** เพื่อความแม่นยำสูงสุด เราจะใช้ box จาก mtcnn แล้ว crop เอง | |
| # เพื่อส่งเข้า transform pipeline เดียวกับตอนเทรน | |
| boxes, _ = mtcnn.detect(pil_img) | |
| if boxes is not None: | |
| box = boxes[0] | |
| face_img = pil_img.crop(box) | |
| # Preprocess | |
| input_tensor = transform(face_img).unsqueeze(0).to(DEVICE) | |
| # Inference | |
| with torch.no_grad(): | |
| outputs = model(input_tensor) | |
| # output คือ [logit_fake, logit_real] หรือ [logit_real, logit_fake] ขึ้นอยู่กับ class index | |
| # ปกติ ImageFolder จะเรียงตามตัวอักษร: 0=fake, 1=real | |
| # เราจะใช้ Softmax เพื่อแปลงเป็น % | |
| probs = torch.softmax(outputs, dim=1) | |
| fake_prob = probs[0][0].item() # สมมติว่า class 0 คือ fake | |
| fake_probs.append(fake_prob) | |
| frames_processed += 1 | |
| frame_count += 1 | |
| cap.release() | |
| return fake_probs | |
| # --- 4. API Endpoints --- | |
| def home(): | |
| return {"message": "DeepGuard API is running!"} | |
| async def predict_video(file: UploadFile = File(...)): | |
| # 1. เช็คชนิดไฟล์ | |
| if not file.filename.endswith(('.mp4', '.avi', '.mov')): | |
| raise HTTPException(status_code=400, detail="Invalid file format. Please upload video.") | |
| # 2. Save ไฟล์วิดีโอลงเครื่องชั่วคราว (เพราะ OpenCV อ่านจาก RAM ไม่ได้ง่ายๆ) | |
| temp_filename = f"temp_{uuid.uuid4()}.mp4" | |
| try: | |
| with open(temp_filename, "wb") as buffer: | |
| shutil.copyfileobj(file.file, buffer) | |
| # 3. ส่งเข้า Process | |
| fake_probs = process_video(temp_filename) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Processing Error: {str(e)}") | |
| finally: | |
| # 4. ลบไฟล์ทิ้งเมื่อเสร็จ (Clean up) | |
| if os.path.exists(temp_filename): | |
| os.remove(temp_filename) | |
| # 5. สรุปผล | |
| if not fake_probs: | |
| return {"status": "failed", "message": "No faces detected in the video."} | |
| # หาค่าเฉลี่ยความน่าจะเป็น | |
| avg_fake_prob = np.mean(fake_probs) | |
| # Thresholding (ปรับได้) | |
| # ถ้า Fake Probability > 0.5 ให้ตอบว่า FAKE | |
| # แต่เนื่องจากเรามี Label Smoothing ค่ามันอาจจะไม่ใช่ 0 หรือ 1 เป๊ะๆ | |
| detection_result = "FAKE" if avg_fake_prob > 0.5 else "REAL" | |
| confidence = avg_fake_prob if detection_result == "FAKE" else (1 - avg_fake_prob) | |
| return { | |
| "filename": file.filename, | |
| "result": detection_result, | |
| "confidence": f"{confidence*100:.2f}%", | |
| "fake_probability": float(avg_fake_prob), | |
| "frames_analyzed": len(fake_probs) | |
| } | |
| if __name__ == "__main__": | |
| uvicorn.run(app, host="0.0.0.0", port=8000) |