| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
|
|
| import logging |
| from typing import List, Tuple, Optional |
| from pathlib import Path |
| import numpy as np |
| from numpy import extract, ndarray, array, float32, uint8 |
| import copy |
|
|
| import cv2 |
|
|
| |
| try: |
| import torch |
| import torch.nn.functional as F |
| TORCH_AVAILABLE = True |
| except ImportError: |
| TORCH_AVAILABLE = False |
| torch = None |
| F = None |
|
|
| |
# Module-level aliases for the OpenCV functions and constants used below.
# Binding them once keeps call sites short and makes the module's cv2
# surface area explicit in one place.
bitwise_and = cv2.bitwise_and
findHomography = cv2.findHomography
warpPerspective = cv2.warpPerspective
cvtColor = cv2.cvtColor
COLOR_BGR2GRAY = cv2.COLOR_BGR2GRAY
threshold = cv2.threshold
THRESH_BINARY = cv2.THRESH_BINARY
getStructuringElement = cv2.getStructuringElement
MORPH_RECT = cv2.MORPH_RECT
MORPH_TOPHAT = cv2.MORPH_TOPHAT
GaussianBlur = cv2.GaussianBlur
morphologyEx = cv2.morphologyEx
Canny = cv2.Canny
connectedComponents = cv2.connectedComponents
perspectiveTransform = cv2.perspectiveTransform
RETR_EXTERNAL = cv2.RETR_EXTERNAL
CHAIN_APPROX_SIMPLE = cv2.CHAIN_APPROX_SIMPLE
findContours = cv2.findContours
boundingRect = cv2.boundingRect
dilate = cv2.dilate


# Standard module-level logger; handlers/levels are configured by the app.
logger = logging.getLogger(__name__)
|
|
| |
| |
# Fixed (x, y) keypoint coordinates on the floor-markings template image.
# NOTE(review): coordinates span roughly x in [5, 1045], y in [5, 675],
# grouped below by x column; presumably these mark court/pitch landmarks
# on the template asset -- confirm against the actual template image.
TEMPLATE_KEYPOINTS: list[tuple[int, int]] = [
    # x = 5 column (left edge of the template)
    (5, 5),
    (5, 140),
    (5, 250),
    (5, 430),
    (5, 540),
    (5, 675),

    # x = 55 column
    (55, 250),
    (55, 430),

    # x = 110 column
    (110, 340),

    # x = 165 column
    (165, 140),
    (165, 270),
    (165, 410),
    (165, 540),

    # x = 527 column (template midline)
    (527, 5),
    (527, 253),
    (527, 433),
    (527, 675),

    # x = 888 column
    (888, 140),
    (888, 270),
    (888, 410),
    (888, 540),

    # x = 940 column
    (940, 340),

    # x = 998 column
    (998, 250),
    (998, 430),

    # x = 1045 column (right edge of the template)
    (1045, 5),
    (1045, 140),
    (1045, 250),
    (1045, 430),
    (1045, 540),
    (1045, 675),

    # Interior points on the horizontal midline (y = 340)
    (435, 340),
    (615, 340),
]


# Indices into TEMPLATE_KEYPOINTS for the template's four extreme corners:
# 0 -> (5, 5), 5 -> (5, 675), 24 -> (1045, 5), 29 -> (1045, 675).
# Used by validate_projected_corners to detect twisted projections.
INDEX_KEYPOINT_CORNER_BOTTOM_LEFT = 5
INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT = 29
INDEX_KEYPOINT_CORNER_TOP_LEFT = 0
INDEX_KEYPOINT_CORNER_TOP_RIGHT = 24
|
|
|
|
class InvalidMask(Exception):
    """Raised when a projected or extracted mask fails a sanity check."""
|
|
|
|
def has_a_wide_line(mask: ndarray, max_aspect_ratio: float = 1.0) -> bool:
    """Return True if any external contour in *mask* has a bounding box
    whose min-side/max-side ratio reaches *max_aspect_ratio* (i.e. the
    contour is at least that close to square, so too wide for a line)."""
    contours, _ = findContours(mask, RETR_EXTERNAL, CHAIN_APPROX_SIMPLE)
    return any(
        min(width, height) / max(width, height) >= max_aspect_ratio
        for _, _, width, height in map(boundingRect, contours)
    )
|
|
|
|
def is_bowtie(points: ndarray) -> bool:
    """Return True if the quadrilateral described by *points* is
    self-intersecting (a "bowtie"), i.e. two opposite edges cross.

    Args:
        points: Array reshapeable to (4, 2) -- the quad corners in order.

    Returns:
        True when either pair of opposite edges intersects.
    """

    # Fixed: the inner helpers were annotated as taking ints; they take
    # 2-element point arrays/sequences.
    def ccw(a: ndarray, b: ndarray, c: ndarray) -> bool:
        # True if a -> b -> c makes a strictly counter-clockwise turn.
        return (c[1] - a[1]) * (b[0] - a[0]) > (b[1] - a[1]) * (c[0] - a[0])

    def segments_intersect(p1: ndarray, p2: ndarray, q1: ndarray, q2: ndarray) -> bool:
        # Orientation test: the segments cross iff each segment's endpoints
        # lie on opposite sides of the other segment's supporting line.
        return (ccw(p1, q1, q2) != ccw(p2, q1, q2)) and (
            ccw(p1, p2, q1) != ccw(p1, p2, q2)
        )

    pts = points.reshape(-1, 2)
    edges = [(pts[0], pts[1]), (pts[1], pts[2]), (pts[2], pts[3]), (pts[3], pts[0])]
    # A simple (non-twisted) quad's opposite edges never intersect.
    return segments_intersect(*edges[0], *edges[2]) or segments_intersect(
        *edges[1], *edges[3]
    )
