Spaces:
Running on Zero
Running on Zero
File size: 14,115 Bytes
b8c861f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 | from __future__ import annotations
import io
import random
import struct
import tempfile
from collections.abc import Iterator
from contextlib import contextmanager
from fractions import Fraction
from itertools import chain
from typing import TYPE_CHECKING
import numpy as np
import PIL.Image
import PIL.ImageOps
from .import_utils import BACKENDS_MAPPING, is_imageio_available, is_opencv_available
from .logging import get_logger
if TYPE_CHECKING:
import torch
# Module-level `random.Random` instance; nothing in the visible part of this file reads it.
global_rng = random.Random()
# Logger named after this module; the export helpers below use it for backend and input-range warnings.
logger = get_logger(__name__)
@contextmanager
def buffered_writer(raw_f):
    """
    Wrap `raw_f` in an `io.BufferedWriter` for the duration of a `with` block.

    The buffer is flushed when the block exits, including when the body raises, so every byte written before an
    error still reaches `raw_f`. This function never calls `close()` itself.

    Args:
        raw_f:
            A writable binary stream to wrap.

    Yields:
        `io.BufferedWriter`: The buffered wrapper around `raw_f`.
    """
    f = io.BufferedWriter(raw_f)
    try:
        yield f
    finally:
        # Flush on every exit path; otherwise data written before an exception would sit in the buffer until the
        # wrapper happens to be finalized.
        f.flush()
def export_to_gif(image: list[PIL.Image.Image], output_gif_path: str = None, fps: int = 10) -> str:
    """
    Save a list of frames as an endlessly looping animated GIF.

    Args:
        image (`list[PIL.Image.Image]`):
            Frames of the animation, in playback order. The first frame's `save` method writes the file.
        output_gif_path (`str`, *optional*):
            Destination path. When omitted, a temporary `.gif` file is created and its path is used.
        fps (`int`, defaults to `10`):
            Frames per second. Each frame is shown for `1000 // fps` milliseconds.

    Returns:
        `str`: The path the GIF was written to.
    """
    if output_gif_path is None:
        # `delete=False` keeps the reserved file on disk. Taking `.name` from a throwaway NamedTemporaryFile only
        # works when the object is finalized immediately, and leaves the name free for another process to claim.
        with tempfile.NamedTemporaryFile(suffix=".gif", delete=False) as tmp_file:
            output_gif_path = tmp_file.name

    image[0].save(
        output_gif_path,
        save_all=True,
        append_images=image[1:],
        optimize=False,
        duration=1000 // fps,
        loop=0,
    )
    return output_gif_path
def export_to_ply(mesh, output_ply_path: str = None):
    """
    Write a binary little-endian PLY file for a mesh.

    Args:
        mesh:
            Object exposing `verts`, `faces` and a `vertex_channels` mapping with "R", "G" and "B" entries. All are
            converted with `.cpu().numpy()`, so they are presumably torch tensors. Colours are scaled by 255.499 and
            rounded before being packed as unsigned bytes, which suggests channel values in [0, 1].
        output_ply_path (`str`, *optional*):
            Destination path. When omitted, a temporary `.ply` file is created and its path is used.

    Returns:
        `str`: The path the PLY file was written to.
    """
    if output_ply_path is None:
        # `delete=False` keeps the reserved file on disk instead of relying on a throwaway temp object's name.
        with tempfile.NamedTemporaryFile(suffix=".ply", delete=False) as tmp_file:
            output_ply_path = tmp_file.name

    coords = mesh.verts.detach().cpu().numpy()
    faces = mesh.faces.cpu().numpy()
    rgb = np.stack([mesh.vertex_channels[x].detach().cpu().numpy() for x in "RGB"], axis=1)
    rgb_bytes = (rgb * 255.499).round().astype(int)

    header_lines = [
        "ply",
        "format binary_little_endian 1.0",
        f"element vertex {len(coords)}",
        "property float x",
        "property float y",
        "property float z",
        "property uchar red",
        "property uchar green",
        "property uchar blue",
        f"element face {len(faces)}",
        "property list uchar int vertex_index",
        "end_header",
    ]
    header = ("\n".join(header_lines) + "\n").encode("ascii")

    vertex_struct = struct.Struct("<3f3B")
    # NOTE(review): the header declares face indices as `int` while they are packed unsigned ("I"); the bytes are
    # identical for non-negative indices.
    face_struct = struct.Struct("<B3I")

    # `open(..., "wb")` already returns a buffered writer; the `with` block guarantees the handle is flushed and
    # closed even if packing fails part-way through.
    with open(output_ply_path, "wb") as f:
        f.write(header)
        for coord, color in zip(coords.tolist(), rgb_bytes.tolist()):
            f.write(vertex_struct.pack(*coord, *color))
        for tri in faces.tolist():
            f.write(face_struct.pack(len(tri), *tri))

    return output_ply_path
