# Hugging Face Spaces app (page status at scrape time: "Runtime error")
import warnings
warnings.filterwarnings("ignore")  # silence library warnings in Space logs

# Kept first so any name the wildcard exports is overridden by the explicit
# imports below (preserves the original file's import-order semantics).
from moviepy import *  # VideoFileClip

# Runtime dependencies (install via requirements.txt on the Space):
#!pip install pyannote.audio
#!pip install moviepy
#!pip install gradio

import os
import shutil

import gradio as gr
import librosa
import numpy as np
import speech_recognition as sr
import torch
import torch.functional as F  # NOTE: dead import; shadowed by torch.nn.functional below
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
from huggingface_hub import hf_hub_download
from pyannote.audio import Audio, Pipeline
from torch.utils.data import DataLoader, TensorDataset
from transformers import pipeline
# Audio conversion: mp4 --> wav
def extract_audio_from_video(video_file_path, audio_file_path):
    """Extract the audio track of an mp4 video and save it as a PCM WAV file.

    Args:
        video_file_path: Path to the input video (mp4).
        audio_file_path: Destination path for the extracted WAV file.
    """
    video = VideoFileClip(video_file_path)
    try:
        # pcm_s16le = uncompressed 16-bit PCM, which torchaudio/librosa read directly.
        video.audio.write_audiofile(audio_file_path, codec='pcm_s16le')
    finally:
        # Fix: the original never closed the clip, leaking the ffmpeg reader process.
        video.close()
def seprate_speaker(audio_file, pipeline):
    """Split an audio file into per-speaker WAV files using pyannote diarization.

    Writes one file per detected speaker to /tmp/wav/<speaker>.wav, containing
    that speaker's speech segments concatenated in chronological order.

    Args:
        audio_file: Path to the WAV file to diarize.
        pipeline: A pyannote.audio speaker-diarization Pipeline instance.
    """
    # Fix: removed the unused `audio = Audio()` instance from the original.
    waveform, sample_rate = torchaudio.load(audio_file)
    diarization = pipeline(audio_file)

    # Collect each speaker's waveform slices, keyed by diarization speaker label.
    speaker_segments = {}
    for segment, _, speaker in diarization.itertracks(yield_label=True):
        # Convert the segment's time bounds into sample indices of the loaded waveform.
        start = int(segment.start * sample_rate)
        end = int(segment.end * sample_rate)
        speaker_segments.setdefault(speaker, []).append(waveform[:, start:end])

    # Hoisted out of the loop: the output directory only needs creating once.
    output_path = "/tmp/wav"
    os.makedirs(output_path, exist_ok=True)

    # Concatenate each speaker's segments along the time axis and save one file each.
    for speaker, segments in speaker_segments.items():
        combined_waveform = torch.cat(segments, dim=1)
        output_filename = os.path.join(output_path, f"{speaker}.wav")
        torchaudio.save(output_filename, combined_waveform, sample_rate)