|
|
def validate_mask_lines(mask: ndarray) -> None:
    """Sanity-check a binary (0/1) lines mask; raise InvalidMask on failure."""
    total_on = mask.sum()
    if total_on == 0:
        raise InvalidMask("No projected lines")
    if total_on == mask.size:
        raise InvalidMask("Projected lines cover the entire image surface")
    if has_a_wide_line(mask=mask):
        raise InvalidMask("A projected line is too wide")
|
|
|
|
def validate_mask_ground(mask: ndarray) -> None:
    """Sanity-check a binary (0/1) ground mask; raise InvalidMask on failure.

    The projected ground must be a single connected region and must not
    cover an unrealistically large share of the image.
    """
    num_labels, _ = connectedComponents(mask)
    # connectedComponents counts the background as label 0.
    num_distinct_regions = num_labels - 1
    if num_distinct_regions > 1:
        raise InvalidMask(
            f"Projected ground should be a single object, detected {num_distinct_regions}"
        )
    # Fraction of image pixels covered by the mask (mask is binary 0/1).
    area_covered = mask.sum() / mask.size
    if area_covered >= 0.9:
        # BUG FIX: area_covered is a fraction in [0, 1]; the old format
        # ':.2f}%' printed e.g. '0.95%' instead of '95.00%'. ':.2%' renders
        # the fraction as a percentage.
        raise InvalidMask(
            f"Projected ground covers more than {area_covered:.2%} of the image surface which is unrealistic"
        )
|
|
|
|
def validate_projected_corners(
    source_keypoints: list[tuple[int, int]], homography_matrix: ndarray
) -> None:
    """Warp the template's four corner keypoints with *homography_matrix*
    and raise InvalidMask if the resulting quadrilateral self-intersects."""
    corner_order = (
        INDEX_KEYPOINT_CORNER_BOTTOM_LEFT,
        INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT,
        INDEX_KEYPOINT_CORNER_TOP_RIGHT,
        INDEX_KEYPOINT_CORNER_TOP_LEFT,
    )
    # perspectiveTransform expects shape (1, N, 2) float32.
    src_corners = array(
        [source_keypoints[i] for i in corner_order], dtype="float32"
    )[None, :, :]

    warped_corners = perspectiveTransform(src_corners, homography_matrix)[0]

    if is_bowtie(warped_corners):
        raise InvalidMask("Projection twisted!")
|
|
|
|
def project_image_using_keypoints(
    image: ndarray,
    source_keypoints: List[Tuple[int, int]],
    destination_keypoints: List[Tuple[int, int]],
    destination_width: int,
    destination_height: int,
    inverse: bool = False,
) -> ndarray:
    """Project image using homography from source to destination keypoints.

    Keypoints whose destination is (0, 0) are treated as undetected and
    skipped.

    Raises:
        ValueError: if fewer than 4 usable keypoint pairs remain, or the
            homography cannot be estimated.
        InvalidMask: (via validate_projected_corners) if the forward
            projection twists the template's corner quad.
    """
    filtered_src = []
    filtered_dst = []

    for src_pt, dst_pt in zip(source_keypoints, destination_keypoints):
        # (0, 0) is the sentinel for a missing/undetected keypoint.
        if dst_pt[0] == 0.0 and dst_pt[1] == 0.0:
            continue
        filtered_src.append(src_pt)
        filtered_dst.append(dst_pt)

    if len(filtered_src) < 4:
        raise ValueError("At least 4 valid keypoints are required for homography.")

    source_points = array(filtered_src, dtype=float32)
    destination_points = array(filtered_dst, dtype=float32)

    # BUG FIX: cv2.findHomography returns a (matrix, inlier_mask) tuple; on
    # failure the MATRIX is None, never the tuple itself, so the old
    # `result is None` check could not fire. Unpack first, then check H.
    if inverse:
        H_inv, _ = findHomography(destination_points, source_points)
        if H_inv is None:
            raise ValueError("Failed to compute inverse homography.")
        # NOTE(review): the inverse path performs no corner validation;
        # this matches the original behavior.
        return warpPerspective(image, H_inv, (destination_width, destination_height))

    H, _ = findHomography(source_points, destination_points)
    if H is None:
        raise ValueError("Failed to compute homography.")
    projected_image = warpPerspective(image, H, (destination_width, destination_height))

    validate_projected_corners(source_keypoints=source_keypoints, homography_matrix=H)
    return projected_image
