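# app.py — Wan 2.2 unified video generation Space.
# Tab 1: text/image-to-video (chunked generation, streamed as it renders).
# Tab 2: speech-to-video (S2V) driven by a reference image and an audio clip.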
import os
import sys
import subprocess
import traceback
import gc
import tempfile
import random
import time
from pathlib import Path
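# Install runtime dependencies at startup: the bundled `spaces` wheel, moviepy, ffmpeg-backed
# imageio, audio tooling, and the diffusers fork that provides WanSpeechToVideoPipeline.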
os.system("pip install spaces-0.1.0-py3-none-any.whl moviepy==1.0.3 imageio[ffmpeg] librosa soundfile accelerate")
os.system("pip install git+https://github.com/tolgacangoz/diffusers.git")
import spaces
import torch
import numpy as np
import librosa
import soundfile as sf
from PIL import Image
from moviepy.editor import VideoFileClip, concatenate_videoclips
from huggingface_hub import snapshot_download
import gradio as gr
try:
    import diffusers
    from diffusers import AutoencoderKLWan, WanPipeline, WanImageToVideoPipeline, UniPCMultistepScheduler, WanSpeechToVideoPipeline
    from diffusers.utils import export_to_video
except ImportError:
    pass
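# Model checkpoints: FastWan2.2 TI2V-5B for text/image-to-video and Wan2.2 S2V-14B for speech-to-video.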
MODEL_ID_TI2V = "FastVideo/FastWan2.2-TI2V-5B-FullAttn-Diffusers"
MODEL_ID_S2V = "tolgacangoz/Wan2.2-S2V-14B-Diffusers"
MODELS = {
    "ti2v_text": None,
    "ti2v_image": None,
    "s2v": None
}
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
def load_models_at_startup():
    """Load the TI2V and S2V pipelines once so every request reuses them."""
    global MODELS
    try:
        # Keep the VAE in float32 for stability; the transformers run in bfloat16.
        vae = AutoencoderKLWan.from_pretrained(MODEL_ID_TI2V, subfolder="vae", torch_dtype=torch.float32)
        text_pipe = WanPipeline.from_pretrained(MODEL_ID_TI2V, vae=vae, torch_dtype=torch.bfloat16)
        text_pipe.scheduler = UniPCMultistepScheduler.from_config(text_pipe.scheduler.config, flow_shift=8.0)
        try:
            if DEVICE == "cuda":
                text_pipe.enable_model_cpu_offload()
            else:
                text_pipe.to(DEVICE)
        except RuntimeError:
            text_pipe.to("cpu")
        MODELS["ti2v_text"] = text_pipe
        image_pipe = WanImageToVideoPipeline.from_pretrained(MODEL_ID_TI2V, vae=vae, torch_dtype=torch.bfloat16)
        image_pipe.scheduler = UniPCMultistepScheduler.from_config(image_pipe.scheduler.config, flow_shift=8.0)
        try:
            if DEVICE == "cuda":
                image_pipe.enable_model_cpu_offload()
            else:
                image_pipe.to(DEVICE)
        except RuntimeError:
            image_pipe.to("cpu")
        MODELS["ti2v_image"] = image_pipe
    except Exception:
        traceback.print_exc()
    try:
        s2v_pipe = WanSpeechToVideoPipeline.from_pretrained(
            MODEL_ID_S2V,
            torch_dtype=torch.bfloat16
        )
        try:
            if DEVICE == "cuda":
                s2v_pipe.enable_model_cpu_offload()
            else:
                s2v_pipe.to(DEVICE)
        except RuntimeError:
            s2v_pipe.to("cpu")
        MODELS["s2v"] = s2v_pipe
    except Exception:
        traceback.print_exc()
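# Load all pipelines once at import time so the Gradio handlers can reuse them across requests.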
load_models_at_startup()
def auto_duration_estimator(mode, input_data, duration_val):
    """Estimate the GPU duration (in seconds) to request for a job."""
    base_overhead = 45
    if mode == "s2v":
        audio_path = input_data
        if audio_path:
            try:
                dur = librosa.get_duration(path=audio_path)
                return int(base_overhead + (dur * 15))
            except Exception:
                return 120
        return 120
    else:
        num_images = len(input_data) if input_data else 0
        if num_images > 0:
            total_seconds = max(duration_val, num_images * 2)
        else:
            total_seconds = duration_val
        return int(base_overhead + (total_seconds * 12))
def fast_stitch_videos(video_paths):
    """Concatenate clips losslessly with ffmpeg's concat demuxer (no re-encoding)."""
    if not video_paths:
        return None
    if len(video_paths) == 1:
        return video_paths[0]
    try:
        # Write the list file expected by `ffmpeg -f concat`.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
            for path in video_paths:
                f.write(f"file '{path}'\n")
            list_path = f.name
        with tempfile.NamedTemporaryFile(suffix="_stitched_stream.mp4", delete=False) as tmp:
            out_path = tmp.name
        cmd = [
            "ffmpeg", "-y", "-f", "concat", "-safe", "0",
            "-i", list_path, "-c", "copy", out_path
        ]
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        os.remove(list_path)
        return out_path
    except Exception:
        # Fall back to the most recent clip if stitching fails.
        return video_paths[-1]
@spaces.GPU(duration=lambda *args: auto_duration_estimator("ti2v", args[0], args[5]))
def generate_ti2v_gpu_stream(input_files, prompt, height, width, negative_prompt, duration_seconds, guidance_scale, steps, seed, randomize_seed, progress=gr.Progress(track_tqdm=True)):
    global MODELS
    text_to_video_pipe = MODELS.get("ti2v_text")
    image_to_video_pipe = MODELS.get("ti2v_image")
    if not text_to_video_pipe or not image_to_video_pipe:
        raise gr.Error("Models failed to load at startup.")
    # Snap the requested resolution to the nearest multiple of 32.
    MOD_VALUE = 32
    target_h = max(MOD_VALUE, (int(height) // MOD_VALUE) * MOD_VALUE)
    target_w = max(MOD_VALUE, (int(width) // MOD_VALUE) * MOD_VALUE)
    master_seed = random.randint(0, 2**32 - 1) if randomize_seed else int(seed)
    video_clips_paths = []
    pil_images = []
    if input_files:
        files_list = input_files if isinstance(input_files, list) else [input_files]
        for f in files_list:
            try:
                path = f.name if hasattr(f, "name") else f
                img = Image.open(path).convert("RGB")
                pil_images.append(img)
            except Exception:
                continue
    SAFE_CHUNK_DURATION = 4.0
    FIXED_FPS = 24
    last_preview_frame = None
    if len(pil_images) > 0:
        # Image-to-video: split the requested duration evenly across the input images.
        seconds_per_image = max(2.0, duration_seconds / len(pil_images))
        for i, img in enumerate(pil_images):
            current_chunk_duration = min(seconds_per_image, SAFE_CHUNK_DURATION)
            num_frames = int(current_chunk_duration * FIXED_FPS)
            local_seed = master_seed + i
            generator = torch.Generator(device=DEVICE).manual_seed(local_seed)
            resized_image = img.resize((target_w, target_h))
            try:
                with torch.inference_mode():
                    output_frames = image_to_video_pipe(
                        image=resized_image,
                        prompt=prompt,
                        negative_prompt=negative_prompt,
                        height=target_h,
                        width=target_w,
                        num_frames=num_frames,
                        guidance_scale=float(guidance_scale),
                        num_inference_steps=int(steps),
                        generator=generator
                    ).frames[0]
                with tempfile.NamedTemporaryFile(suffix=f"_img_{i}.mp4", delete=False) as tmp:
                    export_to_video(output_frames, tmp.name, fps=FIXED_FPS)
                    video_clips_paths.append(tmp.name)
                if len(output_frames) > 0:
                    last_preview_frame = output_frames[-1]
                # Re-stitch after every chunk and yield so the UI streams partial results.
                current_stitched = fast_stitch_videos(video_clips_paths)
                yield current_stitched, last_preview_frame, master_seed
            except Exception:
                continue
    else:
        # Text-to-video: generate fixed-length chunks until the requested duration is covered.
        num_chunks = int(np.ceil(duration_seconds / SAFE_CHUNK_DURATION))
        frames_per_chunk = int(SAFE_CHUNK_DURATION * FIXED_FPS)
        for i in range(num_chunks):
            chunk_seed = master_seed + (i * 100)
            generator = torch.Generator(device=DEVICE).manual_seed(chunk_seed)
            with torch.inference_mode():
                output_frames = text_to_video_pipe(
                    prompt=prompt,
                    negative_prompt=negative_prompt,
                    height=target_h,
                    width=target_w,
                    num_frames=frames_per_chunk,
                    guidance_scale=float(guidance_scale),
                    num_inference_steps=int(steps),
                    generator=generator
                ).frames[0]
            with tempfile.NamedTemporaryFile(suffix=f"_chunk_{i}.mp4", delete=False) as tmp:
                export_to_video(output_frames, tmp.name, fps=FIXED_FPS)
                video_clips_paths.append(tmp.name)
            if len(output_frames) > 0:
                last_preview_frame = output_frames[-1]
            current_stitched = fast_stitch_videos(video_clips_paths)
            yield current_stitched, last_preview_frame, master_seed
def merge_audio_video(video_path, audio_path, output_path):
    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-i", audio_path,
        "-c:v", "copy",
        "-c:a", "aac",
        "-map", "0:v:0", "-map", "1:a:0",
        "-shortest",
        output_path
    ]
    subprocess.run(cmd, check=True)
    return output_path
def load_audio_for_model(audio_filepath):
    try:
        # Resample to 16 kHz mono, as expected by the S2V pipeline.
        wav, sr = librosa.load(audio_filepath, sr=16000)
        return wav, sr
    except Exception:
        return None, None
@spaces.GPU(duration=lambda *args: auto_duration_estimator("s2v", args[1], 0))
def generate_s2v_gpu(image_input, audio_filepath, prompt, seed, randomize_seed):
    global MODELS
    pipe = MODELS.get("s2v")
    if not pipe:
        raise gr.Error("S2V Model not initialized.")
    if image_input is None or audio_filepath is None:
        raise gr.Error("Inputs Missing")
    audio_values, sample_rate = load_audio_for_model(audio_filepath)
    if audio_values is None:
        raise gr.Error("Invalid Audio")
    # Snap the reference image to dimensions divisible by 16.
    init_image = image_input.convert("RGB")
    w, h = init_image.size
    w = (w // 16) * 16
    h = (h // 16) * 16
    init_image = init_image.resize((w, h), Image.LANCZOS)
    current_seed = random.randint(0, 2**32 - 1) if randomize_seed else int(seed)
    generator = torch.Generator(device=DEVICE).manual_seed(current_seed)
    with torch.inference_mode():
        out = pipe(
            image=init_image,
            audio=audio_values,
            num_inference_steps=25,
            guidance_scale=4.0,
            sampling_rate=sample_rate,
            prompt=prompt,
            generator=generator
        )
    frames = out.frames[0]
    with tempfile.NamedTemporaryFile(suffix="_temp_mute.mp4", delete=False) as tmp_vid:
        temp_mute_path = tmp_vid.name
    with tempfile.NamedTemporaryFile(suffix="_output_s2v.mp4", delete=False) as tmp_final:
        final_video_path = tmp_final.name
    export_to_video(frames, temp_mute_path, fps=30)
    # Mux the generated (silent) video with the original audio track.
    final_output = merge_audio_video(temp_mute_path, audio_filepath, final_video_path)
    return final_output, current_seed
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Wan 2.2 Unified Streaming Video Platform")
    with gr.Tabs():
        with gr.TabItem("Text & Image to Video (Streaming & Long Duration)"):
            with gr.Row():
                with gr.Column(scale=1):
                    ti2v_files = gr.File(label="Input Images", file_count="multiple", type="filepath", file_types=["image"])
                    ti2v_prompt = gr.Textbox(label="Prompt", value="Cinematic view, realistic lighting, 4k", lines=2)
                    ti2v_duration = gr.Slider(minimum=2, maximum=300, step=1, value=5, label="Total Duration (s)")
                    with gr.Accordion("Advanced", open=False):
                        ti2v_neg = gr.Textbox(label="Negative Prompt", value="low quality, distortion, text, watermark", lines=2)
                        ti2v_seed = gr.Slider(label="Seed", minimum=0, maximum=2**32 - 1, step=1, value=42)
                        ti2v_rand = gr.Checkbox(label="Random Seed", value=True)
                        with gr.Row():
                            ti2v_h = gr.Slider(minimum=256, maximum=1024, step=32, value=832, label="Height")
                            ti2v_w = gr.Slider(minimum=256, maximum=1024, step=32, value=832, label="Width")
                        ti2v_steps = gr.Slider(minimum=2, maximum=10, step=1, value=4, label="Steps")
                        ti2v_scale = gr.Slider(minimum=1.0, maximum=8.0, step=0.1, value=5.0, label="CFG")
                    btn_ti2v = gr.Button("Start Streaming Generation", variant="primary")
                with gr.Column(scale=2):
                    with gr.Row():
                        out_ti2v = gr.Video(label="Live Video Stream", autoplay=True)
                        out_preview_ti2v = gr.Image(label="Last Frame Preview", interactive=False)
                    out_seed_ti2v = gr.Number(label="Seed Used")
            btn_ti2v.click(
                fn=generate_ti2v_gpu_stream,
                inputs=[ti2v_files, ti2v_prompt, ti2v_h, ti2v_w, ti2v_neg, ti2v_duration, ti2v_scale, ti2v_steps, ti2v_seed, ti2v_rand],
                outputs=[out_ti2v, out_preview_ti2v, out_seed_ti2v]
            )
        with gr.TabItem("Speech to Video (S2V)"):
            with gr.Row():
                with gr.Column(scale=1):
                    s2v_img = gr.Image(label="Reference Image", type="pil")
                    s2v_audio = gr.Audio(label="Audio Input", type="filepath")
                    s2v_prompt = gr.Textbox(label="Prompt", value="Realistic movement, talking face")
                    s2v_seed = gr.Slider(label="Seed", minimum=0, maximum=2**32 - 1, step=1, value=42)
                    s2v_rand = gr.Checkbox(label="Random Seed", value=True)
                    btn_s2v = gr.Button("Generate S2V", variant="primary")
                with gr.Column(scale=2):
                    out_s2v = gr.Video(label="Result")
                    out_seed_s2v = gr.Number(label="Seed Used")
            btn_s2v.click(generate_s2v_gpu, [s2v_img, s2v_audio, s2v_prompt, s2v_seed, s2v_rand], [out_s2v, out_seed_s2v])

if __name__ == "__main__":
    demo.queue().launch()