pavankumarvk committed on
Commit
d9a982f
·
verified ·
1 Parent(s): 2697855

Update pipeline.py

Browse files
Files changed (1) hide show
  1. pipeline.py +34 -174
pipeline.py CHANGED
@@ -1,17 +1,15 @@
1
- import os
2
- import cv2
3
- import torch
4
- import zipfile
5
  import librosa
6
- import numpy as np
7
  import tensorflow as tf
8
- from facenet_pytorch import MTCNN
9
  from rawnet import RawNet
10
 
11
- # Set random seed for reproducibility.
12
  tf.random.set_seed(42)
13
 
14
- # Extract model if not already extracted
15
  if not os.path.exists("efficientnet-b0"):
16
  local_zip = "./efficientnet-b0.zip"
17
  if os.path.exists(local_zip):
@@ -20,57 +18,44 @@ if not os.path.exists("efficientnet-b0"):
20
  zip_ref.close()
21
  print("Model extracted successfully!")
22
 
23
- # Load Video/Image models.
24
- # Load model without compiling to avoid optimizer dependency issues
25
  model = tf.keras.models.load_model("efficientnet-b0/", compile=False)
26
 
27
-
28
class DetectionPipeline:
    """Turn a video file or an image array into 224x224 RGB model inputs.

    No face detector is run here: sampled frames (or the input image) are
    colour-converted and resized as a whole.
    """

    def __init__(self, n_frames=None, batch_size=60, resize=None, input_modality='video'):
        """Store the sampling configuration.

        Args:
            n_frames: Number of evenly spaced frames to sample from a video;
                ``None`` samples every frame.
            batch_size: A sampled frame is emitted whenever the running count
                of sampled frames is a multiple of this value; the last
                sampled index is emitted as well.
            resize: Optional scale factor applied to each sampled frame
                before the final 224x224 resize.
            input_modality: ``'video'``, ``'image'`` or ``'audio'``.
        """
        self.n_frames = n_frames
        self.batch_size = batch_size
        self.resize = resize
        self.input_modality = input_modality

    def __call__(self, filename):
        """Preprocess ``filename`` according to ``input_modality``.

        Args:
            filename: Video path for ``'video'``. For ``'image'`` the value
                is passed straight to ``cv2.cvtColor``, so it must be an
                image array rather than a path.

        Returns:
            A list of 224x224 RGB frames for ``'video'``, one 224x224 array
            for ``'image'``, and ``None`` for ``'audio'`` or anything else.
        """
        if self.input_modality == 'video':
            print('Input modality is video.')
            return self._sample_video(filename)

        if self.input_modality == 'image':
            print('Input modality is image.')
            image = cv2.cvtColor(filename, cv2.COLOR_BGR2RGB)
            return cv2.resize(image, (224, 224))

        # Audio is not preprocessed by this class.
        return None

    def _sample_video(self, filename):
        """Decode the sampled frames of ``filename`` into 224x224 RGB arrays."""
        v_cap = cv2.VideoCapture(filename)
        try:
            v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Pick 'n_frames' evenly spaced frame indices (or all of them).
            if self.n_frames is None:
                sample = np.arange(0, v_len)
            else:
                sample = np.linspace(0, v_len - 1, self.n_frames).astype(int)

            # A set gives O(1) membership instead of scanning the array per frame.
            wanted = set(sample.tolist())
            last_index = int(sample[-1]) if len(sample) else -1

            faces = []
            n_sampled = 0  # only the count of sampled frames is needed
            for j in range(v_len):
                # grab() advances the stream; decoding happens only on retrieve().
                v_cap.grab()
                if j not in wanted:
                    continue

                success, frame = v_cap.retrieve()
                if not success:
                    continue

                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                if self.resize is not None:
                    frame = cv2.resize(frame, None, fx=self.resize, fy=self.resize)

                n_sampled += 1
                if n_sampled % self.batch_size == 0 or j == last_index:
                    faces.append(cv2.resize(frame, (224, 224)))

            return faces
        finally:
            # Free the capture handle even when decoding raises.
            v_cap.release()
 
 
 
 
92
 
93
# Module-level preprocessors: five evenly spaced frames per video, and a
# single-image pipeline.
detection_video_pipeline = DetectionPipeline(
    n_frames=5,
    batch_size=1,
    input_modality='video',
)
detection_image_pipeline = DetectionPipeline(
    batch_size=1,
    input_modality='image',
)
96
 
