SanskarModi committed on
Commit
9e0f3c3
·
verified ·
1 Parent(s): 73b4ecb

Upload 6 files

Browse files
Files changed (6) hide show
  1. Dockerfile +22 -0
  2. app.py +79 -0
  3. common.py +96 -0
  4. params.yaml +21 -0
  5. prediction.py +354 -0
  6. requirements.txt +34 -0
Dockerfile ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
FROM python:3.10-slim

WORKDIR /app

# Container-friendly Python defaults: no .pyc files, unbuffered logs
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1

# Install system dependencies required for OpenCV and mediapipe.
# --no-install-recommends keeps the image slim; clearing the apt lists
# shrinks this layer.
RUN apt-get update && apt-get install -y --no-install-recommends \
    libgl1-mesa-glx \
    libglib2.0-0 \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies first so this layer is cached when only
# application code changes
COPY requirements.txt .
RUN pip install --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

EXPOSE 8000

# Start FastAPI app
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
app.py ADDED
@@ -0,0 +1,79 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import tempfile
2
+ from io import BytesIO
3
+ from typing import Optional
4
+
5
+ import cv2
6
+ import numpy as np
7
+ import uvicorn
8
+ from fastapi import FastAPI, File, Form, Query, UploadFile
9
+ from fastapi.responses import JSONResponse, StreamingResponse
10
+ from starlette.middleware.cors import CORSMiddleware
11
+
12
+ from prediction import Prediction
13
+
14
# Application object served by uvicorn (see Dockerfile CMD "app:app").
app = FastAPI(
    title="Deepfake Detection API",
    description="Upload a video to check if it's real or a manipulated deepfake (Face2Face, FaceShifter, FaceSwap, or NeuralTextures).",
)

# CORS (optional if using frontend)
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# very permissive; tighten the origin list before exposing this publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize model once at import time so each request reuses the loaded
# TorchScript model instead of paying the load cost per call.
predictor = Prediction()
30
+
31
+
32
@app.post("/predict/")
async def predict_deepfake(
    video: UploadFile = File(...),
    sequence_length: Optional[int] = Query(
        None, description="Number of frames to use for prediction"
    ),
):
    """Run deepfake detection on an uploaded video.

    Returns a JPEG Grad-CAM explanation image (prediction carried in the
    X-Prediction-Result header) when available, otherwise a JSON body with
    the prediction string and classification details. Unhandled errors are
    reported as a 500 JSON response with a traceback.
    """
    import os  # local import: only needed for temp-file cleanup

    temp_video_path = None
    try:
        # Persist the upload to disk because cv2.VideoCapture needs a path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video:
            temp_video.write(await video.read())
            temp_video_path = temp_video.name

        # Get prediction and explanation image
        prediction_str, explanation_image, details = predictor.predict(
            temp_video_path, sequence_length
        )

        response = {"prediction": prediction_str, "details": details}

        # Convert explanation image (np array) to JPEG bytes if available
        if explanation_image is not None:
            _, img_encoded = cv2.imencode(".jpg", explanation_image)
            img_bytes = BytesIO(img_encoded.tobytes())
            # The JPEG is fully buffered in memory, so the temp file can be
            # deleted in the finally block before the response streams out.
            return StreamingResponse(
                content=img_bytes,
                media_type="image/jpeg",
                headers={"X-Prediction-Result": prediction_str},
            )
        else:
            return JSONResponse(content=response)

    except Exception as e:
        import traceback

        error_detail = traceback.format_exc()
        return JSONResponse(
            status_code=500, content={"error": str(e), "detail": error_detail}
        )
    finally:
        # Fix: the temp file is created with delete=False and was previously
        # never removed, leaking one .mp4 per request. Clean it up here on
        # both success and failure paths.
        if temp_video_path is not None and os.path.exists(temp_video_path):
            os.remove(temp_video_path)
71
+
72
+
73
@app.get("/")
def root():
    """Health/landing endpoint describing how to call the API."""
    usage_hint = (
        "POST to /predict/ with a video file and optional sequence_length parameter"
    )
    return {
        "message": "Deepfake Detection API is running!",
        "usage": usage_hint,
    }