def export_to_obj(mesh, output_obj_path: str = None):
    """
    Write a Wavefront OBJ file for a mesh, with per-vertex colours appended to each vertex line.

    Args:
        mesh:
            Object exposing `verts`, `faces` and a `vertex_channels` mapping with "R", "G" and "B" entries. All are
            converted with `.cpu().numpy()`, so they are presumably torch tensors. Colour values are written as-is.
        output_obj_path (`str`, *optional*):
            Destination path. When omitted, a temporary `.obj` file is created and its path is used.

    Returns:
        `str`: The path the OBJ file was written to. Returning it is what makes an auto-generated temporary path
        discoverable by the caller.
    """
    if output_obj_path is None:
        # `delete=False` keeps the reserved file on disk instead of relying on a throwaway temp object's name.
        with tempfile.NamedTemporaryFile(suffix=".obj", delete=False) as tmp_file:
            output_obj_path = tmp_file.name

    verts = mesh.verts.detach().cpu().numpy()
    faces = mesh.faces.cpu().numpy()
    vertex_colors = np.stack([mesh.vertex_channels[x].detach().cpu().numpy() for x in "RGB"], axis=1)

    vertex_lines = [
        "v {} {} {} {} {} {}".format(*coord, *color)
        for coord, color in zip(verts.tolist(), vertex_colors.tolist())
    ]
    # Face indices are shifted by one because OBJ vertex references are 1-based.
    face_lines = [f"f {tri[0] + 1} {tri[1] + 1} {tri[2] + 1}" for tri in faces.tolist()]

    with open(output_obj_path, "w") as f:
        # One `write` of the joined text; `writelines` on a single string would iterate it character by character.
        f.write("\n".join(vertex_lines + face_lines))

    return output_obj_path
def _legacy_export_to_video(
    video_frames: list[np.ndarray] | list[PIL.Image.Image], output_video_path: str = None, fps: int = 10
):
    """
    Export frames to an `mp4v`-encoded video using the OpenCV backend.

    Args:
        video_frames (`list[np.ndarray]` or `list[PIL.Image.Image]`):
            Frames to encode. `np.ndarray` frames are multiplied by 255 and cast to `uint8`, so they are presumably
            expected to be floats in [0, 1]; PIL images are converted with `np.array`. Frames are converted from
            RGB to BGR before writing.
        output_video_path (`str`, *optional*):
            Destination path. When omitted, a temporary `.mp4` file is created and its path is used.
        fps (`int`, defaults to `10`):
            Frames per second of the output video.

    Returns:
        `str`: The path the video was written to.

    Raises:
        ImportError: If OpenCV is not available.
    """
    if is_opencv_available():
        import cv2
    else:
        raise ImportError(BACKENDS_MAPPING["opencv"][1].format("export_to_video"))

    if output_video_path is None:
        # `delete=False` keeps the reserved file on disk instead of relying on a throwaway temp object's name.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file:
            output_video_path = tmp_file.name

    if isinstance(video_frames[0], np.ndarray):
        video_frames = [(frame * 255).astype(np.uint8) for frame in video_frames]
    elif isinstance(video_frames[0], PIL.Image.Image):
        video_frames = [np.array(frame) for frame in video_frames]

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    h, w, _ = video_frames[0].shape
    video_writer = cv2.VideoWriter(output_video_path, fourcc, fps=fps, frameSize=(w, h))
    try:
        for frame in video_frames:
            video_writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    finally:
        # Release explicitly so the file is finalized before the path is returned, rather than whenever the writer
        # object is garbage-collected.
        video_writer.release()

    return output_video_path
