|
|
|
|
|
import gradio as gr |
|
|
import cv2 |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import io |
|
|
import json |
|
|
from datetime import datetime |
|
|
from typing import List, Tuple, Optional |
|
|
import tempfile |
|
|
import zipfile |
|
|
from pathlib import Path |
|
|
|
|
|
class WarehouseStitcher:
    """Feature-based panorama stitcher for warehouse imagery.

    Pipeline: optional CLAHE contrast enhancement -> keypoint detection
    (SIFT / ORB / AKAZE) -> Lowe ratio-test matching -> RANSAC homography ->
    warp-and-paste compositing.  All tunables live in ``self.config``.
    """

    def __init__(self):
        self.version = "1.0.0"
        # Mutable settings dictionary; callers overwrite entries before a run.
        self.config = {
            'feature_extractor': 'SIFT',
            'matcher': 'BF',
            'use_clahe': True,
            'detect_rack_labels': True,
            'ransac_threshold': 5.0,
            'min_match_count': 10,
        }

    def preprocess_image(self, img: np.ndarray) -> np.ndarray:
        """Return a single-channel version of *img*, CLAHE-enhanced if enabled.

        Args:
            img: Image array; a 3-D array is treated as BGR and converted to
                grey, a 2-D array is used as-is.

        Returns:
            The greyscale image, equalised with CLAHE when
            ``config['use_clahe']`` is truthy.
        """
        if len(img.shape) == 3:
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        else:
            gray = img

        if self.config['use_clahe']:
            clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
            gray = clahe.apply(gray)

        return gray

    def _find_label_candidates(self, gray: np.ndarray) -> List[dict]:
        """Find label-shaped contours in an already preprocessed grey image."""
        edges = cv2.Canny(gray, 50, 150)
        contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        labels = []
        for contour in contours:
            area = cv2.contourArea(contour)
            if not 500 < area < 50000:
                continue
            x, y, w, h = cv2.boundingRect(contour)
            aspect_ratio = w / float(h)
            # Keep only wide, short rectangles.
            if 2.0 < aspect_ratio < 8.0:
                labels.append({
                    'bbox': (x, y, w, h),
                    'area': area,
                    'center': (x + w // 2, y + h // 2),
                })
        return labels

    def detect_rack_labels(self, img: np.ndarray) -> List[dict]:
        """Detect rack-label candidates in *img*.

        The image is preprocessed with :meth:`preprocess_image`, then external
        Canny-edge contours are filtered by area (500..50000) and bounding-box
        aspect ratio (2..8).

        Returns:
            A list of dicts with ``'bbox'`` (x, y, w, h), ``'area'`` and
            ``'center'`` (x, y).
        """
        return self._find_label_candidates(self.preprocess_image(img))

    def extract_features(self, img: np.ndarray) -> Tuple:
        """Detect keypoints and compute descriptors with the configured detector.

        Returns:
            ``(keypoints, descriptors, gray)``.  ``descriptors`` may be None
            when the detector finds nothing.  When label detection is enabled,
            one extra keypoint per label centre is appended *after* the
            detector's keypoints; those extras have no descriptor row, so
            match indices (which refer to descriptor rows) stay valid.
        """
        gray = self.preprocess_image(img)

        extractor = self.config['feature_extractor']
        if extractor == 'SIFT':
            detector = cv2.SIFT_create(nfeatures=2000, contrastThreshold=0.03, edgeThreshold=10)
        elif extractor == 'ORB':
            detector = cv2.ORB_create(nfeatures=2000)
        else:
            detector = cv2.AKAZE_create()

        keypoints, descriptors = detector.detectAndCompute(gray, None)
        keypoints = list(keypoints) if keypoints is not None else []

        if self.config['detect_rack_labels']:
            # Work on the grey image we already have; routing it through
            # detect_rack_labels() would apply CLAHE a second time.
            for label in self._find_label_candidates(gray):
                cx, cy = label['center']
                keypoints.append(cv2.KeyPoint(float(cx), float(cy), 10))

        return keypoints, descriptors, gray

    def match_features(self, desc1: np.ndarray, desc2: np.ndarray) -> List:
        """Match two descriptor sets and keep matches passing Lowe's ratio test.

        The matcher backend follows ``config['matcher']`` ('FLANN' or
        brute-force otherwise).  The distance metric follows the descriptor
        dtype: uint8 descriptors are compared with Hamming distance, anything
        else with L2.

        Returns:
            The list of accepted ``cv2.DMatch`` objects (possibly empty).
        """
        if desc1 is None or desc2 is None:
            return []
        # The ratio test needs two neighbours per query descriptor.
        if len(desc1) == 0 or len(desc2) < 2:
            return []

        is_binary = desc1.dtype == np.uint8

        if self.config.get('matcher') == 'FLANN':
            if is_binary:
                # FLANN_INDEX_LSH (6) is the index type for binary descriptors.
                index_params = dict(algorithm=6, table_number=6, key_size=12, multi_probe_level=1)
            else:
                # FLANN_INDEX_KDTREE (1) requires float32 input.
                index_params = dict(algorithm=1, trees=5)
                desc1 = np.asarray(desc1, dtype=np.float32)
                desc2 = np.asarray(desc2, dtype=np.float32)
            matcher = cv2.FlannBasedMatcher(index_params, dict(checks=50))
        else:
            norm = cv2.NORM_HAMMING if is_binary else cv2.NORM_L2
            matcher = cv2.BFMatcher(norm, crossCheck=False)

        candidates = matcher.knnMatch(desc1, desc2, k=2)

        # Some queries can come back with fewer than two neighbours; skip them.
        return [
            pair[0]
            for pair in candidates
            if len(pair) == 2 and pair[0].distance < 0.75 * pair[1].distance
        ]

    def estimate_homography(self, kp1, kp2, matches):
        """Fit a RANSAC homography mapping image-2 points onto image-1 points.

        Args:
            kp1: Keypoints of the reference image (indexed by ``queryIdx``).
            kp2: Keypoints of the image to be warped (indexed by ``trainIdx``).
            matches: Matches produced by :meth:`match_features`.

        Returns:
            ``(H, mask, confidence)`` where ``H`` takes image-2 coordinates to
            image-1 coordinates (the direction :meth:`blend_images` applies to
            its second image), ``mask`` flags inlier matches and
            ``confidence`` is the inlier fraction.  ``(None, None, 0.0)`` when
            there are too few matches or the fit fails.
        """
        # A homography needs at least four correspondences regardless of config.
        if len(matches) < max(4, self.config['min_match_count']):
            return None, None, 0.0

        ref_pts = np.float32([kp1[m.queryIdx].pt for m in matches]).reshape(-1, 1, 2)
        new_pts = np.float32([kp2[m.trainIdx].pt for m in matches]).reshape(-1, 1, 2)

        # Source = image 2, destination = image 1, because blend_images()
        # warps image 2 with H and leaves image 1 untouched.
        H, mask = cv2.findHomography(new_pts, ref_pts, cv2.RANSAC,
                                     self.config['ransac_threshold'])

        if H is None:
            return None, None, 0.0

        inliers = int(np.sum(mask))
        confidence = inliers / len(matches)

        return H, mask, confidence

    def blend_images(self, img1: np.ndarray, img2: np.ndarray, H: np.ndarray) -> np.ndarray:
        """Composite *img2*, warped by *H*, with *img1* on a shared canvas.

        Args:
            img1: Reference image; placed on the canvas without warping.
            img2: Image to warp into the reference frame.
            H: 3x3 transform applied to *img2* coordinates.

        Returns:
            A canvas large enough to hold both images.  Where they overlap the
            pixels of *img1* win, except for all-zero pixels of *img1*, which
            are treated as empty so that black padding from earlier stitches
            does not erase the newly warped image.
        """
        h1, w1 = img1.shape[:2]
        h2, w2 = img2.shape[:2]

        corners1 = np.float32([[0, 0], [0, h1], [w1, h1], [w1, 0]]).reshape(-1, 1, 2)
        corners2 = np.float32([[0, 0], [0, h2], [w2, h2], [w2, 0]]).reshape(-1, 1, 2)
        corners2_warped = cv2.perspectiveTransform(corners2, H)

        all_corners = np.concatenate((corners2_warped, corners1), axis=0)
        xmin, ymin = np.int32(all_corners.min(axis=0).ravel() - 0.5)
        xmax, ymax = np.int32(all_corners.max(axis=0).ravel() + 0.5)

        # Shift everything so the canvas origin sits at the top-left extreme.
        tx, ty = int(-xmin), int(-ymin)
        Ht = np.array([[1, 0, tx], [0, 1, ty], [0, 0, 1]], dtype=np.float64)

        canvas_size = (int(xmax - xmin), int(ymax - ymin))
        result = cv2.warpPerspective(img2, Ht.dot(H), canvas_size)

        region = result[ty:h1 + ty, tx:w1 + tx]
        filled = img1.any(axis=2) if img1.ndim == 3 else img1 > 0
        region[filled] = img1[filled]

        return result

    def _load_bgr(self, img):
        """Turn one input (file path, PIL image or array) into an OpenCV array.

        Returns None when a path cannot be decoded by ``cv2.imread``; arrays
        are passed through unchanged.
        """
        if isinstance(img, str):
            return cv2.imread(img)
        if isinstance(img, Image.Image):
            # Normalise the mode first: greyscale, palette or CMYK images
            # would otherwise break or mis-colour the RGB->BGR conversion.
            return cv2.cvtColor(np.array(img.convert('RGB')), cv2.COLOR_RGB2BGR)
        return img

    def stitch_images(self, images: List, progress=gr.Progress()) -> Tuple:
        """Stitch *images* sequentially into one panorama.

        Each image is matched against the panorama built so far; images with
        too few matches or no valid homography are skipped.

        Args:
            images: File paths, PIL images or BGR arrays, in stitching order.
            progress: Progress callback, called with the completed fraction.

        Returns:
            ``(panorama_pil, log_text, png_path)``.  On failure the first and
            last items are None and ``log_text`` explains why.
        """
        if not images or len(images) < 2:
            return None, "β Error: Please upload at least 2 images", None

        logs = []
        logs.append("=" * 70)
        logs.append("π INDUSTRIAL WAREHOUSE IMAGE STITCHING PIPELINE")
        logs.append("=" * 70)
        logs.append(f"π Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logs.append(f"πΈ Images to process: {len(images)}")
        logs.append("")

        cv_images = []
        for i, img in enumerate(images):
            loaded = self._load_bgr(img)
            if loaded is None:
                # cv2.imread signals failure with None rather than raising.
                logs.append(f"WARNING: Image {i+1} could not be read, skipping")
                continue
            cv_images.append(loaded)
            logs.append(f"β Image {i+1}: {loaded.shape[1]}x{loaded.shape[0]} pixels")

        logs.append("")

        if len(cv_images) < 2:
            logs.append("ERROR: Fewer than 2 readable images, nothing to stitch")
            return None, "\n".join(logs), None

        result = cv_images[0]
        total_matches = 0
        total_inliers = 0

        for i in range(1, len(cv_images)):
            progress(i / len(cv_images))
            logs.append("-" * 70)
            logs.append(f"π PROCESSING IMAGE {i+1}/{len(cv_images)}")
            logs.append("-" * 70)

            kp1, desc1, _ = self.extract_features(result)
            kp2, desc2, _ = self.extract_features(cv_images[i])
            logs.append(f"π Features: {len(kp1)} β {len(kp2)}")

            matches = self.match_features(desc1, desc2)
            total_matches += len(matches)
            logs.append(f"π Matches: {len(matches)} good matches")

            if len(matches) < self.config['min_match_count']:
                logs.append(f"β οΈ WARNING: Only {len(matches)} matches")
                logs.append(f"βοΈ Skipping image {i+1}")
                continue

            H, mask, confidence = self.estimate_homography(kp1, kp2, matches)

            if H is None:
                logs.append(f"β ERROR: Failed to compute homography")
                continue

            inliers = int(np.sum(mask))
            total_inliers += inliers
            logs.append(f"π Homography: {inliers}/{len(matches)} inliers ({confidence:.1%})")

            result = self.blend_images(result, cv_images[i], H)
            logs.append(f"β Success! New size: {result.shape[1]}x{result.shape[0]}")
            logs.append("")

        logs.append("=" * 70)
        logs.append("π FINAL STATISTICS")
        logs.append("=" * 70)
        logs.append(f"β Final Resolution: {result.shape[1]} x {result.shape[0]} pixels")
        logs.append(f"β Total Matches: {total_matches:,}")
        logs.append(f"β Total Inliers: {total_inliers:,}")
        logs.append("=" * 70)

        result_rgb = cv2.cvtColor(result, cv2.COLOR_BGR2RGB)
        result_pil = Image.fromarray(result_rgb)

        # delete=False: the file must outlive this call so it can be downloaded.
        with tempfile.NamedTemporaryFile(mode='wb', suffix='.png', delete=False) as f:
            result_pil.save(f, format='PNG', optimize=True)
            temp_path = f.name

        return result_pil, "\n".join(logs), temp_path
|
|
|
|
|
|
|
|
class PoseGuidedWarehouseStitcher(WarehouseStitcher):
    """Stitcher that uses per-image drone pose metadata as a fallback.

    Images are ordered by capture timestamp.  For each new image a rough
    homography is derived from the pose delta and used whenever feature
    matching cannot provide one.
    """

    def load_metadata_from_file(self, json_path: str) -> dict:
        """Read and parse one JSON metadata file."""
        with open(json_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def calculate_relative_motion(self, pose1: dict, pose2: dict) -> dict:
        """Compute the pose delta from *pose1* to *pose2*.

        Both arguments must contain a ``'nav_snapshot'`` dict with ``x``,
        ``y``, ``z`` and ``yaw`` entries; yaw is handled in radians.

        Returns:
            Dict with ``dx``, ``dy``, ``dz``, ``dyaw`` (wrapped to
            [-pi, pi]), ``distance`` (Euclidean) and ``avg_height`` (mean of
            the absolute z values).
        """
        nav1 = pose1['nav_snapshot']
        nav2 = pose2['nav_snapshot']

        dx = nav2['x'] - nav1['x']
        dy = nav2['y'] - nav1['y']
        dz = nav2['z'] - nav1['z']

        # Wrap the difference so a heading that crosses the +/-pi seam is
        # reported as a small turn instead of almost a full revolution.
        raw_dyaw = nav2['yaw'] - nav1['yaw']
        dyaw = float(np.arctan2(np.sin(raw_dyaw), np.cos(raw_dyaw)))

        distance = np.sqrt(dx**2 + dy**2 + dz**2)

        return {
            'dx': dx, 'dy': dy, 'dz': dz,
            'dyaw': dyaw,
            'distance': distance,
            'avg_height': (abs(nav1['z']) + abs(nav2['z'])) / 2,
        }

    def estimate_homography_from_pose(self, motion: dict, img_width: int, img_height: int) -> np.ndarray:
        """Build a rotation-plus-translation homography from a pose delta.

        Args:
            motion: Result of :meth:`calculate_relative_motion`.
            img_width: Width in pixels of the camera frame; the focal length
                is approximated as ``0.8 * img_width``.
            img_height: Accepted for interface symmetry; not used.

        Returns:
            A 3x3 float64 matrix rotating by ``dyaw`` and translating by the
            metric offset scaled to pixels.
        """
        focal_length_px = img_width * 0.8
        # Fall back to 10.0 when the average height is exactly zero so the
        # division below stays defined.
        scale = abs(motion['avg_height']) if motion['avg_height'] != 0 else 10.0

        tx = (motion['dx'] / scale) * focal_length_px
        ty = (motion['dy'] / scale) * focal_length_px

        theta = motion['dyaw']
        cos_theta = np.cos(theta)
        sin_theta = np.sin(theta)

        return np.array([
            [cos_theta, -sin_theta, tx],
            [sin_theta, cos_theta, ty],
            [0, 0, 1],
        ], dtype=np.float64)

    def sort_by_capture_sequence(self, image_paths: List[str], metadata_paths: List[str]) -> Tuple[List, List]:
        """Order images by ``nav_snapshot.timestamp_usec`` from their metadata.

        Returns:
            ``(sorted_image_paths, sorted_metadata_dicts)`` — note that the
            second list holds the parsed metadata, not the file paths.
        """
        records = []
        for img_path, meta_path in zip(image_paths, metadata_paths):
            metadata = self.load_metadata_from_file(meta_path)
            records.append((metadata['nav_snapshot']['timestamp_usec'], img_path, metadata))

        # Sort on the timestamp only, so ties never compare the dict payloads.
        records.sort(key=lambda record: record[0])

        sorted_imgs = [record[1] for record in records]
        sorted_metas = [record[2] for record in records]

        return sorted_imgs, sorted_metas

    def stitch_with_poses(self, image_paths: List[str], metadata_paths: List[str],
                          progress=gr.Progress()) -> Tuple:
        """Stitch images in capture order, falling back to pose-derived transforms.

        Args:
            image_paths: Image file paths.
            metadata_paths: JSON metadata paths, parallel to *image_paths*.
            progress: Progress callback, called with the completed fraction.

        Returns:
            ``(panorama_pil, log_text, png_path)``; the first and last items
            are None when the inputs are unusable.
        """
        if len(image_paths) != len(metadata_paths):
            return None, "β Error: Number of images and metadata files must match", None

        if len(image_paths) < 2:
            return None, "β Error: Need at least 2 images", None

        logs = []
        logs.append("=" * 70)
        logs.append("π POSE-GUIDED DRONE IMAGE STITCHING PIPELINE")
        logs.append("=" * 70)
        logs.append(f"π Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logs.append(f"πΈ Image pairs to process: {len(image_paths)}")
        logs.append("")

        logs.append("π Sorting images by capture sequence...")
        sorted_imgs, sorted_metas = self.sort_by_capture_sequence(image_paths, metadata_paths)
        logs.append(f"β Images sorted chronologically")
        logs.append("")

        result = cv2.imread(sorted_imgs[0])
        if result is None:
            # cv2.imread reports failure by returning None.
            logs.append(f"ERROR: Could not read first image: {sorted_imgs[0]}")
            return None, "\n".join(logs), None
        logs.append(f"πΈ Image 1: {result.shape[1]}x{result.shape[0]} @ z={sorted_metas[0]['nav_snapshot']['z']:.2f}m")

        total_matches = 0
        total_inliers = 0
        # Pose of the most recently blended frame; skipped frames must not
        # become the reference for the next motion estimate.
        prev_meta = sorted_metas[0]

        for i in range(1, len(sorted_imgs)):
            progress(i / len(sorted_imgs))

            logs.append("-" * 70)
            logs.append(f"π PROCESSING IMAGE PAIR {i}/{len(sorted_imgs)-1}")
            logs.append("-" * 70)

            current_img = cv2.imread(sorted_imgs[i])
            if current_img is None:
                logs.append(f"WARNING: Could not read image {i+1} ({sorted_imgs[i]}), skipping")
                logs.append("")
                continue
            current_meta = sorted_metas[i]
            logs.append(f"πΈ Image {i+1}: {current_img.shape[1]}x{current_img.shape[0]} @ z={current_meta['nav_snapshot']['z']:.2f}m")

            motion = self.calculate_relative_motion(prev_meta, current_meta)
            logs.append(f"π Drone motion: Ξx={motion['dx']:.3f}m, Ξy={motion['dy']:.3f}m, Ξz={motion['dz']:.3f}m")
            logs.append(f"π§ Yaw change: {np.degrees(motion['dyaw']):.2f}Β°")
            logs.append(f"π Distance: {motion['distance']:.3f}m")

            # Use the camera frame's size, not the panorama's: the focal
            # length estimate must not grow as the canvas grows.
            # NOTE(review): this transform is relative to the previous frame,
            # not to the panorama canvas — confirm it is adequate as a
            # fallback once the canvas origin has shifted.
            H_initial = self.estimate_homography_from_pose(
                motion, current_img.shape[1], current_img.shape[0])
            logs.append(f"π― Initial homography estimated from drone pose")

            kp1, desc1, _ = self.extract_features(result)
            kp2, desc2, _ = self.extract_features(current_img)
            logs.append(f"π Features: {len(kp1)} β {len(kp2)}")

            matches = self.match_features(desc1, desc2)
            total_matches += len(matches)
            logs.append(f"π Matches: {len(matches)} good matches")

            if len(matches) < self.config['min_match_count']:
                logs.append(f"β οΈ WARNING: Insufficient matches, using pose-only homography")
                H_final = H_initial
            else:
                H_refined, mask, confidence = self.estimate_homography(kp1, kp2, matches)

                if H_refined is not None:
                    inliers = int(np.sum(mask))
                    total_inliers += inliers
                    logs.append(f"π Refined homography: {inliers}/{len(matches)} inliers ({confidence:.1%})")
                    H_final = H_refined
                else:
                    logs.append(f"β οΈ Feature-based homography failed, using pose estimate")
                    H_final = H_initial

            result = self.blend_images(result, current_img, H_final)
            prev_meta = current_meta
            logs.append(f"β Blended! New size: {result.shape[1]}x{result.shape[0]}")
            logs.append("")

        logs.append("=" * 70)
        logs.append("π FINAL STATISTICS")
        logs.append("=" * 70)
        logs.append(f"β Final Resolution: {result.shape[1]} x {result.shape[0]} pixels")
        logs.append(f"β Total Matches: {total_matches:,}")
        logs.append(f"β Total Inliers: {total_inliers:,}")
        logs.append(f"β Completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logs.append("=" * 70)

        result_rgb = cv2.cvtColor(result, cv2.COLOR_BGR2RGB)
        result_pil = Image.fromarray(result_rgb)

        # delete=False: the file must outlive this call so it can be downloaded.
        with tempfile.NamedTemporaryFile(mode='wb', suffix='.png', delete=False) as f:
            result_pil.save(f, format='PNG', optimize=True)
            temp_path = f.name

        return result_pil, "\n".join(logs), temp_path
|
|
|
|
|
|
|
|
def create_demo():
    """Build the Gradio Blocks app with a basic tab and a pose-guided tab.

    Returns:
        The configured ``gr.Blocks`` instance (not yet launched).

    NOTE(review): the source this was rebuilt from had lost its indentation;
    the widget nesting below (buttons inside the right-hand columns, results
    section below the tabs) is inferred — confirm against the intended layout.
    """
    basic_stitcher = WarehouseStitcher()
    pose_stitcher = PoseGuidedWarehouseStitcher()

    def _as_path(item):
        """Return the path from a file payload that may arrive as a tuple."""
        return item[0] if isinstance(item, tuple) else item

    def _apply_settings(stitcher, feature_type, matcher_type, use_clahe, detect_labels, ransac_thresh):
        """Copy the UI selections into the stitcher's config dict."""
        stitcher.config['feature_extractor'] = feature_type
        stitcher.config['matcher'] = matcher_type
        stitcher.config['use_clahe'] = use_clahe
        stitcher.config['detect_rack_labels'] = detect_labels
        stitcher.config['ransac_threshold'] = ransac_thresh

    def process_images_basic(files, feature_type, matcher_type, use_clahe, detect_labels, ransac_thresh):
        """Validate the uploaded images and stitch them without metadata.

        Returns the ``(image, logs, download_path)`` triple expected by the
        output components; errors are reported through the logs string.
        """
        if not files or len(files) < 2:
            return None, "β Please upload at least 2 images for stitching", None

        file_paths = [_as_path(f) for f in files]

        # Keep only files OpenCV can decode.
        valid_paths = []
        for path in file_paths:
            try:
                readable = cv2.imread(path) is not None
            except Exception:
                # An undecodable or oddly typed entry is simply not valid;
                # unlike a bare except this lets KeyboardInterrupt through.
                readable = False
            if readable:
                valid_paths.append(path)

        if len(valid_paths) < 2:
            return None, f"β Only {len(valid_paths)} valid images found", None

        _apply_settings(basic_stitcher, feature_type, matcher_type,
                        use_clahe, detect_labels, ransac_thresh)

        try:
            images = []
            for path in valid_paths:
                # convert() returns a fully loaded copy, so the file handle
                # can be closed right away instead of leaking.
                with Image.open(path) as handle:
                    images.append(handle.convert('RGB'))
            return basic_stitcher.stitch_images(images)
        except Exception as e:
            return None, f"β Error: {str(e)}", None

    def process_zip_with_metadata(zip_file, feature_type, matcher_type, use_clahe, detect_labels, ransac_thresh):
        """Extract a ZIP of images + JSON metadata and run pose-guided stitching.

        Each image is paired with the JSON file named after the image stem
        (with any trailing ``_cam...`` part removed).  Returns the
        ``(image, logs, download_path)`` triple expected by the outputs.
        """
        if not zip_file:
            return None, "β Please upload a ZIP file", None

        zip_path = _as_path(zip_file)

        try:
            with tempfile.TemporaryDirectory() as tmpdir:
                tmpdir_path = Path(tmpdir)

                with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                    zip_ref.extractall(tmpdir_path)

                def _is_payload(path):
                    # Skip macOS resource-fork entries, which carry image and
                    # JSON extensions but are not decodable.
                    return (path.is_file()
                            and '__MACOSX' not in path.parts
                            and not path.name.startswith('._'))

                extracted = sorted(p for p in tmpdir_path.rglob('*') if _is_payload(p))
                # Case-insensitive so .JPG / .jpeg / .PNG are found as well.
                image_files = [p for p in extracted if p.suffix.lower() in ('.jpg', '.jpeg', '.png')]
                json_files = [p for p in extracted if p.suffix.lower() == '.json']

                if len(image_files) < 2:
                    return None, f"β Found only {len(image_files)} images, need at least 2", None

                if len(json_files) == 0:
                    return None, "β No JSON metadata files found in ZIP", None

                # First file wins when the same name appears in several folders.
                json_by_name = {}
                for json_file in json_files:
                    json_by_name.setdefault(json_file.name, json_file)

                image_metadata_pairs = []
                for img_file in image_files:
                    # rsplit leaves the stem untouched when '_cam' is absent.
                    base_name = img_file.stem.rsplit('_cam', 1)[0]
                    match = json_by_name.get(base_name + '.json')
                    if match is not None:
                        image_metadata_pairs.append({
                            'image': str(img_file),
                            'metadata': str(match),
                            'base_name': base_name,
                            'timestamp': None,
                        })

                if len(image_metadata_pairs) < 2:
                    return None, f"β Only {len(image_metadata_pairs)} images have matching metadata. Need at least 2.\n\nFound images: {[f.name for f in image_files[:5]]}\nFound JSON: {[j.name for j in json_files[:5]]}", None

                # Read timestamps up front so a bad file yields a clear message.
                for pair in image_metadata_pairs:
                    try:
                        with open(pair['metadata'], 'r', encoding='utf-8') as f:
                            metadata = json.load(f)
                        pair['timestamp'] = metadata['nav_snapshot']['timestamp_usec']
                    except Exception as e:
                        return None, f"β Error reading metadata {pair['metadata']}: {str(e)}", None

                image_metadata_pairs.sort(key=lambda x: x['timestamp'])

                image_paths = [p['image'] for p in image_metadata_pairs]
                metadata_paths = [p['metadata'] for p in image_metadata_pairs]

                _apply_settings(pose_stitcher, feature_type, matcher_type,
                                use_clahe, detect_labels, ransac_thresh)

                # Must run inside the `with`: the extracted files vanish on exit.
                return pose_stitcher.stitch_with_poses(image_paths, metadata_paths)

        except Exception as e:
            import traceback
            return None, f"β Error processing ZIP: {str(e)}\n\n{traceback.format_exc()}", None

    def _config_column():
        """Render the settings column shared by both tabs; return its controls."""
        with gr.Column(scale=1):
            gr.Markdown("## βοΈ Configuration")

            feature_type = gr.Radio(
                choices=['SIFT', 'ORB', 'AKAZE'],
                value='SIFT',
                label="π Feature Extractor"
            )

            matcher_type = gr.Radio(
                choices=['BF', 'FLANN'],
                value='BF',
                label="π Feature Matcher"
            )

            use_clahe = gr.Checkbox(
                value=True,
                label="β¨ Enable CLAHE Enhancement"
            )

            detect_labels = gr.Checkbox(
                value=True,
                label="π·οΈ Detect Rack Labels"
            )

            ransac_thresh = gr.Slider(
                minimum=1.0,
                maximum=10.0,
                value=5.0,
                step=0.5,
                label="π RANSAC Threshold"
            )

        return [feature_type, matcher_type, use_clahe, detect_labels, ransac_thresh]

    custom_css = """
.gradio-container {
font-family: 'Arial', sans-serif;
}
.output-image {
border: 2px solid #4CAF50;
border-radius: 8px;
}
"""

    with gr.Blocks(title="Warehouse Image Stitching", theme=gr.themes.Soft(), css=custom_css) as demo:

        gr.Markdown("""
# π Industrial Warehouse Image Stitching Pipeline

<div style="background-color: #f0f8ff; padding: 20px; border-radius: 10px; margin-bottom: 20px;">
<h3>π― Production-Ready Stitching for Warehouse Environments</h3>

**Key Features:**
- π **Pose-Guided Stitching**: Uses drone navigation data for intelligent alignment
- β¨ Handles specular reflections from shrink wrap and metallic surfaces
- π·οΈ Detects and uses warehouse rack labels as alignment anchors
- π CLAHE preprocessing for enhanced contrast
- π― RANSAC-based robust homography estimation
</div>
""")

        with gr.Tabs():

            with gr.TabItem("πΈ Basic Mode"):
                gr.Markdown("""
### Upload images directly (no metadata needed)
Perfect for general-purpose panoramas and quick stitching.
""")

                with gr.Row():
                    basic_controls = _config_column()

                    with gr.Column(scale=2):
                        file_input = gr.File(
                            file_count="multiple",
                            file_types=["image"],
                            label="πΈ Upload Warehouse Images (minimum 2)",
                            type="filepath"
                        )

                        process_basic_btn = gr.Button(
                            "π¨ Stitch Images",
                            variant="primary",
                            size="lg"
                        )

            with gr.TabItem("π Pose-Guided Mode"):
                gr.Markdown("""
### Upload ZIP file with images + JSON metadata
For drone captures with navigation data - more robust and accurate!

<div style="background-color: #e3f2fd; padding: 15px; border-radius: 8px; margin: 10px 0;">
<strong>π¦ ZIP Structure Example:</strong>
<pre style="background: #fff; padding: 10px; border-radius: 5px;">
dataset.zip
βββ image_001.jpg
βββ image_001.json (with nav_snapshot)
βββ image_002.jpg
βββ image_002.json
βββ ...
</pre>
<strong>Or nested folders:</strong>
<pre style="background: #fff; padding: 10px; border-radius: 5px;">
dataset.zip
βββ images/
β βββ img1.jpg
β βββ img2.jpg
βββ metadata/
βββ img1.json
βββ img2.json
</pre>
</div>
""")

                with gr.Row():
                    pose_controls = _config_column()

                    with gr.Column(scale=2):
                        zip_input = gr.File(
                            file_count="single",
                            file_types=[".zip"],
                            label="π¦ Upload ZIP (images + metadata)",
                            type="filepath"
                        )

                        process_pose_btn = gr.Button(
                            "π¨ Stitch with Pose Guidance",
                            variant="primary",
                            size="lg"
                        )

        # Results area, shared by both tabs.
        gr.Markdown("## π Results")

        with gr.Row():
            with gr.Column(scale=2):
                output_image = gr.Image(
                    label="πΌοΈ Stitched Panorama",
                    type="pil",
                    height=500,
                    elem_classes=["output-image"]
                )

                download_btn = gr.File(
                    label="β¬οΈ Download High-Resolution Result"
                )

            with gr.Column(scale=1):
                logs_output = gr.Textbox(
                    label="π Processing Logs",
                    lines=25,
                    max_lines=35,
                    autoscroll=True,
                    show_copy_button=True
                )

        gr.Markdown("""
---
<div style="text-align: center; color: #666;">
<p><strong>Industrial Warehouse Image Stitching Pipeline v1.0.0</strong></p>
<p>Powered by OpenCV β’ SIFT β’ RANSAC β’ Pose-Guided Alignment</p>
</div>
""")

        process_basic_btn.click(
            fn=process_images_basic,
            inputs=[file_input, *basic_controls],
            outputs=[output_image, logs_output, download_btn],
            api_name="stitch"
        )

        process_pose_btn.click(
            fn=process_zip_with_metadata,
            inputs=[zip_input, *pose_controls],
            outputs=[output_image, logs_output, download_btn],
            api_name="pose_stitch"
        )

    return demo
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI, enable a 5-slot request queue, then
    # serve on all interfaces at port 7860 with a public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
        "show_error": True,
    }
    demo = create_demo()
    demo.queue(max_size=5)
    demo.launch(**launch_options)