|
|
|
|
|
import argparse |
|
|
import binascii |
|
|
import logging |
|
|
import os |
|
|
import os.path as osp |
|
|
import shutil |
|
|
import subprocess |
|
|
|
|
|
import imageio |
|
|
import torch |
|
|
import torchvision |
|
|
|
|
|
__all__ = ['save_video', 'save_image', 'str2bool'] |
|
|
|
|
|
|
|
|
def rand_name(length=8, suffix=''):
    """Return a random hexadecimal file name.

    Args:
        length (int): Number of random bytes; the hex name has ``2 * length``
            characters.
        suffix (str): Optional extension; a leading dot is added if missing.

    Returns:
        str: Random hex string, with ``suffix`` appended when provided.
    """
    token = binascii.b2a_hex(os.urandom(length)).decode('utf-8')
    if not suffix:
        return token
    # Normalize the extension so callers may pass 'mp4' or '.mp4'.
    return token + (suffix if suffix.startswith('.') else '.' + suffix)
|
|
|
|
|
|
|
|
def merge_video_audio(video_path: str, audio_path: str):
    """
    Merge the video and audio into a new video, with the duration set to the
    shorter of the two, and overwrite the original video file in place.

    The video stream is copied as-is; the audio is re-encoded to AAC at
    192 kbps. On any failure after the existence checks, the temporary
    output is removed and the error is logged (not re-raised).

    Parameters:
        video_path (str): Path to the original video file
        audio_path (str): Path to the audio file

    Raises:
        FileNotFoundError: If either input file does not exist.
    """
    logging.basicConfig(level=logging.INFO)

    # Validate inputs up front so we never invoke ffmpeg on missing files.
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"video file {video_path} does not exist")
    if not os.path.exists(audio_path):
        raise FileNotFoundError(f"audio file {audio_path} does not exist")

    # Write to a sibling temp file first, then atomically replace the input.
    base, ext = os.path.splitext(video_path)
    temp_output = f"{base}_temp{ext}"

    try:
        command = [
            'ffmpeg', '-y',
            '-i', video_path,
            '-i', audio_path,
            '-c:v', 'copy',      # keep the video stream untouched
            '-c:a', 'aac',       # re-encode audio to AAC
            '-b:a', '192k',
            '-map', '0:v:0',     # first video stream of input 0
            '-map', '1:a:0',     # first audio stream of input 1
            '-shortest',         # stop at the shorter of the two streams
            temp_output,
        ]

        logging.info("Start merging video and audio...")
        result = subprocess.run(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

        if result.returncode != 0:
            error_msg = f"FFmpeg execute failed: {result.stderr}"
            logging.error(error_msg)
            raise RuntimeError(error_msg)

        shutil.move(temp_output, video_path)
        logging.info(f"Merge completed, saved to {video_path}")

    except Exception as e:
        # Best-effort cleanup of the partial temp file; the failure is
        # logged rather than propagated (callers treat this as best-effort).
        if os.path.exists(temp_output):
            os.remove(temp_output)
        logging.error(f"merge_video_audio failed with error: {e}")
|
|
|
|
|
|
|
|
def save_video(tensor,
               save_file=None,
               fps=30,
               suffix='.mp4',
               nrow=8,
               normalize=True,
               value_range=(-1, 1)):
    """Encode a batched video tensor to a video file via imageio/libx264.

    Frames are taken by unbinding dim 2, so the expected layout is
    (batch, channel, frame, height, width) — TODO confirm against callers.
    Each frame's batch is tiled into a grid with torchvision's make_grid.
    Failures are logged and swallowed (no exception propagates).

    Args:
        tensor (torch.Tensor): Video tensor as described above.
        save_file (str | None): Output path; when None a random name under
            /tmp with ``suffix`` is used.
        fps (int): Frames per second of the encoded video.
        suffix (str): Extension for the auto-generated name (ignored when
            ``save_file`` is given).
        nrow (int): Grid width forwarded to make_grid.
        normalize (bool): Forwarded to make_grid.
        value_range (tuple): (min, max) used for clamping and normalization.
    """
    # Fall back to a random file under /tmp when no path is given.
    cache_file = osp.join('/tmp', rand_name(
        suffix=suffix)) if save_file is None else save_file

    writer = None
    try:
        # Clamp into range, tile each frame's batch into a grid, then
        # reorder to (frame, H, W, C) uint8 as imageio expects.
        tensor = tensor.clamp(min(value_range), max(value_range))
        tensor = torch.stack([
            torchvision.utils.make_grid(
                u, nrow=nrow, normalize=normalize, value_range=value_range)
            for u in tensor.unbind(2)
        ],
                             dim=1).permute(1, 2, 3, 0)
        tensor = (tensor * 255).type(torch.uint8).cpu()

        writer = imageio.get_writer(
            cache_file, fps=fps, codec='libx264', quality=8)
        for frame in tensor.numpy():
            writer.append_data(frame)
    except Exception as e:
        # Bug fix: failures were logged at info level; use error instead.
        logging.error(f'save_video failed, error: {e}')
    finally:
        # Bug fix: the writer was previously leaked when an exception was
        # raised mid-encode; always close it so the ffmpeg subprocess is
        # released and any partial output is flushed.
        if writer is not None:
            writer.close()
|
|
|
|
|
|
|
|
def save_image(tensor, save_file, nrow=8, normalize=True, value_range=(-1, 1)):
    """Save an image tensor to disk with torchvision.

    Args:
        tensor (torch.Tensor): Image tensor accepted by
            ``torchvision.utils.save_image`` (e.g. batch of CHW images).
        save_file (str): Target path; unsupported extensions are rewritten
            to ``.png``.
        nrow (int): Grid width forwarded to save_image.
        normalize (bool): Forwarded to save_image.
        value_range (tuple): (min, max) used for clamping and normalization.

    Returns:
        str | None: The path actually written, or None on failure (the
        error is logged and swallowed).
    """
    # Bug fix: the normalized suffix was previously computed but never
    # applied, so unsupported extensions were passed straight through to
    # torchvision (which infers the format from the extension and fails).
    # Now the output path itself falls back to .png.
    suffix = osp.splitext(save_file)[1]
    if suffix.lower() not in [
            '.jpg', '.jpeg', '.png', '.tiff', '.gif', '.webp'
    ]:
        save_file = osp.splitext(save_file)[0] + '.png'

    try:
        tensor = tensor.clamp(min(value_range), max(value_range))
        torchvision.utils.save_image(
            tensor,
            save_file,
            nrow=nrow,
            normalize=normalize,
            value_range=value_range)
        return save_file
    except Exception as e:
        # Bug fix: log failures at error level instead of info.
        logging.error(f'save_image failed, error: {e}')
|
|
|
|
|
|
|
|
def str2bool(v):
    """
    Convert a string to a boolean.

    Supported true values: 'yes', 'true', 't', 'y', '1'
    Supported false values: 'no', 'false', 'f', 'n', '0'

    Args:
        v (str): String to convert. Booleans are passed through unchanged.

    Returns:
        bool: Converted boolean value.

    Raises:
        argparse.ArgumentTypeError: If the value cannot be converted to boolean.
    """
    # Already a bool (argparse may hand us one): pass it through.
    if isinstance(v, bool):
        return v
    normalized = v.lower()
    truthy = ('yes', 'true', 't', 'y', '1')
    falsy = ('no', 'false', 'f', 'n', '0')
    if normalized in truthy:
        return True
    if normalized in falsy:
        return False
    raise argparse.ArgumentTypeError('Boolean value expected (True/False)')
|
|
|
|
|
|
|
|
def masks_like(tensor, zero=False, generator=None, p=0.2):
    """Build two lists of all-ones masks shaped like the input tensors.

    For each tensor ``u`` in the input list, two ones-tensors of the same
    shape/dtype/device are created. When ``zero`` is True the slice at
    index 0 along dim 1 (presumably the first frame/timestep — confirm
    with callers) is modified:

    - without a ``generator``: that slice is zeroed in both outputs;
    - with a ``generator``: independently per tensor with probability
      ``p``, the first output's slice is filled with a single sampled
      value ``exp(N(-3.5, 0.5))`` broadcast over the slice, and the second
      output's slice is zeroed; otherwise both slices stay all-ones.

    Args:
        tensor (list[torch.Tensor]): Input tensors (must be a list).
        zero (bool): Whether to modify the first dim-1 slice.
        generator (torch.Generator | None): RNG for the stochastic path.
        p (float): Probability of applying the stochastic modification.

    Returns:
        tuple[list[torch.Tensor], list[torch.Tensor]]: The two mask lists.
    """
    assert isinstance(tensor, list)
    out1 = [torch.ones(u.shape, dtype=u.dtype, device=u.device) for u in tensor]
    out2 = [torch.ones(u.shape, dtype=u.dtype, device=u.device) for u in tensor]

    if zero:
        if generator is None:
            # Deterministic path: blank the first slice of both masks.
            for u, v in zip(out1, out2):
                u[:, 0] = torch.zeros_like(u[:, 0])
                v[:, 0] = torch.zeros_like(v[:, 0])
        else:
            for u, v in zip(out1, out2):
                random_num = torch.rand(
                    1, generator=generator, device=generator.device).item()
                if random_num < p:
                    # Broadcast one log-normal sample over the whole slice.
                    u[:, 0] = torch.normal(
                        mean=-3.5,
                        std=0.5,
                        size=(1,),
                        device=u.device,
                        generator=generator).expand_as(u[:, 0]).exp()
                    v[:, 0] = torch.zeros_like(v[:, 0])
                # Bug fix: the original else-branch self-assigned
                # u[:, 0] = u[:, 0] (a no-op); removed as dead code.

    return out1, out2
|
|
|
|
|
|
|
|
def best_output_size(w, h, dw, dh, expected_area):
    """Pick an output size aligned to (dw, dh) close to a target aspect ratio.

    Two candidates are built — one by flooring the ideal width to a multiple
    of ``dw``, one by flooring the ideal height to a multiple of ``dh`` —
    each with area at most ``expected_area``. The candidate whose aspect
    ratio deviates least (multiplicatively) from ``w / h`` wins.

    Args:
        w, h: Source dimensions defining the target aspect ratio.
        dw, dh: Required alignment of the output width and height.
        expected_area: Upper bound on the output pixel area.

    Returns:
        tuple[int, int]: (output_width, output_height).
    """
    target_ratio = w / h
    # Ideal (unaligned) dimensions matching the ratio at the full area.
    ideal_w = (expected_area * target_ratio)**0.5
    ideal_h = expected_area / ideal_w

    # Candidate 1: align the width first, derive the height from the area.
    cand_w1 = int(ideal_w // dw * dw)
    cand_h1 = int(expected_area / cand_w1 // dh * dh)
    assert cand_w1 % dw == 0 and cand_h1 % dh == 0 and cand_w1 * cand_h1 <= expected_area
    ratio1 = cand_w1 / cand_h1

    # Candidate 2: align the height first, derive the width from the area.
    cand_h2 = int(ideal_h // dh * dh)
    cand_w2 = int(expected_area / cand_h2 // dw * dw)
    assert cand_h2 % dh == 0 and cand_w2 % dw == 0 and cand_w2 * cand_h2 <= expected_area
    ratio2 = cand_w2 / cand_h2

    # Multiplicative deviation from the target ratio (symmetric in >1 / <1).
    error1 = max(target_ratio / ratio1, ratio1 / target_ratio)
    error2 = max(target_ratio / ratio2, ratio2 / target_ratio)
    return (cand_w1, cand_h1) if error1 < error2 else (cand_w2, cand_h2)
|
|
|
|
|
|
|
|
def download_cosyvoice_repo(repo_path):
    """Clone the CosyVoice repository (main branch, with submodules).

    Args:
        repo_path: Destination directory for the clone.

    Raises:
        ImportError: If GitPython is not installed.
    """
    try:
        import git
    except ImportError:
        raise ImportError('failed to import git, please run pip install GitPython')
    # --recursive pulls the repo's submodules along with the main tree.
    git.Repo.clone_from(
        'https://github.com/FunAudioLLM/CosyVoice.git',
        repo_path,
        multi_options=['--recursive'],
        branch='main')
|
|
|
|
|
|
|
|
def download_cosyvoice_model(model_name, model_path):
    """Download a CosyVoice model snapshot from ModelScope.

    Args:
        model_name: Model name under the ModelScope 'iic' organization.
        model_path: Local directory to download the snapshot into.
    """
    from modelscope import snapshot_download

    snapshot_download(f'iic/{model_name}', local_dir=model_path)
|
|
|