"""FastAPI service that estimates depth and a simple pixel-to-pixel distance.

An uploaded image is run through a depth-estimation pipeline (or a dummy
stand-in when the model cannot be loaded), two edge pixels are located with
Canny edge detection, and the absolute difference of their depth values is
reported.
"""

import json
import os
import tempfile
from contextlib import suppress
from typing import Optional, Tuple

import cv2
import numpy as np
import torch
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import HTMLResponse, JSONResponse
from PIL import Image
from transformers import pipeline

# (row, column) coordinates of a pixel, i.e. (y, x).
Pixel = Tuple[int, int]

# Initialize FastAPI app
app = FastAPI(
    title="Depth Pro Distance Estimation",
    description="Estimate distance and depth using Apple's Depth Pro model",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)

# Force CPU usage
device = 'cpu'


def initialize_depth_pipeline(model_name: str = "apple/DepthPro"):
    """Initialize the Depth Pro pipeline.

    Args:
        model_name: Model identifier handed to ``transformers.pipeline``.

    Returns:
        The depth-estimation pipeline, or ``None`` when construction raises
        (the caller then falls back to ``DummyDepthPipeline``).
    """
    # NOTE(review): the Transformers-format checkpoint may be published under
    # a different id (e.g. "apple/DepthPro-hf") - confirm the default loads.
    # NOTE(review): the module says "Force CPU usage", yet a CUDA device is
    # selected here when one is available - confirm which is intended.
    try:
        print("Initializing Depth Pro pipeline...")
        pipe = pipeline(
            "depth-estimation",
            model=model_name,
            device=0 if torch.cuda.is_available() else -1,  # -1 for CPU
            torch_dtype=torch.float32,  # Use float32 for CPU compatibility
        )
        print("Depth Pro pipeline initialized successfully!")
        return pipe
    except Exception as e:
        print(f"Error initializing pipeline: {e}")
        print("Falling back to dummy pipeline...")
        return None


class DummyDepthPipeline:
    """Dummy pipeline for when the real model fails to load."""

    def __call__(self, image):
        """Generate a dummy depth prediction.

        Args:
            image: A file path, a NumPy array, or an object exposing a
                PIL-style ``size`` attribute of ``(width, height)``.

        Returns:
            ``{"depth": ndarray}`` where the array has shape
            ``(height, width)``.
        """
        if isinstance(image, str):
            image = Image.open(image)
        elif isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        width, height = image.size
        return {"depth": self._generate_dummy_depth(height, width)}

    def _generate_dummy_depth(self, height, width):
        """Generate a noisy vertical gradient shaped ``(height, width)``."""
        # Row values run linearly from 10.0 (first row) to 2.0 (last row).
        y_coords = np.linspace(10.0, 2.0, height)
        depth = np.tile(y_coords[:, np.newaxis], (1, width))

        # Add some noise and variation
        depth += np.random.normal(0, 0.5, (height, width))

        # Ensure positive depths
        return np.maximum(depth, 0.1)


class DepthEstimator:
    """Runs a depth pipeline on an image and normalizes its output."""

    def __init__(self, pipeline=None):
        """Store the pipeline, substituting the dummy one when it is ``None``."""
        self.device = torch.device('cpu')  # Force CPU
        print("Initializing Depth Pro estimator...")
        self.pipeline = DummyDepthPipeline() if pipeline is None else pipeline
        print("Depth Pro estimator initialized successfully!")

    def estimate_depth(self, image_path):
        """Estimate a depth map for the image stored at ``image_path``.

        Returns:
            ``(depth, new_size, focal_length_px)`` where ``depth`` is a 2-D
            float32 array of shape ``(new_size[1], new_size[0])`` and
            ``new_size`` is ``(width, height)`` of the processed image.
            On any failure ``(None, None, None)`` is returned.
        """
        try:
            # Close the file handle promptly so the caller can delete the file.
            with Image.open(image_path) as raw_image:
                image = raw_image.convert('RGB')

            resized_image, new_size = self.resize_image(image)
            result = self.pipeline(resized_image)
            depth = self._to_depth_array(self._extract_depth(result), new_size)

            # Estimate focal length (rough estimation)
            focal_length_px = 1.2 * max(new_size)

            return depth, new_size, focal_length_px
        except Exception as e:
            print(f"Error in depth estimation: {e}")
            return None, None, None

    @staticmethod
    def _extract_depth(result):
        """Pick the depth payload out of a pipeline result.

        ``predicted_depth`` is preferred over ``depth``: the Transformers
        depth-estimation pipeline returns the raw prediction tensor under the
        former and a normalized 8-bit visualization image under the latter.
        """
        keys = ("predicted_depth", "depth")
        if isinstance(result, dict):
            for key in keys:
                if result.get(key) is not None:
                    return result[key]
            return result
        for key in keys:
            value = getattr(result, key, None)
            if value is not None:
                return value
        return result

    @staticmethod
    def _to_depth_array(depth, size):
        """Convert ``depth`` to a 2-D float32 array matching ``size`` (w, h)."""
        if isinstance(depth, torch.Tensor):
            depth = depth.detach().cpu().float().numpy()
        depth = np.asarray(depth, dtype=np.float32)

        # Drop leading singleton (batch/channel) axes without collapsing a
        # genuine one-pixel-high or one-pixel-wide map.
        while depth.ndim > 2 and depth.shape[0] == 1:
            depth = depth[0]
        if depth.ndim != 2:
            raise ValueError(f"Expected a 2-D depth map, got shape {depth.shape}")

        width, height = size
        if depth.shape != (height, width):
            # Pixel coordinates are computed on the resized image, so the
            # depth map must share its dimensions.
            depth = cv2.resize(depth, (width, height), interpolation=cv2.INTER_LINEAR)
        return depth

    def resize_image(self, image, max_size=1536):
        """Scale ``image`` so that its longest side equals ``max_size``.

        Args:
            image: A PIL image or a path to an image file.
            max_size: Target length of the longest side, in pixels.

        Returns:
            ``(resized_image, (width, height))``.
        """
        if isinstance(image, str):
            with Image.open(image) as raw_image:
                image = raw_image.convert('RGB')

        ratio = max_size / max(image.size)
        # Clamp to 1 so extreme aspect ratios never yield a zero-length side.
        new_size = (
            max(1, int(image.size[0] * ratio)),
            max(1, int(image.size[1] * ratio)),
        )
        resized_image = image.resize(new_size, Image.Resampling.LANCZOS)

        return resized_image, new_size


