"""Identify the head (START) and tail (END) bounding boxes of a queue.

Bounding-box centers are lifted to 3D using a depth map, a line is fitted
with RANSAC, the line direction is signed using per-box orientations, and the
extreme boxes of the largest contiguous run along that direction are reported.
"""

import os
import argparse

import numpy as np
import cv2

from visual_3D import (
    load_depth_npy,
    load_orientations_npy,
    bbox_centers_to_3d,
    ransac_line_3d,
    estimate_queue_forward_direction,
)


# -----------------------------
# Core: pick start/end boxes
# -----------------------------

def load_fpx_from_txt(txt_path: str, image_id: str) -> float:
    """Return the focal-length value stored for ``image_id`` in ``txt_path``.

    Each non-blank line must consist of exactly two whitespace-separated
    tokens: an image id followed by a numeric value. The first line whose id
    equals ``image_id`` wins.

    Raises:
        KeyError: no line carries ``image_id``.
        ValueError: a non-blank line scanned before the match does not have
            exactly two tokens, or the matched value is not a valid float.
    """
    with open(txt_path, "r") as f:
        for line in f:
            stripped = line.strip()
            if not stripped:
                continue
            key, value = stripped.split()
            if key == image_id:
                return float(value)
    raise KeyError(f"fpx not found for image_id={image_id} in {txt_path}")


def pick_start_end_indices(points_3d, line_point, queue_forward_dir_3d,
                           inlier_mask=None, gap_scale=3.0, max_gap=None):
    """
    Robust endpoint selection:
      - Project points onto queue direction -> 1D coordinate t
      - Sort by t
      - Split into contiguous segments using neighbor gaps
      - Take endpoints from the largest segment (the actual queue)

    START = head (max t), END = tail (min t)

    gap_scale controls how strict contiguity is:
      max_gap = gap_scale * median_neighbor_gap (if max_gap is None)

    Args:
        points_3d: (N, 3) array-like of points.
        line_point: (3,) array-like, a point on the fitted line (origin of t).
        queue_forward_dir_3d: (3,) array-like direction; normalized here.
        inlier_mask: optional length-N mask; only entries that are nonzero are
            considered as endpoint candidates. A mask whose length does not
            match N is ignored and all points are used.
        gap_scale: multiplier on the median neighbor gap (see above).
        max_gap: explicit gap threshold; overrides the automatic one.

    Returns:
        (start_idx, end_idx, t_vals) where the indices refer to rows of
        ``points_3d`` and ``t_vals`` has shape (N,). With fewer than two
        candidates both indices are the single candidate (or 0 when there is
        no candidate at all).
    """
    points_3d = np.asarray(points_3d)
    line_point = np.asarray(line_point)

    q = np.asarray(queue_forward_dir_3d, dtype=np.float32)
    q = q / (np.linalg.norm(q) + 1e-8)  # epsilon guards a zero-length direction
    t_vals = (points_3d - line_point[None, :]) @ q  # (N,)

    # Restrict candidate points to the inliers when a usable mask is given.
    mask = None if inlier_mask is None else np.asarray(inlier_mask)
    if mask is not None and mask.shape[0] == points_3d.shape[0]:
        idx_pool = np.flatnonzero(mask)
    else:
        idx_pool = np.arange(points_3d.shape[0])

    if idx_pool.size < 2:
        i = int(idx_pool[0]) if idx_pool.size == 1 else 0
        return i, i, t_vals

    # Sort the pool by t. From here on there are >= 2 candidates, so there is
    # always at least one neighbor gap.
    pool_t = t_vals[idx_pool]
    order = np.argsort(pool_t)
    idx_sorted = idx_pool[order]
    t_sorted = pool_t[order]
    gaps = np.diff(t_sorted)

    # Auto threshold based on typical spacing along the line; the floor keeps
    # the threshold positive when most points share the same t.
    if max_gap is None:
        med_gap = max(float(np.median(gaps)), 1e-3)
        max_gap = gap_scale * med_gap

    # A gap larger than max_gap ends one segment and starts the next.
    breaks = np.where(gaps > max_gap)[0]
    seg_starts = np.r_[0, breaks + 1]
    seg_ends = np.r_[breaks, len(t_sorted) - 1]
    seg_lengths = seg_ends - seg_starts + 1

    # Largest segment = main queue chain (first one wins on ties).
    best = int(np.argmax(seg_lengths))
    s0, s1 = int(seg_starts[best]), int(seg_ends[best])
    chain_idx = idx_sorted[s0:s1 + 1]
    chain_t = t_sorted[s0:s1 + 1]

    start_idx = int(chain_idx[np.argmax(chain_t)])  # head
    end_idx = int(chain_idx[np.argmin(chain_t)])    # tail
    return start_idx, end_idx, t_vals


