# Copyright 2024-2025 The Alibaba Wan Team Authors. All rights reserved.
import argparse
import binascii
import os
import os.path as osp
import json
from omegaconf import OmegaConf
import imageio
import torch
import torchvision
from moviepy.editor import AudioFileClip, VideoClip

__all__ = ['tensor_to_video', 'prepare_json_dataset']


def tensor_to_video(tensor, output_video_path, input_audio_path, fps=25):
    """Mux a frame tensor with an audio track and write it as a video file.

    Args:
        tensor: Frame data indexed as [f, h, w, c]; each ``tensor[i]`` must be
            an image moviepy can encode (presumably uint8 RGB -- TODO confirm
            what callers pass).
        output_video_path: Destination path for the encoded video.
        input_audio_path: Path of the audio file to attach.
        fps: Output frame rate, also used to map playback time to frame index.

    The written clip's duration is the shorter of the video duration
    (frames / fps) and the audio duration. Every moviepy clip is closed in a
    ``finally`` block so ffmpeg subprocesses and file handles are released
    even when encoding fails.
    """

    def make_frame(t):
        # Clamp the index so float rounding at the very end of the clip can
        # never read past the last frame.
        frame_index = min(int(t * fps), tensor.shape[0] - 1)
        return tensor[frame_index]

    video_duration = tensor.shape[0] / fps

    audio_clip = None
    audio_subclip = None
    video_clip = None
    try:
        # Load audio and trim both streams to the common duration.
        audio_clip = AudioFileClip(input_audio_path)
        final_duration = min(video_duration, audio_clip.duration)
        audio_subclip = audio_clip.subclip(0, final_duration)

        # Build the video clip and attach the trimmed audio.
        video_clip = VideoClip(make_frame, duration=final_duration)
        video_clip = video_clip.set_audio(audio_subclip)

        # write_videofile spawns ffmpeg; it may raise on encoding errors.
        video_clip.write_videofile(
            output_video_path,
            fps=fps,
            audio_codec="aac",
        )
    finally:
        # Close in reverse creation order so every handle is released.
        if video_clip is not None:
            video_clip.close()
        if audio_subclip is not None:
            audio_subclip.close()
        if audio_clip is not None:
            audio_clip.close()


def prepare_json_dataset(json_path):
    """Load a JSON manifest and normalize it into an OmegaConf list.

    Args:
        json_path: Path to a JSON file mapping item names to records that
            contain ``prompt``, ``audio_path`` and ``img_paths`` keys.

    Returns:
        An OmegaConf ListConfig of dicts with keys ``text``, ``ref_img``,
        ``audio`` and ``itemname``.
    """
    # Binary mode lets json.load sniff the encoding (including BOMs) itself.
    with open(json_path, "rb") as f:
        data = json.load(f)

    samples = []
    for itemname, row in data.items():
        # Normalize the prompt: underscores become spaces; surrounding
        # whitespace and stray double quotes are stripped.
        text = row['prompt'].strip().replace("_", " ").strip('"')
        samples.append({
            "text": text,
            "ref_img": list(row['img_paths']),
            "audio": row['audio_path'],
            "itemname": itemname,
        })
    return OmegaConf.create(samples)