97
- # ---------------------------------------------------------
98
- # Video & Image Prediction Functions
99
- # ---------------------------------------------------------
100
-
101
def deepfakes_video_predict(input_video):
    """Classify a video as real or fake from its sampled frames.

    Each frame returned by ``detection_video_pipeline`` is scaled to [0, 1]
    and scored by ``model``; the per-frame real/fake scores are averaged.

    Args:
        input_video: Value handed to ``detection_video_pipeline``.

    Returns:
        A verdict sentence that includes the averaged deepfake score as a
        percentage, or an explanatory message when no frame could be read.
    """
    faces = detection_video_pipeline(input_video)
    if not faces:
        # np.mean([]) is NaN, which would otherwise yield a "FAKE ... nan%" verdict.
        return "The video could not be analysed: no frames were read."

    real_res = []
    fake_res = []
    for face in faces:
        face2 = face / 255.0
        pred = model.predict(np.expand_dims(face2, axis=0))[0]
        real_res.append(pred[0])
        fake_res.append(pred[1])

    real_mean = np.mean(real_res)
    fake_mean = np.mean(fake_res)
    print(f"Real Faces: {real_mean}")
    print(f"Fake Faces: {fake_mean}")

    if real_mean >= 0.5:
        # "Deepfakes Confidence" is the fake likelihood, hence 100 - real.
        return "The video is REAL. \n Deepfakes Confidence: " + str(round(100 - (real_mean * 100), 3)) + "%"
    return "The video is FAKE. \n Deepfakes Confidence: " + str(round(fake_mean * 100, 3)) + "%"
139
-
140
 
141
def deepfakes_image_predict(input_image):
    """Classify a single image as real or fake.

    Args:
        input_image: Value handed to ``detection_image_pipeline``.

    Returns:
        A verdict sentence that includes the deepfake score as a percentage.
    """
    scaled = detection_image_pipeline(input_image) / 255.0
    scores = model.predict(np.expand_dims(scaled, axis=0))[0]
    real, fake = scores[0], scores[1]

    if real > 0.5:
        # The reported figure is the fake likelihood, hence 100 - real.
        return "The image is REAL. \n Deepfakes Confidence: " + str(round(100 - (real * 100), 3)) + "%"
    return "The image is FAKE. \n Deepfakes Confidence: " + str(round(fake * 100, 3)) + "%"
154
-
155
- # ---------------------------------------------------------
156
- # Audio Prediction Functions
157
- # ---------------------------------------------------------
158
-
159
def load_audio_model():
    """Build the RawNet audio classifier and load weights from ``RawNet2.pth``.

    Returns:
        The model in eval mode on CPU, or ``None`` when the checkpoint is
        missing or cannot be loaded. Returning ``None`` lets callers detect
        that audio detection is unavailable instead of silently running a
        network with untrained weights.
    """
    weights_path = 'RawNet2.pth'
    if not os.path.exists(weights_path):
        print("Warning: 'RawNet2.pth' not found. Audio detection will not work.")
        return None

    d_args = {
        "nb_samp": 64600,
        "first_conv": 1024,
        "in_channels": 1,
        "filts": [20, [20, 20], [20, 128], [128, 128]],
        "blocks": [2, 4],
        "nb_fc_node": 1024,
        "gru_node": 1024,
        "nb_gru_layer": 3,
        "nb_classes": 2,
    }

    device = torch.device('cpu')
    net = RawNet(d_args=d_args, device=device)

    try:
        checkpoint = torch.load(weights_path, map_location=device)
        # Checkpoints may wrap the state dict under 'model' or 'state_dict';
        # anything else is treated as a bare state dict and loaded leniently.
        if isinstance(checkpoint, dict) and 'model' in checkpoint:
            net.load_state_dict(checkpoint['model'])
        elif isinstance(checkpoint, dict) and 'state_dict' in checkpoint:
            net.load_state_dict(checkpoint['state_dict'])
        else:
            net.load_state_dict(checkpoint, strict=False)
    except Exception as e:
        # Best-effort start-up: report the failure and disable audio detection.
        print(f"Error loading audio model weights: {e}")
        return None

    net.eval()
    print("Audio model loaded successfully.")
    return net
