| | |
| | import json |
| | import os |
| | import time |
| |
|
| | import click |
| | import numpy as np |
| | import torch |
| |
|
| | from genmo.mochi_preview.pipelines import ( |
| | DecoderModelFactory, |
| | DitModelFactory, |
| | MochiMultiGPUPipeline, |
| | MochiSingleGPUPipeline, |
| | T5ModelFactory, |
| | linear_quadratic_schedule, |
| | ) |
| | from genmo.lib.progress import progress_bar |
| | from genmo.lib.utils import save_video |
| |
|
# Module-level state shared by configure_model() and load_model().
# `pipeline` is created lazily by load_model(); `model_dir_path` and
# `cpu_offload` stay at these defaults until configure_model() overwrites them.
pipeline = model_dir_path = None
cpu_offload = False
# Number of CUDA devices visible to this process, sampled once at import time.
num_gpus = torch.cuda.device_count()
| |
|
| |
|
def configure_model(model_dir_path_, cpu_offload_):
    """Store the model settings that ``load_model`` reads later.

    Args:
        model_dir_path_: Value assigned to the module-level ``model_dir_path``.
        cpu_offload_: Value assigned to the module-level ``cpu_offload``.
    """
    global model_dir_path, cpu_offload
    model_dir_path, cpu_offload = model_dir_path_, cpu_offload_
| |
|
| |
|
def load_model():
    """Construct the shared Mochi pipeline the first time it is needed.

    Reads the module-level ``model_dir_path``, ``cpu_offload`` and ``num_gpus``
    settings and stores the constructed object in the module-level
    ``pipeline``. If a pipeline already exists the call does nothing.

    The multi-GPU pipeline is used only when more than one GPU is visible;
    every other GPU count uses the single-GPU pipeline, so the chosen class
    always matches the keyword arguments prepared for it.

    Raises:
        RuntimeError: If ``model_dir_path`` is still ``None``.
        ValueError: If CPU offload is requested while more than one GPU is
            visible.
    """
    global num_gpus, pipeline, model_dir_path
    if pipeline is not None:
        return

    if model_dir_path is None:
        # Without this guard the weight paths would silently become
        # "None/dit.safetensors" and fail much later with a confusing error.
        raise RuntimeError(
            "Model directory is not configured; call configure_model() before load_model()."
        )

    print(f"Launching with {num_gpus} GPUs. If you want to force single GPU mode use CUDA_VISIBLE_DEVICES=0.")

    # One predicate drives both the class choice and the kwargs below, so the
    # single-GPU-only options can never be handed to the multi-GPU pipeline.
    multi_gpu = num_gpus > 1
    if multi_gpu and cpu_offload:
        # Explicit raise instead of `assert`: asserts are removed under
        # `python -O`, which would silently drop the offload request.
        raise ValueError("CPU offload not supported in multi-GPU mode")

    kwargs = dict(
        text_encoder_factory=T5ModelFactory(),
        dit_factory=DitModelFactory(model_path=f"{model_dir_path}/dit.safetensors", model_dtype="bf16"),
        decoder_factory=DecoderModelFactory(
            model_path=f"{model_dir_path}/vae.safetensors",
            model_stats_path=f"{model_dir_path}/vae_stats.json",
        ),
    )
    if multi_gpu:
        klass = MochiMultiGPUPipeline
        kwargs["world_size"] = num_gpus
    else:
        klass = MochiSingleGPUPipeline
        kwargs["cpu_offload"] = cpu_offload
        kwargs["tiled_decode"] = True

    pipeline = klass(**kwargs)