def _valid_depth_center_indices(bboxes, depth):
    """Return indices of boxes whose clipped, rounded center has finite depth > 0."""
    H, W = depth.shape
    keep = []
    for i, (x1, y1, x2, y2) in enumerate(bboxes):
        u = int(np.clip(int(round((x1 + x2) / 2.0)), 0, W - 1))
        v = int(np.clip(int(round((y1 + y2) / 2.0)), 0, H - 1))
        z = float(depth[v, u])
        if np.isfinite(z) and z > 0:
            keep.append(i)
    return np.array(keep, dtype=np.int64)


def identify_start_end_bboxes(
    image_path: str,
    depth_npy_path: str,
    bboxes_npy_path: str,
    orient_npy_path: str,
    f_px: float,
    ransac_num_iters: int = 1000,
    ransac_dist_thresh: float = 0.8,
    ransac_min_inliers_ratio: float = 0.3
):
    """
    Locate the head and tail boxes of the queue for one image.

    ``image_path`` is accepted for interface symmetry but is not read here.

    Returns a dict with:
      - score (value reported by estimate_queue_forward_direction)
      - start_bbox_xyxy, end_bbox_xyxy (rows of the original bbox file)
      - start_orig_idx, end_orig_idx (indices into the original bbox file)
      - start_valid_idx, end_valid_idx (indices into the valid-depth subset)
      - valid_to_orig_idx (mapping from valid-depth idx -> original bbox idx)
      - queue_forward_dir_3d, line_point, line_dir, inlier_mask, t_vals

    Raises:
        ValueError: fewer than two boxes have a usable depth, or the rebuilt
            valid->orig mapping disagrees in length with the points returned
            by ``bbox_centers_to_3d``.
    """
    depth = load_depth_npy(depth_npy_path)
    bboxes_all = np.load(bboxes_npy_path).astype(np.float32)
    orientations_deg_all = load_orientations_npy(orient_npy_path)  # (N,)

    # IMPORTANT: bbox_centers_to_3d filters out invalid depth centers, so the
    # returned points are a subset of the boxes and need an index mapping.
    points_3d, _ = bbox_centers_to_3d(bboxes_all, depth, f_px)

    # Rebuild the valid->orig mapping by re-running the same validity test
    # (intended to mirror bbox_centers_to_3d).
    valid_to_orig_idx = _valid_depth_center_indices(bboxes_all, depth)
    valid_orientations = np.array(
        [orientations_deg_all[i] for i in valid_to_orig_idx],
        dtype=np.float32,
    )

    if points_3d.shape[0] < 2:
        raise ValueError("Not enough valid 3D points to fit a queue line.")

    # Without this check a divergence between the two validity tests would
    # silently map endpoints to the wrong boxes.
    if valid_to_orig_idx.shape[0] != points_3d.shape[0]:
        raise ValueError(
            "Valid-depth index mapping is out of sync with bbox_centers_to_3d: "
            f"{valid_to_orig_idx.shape[0]} mapped boxes vs "
            f"{points_3d.shape[0]} 3D points."
        )

    # Fit line with RANSAC
    line_point, line_dir, inlier_mask = ransac_line_3d(
        points_3d,
        num_iters=ransac_num_iters,
        dist_thresh=ransac_dist_thresh,
        min_inliers_ratio=ransac_min_inliers_ratio,
    )

    # Choose sign of direction using orientations (this is the queue direction)
    queue_forward_dir_3d, score = estimate_queue_forward_direction(
        line_dir_3d=line_dir,
        orientations_deg=valid_orientations,
        inlier_mask=inlier_mask,
    )

    # Pick endpoints along the signed queue direction
    start_valid_idx, end_valid_idx, t_vals = pick_start_end_indices(
        points_3d=points_3d,
        line_point=line_point,
        queue_forward_dir_3d=queue_forward_dir_3d,
        inlier_mask=inlier_mask
    )

    start_orig_idx = int(valid_to_orig_idx[start_valid_idx])
    end_orig_idx = int(valid_to_orig_idx[end_valid_idx])

    return {
        "score": float(score),
        "queue_forward_dir_3d": queue_forward_dir_3d,
        "line_point": line_point,
        "line_dir": line_dir,
        "inlier_mask": inlier_mask,
        "t_vals": t_vals,
        "valid_to_orig_idx": valid_to_orig_idx,
        "start_valid_idx": start_valid_idx,
        "end_valid_idx": end_valid_idx,
        "start_orig_idx": start_orig_idx,
        "end_orig_idx": end_orig_idx,
        "start_bbox_xyxy": bboxes_all[start_orig_idx],
        "end_bbox_xyxy": bboxes_all[end_orig_idx],
    }


