# DeepFakeVideo / app.py
# sssssungk's picture
# Update app.py
# b978cf1 verified
import warnings
warnings.filterwarnings("ignore") # ๊ฒฝ๊ณ  ๋ฌด์‹œ
from moviepy import * #VideoFileClip
#!pip install pyannote.audio
#!pip install moviepy
#!pip install gradio
import librosa
import numpy as np
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torch.functional as F
import torch.nn.functional as F
from pyannote.audio import Pipeline
from pyannote.audio import Audio
import torchaudio
from transformers import pipeline
from huggingface_hub import hf_hub_download
import gradio as gr
import shutil
import speech_recognition as sr
# Audio conversion: mp4 --> wav
def extract_audio_from_video(video_file_path, audio_file_path):
    """Extract the audio track of a video file and save it as a WAV file.

    Args:
        video_file_path: Path to the input video (e.g. an .mp4 file).
        audio_file_path: Destination path for the extracted .wav file.
    """
    video = VideoFileClip(video_file_path)
    try:
        # pcm_s16le = uncompressed 16-bit PCM, the standard WAV codec.
        video.audio.write_audiofile(audio_file_path, codec='pcm_s16le')
    finally:
        # VideoFileClip holds an open ffmpeg reader/subprocess until
        # closed; the original leaked it on every call.
        video.close()
# Load the full audio file and split it per speaker.
def seprate_speaker(audio_file, pipeline):
    """Split an audio file into per-speaker WAV files using diarization.

    Runs the pyannote diarization pipeline on *audio_file*, concatenates
    every segment belonging to the same speaker, and writes one
    ``/tmp/wav/<SPEAKER>.wav`` file per detected speaker.

    Args:
        audio_file: Path to the input WAV file.
        pipeline: A loaded ``pyannote.audio`` diarization ``Pipeline``.
    """
    # NOTE: the original also created an unused ``Audio()`` helper here;
    # it was never referenced and has been removed.
    waveform, sample_rate = torchaudio.load(audio_file)
    diarization = pipeline(audio_file)

    # Collect the waveform slices spoken by each speaker.
    speaker_segments = {}
    for segment, _, speaker in diarization.itertracks(yield_label=True):
        start = int(segment.start * sample_rate)
        end = int(segment.end * sample_rate)
        speaker_segments.setdefault(speaker, []).append(waveform[:, start:end])

    # Write one concatenated file per speaker.
    output_path = "/tmp/wav"
    os.makedirs(output_path, exist_ok=True)  # hoisted: loop-invariant
    for speaker, segments in speaker_segments.items():
        combined_waveform = torch.cat(segments, dim=1)
        output_filename = os.path.join(output_path, f"{speaker}.wav")
        torchaudio.save(output_filename, combined_waveform, sample_rate)
# A simple DeepVoice-style classifier.
class DeepVoiceModel(nn.Module):
    """1-D CNN classifier: two conv/batch-norm blocks, temporal mean
    pooling, and a linear head.

    NOTE: ``l2_reg`` is accepted for signature compatibility with the
    trained checkpoint's constructor call but is not used by the module
    itself (weight decay belongs to the optimizer).
    """

    def __init__(self, input_dim, hidden_dim, num_classes, dropout_rate=0.3, l2_reg=0.01):
        super(DeepVoiceModel, self).__init__()
        # Attribute names are load-bearing: they define the state_dict
        # keys consumed by load_state_dict() for the published weights.
        self.conv1 = nn.Conv1d(input_dim, hidden_dim, kernel_size=5, padding=2)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.conv2 = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=5, padding=2)
        self.bn2 = nn.BatchNorm1d(hidden_dim)
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        # First conv block: conv -> ReLU -> batch-norm -> dropout.
        h = self.bn1(torch.relu(self.conv1(x)))
        h = self.dropout(h)
        # Second conv block, same pattern.
        h = self.bn2(torch.relu(self.conv2(h)))
        h = self.dropout(h)
        # Collapse the time axis (temporal pooling), then classify.
        pooled = h.mean(dim=2)
        return self.fc(pooled)
def extract_mfcc_path(file_path, n_mfcc=13, max_len=100):
    """Load an audio file and return a fixed-size MFCC feature tensor.

    The MFCC matrix is right-padded with zeros (or truncated) along the
    time axis so the result always has shape ``(n_mfcc, max_len)``.
    """
    # Load at the file's native sampling rate (sr=None).
    audio, sample_rate = librosa.load(file_path, sr=None)
    mfcc = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=n_mfcc)

    # Normalize the time dimension to exactly max_len frames.
    n_frames = mfcc.shape[1]
    if n_frames < max_len:
        # Zero-pad on the right of the time axis only.
        mfcc = np.pad(mfcc, ((0, 0), (0, max_len - n_frames)), mode='constant')
    else:
        mfcc = mfcc[:, :max_len]
    return torch.Tensor(mfcc)