| |
|
| |
|
def generate_video(
    prompt,
    negative_prompt,
    width,
    height,
    num_frames,
    seed,
    cfg_scale,
    num_inference_steps,
):
    """Generate one video with the shared pipeline and save it under ``outputs/``.

    Ensures the pipeline is loaded, runs it with the given settings, writes
    the frames to ``outputs/output_<unix-seconds>.mp4`` and dumps the exact
    pipeline arguments to a ``.json`` file with the same stem.

    Args:
        prompt: Forwarded to the pipeline as ``prompt``.
        negative_prompt: Forwarded to the pipeline as ``negative_prompt``.
        width: Forwarded to the pipeline as ``width``.
        height: Forwarded to the pipeline as ``height``.
        num_frames: Forwarded to the pipeline as ``num_frames``.
        seed: Forwarded to the pipeline as ``seed``.
        cfg_scale: Value repeated for every step of the CFG schedule.
        num_inference_steps: Number of steps; also the length of both schedules.

    Returns:
        The path of the written ``.mp4`` file.
    """
    load_model()

    # NOTE(review): the meaning of the 0.025 argument is defined by
    # linear_quadratic_schedule, which is not visible here - confirm there.
    sigma_schedule = linear_quadratic_schedule(num_inference_steps, 0.025)

    # Constant guidance: the same scale is applied at every step.
    cfg_schedule = [cfg_scale] * num_inference_steps

    args = {
        "height": height,
        "width": width,
        "num_frames": num_frames,
        "sigma_schedule": sigma_schedule,
        "cfg_schedule": cfg_schedule,
        "num_inference_steps": num_inference_steps,
        # NOTE(review): semantics of batch_cfg live in the pipeline - confirm there.
        "batch_cfg": False,
        "prompt": prompt,
        "negative_prompt": negative_prompt,
        "seed": seed,
    }

    with progress_bar(type="tqdm"):
        final_frames = pipeline(**args)

    # Only the first element of the pipeline result is saved.
    final_frames = final_frames[0]

    # Sanity checks on the pipeline output before it is handed to save_video.
    assert isinstance(final_frames, np.ndarray)
    assert final_frames.dtype == np.float32

    os.makedirs("outputs", exist_ok=True)
    # The name has one-second resolution, so two runs finishing within the
    # same second write to the same path.
    output_path = os.path.join("outputs", f"output_{int(time.time())}.mp4")

    save_video(final_frames, output_path)

    # Record the generation arguments next to the video. The context manager
    # guarantees the file is flushed and closed instead of leaking the handle.
    json_path = os.path.splitext(output_path)[0] + ".json"
    with open(json_path, "w", encoding="utf-8") as metadata_file:
        json.dump(args, metadata_file, indent=4)

    return output_path
| |
|
| | from textwrap import dedent |
| |
|
# Prompt used by the CLI when --prompt is not supplied. The literal is indented
# for readability; dedent() strips that common indentation, so the value still
# starts and ends with a single newline and has no leading spaces on any line.
DEFAULT_PROMPT = dedent(
    """
    A hand with delicate fingers picks up a bright yellow lemon from a wooden bowl
    filled with lemons and sprigs of mint against a peach-colored background.
    The hand gently tosses the lemon up and catches it, showcasing its smooth texture.
    A beige string bag sits beside the bowl, adding a rustic touch to the scene.
    Additional lemons, one halved, are scattered around the base of the bowl.
    The even lighting enhances the vibrant colors and creates a fresh,
    inviting atmosphere.
    """
)
| |
|
# Command-line entry point. Deliberately documented with comments rather than
# a docstring: click would surface a docstring as the command's --help text.
@click.command()
@click.option("--prompt", default=DEFAULT_PROMPT, help="Prompt for video generation.")
@click.option("--negative_prompt", default="", help="Negative prompt for video generation.")
@click.option("--width", default=848, type=int, help="Width of the video.")
@click.option("--height", default=480, type=int, help="Height of the video.")
@click.option("--num_frames", default=163, type=int, help="Number of frames.")
@click.option("--seed", default=12345, type=int, help="Random seed.")
@click.option("--cfg_scale", default=4.5, type=float, help="CFG Scale.")
@click.option("--num_steps", default=64, type=int, help="Number of inference steps.")
@click.option("--model_dir", required=True, help="Path to the model directory.")
@click.option("--cpu_offload", is_flag=True, help="Whether to offload model to CPU")
def generate_cli(
    prompt,
    negative_prompt,
    width,
    height,
    num_frames,
    seed,
    cfg_scale,
    num_steps,
    model_dir,
    cpu_offload,
):
    # Record the model settings first; generate_video() loads the pipeline
    # from them on demand.
    configure_model(model_dir, cpu_offload)

    # --num_steps on the command line maps to generate_video's
    # num_inference_steps parameter.
    video_path = generate_video(
        prompt=prompt,
        negative_prompt=negative_prompt,
        width=width,
        height=height,
        num_frames=num_frames,
        seed=seed,
        cfg_scale=cfg_scale,
        num_inference_steps=num_steps,
    )
    click.echo(f"Video generated at: {video_path}")


# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    generate_cli()
| |
|