# depth-anything-3 / simple_batch_process.py
# (source: harshilawign's Hugging Face repo)
# Last commit: "Fix API method name: use inference() instead of infer_image()" (16d14b6)
#!/usr/bin/env python3
# Copyright (c) 2025 ByteDance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Simple batch processing script for Depth Anything 3.
Usage:
python simple_batch_process.py input.zip output.zip
python simple_batch_process.py /path/to/images/ /path/to/output/
"""
import argparse
import os
import shutil
import tempfile
import time
import zipfile
from pathlib import Path

import numpy as np
import torch
from PIL import Image
from tqdm import tqdm

from depth_anything_3.api import DepthAnything3
def process_images_from_directory(input_dir: str, output_dir: str, model) -> int:
    """
    Run depth estimation on every image under a directory tree.

    Args:
        input_dir: Directory containing input images (searched recursively).
        output_dir: Directory to save depth maps (created if missing).
        model: Loaded DepthAnything3 model exposing ``inference(list_of_images)``.

    Returns:
        Number of images successfully processed.
    """
    # Find all image files (skip macOS metadata)
    image_extensions = {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"}
    image_files = []
    for root, _, files in os.walk(input_dir):
        # Skip __MACOSX directories that macOS adds to ZIP archives
        if "__MACOSX" in root:
            continue
        for file in files:
            # Skip AppleDouble resource forks ("._*") and Finder metadata
            if file.startswith("._") or file.startswith(".DS_Store"):
                continue
            if Path(file).suffix.lower() in image_extensions:
                image_files.append(os.path.join(root, file))
    if not image_files:
        print(f"No images found in {input_dir}")
        return 0
    # Sort for a deterministic processing order
    image_files = sorted(image_files)
    print(f"Found {len(image_files)} images")
    os.makedirs(output_dir, exist_ok=True)
    # Track per-image model latency only (image load/save excluded)
    inference_times = []
    skipped = 0
    for img_path in tqdm(image_files, desc="Processing images"):
        try:
            # Load image. The context manager releases the file handle
            # immediately instead of waiting for GC — matters when walking
            # thousands of files.
            with Image.open(img_path) as image:
                image_np = np.array(image.convert("RGB"))
            # Predict depth (measure inference time only)
            inference_start = time.time()
            with torch.no_grad():
                # API expects a list of images, returns Prediction object
                prediction = model.inference([image_np])
                depth = prediction.depth[0]  # Get first (and only) depth map
            inference_time = time.time() - inference_start
            inference_times.append(inference_time)
            # Save raw depth.
            # NOTE(review): the output name is derived from the file stem only,
            # so two inputs with the same stem in different subdirectories
            # overwrite each other — confirm inputs have unique stems.
            output_name = Path(img_path).stem + "_depth.npy"
            output_path = os.path.join(output_dir, output_name)
            np.save(output_path, depth)
        except Exception as e:
            # Best-effort batch: report corrupt/unreadable files and keep going.
            print(f"\n⚠️ Skipping {Path(img_path).name}: {str(e)}")
            skipped += 1
            continue
    # Print metrics
    if inference_times:
        total_inference = sum(inference_times)
        avg_per_image = total_inference / len(inference_times)
        throughput = 1 / avg_per_image
        print(f"\n{'='*60}")
        print("📊 Performance Metrics")
        print(f"{'='*60}")
        print(f"Total images found: {len(image_files)}")
        print(f"Successfully processed: {len(inference_times)}")
        if skipped > 0:
            print(f"Skipped (errors): {skipped}")
        print(f"Total inference time: {total_inference:.2f}s")
        print(f"Average time per image: {avg_per_image:.3f}s")
        print(f"Throughput: {throughput:.2f} images/second")
        print(f"{'='*60}\n")
    return len(inference_times)
def process_zip_to_zip(input_zip: str, output_zip: str, model):
    """
    Process images from an input ZIP and create an output ZIP of depth maps.

    Args:
        input_zip: Path to input ZIP file.
        output_zip: Path to output ZIP file (created/overwritten).
        model: Loaded DepthAnything3 model.
    """
    # Use a unique scratch directory: the previous fixed, cwd-relative
    # "temp_processing" name raced between concurrent runs and the cleanup
    # below could delete another run's in-flight data.
    temp_dir = tempfile.mkdtemp(prefix="da3_batch_")
    input_dir = os.path.join(temp_dir, "input")
    output_dir = os.path.join(temp_dir, "output")
    os.makedirs(input_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)
    try:
        # Extract input ZIP
        print(f"Extracting {input_zip}...")
        with zipfile.ZipFile(input_zip, "r") as zip_ref:
            zip_ref.extractall(input_dir)
        # Process images
        num_processed = process_images_from_directory(input_dir, output_dir, model)
        if num_processed == 0:
            print("No images were processed")
            return
        # Create output ZIP
        print(f"Creating {output_zip}...")
        with zipfile.ZipFile(output_zip, "w", zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(output_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    # Store paths relative to output_dir inside the archive
                    arcname = os.path.relpath(file_path, output_dir)
                    zipf.write(file_path, arcname)
        print(f"✅ Done! Processed {num_processed} images")
        print(f"Output saved to: {output_zip}")
    finally:
        # Always remove the scratch dir; ignore_errors so a cleanup failure
        # does not mask an exception raised above.
        shutil.rmtree(temp_dir, ignore_errors=True)
def _zip_directory(src_dir: str, zip_path: str) -> None:
    """Write every file under src_dir into zip_path, storing relative paths."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(src_dir):
            for file in files:
                file_path = os.path.join(root, file)
                zipf.write(file_path, os.path.relpath(file_path, src_dir))


