| import os |
| import csv |
| import numpy as np |
| import cv2 |
|
|
| from identify_queue_start_end import identify_start_end_bboxes, load_fpx_from_txt |
|
|
| |
| IMAGE_LIST_TXT = "olivia_luna_image_paths.txt" |
|
|
| DEPTH_DIR = "/scratch/ds5725/linefinder/LineFinder/depth_map" |
| BBOX_ORIENT_DIR = "/scratch/ds5725/linefinder/LineFinder/bbox_orient" |
| FOCAL_TXT = "/scratch/ds5725/linefinder/LineFinder/focal_length_px.txt" |
|
|
| OUTPUT_CSV = "OL_line_visibility.csv" |
|
|
|
|
| |
| def read_image_list(txt_path): |
| with open(txt_path, "r") as f: |
| lines = [l.strip() for l in f.readlines() if l.strip()] |
| return lines |
|
|
|
|
| def _bbox_edge_flags(bbox_xyxy, W, H, margin_px): |
| x1, y1, x2, y2 = [float(v) for v in bbox_xyxy.tolist()] |
| near_left = x1 <= margin_px |
| near_right = x2 >= (W - 1 - margin_px) |
| near_top = y1 <= margin_px |
| near_bottom = y2 >= (H - 1 - margin_px) |
| touches_any = near_left or near_right or near_top or near_bottom |
| return touches_any, near_left, near_right |
|
|
|
|
| def _location_bucket_from_center_x(cx, W): |
| r = cx / max(W, 1) |
| if r < 0.2: |
| return "far left" |
| elif r < 0.4: |
| return "center left" |
| elif r < 0.6: |
| return "center" |
| elif r < 0.8: |
| return "center right" |
| else: |
| return "far right" |
|
|
|
|
| def endpoint_fields(bbox_xyxy, W, H, margin_px): |
| x1, y1, x2, y2 = [float(v) for v in bbox_xyxy.tolist()] |
| cx = 0.5 * (x1 + x2) |
|
|
| touches_any, near_left, near_right = _bbox_edge_flags( |
| bbox_xyxy, W, H, margin_px |
| ) |
|
|
| if touches_any: |
| visible = "no" |
| location = "N/A" |
| turn = "left" if near_left else "right" |
| else: |
| visible = "yes" |
| location = _location_bucket_from_center_x(cx, W) |
| turn = "N/A" |
|
|
| return visible, location, turn |
|
|
|
|
| def process_one_image(img_path, margin_px=10): |
| image_id = os.path.splitext(os.path.basename(img_path))[0] |
|
|
| depth_path = os.path.join(DEPTH_DIR, image_id + ".npy") |
| bbox_path = os.path.join(BBOX_ORIENT_DIR, image_id + "_bboxes.npy") |
| orient_path = os.path.join(BBOX_ORIENT_DIR, image_id + "_orient.npy") |
|
|
| |
| for p in [depth_path, bbox_path, orient_path, FOCAL_TXT]: |
| if not os.path.isfile(p): |
| return None, f"missing:{p}" |
|
|
| img = cv2.imread(img_path) |
| if img is None: |
| return None, "missing-image" |
|
|
| H, W = img.shape[:2] |
|
|
| try: |
| f_px = load_fpx_from_txt(FOCAL_TXT, image_id) |
| except Exception as e: |
| return None, f"missing-fpx:{e}" |
|
|
| try: |
| res = identify_start_end_bboxes( |
| image_path=img_path, |
| depth_npy_path=depth_path, |
| bboxes_npy_path=bbox_path, |
| orient_npy_path=orient_path, |
| f_px=f_px, |
| ) |
| except Exception as e: |
| return None, f"fail-identify:{e}" |
|
|
| start_bbox = res["start_bbox_xyxy"] |
| end_bbox = res["end_bbox_xyxy"] |
|
|
| end_visible, end_loc, end_turn = endpoint_fields(end_bbox, W, H, margin_px) |
| start_visible, start_loc, start_turn = endpoint_fields(start_bbox, W, H, margin_px) |
|
|
| pred = { |
| "image_id": image_id, |
| "image_path": os.path.abspath(img_path), |
|
|
| "end_of_line_visible": end_visible, |
| "end_of_line_location_if_visible": end_loc, |
| "direction_to_turn_to_see_end_if_not_visible": end_turn, |
|
|
| "start_of_line_visible": start_visible, |
| "start_of_line_location_if_visible": start_loc, |
| "direction_to_turn_to_see_start_if_not_visible": start_turn, |
| } |
|
|
| return pred, "ok" |
|
|
|
|
| |
| def main(): |
| margin_px = 10 |
| image_paths = read_image_list(IMAGE_LIST_TXT) |
|
|
| print(f"Loaded {len(image_paths)} image paths") |
|
|
| rows = [] |
| failures = 0 |
|
|
| for img_path in image_paths: |
| pred, status = process_one_image(img_path, margin_px) |
|
|
| if status != "ok": |
| failures += 1 |
| rows.append({ |
| "image_id": os.path.splitext(os.path.basename(img_path))[0], |
| "image_path": img_path, |
| "status": status |
| }) |
| continue |
|
|
| pred["status"] = "ok" |
| rows.append(pred) |
|
|
| cols = [ |
| "image_id", |
| "image_path", |
| "status", |
| "end_of_line_visible", |
| "end_of_line_location_if_visible", |
| "direction_to_turn_to_see_end_if_not_visible", |
| "start_of_line_visible", |
| "start_of_line_location_if_visible", |
| "direction_to_turn_to_see_start_if_not_visible", |
| ] |
|
|
| with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f: |
| writer = csv.DictWriter(f, fieldnames=cols) |
| writer.writeheader() |
| for r in rows: |
| for c in cols: |
| r.setdefault(c, "") |
| writer.writerow({c: r[c] for c in cols}) |
|
|
| print(f"\nSaved predictions to {OUTPUT_CSV}") |
| print(f"Failures: {failures}/{len(image_paths)}") |
|
|
|
|
| if __name__ == "__main__": |
| main() |