Spaces:
Sleeping
Sleeping
| ''' | |
| 1. (LLM) slide generation | |
| 2. (VLM) subtitle and cursor prompt generation | |
| 3. TTS->audio; GUI&WhisperX Grounding->cursor; | |
| 4. Talking Gen: local-[hallo2, fantasy, ...], api-[HeyGen] | |
| 5. Merage | |
| ''' | |
| import cv2 | |
| import pdb | |
| import json | |
| import time | |
| import shutil | |
| import asyncio | |
| import os, sys | |
| import argparse | |
| import subprocess | |
| from os import path | |
| from pdf2image import convert_from_path | |
| print("Initializing...") | |
| from speech_gen import tts_per_slide | |
| from subtitle_render import add_subtitles | |
| from talking_gen import talking_gen_per_slide | |
| from cursor_gen import cursor_gen_per_sentence | |
| from slide_code_gen import latex_code_gen | |
| # from slide_code_gen_select_improvement import latex_code_gen_upgrade | |
| from cursor_render import render_video_with_cursor_from_json | |
| from subtitle_cursor_prompt_gen import subtitle_cursor_gen | |
| from wei_utils import get_agent_config | |
| # os.environ["GEMINI_API_KEY"] = "" | |
| # os.environ["OPENAI_API_KEY"] = "" | |
| def copy_folder(src_dir, dst_dir): | |
| if not os.path.exists(src_dir): raise FileNotFoundError(f"no such dir: {src_dir}") | |
| os.makedirs(os.path.dirname(dst_dir), exist_ok=True) | |
| shutil.copytree(src_dir, dst_dir) | |
| def str2list(s): return [int(x) for x in s.split(',')] | |
| if __name__ == '__main__': | |
| parser = argparse.ArgumentParser(description='Paper2Video Generation Pipeline') | |
| parser.add_argument('--result_dir', type=str, default='./result/zeyu') | |
| parser.add_argument('--model_name_t', type=str, default='gpt-4.1') | |
| parser.add_argument('--model_name_v', type=str, default='gpt-4.1') | |
| parser.add_argument('--model_name_talking', type=str, default='hallo2') | |
| parser.add_argument('--paper_latex_root', type=str, default='./assets/demo/latex_proj') | |
| parser.add_argument('--ref_img', type=str, default='./assets/demo/zeyu.png') | |
| parser.add_argument('--ref_audio', type=str, default='./assets/demo/zeyu.wav') | |
| parser.add_argument('--ref_text', type=str, default=None) | |
| parser.add_argument('--gpu_list', type=str2list, default="") | |
| parser.add_argument('--if_tree_search', type=bool, default=True) | |
| parser.add_argument('--beamer_templete_prompt', type=str, default=None) | |
| parser.add_argument('--stage', type=str, default="[\"0\"]") | |
| parser.add_argument('--talking_head_env', type=str, default="") | |
| # slide+subtitle: 1; | |
| # tts+cusor: 2; | |
| # talking-head: 3: | |
| # all: 0 | |
| args = parser.parse_args() | |
| stage = json.loads(args.stage) | |
| print("start", "stage:", stage, args.gpu_list) | |
| cursor_img_path = "./cursor_image/red.png" | |
| os.makedirs(args.result_dir, exist_ok=True) # result dir | |
| agent_config_t = get_agent_config(args.model_name_t) # LLM | |
| agent_config_v = get_agent_config(args.model_name_v) # VLM | |
| copy_latex_proj_path = path.join(args.result_dir, path.basename(args.paper_latex_root)) | |
| if path.exists(copy_latex_proj_path) is False: | |
| copy_folder(args.paper_latex_root, copy_latex_proj_path) | |
| args.paper_latex_root = copy_latex_proj_path | |
| if path.exists(path.join(args.result_dir, "sat.json")) is True: | |
| with open(path.join(args.result_dir, "sat.json"), 'r') as f: | |
| time_second = json.load(f) | |
| else: time_second = {} | |
| if path.exists(path.join(args.result_dir, "token.json")) is True: | |
| with open(path.join(args.result_dir, "token.json"), 'r') as f: | |
| token_usage = json.load(f) | |
| else: token_usage = {} | |
| ## Step 1: Slide Generation | |
| slide_latex_path = path.join(args.paper_latex_root, "slides.tex") | |
| slide_image_dir = path.join(args.result_dir, 'slide_imgs') | |
| os.makedirs(slide_image_dir, exist_ok=True) | |
| start_time = time.time() # start time | |
| if "1" in stage or "0" in stage: | |
| prompt_path = "./prompts/slide_beamer_prompt.txt" | |
| if args.if_tree_search is True: | |
| usage_slide, beamer_path = latex_code_gen(prompt_path=prompt_path, tex_dir=args.paper_latex_root, beamer_save_path=slide_latex_path, | |
| model_config_ll=agent_config_t, model_config_vl=agent_config_v, beamer_temp_name=args.beamer_templete_prompt) | |
| else: | |
| paper_latex_path = path.join(args.paper_latex_root, "main.tex") | |
| usage_slide = latex_code_gen(prompt_path=prompt_path, tex_dir=args.paper_latex_root, tex_path=paper_latex_path, beamer_save_path=slide_latex_path, model_config=agent_config_t) | |
| slide_imgs = convert_from_path(beamer_path, dpi=400) | |
| for i, img in enumerate(slide_imgs): img.save(path.join(slide_image_dir, f"{i+1}.png")) # save slides as images | |
| if args.model_name_t not in token_usage.keys(): | |
| token_usage[args.model_name_t] = [usage_slide] | |
| else: token_usage[args.model_name_t].append(usage_slide) | |
| step1_time = time.time() | |
| time_second["slide_gen"] = [step1_time-start_time] | |
| print("Slide Generation", step1_time-start_time) | |
| ## Step 2: Subtitle and Cursor Prompt Generation | |
| start_time = time.time() # start time | |
| subtitle_cursor_save_path = path.join(args.result_dir, 'subtitle_w_cursor.txt') | |
| cursor_save_path = path.join(args.result_dir, 'cursor.json') | |
| speech_save_dir = path.join(args.result_dir, 'audio') | |
| if "2" in stage or "0" in stage: | |
| prompt_path = "./prompts/slide_subtitle_cursor_prompt.txt" | |
| subtitle, usage_subtitle = subtitle_cursor_gen(slide_image_dir, prompt_path, agent_config_v) | |
| with open(subtitle_cursor_save_path, 'w') as f: f.write(subtitle) | |
| if args.model_name_v not in token_usage.keys(): | |
| token_usage[args.model_name_v] = [usage_subtitle] | |
| else: token_usage[args.model_name_v].append(usage_subtitle) | |
| step2_time = time.time() | |
| time_second["subtitle_cursor_prompt_gen"] = [step2_time-start_time] | |
| print("Subtitle and Cursor Prompt Generation", step2_time-start_time) | |
| ## Step 3-1: Speech Generation | |
| tts_per_slide(model_type='f5', script_path=subtitle_cursor_save_path, | |
| speech_save_dir=speech_save_dir, ref_audio=args.ref_audio, ref_text=args.ref_text) | |
| step3_1_time = time.time() | |
| time_second["tts"] = [step3_1_time-step2_time] | |
| print("Speech Generation", step3_1_time-step2_time) | |
| ## Step 3-2: Cursor Generation | |
| os.environ["PYTHONHASHSEED"] = "random" | |
| cursor_token = cursor_gen_per_sentence(script_path=subtitle_cursor_save_path, slide_img_dir=slide_image_dir, | |
| slide_audio_dir=speech_save_dir, cursor_save_path=cursor_save_path, gpu_list=args.gpu_list) | |
| token_usage["cursor"] = cursor_token | |
| step3_2_time = time.time() | |
| time_second["cursor_gen"] = [step3_2_time-step3_1_time] | |
| print("Cursor Generation", step3_2_time-step3_1_time) | |
| ## Step 4: Talking Video Generation | |
| start_time = time.time() # start time | |
| if "3" in stage or "0" in stage: | |
| talking_save_dir = path.join(args.result_dir, 'talking_{}'.format(args.model_name_talking)) | |
| talking_inference_input = [] | |
| audio_path_list = [path.join(speech_save_dir, name) for name in os.listdir(speech_save_dir)] | |
| for audio_path in audio_path_list: talking_inference_input.append([args.ref_img, audio_path]) | |
| talking_gen_per_slide(args.model_name_talking, talking_inference_input, talking_save_dir, args.gpu_list, env_path=args.talking_head_env) | |
| step4_time = time.time() | |
| time_second["talking_gen"] = [step4_time-start_time] | |
| print("Cursor Generation", step4_time-start_time) | |
| ## Step5: Merage | |
| # merage talking and slides | |
| tmp_merage_dir = path.join(args.result_dir, "merage") | |
| tmp_merage_1 = path.join(args.result_dir, "1_merage.mp4") | |
| image_size = cv2.imread(path.join(slide_image_dir, '1.png')).shape | |
| if args.model_name_talking == 'hallo2': | |
| size = max(image_size[0]//6, image_size[1]//6) | |
| width, height = size, size | |
| num_slide = len(os.listdir(slide_image_dir)) | |
| print(args.ref_img.split("/")[-1].split(".")[0]) | |
| merage_cmd = ["./1_merage.bash", slide_image_dir, talking_save_dir, tmp_merage_dir, | |
| str(width), str(height), str(num_slide), tmp_merage_1, args.ref_img.split("/")[-1].replace(".png", "")] | |
| out = subprocess.run(merage_cmd, text=True) | |
| # render cursor | |
| cursor_size = size//6 | |
| tmp_merage_2 = path.join(args.result_dir, "2_merage.mp4") | |
| render_video_with_cursor_from_json(video_path=tmp_merage_1, out_video_path=tmp_merage_2, | |
| json_path=cursor_save_path, cursor_img_path=cursor_img_path, | |
| transition_duration=0.1, cursor_size=cursor_size) | |
| # render subtitle | |
| front_size = size//10 | |
| tmp_merage_3 = path.join(args.result_dir, "3_merage.mp4") | |
| add_subtitles(tmp_merage_2, tmp_merage_3, size//10) | |
| step5_time = time.time() | |
| time_second["merage"] = [step5_time-step4_time] | |
| print("Merage", step5_time-step4_time) | |
| # sat. save | |
| time_second = {"slide_gen": [step1_time-start_time, usage_slide], | |
| "subtitle_cursor_prompt_gen": [step2_time-step1_time, usage_subtitle], | |
| "tts": step3_1_time-step2_time, "cursor_gen": step3_2_time-step3_1_time, | |
| "talking_gen": step4_time-step3_2_time, "merage": step5_time-step4_time} | |
| with open(path.join(args.result_dir, "sat.json"), 'w') as f: json.dump(time_second, f, indent=4) | |
| with open(path.join(args.result_dir, "token.json"), 'w') as f: json.dump(token_usage, f, indent=4) | |