198
-
199
# Built once at import time so the weights are not reloaded on every request.
audio_model = load_audio_model()

# Predicted class index -> label used in the result message.
audio_label_map = {0: "Real", 1: "Fake"}
206
-
207
def deepfakes_audio_predict(input_audio):
    """Classify an audio clip as real or fake with the global ``audio_model``.

    Args:
        input_audio: ``(sample_rate, audio_data)`` tuple as provided by Gradio.

    Returns:
        A verdict sentence with a percentage confidence, or an ``"Error: ..."``
        string when the model or the input is unusable.
    """
    if audio_model is None:
        return "Error: Audio model not loaded."

    try:
        sr, x = input_audio
    except (TypeError, ValueError):
        # TypeError covers None (nothing supplied); ValueError a wrong-sized value.
        return "Error: Invalid audio input format."

    x = np.asarray(x)
    if np.issubdtype(x.dtype, np.integer):
        # Integer PCM: scale to [-1, 1]; librosa.resample only accepts floats.
        x = x / float(np.iinfo(x.dtype).max)
    x = x.astype(np.float32)
    if x.ndim > 1:
        # Axis 0 is treated as time below, so collapse the remaining axis to
        # mono. Assumes a (samples, channels) layout — confirm against the caller.
        x = x.mean(axis=1)

    # Sampling rate and clip length fed to the model.
    target_sr = 16000
    target_len = 64600

    if sr != target_sr:
        x = librosa.resample(x, orig_sr=sr, target_sr=target_sr)

    # Zero-pad short clips at the end; centre-crop long ones.
    len_x = x.shape[0]
    if len_x < target_len:
        x = np.pad(x, (0, target_len - len_x), mode='constant')
    elif len_x > target_len:
        start = (len_x - target_len) // 2
        x = x[start:start + target_len]

    # Two leading singleton dimensions are added before inference.
    x_pt = torch.from_numpy(x).float().unsqueeze(0).unsqueeze(0)

    with torch.no_grad():
        output = audio_model(x_pt)

    # exp() assumes the model emits log-probabilities — confirm against rawnet.
    probs = torch.exp(output)
    confidence, prediction = torch.max(probs, 1)

    label = audio_label_map[prediction.item()]
    confidence_score = confidence.item() * 100

    return f"The audio is {label}.\nConfidence: {confidence_score:.2f}%"
 
1
+ import os
2
+ import cv2
3
+ import torch
4
+ import zipfile
5
  import librosa
6
+ import numpy as np
7
  import tensorflow as tf
8
+ from facenet_pytorch import MTCNN
9
  from rawnet import RawNet
10
 
 
11
  tf.random.set_seed(42)
12
 
 
13
  if not os.path.exists("efficientnet-b0"):
14
  local_zip = "./efficientnet-b0.zip"
15
  if os.path.exists(local_zip):
 
18
  zip_ref.close()
19
  print("Model extracted successfully!")
20
 
 
 
21
  model = tf.keras.models.load_model("efficientnet-b0/", compile=False)