|
|
|
|
def extract_masks_for_ground_and_lines(
    image: ndarray,
) -> Tuple[ndarray, ndarray]:
    """Extract binary masks for ground (gray) and lines (white) from a
    template image, validating both masks before returning them."""
    gray = cvtColor(image, COLOR_BGR2GRAY)
    # Anything brighter than near-black is ground; only bright pixels
    # count as line markings.
    mask_ground_binary = (threshold(gray, 10, 255, THRESH_BINARY)[1] > 0).astype(uint8)
    mask_lines_binary = (threshold(gray, 200, 255, THRESH_BINARY)[1] > 0).astype(uint8)
    validate_mask_ground(mask=mask_ground_binary)
    validate_mask_lines(mask=mask_lines_binary)
    return mask_ground_binary, mask_lines_binary
|
|
|
|
def extract_masks_for_ground_and_lines_no_validation(
    image: ndarray,
) -> Tuple[ndarray, ndarray]:
    """
    Extract masks for ground (gray) and lines (white) from template image WITHOUT validation.
    This is useful for line distribution analysis where exact fitting might create invalid masks
    but we still want to analyze where lines are located.
    """
    gray = cvtColor(image, COLOR_BGR2GRAY)
    # Same thresholds as extract_masks_for_ground_and_lines, but the masks
    # are returned unchecked.
    ground = (threshold(gray, 10, 255, THRESH_BINARY)[1] > 0).astype(uint8)
    lines = (threshold(gray, 200, 255, THRESH_BINARY)[1] > 0).astype(uint8)
    return ground, lines
|
|
|
|
def extract_mask_of_ground_lines_in_image(
    image: ndarray,
    ground_mask: ndarray,
    blur_ksize: int = 5,
    canny_low: int = 30,
    canny_high: int = 100,
    use_tophat: bool = True,
    dilate_kernel_size: int = 3,
    dilate_iterations: int = 3,
) -> ndarray:
    """Extract a binary line mask from *image* via Canny edge detection,
    restricted to the region covered by *ground_mask*."""
    gray = cvtColor(image, COLOR_BGR2GRAY)

    # Top-hat emphasises thin bright structures (line markings) relative
    # to larger bright areas.
    if use_tophat:
        gray = morphologyEx(
            gray, MORPH_TOPHAT, getStructuringElement(MORPH_RECT, (31, 31))
        )

    # GaussianBlur needs an odd kernel size; any other value skips the blur.
    if blur_ksize and blur_ksize % 2 == 1:
        gray = GaussianBlur(gray, (blur_ksize, blur_ksize), 0)

    edges = Canny(gray, canny_low, canny_high)
    edges = bitwise_and(edges, edges, mask=ground_mask)

    # Thicken thin Canny responses so they overlap the expected line mask
    # more robustly.
    if dilate_kernel_size > 1:
        kernel = getStructuringElement(
            MORPH_RECT, (dilate_kernel_size, dilate_kernel_size)
        )
        edges = dilate(edges, kernel, iterations=dilate_iterations)

    return (edges > 0).astype(uint8)
|
|
|
|
def evaluate_keypoints_for_frame(
    template_keypoints: List[Tuple[int, int]],
    frame_keypoints: List[Tuple[int, int]],
    frame: ndarray,
    floor_markings_template: ndarray,
) -> float:
    """
    Evaluate keypoint accuracy for a single frame.

    Projects the floor-markings template into the frame using a homography
    estimated from the keypoint pairs, then measures how well the projected
    template lines overlap lines detected in the frame.

    Returns:
        Score between 0.0 and 1.0; 0.0 on any validation/estimation failure.
    """
    try:
        warped_template = project_image_using_keypoints(
            image=floor_markings_template,
            source_keypoints=template_keypoints,
            destination_keypoints=frame_keypoints,
            destination_width=frame.shape[1],
            destination_height=frame.shape[0],
        )

        mask_ground, mask_lines_expected = extract_masks_for_ground_and_lines(
            image=warped_template
        )

        mask_lines_predicted = extract_mask_of_ground_lines_in_image(
            image=frame, ground_mask=mask_ground
        )

        pixels_overlapping = bitwise_and(
            mask_lines_expected, mask_lines_predicted
        ).sum()

        pixels_on_lines = mask_lines_expected.sum()

        # Epsilon guards against division by zero when no line pixels are
        # expected.
        score = pixels_overlapping / (pixels_on_lines + 1e-8)

        return min(1.0, max(0.0, score))

    except (InvalidMask, ValueError) as e:
        # FIX: use the module logger (lazy %-args) instead of print, for
        # consistency with the rest of this module.
        logger.debug("InvalidMask or ValueError in keypoint evaluation: %s", e)
        return 0.0
    except Exception:
        # logger.exception records the traceback for genuinely unexpected
        # failures while preserving the original best-effort 0.0 result.
        logger.exception("Unexpected error in keypoint evaluation")
        return 0.0