79
+
common.py ADDED
@@ -0,0 +1,96 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import os
3
+ from pathlib import Path
4
+ from typing import Any
5
+
6
+ import numpy as np
7
+ import yaml
8
+ from box import ConfigBox
9
+ from box.exceptions import BoxValueError
10
+ from ensure import ensure_annotations
11
+
12
+
13
@ensure_annotations
def read_yaml(path_to_yaml: Path) -> ConfigBox:
    """Load a YAML file and wrap its contents for attribute-style access.

    Args:
        path_to_yaml (Path): location of the YAML file to read.

    Raises:
        ValueError: if the yaml file is empty.

    Returns:
        ConfigBox: parsed YAML contents.
    """
    try:
        with open(path_to_yaml) as yaml_file:
            content = yaml.safe_load(yaml_file)
        print(f"yaml file: {path_to_yaml} loaded successfully")
        return ConfigBox(content)
    except BoxValueError:
        # ConfigBox rejects a None payload, which is what an empty file parses to.
        raise ValueError("yaml file is empty")
34
+
35
+
36
@ensure_annotations
def create_directories(path_to_directories: list, verbose=True):
    """Create each directory in the list unless it already exists with content.

    Args:
        path_to_directories (list): list of directory paths to create.
        verbose (bool): when True, print what happened for each path.
    """
    for path in path_to_directories:
        # Fix: the old check skipped ANY existing directory while printing
        # that it "contains files"; only skip when the directory actually
        # has content. makedirs(exist_ok=True) is a safe no-op for existing
        # empty directories.
        if os.path.exists(path) and os.listdir(path):
            if verbose:
                print(
                    f"Directory at {path} already exists and contains files. Skipping creation."
                )
            continue
        os.makedirs(path, exist_ok=True)
        if verbose:
            print(f"Created directory at: {path}")
53
+
54
+
55
@ensure_annotations
def save_json(path: Path, data: dict):
    """Serialize ``data`` to ``path`` as pretty-printed JSON.

    Args:
        path (Path): destination json file.
        data (dict): payload to write.
    """
    with open(path, "w") as out_file:
        json.dump(data, out_file, indent=4)

    print(f"json file saved at: {path}")
66
+
67
+
68
@ensure_annotations
def load_json(path: Path) -> ConfigBox:
    """Read a JSON file and expose its contents via attribute access.

    Args:
        path (Path): path to the json file.

    Returns:
        ConfigBox: data as class attributes instead of dict.
    """
    with open(path) as in_file:
        parsed = json.load(in_file)

    print(f"json file loaded successfully from: {path}")
    return ConfigBox(parsed)
83
+
84
+
85
@ensure_annotations
def get_size_in_kbs(path: Path) -> int:
    """Return the size of the file at ``path`` rounded to whole kilobytes.

    Args:
        path (Path): path of the file to measure.

    Returns:
        int: size in KB.
    """
    return round(os.path.getsize(path) / 1024)