def export_to_video(
    video_frames: list[np.ndarray] | list[PIL.Image.Image],
    output_video_path: str = None,
    fps: int = 10,
    quality: float = 5.0,
    bitrate: int | None = None,
    macro_block_size: int | None = 16,
) -> str:
    """
    Export a sequence of frames to a video file with `imageio`, falling back to the legacy OpenCV exporter when
    `imageio` is not available.

    Args:
        video_frames (`list[np.ndarray]` or `list[PIL.Image.Image]`):
            Frames to encode. `np.ndarray` frames are multiplied by 255 and cast to `uint8`, so they are presumably
            expected to be floats in [0, 1]; PIL images are converted with `np.array`.
        output_video_path (`str`, *optional*):
            Destination path. When omitted, a temporary `.mp4` file is created and its path is used.
        fps (`int`, defaults to `10`):
            Frames per second of the output video.
        quality (`float`, defaults to `5.0`):
            Video output quality. Uses variable bit rate. Highest quality is 10, lowest is 0. Set to None to prevent
            variable bitrate flags to FFMPEG so you can manually specify them using output_params instead.
            Specifying a fixed bitrate using `bitrate` disables this parameter.
        bitrate (`int`, *optional*):
            Set a constant bitrate for the video encoding. Default is None causing `quality` parameter to be used
            instead. Better quality videos with smaller file sizes will result from using the `quality` variable
            bitrate parameter rather than specifying a fixed bitrate with this parameter.
        macro_block_size (`int`, *optional*, defaults to `16`):
            Size constraint for video. Width and height must be divisible by this number. If not divisible by this
            number imageio will tell ffmpeg to scale the image up to the next closest size divisible by this number.
            Most codecs are compatible with a macroblock size of 16 (default), some can go smaller (4, 8). To disable
            this automatic feature set it to None or 1, however be warned many players can't decode videos that are
            odd in size and some codecs will produce poor results or fail. See
            https://en.wikipedia.org/wiki/Macroblock.

    Returns:
        `str`: The path the video was written to.

    Raises:
        AttributeError: If `imageio` is installed but its ffmpeg plugin cannot be resolved.
    """
    # TODO: Dhruv. Remove by Diffusers release 0.33.0
    # Added to prevent breaking existing code
    if not is_imageio_available():
        logger.warning(
            (
                "It is recommended to use `export_to_video` with `imageio` and `imageio-ffmpeg` as a backend. \n"
                "These libraries are not present in your environment. Attempting to use legacy OpenCV backend to export video. \n"
                "Support for the OpenCV backend will be deprecated in a future Diffusers version"
            )
        )
        return _legacy_export_to_video(video_frames, output_video_path, fps)

    # Availability was established by the guard above, so the import needs no second check.
    import imageio

    try:
        imageio.plugins.ffmpeg.get_exe()
    except AttributeError as error:
        raise AttributeError(
            (
                "Found an existing imageio backend in your environment. Attempting to export video with imageio. \n"
                "Unable to find a compatible ffmpeg installation in your environment to use with imageio. Please install via `pip install imageio-ffmpeg"
            )
        ) from error

    if output_video_path is None:
        # `delete=False` keeps the reserved file on disk instead of relying on a throwaway temp object's name.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file:
            output_video_path = tmp_file.name

    if isinstance(video_frames[0], np.ndarray):
        video_frames = [(frame * 255).astype(np.uint8) for frame in video_frames]
    elif isinstance(video_frames[0], PIL.Image.Image):
        video_frames = [np.array(frame) for frame in video_frames]

    with imageio.get_writer(
        output_video_path, fps=fps, quality=quality, bitrate=bitrate, macro_block_size=macro_block_size
    ) as writer:
        for frame in video_frames:
            writer.append_data(frame)

    return output_video_path