|
|
def warp_image_pytorch(
    image: ndarray,
    homography_matrix: ndarray,
    output_width: int,
    output_height: int,
    device: str = "cuda",
) -> ndarray:
    """
    Warp image using PyTorch (GPU-accelerated) instead of cv2.warpPerspective.

    Implements inverse warping: every output pixel is mapped back through
    the inverse homography and sampled bilinearly from the input.

    Args:
        image: Input image to warp (H, W, C) numpy array
        homography_matrix: 3x3 homography matrix
        output_width: Output image width
        output_height: Output image height
        device: "cuda" or "cpu"

    Returns:
        Warped image as numpy array
    """
    if not TORCH_AVAILABLE:
        # No torch installed: fall back to the OpenCV implementation.
        return warpPerspective(image, homography_matrix, (output_width, output_height))

    # Silently downgrade to CPU when CUDA was requested but is unavailable.
    if device == "cuda" and (not torch.cuda.is_available()):
        device = "cpu"

    try:
        image_tensor = torch.from_numpy(image).to(device).float()
        H = torch.from_numpy(homography_matrix).to(device).float()

        h, w = image.shape[:2]
        if len(image.shape) == 2:
            # Grayscale input: add a trailing channel dim so the permute
            # below produces a valid (C, H, W) layout.
            image_tensor = image_tensor.unsqueeze(2)
            channels = 1
        else:
            channels = image.shape[2]

        # Grid of every output pixel coordinate.
        y_coords, x_coords = torch.meshgrid(
            torch.arange(0, output_height, device=device, dtype=torch.float32),
            torch.arange(0, output_width, device=device, dtype=torch.float32),
            indexing='ij'
        )

        # Homogeneous coordinates, mapped back to source space via H^-1.
        ones = torch.ones_like(x_coords)
        coords = torch.stack([x_coords.flatten(), y_coords.flatten(), ones.flatten()], dim=0)
        H_inv = torch.inverse(H)
        src_coords = H_inv @ coords
        # Dehomogenize; epsilon avoids division by zero at degenerate points.
        src_coords = src_coords[:2] / (src_coords[2:3] + 1e-8)

        src_x = src_coords[0].reshape(output_height, output_width)
        src_y = src_coords[1].reshape(output_height, output_width)

        # grid_sample expects coordinates normalized to [-1, 1]; the
        # (w - 1)/(h - 1) scaling matches align_corners=True below.
        src_x_norm = 2.0 * src_x / (w - 1) - 1.0
        src_y_norm = 2.0 * src_y / (h - 1) - 1.0
        grid = torch.stack([src_x_norm, src_y_norm], dim=-1).unsqueeze(0)

        # (H, W, C) -> (1, C, H, W) as required by grid_sample.
        image_batch = image_tensor.permute(2, 0, 1).unsqueeze(0)

        # padding_mode='zeros' mirrors warpPerspective's default black border.
        warped = F.grid_sample(
            image_batch, grid, mode='bilinear', padding_mode='zeros', align_corners=True
        )

        # Back to (H, W, C).
        warped = warped.squeeze(0).permute(1, 2, 0)

        if channels == 1:
            warped = warped.squeeze(2)

        # NOTE(review): output is clipped to [0, 255] and cast to uint8,
        # which assumes an 8-bit input image -- confirm for other dtypes.
        warped_np = warped.cpu().numpy().clip(0, 255).astype(np.uint8)
        return warped_np

    except Exception as e:
        logger.error(f"PyTorch warping failed: {e}, falling back to OpenCV")
        return warpPerspective(image, homography_matrix, (output_width, output_height))
|
|
|
|
def evaluate_keypoints_for_frame_gpu(
    template_keypoints: List[Tuple[int, int]],
    frame_keypoints: List[Tuple[int, int]],
    frame: ndarray,
    floor_markings_template: ndarray,
    device: str = "cuda",
) -> float:
    """
    GPU-accelerated keypoint evaluation using PyTorch for warping.

    This function uses PyTorch's grid_sample for GPU-accelerated image warping
    instead of cv2.warpPerspective, making it compatible with PyTorch CUDA.

    Args:
        template_keypoints: Template keypoint coordinates
        frame_keypoints: Frame keypoint coordinates
        frame: Input frame image
        floor_markings_template: Template image
        device: "cuda" or "cpu" (auto-detects if CUDA available)

    Returns:
        Score between 0.0 and 1.0
    """
    if not TORCH_AVAILABLE:
        # No torch: use the pure-OpenCV CPU implementation.
        return evaluate_keypoints_for_frame(
            template_keypoints, frame_keypoints, frame, floor_markings_template
        )

    if device == "cuda" and not torch.cuda.is_available():
        device = "cpu"

    try:
        # Drop keypoints whose frame position is the (0, 0) "not detected"
        # sentinel, mirroring project_image_using_keypoints.
        filtered_src = []
        filtered_dst = []
        for src_pt, dst_pt in zip(template_keypoints, frame_keypoints):
            if dst_pt[0] == 0.0 and dst_pt[1] == 0.0:
                continue
            filtered_src.append(src_pt)
            filtered_dst.append(dst_pt)

        if len(filtered_src) < 4:
            return 0.0

        source_points = array(filtered_src, dtype=float32)
        destination_points = array(filtered_dst, dtype=float32)
        # BUG FIX: findHomography returns a (matrix, inlier_mask) tuple;
        # failure is signalled by the MATRIX being None, never the tuple,
        # so the old `result is None` check could never fire.
        H, _ = findHomography(source_points, destination_points)
        if H is None:
            return 0.0

        # Reject homographies that twist the template's corner quad.
        src_corners = array([
            template_keypoints[INDEX_KEYPOINT_CORNER_BOTTOM_LEFT],
            template_keypoints[INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT],
            template_keypoints[INDEX_KEYPOINT_CORNER_TOP_RIGHT],
            template_keypoints[INDEX_KEYPOINT_CORNER_TOP_LEFT],
        ], dtype=float32)[None, :, :]
        warped_corners = perspectiveTransform(src_corners, H)[0]
        if is_bowtie(warped_corners):
            return 0.0

        # Warp the template into the frame on the GPU (or CPU fallback).
        h, w = frame.shape[:2]
        warped_template = warp_image_pytorch(
            floor_markings_template,
            H,
            w,
            h,
            device=device
        )

        mask_ground, mask_lines_expected = extract_masks_for_ground_and_lines(
            image=warped_template
        )

        mask_lines_predicted = extract_mask_of_ground_lines_in_image(
            image=frame, ground_mask=mask_ground
        )

        pixels_overlapping = bitwise_and(
            mask_lines_expected, mask_lines_predicted
        ).sum()

        pixels_on_lines = mask_lines_expected.sum()

        # Epsilon guards against division by zero when no line pixels exist.
        score = pixels_overlapping / (pixels_on_lines + 1e-8)
        return min(1.0, max(0.0, score))

    except (InvalidMask, ValueError) as e:
        logger.debug(f"Keypoint evaluation failed: {e}")
        return 0.0
    except Exception as e:
        # Unexpected GPU failure: retry the whole evaluation on the CPU.
        logger.error(f"GPU evaluation failed: {e}, falling back to CPU")
        return evaluate_keypoints_for_frame(
            template_keypoints, frame_keypoints, frame, floor_markings_template
        )