params.yaml ADDED
@@ -0,0 +1,21 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # data ingestion
2
+ num_videos: 1000
3
+
4
+ # preprocessing
5
+ fps: 30
6
+ max_frames: 200
7
+ resolution: [224, 224]
8
+ expansion_factor: 0.2
9
+
10
+ # training and evaluation
11
+ input_shape: [224, 224, 3]
12
+ batch_size: 12
13
+ sequence_length: 10
14
+ num_workers: 8
15
+ dropout_rate: 0.5
16
+ units: 2048
17
+ learning_rate: 0.0001
18
+ epochs: 500
19
+ lstm_layers: 1
20
+ bidirectional: True
21
+ weight_decay: 0.00001
prediction.py ADDED
@@ -0,0 +1,354 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import cv2
2
+ import mediapipe as mp
3
+ import numpy as np
4
+ import torch
5
+ import torch.nn.functional as F
6
+ from torchvision import transforms
7
+ from pathlib import Path
8
+ from common import read_yaml
9
+
10
+ PARAMS_FILE_PATH = Path("params.yaml")
11
+
12
class Prediction:
    """Deepfake video classifier with Grad-CAM explanations.

    Wraps a TorchScript model ("model.pt") plus MediaPipe face detection and
    reproduces the training-time preprocessing (face crop + expansion, resize,
    color jitter, ImageNet normalization) at inference time.
    """

    def __init__(self):
        """Load the scripted model, params.yaml settings, and the face detector."""
        self.device = torch.device("cpu")
        self.model = torch.jit.load("model.pt")

        self.model.eval()
        params = read_yaml(PARAMS_FILE_PATH)
        self.expansion_factor = params.expansion_factor
        self.resolution = params.resolution
        self.default_frame_count = params.sequence_length

        # Filled by the save_gradients backward hook; initialized here so a
        # failed/skipped backward pass cannot trigger an AttributeError.
        self.gradients = None

        # Initialize MediaPipe face detector
        self.face_detection = mp.solutions.face_detection.FaceDetection(
            model_selection=0, min_detection_confidence=0.6
        )

        # Index 0 is the "real" class; any other index counts as a deepfake.
        self.classes = [
            "original",
            "Deepfake (Face2Face)",
            "Deepfake (FaceShifter)",
            "Deepfake (FaceSwap)",
            "Deepfake (NeuralTextures)",
        ]

    def get_frames(self, video):
        """Yield decoded BGR frames from the given video file.

        Args:
            video (str): path to a video readable by OpenCV.

        Yields:
            np.ndarray: one frame per iteration, in OpenCV's BGR layout.
        """
        vidobj = cv2.VideoCapture(video)
        try:
            success, image = vidobj.read()
            while success:
                yield image
                success, image = vidobj.read()
        finally:
            # Fix: the capture handle was never released, leaking the
            # underlying file/codec resources for every processed video.
            vidobj.release()

    def get_face(self, frame):
        """Detect a face in a frame using MediaPipe.

        Args:
            frame (np.ndarray): input BGR frame.

        Returns:
            tuple | None: (top, right, bottom, left) pixel coordinates of the
            first detected face, clamped to the frame, or None if no face
            was detected.
        """
        try:
            # MediaPipe expects RGB input; OpenCV decodes frames as BGR.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Detect faces
            results = self.face_detection.process(rgb_frame)

            if results.detections:
                detection = results.detections[0]  # Use the first detected face
                h, w, _ = frame.shape
                bboxC = detection.location_data.relative_bounding_box

                # MediaPipe returns a bounding box relative to the frame size;
                # convert to absolute pixel coordinates.
                xmin = int(bboxC.xmin * w)
                ymin = int(bboxC.ymin * h)
                box_width = int(bboxC.width * w)
                box_height = int(bboxC.height * h)

                # Clamp to the frame and return in top, right, bottom, left format
                top = max(ymin, 0)
                right = min(xmin + box_width, w)
                bottom = min(ymin + box_height, h)
                left = max(xmin, 0)

                return (top, right, bottom, left)

            return None  # No face detected

        except Exception as e:
            print(f"Error in get_face: {e}")
            print(f"Frame shape: {frame.shape}, dtype: {frame.dtype}")
            raise

    def color_jitter(self, image):
        """Apply deterministic color jitter (brightness/contrast/saturation).

        The RNG is re-seeded with 42 on every call, so the "jitter" is the
        same fixed perturbation each time — presumably intentional so that
        inference matches the training-time augmentation statistics
        deterministically (NOTE(review): confirm against training pipeline).

        Args:
            image (np.ndarray): input BGR image (uint8).

        Returns:
            np.ndarray: the color jittered image.
        """
        rng = np.random.default_rng(seed=42)

        # HSV makes brightness/saturation adjustments simple channel ops.
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        h, s, v = cv2.split(hsv)

        # Adjust brightness
        value = rng.uniform(0.8, 1.2)
        v = cv2.multiply(v, value)

        # Adjust contrast
        # NOTE(review): cv2.addWeighted with a scalar second operand relies on
        # OpenCV's scalar broadcasting — verify this behaves as intended.
        mean = np.mean(v)
        value = rng.uniform(0.8, 1.2)
        v = cv2.addWeighted(v, value, mean, 1 - value, 0)

        # Adjust saturation
        value = rng.uniform(0.8, 1.2)
        s = cv2.multiply(s, value)

        final_hsv = cv2.merge((h, s, v))
        image = cv2.cvtColor(final_hsv, cv2.COLOR_HSV2BGR)
        return image

    def preprocess(self, video, seq_length=None):
        """Extract frames, detect/crop faces, and apply training transforms.

        Args:
            video (str): path to the video file.
            seq_length (int, optional): number of frames to extract; defaults
                to the sequence_length from params.yaml.

        Returns:
            tuple[list, list]: (normalized tensors for the model, raw cropped
            uint8 frames for visualization), both truncated/padded to the
            target sequence length. Returns ([], []) when no face is found.
        """
        frames = []
        raw_frames = []  # Store original cropped frames for visualization

        # Use provided sequence length or default from params
        target_seq_length = (
            seq_length if seq_length is not None else self.default_frame_count
        )

        transform = transforms.Compose(
            [
                transforms.ToPILImage(),
                transforms.Resize(
                    tuple(self.resolution),
                    interpolation=transforms.InterpolationMode.BILINEAR,
                ),
                transforms.ToTensor(),
                # ImageNet mean/std, matching the training normalization.
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
                ),
            ]
        )

        buffer = []  # For processing in batches of 4 like training pipeline

        for frame in self.get_frames(video):
            if len(frames) < target_seq_length:
                buffer.append(frame)

                if len(buffer) == 4:  # Process in batches of 4
                    faces = [self.get_face(f) for f in buffer]

                    for i, face in enumerate(faces):
                        if face is not None:
                            top, right, bottom, left = face
                            face_height = bottom - top
                            face_width = right - left

                            # Expand face region using expansion factor
                            expanded_top = max(
                                0, top - int(self.expansion_factor / 2 * face_height)
                            )
                            expanded_bottom = min(
                                buffer[i].shape[0],
                                bottom + int(self.expansion_factor / 2 * face_height),
                            )
                            expanded_left = max(
                                0, left - int(self.expansion_factor / 2 * face_width)
                            )
                            expanded_right = min(
                                buffer[i].shape[1],
                                right + int(self.expansion_factor / 2 * face_width),
                            )

                            # Crop and resize
                            cropped_face = cv2.resize(
                                buffer[i][
                                    expanded_top:expanded_bottom,
                                    expanded_left:expanded_right,
                                    :,
                                ],
                                tuple(self.resolution),
                            )

                            # Store original cropped face for visualization
                            raw_frames.append(cropped_face.copy())

                            # Apply color jitter like in training
                            cropped_face = self.color_jitter(cropped_face)

                            # Transform for model input
                            transformed = transform(cropped_face)
                            frames.append(transformed)

                    buffer = []  # Reset buffer
            else:
                break

        # Handle padding if we have fewer frames than required
        if len(frames) < target_seq_length:
            # If we have some frames, duplicate the last one
            if frames:
                while len(frames) < target_seq_length:
                    frames.append(frames[-1])
                    raw_frames.append(raw_frames[-1])
            else:
                return [], []  # No faces detected

        return frames[:target_seq_length], raw_frames[:target_seq_length]

    def save_gradients(self, grad):
        """Backward hook: capture gradients flowing into the feature maps."""
        self.gradients = grad

    def grad_cam(self, fmap, grads):
        """Compute a normalized Grad-CAM map from feature maps and gradients.

        Args:
            fmap (torch.Tensor): feature maps from the model's forward pass.
                NOTE(review): assumed layout (N, C, H, W) — confirm against
                the scripted model's actual output.
            grads (torch.Tensor): gradients of the same shape as fmap.

        Returns:
            np.ndarray: 2-D Grad-CAM map resized to self.resolution, in [0, 1].
        """
        # Channel-importance weights: gradients averaged over the batch dim.
        pooled_grads = torch.mean(grads, dim=[0])
        # NOTE(review): this mutates fmap in place after backward(); fine as
        # long as no further autograd pass uses it.
        for i in range(fmap.shape[1]):
            fmap[:, i, :, :] *= pooled_grads[i]

        cam = torch.mean(fmap, dim=1).squeeze().cpu().detach().numpy()

        # Apply ReLU to retain only positive activations
        cam = np.maximum(cam, 0)

        # Normalize Grad-CAM
        cam = cam - np.min(cam)
        cam = cam / np.max(cam) if np.max(cam) > 0 else cam  # Prevent division by zero

        # Resize the cam to match the resolution of the original image
        cam = cv2.resize(cam, tuple(self.resolution))
        # Fix: the old guard `cam.shape[-1] > 1` is true for ANY 2-D map
        # (its last axis is the image width), wrongly collapsing the map to
        # 1-D. Only sum channels when a genuine channel axis exists.
        cam = np.sum(cam, axis=-1) if cam.ndim == 3 and cam.shape[-1] > 1 else cam
        return cam

    def generate_gradcam(self, fmap, video_frame, grads):
        """Overlay the Grad-CAM heatmap on a visualization frame.

        Args:
            fmap (torch.Tensor): feature maps (see grad_cam).
            video_frame (np.ndarray): uint8 cropped face frame (0-255).
            grads (torch.Tensor): gradients captured via save_gradients.

        Returns:
            np.ndarray: uint8 image with the heatmap blended in.
        """
        cam = self.grad_cam(fmap, grads)
        # Ensure cam is a single-channel 8-bit image
        cam = np.uint8(255 * cam)  # Scale to 0-255
        heatmap = cv2.applyColorMap(cam, cv2.COLORMAP_JET)  # Apply colormap

        # NOTE(review): raw frames are already BGR; RGB2BGR swaps channels
        # either way — kept as in the original for identical output intent.
        video_frame = cv2.cvtColor(video_frame, cv2.COLOR_RGB2BGR)

        # Fix: frames in raw_frames are already 0-255 uint8; the old code
        # multiplied by 255 again and wrapped around under uint8, producing
        # a garbled overlay. Clamp-convert instead of rescaling.
        video_frame = np.uint8(np.clip(video_frame, 0, 255))

        # Blend heatmap and original image with a weight to ensure the face is visible
        alpha = 0.01  # Lower weight for the heatmap to make face more visible
        beta = 1 - alpha  # Weight for the original frame
        overlayed_img = cv2.addWeighted(heatmap, alpha, video_frame, beta, 0)

        return overlayed_img

    def predict(self, video, seq_length=None):
        """Predict whether a video is real or fake.

        Args:
            video (str): path to the video file.
            seq_length (int, optional): number of frames to use; defaults to
                the sequence_length from params.yaml.

        Returns:
            tuple: (prediction_result, gradcam_image, classification_details);
            (message, None, None) when no face is detected.
        """
        frames, raw_frames = self.preprocess(video, seq_length)

        if not frames:
            return "No faces detected in the video", None, None

        # Prepare input tensor for the model: (1, seq, 3, H, W)
        target_seq_length = (
            seq_length if seq_length is not None else self.default_frame_count
        )
        input_tensor = torch.stack(frames).unsqueeze(0)
        input_tensor = input_tensor.view(1, target_seq_length, 3, *self.resolution)
        input_tensor = input_tensor.to(self.device)
        input_tensor.requires_grad_()

        # Forward pass to get feature maps, attention weights, and logits.
        fmap, attn_wts, output = self.model(input_tensor)
        fmap.register_hook(self.save_gradients)

        # Get predictions for all classes
        class_probs = F.softmax(output, dim=1).detach().cpu().numpy()[0]

        # Get the predicted class
        predicted_class_idx = np.argmax(class_probs)
        predicted_class = (
            self.classes[predicted_class_idx]
            if predicted_class_idx < len(self.classes)
            else "Unknown"
        )
        prediction = "Deepfake" if predicted_class_idx > 0 else "Real"

        # Format confidence values to 2 decimal places
        confidence_class = round(class_probs[predicted_class_idx] * 100, 2)
        confidence_deepfake_real = (
            round(class_probs[1:].max() * 100, 2)
            if prediction == "Deepfake"
            else round(class_probs[0] * 100, 2)
        )
        prediction_string = f"{prediction} {confidence_deepfake_real:.2f}% Confidence"

        # Create detailed classification results
        classification_details = (
            {
                "Deepfake type": predicted_class,
                "confidence(%)": f"{confidence_class:.2f}",
            }
            if prediction == "Deepfake"
            else {
                "Deepfake type": "None (Real video)",
                "confidence(%)": f"{confidence_class:.2f}",
            }
        )

        # Backpropagate from the winning logit so the hook fills self.gradients.
        self.model.zero_grad()
        output[0, predicted_class_idx].backward()
        grads = self.gradients

        # Generate Grad-CAM visualization for the best frame
        if raw_frames:
            # Choose middle frame for visualization
            middle_idx = len(raw_frames) // 2
            gradcam_image = self.generate_gradcam(fmap, raw_frames[middle_idx], grads)
        else:
            gradcam_image = None

        return prediction_string, gradcam_image, classification_details
requirements.txt ADDED
@@ -0,0 +1,34 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Core Libraries for Machine Learning and image/video processing
2
+ torch
3
+ torchvision
4
+ numpy
5
+ scikit-learn
6
+ pandas
7
+ opencv-python-headless
8
+ pillow
9
+ mediapipe
10
+
11
+ # Machine Learning Workflow, pipelines, Model Management, env, and Configuration
12
+ mlflow
13
+ pyYAML
14
+ dvc
15
+ python-dotenv
16
+
17
+ # User Interface and inference
18
+ fastapi
19
+ uvicorn
20
+
21
+ # Code Formatting
22
+ black
23
+ isort
24
+
25
+ # Visualization and Jupyter Tools
26
+ plotly
27
+ ipywidgets
28
+ jupyter
29
+ notebook
30
+
31
+ # utils and helper libraries
32
+ python-box
33
+ tqdm
34
+ ensure