OzzyGT's picture
OzzyGT HF Staff
initial commit
b36de4d
from typing import List, Union
import av
import numpy as np
import torch
from diffusers.modular_pipelines import (
ComponentSpec,
InputParam,
ModularPipelineBlocks,
OutputParam,
PipelineState,
)
from matplotlib import colormaps
from PIL import Image
from transformers import DepthAnythingForDepthEstimation, DPTImageProcessor
TURBO_CMAP = colormaps["turbo"]
def save_video(frames: List[Image.Image], fps: float, output_path: str) -> None:
    """Encode a list of PIL Image frames as an H.264 MP4 video.

    Args:
        frames: Non-empty list of same-sized PIL images; the first frame
            determines the output dimensions.
        fps: Output frame rate. Fractional rates (e.g. NTSC 29.97) are
            preserved instead of being truncated to an integer.
        output_path: Destination path of the MP4 file.

    Raises:
        ValueError: If ``frames`` is empty.
    """
    from fractions import Fraction

    if not frames:
        raise ValueError("`frames` must contain at least one image")
    container = av.open(output_path, mode="w")
    try:
        # A Fraction keeps rates like 29.97 exact; int(fps) would silently
        # round them down and desynchronize long videos.
        stream = container.add_stream("libx264", rate=Fraction(fps).limit_denominator(1_000_000))
        stream.pix_fmt = "yuv420p"
        # NOTE(review): yuv420p requires even dimensions; odd-sized frames
        # will make the encoder fail — resize upstream if needed.
        stream.width = frames[0].width
        stream.height = frames[0].height
        for frame in frames:
            video_frame = av.VideoFrame.from_image(frame)
            for packet in stream.encode(video_frame):
                container.mux(packet)
        # Flush any packets still buffered in the encoder.
        for packet in stream.encode():
            container.mux(packet)
    finally:
        # Close the container even if encoding fails, so a partial file
        # is not left open.
        container.close()