|
|
|
|
| |
# Module-level caches used by evaluate_keypoints_for_frame_opencv_cuda.
# Probing CUDA support and uploading GpuMats is expensive, so results are
# memoized across calls. NOTE(review): not thread-safe -- confirm callers
# are single-threaded.
_template_gpumat_cache = None  # cv2.cuda.GpuMat holding the uploaded template
_template_cache_key = None  # (shape, corner checksum) identifying the cached template
_cuda_available_cache = None  # tri-state: None = not probed yet, else True/False
_cuda_module_cache = None  # the imported cv2.cuda module (or None)
_frame_gpumat_reusable = None  # reusable GpuMat for frame uploads
_frame_gpumat_size = None  # (h, w) of the reusable frame GpuMat
|
|
def evaluate_keypoints_for_frame_opencv_cuda(
    template_keypoints: List[Tuple[int, int]],
    frame_keypoints: List[Tuple[int, int]],
    frame: ndarray,
    floor_markings_template: ndarray,
    device: str = "cuda",
) -> float:
    """
    GPU-accelerated version using OpenCV CUDA (if available).
    Falls back to CPU if CUDA not available.

    Note: opencv-python-headless doesn't include CUDA support, so this will
    always fall back to CPU. Use evaluate_keypoints_for_frame_gpu for PyTorch GPU acceleration.

    Optimizations:
    - Template GpuMat is cached to avoid re-uploading
    - CUDA availability check is cached
    - Frame GpuMat is reused when frame size matches
    - Keypoint filtering optimized with list comprehension

    Args:
        device: Ignored (kept for compatibility). OpenCV CUDA check is automatic.
    """
    global _template_gpumat_cache, _template_cache_key
    global _cuda_available_cache, _cuda_module_cache, _frame_gpumat_reusable, _frame_gpumat_size

    # Probe CUDA support once and cache the result (and the module).
    if _cuda_available_cache is None:
        cuda_available = False
        cuda = None
        try:
            import cv2.cuda as cuda

            if hasattr(cuda, 'warpPerspective'):
                # Some builds expose the API but have no usable device;
                # a tiny upload is the cheapest functional probe.
                try:
                    test_mat = cuda.GpuMat()
                    test_mat.upload(np.zeros((10, 10, 3), dtype=np.uint8))
                    cuda_available = True
                except Exception:
                    # FIX: `except (AttributeError, Exception)` was redundant
                    # since Exception already subsumes AttributeError.
                    cuda_available = False
        except (ImportError, AttributeError):
            cuda_available = False

        _cuda_available_cache = cuda_available
        _cuda_module_cache = cuda
    else:
        cuda_available = _cuda_available_cache
        cuda = _cuda_module_cache

    if not cuda_available:
        # No CUDA build: run the plain CPU evaluation.
        return evaluate_keypoints_for_frame(
            template_keypoints, frame_keypoints, frame, floor_markings_template
        )

    try:
        # Cheap template identity key: shape plus a checksum of the four
        # corner pixels (avoids hashing the whole array per call).
        template_shape = floor_markings_template.shape
        checksum = (
            int(floor_markings_template[0, 0].sum()) +
            int(floor_markings_template[0, -1].sum()) +
            int(floor_markings_template[-1, 0].sum()) +
            int(floor_markings_template[-1, -1].sum())
        )
        current_cache_key = (template_shape, checksum)

        # Upload the template only when it changed since the last call.
        if _template_gpumat_cache is None or _template_cache_key != current_cache_key:
            _template_gpumat_cache = cuda.GpuMat()
            _template_gpumat_cache.upload(floor_markings_template)
            _template_cache_key = current_cache_key

        # Reuse the frame GpuMat across calls when the frame size matches.
        h, w = frame.shape[:2]
        frame_shape = (h, w)
        if _frame_gpumat_reusable is None or _frame_gpumat_size != frame_shape:
            _frame_gpumat_reusable = cuda.GpuMat()
            _frame_gpumat_size = frame_shape
        gpu_frame = _frame_gpumat_reusable
        gpu_frame.upload(frame)

        gpu_template = _template_gpumat_cache

        # Drop keypoints whose frame position is the (0, 0) sentinel.
        filtered_pairs = [(src_pt, dst_pt) for src_pt, dst_pt in zip(template_keypoints, frame_keypoints)
                          if not (dst_pt[0] == 0.0 and dst_pt[1] == 0.0)]

        if len(filtered_pairs) < 4:
            return 0.0

        filtered_src, filtered_dst = zip(*filtered_pairs)

        source_points = array(filtered_src, dtype=float32)
        destination_points = array(filtered_dst, dtype=float32)
        # BUG FIX: findHomography returns a (matrix, mask) tuple; failure is
        # signalled by the MATRIX being None, never the tuple itself.
        H, _ = findHomography(source_points, destination_points)
        if H is None:
            return 0.0

        # NOTE(review): unlike the other evaluation paths, no bowtie
        # (twisted-projection) check is performed here -- matches original.
        gpu_warped = cuda.warpPerspective(gpu_template, H, (w, h))

        warped_template = gpu_warped.download()

        mask_ground, mask_lines_expected = extract_masks_for_ground_and_lines(warped_template)
        mask_lines_predicted = extract_mask_of_ground_lines_in_image(frame, mask_ground)

        pixels_overlapping = bitwise_and(mask_lines_expected, mask_lines_predicted).sum()
        pixels_on_lines = mask_lines_expected.sum()
        score = pixels_overlapping / (pixels_on_lines + 1e-8)
        return min(1.0, max(0.0, score))

    except Exception as e:
        logger.error(f"OpenCV CUDA evaluation failed: {e}, falling back to CPU")
        return evaluate_keypoints_for_frame(
            template_keypoints, frame_keypoints, frame, floor_markings_template
        )
