#!/usr/bin/env python3
# Copyright (c) 2025 ByteDance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Simple batch processing script for Depth Anything 3.

Usage:
    python simple_batch_process.py input.zip output.zip
    python simple_batch_process.py /path/to/images/ /path/to/output/
"""
import argparse
import os
import shutil
import tempfile
import time
import zipfile
from pathlib import Path

import numpy as np
import torch
from PIL import Image
from tqdm import tqdm

from depth_anything_3.api import DepthAnything3

# Recognized image suffixes (compared lower-cased).
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"}


def _find_image_files(input_dir: str) -> list:
    """Recursively collect image file paths under *input_dir*, sorted.

    Skips macOS metadata: ``__MACOSX`` directories, AppleDouble files
    (``._*``) and ``.DS_Store`` entries.
    """
    image_files = []
    for root, _, files in os.walk(input_dir):
        # Skip __MACOSX directories injected by macOS zip tooling.
        if "__MACOSX" in root:
            continue
        for file in files:
            # Skip hidden files and macOS metadata.
            if file.startswith("._") or file.startswith(".DS_Store"):
                continue
            if Path(file).suffix.lower() in IMAGE_EXTENSIONS:
                image_files.append(os.path.join(root, file))
    return sorted(image_files)


def _print_metrics(num_found: int, inference_times: list, skipped: int) -> None:
    """Print an inference-timing summary for a finished batch.

    Args:
        num_found: Total number of image files discovered.
        inference_times: Per-image inference wall times (successes only);
            must be non-empty.
        skipped: Number of images that failed and were skipped.
    """
    total_inference = sum(inference_times)
    avg_per_image = total_inference / len(inference_times)
    throughput = 1 / avg_per_image
    print(f"\n{'='*60}")
    print("šŸ“Š Performance Metrics")
    print(f"{'='*60}")
    print(f"Total images found: {num_found}")
    print(f"Successfully processed: {len(inference_times)}")
    if skipped > 0:
        print(f"Skipped (errors): {skipped}")
    print(f"Total inference time: {total_inference:.2f}s")
    print(f"Average time per image: {avg_per_image:.3f}s")
    print(f"Throughput: {throughput:.2f} images/second")
    print(f"{'='*60}\n")


def _zip_directory(src_dir: str, zip_path: str) -> None:
    """Write every file under *src_dir* into a deflated ZIP at *zip_path*.

    Archive member names are the paths relative to *src_dir*.
    """
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(src_dir):
            for file in files:
                file_path = os.path.join(root, file)
                zipf.write(file_path, os.path.relpath(file_path, src_dir))


def process_images_from_directory(input_dir: str, output_dir: str, model) -> int:
    """
    Process all images in a directory, saving one ``<stem>_depth.npy`` per image.

    Args:
        input_dir: Directory containing input images.
        output_dir: Directory to save depth maps (created if missing).
        model: Loaded DepthAnything3 model.

    Returns:
        Number of images successfully processed.
    """
    image_files = _find_image_files(input_dir)
    if not image_files:
        print(f"No images found in {input_dir}")
        return 0

    print(f"Found {len(image_files)} images")
    os.makedirs(output_dir, exist_ok=True)

    inference_times = []  # per-image inference wall time, successes only
    skipped = 0
    for img_path in tqdm(image_files, desc="Processing images"):
        try:
            image_np = np.array(Image.open(img_path).convert("RGB"))

            # Time only the forward pass, not image loading or saving.
            inference_start = time.time()
            with torch.no_grad():
                # API expects a list of images and returns a Prediction object.
                prediction = model.inference([image_np])
                depth = prediction.depth[0]  # first (and only) depth map
            inference_times.append(time.time() - inference_start)

            # Save raw depth next to a name derived from the input stem.
            output_path = os.path.join(output_dir, Path(img_path).stem + "_depth.npy")
            np.save(output_path, depth)
        except Exception as e:
            # Best-effort batch: warn about a corrupt/unreadable image and keep going.
            print(f"\nāš ļø Skipping {Path(img_path).name}: {str(e)}")
            skipped += 1
            continue

    if inference_times:
        _print_metrics(len(image_files), inference_times, skipped)
    return len(inference_times)


def process_zip_to_zip(input_zip: str, output_zip: str, model) -> None:
    """
    Process images from input ZIP and create output ZIP with depth maps.

    Args:
        input_zip: Path to input ZIP file.
        output_zip: Path to output ZIP file.
        model: Loaded DepthAnything3 model.
    """
    # Unique temp dir so concurrent runs (or a leftover directory from a
    # crashed run) cannot collide or clobber each other.
    temp_dir = tempfile.mkdtemp(prefix="temp_processing_")
    input_dir = os.path.join(temp_dir, "input")
    output_dir = os.path.join(temp_dir, "output")
    os.makedirs(input_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)

    try:
        print(f"Extracting {input_zip}...")
        # NOTE(security): extractall trusts archive member names — only feed
        # this ZIPs from trusted sources (zip-slip risk on older Pythons).
        with zipfile.ZipFile(input_zip, "r") as zip_ref:
            zip_ref.extractall(input_dir)

        num_processed = process_images_from_directory(input_dir, output_dir, model)
        if num_processed == 0:
            print("No images were processed")
            return

        print(f"Creating {output_zip}...")
        _zip_directory(output_dir, output_zip)

        print(f"āœ… Done! Processed {num_processed} images")
        print(f"Output saved to: {output_zip}")
    finally:
        # Always clean up extracted/intermediate files.
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)


def main():
    """CLI entry point: parse args, load the model, dispatch on ZIP/dir I/O."""
    parser = argparse.ArgumentParser(
        description="Batch process images for depth estimation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Process ZIP files
  python simple_batch_process.py input.zip output.zip

  # Process directories
  python simple_batch_process.py /path/to/images/ /path/to/output/

  # Specify model
  python simple_batch_process.py input.zip output.zip --model depth-anything/DA3NESTED-GIANT-LARGE
        """,
    )
    parser.add_argument(
        "input",
        help="Input ZIP file or directory containing images",
    )
    parser.add_argument(
        "output",
        help="Output ZIP file or directory for depth maps",
    )
    parser.add_argument(
        "--model",
        default="depth-anything/DA3NESTED-GIANT-LARGE",
        help="Model directory or HuggingFace model ID",
    )
    parser.add_argument(
        "--device",
        default="cuda" if torch.cuda.is_available() else "cpu",
        choices=["cuda", "cpu"],
        help="Device to run inference on",
    )
    args = parser.parse_args()

    # Load model once, up front.
    print(f"Loading model from {args.model}...")
    model = DepthAnything3.from_pretrained(args.model)
    model = model.to(args.device)
    model.eval()
    print(f"Model loaded on {args.device}")

    # Dispatch on whether each endpoint is a ZIP archive or a directory.
    input_is_zip = args.input.endswith(".zip")
    output_is_zip = args.output.endswith(".zip")

    if input_is_zip and output_is_zip:
        # ZIP to ZIP
        process_zip_to_zip(args.input, args.output, model)
    elif input_is_zip:
        # ZIP to directory
        temp_dir = tempfile.mkdtemp(prefix="temp_extraction_")
        try:
            print(f"Extracting {args.input}...")
            with zipfile.ZipFile(args.input, "r") as zip_ref:
                zip_ref.extractall(temp_dir)
            num_processed = process_images_from_directory(temp_dir, args.output, model)
            print(f"āœ… Done! Processed {num_processed} images")
        finally:
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
    elif output_is_zip:
        # Directory to ZIP
        temp_output = tempfile.mkdtemp(prefix="temp_output_")
        try:
            num_processed = process_images_from_directory(args.input, temp_output, model)
            print(f"Creating {args.output}...")
            _zip_directory(temp_output, args.output)
            print(f"āœ… Done! Processed {num_processed} images")
        finally:
            if os.path.exists(temp_output):
                shutil.rmtree(temp_output)
    else:
        # Directory to directory
        num_processed = process_images_from_directory(args.input, args.output, model)
        print(f"āœ… Done! Processed {num_processed} images")


if __name__ == "__main__":
    main()