22
 
 
23
class DetectionPipeline:
    """Turn a video, image or audio input into the array/tensor a model expects.

    No face detector is run here: sampled video frames (or the input image)
    are colour-converted and resized as a whole.
    """

    def __init__(self, n_frames=None, batch_size=60, resize=None, input_modality='video'):
        """Store the sampling configuration.

        Args:
            n_frames: Number of evenly spaced frames to sample from a video;
                ``None`` samples every frame.
            batch_size: A sampled frame is emitted whenever the running count
                of sampled frames is a multiple of this value; the last
                sampled index is emitted as well.
            resize: Optional scale factor applied to each sampled frame
                before the final 224x224 resize.
            input_modality: ``'video'``, ``'image'`` or ``'audio'``.
        """
        self.n_frames = n_frames
        self.batch_size = batch_size
        self.resize = resize
        self.input_modality = input_modality

    def __call__(self, filename):
        """Preprocess ``filename`` according to ``input_modality``.

        Args:
            filename: Path for ``'video'`` and ``'audio'``. For ``'image'``
                the value is passed straight to ``cv2.cvtColor``, so it must
                be an image array rather than a path.

        Returns:
            A list of 224x224 RGB frames for ``'video'``, one 224x224 array
            for ``'image'``, or a tensor with a leading singleton dimension
            holding the decoded samples for ``'audio'``.

        Raises:
            ValueError: If ``input_modality`` is not one of the three above.
        """
        if self.input_modality == 'video':
            return self._sample_video(filename)

        if self.input_modality == 'image':
            image = cv2.cvtColor(filename, cv2.COLOR_BGR2RGB)
            return cv2.resize(image, (224, 224))

        if self.input_modality == 'audio':
            x, _ = librosa.load(filename)
            return torch.unsqueeze(torch.Tensor(x), dim=0)

        raise ValueError("Invalid modality")

    def _sample_video(self, filename):
        """Decode the sampled frames of ``filename`` into 224x224 RGB arrays."""
        v_cap = cv2.VideoCapture(filename)
        try:
            v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Pick 'n_frames' evenly spaced frame indices (or all of them).
            if self.n_frames is None:
                sample = np.arange(0, v_len)
            else:
                sample = np.linspace(0, v_len - 1, self.n_frames).astype(int)

            # A set gives O(1) membership instead of scanning the array per frame.
            wanted = set(sample.tolist())
            last_index = int(sample[-1]) if len(sample) else -1

            faces = []
            n_sampled = 0  # only the count of sampled frames is needed
            for j in range(v_len):
                # grab() advances the stream; decoding happens only on retrieve().
                v_cap.grab()
                if j not in wanted:
                    continue

                success, frame = v_cap.retrieve()
                if not success:
                    continue

                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                if self.resize is not None:
                    # Frames are NumPy arrays, so scale with OpenCV; the PIL-style
                    # frame.resize(... frame.size ...) call fails on an ndarray.
                    frame = cv2.resize(frame, None, fx=self.resize, fy=self.resize)

                n_sampled += 1
                if n_sampled % self.batch_size == 0 or j == last_index:
                    faces.append(cv2.resize(frame, (224, 224)))

            return faces
        finally:
            # Free the capture handle even when decoding raises.
            v_cap.release()
78
 
 
79
# Module-level preprocessors: five evenly spaced frames per video, and a
# single-image pipeline.
detection_video_pipeline = DetectionPipeline(
    n_frames=5,
    batch_size=1,
    input_modality='video',
)
detection_image_pipeline = DetectionPipeline(
    batch_size=1,
    input_modality='image',
)
81
 
 
 
 
 
82
def deepfakes_video_predict(input_video):
    """Classify a video as real or fake from its sampled frames.

    Each frame returned by ``detection_video_pipeline`` is scaled to [0, 1]
    and scored by ``model``; the per-frame real/fake scores are averaged.

    Args:
        input_video: Value handed to ``detection_video_pipeline``.

    Returns:
        A verdict sentence whose percentage is the averaged score of the
        reported class, or an explanatory message when no frame could be read.
    """
    faces = detection_video_pipeline(input_video)
    if not faces:
        # np.mean([]) is NaN, which would otherwise yield a "FAKE ... nan%" verdict.
        return "The video could not be analysed: no frames were read."

    real_res, fake_res = [], []
    for face in faces:
        face2 = face / 255
        pred = model.predict(np.expand_dims(face2, axis=0))[0]
        real_res.append(pred[0])
        fake_res.append(pred[1])

    real_mean = np.mean(real_res)
    fake_mean = np.mean(fake_res)

    if real_mean >= 0.5:
        # Report confidence in the stated verdict (real), not the fake score.
        return "The video is REAL. Confidence: " + str(round(real_mean * 100, 3)) + "%"
    return "The video is FAKE. Confidence: " + str(round(fake_mean * 100, 3)) + "%"
 
 
 
101
 
102
def deepfakes_image_predict(input_image):
    """Classify a single image as real or fake.

    Args:
        input_image: Value handed to ``detection_image_pipeline``.

    Returns:
        ``"The image is REAL."`` when the first model score exceeds 0.5,
        otherwise ``"The image is FAKE."``.
    """
    scaled = detection_image_pipeline(input_image) / 255
    scores = model.predict(np.expand_dims(scaled, axis=0))[0]
    real_score, _fake_score = scores[0], scores[1]

    verdict = "REAL" if real_score > 0.5 else "FAKE"
    return "The image is " + verdict + "."