import os import random import time import copy import gradio as gr import dashscope from dashscope import VideoSynthesis from examples import t2v_examples, i2v_examples # Configure API key DASHSCOPE_API_KEY = os.getenv('AIzaSyBbLpmcLFbYG_B6qf0VsOkzZt1JEHCI4qQ') dashscope.api_key = AIzaSyBbLpmcLFbYG_B6qf0VsOkzZt1JEHCI4qQ # Task management constants KEEP_SUCCESS_TASK = 3600 * 10 KEEP_RUNING_TASK = 3600 * 1 LIMIT_RUNING_TASK = 10 LIMIT_HISTORY_RUNING_TASK = 20 # Global variables task_status = {} total_task_info = { "total_process_cost": 0, "total_complete_task": 0, "total_submit": 0, "latest_1h_submit_status": {} } def get_submit_code(): submit_code = random.randint(0, 2147483647) for sub_c, sub_info in copy.deepcopy(total_task_info["latest_1h_submit_status"]).items(): if time.time() - sub_info > 3600: total_task_info["latest_1h_submit_status"].pop(sub_c) return submit_code def t2v_generation_async(prompt, size, watermark_wan, seed=-1): seed = seed if seed >= 0 else random.randint(0, 2147483647) total_task_info["latest_1h_submit_status"][get_submit_code()] = time.time() total_task_info["total_submit"] += 1 if not allow_task_num(): gr.Info(f"Warning: The number of running tasks is too large, the estimate waiting time is {get_waiting_time('-1')} s.") return None, False, gr.Button(visible=True), gr.Button(visible=False), gr.Slider(), gr.Slider() try: rsp = VideoSynthesis.async_call( model="wanx2.1-t2v-plus", prompt=prompt, size=size, seed=seed, watermark_wanx=watermark_wan ) task_id = rsp.output.task_id status = False return task_id, status, gr.Button(visible=False), gr.Button(visible=True), get_cost_time(task_id), get_waiting_time(task_id) except Exception as e: gr.Warning(f"Warning: {e}") return None, True, gr.Button(), gr.Button(), gr.Slider(), gr.Slider() def i2v_generation_async(prompt, image, watermark_wan, seed=-1): seed = seed if seed >= 0 else random.randint(0, 2147483647) total_task_info["latest_1h_submit_status"][get_submit_code()] = time.time() total_task_info["total_submit"] += 1 if not allow_task_num(): gr.Info(f"Warning: The number of running tasks is too large, the estimate waiting time is {get_waiting_time('-1')} s.") return "", None, gr.Button(visible=True), gr.Button(visible=False), gr.Slider(), gr.Slider() try: rsp = VideoSynthesis.async_call( model="wanx2.1-i2v-plus", prompt=prompt, seed=seed, img_url=image, watermark_wanx=watermark_wan ) task_id = rsp.output.task_id status = False return task_id, status, gr.Button(visible=False), gr.Button(visible=True), get_cost_time(task_id), get_waiting_time(task_id) except Exception as e: gr.Warning(f"Warning: {e}") return "", None, gr.Button(), gr.Button(), gr.Slider(), gr.Slider() def get_result_with_task_id(task_id): if task_id == "": return True, None try: rsp = VideoSynthesis.fetch(task=task_id) if rsp.output.task_status == "FAILED": gr.Info(f"Warning: task running {rsp.output.task_status}") status = True video_url = None else: video_url = rsp.output.video_url video_url = video_url if video_url != "" else None status = video_url is not None if status: total_task_info["total_complete_task"] += 1 total_task_info["total_process_cost"] += time.time() - task_status[task_id]["time"] except: video_url = None status = False return status, None if video_url == "" else video_url def allow_task_num(): num = 0 total_num = 0 for task_id in task_status: if not task_status[task_id]["status"] and task_status[task_id]["time"] + 1800 > time.time(): num += 1 if not task_status[task_id]["status"]: total_num += 1 return num < LIMIT_RUNING_TASK or total_num < LIMIT_HISTORY_RUNING_TASK def get_waiting_time(task_id): num = 0 for task_id in task_status: if not task_status[task_id]["status"]: num += 1 latest_submit_tasks = len(total_task_info["latest_1h_submit_status"]) if task_id in task_status: return int(640 - (time.time() - task_status[task_id]["time"])) else: return int(latest_submit_tasks * (total_task_info["total_process_cost"]/(total_task_info["total_complete_task"]+1))) def get_cost_time(task_id): if task_id in task_status and not task_status[task_id]["status"]: et = int(time.time() - task_status[task_id]["time"]) return f"{et:.2f}" else: return gr.Textbox() def clean_task_status(): for task_id in copy.deepcopy(task_status): if task_id == "": continue if task_status[task_id]["status"]: if task_status[task_id]["time"] + KEEP_SUCCESS_TASK < time.time(): task_status.pop(task_id) else: if task_status[task_id]["time"] + KEEP_RUNING_TASK < time.time(): task_status.pop(task_id) def process_change(task_id, task): status = task_status.get(task_id, {"status":False})["status"] if status: video_url = task_status[task_id]["url"] ret_t2v_btn = gr.Button(visible=True) if task == 't2v' else gr.Button() ret_t2v_status_btn = gr.Button(visible=False) if task == 't2v' else gr.Button() ret_i2v_btn = gr.Button(visible=True) if task == 'i2v' else gr.Button() ret_i2v_status_btn = gr.Button(visible=False) if task == 'i2v' else gr.Button() return gr.Video(value=video_url), ret_t2v_btn, ret_i2v_btn, ret_t2v_status_btn, ret_i2v_status_btn return gr.Video(value=None), gr.Button(), gr.Button(), gr.Button(), gr.Button() def status_refresh(task_id, task, status): if task_id in task_status and not task_status[task_id]["status"]: cost_time = int(time.time() - task_status[task_id]["time"]) else: cost_time = 0 status, video_url = get_result_with_task_id(task_id) if task_id not in task_status: task_status[task_id] = {"status": status, "url": video_url, "time": time.time(), "value": 100 if status else 0} else: task_status[task_id]["status"] = status task_status[task_id]["url"] = video_url waiting_time = get_waiting_time(task_id) value = task_status.get(task_id, {"value": 100})["value"] value = max(value, int(cost_time*100/waiting_time)) task_status[task_id]["value"] = value if value < 100 else 100 if not video_url == "" and status: value = 100 process_bar = gr.Slider( label=f"({value}%)Generating" if value % 2 == 1 else f"({value}%)Generating.....", value=value ) process_change_ret = process_change(task_id, task) return *process_change_ret, cost_time, waiting_time, process_bar # Create the Gradio interface with gr.Blocks() as demo: gr.HTML("""
Wan2.1: Open and Advanced Large-Scale Video Generative Models
Code | Huggingface | Modelscope
We are excited to announce that Wan's international experience page is officially live, supporting image and video generation, and it's completely free. We welcome you to try it out! Wan Web
""") t2v_task_id = gr.State(value="") i2v_task_id = gr.State(value="") status = gr.State(value=False) task = gr.State(value="t2v") with gr.Row(): with gr.Column(): with gr.Row(): with gr.Tabs(): # Text to Video Tab with gr.TabItem("Text to Video") as t2v_tab: with gr.Row(): txt2vid_prompt = gr.Textbox( label="Prompt", placeholder="Describe the video you want to generate", lines=19, ) with gr.Row(): resolution = gr.Dropdown( label="Resolution", choices=["1280*720", "960*960", "720*1280", "1088*832", "832*1088"], value="1280*720", ) with gr.Row(): run_t2v_button = gr.Button("Generate Video") t2v_refresh_status = gr.Button("Refresh Generating Status", visible=False) # Image to Video Tab with gr.TabItem("Image to Video") as i2v_tab: with gr.Row(): with gr.Column(): img2vid_image = gr.Image( type="filepath", label="Upload Input Image", elem_id="image_upload", ) img2vid_prompt = gr.Textbox( label="Prompt", placeholder="Describe the video you want to generate", value="", lines=5, ) with gr.Row(): run_i2v_button = gr.Button("Generate Video") i2v_refresh_status = gr.Button("Refresh Generating Status", visible=False) with gr.Column(): with gr.Row(): result_gallery = gr.Video( label='Generated Video', interactive=False, height=500 ) with gr.Row(): watermark_wan = gr.Checkbox(label="Watermark", value=True, visible=True, container=False) seed = gr.Number(label="Seed", value=-1, container=True) cost_time = gr.Number(label="Cost Time(secs)", value=0, interactive=False, container=True) waiting_time = gr.Number(label="Estimated Waiting Time(secs)", value=0, interactive=False, container=True) process_bar = gr.Slider( show_label=True, label="", value=100, maximum=100, interactive=True, container=True ) with gr.Row(): gr.Markdown('Due to automatic refresh of task status causing significant network congestion, please manually click the "Refresh Generating Status" button to check the task status.') fake_video = gr.Video(label='Examples', visible=False, interactive=False) with gr.Row(visible=True) as t2v_eg: gr.Examples( t2v_examples, inputs=[txt2vid_prompt, result_gallery], outputs=[result_gallery] ) with gr.Row(visible=False) as i2v_eg: gr.Examples( i2v_examples, inputs=[img2vid_prompt, img2vid_image, result_gallery], outputs=[result_gallery] ) def switch_i2v_tab(): return gr.Row(visible=False), gr.Row(visible=True), "i2v" def switch_t2v_tab(): return gr.Row(visible=True), gr.Row(visible=False), "t2v" i2v_tab.select(switch_i2v_tab, outputs=[t2v_eg, i2v_eg, task]) t2v_tab.select(switch_t2v_tab, outputs=[t2v_eg, i2v_eg, task]) run_t2v_button.click( fn=t2v_generation_async, inputs=[txt2vid_prompt, resolution, watermark_wan, seed], outputs=[t2v_task_id, status, run_t2v_button, t2v_refresh_status, cost_time, waiting_time], ) run_i2v_button.click( fn=i2v_generation_async, inputs=[img2vid_prompt, img2vid_image, watermark_wan, seed], outputs=[i2v_task_id, status, run_i2v_button, i2v_refresh_status, cost_time, waiting_time], ) t2v_refresh_status.click( fn=status_refresh, inputs=[t2v_task_id, task, status], outputs=[result_gallery, run_t2v_button, run_i2v_button, t2v_refresh_status, i2v_refresh_status, cost_time, waiting_time, process_bar] ) i2v_refresh_status.click( fn=status_refresh, inputs=[i2v_task_id, task, status], outputs=[result_gallery, run_t2v_button, run_i2v_button, t2v_refresh_status, i2v_refresh_status, cost_time, waiting_time, process_bar] ) demo.launch()