Spaces:
Build error
Build error
| from multiprocessing import Pool | |
| from PIL import Image | |
| import sys | |
| # sys.path.append('/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/code_test') | |
| # from imageocr import imageocr | |
| # from Extract_audio import extract_audio | |
| from FakeVD.code_test.VGGish_Feature_Extractor.my_vggish_fun import vggish_audio as extract_audio | |
| from FakeVD.code_test.VGG19_Feature_Extractor.vgg19_feature import load_model_vgg19 | |
| # from Imghash import imghash | |
| # from Extract_frame import V2frame | |
| from FakeVD.code_test.Text_Feature_Extractor.wav2text import wav2text | |
| import os | |
| import shutil | |
| # from Sentence_distance import sentence_distance | |
| import time | |
| import json | |
| import cv2 | |
| # audio_path = '/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/dataset/1.wav' | |
| # frame_path = '' | |
| # datasave_path = '/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/code_test/Text_Feature/data' | |
| # video_path = '/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/code_test/Text_Feature/videos' | |
| # def get_string_from_list(A): | |
| # if type(A) == list: | |
| # ret = '' | |
| # for i in A: | |
| # ret += get_string_from_list(i) | |
| # return ret | |
| # if type(A) == str: | |
| # return A | |
| # return "" | |
| # ocr_ret = [] | |
| # def ocr_frame(frame): | |
| # img_path = os.path.join(frame_path, str(frame)+'.jpg') | |
| # now_ocr_result = imageocr.work(img_path) | |
| # ocr_ret.append({'frame': frame, 'result': now_ocr_result}) | |
| # def get_frame(x): | |
| # return x['frame'] | |
| # def OCR(path): | |
| # imgH = imghash() | |
| # cropimgH = imghash() | |
| # # 返回结果 | |
| # ret = [] | |
| # # 当前帧 | |
| # now_frame = -15 | |
| # # 上一次执行ocr的帧 | |
| # last_frame = 0 | |
| # # 上一次的ocr结果拼接字符串 | |
| # last_ocr_result_string = "ocr at first" | |
| # # 限 | |
| # k = 8 | |
| # # 历史最高k点 | |
| # kmax = 20 | |
| # # 匹配标识 | |
| # marchflag = False | |
| # while True: | |
| # now_frame += 15 | |
| # img_path = os.path.join(frame_path, str(now_frame)+'.jpg') | |
| # if not os.path.exists(img_path): | |
| # print('[OCR all] done', now_frame) | |
| # break | |
| # # 相似度高,无需ocr | |
| # if not imgH.work(Image.open(img_path), k): | |
| # print("continue " + str(now_frame)) | |
| # continue | |
| # print('[OCR working] ocr at frame', now_frame) | |
| # # 进行ocr | |
| # now_ocr_result = imageocr.work(img_path, cropimgH) | |
| # # 将识别结果添加 | |
| # ret.append({'frame': now_frame, 'result': now_ocr_result}) | |
| # # print('[OCR done] ocr at frame', now_frame) | |
| # return ret | |
| # 处理视频 | |
| def video_work(w2t, model_vggish, video_file_path, output_path, audio_path='./FakeVD/code_test/preprocessed_feature/1.wav'): | |
| video_file_name = os.path.basename(video_file_path) | |
| video_id = os.path.splitext(video_file_name)[0] | |
| st = time.time() | |
| print('[video_work] working') | |
| # v2f = V2frame() | |
| ################## | |
| # w2t = wav2text() | |
| # 分离wav | |
| print('[extract audio]', video_file_path, audio_path) | |
| extract_audio(model_vggish, video_file_path, feature_path="./FakeVD/code_test/preprocessed_feature/1.pkl", audio_file_path=audio_path) | |
| # 分离帧 | |
| # print('[extract frame]', path, audio_path) | |
| # v2f.work(path, frame_path) | |
| # 进行wav处理 | |
| print('[wav2text] working') | |
| wav_result = w2t.work(audio_path) | |
| #print('[wav_result]', wav_result) | |
| # 进行ocr处理 | |
| # ocr_result = OCR(frame_path) | |
| # print('ocr_result', ocr_result) | |
| # cap = cv2.VideoCapture(path) | |
| # fps = cap.get(cv2.CAP_PROP_FPS) | |
| print("time: " + str(time.time() - st)) | |
| result = {"video_id": video_id, 'text': wav_result[0]['text']} # , 'ocr_result': ocr_result , "fps": fps | |
| with open(output_path, 'w') as f: | |
| f.write(json.dumps(result, ensure_ascii=False)) | |
| # if __name__ == '__main__': | |
| # file = sys.argv[1] | |
| # r = open(file, "r").readlines() | |
| # for name in r: | |
| # name = name.strip() | |
| # data_path = os.path.join(datasave_path, name[:-4]+'.json') | |
| # if os.path.exists(data_path): | |
| # # print("continue " + name) | |
| # continue | |
| # if name == '': | |
| # continue | |
| # path = os.path.join(video_path, name) | |
| # result = video_work(path, name[:-4]) | |
| # with open(data_path, 'w') as f: | |
| # f.write(json.dumps(result, ensure_ascii=False)) | |