# -----------------------------
# Optional: visualization
# -----------------------------

def visualize_start_end_on_image(image_path: str, start_bbox: np.ndarray, end_bbox: np.ndarray, out_path: str):
    """Draw the START and END boxes on the image and write it to ``out_path``.

    The parent directory of ``out_path`` is created when missing.

    Raises:
        FileNotFoundError: the image at ``image_path`` could not be read.
        OSError: OpenCV reported that the output image was not written.
    """
    img = cv2.imread(image_path)
    if img is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")

    def draw_box(im, box, label, color):
        x1, y1, x2, y2 = [int(round(v)) for v in box.tolist()]
        cv2.rectangle(im, (x1, y1), (x2, y2), color, 3, lineType=cv2.LINE_AA)
        # Keep the label inside the image when the box touches the top edge.
        cv2.putText(im, label, (x1, max(0, y1 - 8)), cv2.FONT_HERSHEY_SIMPLEX,
                    0.8, color, 2, lineType=cv2.LINE_AA)

    # Colors are BGR
    draw_box(img, start_bbox, "START (head)", (0, 255, 255))  # yellow
    draw_box(img, end_bbox, "END (tail)", (0, 255, 0))        # green

    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    # cv2.imwrite signals failure through its return value, not an exception.
    if not cv2.imwrite(out_path, img):
        raise OSError(f"Could not write visualization: {out_path}")
    print(f"Saved visualization: {out_path}")


# -----------------------------
# CLI
# -----------------------------

def main():
    """Command-line entry point: locate and visualize the queue endpoints."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--image_id", required=True)
    ap.add_argument("--root", default="/scratch/ds5725/linefinder/LineFinder")
    ap.add_argument("--out", default=None)
    args = ap.parse_args()

    image_id = args.image_id
    root = args.root

    image_path = os.path.join(root, "Images/QueuesInThemeParks", f"{image_id}.jpg")
    depth_path = os.path.join(root, "depth_map", f"{image_id}.npy")
    bbox_path = os.path.join(root, "bbox_orient", f"{image_id}_bboxes.npy")
    orient_path = os.path.join(root, "bbox_orient", f"{image_id}_orient.npy")
    fpx_path = os.path.join(root, "focal_length_px.txt")

    f_px = load_fpx_from_txt(fpx_path, image_id)

    # Default output name is derived from the image id.
    if args.out is None:
        args.out = f"{image_id}_start_end.jpg"

    res = identify_start_end_bboxes(
        image_path=image_path,
        depth_npy_path=depth_path,
        bboxes_npy_path=bbox_path,
        orient_npy_path=orient_path,
        f_px=f_px,
    )

    print("avg alignment score:", res["score"])
    print("start_orig_idx:", res["start_orig_idx"], "start_bbox:", res["start_bbox_xyxy"].tolist())
    print("end_orig_idx:", res["end_orig_idx"], "end_bbox:", res["end_bbox_xyxy"].tolist())

    visualize_start_end_on_image(
        image_path=image_path,
        start_bbox=res["start_bbox_xyxy"],
        end_bbox=res["end_bbox_xyxy"],
        out_path=args.out,
    )


if __name__ == "__main__":
    main()