pavankumarvk committed on
Commit
73dc161
·
verified ·
1 Parent(s): 7dc7c6a

Update pipeline.py

Browse files
Files changed (1) hide show
  1. pipeline.py +169 -211
pipeline.py CHANGED
@@ -1,211 +1,169 @@
1
- import os
2
- import cv2
3
- import torch
4
- import zipfile
5
- import librosa
6
- import numpy as np
7
- import tensorflow as tf
8
- from facenet_pytorch import MTCNN
9
- from rawnet import RawNet
10
-
11
-
12
-
13
# Set random seed for reproducibility.
tf.random.set_seed(42)

# Extract the bundled model archive on first run.  The context manager
# guarantees the archive handle is closed even if extraction fails.
if not os.path.exists("efficientnet-b0"):
    local_zip = "./efficientnet-b0.zip"
    if os.path.exists(local_zip):
        with zipfile.ZipFile(local_zip, 'r') as zip_ref:
            zip_ref.extractall()
        print("Model extracted successfully!")

# Load the classifier without compiling to avoid optimizer dependency
# issues; the model is only used for inference in this module.
model = tf.keras.models.load_model("efficientnet-b0/", compile=False)
28
-
29
-
30
-
31
class DetectionPipeline:
    """Load a video, image or audio input and prepare it for a classifier.

    Despite the class name, no face detection is performed here: video
    frames and images are converted with ``cv2.COLOR_BGR2RGB`` and resized
    to 224x224, and audio is returned as a tensor with a leading dimension
    of size one.
    """

    # (width, height) passed to cv2.resize for every returned frame/image.
    _TARGET_SIZE = (224, 224)

    def __init__(self, n_frames=None, batch_size=60, resize=None, input_modality='video'):
        """Constructor for DetectionPipeline class.

        Keyword Arguments:
            n_frames {int} -- Total number of frames to load. These will be evenly spaced
                throughout the video. If not specified (i.e., None), all frames will be loaded.
                (default: {None})
            batch_size {int} -- Kept for backward compatibility; it is stored but no
                longer influences which frames are returned. (default: {60})
            resize {float} -- Fraction by which to resize video frames before they are
                scaled to 224x224. A value less than 1 results in downsampling and a
                value greater than 1 results in upsampling. (default: {None})
            input_modality {str} -- One of 'video', 'image' or 'audio'. It is validated
                when the pipeline is called, not here. (default: {'video'})
        """
        self.n_frames = n_frames
        self.batch_size = batch_size
        self.resize = resize
        self.input_modality = input_modality

    def __call__(self, filename):
        """Load the input according to the configured modality.

        Arguments:
            filename -- For 'video' and 'audio', the path of the file to read.
                For 'image', the image array itself (it is handed straight to
                ``cv2.cvtColor``), not a path.

        Returns:
            'video': list of 224x224 frames; 'image': one 224x224 image;
            'audio': a tensor holding the decoded samples.

        Raises:
            ValueError -- If ``input_modality`` is not a supported value.
        """
        if self.input_modality == 'video':
            print('Input modality is video.')
            return self._load_video(filename)
        if self.input_modality == 'image':
            print('Input modality is image.')
            print('Reading image')
            return self._load_image(filename)
        if self.input_modality == 'audio':
            print("INput modality is audio.")
            return self._load_audio(filename)
        raise ValueError(
            "Invalid input modality. Must be 'video', 'image' or 'audio'."
        )

    def _load_video(self, path):
        """Return the sampled frames of the video at *path*, resized to 224x224."""
        v_cap = cv2.VideoCapture(path)
        try:
            v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if v_len <= 0:
                return []

            # Pick 'n_frames' evenly spaced frame indices.  A set keeps the
            # per-frame membership test O(1) and drops duplicate indices.
            if self.n_frames is None:
                sample = set(range(v_len))
            else:
                sample = set(
                    np.linspace(0, v_len - 1, self.n_frames).astype(int).tolist()
                )

            faces = []
            for j in range(v_len):
                # grab() must run for every frame to advance the stream;
                # only sampled frames are decoded with retrieve().
                grabbed = v_cap.grab()
                if j not in sample or not grabbed:
                    continue
                success, frame = v_cap.retrieve()
                if not success:
                    continue
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Frames are NumPy arrays, so scale them with cv2.resize
                # (the PIL-style frame.resize/frame.size call cannot work).
                if self.resize is not None:
                    frame = cv2.resize(frame, None, fx=self.resize, fy=self.resize)

                faces.append(cv2.resize(frame, self._TARGET_SIZE))
            return faces
        finally:
            # Release the capture even if decoding raised.
            v_cap.release()

    def _load_image(self, image):
        """Return *image* converted with COLOR_BGR2RGB and resized to 224x224."""
        # NOTE(review): this assumes the caller supplies BGR channel order;
        # confirm against the code that produces the array.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        return cv2.resize(image, self._TARGET_SIZE)

    def _load_audio(self, path):
        """Return the audio at *path* as a tensor with a leading size-one dimension."""
        # The sample rate reported by librosa is not needed here.
        x, _sr = librosa.load(path)
        return torch.unsqueeze(torch.Tensor(x), dim=0)
