# pf-depth / app.py — version 1.0.0 (commit eae62a9)
import os
import tempfile
import numpy as np
import cv2
import torch
from PIL import Image
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import JSONResponse, HTMLResponse
from transformers import pipeline
from typing import Optional
import json
# Initialize FastAPI app with interactive docs exposed at /docs and /redoc.
app = FastAPI(
    title="Depth Pro Distance Estimation",
    description="Estimate distance and depth using Apple's Depth Pro model",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)
# Force CPU usage
# NOTE: informational flag; DepthEstimator below also pins torch to CPU.
device = 'cpu'
def initialize_depth_pipeline():
    """Create the Hugging Face depth-estimation pipeline for Apple's Depth Pro.

    Returns:
        The initialized ``transformers`` pipeline, or ``None`` when the model
        cannot be loaded (the caller then falls back to DummyDepthPipeline).
    """
    try:
        print("Initializing Depth Pro pipeline...")
        # The service is CPU-only by design (module-level `device = 'cpu'`,
        # DepthEstimator also forces torch.device('cpu')). The original
        # selected GPU device 0 when CUDA was available, contradicting that
        # intent and the float32-for-CPU dtype choice — always pass -1 (CPU).
        pipe = pipeline(
            "depth-estimation",
            model="apple/DepthPro",
            device=-1,  # -1 == CPU in the transformers pipeline API
            torch_dtype=torch.float32  # Use float32 for CPU compatibility
        )
        print("Depth Pro pipeline initialized successfully!")
        return pipe
    except Exception as e:
        # Model download/initialization can fail (offline env, memory, etc.);
        # report it and let the caller substitute the dummy pipeline.
        print(f"Error initializing pipeline: {e}")
        print("Falling back to dummy pipeline...")
        return None
class DummyDepthPipeline:
    """Stand-in pipeline used when the real Depth Pro model fails to load."""

    def __call__(self, image):
        """Return a fake depth prediction shaped like the real pipeline output."""
        if isinstance(image, str):
            image = Image.open(image)
        elif isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        width, height = image.size
        return {"depth": self._generate_dummy_depth(height, width)}

    def _generate_dummy_depth(self, height, width):
        """Build a plausible depth map: far at the top, near at the bottom."""
        # Depth ramps from 10 m (top row) down to 2 m (bottom row), mimicking
        # ground-plane perspective seen by a forward-facing camera.
        row_depths = np.linspace(10.0, 2.0, height)
        depth_map = np.repeat(row_depths[:, np.newaxis], width, axis=1)
        # Gaussian noise so the map is not perfectly smooth.
        depth_map = depth_map + np.random.normal(0, 0.5, (height, width))
        # Clamp so every value stays strictly positive.
        return np.maximum(depth_map, 0.1)
class DepthEstimator:
    """Wraps a depth-estimation pipeline and normalizes its output to numpy."""

    def __init__(self, pipeline=None):
        # Inference is pinned to CPU regardless of available hardware.
        self.device = torch.device('cpu')
        print("Initializing Depth Pro estimator...")
        # Fall back to the dummy pipeline when no real one is supplied.
        self.pipeline = pipeline or DummyDepthPipeline()
        print("Depth Pro estimator initialized successfully!")

    def estimate_depth(self, image_path):
        """Run depth estimation on the image stored at `image_path`.

        Returns:
            (depth_map, (width, height), focal_length_px) on success,
            (None, None, None) on any failure.
        """
        try:
            source = Image.open(image_path).convert('RGB')
            resized, new_size = self.resize_image(source)
            prediction = self.pipeline(resized)

            # Pipelines differ in how they expose the depth map: dict key,
            # attribute, or the raw value itself.
            if isinstance(prediction, dict) and 'depth' in prediction:
                depth = prediction['depth']
            elif hasattr(prediction, 'depth'):
                depth = prediction.depth
            else:
                depth = prediction

            # Normalize to a numpy array.
            if isinstance(depth, torch.Tensor):
                depth = depth.cpu().numpy()
            elif not isinstance(depth, np.ndarray):
                depth = np.array(depth)

            # Rough focal-length heuristic based on the larger image side.
            focal_length_px = 1.2 * max(new_size)
            return depth, new_size, focal_length_px
        except Exception as e:
            print(f"Error in depth estimation: {e}")
            return None, None, None

    def resize_image(self, image, max_size=1536):
        """Scale the image so its larger side equals `max_size` pixels."""
        if isinstance(image, str):
            image = Image.open(image).convert('RGB')
        scale = max_size / max(image.size)
        target = (int(image.size[0] * scale), int(image.size[1] * scale))
        return image.resize(target, Image.Resampling.LANCZOS), target