def main():
    """CLI entry point: parse arguments, load the model, and dispatch on
    ZIP-vs-directory for both the input and the output path."""
    parser = argparse.ArgumentParser(
        description="Batch process images for depth estimation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Process ZIP files
python simple_batch_process.py input.zip output.zip
# Process directories
python simple_batch_process.py /path/to/images/ /path/to/output/
# Specify model
python simple_batch_process.py input.zip output.zip --model depth-anything/DA3NESTED-GIANT-LARGE
"""
    )
    parser.add_argument(
        "input",
        help="Input ZIP file or directory containing images"
    )
    parser.add_argument(
        "output",
        help="Output ZIP file or directory for depth maps"
    )
    parser.add_argument(
        "--model",
        default="depth-anything/DA3NESTED-GIANT-LARGE",
        help="Model directory or HuggingFace model ID"
    )
    parser.add_argument(
        "--device",
        default="cuda" if torch.cuda.is_available() else "cpu",
        choices=["cuda", "cpu"],
        help="Device to run inference on"
    )
    args = parser.parse_args()
    # Load model
    print(f"Loading model from {args.model}...")
    model = DepthAnything3.from_pretrained(args.model)
    model = model.to(args.device)
    model.eval()
    print(f"Model loaded on {args.device}")
    # Dispatch on extension; anything not ending in ".zip" is a directory
    input_is_zip = args.input.endswith(".zip")
    output_is_zip = args.output.endswith(".zip")
    if input_is_zip and output_is_zip:
        # ZIP to ZIP
        process_zip_to_zip(args.input, args.output, model)
    elif input_is_zip:
        # ZIP to directory. TemporaryDirectory gives a unique path (the old
        # fixed "temp_extraction" name raced between concurrent runs) and
        # guarantees cleanup.
        with tempfile.TemporaryDirectory(prefix="da3_extract_") as temp_dir:
            print(f"Extracting {args.input}...")
            with zipfile.ZipFile(args.input, "r") as zip_ref:
                zip_ref.extractall(temp_dir)
            num_processed = process_images_from_directory(temp_dir, args.output, model)
        print(f"✅ Done! Processed {num_processed} images")
    elif output_is_zip:
        # Directory to ZIP (same unique-temp-dir rationale as above)
        with tempfile.TemporaryDirectory(prefix="da3_output_") as temp_output:
            num_processed = process_images_from_directory(args.input, temp_output, model)
            print(f"Creating {args.output}...")
            _zip_directory(temp_output, args.output)
        print(f"✅ Done! Processed {num_processed} images")
    else:
        # Directory to directory
        num_processed = process_images_from_directory(args.input, args.output, model)
        print(f"✅ Done! Processed {num_processed} images")


if __name__ == "__main__":
    main()