118
-
119
# Module-level pipelines shared by the prediction helpers below: one
# configured for video input (five sampled frames) and one for images.
detection_video_pipeline = DetectionPipeline(
    n_frames=5,
    batch_size=1,
    input_modality='video',
)
detection_image_pipeline = DetectionPipeline(
    batch_size=1,
    input_modality='image',
)
121
-
122
def deepfakes_video_predict(input_video):
    """Classify a video as real or fake from its sampled frames.

    Every frame returned by ``detection_video_pipeline`` is divided by 255
    and scored individually by ``model``; the per-frame "real" and "fake"
    scores (prediction indices 0 and 1) are then averaged.

    Arguments:
        input_video -- Passed unchanged to ``detection_video_pipeline``.

    Returns:
        A human-readable verdict string that includes the deepfake
        confidence as a percentage, or a short notice when the pipeline
        produced no frames.
    """
    faces = detection_video_pipeline(input_video)
    if not faces:
        # Without frames the means below would be NaN and the verdict
        # would read "FAKE ... nan%", which is meaningless.
        return "No frames could be read from the video."

    real_res = []
    fake_res = []
    for face in faces:
        pred = model.predict(np.expand_dims(face / 255, axis=0))[0]
        real_res.append(pred[0])
        fake_res.append(pred[1])

    real_mean = np.mean(real_res)
    fake_mean = np.mean(fake_res)
    print(f"Real Faces: {real_mean}")
    print(f"Fake Faces: {fake_mean}")

    if real_mean >= 0.5:
        # For a "real" verdict the deepfake confidence is the complement
        # of the averaged real score.
        return "The video is REAL. \n Deepfakes Confidence: " + str(round(100 - (real_mean*100), 3)) + "%"
    return "The video is FAKE. \n Deepfakes Confidence: " + str(round(fake_mean*100, 3)) + "%"
157
-
158
-
159
def deepfakes_image_predict(input_image):
    """Classify a single image as real or fake.

    The image is prepared by ``detection_image_pipeline``, divided by 255
    and scored by ``model``; indices 0 and 1 of the prediction are read as
    the "real" and "fake" scores respectively.

    Arguments:
        input_image -- Passed unchanged to ``detection_image_pipeline``.

    Returns:
        A human-readable verdict string that includes the deepfake
        confidence as a percentage.
    """
    prepared = detection_image_pipeline(input_image)
    batch = np.expand_dims(prepared / 255, axis=0)
    scores = model.predict(batch)[0]
    real = scores[0]
    fake = scores[1]

    if not real > 0.5:
        confidence = str(round(fake*100, 3))
        return "The image is FAKE. \n Deepfakes Confidence: " + confidence + "%"

    # For a "real" verdict the deepfake confidence is the complement of
    # the real score.
    confidence = str(round(100 - (real*100), 3))
    return "The image is REAL. \n Deepfakes Confidence: " + confidence + "%"
169
-
170
def load_audio_model():
    """Build the RawNet audio classifier and load its weights.

    The weights are read from ``RawNet2.pth`` in the current working
    directory and mapped onto the CPU.

    Returns:
        The RawNet model with the checkpoint loaded, switched to
        evaluation mode so it is ready for inference.
    """
    d_args = {
        "nb_samp": 64600,
        "first_conv": 1024,
        "in_channels": 1,
        "filts": [20, [20, 20], [20, 128], [128, 128]],
        "blocks": [2, 4],
        "nb_fc_node": 1024,
        "gru_node": 1024,
        "nb_gru_layer": 3,
        "nb_classes": 2,
    }

    model = RawNet(d_args=d_args, device='cpu')

    # NOTE(review): torch.load unpickles the file, so only trusted
    # checkpoints should ever be placed at this path.
    ckpt = torch.load('RawNet2.pth', map_location=torch.device('cpu'))

    # The second positional parameter of load_state_dict is ``strict``;
    # passing the model's own state dict there only acted as a truthy
    # flag, so the default strict loading is used explicitly instead.
    model.load_state_dict(ckpt)

    # The model is only used for prediction; evaluation mode makes any
    # dropout / batch-normalisation layers behave deterministically.
    model.eval()
    return model