def find_topmost_pixel(image):
    """Locate the topmost edge pixel (proxy for the far footpath boundary).

    Runs Canny edge detection on the grayscale image and returns the
    (row, col) of the horizontally-centered pixel on the highest edge row,
    or None when no edges are detected.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # Simple edge detection to find potential footpath boundaries.
    edges = cv2.Canny(gray, 50, 150)
    rows, cols = np.where(edges > 0)
    if rows.size == 0:
        return None
    top_row = np.min(rows)
    xs_on_top_row = cols[rows == top_row]
    # Pick the middle candidate when several pixels share the top row.
    return (top_row, xs_on_top_row[len(xs_on_top_row) // 2])
def find_bottommost_pixel(image, topmost_pixel):
    """Find the lowest edge pixel in the same column as `topmost_pixel`.

    Args:
        image: BGR image (as loaded by cv2).
        topmost_pixel: (row, col) from find_topmost_pixel, or None.

    Returns:
        (row, col) of the bottommost edge pixel in that column; falls back
        to the bottommost edge pixel anywhere in the image when the column
        has no edges; None when `topmost_pixel` is None or no edges exist.
    """
    if topmost_pixel is None:
        return None
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray, 50, 150)
    top_y, top_x = topmost_pixel
    # Scan only the relevant column. The original built a full H*W boolean
    # intersection ((edges > 0) & (arange == top_x)) just to inspect one
    # column — a single column slice is equivalent and O(H) instead of O(H*W).
    rows_in_column = np.where(edges[:, top_x] > 0)[0]
    if rows_in_column.size == 0:
        # Fallback: bottommost edge pixel anywhere in the image.
        rows, cols = np.where(edges > 0)
        if rows.size == 0:
            return None
        max_y = np.max(rows)
        bottom_xs = cols[rows == max_y]
        return (max_y, bottom_xs[len(bottom_xs) // 2])
    return (np.max(rows_in_column), top_x)
def estimate_real_world_distance(depth_map, topmost_pixel, bottommost_pixel):
    """Estimate the depth difference (in meters) between two pixels.

    Args:
        depth_map: 2-D array of per-pixel depths in meters.
        topmost_pixel: (row, col) of the first pixel, or None.
        bottommost_pixel: (row, col) of the second pixel, or None.

    Returns:
        Absolute depth difference in meters as a float, or None when any
        input is missing, a coordinate is out of bounds, or a depth is NaN.
    """
    if topmost_pixel is None or bottommost_pixel is None or depth_map is None:
        return None
    top_y, top_x = topmost_pixel
    bottom_y, bottom_x = bottommost_pixel
    height, width = depth_map.shape[:2]
    # Reject out-of-range coordinates, including negatives: the original only
    # checked the upper bound, so a negative index would silently wrap around
    # and read a depth from the opposite edge of the map.
    for y, x in ((top_y, top_x), (bottom_y, bottom_x)):
        if not (0 <= y < height and 0 <= x < width):
            return None
    topmost_depth = depth_map[top_y, top_x]
    bottommost_depth = depth_map[bottom_y, bottom_x]
    # Guard against invalid (NaN) depth predictions.
    if np.isnan(topmost_depth) or np.isnan(bottommost_depth):
        print("Invalid depth values (NaN) found")
        return None
    distance_meters = float(abs(topmost_depth - bottommost_depth))
    print("Distance calculation:")
    print(f"  Topmost pixel: ({top_y}, {top_x}) = {topmost_depth:.3f}m")
    print(f"  Bottommost pixel: ({bottom_y}, {bottom_x}) = {bottommost_depth:.3f}m")
    print(f"  Distance: {distance_meters:.3f}m")
    return distance_meters
# Initialize depth estimator globally so all endpoints share one model instance.
print("Initializing Depth Pro pipeline...")
depth_pipeline = initialize_depth_pipeline()  # None when the model fails to load
depth_estimator = DepthEstimator(depth_pipeline)  # substitutes DummyDepthPipeline on None
@app.get("/health")
async def health_check():
    """Liveness probe consumed by the Docker health check."""
    payload = {"status": "healthy", "service": "Depth Pro Distance Estimation"}
    return payload
@app.get("/api")
async def api_info():
    """Describe the service and point clients at the useful endpoints."""
    info = {
        "message": "Depth Pro Distance Estimation API",
        "docs": "/docs",
        "health": "/health",
        "estimate_endpoint": "/estimate-depth",
    }
    return info
@app.post("/estimate-depth")
async def estimate_depth_endpoint(file: UploadFile = File(...)):
    """Estimate depth for an uploaded image and report a pixel-to-pixel distance.

    Workflow: persist the upload to a temp file, run Depth Pro inference,
    locate the topmost/bottommost edge pixels on the resized image, then
    combine pixel positions with the depth map to estimate a real-world
    distance. Returns JSON with the distance, key pixels, and depth stats.
    """
    temp_file_path = None
    try:
        # Persist the upload so both cv2 and the depth pipeline can read it.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
            temp_file.write(await file.read())
            temp_file_path = temp_file.name

        # Load image for pixel detection.
        image = cv2.imread(temp_file_path)
        if image is None:
            return JSONResponse(
                status_code=400,
                content={"error": "Could not load image"}
            )

        depth_map, new_size, focal_length_px = depth_estimator.estimate_depth(temp_file_path)
        if depth_map is None:
            return JSONResponse(
                status_code=500,
                content={"error": "Depth estimation failed"}
            )

        # The depth map was produced from the resized image, so match sizes
        # before locating pixels.
        resized_image = cv2.resize(image, new_size)
        topmost_pixel = find_topmost_pixel(resized_image)
        bottommost_pixel = find_bottommost_pixel(resized_image, topmost_pixel)
        distance_meters = estimate_real_world_distance(depth_map, topmost_pixel, bottommost_pixel)

        result = {
            "depth_map_shape": list(depth_map.shape),
            "focal_length_px": float(focal_length_px) if focal_length_px is not None else None,
            "topmost_pixel": [int(topmost_pixel[0]), int(topmost_pixel[1])] if topmost_pixel else None,
            "bottommost_pixel": [int(bottommost_pixel[0]), int(bottommost_pixel[1])] if bottommost_pixel else None,
            "distance_meters": distance_meters,
            "depth_stats": {
                "min_depth": float(np.min(depth_map)),
                "max_depth": float(np.max(depth_map)),
                "mean_depth": float(np.mean(depth_map))
            }
        }
        return JSONResponse(content=result)
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": str(e)}
        )
    finally:
        # Always remove the temp file. The original leaked it on the early
        # 400/500 returns (unlink only ran on full success or via a bare
        # except in the error path); a finally block covers every exit.
        if temp_file_path is not None:
            try:
                os.unlink(temp_file_path)
            except OSError:
                pass
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the single-page upload UI for the depth-estimation service.

    The page posts the selected image to /estimate-depth and renders the
    JSON result inline. Fixed: the "Depth Statistics" header contained a
    garbled replacement character; it now uses the same chart emoji as the
    depth-map-shape row.
    """
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Depth Pro Distance Estimation</title>
        <style>
            body {
                font-family: Arial, sans-serif;
                max-width: 800px;
                margin: 0 auto;
                padding: 20px;
                background-color: #f5f5f5;
            }
            .container {
                background-color: white;
                padding: 30px;
                border-radius: 10px;
                box-shadow: 0 2px 10px rgba(0,0,0,0.1);
            }
            h1 {
                color: #2c3e50;
                text-align: center;
                margin-bottom: 10px;
            }
            .subtitle {
                text-align: center;
                color: #7f8c8d;
                margin-bottom: 30px;
            }
            .upload-section {
                border: 2px dashed #3498db;
                border-radius: 10px;
                padding: 30px;
                text-align: center;
                margin: 20px 0;
                background-color: #ecf0f1;
            }
            input[type="file"] {
                margin: 20px 0;
                padding: 10px;
                border: 1px solid #bdc3c7;
                border-radius: 5px;
            }
            button {
                background-color: #3498db;
                color: white;
                padding: 12px 25px;
                border: none;
                border-radius: 5px;
                cursor: pointer;
                font-size: 16px;
            }
            button:hover {
                background-color: #2980b9;
            }
            .results {
                margin-top: 20px;
                padding: 20px;
                border-radius: 5px;
                background-color: #e8f5e8;
                display: none;
            }
            .error {
                background-color: #ffeaa7;
                border-left: 4px solid #fdcb6e;
                padding: 10px;
                margin: 10px 0;
            }
            .endpoint-info {
                background-color: #74b9ff;
                color: white;
                padding: 15px;
                border-radius: 5px;
                margin: 20px 0;
            }
            .feature {
                margin: 10px 0;
                padding: 10px;
                border-left: 3px solid #3498db;
                background-color: #f8f9fa;
            }
        </style>
    </head>
    <body>
        <div class="container">
            <h1>πŸ” Depth Pro Distance Estimation</h1>
            <p class="subtitle">Upload an image to estimate depth and calculate distances using Apple's Depth Pro model</p>
            <div class="upload-section">
                <h3>Upload Image</h3>
                <form id="uploadForm" enctype="multipart/form-data">
                    <input type="file" id="imageFile" name="file" accept="image/*" required>
                    <br>
                    <button type="submit">Analyze Image</button>
                </form>
                <div id="results" class="results">
                    <h3>Analysis Results:</h3>
                    <div id="resultsContent"></div>
                </div>
            </div>
            <div class="endpoint-info">
                <h3>πŸ”— API Endpoints</h3>
                <p><strong>POST /estimate-depth</strong> - Upload image for depth estimation</p>
                <p><strong>GET /docs</strong> - API documentation</p>
                <p><strong>GET /health</strong> - Health check</p>
            </div>
            <div class="feature">
                <h3>✨ Features</h3>
                <ul>
                    <li>🎯 Monocular depth estimation using Depth Pro</li>
                    <li>πŸ“ Real-world distance calculation</li>
                    <li>πŸ–₯️ CPU-optimized processing</li>
                    <li>πŸš€ Fast inference suitable for real-time use</li>
                </ul>
            </div>
        </div>
        <script>
            document.getElementById('uploadForm').addEventListener('submit', async function(e) {
                e.preventDefault();
                const fileInput = document.getElementById('imageFile');
                const resultsDiv = document.getElementById('results');
                const resultsContent = document.getElementById('resultsContent');
                if (!fileInput.files[0]) {
                    alert('Please select an image file');
                    return;
                }
                const formData = new FormData();
                formData.append('file', fileInput.files[0]);
                try {
                    resultsContent.innerHTML = '<p>πŸ”„ Processing image...</p>';
                    resultsDiv.style.display = 'block';
                    const response = await fetch('/estimate-depth', {
                        method: 'POST',
                        body: formData
                    });
                    if (response.ok) {
                        const result = await response.json();
                        let html = '<h4>πŸ“Š Results:</h4>';
                        html += `<p><strong>πŸ“ Distance:</strong> ${result.distance_meters ? result.distance_meters.toFixed(3) + ' meters' : 'N/A'}</p>`;
                        html += `<p><strong>🎯 Focal Length:</strong> ${result.focal_length_px ? result.focal_length_px.toFixed(2) + ' pixels' : 'N/A'}</p>`;
                        html += `<p><strong>πŸ“Š Depth Map Shape:</strong> ${result.depth_map_shape ? result.depth_map_shape.join(' x ') : 'N/A'}</p>`;
                        html += `<p><strong>πŸ” Top Pixel:</strong> ${result.topmost_pixel ? `(${result.topmost_pixel[0]}, ${result.topmost_pixel[1]})` : 'N/A'}</p>`;
                        html += `<p><strong>πŸ”½ Bottom Pixel:</strong> ${result.bottommost_pixel ? `(${result.bottommost_pixel[0]}, ${result.bottommost_pixel[1]})` : 'N/A'}</p>`;
                        if (result.depth_stats) {
                            html += '<h4>πŸ“Š Depth Statistics:</h4>';
                            html += `<p><strong>Min Depth:</strong> ${result.depth_stats.min_depth.toFixed(3)}m</p>`;
                            html += `<p><strong>Max Depth:</strong> ${result.depth_stats.max_depth.toFixed(3)}m</p>`;
                            html += `<p><strong>Mean Depth:</strong> ${result.depth_stats.mean_depth.toFixed(3)}m</p>`;
                        }
                        resultsContent.innerHTML = html;
                    } else {
                        const error = await response.json();
                        resultsContent.innerHTML = `<div class="error">❌ Error: ${error.error || 'Processing failed'}</div>`;
                    }
                } catch (error) {
                    resultsContent.innerHTML = `<div class="error">❌ Network error: ${error.message}</div>`;
                }
            });
        </script>
    </body>
    </html>
    """
    return HTMLResponse(content=html_content)
def gradio_interface(image):
    """Deprecated stub kept for backward compatibility after Gradio removal."""
    message = "Gradio interface has been removed. Please use the web interface or API."
    return message, None
# FastAPI app is ready to run
if __name__ == "__main__":
    import uvicorn  # imported lazily: only needed when run as a script

    # Bind on all interfaces; 7860 is presumably chosen for a Hugging Face
    # Spaces deployment (their conventional port) — confirm against the
    # container config.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info",
        access_log=True
    )