from fastapi import FastAPI, File, UploadFile, HTTPException from fastapi.middleware.cors import CORSMiddleware import uvicorn import cv2 import numpy as np import os import tempfile import shutil import warnings import logging import os # Suppress Keras and TensorFlow warnings warnings.filterwarnings('ignore', category=UserWarning) os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' logging.getLogger('absl').setLevel(logging.ERROR) from tensorflow.keras.preprocessing import image from tensorflow.keras.models import load_model from ultralytics import YOLO app = FastAPI(title="Deepfake Detection API") # Update CORS for frontend connectivity app.add_middleware( CORSMiddleware, allow_origins=["*"], # In production, replace with your frontend URL allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/") async def health_check(): return {"status": "online", "message": "Backend is running!"} print("Loading model and YOLO11...") model = load_model('model.h5', compile=False) # YOLOv11 pose model for extremely tight and precise facial feature cropping (matches MTCNN style) detector = YOLO('yolo11n-pose.pt') print("Model loaded successfully.") def detect_and_crop_face(img): """Detects face/person and crops it to 128x128.""" # Run YOLO11 pose detection results = detector.predict(img, verbose=False) if len(results) > 0 and results[0].keypoints is not None and len(results[0].keypoints.xy[0]) > 0: # Get the first person's keypoints kpts = results[0].keypoints.xy[0].cpu().numpy() # 0: nose, 1: left eye, 2: right eye, 3: left ear, 4: right ear face_kpts = kpts[0:5] # Filter out keypoints that weren't detected valid_kpts = [k for k in face_kpts if k[0] > 0 and k[1] > 0] if valid_kpts: valid_kpts = np.array(valid_kpts) x_min, y_min = np.min(valid_kpts, axis=0) x_max, y_max = np.max(valid_kpts, axis=0) # Expand this tight keypoint box to capture the full face (forehead, chin, cheeks) w = x_max - x_min h = y_max - y_min # Safety for edge cases if w > 0 and h > 0: pad_x = w * 0.3 pad_y_top = h * 0.5 # Expand more upward for the forehead pad_y_bot = h * 0.8 # Expand downward for the chin/mouth final_x1 = max(0, int(x_min - pad_x)) final_y1 = max(0, int(y_min - pad_y_top)) final_x2 = min(img.shape[1], int(x_max + pad_x)) final_y2 = min(img.shape[0], int(y_max + pad_y_bot)) face = img[final_y1:final_y2, final_x1:final_x2] if face.size > 0: return cv2.resize(face, (128, 128)) # Fallback to normal YOLO box heuristic if face keypoints fail but person is found if len(results) > 0 and len(results[0].boxes) > 0: box = results[0].boxes[0].xyxy[0].cpu().numpy() x1, y1, x2, y2 = map(int, box) x1, y1 = max(0, x1), max(0, y1) x2, y2 = min(img.shape[1], x2), min(img.shape[0], y2) h = y2 - y1 w = x2 - x1 if h > w * 1.5: y2 = y1 + int(h * 0.3) face = img[y1:y2, x1:x2] if face.size > 0: return cv2.resize(face, (128, 128)) # If no person is detected at all return cv2.resize(img, (128, 128)) def preprocess_face(face): """Formats the cropped face for the model.""" img_array = image.img_to_array(face) img_array = np.expand_dims(img_array, axis=0) img_array /= 255.0 # Normalize return img_array def process_image(img): """Processes a single BGR image array and returns the fake probability.""" face = detect_and_crop_face(img) processed_image = preprocess_face(face) prediction = model.predict(processed_image, verbose=0) return float(prediction[0][0]) @app.post("/predict") async def predict_media(file: UploadFile = File(...)): filename = file.filename.lower() is_video = filename.endswith(('.mp4', '.avi', '.mov', '.mkv')) is_image = filename.endswith(('.jpg', '.jpeg', '.png', '.bmp')) if not is_image and not is_video: raise HTTPException(status_code=400, detail="Unsupported file format.") try: if is_image: # Read image directly from bytes contents = await file.read() nparr = np.frombuffer(contents, np.uint8) img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if img is None: raise HTTPException(status_code=400, detail="Invalid image file.") score = process_image(img) result = "Real" if score < 0.5 else "Fake" return { "filename": filename, "type": "image", "prediction": result, "confidence_score": score } elif is_video: # Save video to a temporary file for cv2.VideoCapture with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video: shutil.copyfileobj(file.file, temp_video) temp_video_path = temp_video.name cap = cv2.VideoCapture(temp_video_path) if not cap.isOpened(): os.unlink(temp_video_path) raise HTTPException(status_code=400, detail="Could not open video file.") frame_scores = [] frame_count = 0 # Process 1 frame every 5 frames (~6 fps for 30fps video) for better accuracy while True: ret, frame = cap.read() if not ret: break if frame_count % 5 == 0: score = process_image(frame) frame_scores.append(score) frame_count += 1 cap.release() os.unlink(temp_video_path) if not frame_scores: raise HTTPException(status_code=400, detail="Could not extract frames from video.") # Deepfakes often only manipulate specific frames, so average score can mask the spoof. # We use max_score to find the most manipulated frame. max_score = max(frame_scores) avg_score = sum(frame_scores) / len(frame_scores) fake_frames_count = sum(1 for s in frame_scores if s >= 0.5) final_result = "Real" if max_score < 0.5 else "Fake" return { "filename": filename, "type": "video", "prediction": final_result, "confidence_score": max_score, "frames_analyzed": len(frame_scores), "fake_frames_count": fake_frames_count, "max_fake_score": max_score, "avg_score": avg_score } except Exception as e: import traceback traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)