189
-
190
# Maps the predicted class index to the label reported to the user.
audio_label_map = {0: "Real audio", 1: "Fake audio"}
194
-
195
def deepfakes_audio_predict(input_audio):
    """Classify an audio clip as real or fake.

    Arguments:
        input_audio -- A two-element sequence unpacked as
            ``(samples, sample_rate)``; only the samples are used.
            NOTE(review): some UI frameworks deliver audio as
            ``(sample_rate, samples)`` -- confirm the caller's ordering.

    Returns:
        The entry of ``audio_label_map`` for the highest-scoring class.
    """
    x, _sr = input_audio
    x_pt = torch.unsqueeze(torch.Tensor(x), dim=0)

    # Load the model and make sure it is in evaluation mode for inference.
    model = load_audio_model()
    model.eval()

    # No gradients are needed for prediction; skipping autograd avoids
    # building a graph that is thrown away immediately.
    with torch.no_grad():
        scores = model(x_pt)

    # Convert to a plain int so the dictionary lookup does not depend on
    # NumPy integer hashing.
    result = int(np.argmax(scores.numpy()))
    return audio_label_map[result]
 
1
+ import os
2
+ import cv2
3
+ import torch
4
+ import zipfile
5
+ import librosa
6
+ import numpy as np
7
+ import tensorflow as tf
8
+ from facenet_pytorch import MTCNN
9
+
10
+
11
+
12
# Set random seed for reproducibility.
tf.random.set_seed(42)

# Extract the bundled model archive on first run.  The context manager
# guarantees the archive handle is closed even if extraction fails.
if not os.path.exists("efficientnet-b0"):
    local_zip = "./efficientnet-b0.zip"
    if os.path.exists(local_zip):
        with zipfile.ZipFile(local_zip, 'r') as zip_ref:
            zip_ref.extractall()
        print("Model extracted successfully!")

# Load the classifier without compiling to avoid optimizer dependency
# issues; the model is only used for inference in this module.
model = tf.keras.models.load_model("efficientnet-b0/", compile=False)
27
+
28
+
29
+
30
class DetectionPipeline:
    """Load a video, image or audio input and prepare it for a classifier.

    Despite the class name, no face detection is performed here: video
    frames and images are converted with ``cv2.COLOR_BGR2RGB`` and resized
    to 224x224, and audio is returned as a tensor with a leading dimension
    of size one.
    """

    # (width, height) passed to cv2.resize for every returned frame/image.
    _TARGET_SIZE = (224, 224)

    def __init__(self, n_frames=None, batch_size=60, resize=None, input_modality='video'):
        """Constructor for DetectionPipeline class.

        Keyword Arguments:
            n_frames {int} -- Total number of frames to load. These will be evenly spaced
                throughout the video. If not specified (i.e., None), all frames will be loaded.
                (default: {None})
            batch_size {int} -- Kept for backward compatibility; it is stored but no
                longer influences which frames are returned. (default: {60})
            resize {float} -- Fraction by which to resize video frames before they are
                scaled to 224x224. A value less than 1 results in downsampling and a
                value greater than 1 results in upsampling. (default: {None})
            input_modality {str} -- One of 'video', 'image' or 'audio'. It is validated
                when the pipeline is called, not here. (default: {'video'})
        """
        self.n_frames = n_frames
        self.batch_size = batch_size
        self.resize = resize
        self.input_modality = input_modality

    def __call__(self, filename):
        """Load the input according to the configured modality.

        Arguments:
            filename -- For 'video' and 'audio', the path of the file to read.
                For 'image', the image array itself (it is handed straight to
                ``cv2.cvtColor``), not a path.

        Returns:
            'video': list of 224x224 frames; 'image': one 224x224 image;
            'audio': a tensor holding the decoded samples.

        Raises:
            ValueError -- If ``input_modality`` is not a supported value.
        """
        if self.input_modality == 'video':
            print('Input modality is video.')
            return self._load_video(filename)
        if self.input_modality == 'image':
            print('Input modality is image.')
            print('Reading image')
            return self._load_image(filename)
        if self.input_modality == 'audio':
            print("INput modality is audio.")
            return self._load_audio(filename)
        raise ValueError(
            "Invalid input modality. Must be 'video', 'image' or 'audio'."
        )

    def _load_video(self, path):
        """Return the sampled frames of the video at *path*, resized to 224x224."""
        v_cap = cv2.VideoCapture(path)
        try:
            v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if v_len <= 0:
                return []

            # Pick 'n_frames' evenly spaced frame indices.  A set keeps the
            # per-frame membership test O(1) and drops duplicate indices.
            if self.n_frames is None:
                sample = set(range(v_len))
            else:
                sample = set(
                    np.linspace(0, v_len - 1, self.n_frames).astype(int).tolist()
                )

            faces = []
            for j in range(v_len):
                # grab() must run for every frame to advance the stream;
                # only sampled frames are decoded with retrieve().
                grabbed = v_cap.grab()
                if j not in sample or not grabbed:
                    continue
                success, frame = v_cap.retrieve()
                if not success:
                    continue
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Frames are NumPy arrays, so scale them with cv2.resize
                # (the PIL-style frame.resize/frame.size call cannot work).
                if self.resize is not None:
                    frame = cv2.resize(frame, None, fx=self.resize, fy=self.resize)

                faces.append(cv2.resize(frame, self._TARGET_SIZE))
            return faces
        finally:
            # Release the capture even if decoding raised.
            v_cap.release()

    def _load_image(self, image):
        """Return *image* converted with COLOR_BGR2RGB and resized to 224x224."""
        # NOTE(review): this assumes the caller supplies BGR channel order;
        # confirm against the code that produces the array.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        return cv2.resize(image, self._TARGET_SIZE)

    def _load_audio(self, path):
        """Return the audio at *path* as a tensor with a leading size-one dimension."""
        # The sample rate reported by librosa is not needed here.
        x, _sr = librosa.load(path)
        return torch.unsqueeze(torch.Tensor(x), dim=0)