|
|
def evaluate_keypoints_batch_gpu(
    template_keypoints: List[Tuple[int, int]],
    frame_keypoints_list: List[List[Tuple[int, int]]],
    frames: List[ndarray],
    floor_markings_template: ndarray,
    device: str = "cuda",
) -> List[float]:
    """
    Batch GPU-accelerated keypoint evaluation for multiple frames simultaneously.

    This function processes multiple frames in parallel using PyTorch batch operations,
    which is much faster than evaluating frames one-by-one.

    NOTE(review): this GPU path approximates the CPU pipeline -- it uses a
    Sobel gradient-magnitude threshold instead of Canny and skips the
    top-hat, blur and dilation steps (and the mask validation) of the CPU
    path, so scores can differ slightly from evaluate_keypoints_for_frame.
    All frames are assumed to share the shape of frames[0].

    Args:
        template_keypoints: Template keypoint coordinates (same for all frames)
        frame_keypoints_list: List of frame keypoint coordinates (one per frame)
        frames: List of frame images (numpy arrays)
        floor_markings_template: Template image
        device: "cuda" or "cpu"

    Returns:
        List of scores (one per frame) between 0.0 and 1.0
    """
    if not TORCH_AVAILABLE:
        # No torch: score each frame sequentially on the CPU.
        return [
            evaluate_keypoints_for_frame(
                template_keypoints, kp, frame, floor_markings_template
            )
            for kp, frame in zip(frame_keypoints_list, frames)
        ]

    if device == "cuda" and not torch.cuda.is_available():
        device = "cpu"

    batch_size = len(frames)
    if batch_size == 0:
        return []

    # All frames are assumed to have this shape.
    h, w = frames[0].shape[:2]

    try:
        # Pass 1 (CPU): estimate per-frame homographies and keep only the
        # valid, non-twisted ones; invalid frames keep a 0.0 score.
        homographies = []
        valid_indices = []

        for idx, (frame_keypoints, frame) in enumerate(zip(frame_keypoints_list, frames)):
            # Drop keypoints at the (0, 0) "not detected" sentinel.
            filtered_pairs = [(src_pt, dst_pt) for src_pt, dst_pt in zip(template_keypoints, frame_keypoints)
                              if not (dst_pt[0] == 0.0 and dst_pt[1] == 0.0)]

            if len(filtered_pairs) < 4:
                continue

            filtered_src, filtered_dst = zip(*filtered_pairs)
            source_points = array(filtered_src, dtype=float32)
            destination_points = array(filtered_dst, dtype=float32)
            result = findHomography(source_points, destination_points)
            # NOTE(review): findHomography returns a tuple, so this check
            # never fires; a None matrix would raise below and be caught by
            # the outer except -- confirm whether H should be checked here.
            if result is None:
                continue
            H, _ = result

            # Reject homographies that twist the template's corner quad.
            src_corners = array([
                template_keypoints[INDEX_KEYPOINT_CORNER_BOTTOM_LEFT],
                template_keypoints[INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT],
                template_keypoints[INDEX_KEYPOINT_CORNER_TOP_RIGHT],
                template_keypoints[INDEX_KEYPOINT_CORNER_TOP_LEFT],
            ], dtype=float32)[None, :, :]
            warped_corners = perspectiveTransform(src_corners, H)[0]
            if not is_bowtie(warped_corners):
                homographies.append(H)
                valid_indices.append(idx)

        if len(homographies) == 0:
            return [0.0] * batch_size

        # Pass 2 (GPU): batch-warp the template once per valid homography.
        template_tensor = torch.from_numpy(floor_markings_template).to(device).float()
        t_h, t_w = floor_markings_template.shape[:2]

        if len(floor_markings_template.shape) == 2:
            # Grayscale template: add a channel dim for the permute below.
            template_tensor = template_tensor.unsqueeze(2)
            t_channels = 1
        else:
            t_channels = floor_markings_template.shape[2]

        # (H, W, C) -> (B, C, H, W), one copy per valid homography.
        template_batch = template_tensor.permute(2, 0, 1).unsqueeze(0).repeat(len(homographies), 1, 1, 1)

        # Output pixel grid in homogeneous coordinates (shared by all frames).
        y_coords, x_coords = torch.meshgrid(
            torch.arange(0, h, device=device, dtype=torch.float32),
            torch.arange(0, w, device=device, dtype=torch.float32),
            indexing='ij'
        )
        ones = torch.ones_like(x_coords)
        coords = torch.stack([x_coords.flatten(), y_coords.flatten(), ones.flatten()], dim=0)

        # Batched inverse homographies for inverse warping.
        H_tensors = torch.from_numpy(np.stack(homographies)).to(device).float()
        H_inv_batch = torch.inverse(H_tensors)

        # Map every output pixel back to template coordinates, per frame.
        coords_expanded = coords.unsqueeze(0).expand(len(homographies), -1, -1)
        src_coords_batch = torch.bmm(H_inv_batch, coords_expanded)
        # Dehomogenize; epsilon avoids division by zero.
        src_coords_batch = src_coords_batch[:, :2] / (src_coords_batch[:, 2:3] + 1e-8)

        src_x_batch = src_coords_batch[:, 0].reshape(len(homographies), h, w)
        src_y_batch = src_coords_batch[:, 1].reshape(len(homographies), h, w)
        # Normalize to [-1, 1] for grid_sample (align_corners=True scaling).
        src_x_norm = 2.0 * src_x_batch / (t_w - 1) - 1.0
        src_y_norm = 2.0 * src_y_batch / (t_h - 1) - 1.0
        grid_batch = torch.stack([src_x_norm, src_y_norm], dim=-1)

        # Batched bilinear sampling; zeros padding mirrors a black border.
        warped_batch = F.grid_sample(
            template_batch, grid_batch, mode='bilinear', padding_mode='zeros', align_corners=True
        )

        # (B, C, H, W) -> (B, H, W, C); drop the channel dim for grayscale.
        warped_batch = warped_batch.permute(0, 2, 3, 1)
        if t_channels == 1:
            warped_batch = warped_batch.squeeze(3)
        warped_templates = warped_batch.cpu().numpy().clip(0, 255).astype(np.uint8)

        # Scores default to 0.0; only valid indices are overwritten below.
        scores = [0.0] * batch_size

        # Pass 3 (GPU): batched mask extraction and scoring.
        warped_templates_tensor = torch.from_numpy(warped_templates).to(device).float()
        frames_tensor = torch.from_numpy(np.stack([frames[i] for i in valid_indices])).to(device).float()

        # BT.601 luma weights; NOTE(review): channel order is assumed to
        # already match the weights used here -- with BGR frames the R/B
        # coefficients are swapped relative to cvtColor(COLOR_BGR2GRAY).
        if len(warped_templates_tensor.shape) == 4:
            gray_templates = (warped_templates_tensor[:, :, :, 0] * 0.299 +
                              warped_templates_tensor[:, :, :, 1] * 0.587 +
                              warped_templates_tensor[:, :, :, 2] * 0.114)
        else:
            gray_templates = warped_templates_tensor

        # Same thresholds as extract_masks_for_ground_and_lines (10 / 200),
        # but without the validation step.
        mask_ground_batch = (gray_templates > 10.0).float()
        mask_lines_expected_batch = (gray_templates > 200.0).float()

        if len(frames_tensor.shape) == 4:
            gray_frames = (frames_tensor[:, :, :, 0] * 0.299 +
                           frames_tensor[:, :, :, 1] * 0.587 +
                           frames_tensor[:, :, :, 2] * 0.114)
        else:
            gray_frames = frames_tensor

        # Sobel gradient magnitude as a cheap batched stand-in for Canny.
        sobel_x = torch.tensor([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]],
                               device=device, dtype=torch.float32).unsqueeze(0).unsqueeze(0)
        sobel_y = torch.tensor([[-1, -2, -1], [0, 0, 0], [1, 2, 1]],
                               device=device, dtype=torch.float32).unsqueeze(0).unsqueeze(0)

        gray_frames_batch = gray_frames.unsqueeze(1)
        grad_x_batch = F.conv2d(gray_frames_batch, sobel_x, padding=1)
        grad_y_batch = F.conv2d(gray_frames_batch, sobel_y, padding=1)
        magnitude_batch = torch.sqrt(grad_x_batch.squeeze(1) ** 2 + grad_y_batch.squeeze(1) ** 2 + 1e-8)
        edges_batch = (magnitude_batch > 30.0).float()

        # Restrict detected edges to the projected ground region.
        mask_lines_predicted_batch = edges_batch * mask_ground_batch

        # Per-frame overlap ratio (epsilon guards division by zero).
        pixels_overlapping_batch = (mask_lines_expected_batch * mask_lines_predicted_batch).sum(dim=(1, 2))
        pixels_on_lines_batch = mask_lines_expected_batch.sum(dim=(1, 2))
        scores_batch = (pixels_overlapping_batch / (pixels_on_lines_batch + 1e-8)).cpu().numpy()

        # Scatter the batch scores back to their original frame positions.
        for batch_idx, valid_idx in enumerate(valid_indices):
            scores[valid_idx] = min(1.0, max(0.0, float(scores_batch[batch_idx])))

        return scores

    except Exception as e:
        # Any failure in the batched path degrades to per-frame CPU scoring.
        logger.error(f"Batch GPU evaluation failed: {e}, falling back to sequential CPU")
        return [
            evaluate_keypoints_for_frame(
                template_keypoints, kp, frame, floor_markings_template
            )
            for kp, frame in zip(frame_keypoints_list, frames)
        ]
