Spaces:
Sleeping
Sleeping
| import warnings | |
| warnings.filterwarnings("ignore") # 경고 무시 | |
| #!pip install pyannote.audio | |
| #!pip install moviepy | |
| import librosa | |
| import numpy as np | |
| import os | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from torch.utils.data import DataLoader, TensorDataset | |
| import torch.functional as F | |
| from pyannote.audio import Pipeline | |
| from pyannote.audio import Audio | |
| import torchaudio | |
| import torch.nn.functional as F | |
| import os | |
| from moviepy.editor import VideoFileClip | |
| from transformers import pipeline | |
| from huggingface_hub import hf_hub_download | |
| #!pip install gradio | |
| import gradio as gr | |
| from moviepy.editor import VideoFileClip | |
| # 오디오 변환 mp4 --> wav | |
| def extract_audio_from_video(video_file_path, audio_file_path): | |
| # mp4 파일 불러오기 | |
| video = VideoFileClip(video_file_path) | |
| # 오디오를 추출하여 wav 파일로 저장 | |
| video.audio.write_audiofile(audio_file_path, codec='pcm_s16le') | |
| # 전체 오디오 파일 불러오기 | |
| def seprate_speaker(audio_file, pipeline): | |
| audio = Audio() | |
| waveform, sample_rate = torchaudio.load(audio_file) | |
| diarization = pipeline(audio_file) | |
| # 화자별로 발화 구간을 저장할 딕셔너리 초기화 | |
| speaker_segments = {} | |
| # diarization 결과를 순회하며 각 화자의 발화를 딕셔너리에 추가 | |
| for segment, _, speaker in diarization.itertracks(yield_label=True): | |
| start_time = segment.start | |
| end_time = segment.end | |
| # 해당 화자가 처음 등장하면 리스트를 초기화 | |
| if speaker not in speaker_segments: | |
| speaker_segments[speaker] = [] | |
| # 발화 구간을 해당 화자의 리스트에 추가 | |
| segment_waveform = waveform[:, int(start_time * sample_rate):int(end_time * sample_rate)] | |
| speaker_segments[speaker].append(segment_waveform) | |
| # 각 화자별로 모든 발화 구간을 하나의 파일로 이어붙여 저장 | |
| for speaker, segments in speaker_segments.items(): | |
| # 화자의 모든 발화 구간을 이어붙임 | |
| combined_waveform = torch.cat(segments, dim=1) | |
| # output_path = "/content/wav" # 경로 | |
| output_path = './output' | |
| os.makedirs(output_path, exist_ok=True) # 경로가 없으면 생성 | |
| output_filename = os.path.join(output_path,f"{speaker}.wav") | |
| torchaudio.save(output_filename, combined_waveform, sample_rate) #오디오 파일 저장 | |
| #print(f"Saved {output_filename} for speaker {speaker}") | |
| # 간단한 DeepVoice 스타일 모델 정의 | |
| class DeepVoiceModel(nn.Module): | |
| def __init__(self, input_dim, hidden_dim, num_classes, dropout_rate=0.3, l2_reg=0.01): | |
| super(DeepVoiceModel, self).__init__() | |
| self.conv1 = nn.Conv1d(input_dim, hidden_dim, kernel_size=5, padding=2) | |
| self.bn1 = nn.BatchNorm1d(hidden_dim) | |
| self.conv2 = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=5, padding=2) | |
| self.bn2 = nn.BatchNorm1d(hidden_dim) | |
| self.dropout = nn.Dropout(dropout_rate) | |
| self.fc = nn.Linear(hidden_dim, num_classes) | |
| def forward(self, x): | |
| x = self.bn1(torch.relu(self.conv1(x))) | |
| x = self.dropout(x) | |
| x = self.bn2(torch.relu(self.conv2(x))) | |
| x = self.dropout(x) | |
| x = torch.mean(x, dim=2) # Temporal pooling | |
| x = self.fc(x) | |
| return x | |
| def extract_mfcc_path(file_path, n_mfcc=13, max_len=100): | |
| # 음성 파일 | |
| audio, sample_rate = librosa.load(file_path, sr=None) | |
| # mfcc 특성 추출 | |
| mfcc = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=n_mfcc) | |
| # 일정한 길이로 맞춤 | |
| if mfcc.shape[1] < max_len: | |
| pad_width = max_len - mfcc.shape[1] | |
| mfcc = np.pad(mfcc, ((0, 0), (0, pad_width)), mode='constant') | |
| else: | |
| mfcc = mfcc[:, :max_len] | |
| return torch.Tensor(mfcc) | |
| # 폴더에 있는 데이터 한번에 접근해서 한번에 체크 | |
| def real_fake_check(list_dir, path, model): | |
| THRESHOLD = 0.4 #딥페이크 기준을 0.4로 설정 | |
| r_cnt = 0 | |
| f_cnt = 0 | |
| prob = {} | |
| for i in list_dir: # real / fake 선택 | |
| #print('------',i) | |
| input_data = extract_mfcc_path(os.path.join(path, i)) | |
| input_data = torch.tensor(input_data).unsqueeze(0).to(device) # 배치 차원을 추가하여 (1, input_dim, sequence_length)로 맞춤 | |
| result = model(input_data.float()) | |
| # predicted_class = torch.argmax(result, dim=1).item() | |
| probabilities = F.softmax(result, dim=1) | |
| prob[i]='%.2f'%probabilities[0][1].item() | |
| predicted_class = 0 if probabilities[0][0] >= THRESHOLD else 1 # 확률값이 기준치보다 크다면 real, 아니면 fake | |
| # print('-- %.2f'%probabilities[0][0].item()) #확률 값 출력 | |
| if predicted_class == 0: | |
| # print("REAL") | |
| r_cnt += 1 | |
| else: | |
| # print("FAKE") | |
| f_cnt += 1 | |
| #print() | |
| #print('real: ',r_cnt,'/',len(list_dir)) | |
| #print('fake: ',f_cnt,'/',len(list_dir)) | |
| return {'real: ':f'{r_cnt}/{len(list_dir)}', 'fake: ':f'{f_cnt}/{len(list_dir)}', 'prob: ': prob} | |
| def main(file_name): | |
| my_key = os.getenv("my_key") | |
| pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", | |
| use_auth_token=my_key) | |
| # pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1") | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| video_file = file_name #deepfake #meganfox.mp4' | |
| audio_file = './output_audio.wav' # 저장할 오디오 파일의 경로, 이름 지정 | |
| extract_audio_from_video(video_file, audio_file) | |
| seprate_speaker(audio_file,pipeline) # 발화자 분리해서 파일로 만들기 | |
| mel_dim = 13 # Mel-spectrogram 차원 | |
| num_classes = 2 # 분류할 클래스 수 | |
| input_dim = mel_dim | |
| hidden_dim = 128 | |
| dropout_rate = 0.2 | |
| l2_reg = 0.01 | |
| # 모델 | |
| model_name = './deepvoice_model_girl.pth' | |
| model = DeepVoiceModel(input_dim, hidden_dim, num_classes, dropout_rate, l2_reg).to(device) | |
| model.load_state_dict(torch.load(model_name, map_location=torch.device(device)))#("/content/drive/MyDrive/캡스톤 1조/model/deepvoice_model_girl.pth")) | |
| model.eval() # 평가 모드로 설정 | |
| #real,fake 폴더 | |
| #real_path = '/content/drive/MyDrive/캡스톤 1조/data/deepvoice/real' | |
| #real_path = '/content/drive/MyDrive/Celeb-DF-v2/Celeb-real' | |
| #real = os.listdir(real_path) | |
| fake_path = './output'#'/content/drive/MyDrive/캡스톤 1조/data/deepvoice/fake' | |
| fake = os.listdir(fake_path) | |
| #print("\n-------real data---------") | |
| #real_fake_check(real, real_path, model) #real dataset | |
| #print("\n-------fake data---------") | |
| rf_check = real_fake_check(fake, fake_path,model) #fake dataset\ | |
| return rf_check | |
| #Gradio 메인 함수 | |
| def deepvoice_check(video_file): | |
| results = main(video_file) | |
| return results | |
| # Gradio 인터페이스 생성 | |
| iface = gr.Interface( | |
| fn=main, | |
| inputs=gr.Video(label="Upload mp4 File"), | |
| outputs=gr.Textbox(label="Deepfake Detection Result"), | |
| title="DeepVoice Check", | |
| description="Upload an mp4 file to check for DeepVoice indicators." | |
| ) | |
| # Gradio 인터페이스 실행 | |
| iface.launch(share=True, debug=True) |