| |
| import os |
| import csv |
| import json |
| import numpy as np |
| import cv2 |
|
|
| from eval_gpt import load_all_json_recursive_with_paths |
| from identify_queue_start_end import identify_start_end_bboxes, load_fpx_from_txt |
|
|
|
|
# Root folder handed to load_all_json_recursive_with_paths() to load the
# ground-truth JSON entries.
GT_FOLDER = "/scratch/ds5725/linefinder/LineFinder/GT_json"
# Root folder walked recursively for images whose basename matches a GT key.
IMG_FOLDER = "/scratch/ds5725/linefinder/LineFinder/Images"


# Per-image depth input: "<image_id>.npy" (passed as depth_npy_path).
DEPTH_DIR = "/scratch/ds5725/linefinder/LineFinder/depth_map"
# Per-image inputs: "<image_id>_bboxes.npy" and "<image_id>_orient.npy".
BBOX_ORIENT_DIR = "/scratch/ds5725/linefinder/LineFinder/bbox_orient"
# Text file queried by load_fpx_from_txt() for an image's focal length
# (presumably in pixels, judging by the name -- confirm against the loader).
FOCAL_TXT = "/scratch/ds5725/linefinder/LineFinder/focal_length_px.txt"
|
|
def flatten_and_fix_gt(gt_entry: dict) -> dict:
    """Flatten a nested ground-truth entry into the flat prediction schema.

    Input GT format (nested):
        gt_entry["end_of_line"]["visible"]
        gt_entry["end_of_line"]["location_if_visible"]
        gt_entry["end_of_line"]["direction_to_turn_if_not_visible"]
    Output (flat, matching prediction keys):
        end_of_line_visible
        end_of_line_location_if_visible
        direction_to_turn_to_see_end_if_not_visible
    Same for start.

    All values are stripped and lower-cased, then repaired:
        visible == "yes" => turn = "N/A"; location kept only if it is a
                            known bucket, otherwise "N/A"
        visible == "no"  => location = "N/A"; turn kept only if it is
                            "left" or "right", otherwise "N/A"
        anything else    => visible = ""; location/turn are passed through
                            unchanged apart from placeholder collapsing

    Placeholder spellings ("n/a", "na", "") are collapsed to "N/A".

    Args:
        gt_entry: Nested GT dict. A missing section, a non-dict section, a
            missing key, or a ``None`` (JSON null) value is treated as
            absent. A non-dict ``gt_entry`` is treated as an empty entry.

    Returns:
        A new dict with the six flat keys listed above.
    """
    valid_locs = {"far left", "center left", "center", "center right", "far right"}
    valid_turns = {"left", "right"}
    placeholders = ("n/a", "na", "")

    def get_text(side: str, key: str, default: str) -> str:
        section = gt_entry.get(f"{side}_of_line", {}) if isinstance(gt_entry, dict) else None
        raw = section.get(key, default) if isinstance(section, dict) else default
        # JSON null must count as "missing"; str(None) would otherwise leak
        # the literal text "none" into the flattened fields.
        if raw is None:
            raw = default
        return str(raw).strip().lower()

    flat = {}
    for side in ("end", "start"):
        vis = get_text(side, "visible", "")
        loc = get_text(side, "location_if_visible", "N/A")
        turn = get_text(side, "direction_to_turn_if_not_visible", "N/A")

        if vis == "yes":
            turn = "N/A"
            if loc not in valid_locs:
                loc = "N/A"
        elif vis == "no":
            loc = "N/A"
            if turn not in valid_turns:
                turn = "N/A"
        else:
            # Unknown visibility: report it as missing and leave the other
            # two fields as annotated.
            vis = ""

        flat[f"{side}_of_line_visible"] = vis
        flat[f"{side}_of_line_location_if_visible"] = "N/A" if loc in placeholders else loc
        flat[f"direction_to_turn_to_see_{side}_if_not_visible"] = (
            "N/A" if turn in placeholders else turn
        )

    return flat
|
|
def normalize_visibility_fields(gt: dict) -> dict:
    """Return a repaired shallow copy of a flat GT dict.

    The input dict is never modified. For each side ("end", "start") whose
    visibility flag normalizes to "yes" or "no":
        "yes" -> turn is forced to "N/A"; location is kept (lower-cased)
                 only if it is a known bucket, otherwise "N/A"
        "no"  -> location is forced to "N/A"; turn is kept (lower-cased)
                 only if it is "left" or "right", otherwise "N/A"
    Sides with any other visibility value are left untouched.
    """
    repaired = dict(gt)
    allowed_locations = ("far left", "center left", "center", "center right", "far right")
    allowed_turns = ("left", "right")

    def _cleaned(key, allowed):
        # Normalize the stored value and fall back to "N/A" when unknown.
        text = str(repaired.get(key, "N/A")).strip().lower()
        return text if text in allowed else "N/A"

    for side in ("end", "start"):
        visible_key = f"{side}_of_line_visible"
        location_key = f"{side}_of_line_location_if_visible"
        turn_key = f"direction_to_turn_to_see_{side}_if_not_visible"

        flag = str(repaired.get(visible_key, "")).strip().lower()
        if flag == "yes":
            repaired[visible_key] = "yes"
            repaired[turn_key] = "N/A"
            repaired[location_key] = _cleaned(location_key, allowed_locations)
        elif flag == "no":
            repaired[visible_key] = "no"
            repaired[location_key] = "N/A"
            repaired[turn_key] = _cleaned(turn_key, allowed_turns)

    return repaired
