| | import os |
| | import pandas as pd |
| | import torch |
| | import numpy as np |
| | import random |
| |
|
| | from diffusers import StableDiffusionPipeline |
| | from diffusers.utils import export_to_video |
| |
|
| | |
| | os.environ["CUDA_VISIBLE_DEVICES"] = "0" |
| |
|
def set_seed(seed: int = 42):
    """Seed every relevant RNG (Python, NumPy, PyTorch CPU and CUDA) so runs are reproducible.

    Also pins cuDNN to deterministic kernel selection, trading autotuning
    speed for repeatability.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)
    # CUDA seeding is a silent no-op when no GPU is available.
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
|
| | |
| | set_seed(42) |
| |
|
def generate_image(pipeline, prompt: str, output_path: str):
    """Run the image pipeline on *prompt* and save the first result to *output_path*."""
    # Mixed-precision inference on GPU; saving the PIL image needs no autocast.
    with torch.autocast("cuda"):
        result = pipeline(prompt)
    result.images[0].save(output_path)
| |
|
| | import torch |
| | from diffusers.utils import export_to_video |
| |
|
def generate_video(pipeline, pipeline_type: str, prompt: str, output_path: str, **kwargs):
    """
    Generate a video with one of several diffusers video pipelines and export it.

    Parameters:
        pipeline: Loaded video generation pipeline.
        pipeline_type: One of "cogvideo", "ltx", "hunyuan", "animatediff".
        prompt: Text description of the video.
        output_path: Output file path; every branch exports via export_to_video
            (despite older notes, there is no gif-specific path here).
        **kwargs: Optional overrides, e.g. width, height, num_frames,
            num_inference_steps, fps, guidance_scale, negative_prompt, generator.

    Raises:
        ValueError: If pipeline_type is not one of the supported values.
    """
    if pipeline_type == "cogvideo":
        # BUG FIX: the previous code passed torch.Generator(device="cuda") as the
        # kwargs.get() default, which is evaluated EAGERLY — it built a CUDA
        # generator (crashing on CPU-only hosts) even when a generator was
        # supplied. Construct it lazily instead.
        generator = kwargs.get("generator")
        if generator is None:
            generator = torch.Generator(device="cuda").manual_seed(42)
        video = pipeline(
            prompt=prompt,
            num_videos_per_prompt=kwargs.get("num_videos_per_prompt", 1),
            num_inference_steps=kwargs.get("num_inference_steps", 50),
            num_frames=kwargs.get("num_frames", 49),
            guidance_scale=kwargs.get("guidance_scale", 6),
            generator=generator,
        ).frames[0]
        export_to_video(video, output_path, fps=kwargs.get("fps", 8))
    elif pipeline_type == "ltx":
        video = pipeline(
            prompt=prompt,
            negative_prompt=kwargs.get(
                "negative_prompt",
                "worst quality, inconsistent motion, blurry, jittery, distorted",
            ),
            width=kwargs.get("width", 704),
            height=kwargs.get("height", 480),
            num_frames=kwargs.get("num_frames", 161),
            num_inference_steps=kwargs.get("num_inference_steps", 50),
        ).frames[0]
        export_to_video(video, output_path, fps=kwargs.get("fps", 15))
    elif pipeline_type == "hunyuan":
        video = pipeline(
            prompt=prompt,
            width=kwargs.get("width", 512),
            height=kwargs.get("height", 320),
            num_frames=kwargs.get("num_frames", 61),
            num_inference_steps=kwargs.get("num_inference_steps", 30),
        ).frames[0]
        export_to_video(video, output_path, fps=kwargs.get("fps", 15))
    elif pipeline_type == "animatediff":
        # AnimateDiff-Lightning is a few-step distilled model: low guidance, 4 steps.
        video = pipeline(
            prompt=prompt,
            guidance_scale=kwargs.get("guidance_scale", 1.0),
            num_inference_steps=kwargs.get("num_inference_steps", 4),
        ).frames[0]
        export_to_video(video, output_path)
    else:
        raise ValueError(f"Unknown pipeline type: {pipeline_type}")
| |
|
def load_video_pipeline(pipeline_type: str):
    """
    Load and initialize the video generation pipeline named by pipeline_type.

    Parameters:
        pipeline_type: One of "cogvideo", "ltx", "hunyuan", "animatediff".
    Returns:
        A ready-to-use diffusers video pipeline (weights downloaded from the
        Hugging Face Hub on first call).
    Raises:
        ValueError: If pipeline_type is not one of the supported values.
    """
    if pipeline_type == "cogvideo":
        # Imports are kept branch-local so only the selected model's
        # dependencies need to be importable.
        from diffusers import CogVideoXPipeline
        print("Loading video generation model (CogVideoX-5b)...")
        pipe = CogVideoXPipeline.from_pretrained(
            "THUDM/CogVideoX-5b",
            torch_dtype=torch.bfloat16
        )
        # Slicing/tiling the VAE trades speed for lower peak GPU memory
        # during video decoding.
        pipe.vae.enable_slicing()
        pipe.vae.enable_tiling()
        pipe.to("cuda")
        return pipe
    elif pipeline_type == "ltx":
        from diffusers import LTXPipeline
        print("Loading video generation model (LTX-Video)...")
        pipe = LTXPipeline.from_pretrained(
            "Lightricks/LTX-Video",
            torch_dtype=torch.bfloat16
        )
        pipe.to("cuda")
        return pipe
    elif pipeline_type == "hunyuan":
        from diffusers import BitsAndBytesConfig, HunyuanVideoTransformer3DModel, HunyuanVideoPipeline
        from diffusers.hooks import apply_layerwise_casting
        from transformers import LlamaModel
        print("Loading video generation model (HunyuanVideo)...")
        model_id = "hunyuanvideo-community/HunyuanVideo"
        # 4-bit quantization for the transformer to fit in limited VRAM.
        # NOTE(review): requires the bitsandbytes package at runtime — confirm
        # it is installed in the target environment.
        quantization_config = BitsAndBytesConfig(
            load_in_4bit=True, bnb_4bit_compute_dtype=torch.bfloat16
        )
        # Store the text encoder in fp8 and upcast to fp16 per layer on the
        # fly, further reducing memory.
        text_encoder = LlamaModel.from_pretrained(model_id, subfolder="text_encoder", torch_dtype=torch.float16)
        apply_layerwise_casting(text_encoder, storage_dtype=torch.float8_e4m3fn, compute_dtype=torch.float16)
        transformer = HunyuanVideoTransformer3DModel.from_pretrained(
            model_id,
            subfolder="transformer",
            quantization_config=quantization_config,
            torch_dtype=torch.bfloat16,
        )
        pipe = HunyuanVideoPipeline.from_pretrained(
            model_id, transformer=transformer, text_encoder=text_encoder, torch_dtype=torch.float16
        )
        pipe.vae.enable_tiling()
        # Offload idle submodules to CPU instead of pinning everything on GPU.
        pipe.enable_model_cpu_offload()
        return pipe
    elif pipeline_type == "animatediff":
        from diffusers import AnimateDiffPipeline, MotionAdapter, EulerDiscreteScheduler
        from huggingface_hub import hf_hub_download
        from safetensors.torch import load_file
        print("Loading video generation model (AnimateDiff-Lightning)...")
        device = "cuda"
        dtype = torch.float16
        # 4-step distilled checkpoint; matches the 4 default inference steps
        # used when generating with this pipeline.
        step = 4
        repo = "ByteDance/AnimateDiff-Lightning"
        ckpt = f"animatediff_lightning_{step}step_diffusers.safetensors"
        base = "emilianJR/epiCRealism"
        adapter = MotionAdapter().to(device, dtype)

        adapter.load_state_dict(load_file(hf_hub_download(repo, ckpt), device=device))
        pipe = AnimateDiffPipeline.from_pretrained(base, motion_adapter=adapter, torch_dtype=dtype).to(device)
        # Lightning checkpoints are trained with trailing timestep spacing and
        # a linear beta schedule; the scheduler must match.
        pipe.scheduler = EulerDiscreteScheduler.from_config(
            pipe.scheduler.config, timestep_spacing="trailing", beta_schedule="linear"
        )
        return pipe
    else:
        raise ValueError(f"Unknown pipeline type: {pipeline_type}")
| |
|
def main():
    """
    Batch driver: for each CSV of prompts, generate a cover image (Stable
    Diffusion) and a video (pipeline selected by video_pipeline_type) per row,
    skipping rows whose outputs already exist.
    """
    # NOTE(review): "runwayml/stable-diffusion-v1-5" has been removed from the
    # Hub by its owner; if loading fails, switch to a mirror such as
    # "stable-diffusion-v1-5/stable-diffusion-v1-5" — confirm availability.
    print("Loading image generation model (Stable Diffusion)...")
    pipe_image = StableDiffusionPipeline.from_pretrained(
        "runwayml/stable-diffusion-v1-5",
        torch_dtype=torch.float16
    )
    pipe_image.to("cuda")

    # Which video backend to use; must be a key understood by
    # load_video_pipeline / generate_video.
    video_pipeline_type = "ltx"

    # Each task: a prompt CSV plus the output directories for images and videos.
    tasks1 = [
        {
            "csv_file": "output_prompt_rag_more/prompt_ai_concrete_rag_10_testset.csv",
            "image_dir": "output_ai_covers_concrete_rag_10_testset",
            "video_dir": "output_ai_videos_concrete_rag_10_testset_ltx"
        },
        {
            "csv_file": "output_prompt_rag_more/prompt_ai_abstract_rag_10_testset.csv",
            "image_dir": "output_ai_covers_abstract_rag_10_testset",
            "video_dir": "output_ai_videos_abstract_rag_10_testset_ltx"
        }

    ]

    tasks = tasks1
    pipe_video = load_video_pipeline(video_pipeline_type)

    for task in tasks:
        csv_file = task["csv_file"]
        image_dir = task["image_dir"]
        video_dir = task["video_dir"]
        os.makedirs(image_dir, exist_ok=True)
        print(f"Ensuring directory exists: {image_dir}")
        os.makedirs(video_dir, exist_ok=True)
        print(f"Ensuring directory exists: {video_dir}")

        # A missing CSV skips the task rather than aborting the whole run.
        if not os.path.exists(csv_file):
            print(f"Error: CSV file {csv_file} not found, please check the path.")
            continue

        # Expected CSV columns: "user prompt", "title", "cover prompt",
        # "video prompt" — raises KeyError if the schema differs.
        df = pd.read_csv(csv_file)
        for idx, row in df.iterrows():
            user_prompt = str(row["user prompt"])
            title = str(row["title"])
            cover_prompt = str(row["cover prompt"])
            video_prompt = str(row["video prompt"])

            # NOTE(review): user_prompt is used verbatim as a filename; prompts
            # containing "/" or other unsafe characters will break the path —
            # consider sanitizing if prompts are free-form.
            image_filename = os.path.join(image_dir, f"{user_prompt}.png")
            video_filename = os.path.join(video_dir, f"{user_prompt}.mp4")

            print("-" * 50)
            print(f"[CSV: {csv_file}] - [{idx}] Starting generation: {user_prompt}")
            print(f"Title: {title}")
            print(f"Cover Prompt: {cover_prompt}")
            print(f"Video Prompt: {video_prompt}")

            # Resume support: skip a row only when BOTH outputs already exist.
            if os.path.exists(image_filename) and os.path.exists(video_filename):
                print(f"File already exists, skipping generation: {video_filename}")
                continue

            # Best-effort: a failed image does not stop video generation,
            # and a failed row does not stop the batch.
            try:
                generate_image(pipe_image, cover_prompt, image_filename)
                print(f"Image saved to {image_filename}")
            except Exception as e:
                print(f"Image generation failed: {e}")

            try:
                generate_video(
                    pipe_video,
                    pipeline_type=video_pipeline_type,
                    prompt=video_prompt,
                    output_path=video_filename

                )
                print(f"Video saved to {video_filename}")
            except Exception as e:
                print(f"Video generation failed: {e}")

    print("All generation tasks completed!")


if __name__ == "__main__":
    main()
| |
|