|
|
import os |
|
|
import tempfile |
|
|
import numpy as np |
|
|
import cv2 |
|
|
import torch |
|
|
from PIL import Image |
|
|
from fastapi import FastAPI, File, UploadFile, Form, HTTPException |
|
|
from fastapi.responses import JSONResponse, HTMLResponse |
|
|
from transformers import pipeline |
|
|
from typing import Optional |
|
|
import json |
|
|
|
|
|
|
|
|
# FastAPI application instance; interactive API docs are served at /docs
# (Swagger UI) and /redoc (ReDoc).
app = FastAPI(
    title="Depth Pro Distance Estimation",
    description="Estimate distance and depth using Apple's Depth Pro model",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

# NOTE(review): this module-level `device` appears unused — DepthEstimator
# creates its own torch device and initialize_depth_pipeline picks the device
# itself; confirm nothing external reads it before removing.
device = 'cpu'
|
|
|
|
|
def initialize_depth_pipeline():
    """Try to load the real Depth Pro pipeline; return None on failure.

    Returns:
        A transformers depth-estimation pipeline, or None when model loading
        fails (callers then fall back to DummyDepthPipeline).
    """
    try:
        print("Initializing Depth Pro pipeline...")
        # GPU device index 0 when CUDA is available, otherwise CPU (-1).
        target_device = 0 if torch.cuda.is_available() else -1
        depth_pipe = pipeline(
            "depth-estimation",
            model="apple/DepthPro",
            device=target_device,
            torch_dtype=torch.float32,
        )
    except Exception as e:
        print(f"Error initializing pipeline: {e}")
        print("Falling back to dummy pipeline...")
        return None
    print("Depth Pro pipeline initialized successfully!")
    return depth_pipe
|
|
|
|
|
class DummyDepthPipeline:
    """Fallback pipeline that fabricates depth maps when the real model
    cannot be loaded, so the rest of the service keeps working."""

    def __call__(self, image):
        """Produce a fake depth prediction for *image*.

        Accepts a file path, a numpy array, or a PIL Image; returns a dict
        with a single "depth" key holding an (H, W) float array.
        """
        if isinstance(image, str):
            image = Image.open(image)
        elif isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        img_w, img_h = image.size
        return {"depth": self._generate_dummy_depth(img_h, img_w)}

    def _generate_dummy_depth(self, height, width):
        """Build a plausible-looking synthetic depth map.

        Depth decreases linearly from 10 m at the top row to 2 m at the
        bottom, with Gaussian noise added and a 0.1 m floor applied.
        """
        # Vertical gradient: far at the top of the frame, near at the bottom.
        row_depths = np.linspace(10.0, 2.0, height)
        depth = np.tile(row_depths[:, np.newaxis], (1, width))

        # Sprinkle noise so the map doesn't look perfectly smooth.
        depth = depth + np.random.normal(0, 0.5, (height, width))

        # Clamp to a small positive minimum; zero/negative depth is invalid.
        return np.maximum(depth, 0.1)
|
|
|
|
|
class DepthEstimator:
    """Thin wrapper around a depth-estimation pipeline that normalizes its
    output and derives a rough focal-length estimate."""

    def __init__(self, pipeline=None):
        # Kept for compatibility with earlier versions; inference device is
        # actually chosen by the pipeline itself.
        self.device = torch.device('cpu')
        print("Initializing Depth Pro estimator...")
        # Without a real pipeline, fall back to the synthetic one.
        self.pipeline = pipeline if pipeline else DummyDepthPipeline()
        print("Depth Pro estimator initialized successfully!")

    def estimate_depth(self, image_path):
        """Estimate a depth map for the image stored at *image_path*.

        Returns:
            (depth_map, (width, height), focal_length_px) on success, or
            (None, None, None) when anything goes wrong.
        """
        try:
            rgb = Image.open(image_path).convert('RGB')
            scaled, dims = self.resize_image(rgb)

            prediction = self.pipeline(scaled)

            # Pipelines differ in how they expose the depth output: dict,
            # attribute, or the raw array itself.
            if isinstance(prediction, dict) and 'depth' in prediction:
                depth_map = prediction['depth']
            elif hasattr(prediction, 'depth'):
                depth_map = prediction.depth
            else:
                depth_map = prediction

            # Normalize whatever we got into a numpy array.
            if isinstance(depth_map, torch.Tensor):
                depth_map = depth_map.cpu().numpy()
            elif not isinstance(depth_map, np.ndarray):
                depth_map = np.array(depth_map)

            # Heuristic focal length: 1.2x the longer image side, in pixels.
            focal_px = 1.2 * max(dims)
            return depth_map, dims, focal_px

        except Exception as e:
            print(f"Error in depth estimation: {e}")
            return None, None, None

    def resize_image(self, image, max_size=1536):
        """Scale *image* so its longer side equals *max_size*.

        Accepts a path or a PIL Image; returns (resized_image, (width, height)).
        """
        if isinstance(image, str):
            image = Image.open(image).convert('RGB')

        scale = max_size / max(image.size)
        dims = (int(image.size[0] * scale), int(image.size[1] * scale))
        return image.resize(dims, Image.Resampling.LANCZOS), dims
|
|
|
|
|
def find_topmost_pixel(image):
    """Locate the topmost edge pixel in a BGR image (footpath-detection proxy).

    Runs Canny edge detection and returns (row, col) for the centre-most
    pixel on the highest row that contains an edge, or None when the image
    has no edges at all.
    """
    edges = cv2.Canny(cv2.cvtColor(image, cv2.COLOR_BGR2GRAY), 50, 150)

    rows, cols = np.where(edges > 0)
    if rows.size == 0:
        return None

    top_row = np.min(rows)
    # Of all edge pixels on that row, pick the middle one.
    xs_on_row = cols[rows == top_row]
    return (top_row, xs_on_row[len(xs_on_row) // 2])
|
|
|
|
|
def find_bottommost_pixel(image, topmost_pixel):
    """Find the lowest edge pixel in the same column as *topmost_pixel*.

    Re-runs Canny on *image*; if the topmost pixel's column contains edges,
    returns the lowest one in that column. Otherwise falls back to the
    centre-most pixel of the globally lowest edge row. Returns None when
    *topmost_pixel* is None or no edges exist.
    """
    if topmost_pixel is None:
        return None

    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray, 50, 150)
    top_y, top_x = topmost_pixel

    # Rows in the topmost pixel's column that contain an edge.
    column_rows = np.where(edges[:, top_x] > 0)[0]
    if column_rows.size > 0:
        return (np.max(column_rows), top_x)

    # Column empty: fall back to the lowest edge row anywhere in the image.
    rows, cols = np.where(edges > 0)
    if rows.size == 0:
        return None
    bottom_row = np.max(rows)
    xs_on_row = cols[rows == bottom_row]
    return (bottom_row, xs_on_row[len(xs_on_row) // 2])
|
|
|
|
|
def estimate_real_world_distance(depth_map, topmost_pixel, bottommost_pixel):
    """Return the absolute depth difference (metres) between two pixels.

    Args:
        depth_map: (H, W) array of per-pixel depths in metres.
        topmost_pixel / bottommost_pixel: (row, col) coordinates.

    Returns:
        float distance, or None for missing inputs, out-of-bounds
        coordinates, or NaN depth values.
    """
    if depth_map is None or topmost_pixel is None or bottommost_pixel is None:
        return None

    top_y, top_x = topmost_pixel
    bottom_y, bottom_x = bottommost_pixel
    n_rows, n_cols = depth_map.shape[0], depth_map.shape[1]

    # Reject coordinates that fall outside the depth map.
    if top_y >= n_rows or bottom_y >= n_rows or top_x >= n_cols or bottom_x >= n_cols:
        return None

    depth_at_top = depth_map[top_y, top_x]
    depth_at_bottom = depth_map[bottom_y, bottom_x]

    if np.isnan(depth_at_top) or np.isnan(depth_at_bottom):
        print("Invalid depth values (NaN) found")
        return None

    distance_meters = float(abs(depth_at_top - depth_at_bottom))

    print(f"Distance calculation:")
    print(f" Topmost pixel: ({top_y}, {top_x}) = {depth_at_top:.3f}m")
    print(f" Bottommost pixel: ({bottom_y}, {bottom_x}) = {depth_at_bottom:.3f}m")
    print(f" Distance: {distance_meters:.3f}m")

    return distance_meters
|
|
|
|
|
|
|
|
# Build the global pipeline and estimator at import time so the first request
# doesn't pay the model-load cost. If the real model fails to load,
# initialize_depth_pipeline returns None and DepthEstimator falls back to the
# dummy pipeline.
print("Initializing Depth Pro pipeline...")
depth_pipeline = initialize_depth_pipeline()
depth_estimator = DepthEstimator(depth_pipeline)
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Liveness probe endpoint used by container health checks."""
    payload = dict(status="healthy", service="Depth Pro Distance Estimation")
    return payload
|
|
|
|
|
@app.get("/api")
async def api_info():
    """Describe the service and point clients at its main routes."""
    info = {"message": "Depth Pro Distance Estimation API"}
    info["docs"] = "/docs"
    info["health"] = "/health"
    info["estimate_endpoint"] = "/estimate-depth"
    return info
|
|
|
|
|
@app.post("/estimate-depth")
async def estimate_depth_endpoint(file: UploadFile = File(...)):
    """FastAPI endpoint for depth estimation and distance calculation.

    Saves the uploaded image to a temp file, runs depth estimation, finds
    the top/bottom edge pixels, and returns the estimated distance plus
    depth-map statistics as JSON. Errors are reported as JSON with 400/500
    status codes rather than raised.
    """
    temp_file_path = None
    try:
        # Persist the upload so OpenCV / PIL can read it by path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
            content = await file.read()
            temp_file.write(content)
            temp_file_path = temp_file.name

        image = cv2.imread(temp_file_path)
        if image is None:
            return JSONResponse(
                status_code=400,
                content={"error": "Could not load image"}
            )

        depth_map, new_size, focal_length_px = depth_estimator.estimate_depth(temp_file_path)
        if depth_map is None:
            return JSONResponse(
                status_code=500,
                content={"error": "Depth estimation failed"}
            )

        # Match the image to the depth map's resolution before pixel analysis
        # so pixel coordinates index the depth map directly.
        resized_image = cv2.resize(image, new_size)

        topmost_pixel = find_topmost_pixel(resized_image)
        bottommost_pixel = find_bottommost_pixel(resized_image, topmost_pixel)
        distance_meters = estimate_real_world_distance(depth_map, topmost_pixel, bottommost_pixel)

        result = {
            "depth_map_shape": depth_map.shape,
            "focal_length_px": float(focal_length_px) if focal_length_px is not None else None,
            "topmost_pixel": [int(topmost_pixel[0]), int(topmost_pixel[1])] if topmost_pixel else None,
            "bottommost_pixel": [int(bottommost_pixel[0]), int(bottommost_pixel[1])] if bottommost_pixel else None,
            "distance_meters": distance_meters,
            "depth_stats": {
                "min_depth": float(np.min(depth_map)),
                "max_depth": float(np.max(depth_map)),
                "mean_depth": float(np.mean(depth_map))
            }
        }
        return JSONResponse(content=result)

    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": str(e)}
        )
    finally:
        # BUG FIX: the temp file previously leaked on the early-return error
        # paths ("Could not load image" / "Depth estimation failed") because
        # cleanup only ran on the success and exception paths. A finally
        # block guarantees removal in every case.
        if temp_file_path is not None:
            try:
                os.unlink(temp_file_path)
            except OSError:
                pass
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def root():
    """Root endpoint serving a static single-page HTML interface.

    The page contains an upload form whose submit handler POSTs the chosen
    file to /estimate-depth via fetch() and renders the JSON response inline.
    """
    # NOTE(review): the "Depth Statistics" heading in the script below
    # contains a U+FFFD replacement character where an emoji presumably was
    # (other headings use chart/ruler emojis) — confirm the intended glyph.
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Depth Pro Distance Estimation</title>
        <style>
            body {
                font-family: Arial, sans-serif;
                max-width: 800px;
                margin: 0 auto;
                padding: 20px;
                background-color: #f5f5f5;
            }
            .container {
                background-color: white;
                padding: 30px;
                border-radius: 10px;
                box-shadow: 0 2px 10px rgba(0,0,0,0.1);
            }
            h1 {
                color: #2c3e50;
                text-align: center;
                margin-bottom: 10px;
            }
            .subtitle {
                text-align: center;
                color: #7f8c8d;
                margin-bottom: 30px;
            }
            .upload-section {
                border: 2px dashed #3498db;
                border-radius: 10px;
                padding: 30px;
                text-align: center;
                margin: 20px 0;
                background-color: #ecf0f1;
            }
            input[type="file"] {
                margin: 20px 0;
                padding: 10px;
                border: 1px solid #bdc3c7;
                border-radius: 5px;
            }
            button {
                background-color: #3498db;
                color: white;
                padding: 12px 25px;
                border: none;
                border-radius: 5px;
                cursor: pointer;
                font-size: 16px;
            }
            button:hover {
                background-color: #2980b9;
            }
            .results {
                margin-top: 20px;
                padding: 20px;
                border-radius: 5px;
                background-color: #e8f5e8;
                display: none;
            }
            .error {
                background-color: #ffeaa7;
                border-left: 4px solid #fdcb6e;
                padding: 10px;
                margin: 10px 0;
            }
            .endpoint-info {
                background-color: #74b9ff;
                color: white;
                padding: 15px;
                border-radius: 5px;
                margin: 20px 0;
            }
            .feature {
                margin: 10px 0;
                padding: 10px;
                border-left: 3px solid #3498db;
                background-color: #f8f9fa;
            }
        </style>
    </head>
    <body>
        <div class="container">
            <h1>π Depth Pro Distance Estimation</h1>
            <p class="subtitle">Upload an image to estimate depth and calculate distances using Apple's Depth Pro model</p>

            <div class="upload-section">
                <h3>Upload Image</h3>
                <form id="uploadForm" enctype="multipart/form-data">
                    <input type="file" id="imageFile" name="file" accept="image/*" required>
                    <br>
                    <button type="submit">Analyze Image</button>
                </form>

                <div id="results" class="results">
                    <h3>Analysis Results:</h3>
                    <div id="resultsContent"></div>
                </div>
            </div>

            <div class="endpoint-info">
                <h3>π API Endpoints</h3>
                <p><strong>POST /estimate-depth</strong> - Upload image for depth estimation</p>
                <p><strong>GET /docs</strong> - API documentation</p>
                <p><strong>GET /health</strong> - Health check</p>
            </div>

            <div class="feature">
                <h3>β¨ Features</h3>
                <ul>
                    <li>π― Monocular depth estimation using Depth Pro</li>
                    <li>π Real-world distance calculation</li>
                    <li>π₯οΈ CPU-optimized processing</li>
                    <li>π Fast inference suitable for real-time use</li>
                </ul>
            </div>
        </div>

        <script>
            document.getElementById('uploadForm').addEventListener('submit', async function(e) {
                e.preventDefault();

                const fileInput = document.getElementById('imageFile');
                const resultsDiv = document.getElementById('results');
                const resultsContent = document.getElementById('resultsContent');

                if (!fileInput.files[0]) {
                    alert('Please select an image file');
                    return;
                }

                const formData = new FormData();
                formData.append('file', fileInput.files[0]);

                try {
                    resultsContent.innerHTML = '<p>π Processing image...</p>';
                    resultsDiv.style.display = 'block';

                    const response = await fetch('/estimate-depth', {
                        method: 'POST',
                        body: formData
                    });

                    if (response.ok) {
                        const result = await response.json();

                        let html = '<h4>π Results:</h4>';
                        html += `<p><strong>π Distance:</strong> ${result.distance_meters ? result.distance_meters.toFixed(3) + ' meters' : 'N/A'}</p>`;
                        html += `<p><strong>π― Focal Length:</strong> ${result.focal_length_px ? result.focal_length_px.toFixed(2) + ' pixels' : 'N/A'}</p>`;
                        html += `<p><strong>π Depth Map Shape:</strong> ${result.depth_map_shape ? result.depth_map_shape.join(' x ') : 'N/A'}</p>`;
                        html += `<p><strong>π Top Pixel:</strong> ${result.topmost_pixel ? `(${result.topmost_pixel[0]}, ${result.topmost_pixel[1]})` : 'N/A'}</p>`;
                        html += `<p><strong>π½ Bottom Pixel:</strong> ${result.bottommost_pixel ? `(${result.bottommost_pixel[0]}, ${result.bottommost_pixel[1]})` : 'N/A'}</p>`;

                        if (result.depth_stats) {
                            html += '<h4>οΏ½ Depth Statistics:</h4>';
                            html += `<p><strong>Min Depth:</strong> ${result.depth_stats.min_depth.toFixed(3)}m</p>`;
                            html += `<p><strong>Max Depth:</strong> ${result.depth_stats.max_depth.toFixed(3)}m</p>`;
                            html += `<p><strong>Mean Depth:</strong> ${result.depth_stats.mean_depth.toFixed(3)}m</p>`;
                        }

                        resultsContent.innerHTML = html;
                    } else {
                        const error = await response.json();
                        resultsContent.innerHTML = `<div class="error">β Error: ${error.error || 'Processing failed'}</div>`;
                    }
                } catch (error) {
                    resultsContent.innerHTML = `<div class="error">β Network error: ${error.message}</div>`;
                }
            });
        </script>
    </body>
    </html>
    """
    return HTMLResponse(content=html_content)
|
|
|
|
|
def gradio_interface(image):
    """Deprecated stub retained for backward compatibility.

    The Gradio UI was removed; callers receive a notice string and None
    instead of an image result.
    """
    notice = "Gradio interface has been removed. Please use the web interface or API."
    return notice, None
|
|
|
|
|
|
|
|
# Run the API with uvicorn when executed directly (not when imported).
if __name__ == "__main__":
    import uvicorn
    # Bind to all interfaces on port 7860 with request access logging enabled.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info",
        access_log=True
    )
|
|
|