import warnings warnings.filterwarnings("ignore") # 경고 무시 from moviepy import * #VideoFileClip #!pip install pyannote.audio #!pip install moviepy #!pip install gradio import librosa import numpy as np import os import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader, TensorDataset import torch.functional as F import torch.nn.functional as F from pyannote.audio import Pipeline from pyannote.audio import Audio import torchaudio from transformers import pipeline from huggingface_hub import hf_hub_download import gradio as gr import shutil import speech_recognition as sr # 오디오 변환 mp4 --> wav def extract_audio_from_video(video_file_path, audio_file_path): # mp4 파일 불러오기 video = VideoFileClip(video_file_path) # 오디오를 추출하여 wav 파일로 저장 video.audio.write_audiofile(audio_file_path, codec='pcm_s16le') # 전체 오디오 파일 불러오기 def seprate_speaker(audio_file, pipeline): audio = Audio() waveform, sample_rate = torchaudio.load(audio_file) diarization = pipeline(audio_file) # 화자별로 발화 구간을 저장할 딕셔너리 초기화 speaker_segments = {} # diarization 결과를 순회하며 각 화자의 발화를 딕셔너리에 추가 for segment, _, speaker in diarization.itertracks(yield_label=True): start_time = segment.start end_time = segment.end # 해당 화자가 처음 등장하면 리스트를 초기화 if speaker not in speaker_segments: speaker_segments[speaker] = [] # 발화 구간을 해당 화자의 리스트에 추가 segment_waveform = waveform[:, int(start_time * sample_rate):int(end_time * sample_rate)] speaker_segments[speaker].append(segment_waveform) # 각 화자별로 모든 발화 구간을 하나의 파일로 이어붙여 저장 for speaker, segments in speaker_segments.items(): # 화자의 모든 발화 구간을 이어붙임 combined_waveform = torch.cat(segments, dim=1) output_path = "/tmp/wav" # 경로 os.makedirs(output_path, exist_ok=True) # 경로가 없으면 생성 output_filename = os.path.join(output_path,f"{speaker}.wav") torchaudio.save(output_filename, combined_waveform, sample_rate) #오디오 파일 저장 # 간단한 DeepVoice 스타일 모델 정의 class DeepVoiceModel(nn.Module): def __init__(self, input_dim, hidden_dim, num_classes, dropout_rate=0.3, l2_reg=0.01): super(DeepVoiceModel, self).__init__() self.conv1 = nn.Conv1d(input_dim, hidden_dim, kernel_size=5, padding=2) self.bn1 = nn.BatchNorm1d(hidden_dim) self.conv2 = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=5, padding=2) self.bn2 = nn.BatchNorm1d(hidden_dim) self.dropout = nn.Dropout(dropout_rate) self.fc = nn.Linear(hidden_dim, num_classes) def forward(self, x): x = self.bn1(torch.relu(self.conv1(x))) x = self.dropout(x) x = self.bn2(torch.relu(self.conv2(x))) x = self.dropout(x) x = torch.mean(x, dim=2) # Temporal pooling x = self.fc(x) return x def extract_mfcc_path(file_path, n_mfcc=13, max_len=100): # 음성 파일 audio, sample_rate = librosa.load(file_path, sr=None) # mfcc 특성 추출 mfcc = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=n_mfcc) # 일정한 길이로 맞춤 if mfcc.shape[1] < max_len: pad_width = max_len - mfcc.shape[1] mfcc = np.pad(mfcc, ((0, 0), (0, pad_width)), mode='constant') else: mfcc = mfcc[:, :max_len] return torch.Tensor(mfcc) # 폴더에 있는 데이터 한번에 접근해서 한번에 체크 def real_fake_check(list_dir, path, model): THRESHOLD = 0.4 #딥페이크 기준을 0.4로 설정 r_cnt = 0 f_cnt = 0 prob = [] for i in list_dir: # real / fake 선택 input_data = extract_mfcc_path(os.path.join(path, i)) #input_data = torch.tensor(input_data).unsqueeze(0).to('cuda') # 배치 차원을 추가하여 (1, input_dim, sequence_length)로 맞춤 input_data = torch.tensor(input_data).unsqueeze(0).to('cpu') result = model(input_data.float()) probabilities = F.softmax(result, dim=1) #prob[i]='%.2f'%probabilities[0][1].item() #prob[i]=round(probabilities[0][1].item(),2) prob.append(probabilities[0][1].item()) predicted_class = 0 if probabilities[0][0] >= THRESHOLD else 1 # 확률값이 기준치보다 크다면 real, 아니면 fake if predicted_class == 0: r_cnt += 1 else: f_cnt += 1 return {'real':r_cnt, 'fake':f_cnt, 'prob': prob} # 음성에서 text 추출 def convert_wav_to_text(wav_file_path): recognizer = sr.Recognizer() # WAV 파일 열기 with sr.AudioFile(wav_file_path) as source: print("WAV 파일에서 오디오를 로드 중...") audio_data = recognizer.record(source) # 전체 오디오를 녹음 try: # Google Web Speech API로 텍스트 변환 text = recognizer.recognize_google(audio_data) except sr.UnknownValueError: text = 'error' except sr.RequestError as e: text = 'error' return text def main(file_name): if os.path.exists('/tmp/wav'): shutil.rmtree('/tmp/wav') hf_token = os.getenv("HUGGINGFACE_TOKEN") pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1",use_auth_token=hf_token) #device = torch.device('cuda:0') if torch.cuda.is_available() else "cpu"#torch.device('cpu') device = torch.device('cpu') #device = torch.device("cuda" if torch.cuda.is_available() else "cpu") video_file = file_name #deepfake #meganfox.mp4' #current_path = os.getcwd() audio_file = '/tmp/output_audio.wav' # 저장할 오디오 파일의 경로, 이름 지정 extract_audio_from_video(video_file, audio_file) seprate_speaker(audio_file,pipeline) # 발화자 분리해서 파일로 만들기 mel_dim = 13 # Mel-spectrogram 차원 num_classes = 2 # 분류할 클래스 수 input_dim = mel_dim hidden_dim = 128 dropout_rate = 0.2 l2_reg = 0.01 # 모델 model_name = hf_hub_download(repo_id="sssssungk/deepfake_voice", filename="deepvoice_model_girl.pth") model = DeepVoiceModel(input_dim, hidden_dim, num_classes, dropout_rate, l2_reg).to(device) model.load_state_dict(torch.load(model_name, map_location=torch.device('cpu'))) model.eval() # 평가 모드로 설정 path = '/tmp/wav' file_path = os.listdir(path) rf_check = real_fake_check(file_path, path,model) #fake dataset\ text_list =[] to_text = os.listdir('/tmp/wav') for i in to_text: text = convert_wav_to_text(os.path.join(path, i)) text_list.append(text) text_list.append(convert_wav_to_text(audio_file)) # '추가(" "형식으로 나오게) for i in range(len(text_list)): text_list[i] = text_list[i]+"'" return rf_check, text_list def deepvoice_check(video_file): results,text = main(video_file) return results,text # Gradio 인터페이스 생성 deepfake = gr.Interface( fn=deepvoice_check, inputs=gr.Video(label="Upload mp4 File"), outputs=gr.Textbox(label="DeepFaKeVoice Detection Result"), title="DeepFaKeVoice Check", description="Upload an mp4 file to check." ) if __name__ == "__main__": deepfake.launch(share=True, debug=True)