# Simple DeepVoice-style classifier.
class DeepVoiceModel(nn.Module):
    """Small 1-D CNN that classifies MFCC sequences as real vs. fake voice.

    Two Conv1d + BatchNorm stages with dropout, mean-pooled over time and
    projected to class logits.  NOTE: ``l2_reg`` is accepted for interface
    compatibility but is not used inside the module (weight decay would be
    configured on the optimizer instead).
    """

    def __init__(self, input_dim, hidden_dim, num_classes, dropout_rate=0.3, l2_reg=0.01):
        super(DeepVoiceModel, self).__init__()
        self.conv1 = nn.Conv1d(input_dim, hidden_dim, kernel_size=5, padding=2)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.conv2 = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=5, padding=2)
        self.bn2 = nn.BatchNorm1d(hidden_dim)
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Map a (batch, input_dim, time) tensor to (batch, num_classes) logits."""
        hidden = self.dropout(self.bn1(torch.relu(self.conv1(x))))
        hidden = self.dropout(self.bn2(torch.relu(self.conv2(hidden))))
        pooled = hidden.mean(dim=2)  # temporal average pooling
        return self.fc(pooled)
def extract_mfcc_path(file_path, n_mfcc=13, max_len=100):
    """Load an audio file and return its MFCC features as a fixed-size tensor.

    The MFCC matrix is zero-padded (or truncated) along the time axis so the
    result always has exactly ``max_len`` frames.

    Args:
        file_path: Path to an audio file readable by librosa.
        n_mfcc: Number of MFCC coefficients to extract.
        max_len: Target number of time frames.

    Returns:
        torch.Tensor of shape (n_mfcc, max_len).
    """
    signal, rate = librosa.load(file_path, sr=None)
    features = librosa.feature.mfcc(y=signal, sr=rate, n_mfcc=n_mfcc)

    frames = features.shape[1]
    if frames < max_len:
        # Pad with zeros on the right of the time axis up to max_len frames.
        features = np.pad(features, ((0, 0), (0, max_len - frames)), mode='constant')
    else:
        # Keep only the first max_len frames.
        features = features[:, :max_len]
    return torch.Tensor(features)
# Score every file in the folder in one pass.
def real_fake_check(list_dir, path, model):
    """Classify each WAV file in a directory as real or deepfake voice.

    Args:
        list_dir: Iterable of file names located inside ``path``.
        path: Directory containing the WAV files.
        model: Trained DeepVoiceModel (eval mode) producing 2-class logits.

    Returns:
        dict {'real': count, 'fake': count, 'prob': [float, ...]} where each
        entry of ``prob`` is the fake-class probability for one file.
    """
    THRESHOLD = 0.4  # decision threshold on the real-class probability
    r_cnt = 0
    f_cnt = 0
    prob = []
    for name in list_dir:
        # (n_mfcc, max_len) tensor -> add batch dim -> (1, n_mfcc, max_len).
        # Fix: the original re-wrapped an existing tensor with torch.tensor(),
        # which copies the data and raises a UserWarning.
        input_data = extract_mfcc_path(os.path.join(path, name)).unsqueeze(0).to('cpu')
        with torch.no_grad():  # inference only; skip autograd bookkeeping
            result = model(input_data.float())
        probabilities = F.softmax(result, dim=1)
        prob.append(probabilities[0][1].item())
        # real (class 0) if its probability clears the threshold, else fake.
        predicted_class = 0 if probabilities[0][0] >= THRESHOLD else 1
        if predicted_class == 0:
            r_cnt += 1
        else:
            f_cnt += 1
    return {'real': r_cnt, 'fake': f_cnt, 'prob': prob}
# Speech-to-text extraction.
def convert_wav_to_text(wav_file_path):
    """Transcribe a WAV file to text via the Google Web Speech API.

    Args:
        wav_file_path: Path to the WAV file to transcribe.

    Returns:
        The recognized text, or the sentinel string 'error' when the audio is
        unintelligible or the API request fails.
    """
    recognizer = sr.Recognizer()
    with sr.AudioFile(wav_file_path) as source:
        print("WAV ํ์ผ์์ ์ค๋์ค๋ฅผ ๋ก๋ ์ค...")
        audio_data = recognizer.record(source)  # capture the entire file
    try:
        text = recognizer.recognize_google(audio_data)
    except (sr.UnknownValueError, sr.RequestError):
        # Fix: merged two identical except arms (both returned 'error');
        # the caller only distinguishes success from failure.
        text = 'error'
    return text
def main(file_name):
    """Run the full deepfake-voice pipeline on an mp4 file.

    Steps: extract the soundtrack -> diarize it into per-speaker WAV files ->
    score each speaker file with the deepfake model -> transcribe each file
    (plus the full audio) to text.

    Args:
        file_name: Path to the input mp4 video.

    Returns:
        (rf_check, text_list): the real/fake counts + probabilities dict from
        real_fake_check, and the list of transcripts (one per speaker file,
        then the full mix), each suffixed with a trailing apostrophe.
    """
    # Start from a clean per-speaker output directory.
    if os.path.exists('/tmp/wav'):
        shutil.rmtree('/tmp/wav')

    hf_token = os.getenv("HUGGINGFACE_TOKEN")
    # Fix: renamed from `pipeline`, which shadowed transformers.pipeline.
    diarization_pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1", use_auth_token=hf_token)

    device = torch.device('cpu')  # CPU-only environment; no CUDA assumed

    audio_file = '/tmp/output_audio.wav'  # destination for the extracted soundtrack
    extract_audio_from_video(file_name, audio_file)
    seprate_speaker(audio_file, diarization_pipeline)  # one WAV per speaker

    # Hyperparameters must match the released checkpoint's architecture.
    input_dim = 13      # MFCC feature dimension
    num_classes = 2     # real vs fake
    hidden_dim = 128
    dropout_rate = 0.2
    l2_reg = 0.01

    model_path = hf_hub_download(repo_id="sssssungk/deepfake_voice",
                                 filename="deepvoice_model_girl.pth")
    model = DeepVoiceModel(input_dim, hidden_dim, num_classes, dropout_rate, l2_reg).to(device)
    model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
    model.eval()  # inference mode: disables dropout, uses BN running stats

    path = '/tmp/wav'
    # Fix: the original listed the same directory twice; reuse one listing.
    speaker_files = os.listdir(path)
    rf_check = real_fake_check(speaker_files, path, model)

    text_list = [convert_wav_to_text(os.path.join(path, f)) for f in speaker_files]
    text_list.append(convert_wav_to_text(audio_file))  # transcript of the full mix

    # Append a trailing apostrophe to each transcript (display formatting).
    text_list = [t + "'" for t in text_list]

    return rf_check, text_list
def deepvoice_check(video_file):
    """Gradio entry point: run the detection pipeline on the uploaded video."""
    detection, transcripts = main(video_file)
    return detection, transcripts
# Build the Gradio interface.
# Fix: deepvoice_check returns TWO values (detection dict, transcript list),
# but the original declared a single output Textbox, which makes Gradio fail
# at request time with "too many output values".  Declare one component per
# returned value.
deepfake = gr.Interface(
    fn=deepvoice_check,
    inputs=gr.Video(label="Upload mp4 File"),
    outputs=[
        gr.Textbox(label="DeepFaKeVoice Detection Result"),
        gr.Textbox(label="Transcribed Text"),
    ],
    title="DeepFaKeVoice Check",
    description="Upload an mp4 file to check."
)

if __name__ == "__main__":
    # share=True exposes a public link; debug=True keeps the process attached.
    deepfake.launch(share=True, debug=True)