|
|
|
|
def evaluate_keypoints_batch_for_frame(
    template_keypoints: List[Tuple[int, int]],
    frame_keypoints_list: List[List[Tuple[int, int]]],
    frame: ndarray,
    floor_markings_template: ndarray,
    device: str = "cuda",
    batch_size: int = 32,
) -> List[float]:
    """
    Fast batch GPU evaluation of multiple keypoint sets for a single frame.

    This function evaluates multiple keypoint sets (e.g., from different models)
    for the same frame using batch GPU processing, which is much faster than
    evaluating them sequentially.

    Args:
        template_keypoints: Template keypoint coordinates
        frame_keypoints_list: List of frame keypoint coordinate sets to evaluate
        frame: Single frame image (same for all keypoint sets)
        floor_markings_template: Template image
        device: "cuda" or "cpu"
        batch_size: Number of keypoint sets to process in each GPU batch

    Returns:
        List of scores (one per keypoint set) between 0.0 and 1.0
    """
    if len(frame_keypoints_list) == 0:
        return []

    if len(frame_keypoints_list) == 1:
        # A single keypoint set gains nothing from batching.
        return [evaluate_keypoints_for_frame_opencv_cuda(
            template_keypoints=template_keypoints,
            frame_keypoints=frame_keypoints_list[0],
            frame=frame,
            floor_markings_template=floor_markings_template,
            device=device
        )]

    # FIX: batch_size was previously accepted but ignored (all keypoint sets
    # were sent to the GPU at once). Process in chunks of batch_size to bound
    # GPU memory; each keypoint set is scored independently, so chunking
    # does not change the results.
    scores: List[float] = []
    chunk_size = max(1, batch_size)
    for start in range(0, len(frame_keypoints_list), chunk_size):
        chunk = frame_keypoints_list[start:start + chunk_size]
        # The same frame is evaluated against every keypoint set in the chunk.
        frames_list = [frame] * len(chunk)
        try:
            scores.extend(evaluate_keypoints_batch_gpu(
                template_keypoints=template_keypoints,
                frame_keypoints_list=chunk,
                frames=frames_list,
                floor_markings_template=floor_markings_template,
                device=device,
            ))
        except Exception as e:
            logger.warning(f"Batch GPU evaluation failed: {e}, falling back to sequential")
            # Sequential fallback for this chunk only; other chunks may still
            # take the batched path.
            for frame_keypoints in chunk:
                try:
                    scores.append(evaluate_keypoints_for_frame_opencv_cuda(
                        template_keypoints=template_keypoints,
                        frame_keypoints=frame_keypoints,
                        frame=frame,
                        floor_markings_template=floor_markings_template,
                        device=device
                    ))
                except Exception as e2:
                    logger.debug(f"Error evaluating keypoints: {e2}")
                    scores.append(0.0)
    return scores