def _detect_edges(image):
    """Return the Canny edge map (thresholds 50/150) of a BGR image."""
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    return cv2.Canny(gray, 50, 150)


def find_topmost_pixel(image) -> Optional[Pixel]:
    """Find the topmost edge pixel in the image (simulating footpath detection).

    Returns:
        ``(y, x)`` of the middle edge pixel on the highest row that contains
        an edge, or ``None`` when no edge is detected.
    """
    rows, cols = np.nonzero(_detect_edges(image))
    if rows.size == 0:
        return None

    min_y = rows.min()
    top_x_coords = cols[rows == min_y]
    return (int(min_y), int(top_x_coords[len(top_x_coords) // 2]))


def find_bottommost_pixel(image, topmost_pixel) -> Optional[Pixel]:
    """Find the bottommost edge pixel in the same column as ``topmost_pixel``.

    If that column holds no edge pixel, fall back to the middle edge pixel on
    the lowest row that contains an edge.

    Returns:
        ``(y, x)`` of the selected pixel, or ``None`` when ``topmost_pixel``
        is ``None`` or the image has no edges.
    """
    if topmost_pixel is None:
        return None

    edges = _detect_edges(image)
    _, top_x = topmost_pixel

    # A column outside the image simply has no edge pixels.
    if 0 <= top_x < edges.shape[1]:
        column_rows = np.flatnonzero(edges[:, top_x])
        if column_rows.size:
            return (int(column_rows.max()), int(top_x))

    # Fallback to bottommost edge pixel
    rows, cols = np.nonzero(edges)
    if rows.size == 0:
        return None

    max_y = rows.max()
    bottom_x_coords = cols[rows == max_y]
    return (int(max_y), int(bottom_x_coords[len(bottom_x_coords) // 2]))


def estimate_real_world_distance(depth_map, topmost_pixel, bottommost_pixel):
    """Estimate the distance between two pixels from their depth values.

    The result is the absolute difference of the two depth-map entries.

    Args:
        depth_map: 2-D array indexed as ``[y, x]``.
        topmost_pixel: ``(y, x)`` of the first pixel, or ``None``.
        bottommost_pixel: ``(y, x)`` of the second pixel, or ``None``.

    Returns:
        The absolute depth difference as a ``float``, or ``None`` when an
        input is missing, a coordinate is out of bounds, or a depth value is
        not finite.
    """
    if topmost_pixel is None or bottommost_pixel is None or depth_map is None:
        return None

    top_y, top_x = topmost_pixel
    bottom_y, bottom_x = bottommost_pixel

    # Ensure coordinates are within bounds
    height, width = depth_map.shape[:2]
    in_bounds = (
        0 <= top_y < height
        and 0 <= top_x < width
        and 0 <= bottom_y < height
        and 0 <= bottom_x < width
    )
    if not in_bounds:
        return None

    # Convert to Python floats before subtracting: unsigned integer depth
    # maps would otherwise wrap around on a negative difference.
    topmost_depth = float(depth_map[top_y, top_x])
    bottommost_depth = float(depth_map[bottom_y, bottom_x])

    # Check if depth values are valid
    if not (np.isfinite(topmost_depth) and np.isfinite(bottommost_depth)):
        print("Invalid depth values (NaN or Inf) found")
        return None

    distance_meters = abs(topmost_depth - bottommost_depth)

    print(f"Distance calculation:")
    print(f"  Topmost pixel: ({top_y}, {top_x}) = {topmost_depth:.3f}m")
    print(f"  Bottommost pixel: ({bottom_y}, {bottom_x}) = {bottommost_depth:.3f}m")
    print(f"  Distance: {distance_meters:.3f}m")

    return distance_meters


def _depth_stats(depth_map):
    """Return min/max/mean over the finite entries of ``depth_map``.

    Values are ``None`` when no entry is finite, because NaN and Inf cannot
    be encoded in a JSON response.
    """
    finite = depth_map[np.isfinite(depth_map)]
    if finite.size == 0:
        return {"min_depth": None, "max_depth": None, "mean_depth": None}
    return {
        "min_depth": float(finite.min()),
        "max_depth": float(finite.max()),
        "mean_depth": float(finite.mean(dtype=np.float64)),
    }


# Initialize depth estimator globally
print("Initializing Depth Pro pipeline...")
depth_pipeline = initialize_depth_pipeline()
depth_estimator = DepthEstimator(depth_pipeline)


@app.get("/health")
async def health_check():
    """Health check endpoint for Docker."""
    return {"status": "healthy", "service": "Depth Pro Distance Estimation"}


@app.get("/api")
async def api_info():
    """API information endpoint."""
    return {
        "message": "Depth Pro Distance Estimation API",
        "docs": "/docs",
        "health": "/health",
        "estimate_endpoint": "/estimate-depth",
    }


@app.post("/estimate-depth")
async def estimate_depth_endpoint(file: UploadFile = File(...)):
    """FastAPI endpoint for depth estimation and distance calculation.

    Responds with 400 when the upload cannot be decoded as an image, 500 when
    depth estimation fails or an unexpected error occurs, and otherwise with
    the depth-map shape, focal-length estimate, detected pixels, distance and
    depth statistics.
    """
    temp_file_path = None
    try:
        content = await file.read()

        # Save uploaded file temporarily
        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
            # Record the path first so cleanup still runs if the write fails.
            temp_file_path = temp_file.name
            temp_file.write(content)

        # Load image for pixel detection
        image = cv2.imread(temp_file_path)
        if image is None:
            return JSONResponse(
                status_code=400,
                content={"error": "Could not load image"},
            )

        # Estimate depth
        depth_map, new_size, focal_length_px = depth_estimator.estimate_depth(temp_file_path)
        if depth_map is None:
            return JSONResponse(
                status_code=500,
                content={"error": "Depth estimation failed"},
            )

        # Resize image to match depth map size
        resized_image = cv2.resize(image, new_size)

        # Find key pixels
        topmost_pixel = find_topmost_pixel(resized_image)
        bottommost_pixel = find_bottommost_pixel(resized_image, topmost_pixel)

        # Calculate distance
        distance_meters = estimate_real_world_distance(depth_map, topmost_pixel, bottommost_pixel)

        result = {
            "depth_map_shape": list(depth_map.shape),
            "focal_length_px": float(focal_length_px) if focal_length_px is not None else None,
            "topmost_pixel": (
                [int(topmost_pixel[0]), int(topmost_pixel[1])]
                if topmost_pixel is not None
                else None
            ),
            "bottommost_pixel": (
                [int(bottommost_pixel[0]), int(bottommost_pixel[1])]
                if bottommost_pixel is not None
                else None
            ),
            "distance_meters": distance_meters,
            "depth_stats": _depth_stats(depth_map),
        }

        return JSONResponse(content=result)

    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": str(e)},
        )
    finally:
        # Clean up on every exit path, including the early error responses.
        if temp_file_path is not None:
            with suppress(OSError):
                os.unlink(temp_file_path)


@app.get("/", response_class=HTMLResponse)
async def root():
    """Root endpoint with simple HTML interface."""
    # NOTE(review): this literal contains no HTML tags - the markup may have
    # been lost upstream; confirm against the intended page.
    html_content = """ Depth Pro Distance Estimation

🔍 Depth Pro Distance Estimation

Upload an image to estimate depth and calculate distances using Apple's Depth Pro model

Upload Image


Analysis Results:

🔗 API Endpoints

POST /estimate-depth - Upload image for depth estimation

GET /docs - API documentation

GET /health - Health check

✨ Features

"""
    return HTMLResponse(content=html_content)


def gradio_interface(image):
    """Removed Gradio interface - keeping for backward compatibility."""
    return "Gradio interface has been removed. Please use the web interface or API.", None


# FastAPI app is ready to run
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info",
        access_log=True,
    )