117
+
118
# Module-level pipelines shared by the prediction helpers below: one
# configured for video input (five sampled frames) and one for images.
detection_video_pipeline = DetectionPipeline(
    n_frames=5,
    batch_size=1,
    input_modality='video',
)
detection_image_pipeline = DetectionPipeline(
    batch_size=1,
    input_modality='image',
)
120
+
121
def deepfakes_video_predict(input_video):
    """Classify a video as real or fake from its sampled frames.

    Every frame returned by ``detection_video_pipeline`` is divided by 255
    and scored individually by ``model``; the per-frame "real" and "fake"
    scores (prediction indices 0 and 1) are then averaged.

    Arguments:
        input_video -- Passed unchanged to ``detection_video_pipeline``.

    Returns:
        A human-readable verdict string that includes the deepfake
        confidence as a percentage, or a short notice when the pipeline
        produced no frames.
    """
    faces = detection_video_pipeline(input_video)
    if not faces:
        # Without frames the means below would be NaN and the verdict
        # would read "FAKE ... nan%", which is meaningless.
        return "No frames could be read from the video."

    real_res = []
    fake_res = []
    for face in faces:
        pred = model.predict(np.expand_dims(face / 255, axis=0))[0]
        real_res.append(pred[0])
        fake_res.append(pred[1])

    real_mean = np.mean(real_res)
    fake_mean = np.mean(fake_res)
    print(f"Real Faces: {real_mean}")
    print(f"Fake Faces: {fake_mean}")

    if real_mean >= 0.5:
        # For a "real" verdict the deepfake confidence is the complement
        # of the averaged real score.
        return "The video is REAL. \n Deepfakes Confidence: " + str(round(100 - (real_mean*100), 3)) + "%"
    return "The video is FAKE. \n Deepfakes Confidence: " + str(round(fake_mean*100, 3)) + "%"
156
+
157
+
158
def deepfakes_image_predict(input_image):
    """Classify a single image as real or fake.

    The image is prepared by ``detection_image_pipeline``, divided by 255
    and scored by ``model``; indices 0 and 1 of the prediction are read as
    the "real" and "fake" scores respectively.

    Arguments:
        input_image -- Passed unchanged to ``detection_image_pipeline``.

    Returns:
        A human-readable verdict string that includes the deepfake
        confidence as a percentage.
    """
    prepared = detection_image_pipeline(input_image)
    batch = np.expand_dims(prepared / 255, axis=0)
    scores = model.predict(batch)[0]
    real = scores[0]
    fake = scores[1]

    if not real > 0.5:
        confidence = str(round(fake*100, 3))
        return "The image is FAKE. \n Deepfakes Confidence: " + confidence + "%"

    # For a "real" verdict the deepfake confidence is the complement of
    # the real score.
    confidence = str(round(100 - (real*100), 3))
    return "The image is REAL. \n Deepfakes Confidence: " + confidence + "%"
168
+
169
+