DSBackend / app.py
MrDevCoder01's picture
Initial upload of DeepSecure backend
c9caccb
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import cv2
import numpy as np
import os
import tempfile
import shutil
import warnings
import logging
import os
# Suppress Keras and TensorFlow warnings
warnings.filterwarnings('ignore', category=UserWarning)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
logging.getLogger('absl').setLevel(logging.ERROR)
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import load_model
from ultralytics import YOLO
app = FastAPI(title="Deepfake Detection API")
# Update CORS for frontend connectivity
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, replace with your frontend URL
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def health_check():
return {"status": "online", "message": "Backend is running!"}
print("Loading model and YOLO11...")
model = load_model('model.h5', compile=False)
# YOLOv11 pose model for extremely tight and precise facial feature cropping (matches MTCNN style)
detector = YOLO('yolo11n-pose.pt')
print("Model loaded successfully.")
def detect_and_crop_face(img):
"""Detects face/person and crops it to 128x128."""
# Run YOLO11 pose detection
results = detector.predict(img, verbose=False)
if len(results) > 0 and results[0].keypoints is not None and len(results[0].keypoints.xy[0]) > 0:
# Get the first person's keypoints
kpts = results[0].keypoints.xy[0].cpu().numpy()
# 0: nose, 1: left eye, 2: right eye, 3: left ear, 4: right ear
face_kpts = kpts[0:5]
# Filter out keypoints that weren't detected
valid_kpts = [k for k in face_kpts if k[0] > 0 and k[1] > 0]
if valid_kpts:
valid_kpts = np.array(valid_kpts)
x_min, y_min = np.min(valid_kpts, axis=0)
x_max, y_max = np.max(valid_kpts, axis=0)
# Expand this tight keypoint box to capture the full face (forehead, chin, cheeks)
w = x_max - x_min
h = y_max - y_min
# Safety for edge cases
if w > 0 and h > 0:
pad_x = w * 0.3
pad_y_top = h * 0.5 # Expand more upward for the forehead
pad_y_bot = h * 0.8 # Expand downward for the chin/mouth
final_x1 = max(0, int(x_min - pad_x))
final_y1 = max(0, int(y_min - pad_y_top))
final_x2 = min(img.shape[1], int(x_max + pad_x))
final_y2 = min(img.shape[0], int(y_max + pad_y_bot))
face = img[final_y1:final_y2, final_x1:final_x2]
if face.size > 0:
return cv2.resize(face, (128, 128))
# Fallback to normal YOLO box heuristic if face keypoints fail but person is found
if len(results) > 0 and len(results[0].boxes) > 0:
box = results[0].boxes[0].xyxy[0].cpu().numpy()
x1, y1, x2, y2 = map(int, box)
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(img.shape[1], x2), min(img.shape[0], y2)
h = y2 - y1
w = x2 - x1
if h > w * 1.5:
y2 = y1 + int(h * 0.3)
face = img[y1:y2, x1:x2]
if face.size > 0:
return cv2.resize(face, (128, 128))
# If no person is detected at all
return cv2.resize(img, (128, 128))
def preprocess_face(face):
"""Formats the cropped face for the model."""
img_array = image.img_to_array(face)
img_array = np.expand_dims(img_array, axis=0)
img_array /= 255.0 # Normalize
return img_array
def process_image(img):
"""Processes a single BGR image array and returns the fake probability."""
face = detect_and_crop_face(img)
processed_image = preprocess_face(face)
prediction = model.predict(processed_image, verbose=0)
return float(prediction[0][0])
@app.post("/predict")
async def predict_media(file: UploadFile = File(...)):
filename = file.filename.lower()
is_video = filename.endswith(('.mp4', '.avi', '.mov', '.mkv'))
is_image = filename.endswith(('.jpg', '.jpeg', '.png', '.bmp'))
if not is_image and not is_video:
raise HTTPException(status_code=400, detail="Unsupported file format.")
try:
if is_image:
# Read image directly from bytes
contents = await file.read()
nparr = np.frombuffer(contents, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
raise HTTPException(status_code=400, detail="Invalid image file.")
score = process_image(img)
result = "Real" if score < 0.5 else "Fake"
return {
"filename": filename,
"type": "image",
"prediction": result,
"confidence_score": score
}
elif is_video:
# Save video to a temporary file for cv2.VideoCapture
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video:
shutil.copyfileobj(file.file, temp_video)
temp_video_path = temp_video.name
cap = cv2.VideoCapture(temp_video_path)
if not cap.isOpened():
os.unlink(temp_video_path)
raise HTTPException(status_code=400, detail="Could not open video file.")
frame_scores = []
frame_count = 0
# Process 1 frame every 5 frames (~6 fps for 30fps video) for better accuracy
while True:
ret, frame = cap.read()
if not ret:
break
if frame_count % 5 == 0:
score = process_image(frame)
frame_scores.append(score)
frame_count += 1
cap.release()
os.unlink(temp_video_path)
if not frame_scores:
raise HTTPException(status_code=400, detail="Could not extract frames from video.")
# Deepfakes often only manipulate specific frames, so average score can mask the spoof.
# We use max_score to find the most manipulated frame.
max_score = max(frame_scores)
avg_score = sum(frame_scores) / len(frame_scores)
fake_frames_count = sum(1 for s in frame_scores if s >= 0.5)
final_result = "Real" if max_score < 0.5 else "Fake"
return {
"filename": filename,
"type": "video",
"prediction": final_result,
"confidence_score": max_score,
"frames_analyzed": len(frame_scores),
"fake_frames_count": fake_frames_count,
"max_fake_score": max_score,
"avg_score": avg_score
}
except Exception as e:
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)