def _import_av():
try:
import av
except ImportError as error:
raise ImportError(
"PyAV is required to use `encode_video`. You can install it with `pip install av`."
) from error
return av
def _prepare_audio_stream(container, audio_sample_rate: int):
"""
Prepare the audio stream for writing.
"""
audio_stream = container.add_stream("aac", rate=audio_sample_rate)
audio_stream.codec_context.sample_rate = audio_sample_rate
audio_stream.codec_context.layout = "stereo"
audio_stream.codec_context.time_base = Fraction(1, audio_sample_rate)
return audio_stream
def _resample_audio(container, audio_stream, frame_in, av_module) -> None:
cc = audio_stream.codec_context
# Use the encoder's format/layout/rate as the target.
target_format = cc.format or "fltp" # AAC is usually fltp.
target_layout = cc.layout or "stereo"
target_rate = cc.sample_rate or frame_in.sample_rate
audio_resampler = av_module.audio.resampler.AudioResampler(
format=target_format,
layout=target_layout,
rate=target_rate,
)
audio_next_pts = 0
for rframe in audio_resampler.resample(frame_in):
if rframe.pts is None:
rframe.pts = audio_next_pts
audio_next_pts += rframe.samples
rframe.sample_rate = frame_in.sample_rate
container.mux(audio_stream.encode(rframe))
# Flush audio encoder.
for packet in audio_stream.encode():
container.mux(packet)
def _write_audio(
container,
audio_stream,
samples: "torch.Tensor",
audio_sample_rate: int,
av_module,
) -> None:
import torch
if samples.ndim == 1:
samples = samples[:, None]
if samples.shape[1] != 2 and samples.shape[0] == 2:
samples = samples.T
if samples.shape[1] != 2:
raise ValueError(f"Expected samples with 2 channels; got shape {samples.shape}.")
# Convert to int16 packed for ingestion; resampler converts to the encoder format.
if samples.dtype != torch.int16:
samples = torch.clip(samples, -1.0, 1.0)
samples = (samples * 32767.0).to(torch.int16)
frame_in = av_module.AudioFrame.from_ndarray(
samples.contiguous().reshape(1, -1).cpu().numpy(),
format="s16",
layout="stereo",
)
frame_in.sample_rate = audio_sample_rate
_resample_audio(container, audio_stream, frame_in, av_module)
def encode_video(
    video: list[PIL.Image.Image] | np.ndarray | "torch.Tensor" | Iterator["torch.Tensor"],
    fps: int,
    output_path: str,
    audio: "torch.Tensor" | None = None,
    audio_sample_rate: int | None = None,
    video_chunks_number: int = 1,
) -> None:
    """
    Encodes a video with optional audio using the PyAV library. Based on code from the original LTX-2 repo:
    https://github.com/Lightricks/LTX-2/blob/4f410820b198e05074a1e92de793e3b59e9ab5a0/packages/ltx-pipelines/src/ltx_pipelines/utils/media_io.py#L182

    Args:
        video (`List[PIL.Image.Image]` or `np.ndarray` or `torch.Tensor`):
            A video tensor of shape [frames, height, width, channels] with integer pixel values in [0, 255]. If the
            input is a `np.ndarray`, it is expected to be a float array with values in [0, 1] (which is what pipelines
            usually return with `output_type="np"`). An iterator of tensor chunks is consumed as-is.
        fps (`int`):
            The frames per second (FPS) of the encoded video.
        output_path (`str`):
            The path to save the encoded video to.
        audio (`torch.Tensor`, *optional*):
            An audio waveform of shape [audio_channels, samples].
        audio_sample_rate (`int`, *optional*):
            The sampling rate of the audio waveform. Required when `audio` is given.
        video_chunks_number (`int`, *optional*, defaults to `1`):
            The number of chunks to split the video into for encoding. Each chunk will be encoded separately. The
            number of chunks to use often depends on the tiling config for the video VAE.

    Raises:
        ValueError: If `audio` is provided without `audio_sample_rate`.
        ImportError: If PyAV is not installed.
    """
    # Validate first: an argument error should cost neither a backend import nor a half-created output file.
    if audio is not None and audio_sample_rate is None:
        raise ValueError("audio_sample_rate is required when audio is provided")

    av = _import_av()
    import torch
    from tqdm import tqdm

    if isinstance(video, list) and isinstance(video[0], PIL.Image.Image):
        # Pipeline output_type="pil"; assumes each image is in "RGB" mode.
        video_frames = [np.array(frame) for frame in video]
        video = np.stack(video_frames, axis=0)
        video = torch.from_numpy(video)
    elif isinstance(video, np.ndarray):
        # Pipeline output_type="np".
        is_denormalized = np.logical_and(np.zeros_like(video) <= video, video <= np.ones_like(video))
        if np.all(is_denormalized):
            video = (video * 255).round().astype("uint8")
        else:
            logger.warning(
                "Supplied `numpy.ndarray` does not have values in [0, 1]. The values will be assumed to be pixel "
                "values in [0, ..., 255] and will be used as is."
            )
        video = torch.from_numpy(video)

    if isinstance(video, torch.Tensor):
        # Split into video_chunks_number along the frame dimension.
        video = torch.tensor_split(video, video_chunks_number, dim=0)
        video = iter(video)

    # The first chunk is pulled eagerly because the stream dimensions are read from it.
    first_chunk = next(video)
    _, height, width, _ = first_chunk.shape

    container = av.open(output_path, mode="w")
    try:
        stream = container.add_stream("libx264", rate=int(fps))
        stream.width = width
        stream.height = height
        stream.pix_fmt = "yuv420p"

        audio_stream = None
        if audio is not None:
            audio_stream = _prepare_audio_stream(container, audio_sample_rate)

        for video_chunk in tqdm(
            chain([first_chunk], video), total=video_chunks_number, desc="Encoding video chunks"
        ):
            video_chunk_cpu = video_chunk.to("cpu").numpy()
            for frame_array in video_chunk_cpu:
                frame = av.VideoFrame.from_ndarray(frame_array, format="rgb24")
                for packet in stream.encode(frame):
                    container.mux(packet)

        # Flush video encoder.
        for packet in stream.encode():
            container.mux(packet)

        if audio_stream is not None:
            _write_audio(container, audio_stream, audio, audio_sample_rate, av)
    finally:
        # Close on every exit path so a failure mid-encode does not leak the open output container.
        container.close()
|