# ํด๋”์— ์žˆ๋Š” ๋ฐ์ดํ„ฐ ํ•œ๋ฒˆ์— ์ ‘๊ทผํ•ด์„œ ํ•œ๋ฒˆ์— ์ฒดํฌ
def real_fake_check(list_dir, path, model):
THRESHOLD = 0.4 #๋”ฅํŽ˜์ดํฌ ๊ธฐ์ค€์„ 0.4๋กœ ์„ค์ •
r_cnt = 0
f_cnt = 0
prob = []
for i in list_dir: # real / fake ์„ ํƒ
input_data = extract_mfcc_path(os.path.join(path, i))
#input_data = torch.tensor(input_data).unsqueeze(0).to('cuda') # ๋ฐฐ์น˜ ์ฐจ์›์„ ์ถ”๊ฐ€ํ•˜์—ฌ (1, input_dim, sequence_length)๋กœ ๋งž์ถค
input_data = torch.tensor(input_data).unsqueeze(0).to('cpu')
result = model(input_data.float())
probabilities = F.softmax(result, dim=1)
#prob[i]='%.2f'%probabilities[0][1].item()
#prob[i]=round(probabilities[0][1].item(),2)
prob.append(probabilities[0][1].item())
predicted_class = 0 if probabilities[0][0] >= THRESHOLD else 1 # ํ™•๋ฅ ๊ฐ’์ด ๊ธฐ์ค€์น˜๋ณด๋‹ค ํฌ๋‹ค๋ฉด real, ์•„๋‹ˆ๋ฉด fake
if predicted_class == 0:
r_cnt += 1
else:
f_cnt += 1
return {'real':r_cnt, 'fake':f_cnt, 'prob': prob}
# ์Œ์„ฑ์—์„œ text ์ถ”์ถœ
def convert_wav_to_text(wav_file_path):
recognizer = sr.Recognizer()
# WAV ํŒŒ์ผ ์—ด๊ธฐ
with sr.AudioFile(wav_file_path) as source:
print("WAV ํŒŒ์ผ์—์„œ ์˜ค๋””์˜ค๋ฅผ ๋กœ๋“œ ์ค‘...")
audio_data = recognizer.record(source) # ์ „์ฒด ์˜ค๋””์˜ค๋ฅผ ๋…น์Œ
try:
# Google Web Speech API๋กœ ํ…์ŠคํŠธ ๋ณ€ํ™˜
text = recognizer.recognize_google(audio_data)
except sr.UnknownValueError:
text = 'error'
except sr.RequestError as e:
text = 'error'
return text
def main(file_name):
    """End-to-end deepfake-voice check for one video file.

    Extracts the audio track, diarizes it into per-speaker WAV files,
    scores each speaker with the deep-voice model, and transcribes each
    speaker plus the full mix.

    Args:
        file_name: Path to the uploaded video file.

    Returns:
        ``(rf_check, text_list)``: the real/fake counts + probabilities
        dict from real_fake_check, and the transcripts (per speaker, then
        the full audio), each with a trailing ``'`` appended.
    """
    # Start from a clean per-speaker output directory.
    wav_dir = '/tmp/wav'
    if os.path.exists(wav_dir):
        shutil.rmtree(wav_dir)

    # Diarization pipeline (gated model: requires an HF access token).
    # Renamed local: the original called it ``pipeline``, shadowing the
    # ``transformers.pipeline`` import.
    hf_token = os.getenv("HUGGINGFACE_TOKEN")
    diarization_pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1", use_auth_token=hf_token)

    device = torch.device('cpu')  # the Space runs on CPU only

    # Video -> WAV, then split by speaker into /tmp/wav/<SPEAKER>.wav.
    audio_file = '/tmp/output_audio.wav'
    extract_audio_from_video(file_name, audio_file)
    seprate_speaker(audio_file, diarization_pipeline)

    # Hyper-parameters must match the trained checkpoint.
    input_dim = 13        # MFCC feature dimension
    hidden_dim = 128
    num_classes = 2       # real / fake
    dropout_rate = 0.2
    l2_reg = 0.01

    # Download and load the pretrained weights.
    model_path = hf_hub_download(repo_id="sssssungk/deepfake_voice",
                                 filename="deepvoice_model_girl.pth")
    model = DeepVoiceModel(input_dim, hidden_dim, num_classes,
                           dropout_rate, l2_reg).to(device)
    model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
    model.eval()

    # Score every per-speaker file. (The original listed the directory
    # twice; one listing is reused for scoring and transcription.)
    speaker_files = os.listdir(wav_dir)
    rf_check = real_fake_check(speaker_files, wav_dir, model)

    # Transcribe each speaker, then the full mixed audio.
    text_list = [convert_wav_to_text(os.path.join(wav_dir, f))
                 for f in speaker_files]
    text_list.append(convert_wav_to_text(audio_file))
    # Append a closing quote so each transcript renders as "...'".
    text_list = [t + "'" for t in text_list]

    return rf_check, text_list
def deepvoice_check(video_file):
    """Gradio callback: run the full pipeline on the uploaded video and
    return its ``(results, transcripts)`` tuple unchanged."""
    return main(video_file)
# Gradio ์ธํ„ฐํŽ˜์ด์Šค ์ƒ์„ฑ
deepfake = gr.Interface(
fn=deepvoice_check,
inputs=gr.Video(label="Upload mp4 File"),
outputs=gr.Textbox(label="DeepFaKeVoice Detection Result"),
title="DeepFaKeVoice Check",
description="Upload an mp4 file to check."
)
if __name__ == "__main__":
deepfake.launch(share=True, debug=True)