class DepthAnythingV2EstimatorBlock(ModularPipelineBlocks):
    """Modular pipeline block that estimates relative depth with Depth Anything V2.

    Two modes of operation:

    * **Single-image mode** — reads ``image`` and writes ``depth_image``
      (normalized PIL depth map) and ``predicted_depth`` (raw H x W tensor).
      If a list of images is supplied, only the first one is processed.
    * **Video mode** — reads ``video_path`` (takes precedence over ``image``)
      and writes ``depth_frames`` (per-frame PIL depth maps) and ``fps``.

    Depth maps are min-max normalized and inverted (bright = close) and are
    rendered either as grayscale or with the matplotlib "turbo" colormap,
    selected by the ``colormap`` input.
    """

    # Minimum versions of the third-party packages this block relies on.
    _requirements = {
        "transformers": ">=5.1.0",
        "torch": ">=2.9.0",
        "torchvision": ">=0.16.0",
        "av": ">=12.0.0",
        "matplotlib": ">=3.7.0",
    }

    @property
    def expected_components(self) -> List[ComponentSpec]:
        """Depth model and image processor, both from the Depth-Anything-V2-Large checkpoint."""
        return [
            ComponentSpec(
                name="depth_estimator",
                type_hint=DepthAnythingForDepthEstimation,
                pretrained_model_name_or_path="depth-anything/Depth-Anything-V2-Large-hf",
            ),
            ComponentSpec(
                name="depth_estimator_processor",
                type_hint=DPTImageProcessor,
                pretrained_model_name_or_path="depth-anything/Depth-Anything-V2-Large-hf",
            ),
        ]

    @property
    def inputs(self) -> List[InputParam]:
        """Block inputs; exactly one of ``image`` / ``video_path`` should be given."""
        return [
            InputParam(
                "image",
                type_hint=Union[Image.Image, List[Image.Image]],
                required=False,
                description="Image(s) to estimate depth for",
            ),
            InputParam(
                "video_path",
                type_hint=str,
                required=False,
                description="Path to input video file. When provided, image is ignored.",
            ),
            InputParam(
                "colormap",
                type_hint=str,
                default="grayscale",
                description="Depth visualization format: 'grayscale' or 'turbo' (colormapped)",
            ),
        ]

    @property
    def intermediate_outputs(self) -> List[OutputParam]:
        """Outputs; the unused mode's entries are set to ``None`` at runtime."""
        return [
            OutputParam(
                "depth_image",
                type_hint=Image.Image,
                description="Normalized depth map as a PIL image (single image mode)",
            ),
            OutputParam(
                "predicted_depth",
                type_hint=torch.Tensor,
                description="Raw relative depth tensor (H x W) (single image mode)",
            ),
            OutputParam(
                "depth_frames",
                type_hint=list,
                description="List of per-frame depth PIL images (video mode)",
            ),
            OutputParam(
                "fps",
                type_hint=float,
                description="Source video frame rate (video mode)",
            ),
        ]

    def _estimate_depth(self, image: Image.Image, processor, model) -> dict:
        """Run the depth model on one image, resizing the prediction back to the input size."""
        inputs = processor(images=[image], return_tensors="pt").to(model.device)
        outputs = model(**inputs)
        # Post-processing resamples the raw prediction to (height, width).
        post_processed = processor.post_process_depth_estimation(
            outputs, target_sizes=[(image.height, image.width)]
        )
        return post_processed[0]

    def _normalize_depth(self, depth: np.ndarray) -> np.ndarray:
        """Min-max normalize depth to [0, 1] and invert so bright = close."""
        d_min = depth.min()
        d_max = depth.max()
        # Epsilon guards against division by zero on constant-depth maps.
        normalized = (depth - d_min) / (d_max - d_min + 1e-8)
        # Invert so bright = close, dark = far (consistent with depth_pro)
        return 1.0 - normalized

    def _apply_colormap(self, normalized: np.ndarray, mode: str) -> np.ndarray:
        """Map normalized depth in [0, 1] to a uint8 array (RGB for 'turbo', else grayscale)."""
        if mode == "turbo":
            # Drop the alpha channel produced by the matplotlib colormap.
            colored = (TURBO_CMAP(normalized)[..., :3] * 255).astype(np.uint8)
            return colored
        # Any mode other than 'turbo' falls back to plain grayscale.
        return (normalized * 255.0).astype(np.uint8)

    def _process_video(self, video_path, processor, model, colormap):
        """Decode a video and estimate depth per frame.

        Returns:
            Tuple of (list of PIL depth frames, source frame rate).
        """
        input_container = av.open(video_path)
        video_stream = input_container.streams.video[0]
        fps = video_stream.average_rate
        if fps is None:
            # NOTE(review): some containers report no average rate; fall back
            # to PyAV's best guess — confirm against sample inputs.
            fps = video_stream.guessed_rate
        depth_frames = []
        for frame in input_container.decode(video=0):
            pil_image = frame.to_image().convert("RGB")
            result = self._estimate_depth(pil_image, processor, model)
            depth_np = result["predicted_depth"].float().cpu().numpy()
            normalized = self._normalize_depth(depth_np)
            colored = self._apply_colormap(normalized, colormap)
            if colormap == "turbo":
                depth_frame = Image.fromarray(colored, mode="RGB")
            else:
                depth_frame = Image.fromarray(colored, mode="L")
            depth_frames.append(depth_frame)
        input_container.close()
        return depth_frames, fps

    @torch.no_grad()
    def __call__(self, components, state: PipelineState):
        """Run depth estimation in video or single-image mode.

        Returns:
            Tuple of (components, state), per the modular-block convention.

        Raises:
            ValueError: If neither ``image`` nor ``video_path`` is provided.
        """
        block_state = self.get_block_state(state)
        processor = components.depth_estimator_processor
        model = components.depth_estimator

        video_path = getattr(block_state, "video_path", None)
        if video_path:
            depth_frames, fps = self._process_video(
                video_path, processor, model, block_state.colormap
            )
            block_state.depth_frames = depth_frames
            block_state.fps = float(fps)
            # Single-image outputs are unused in video mode.
            block_state.depth_image = None
            block_state.predicted_depth = None
        else:
            image = block_state.image
            if image is None:
                # Fail fast with a clear message instead of crashing inside
                # the image processor.
                raise ValueError("Either `image` or `video_path` must be provided.")
            if not isinstance(image, list):
                image = [image]
            # Only the first image is processed; outputs are single-image.
            result = self._estimate_depth(image[0], processor, model)
            predicted_depth = result["predicted_depth"]
            block_state.predicted_depth = predicted_depth
            depth_np = predicted_depth.float().cpu().numpy()
            normalized = self._normalize_depth(depth_np)
            colored = self._apply_colormap(normalized, block_state.colormap)
            if block_state.colormap == "turbo":
                block_state.depth_image = Image.fromarray(colored, mode="RGB")
            else:
                block_state.depth_image = Image.fromarray(colored, mode="L")
            # Video outputs are unused in single-image mode.
            block_state.depth_frames = None
            block_state.fps = None

        self.set_block_state(state, block_state)
        return components, state