|
|
def get_images_with_gt(img_folder, gt_keys):
    """Collect image paths under ``img_folder`` whose basename is a GT key.

    The folder is walked recursively. A file qualifies when its name ends
    (case-insensitively) with a supported image extension and its name
    without the extension is contained in ``gt_keys``. Paths are returned
    in ``os.walk`` order.
    """
    image_suffixes = (".jpg", ".jpeg", ".png", ".webp", ".gif")
    return [
        os.path.join(folder, filename)
        for folder, _subdirs, filenames in os.walk(img_folder)
        for filename in filenames
        if filename.lower().endswith(image_suffixes)
        and os.path.splitext(filename)[0] in gt_keys
    ]
|
|
|
|
| def _bbox_edge_flags(bbox_xyxy, W, H, margin_px): |
| x1, y1, x2, y2 = [float(v) for v in bbox_xyxy.tolist()] |
| near_left = x1 <= margin_px |
| near_right = x2 >= (W - 1 - margin_px) |
| near_top = y1 <= margin_px |
| near_bottom = y2 >= (H - 1 - margin_px) |
| touches_any = near_left or near_right or near_top or near_bottom |
| return touches_any, near_left, near_right, near_top, near_bottom |
|
|
|
|
| def _location_bucket_from_center_x(cx, W): |
| r = cx / max(W, 1) |
| if r < 0.2: |
| return "far left" |
| elif r < 0.4: |
| return "center left" |
| elif r < 0.6: |
| return "center" |
| elif r < 0.8: |
| return "center right" |
| else: |
| return "far right" |
|
|
|
|
def endpoint_fields(bbox_xyxy, W, H, margin_px):
    """Derive (visible, location, turn) for one queue endpoint bounding box.

    Rule:
      - a box within ``margin_px`` of any image edge counts as not visible
      - not visible: turn is "left" if the box is near the left edge,
        otherwise "right"; location is "N/A"
      - visible: location is the bucket of the box's horizontal center;
        turn is "N/A"
    """
    left, _top, right, _bottom = (float(v) for v in bbox_xyxy.tolist())
    center_x = 0.5 * (left + right)

    flags = _bbox_edge_flags(bbox_xyxy, W, H, margin_px)
    clipped, at_left = flags[0], flags[1]

    if not clipped:
        return "yes", _location_bucket_from_center_x(center_x, W), "N/A"
    return "no", "N/A", ("left" if at_left else "right")
|
|
|
|
def process_one_image(img_path, gt_entry, margin_px=10):
    """Predict start/end visibility fields for one image and pair them with GT.

    Returns ``((pred, gt), "ok")`` on success, where ``pred`` holds the image
    id/path plus the six predicted fields and ``gt`` holds whichever of those
    six fields are present in ``gt_entry`` (empty if ``gt_entry`` is not a
    dict). On failure returns ``(None, reason)`` with ``reason`` one of
    "missing:<path>", "missing-image", "missing-fpx:<error>" or
    "fail-identify:<error>".
    """
    stem = os.path.splitext(os.path.basename(img_path))[0]

    # Precomputed inputs are located by the image's basename.
    depth_file = os.path.join(DEPTH_DIR, stem + ".npy")
    bbox_file = os.path.join(BBOX_ORIENT_DIR, stem + "_bboxes.npy")
    orient_file = os.path.join(BBOX_ORIENT_DIR, stem + "_orient.npy")

    # Report the first required file that does not exist.
    required = (depth_file, bbox_file, orient_file, FOCAL_TXT)
    absent = next((path for path in required if not os.path.isfile(path)), None)
    if absent is not None:
        return None, f"missing:{absent}"

    # The image is only needed for its dimensions.
    frame = cv2.imread(img_path)
    if frame is None:
        return None, "missing-image"
    height, width = frame.shape[:2]

    try:
        focal_px = load_fpx_from_txt(FOCAL_TXT, stem)
    except Exception as err:
        return None, f"missing-fpx:{err}"

    try:
        result = identify_start_end_bboxes(
            image_path=img_path,
            depth_npy_path=depth_file,
            bboxes_npy_path=bbox_file,
            orient_npy_path=orient_file,
            f_px=focal_px,
        )
    except Exception as err:
        return None, f"fail-identify:{err}"

    start_box = result["start_bbox_xyxy"]
    end_box = result["end_bbox_xyxy"]

    end_vis, end_where, end_dir = endpoint_fields(end_box, width, height, margin_px)
    start_vis, start_where, start_dir = endpoint_fields(start_box, width, height, margin_px)

    field_names = (
        "end_of_line_visible",
        "end_of_line_location_if_visible",
        "direction_to_turn_to_see_end_if_not_visible",
        "start_of_line_visible",
        "start_of_line_location_if_visible",
        "direction_to_turn_to_see_start_if_not_visible",
    )

    pred = {"image_id": stem, "image_path": img_path}
    pred.update(
        zip(
            field_names,
            (end_vis, end_where, end_dir, start_vis, start_where, start_dir),
        )
    )

    # Keep only the evaluated fields that the GT entry actually carries.
    if isinstance(gt_entry, dict):
        gt = {name: gt_entry[name] for name in field_names if name in gt_entry}
    else:
        gt = {}

    return (pred, gt), "ok"
