| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| """ |
| Simple batch processing script for Depth Anything 3. |
| |
| Usage: |
| python simple_batch_process.py input.zip output.zip |
| python simple_batch_process.py /path/to/images/ /path/to/output/ |
| """ |
|
|
| import argparse |
| import os |
| import shutil |
| import time |
| import zipfile |
| from pathlib import Path |
| import numpy as np |
| import torch |
| from PIL import Image |
| from tqdm import tqdm |
|
|
| from depth_anything_3.api import DepthAnything3 |
|
|
|
|
def _find_image_files(input_dir):
    """Return the sorted paths of all image files found under *input_dir*."""
    image_extensions = {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"}
    image_files = []
    for root, dirs, files in os.walk(input_dir):
        # Prune macOS resource-fork folders in place so os.walk never descends
        # into them (an exact-name match, not a substring test on the path).
        dirs[:] = [d for d in dirs if d != "__MACOSX"]
        for file in files:
            # Skip macOS metadata files that may carry an image extension.
            if file.startswith(("._", ".DS_Store")):
                continue
            if Path(file).suffix.lower() in image_extensions:
                image_files.append(os.path.join(root, file))
    return sorted(image_files)


def _depth_output_name(img_path, input_dir, used_names):
    """Return the ``.npy`` file name for *img_path* without reusing *used_names*.

    The default name is ``<stem>_depth.npy``. Because all outputs land in one
    flat directory, two inputs with the same stem (different sub-folders or
    different extensions) would overwrite each other; in that case the name is
    built from the relative path and extension instead, and a notice is printed.
    """
    name = f"{Path(img_path).stem}_depth.npy"
    if name not in used_names:
        return name

    rel = Path(os.path.relpath(img_path, input_dir))
    base = "_".join(rel.with_suffix("").parts) + "_" + rel.suffix.lstrip(".").lower()
    name = f"{base}_depth.npy"
    counter = 1
    while name in used_names:
        name = f"{base}_{counter}_depth.npy"
        counter += 1
    print(f"\n⚠️ Output name for {rel} already used; saving as {name}")
    return name


def process_images_from_directory(input_dir: str, output_dir: str, model) -> int:
    """
    Process all images in a directory (recursively) and save one depth map each.

    Args:
        input_dir: Directory containing input images
        output_dir: Directory to save depth maps (created if missing)
        model: Loaded DepthAnything3 model; must provide ``inference(list_of_arrays)``
            returning an object with a ``depth`` sequence

    Returns:
        Number of images whose depth map was computed and saved successfully.
    """
    image_files = _find_image_files(input_dir)
    if not image_files:
        print(f"No images found in {input_dir}")
        return 0

    print(f"Found {len(image_files)} images")
    os.makedirs(output_dir, exist_ok=True)

    inference_times = []
    used_names = set()
    skipped = 0
    for img_path in tqdm(image_files, desc="Processing images"):
        try:
            # Context manager closes the underlying file handle promptly;
            # convert() returns an independent, fully loaded image.
            with Image.open(img_path) as image:
                image_np = np.array(image.convert("RGB"))

            # perf_counter is the monotonic, high-resolution clock for intervals.
            inference_start = time.perf_counter()
            with torch.no_grad():
                prediction = model.inference([image_np])
                depth = prediction.depth[0]
            inference_time = time.perf_counter() - inference_start

            output_name = _depth_output_name(img_path, input_dir, used_names)
            np.save(os.path.join(output_dir, output_name), depth)
        except Exception as e:
            # Best-effort batch: report the failure and keep going.
            print(f"\n⚠️ Skipping {Path(img_path).name}: {e}")
            skipped += 1
            continue

        # Record success only after the depth map is on disk, so a failed save
        # is not counted as both processed and skipped.
        used_names.add(output_name)
        inference_times.append(inference_time)

    if inference_times:
        total_inference = sum(inference_times)
        avg_per_image = total_inference / len(inference_times)
        # Guard against a zero elapsed time on coarse clocks.
        throughput = (
            len(inference_times) / total_inference if total_inference > 0 else float("inf")
        )
        print(f"\n{'='*60}")
        print("📊 Performance Metrics")
        print(f"{'='*60}")
        print(f"Total images found: {len(image_files)}")
        print(f"Successfully processed: {len(inference_times)}")
        if skipped > 0:
            print(f"Skipped (errors): {skipped}")
        print(f"Total inference time: {total_inference:.2f}s")
        print(f"Average time per image: {avg_per_image:.3f}s")
        print(f"Throughput: {throughput:.2f} images/second")
        print(f"{'='*60}\n")
    else:
        # Images were found but every one of them failed.
        print(f"\nNo images were processed successfully ({skipped} skipped due to errors)")

    return len(inference_times)
|
|
|
|
def process_zip_to_zip(input_zip: str, output_zip: str, model) -> None:
    """
    Process images from input ZIP and create output ZIP with depth maps.

    The archive is extracted into a private, uniquely named scratch directory
    that is removed afterwards, so an existing directory in the current working
    directory is never reused or deleted and parallel runs do not collide.
    No output archive is written when no image could be processed.

    Args:
        input_zip: Path to input ZIP file
        output_zip: Path to output ZIP file (parent directory is created if missing)
        model: Loaded DepthAnything3 model

    Raises:
        FileNotFoundError: If input_zip does not exist.
        zipfile.BadZipFile: If input_zip is not a valid ZIP archive.
    """
    import tempfile  # local import: only this code path needs a scratch directory

    with tempfile.TemporaryDirectory(prefix="da3_batch_") as temp_dir:
        input_dir = os.path.join(temp_dir, "input")
        output_dir = os.path.join(temp_dir, "output")
        os.makedirs(input_dir)
        os.makedirs(output_dir)

        print(f"Extracting {input_zip}...")
        with zipfile.ZipFile(input_zip, "r") as zip_ref:
            zip_ref.extractall(input_dir)

        num_processed = process_images_from_directory(input_dir, output_dir, model)
        if num_processed == 0:
            print("No images were processed")
            return

        print(f"Creating {output_zip}...")
        # Make sure the destination folder exists before opening the archive.
        os.makedirs(os.path.dirname(os.path.abspath(output_zip)), exist_ok=True)
        with zipfile.ZipFile(output_zip, "w", zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(output_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    # Store paths relative to the output root, not the scratch dir.
                    zipf.write(file_path, os.path.relpath(file_path, output_dir))

        print(f"✅ Done! Processed {num_processed} images")
        print(f"Output saved to: {output_zip}")
|
|
|
|
def _zip_directory(source_dir, zip_path):
    """Write every file under *source_dir* into a new ZIP archive at *zip_path*."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(source_dir):
            for file in files:
                file_path = os.path.join(root, file)
                zipf.write(file_path, os.path.relpath(file_path, source_dir))


def main():
    """Command-line entry point: parse arguments, load the model, run the batch.

    Input and output may each be a ZIP file or a directory; the four
    combinations are dispatched below. Scratch space is always a uniquely named
    temporary directory, so nothing in the working directory is reused or removed.
    """
    import tempfile  # local import: only the ZIP code paths need scratch directories

    parser = argparse.ArgumentParser(
        description="Batch process images for depth estimation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Process ZIP files
  python simple_batch_process.py input.zip output.zip

  # Process directories
  python simple_batch_process.py /path/to/images/ /path/to/output/

  # Specify model
  python simple_batch_process.py input.zip output.zip --model depth-anything/DA3NESTED-GIANT-LARGE
""",
    )

    parser.add_argument(
        "input",
        help="Input ZIP file or directory containing images",
    )
    parser.add_argument(
        "output",
        help="Output ZIP file or directory for depth maps",
    )
    parser.add_argument(
        "--model",
        default="depth-anything/DA3NESTED-GIANT-LARGE",
        help="Model directory or HuggingFace model ID",
    )
    parser.add_argument(
        "--device",
        default="cuda" if torch.cuda.is_available() else "cpu",
        choices=["cuda", "cpu"],
        help="Device to run inference on",
    )

    args = parser.parse_args()

    # Case-insensitive so "IMAGES.ZIP" is not mistaken for a directory.
    input_is_zip = args.input.lower().endswith(".zip")
    output_is_zip = args.output.lower().endswith(".zip")

    # Fail fast on a bad input path, before the expensive model load.
    if input_is_zip and not os.path.isfile(args.input):
        parser.error(f"Input ZIP file not found: {args.input}")
    if not input_is_zip and not os.path.isdir(args.input):
        parser.error(f"Input directory not found: {args.input}")

    print(f"Loading model from {args.model}...")
    model = DepthAnything3.from_pretrained(args.model)
    model = model.to(args.device)
    model.eval()
    print(f"Model loaded on {args.device}")

    if input_is_zip and output_is_zip:
        # ZIP -> ZIP
        process_zip_to_zip(args.input, args.output, model)
    elif input_is_zip:
        # ZIP -> directory
        with tempfile.TemporaryDirectory(prefix="da3_extract_") as temp_dir:
            print(f"Extracting {args.input}...")
            with zipfile.ZipFile(args.input, "r") as zip_ref:
                zip_ref.extractall(temp_dir)
            num_processed = process_images_from_directory(temp_dir, args.output, model)
        print(f"✅ Done! Processed {num_processed} images")
    elif output_is_zip:
        # directory -> ZIP
        with tempfile.TemporaryDirectory(prefix="da3_output_") as temp_output:
            num_processed = process_images_from_directory(args.input, temp_output, model)
            if num_processed == 0:
                # Match process_zip_to_zip: do not leave an empty archive behind.
                print("No images were processed")
            else:
                print(f"Creating {args.output}...")
                _zip_directory(temp_output, args.output)
                print(f"✅ Done! Processed {num_processed} images")
    else:
        # directory -> directory
        num_processed = process_images_from_directory(args.input, args.output, model)
        print(f"✅ Done! Processed {num_processed} images")
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|