|
|
|
|
def load_template_from_file(
    template_image_path: str,
) -> Tuple[ndarray, List[Tuple[int, int]]]:
    """
    Load template image and use TEMPLATE_KEYPOINTS constant for keypoints.

    Args:
        template_image_path: Path to template image file

    Returns:
        template_image: Loaded template image (as read by cv2.imread)
        template_keypoints: List of (x, y) keypoint coordinates from TEMPLATE_KEYPOINTS constant

    Raises:
        ValueError: if the image cannot be read, or TEMPLATE_KEYPOINTS has
            fewer than the 4 keypoints required for a homography.
    """
    # cv2.imread signals failure by returning None rather than raising.
    template_image = cv2.imread(template_image_path)
    if template_image is None:
        raise ValueError(f"Could not load template image from {template_image_path}")

    # A single < 4 check subsumes the previous separate emptiness check.
    if len(TEMPLATE_KEYPOINTS) < 4:
        raise ValueError(f"TEMPLATE_KEYPOINTS must have at least 4 keypoints, found {len(TEMPLATE_KEYPOINTS)}")

    # Lazy %-args: the message is only formatted when INFO logging is on.
    logger.info("Loaded template image: %s", template_image_path)
    logger.info("Using TEMPLATE_KEYPOINTS constant with %d keypoints", len(TEMPLATE_KEYPOINTS))

    return template_image, TEMPLATE_KEYPOINTS
|
|
|
|
|
|