|
|
|
|
def main(margin_px=10, out_csv="line_visibility_results.csv"):
    """Evaluate predicted queue start/end visibility against ground truth.

    Loads the GT entries from ``GT_FOLDER``, finds the images under
    ``IMG_FOLDER`` whose basename matches a GT key, runs
    ``process_one_image`` on each, prints per-field accuracy and a warning
    for every failure or visibility mismatch, and writes one CSV row per
    image.

    Args:
        margin_px: Distance in pixels from the image border within which an
            endpoint box is treated as not visible (forwarded to
            ``process_one_image``).
        out_csv: Path of the CSV file to write. Columns are the sorted union
            of all row keys; cells a row lacks are left empty.
    """
    fields = (
        "end_of_line_visible",
        "end_of_line_location_if_visible",
        "direction_to_turn_to_see_end_if_not_visible",
        "start_of_line_visible",
        "start_of_line_location_if_visible",
        "direction_to_turn_to_see_start_if_not_visible",
    )

    # The second return value (paths) is not needed here.
    gt_dict, _gt_paths = load_all_json_recursive_with_paths(GT_FOLDER)
    gt_keys = set(gt_dict)
    print(f"Loaded {len(gt_keys)} GT JSONs.")

    image_paths = get_images_with_gt(IMG_FOLDER, gt_keys)
    print(f"Found {len(image_paths)} images that have GT JSONs.")

    rows = []
    correct = dict.fromkeys(fields, 0)
    total = dict.fromkeys(fields, 0)
    failures = 0

    for img_path in image_paths:
        image_id = os.path.splitext(os.path.basename(img_path))[0]

        # Bring the nested GT entry into the flat prediction schema.
        gt_flat = flatten_and_fix_gt(gt_dict.get(image_id, {}))

        out, status = process_one_image(img_path, gt_flat, margin_px=margin_px)
        if status != "ok":
            failures += 1
            rows.append({
                "image_id": image_id,
                "image_path": img_path,
                "status": status,
            })
            print(f"[WARN] {image_id}: {status}")
            continue

        pred, gt = out

        row = dict(pred)
        row["status"] = "ok"

        # Only non-empty GT values are written to the CSV.
        for k, v in gt.items():
            if v != "" and v is not None:
                row[f"gt_{k}"] = v

        # A field is scored only when its GT value is non-empty.
        for k in fields:
            if gt.get(k, "") != "":
                total[k] += 1
                if str(pred[k]).strip().lower() == str(gt[k]).strip().lower():
                    correct[k] += 1

        rows.append(row)

        end_vis_wrong = (
            gt.get("end_of_line_visible", "") != ""
            and pred["end_of_line_visible"] != gt["end_of_line_visible"]
        )
        start_vis_wrong = (
            gt.get("start_of_line_visible", "") != ""
            and pred["start_of_line_visible"] != gt["start_of_line_visible"]
        )

        if end_vis_wrong or start_vis_wrong:
            end_loc = (
                pred["end_of_line_location_if_visible"]
                if pred["end_of_line_visible"] == "yes"
                else "N/A"
            )
            start_loc = (
                pred["start_of_line_location_if_visible"]
                if pred["start_of_line_visible"] == "yes"
                else "N/A"
            )

            print(
                f"[VIS ERROR] {image_id} | "
                f"end: pred={pred['end_of_line_visible']} "
                f"(loc={end_loc}) gt={gt.get('end_of_line_visible')} "
                f"start: pred={pred['start_of_line_visible']} "
                f"(loc={start_loc}) gt={gt.get('start_of_line_visible')} "
            )

    print("\n=== Accuracy (only where GT field exists) ===")
    for k in fields:
        if total[k] == 0:
            print(f"{k}: N/A (no GT)")
        else:
            acc = correct[k] / total[k]
            print(f"{k}: {acc:.4f} ({correct[k]}/{total[k]})")

    print(f"\nFailures: {failures}/{len(image_paths)}")

    # Failure rows and success rows carry different keys, so the header is
    # the sorted union of every key seen.
    all_cols = sorted({col for r in rows for col in r})

    with open(out_csv, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=all_cols)
        w.writeheader()
        w.writerows(rows)

    print(f"\nSaved: {out_csv}")
|
|
|
|
# Run the evaluation only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    main()
|
|