|
|
import copy
import threading
import time
from functools import partial
from multiprocessing import cpu_count
from pathlib import Path
from typing import Any, List, Optional, Sequence, Tuple

import cv2
import numpy as np
from numpy import ndarray
|
|
| |
# Module-level template state, lazily populated by
# _initialize_template_variables().  None means "not loaded yet", so the
# annotations must be Optional (the previous non-optional annotations
# contradicted the None initializers).
_TEMPLATE_KEYPOINTS: Optional[list[tuple[int, int]]] = None
_TEMPLATE_IMAGE: Optional[np.ndarray] = None

# Default extent of the pitch template in template coordinates; refined
# from the loaded template keypoints when they become available.
_TEMPLATE_MAX_X: int = 1045
_TEMPLATE_MAX_Y: int = 675
|
|
|
|
def _initialize_template_variables(template_keypoints=None, template_image=None):
    """
    Lazily populate the module-level template globals.

    Called once from run_keypoints_post_processing.  If the
    ``keypoint_evaluation`` module cannot be imported, the globals are
    left untouched.

    Args:
        template_keypoints: Optional template keypoints (pre-loaded)
        template_image: Optional template image (pre-loaded from miner constructor)
    """
    global _TEMPLATE_KEYPOINTS, _TEMPLATE_IMAGE, _TEMPLATE_MAX_X, _TEMPLATE_MAX_Y

    # Already fully initialized: nothing to do.
    if _TEMPLATE_KEYPOINTS is not None and _TEMPLATE_IMAGE is not None:
        return

    try:
        from keypoint_evaluation import TEMPLATE_KEYPOINTS

        if _TEMPLATE_KEYPOINTS is None:
            # Prefer the caller-supplied keypoints over the module defaults.
            _TEMPLATE_KEYPOINTS = (
                template_keypoints if template_keypoints is not None else TEMPLATE_KEYPOINTS
            )

        if _TEMPLATE_IMAGE is None:
            if template_image is None:
                print("Warning: Template image not provided, some validation may be skipped")
            else:
                _TEMPLATE_IMAGE = template_image

        # Refine the template extents from the strictly-positive keypoints.
        if _TEMPLATE_KEYPOINTS:
            positives = [(x, y) for x, y in _TEMPLATE_KEYPOINTS if x > 0 and y > 0]
            if positives:
                _TEMPLATE_MAX_X = max(p[0] for p in positives)
                _TEMPLATE_MAX_Y = max(p[1] for p in positives)
    except ImportError:
        pass
    except Exception as e:
        print(f"Warning: Could not load template: {e}")
|
|
# Default football-pitch keypoint table: 32 entries, where (0, 0) marks a
# keypoint with no default position.  Only the two centre-circle points
# (indices 14/15) and the two final points (indices 30/31) carry defaults.
FOOTBALL_KEYPOINTS: list[tuple[int, int]] = [(0, 0)] * 32
FOOTBALL_KEYPOINTS[14] = (527, 283)
FOOTBALL_KEYPOINTS[15] = (527, 403)
FOOTBALL_KEYPOINTS[30] = (405, 340)
FOOTBALL_KEYPOINTS[31] = (645, 340)
|
|
def convert_keypoints_to_val_format(keypoints):
    """Coerce every coordinate of every pair to a plain ``int`` tuple."""
    return [tuple(map(int, pair)) for pair in keypoints]
|
|
def validate_with_nearby_keypoints(
    kp_idx: int,
    kp: tuple[int, int],
    valid_indices: list[int],
    result: list[tuple[int, int]],
    template_keypoints: list[tuple[int, int]],
    scale_factor: Optional[float] = None,
) -> Optional[float]:
    """
    Validate a keypoint by checking distances to nearby keypoints on the same side.

    Compares the detected distances from ``kp`` to every other detected
    keypoint in its side group against the corresponding template distances.

    Args:
        kp_idx: 0-based index of the keypoint being validated.
        kp: Detected (x, y) position of that keypoint in frame coordinates.
        valid_indices: Indices of all keypoints considered detected.
        result: Full keypoint list (frame coordinates), indexed like the template.
        template_keypoints: Template (x, y) positions, same indexing.
        scale_factor: Optional frame/template scale.  Falsy values (None or 0)
            mean template distances are compared unscaled.

    Returns:
        Mean relative distance error (lower is better), or None when the
        keypoint is not in a known side group or no same-side neighbors
        are available.  (The previous ``-> float`` annotation was wrong:
        three paths return None.)
    """
    template_kp = template_keypoints[kp_idx]

    # Side groups (presumably left/right pitch features — indices fixed by
    # the template layout used elsewhere in this module).
    left_side_indices = [9, 10, 11, 12]
    right_side_indices = list(range(17, 30))

    if kp_idx in left_side_indices:
        same_side_indices = left_side_indices
    elif kp_idx in right_side_indices:
        same_side_indices = right_side_indices
    else:
        # Keypoint belongs to neither group: validation not possible.
        return None

    # Collect detected same-side neighbors (excluding the keypoint itself).
    nearby_kps = [
        (idx, result[idx], template_keypoints[idx])
        for idx in same_side_indices
        if idx != kp_idx and idx in valid_indices
    ]
    if not nearby_kps:
        return None

    distance_errors = []
    for _, nearby_kp, nearby_template_kp in nearby_kps:
        detected_dist = np.sqrt((kp[0] - nearby_kp[0]) ** 2 + (kp[1] - nearby_kp[1]) ** 2)
        template_dist = np.sqrt(
            (template_kp[0] - nearby_template_kp[0]) ** 2
            + (template_kp[1] - nearby_template_kp[1]) ** 2
        )
        if template_dist > 0:
            # Scale the template distance into frame units when a (truthy)
            # scale factor is available; otherwise compare raw distances.
            expected_dist = template_dist * scale_factor if scale_factor else template_dist
            if expected_dist > 0:
                distance_errors.append(abs(detected_dist - expected_dist) / expected_dist)

    if distance_errors:
        return np.mean(distance_errors)
    return None
|
|
def remove_duplicate_detections(
    keypoints: list[tuple[int, int]],
    frame_width: int = None,
    frame_height: int = None,
) -> list[tuple[int, int]]:
    """
    Remove duplicate/conflicting keypoint detections using distance-based validation.

    Uses the principle that if two keypoints are detected very close together,
    but in the template they should be far apart, one of them is likely wrong.
    Validates each keypoint by checking if its distances to other keypoints
    match the expected template distances.

    Args:
        keypoints: List of 32 keypoints; (0, 0) entries mean "not detected".
            Shorter lists are padded with (0, 0), longer lists truncated.
        frame_width: Optional frame width for validation (currently unused here).
        frame_height: Optional frame height for validation (currently unused here).

    Returns:
        Cleaned list of 32 keypoints with removed duplicates set to (0, 0).
        Returned unchanged (apart from padding/truncation) when the template
        module cannot be imported.
    """
    # Normalize the input to exactly 32 entries.
    if len(keypoints) != 32:
        if len(keypoints) < 32:
            keypoints = list(keypoints) + [(0, 0)] * (32 - len(keypoints))
        else:
            keypoints = keypoints[:32]

    result = list(keypoints)

    # Template coordinates are required for distance validation; without
    # them we can only return the normalized input untouched.
    try:
        from keypoint_evaluation import TEMPLATE_KEYPOINTS
        template_available = True
    except ImportError:
        template_available = False

    if not template_available:
        return result

    # Indices of keypoints actually detected (strictly positive x and y).
    valid_indices = []
    for i in range(32):
        if result[i][0] > 0 and result[i][1] > 0:
            valid_indices.append(i)

    if len(valid_indices) < 2:
        return result

    # Estimate a single frame/template scale factor from the valid pair with
    # the largest template separation (longest baseline is the most stable).
    scale_factor = None
    if len(valid_indices) >= 2:
        max_template_dist = 0
        max_detected_dist = 0

        for i in range(len(valid_indices)):
            for j in range(i + 1, len(valid_indices)):
                idx_i = valid_indices[i]
                idx_j = valid_indices[j]

                template_i = TEMPLATE_KEYPOINTS[idx_i]
                template_j = TEMPLATE_KEYPOINTS[idx_j]
                template_dist = np.sqrt((template_i[0] - template_j[0])**2 + (template_i[1] - template_j[1])**2)

                kp_i = result[idx_i]
                kp_j = result[idx_j]
                detected_dist = np.sqrt((kp_i[0] - kp_j[0])**2 + (kp_i[1] - kp_j[1])**2)

                # Keep the detected distance that belongs to the largest
                # template distance seen so far.
                if template_dist > max_template_dist and detected_dist > 0:
                    max_template_dist = template_dist
                    max_detected_dist = detected_dist

        if max_template_dist > 0 and max_detected_dist > 0:
            scale_factor = max_detected_dist / max_template_dist

    # Score every valid keypoint: mean relative error between detected and
    # expected (template, optionally scaled) distances to the other valid
    # keypoints.  Lower is better; 0.0 when no usable comparison exists.
    keypoint_scores = {}
    for idx in valid_indices:
        kp = result[idx]
        template_kp = TEMPLATE_KEYPOINTS[idx]

        distance_errors = []
        num_comparisons = 0

        for other_idx in valid_indices:
            if other_idx == idx:
                continue

            other_kp = result[other_idx]
            other_template_kp = TEMPLATE_KEYPOINTS[other_idx]

            detected_dist = np.sqrt((kp[0] - other_kp[0])**2 + (kp[1] - other_kp[1])**2)

            template_dist = np.sqrt((template_kp[0] - other_template_kp[0])**2 +
                                    (template_kp[1] - other_template_kp[1])**2)

            # Only compare pairs reasonably far apart in the template; short
            # template distances make the relative error unstable.
            if template_dist > 50:
                num_comparisons += 1

                if scale_factor:
                    expected_dist = template_dist * scale_factor
                else:
                    expected_dist = template_dist

                if expected_dist > 0:
                    error = abs(detected_dist - expected_dist) / expected_dist
                    distance_errors.append(error)

        if num_comparisons > 0:
            avg_error = np.mean(distance_errors)
            keypoint_scores[idx] = avg_error
        else:
            keypoint_scores[idx] = 0.0

    # Find conflicting pairs: detected close together (< 30 px) but far apart
    # in the template (> 100 template units).  For each conflict decide which
    # keypoint to drop, preferring same-side neighbor validation over the
    # global score computed above.
    conflicts = []
    for i in range(len(valid_indices)):
        for j in range(i + 1, len(valid_indices)):
            idx_i = valid_indices[i]
            idx_j = valid_indices[j]

            kp_i = result[idx_i]
            kp_j = result[idx_j]

            detected_dist = np.sqrt((kp_i[0] - kp_j[0])**2 + (kp_i[1] - kp_j[1])**2)

            template_i = TEMPLATE_KEYPOINTS[idx_i]
            template_j = TEMPLATE_KEYPOINTS[idx_j]
            template_dist = np.sqrt((template_i[0] - template_j[0])**2 +
                                    (template_i[1] - template_j[1])**2)

            if template_dist > 100 and detected_dist < 30:
                # Global scores are the fallback (1.0 when missing).
                score_i = keypoint_scores.get(idx_i, 1.0)
                score_j = keypoint_scores.get(idx_j, 1.0)

                nearby_validation_i = validate_with_nearby_keypoints(
                    idx_i, kp_i, valid_indices, result, TEMPLATE_KEYPOINTS, scale_factor
                )
                nearby_validation_j = validate_with_nearby_keypoints(
                    idx_j, kp_j, valid_indices, result, TEMPLATE_KEYPOINTS, scale_factor
                )

                validation_score_i = score_i
                validation_score_j = score_j

                if nearby_validation_i is not None and nearby_validation_j is not None:
                    # Both have same-side neighbors: trust the local validation.
                    validation_score_i = nearby_validation_i
                    validation_score_j = nearby_validation_j
                elif nearby_validation_i is not None:
                    # Only i could be locally validated: penalize j.
                    validation_score_i = nearby_validation_i
                    validation_score_j = score_j + 1.0
                elif nearby_validation_j is not None:
                    # Only j could be locally validated: penalize i.
                    validation_score_i = score_i + 1.0
                    validation_score_j = nearby_validation_j

                # Record (remove_idx, keep_idx, remove_score, keep_score).
                # On a tie the second keypoint of the pair is the one removed.
                if validation_score_i > validation_score_j:
                    conflicts.append((idx_i, idx_j, validation_score_i, validation_score_j))
                else:
                    conflicts.append((idx_j, idx_i, validation_score_j, validation_score_i))

    # Apply removals, skipping keypoints already removed by an earlier conflict.
    removed_indices = set()
    for remove_idx, keep_idx, remove_score, keep_score in conflicts:
        if remove_idx not in removed_indices:
            print(f"Removing duplicate detection: keypoint {remove_idx+1} at {result[remove_idx]} conflicts with keypoint {keep_idx+1} at {result[keep_idx]} "
                  f"(detected distance: {np.sqrt((result[remove_idx][0] - result[keep_idx][0])**2 + (result[remove_idx][1] - result[keep_idx][1])**2):.1f}, "
                  f"template distance: {np.sqrt((TEMPLATE_KEYPOINTS[remove_idx][0] - TEMPLATE_KEYPOINTS[keep_idx][0])**2 + (TEMPLATE_KEYPOINTS[remove_idx][1] - TEMPLATE_KEYPOINTS[keep_idx][1])**2):.1f}). "
                  f"Keeping keypoint {keep_idx+1} (score: {keep_score:.3f} vs {remove_score:.3f}).")
            result[remove_idx] = (0, 0)
            removed_indices.add(remove_idx)

    return result
|
|
| def calculate_missing_keypoints( |
| keypoints: list[tuple[int, int]], |
| frame_width: int = None, |
| frame_height: int = None, |
| ) -> list[tuple[int, int]]: |
| """ |
| Calculate missing keypoint coordinates for multiple cases: |
| 1. Given keypoints 14, 15, 16 (and possibly 17), and either 31 or 32, |
| calculate the missing center circle point (32 or 31). |
| 2. Given three or four of keypoints 18, 19, 20, 21 and any of 22-30, |
| calculate missing keypoint positions (like 22 or others) to prevent warping failures. |
| |
| Args: |
| keypoints: List of 32 keypoints (some may be (0,0) if missing) |
| frame_width: Optional frame width for validation |
| frame_height: Optional frame height for validation |
| |
| Returns: |
| Updated list of 32 keypoints with calculated missing keypoints filled in |
| """ |
| if len(keypoints) != 32: |
| |
| if len(keypoints) < 32: |
| keypoints = list(keypoints) + [(0, 0)] * (32 - len(keypoints)) |
| else: |
| keypoints = keypoints[:32] |
| |
| result = list(keypoints) |
| |
| |
| def get_kp(kp_idx): |
| if kp_idx < 0 or kp_idx >= 32: |
| return None |
| x, y = result[kp_idx] |
|
|
| if x == 0 and y == 0: |
| return None |
| |
| return (x, y) |
|
|
| |
| |
| |
| center_x = None |
| for center_kp_idx in [13, 14, 15, 16]: |
| kp = get_kp(center_kp_idx) |
| if kp: |
| center_x = kp[0] |
| break |
| |
| |
| if center_x is not None: |
| |
| |
| kp_31 = get_kp(30) |
| kp_32 = get_kp(31) |
| |
| if kp_31 and not kp_32: |
| |
| |
| |
| dx = center_x - kp_31[0] |
| result[31] = (int(round(center_x + dx)), kp_31[1]) |
| elif kp_32 and not kp_31: |
| |
| |
| |
| dx = kp_32[0] - center_x |
| result[30] = (int(round(center_x - dx)), kp_32[1]) |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| left_side_all = [] |
| line_1_6_points = [] |
| line_7_8_points = [] |
| line_10_13_points = [] |
| |
| for idx in range(0, 13): |
| if idx == 8: |
| continue |
| kp = get_kp(idx) |
| if kp: |
| left_side_all.append((idx, kp)) |
| if 0 <= idx <= 5: |
| line_1_6_points.append((idx, kp)) |
| elif 6 <= idx <= 7: |
| line_7_8_points.append((idx, kp)) |
| elif 9 <= idx <= 12: |
| line_10_13_points.append((idx, kp)) |
| |
| kp_9 = get_kp(8) |
| if kp_9: |
| left_side_all.append((8, kp_9)) |
| |
| total_left_side_count = len(left_side_all) |
| |
| |
| if total_left_side_count >= 6: |
| pass |
| elif total_left_side_count == 5: |
| |
| counts_per_line = [ |
| len(line_1_6_points), |
| len(line_7_8_points), |
| len(line_10_13_points) |
| ] |
| |
| if max(counts_per_line) == 4 and sum(counts_per_line) == 4: |
| |
| |
| if len(line_1_6_points) == 4: |
| |
| |
| if len(line_10_13_points) == 0: |
| |
| |
| points_1_6 = np.array([[kp[0], kp[1]] for _, kp in line_1_6_points]) |
| x_coords = points_1_6[:, 0] |
| y_coords = points_1_6[:, 1] |
| A = np.vstack([x_coords, np.ones(len(x_coords))]).T |
| m_1_6, b_1_6 = np.linalg.lstsq(A, y_coords, rcond=None)[0] |
| |
| |
| |
| template_ys_10_13 = [140, 270, 410, 540] |
| template_indices_10_13 = [9, 10, 11, 12] |
| |
| |
| median_y = np.median(y_coords) |
| |
| |
| |
| |
| if abs(m_1_6) > 1e-6: |
| x_on_line_1_6 = (median_y - b_1_6) / m_1_6 |
| x_new = int(round(x_on_line_1_6 * 33)) |
| else: |
| x_new = int(round(np.median(x_coords) * 33)) |
| |
| |
| for template_y, idx in zip(template_ys_10_13, template_indices_10_13): |
| if result[idx] is None: |
| result[idx] = (x_new, int(round(median_y))) |
| break |
| elif len(line_10_13_points) == 4: |
| |
| |
| points_10_13 = np.array([[kp[0], kp[1]] for _, kp in line_10_13_points]) |
| x_coords = points_10_13[:, 0] |
| y_coords = points_10_13[:, 1] |
| A = np.vstack([x_coords, np.ones(len(x_coords))]).T |
| m_10_13, b_10_13 = np.linalg.lstsq(A, y_coords, rcond=None)[0] |
| |
| |
| template_ys_1_6 = [5, 140, 250, 430, 540, 675] |
| template_indices_1_6 = [0, 1, 2, 3, 4, 5] |
| |
| median_y = np.median(y_coords) |
| |
| |
| |
| if abs(m_10_13) > 1e-6: |
| x_on_line_10_13 = (median_y - b_10_13) / m_10_13 |
| x_new = int(round(x_on_line_10_13 * 0.0303)) |
| else: |
| x_new = int(round(np.median(x_coords) * 0.0303)) |
| |
| for template_y, idx in zip(template_ys_1_6, template_indices_1_6): |
| if result[idx] is None: |
| result[idx] = (x_new, int(round(median_y))) |
| break |
| elif total_left_side_count < 5: |
| |
| |
| |
| |
| |
| |
| template_coords_left = { |
| 0: (5, 5), |
| 1: (5, 140), |
| 2: (5, 250), |
| 3: (5, 430), |
| 4: (5, 540), |
| 5: (5, 675), |
| 6: (55, 250), |
| 7: (55, 430), |
| 8: (110, 340), |
| 9: (165, 140), |
| 10: (165, 270), |
| 11: (165, 410), |
| 12: (165, 540), |
| } |
| |
| |
| |
| |
| line_groups_left = { |
| '1-6': ([0, 1, 2, 3, 4, 5], 'vertical'), |
| '7-8': ([6, 7], 'vertical'), |
| '10-13': ([9, 10, 11, 12], 'vertical'), |
| '2-10': ([1, 9], 'horizontal'), |
| '3-7': ([2, 6], 'horizontal'), |
| '4-8': ([3, 7], 'horizontal'), |
| '5-13': ([4, 12], 'horizontal'), |
| } |
| |
| |
| all_available_points_left = {} |
| for idx, kp in line_1_6_points: |
| all_available_points_left[idx] = kp |
| for idx, kp in line_7_8_points: |
| all_available_points_left[idx] = kp |
| for idx, kp in line_10_13_points: |
| all_available_points_left[idx] = kp |
| |
| |
| best_vertical_line_name_left = None |
| best_vertical_line_points_left = [] |
| max_vertical_points_left = 1 |
| |
| best_horizontal_line_name_left = None |
| best_horizontal_line_points_left = [] |
| max_horizontal_points_left = 1 |
| |
| for line_name, (indices, line_type) in line_groups_left.items(): |
| line_points = [(idx, all_available_points_left[idx]) for idx in indices if idx in all_available_points_left] |
| if line_type == 'vertical' and len(line_points) > max_vertical_points_left: |
| max_vertical_points_left = len(line_points) |
| best_vertical_line_name_left = line_name |
| best_vertical_line_points_left = line_points |
| elif line_type == 'horizontal' and len(line_points) > max_horizontal_points_left: |
| max_horizontal_points_left = len(line_points) |
| best_horizontal_line_name_left = line_name |
| best_horizontal_line_points_left = line_points |
| |
| |
| |
| if best_vertical_line_name_left is not None: |
| expected_indices = line_groups_left[best_vertical_line_name_left][0] |
| detected_indices = {idx for idx, _ in best_vertical_line_points_left} |
| missing_indices = [idx for idx in expected_indices if idx not in detected_indices] |
| |
| if len(missing_indices) > 0: |
| |
| template_start = template_coords_left[best_vertical_line_points_left[0][0]] |
| template_end = template_coords_left[best_vertical_line_points_left[-1][0]] |
| frame_start = best_vertical_line_points_left[0][1] |
| frame_end = best_vertical_line_points_left[-1][1] |
| |
| for missing_idx in missing_indices: |
| template_missing = template_coords_left[missing_idx] |
| |
| |
| template_y_start = template_start[1] |
| template_y_end = template_end[1] |
| template_y_missing = template_missing[1] |
| |
| if abs(template_y_end - template_y_start) > 1e-6: |
| ratio = (template_y_missing - template_y_start) / (template_y_end - template_y_start) |
| else: |
| ratio = 0.5 |
| |
| |
| x_new = frame_start[0] + (frame_end[0] - frame_start[0]) * ratio |
| y_new = frame_start[1] + (frame_end[1] - frame_start[1]) * ratio |
| new_point = (int(round(x_new)), int(round(y_new))) |
| |
| |
| result[missing_idx] = new_point |
| best_vertical_line_points_left.append((missing_idx, new_point)) |
| all_available_points_left[missing_idx] = new_point |
| total_left_side_count += 1 |
| max_vertical_points_left = len(best_vertical_line_points_left) |
| |
| |
| best_vertical_line_points_left.sort(key=lambda x: x[0]) |
| |
| |
| for line_name, (indices, line_type) in line_groups_left.items(): |
| if line_type == 'horizontal': |
| line_points = [(idx, all_available_points_left[idx]) for idx in indices if idx in all_available_points_left] |
| if len(line_points) > max_horizontal_points_left: |
| max_horizontal_points_left = len(line_points) |
| best_horizontal_line_name_left = line_name |
| best_horizontal_line_points_left = line_points |
| |
| |
| if best_horizontal_line_name_left is not None: |
| expected_indices = line_groups_left[best_horizontal_line_name_left][0] |
| detected_indices = {idx for idx, _ in best_horizontal_line_points_left} |
| missing_indices = [idx for idx in expected_indices if idx not in detected_indices] |
| |
| if len(missing_indices) > 0: |
| |
| template_start = template_coords_left[best_horizontal_line_points_left[0][0]] |
| template_end = template_coords_left[best_horizontal_line_points_left[-1][0]] |
| frame_start = best_horizontal_line_points_left[0][1] |
| frame_end = best_horizontal_line_points_left[-1][1] |
| |
| for missing_idx in missing_indices: |
| template_missing = template_coords_left[missing_idx] |
| |
| |
| template_x_start = template_start[0] |
| template_x_end = template_end[0] |
| template_x_missing = template_missing[0] |
| |
| if abs(template_x_end - template_x_start) > 1e-6: |
| ratio = (template_x_missing - template_x_start) / (template_x_end - template_x_start) |
| else: |
| ratio = 0.5 |
| |
| |
| x_new = frame_start[0] + (frame_end[0] - frame_start[0]) * ratio |
| y_new = frame_start[1] + (frame_end[1] - frame_start[1]) * ratio |
| new_point = (int(round(x_new)), int(round(y_new))) |
| |
| |
| result[missing_idx] = new_point |
| best_horizontal_line_points_left.append((missing_idx, new_point)) |
| all_available_points_left[missing_idx] = new_point |
| total_left_side_count += 1 |
| max_horizontal_points_left = len(best_horizontal_line_points_left) |
| |
| |
| best_horizontal_line_points_left.sort(key=lambda x: x[0]) |
| |
| |
| for line_name, (indices, line_type) in line_groups_left.items(): |
| if line_type == 'vertical': |
| line_points = [(idx, all_available_points_left[idx]) for idx in indices if idx in all_available_points_left] |
| if len(line_points) > max_vertical_points_left: |
| max_vertical_points_left = len(line_points) |
| best_vertical_line_name_left = line_name |
| best_vertical_line_points_left = line_points |
| |
| |
| |
| if best_vertical_line_name_left is not None and best_horizontal_line_name_left is None: |
| |
| |
| off_line_point = None |
| off_line_idx = None |
| vertical_line_indices = line_groups_left[best_vertical_line_name_left][0] |
| for idx, kp in all_available_points_left.items(): |
| if idx not in vertical_line_indices: |
| off_line_point = kp |
| off_line_idx = idx |
| break |
| |
| if off_line_point is not None: |
| |
| off_line_point = np.array(off_line_point) |
| |
| |
| template_off_line = template_coords_left[off_line_idx] |
| |
| template_vertical_start_index = best_vertical_line_points_left[0][0] |
| template_vertical_end_index = best_vertical_line_points_left[-1][0] |
| |
| template_vertical_start = template_coords_left[template_vertical_start_index] |
| template_vertical_end = template_coords_left[template_vertical_end_index] |
| |
| |
| template_y_off = template_off_line[1] |
| template_y_vertical_start = template_vertical_start[1] |
| template_y_vertical_end = template_vertical_end[1] |
| |
| if abs(template_y_vertical_end - template_y_vertical_start) > 1e-6: |
| ratio_proj = (template_y_off - template_y_vertical_start) / (template_y_vertical_end - template_y_vertical_start) |
| else: |
| ratio_proj = 0.5 |
| |
| frame_vertical_start = best_vertical_line_points_left[0][1] |
| frame_vertical_end = best_vertical_line_points_left[-1][1] |
| proj_x = frame_vertical_start[0] + (frame_vertical_end[0] - frame_vertical_start[0]) * ratio_proj |
| proj_y = frame_vertical_start[1] + (frame_vertical_end[1] - frame_vertical_start[1]) * ratio_proj |
| proj_point = np.array([proj_x, proj_y]) |
| |
| |
| if best_vertical_line_name_left == '10-13': |
| |
| if off_line_idx == 1: |
| kp_10 = np.array(best_vertical_line_points_left[0][1]) |
| kp_2 = off_line_point + (kp_10 - proj_point) |
| result[1] = tuple(kp_2.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[1] = tuple(kp_2.astype(int)) |
| elif off_line_idx == 4: |
| kp_13 = np.array(best_vertical_line_points_left[-1][1]) |
| kp_5 = off_line_point + (kp_13 - proj_point) |
| result[4] = tuple(kp_5.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[4] = tuple(kp_5.astype(int)) |
| |
| elif best_vertical_line_name_left == '1-6': |
| |
| if off_line_idx == 6 or off_line_idx == 7: |
| template_off = template_coords_left[off_line_idx] |
| template_3 = template_coords_left[2] |
| template_4 = template_coords_left[3] |
| template_7 = template_coords_left[6] |
| template_8 = template_coords_left[7] |
| |
| if off_line_idx == 6: |
| ratio = (template_3[0] - template_7[0]) / (template_7[0] - template_off[0]) if abs(template_7[0] - template_off[0]) > 1e-6 else 0.5 |
| kp_3 = proj_point + (off_line_point - proj_point) * ratio |
| result[2] = tuple(kp_3.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[2] = tuple(kp_3.astype(int)) |
| else: |
| ratio = (template_4[0] - template_8[0]) / (template_8[0] - template_off[0]) if abs(template_8[0] - template_off[0]) > 1e-6 else 0.5 |
| kp_4 = proj_point + (off_line_point - proj_point) * ratio |
| result[3] = tuple(kp_4.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[3] = tuple(kp_4.astype(int)) |
| elif off_line_idx == 9 or off_line_idx == 12: |
| if off_line_idx == 9: |
| kp_2 = off_line_point + (np.array(best_vertical_line_points_left[1][1]) - proj_point) |
| result[1] = tuple(kp_2.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[1] = tuple(kp_2.astype(int)) |
| else: |
| kp_5 = off_line_point + (np.array(best_vertical_line_points_left[4][1]) - proj_point) |
| result[4] = tuple(kp_5.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[4] = tuple(kp_5.astype(int)) |
| |
| elif best_vertical_line_name_left == '7-8': |
| |
| if off_line_idx == 2 or off_line_idx == 3: |
| if off_line_idx == 2: |
| kp_7 = off_line_point + (np.array(best_vertical_line_points_left[0][1]) - proj_point) |
| result[6] = tuple(kp_7.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[6] = tuple(kp_7.astype(int)) |
| else: |
| kp_8 = off_line_point + (np.array(best_vertical_line_points_left[-1][1]) - proj_point) |
| result[7] = tuple(kp_8.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[7] = tuple(kp_8.astype(int)) |
| |
| |
| for line_name, (indices, line_type) in line_groups_left.items(): |
| if line_type == 'horizontal': |
| line_points = [(idx, all_available_points_left[idx]) for idx in indices if idx in all_available_points_left] |
| if len(line_points) > max_horizontal_points_left: |
| max_horizontal_points_left = len(line_points) |
| best_horizontal_line_name_left = line_name |
| best_horizontal_line_points_left = line_points |
| |
| elif best_horizontal_line_name_left is not None and best_vertical_line_name_left is None: |
| |
| |
| off_line_point = None |
| off_line_idx = None |
| horizontal_line_indices = line_groups_left[best_horizontal_line_name_left][0] |
| for idx, kp in all_available_points_left.items(): |
| if idx not in horizontal_line_indices: |
| off_line_point = kp |
| off_line_idx = idx |
| break |
| |
| if off_line_point is not None: |
| |
| template_off_line = template_coords_left[off_line_idx] |
| template_horizontal_start = template_coords_left[best_horizontal_line_points_left[0][0]] |
| template_horizontal_end = template_coords_left[best_horizontal_line_points_left[-1][0]] |
| |
| |
| template_x_off = template_off_line[0] |
| template_x_horizontal_start = template_horizontal_start[0] |
| template_x_horizontal_end = template_horizontal_end[0] |
| |
| if abs(template_x_horizontal_end - template_x_horizontal_start) > 1e-6: |
| ratio_proj = (template_x_off - template_x_horizontal_start) / (template_x_horizontal_end - template_x_horizontal_start) |
| else: |
| ratio_proj = 0.5 |
| |
| frame_horizontal_start = best_horizontal_line_points_left[0][1] |
| frame_horizontal_end = best_horizontal_line_points_left[-1][1] |
| proj_x = frame_horizontal_start[0] + (frame_horizontal_end[0] - frame_horizontal_start[0]) * ratio_proj |
| proj_y = frame_horizontal_start[1] + (frame_horizontal_end[1] - frame_horizontal_start[1]) * ratio_proj |
| proj_point = np.array([proj_x, proj_y]) |
| off_line_point = np.array(off_line_point) |
| |
| |
| if best_horizontal_line_name_left == '2-10': |
| |
| if off_line_idx == 0 or off_line_idx == 5: |
| kp_2 = off_line_point + (np.array(best_horizontal_line_points_left[0][1]) - proj_point) |
| result[1] = tuple(kp_2.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[1] = tuple(kp_2.astype(int)) |
| elif off_line_idx == 9 or off_line_idx == 12: |
| kp_10 = off_line_point + (np.array(best_horizontal_line_points_left[-1][1]) - proj_point) |
| result[9] = tuple(kp_10.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[9] = tuple(kp_10.astype(int)) |
| |
| elif best_horizontal_line_name_left == '3-7': |
| |
| if off_line_idx == 0 or off_line_idx == 5: |
| kp_3 = off_line_point + (np.array(best_horizontal_line_points_left[0][1]) - proj_point) |
| result[2] = tuple(kp_3.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[2] = tuple(kp_3.astype(int)) |
| elif off_line_idx == 6 or off_line_idx == 7: |
| kp_7 = off_line_point + (np.array(best_horizontal_line_points_left[-1][1]) - proj_point) |
| result[6] = tuple(kp_7.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[6] = tuple(kp_7.astype(int)) |
| |
| elif best_horizontal_line_name_left == '4-8': |
| |
| if off_line_idx == 0 or off_line_idx == 5: |
| kp_4 = off_line_point + (np.array(best_horizontal_line_points_left[0][1]) - proj_point) |
| result[3] = tuple(kp_4.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[3] = tuple(kp_4.astype(int)) |
| elif off_line_idx == 6 or off_line_idx == 7: |
| kp_8 = off_line_point + (np.array(best_horizontal_line_points_left[-1][1]) - proj_point) |
| result[7] = tuple(kp_8.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[7] = tuple(kp_8.astype(int)) |
| |
| elif best_horizontal_line_name_left == '5-13': |
| |
| if off_line_idx == 0 or off_line_idx == 5: |
| kp_5 = off_line_point + (np.array(best_horizontal_line_points_left[0][1]) - proj_point) |
| result[4] = tuple(kp_5.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[4] = tuple(kp_5.astype(int)) |
| elif off_line_idx == 9 or off_line_idx == 12: |
| kp_13 = off_line_point + (np.array(best_horizontal_line_points_left[-1][1]) - proj_point) |
| result[12] = tuple(kp_13.astype(int)) |
| total_left_side_count += 1 |
| all_available_points_left[12] = tuple(kp_13.astype(int)) |
| |
| |
| for line_name, (indices, line_type) in line_groups_left.items(): |
| if line_type == 'vertical': |
| line_points = [(idx, all_available_points_left[idx]) for idx in indices if idx in all_available_points_left] |
| if len(line_points) > max_vertical_points_left: |
| max_vertical_points_left = len(line_points) |
| best_vertical_line_name_left = line_name |
| best_vertical_line_points_left = line_points |
| |
| |
| if best_vertical_line_name_left is not None and best_horizontal_line_name_left is not None: |
| if kp_9 is None: |
| print(f"Calculating keypoint 9 using both vertical and horizontal lines: {best_vertical_line_name_left} and {best_horizontal_line_name_left}") |
|
|
| template_x_9 = 110 |
| template_y_9 = 340 |
| |
| |
| template_vertical_start = template_coords_left[best_vertical_line_points_left[0][0]] |
| template_vertical_end = template_coords_left[best_vertical_line_points_left[-1][0]] |
| |
| |
| template_y_vertical_start = template_vertical_start[1] |
| template_y_vertical_end = template_vertical_end[1] |
| |
| if abs(template_y_vertical_end - template_y_vertical_start) > 1e-6: |
| ratio_9_vertical = (template_y_9 - template_y_vertical_start) / (template_y_vertical_end - template_y_vertical_start) |
| else: |
| ratio_9_vertical = 0.5 |
| |
| frame_vertical_start = best_vertical_line_points_left[0][1] |
| frame_vertical_end = best_vertical_line_points_left[-1][1] |
| proj_9_on_vertical_x = frame_vertical_start[0] + (frame_vertical_end[0] - frame_vertical_start[0]) * ratio_9_vertical |
| proj_9_on_vertical_y = frame_vertical_start[1] + (frame_vertical_end[1] - frame_vertical_start[1]) * ratio_9_vertical |
| proj_9_on_vertical = (proj_9_on_vertical_x, proj_9_on_vertical_y) |
| |
| |
| template_horizontal_start = template_coords_left[best_horizontal_line_points_left[0][0]] |
| template_horizontal_end = template_coords_left[best_horizontal_line_points_left[-1][0]] |
| |
| |
| template_x_horizontal_start = template_horizontal_start[0] |
| template_x_horizontal_end = template_horizontal_end[0] |
| |
| if abs(template_x_horizontal_end - template_x_horizontal_start) > 1e-6: |
| ratio_9_horizontal = (template_x_9 - template_x_horizontal_start) / (template_x_horizontal_end - template_x_horizontal_start) |
| else: |
| ratio_9_horizontal = 0.5 |
| |
| frame_horizontal_start = best_horizontal_line_points_left[0][1] |
| frame_horizontal_end = best_horizontal_line_points_left[-1][1] |
| proj_9_on_horizontal_x = frame_horizontal_start[0] + (frame_horizontal_end[0] - frame_horizontal_start[0]) * ratio_9_horizontal |
| proj_9_on_horizontal_y = frame_horizontal_start[1] + (frame_horizontal_end[1] - frame_horizontal_start[1]) * ratio_9_horizontal |
| proj_9_on_horizontal = (proj_9_on_horizontal_x, proj_9_on_horizontal_y) |
| |
| |
| |
| |
| |
| |
| horizontal_dir_x = frame_horizontal_end[0] - frame_horizontal_start[0] |
| horizontal_dir_y = frame_horizontal_end[1] - frame_horizontal_start[1] |
| horizontal_dir_length = np.sqrt(horizontal_dir_x**2 + horizontal_dir_y**2) |
| |
| |
| vertical_dir_x = frame_vertical_end[0] - frame_vertical_start[0] |
| vertical_dir_y = frame_vertical_end[1] - frame_vertical_start[1] |
| vertical_dir_length = np.sqrt(vertical_dir_x**2 + vertical_dir_y**2) |
| |
| if horizontal_dir_length > 1e-6 and vertical_dir_length > 1e-6: |
| |
| horizontal_dir_x /= horizontal_dir_length |
| horizontal_dir_y /= horizontal_dir_length |
| vertical_dir_x /= vertical_dir_length |
| vertical_dir_y /= vertical_dir_length |
| |
| |
| A = np.array([ |
| [horizontal_dir_x, -vertical_dir_x], |
| [horizontal_dir_y, -vertical_dir_y] |
| ]) |
| b = np.array([ |
| proj_9_on_horizontal[0] - proj_9_on_vertical[0], |
| proj_9_on_horizontal[1] - proj_9_on_vertical[1] |
| ]) |
| |
| try: |
| t, s = np.linalg.solve(A, b) |
| |
| |
| x_9 = proj_9_on_vertical[0] + t * horizontal_dir_x |
| y_9 = proj_9_on_vertical[1] + t * horizontal_dir_y |
| |
| result[8] = (int(round(x_9)), int(round(y_9))) |
| total_left_side_count += 1 |
| except np.linalg.LinAlgError: |
| |
| x_9 = proj_9_on_vertical[0] |
| y_9 = proj_9_on_horizontal[1] |
| result[8] = (int(round(x_9)), int(round(y_9))) |
| total_left_side_count += 1 |
| else: |
| |
| x_9 = proj_9_on_vertical[0] |
| y_9 = proj_9_on_horizontal[1] |
| result[8] = (int(round(x_9)), int(round(y_9))) |
| total_left_side_count += 1 |
| |
| print(f"total_left_side_count: {total_left_side_count}, result: {result}") |
| if total_left_side_count > 5: |
| pass |
| |
| |
| m_line_left = None |
| b_line_left = None |
| best_line_for_calc_left = None |
| best_line_type_for_calc_left = None |
| |
| if best_vertical_line_name_left is not None and len(best_vertical_line_points_left) >= 2: |
| best_line_for_calc_left = best_vertical_line_points_left |
| best_line_type_for_calc_left = 'vertical' |
| points_array = np.array([[kp[0], kp[1]] for _, kp in best_vertical_line_points_left]) |
| x_coords = points_array[:, 0] |
| y_coords = points_array[:, 1] |
| A = np.vstack([x_coords, np.ones(len(x_coords))]).T |
| m_line_left, b_line_left = np.linalg.lstsq(A, y_coords, rcond=None)[0] |
| elif best_horizontal_line_name_left is not None and len(best_horizontal_line_points_left) >= 2: |
| best_line_for_calc_left = best_horizontal_line_points_left |
| best_line_type_for_calc_left = 'horizontal' |
| points_array = np.array([[kp[0], kp[1]] for _, kp in best_horizontal_line_points_left]) |
| x_coords = points_array[:, 0] |
| y_coords = points_array[:, 1] |
| A = np.vstack([x_coords, np.ones(len(x_coords))]).T |
| m_line_left, b_line_left = np.linalg.lstsq(A, y_coords, rcond=None)[0] |
| |
| |
| |
| if total_left_side_count < 5 and (m_line_left is not None or (best_line_for_calc_left is not None and best_line_type_for_calc_left == 'vertical')): |
| |
| counts_per_line = [ |
| len(line_1_6_points), |
| len(line_7_8_points), |
| len(line_10_13_points) |
| ] |
| |
| |
| template_ys_1_6 = [5, 140, 250, 430, 540, 675] |
| template_indices_1_6 = [0, 1, 2, 3, 4, 5] |
| |
| if best_vertical_line_name_left == '10-13': |
| |
| for template_y, idx in zip(template_ys_1_6, template_indices_1_6): |
| if result[idx] is None and total_left_side_count < 5: |
| |
| new_counts = counts_per_line.copy() |
| new_counts[0] += 1 |
| if max(new_counts) >= 4 and total_left_side_count == 4: |
| |
| continue |
| |
| |
| ref_ys = [kp[1] for _, kp in line_10_13_points] |
| ref_template_ys = [140, 270, 410, 540] |
| ref_indices = [9, 10, 11, 12] |
| |
| matched_template_ys = [] |
| for ref_idx, ref_kp in line_10_13_points: |
| if ref_idx in ref_indices: |
| template_idx = ref_indices.index(ref_idx) |
| matched_template_ys.append((ref_template_ys[template_idx], ref_kp[1])) |
| |
| if len(matched_template_ys) >= 1: |
| ref_template_y, ref_frame_y = matched_template_ys[0] |
| if ref_template_y > 0: |
| scale = ref_frame_y / ref_template_y |
| y_new = int(round(template_y * scale)) |
| else: |
| y_new = ref_frame_y |
| else: |
| y_new = int(round(np.median(ref_ys))) if ref_ys else template_y |
| |
| |
| if abs(m_line_left) > 1e-6: |
| x_on_line_10_13 = (y_new - b_line_left) / m_line_left |
| x_new = int(round(x_on_line_10_13 * 0.0303)) |
| else: |
| x_new = int(round(np.median([kp[0] for _, kp in line_10_13_points]) * 0.0303)) |
| |
| result[idx] = (x_new, y_new) |
| total_left_side_count += 1 |
| if total_left_side_count >= 5: |
| break |
| elif best_vertical_line_name_left == '1-6': |
| |
| for template_y, idx in zip(template_ys_1_6, template_indices_1_6): |
| if result[idx] is None and total_left_side_count < 5: |
| |
| new_counts = counts_per_line.copy() |
| new_counts[0] += 1 |
| if max(new_counts) >= 4 and total_left_side_count == 4: |
| |
| continue |
| |
| |
| if abs(m_line_left) > 1e-6: |
| x_new = (template_y - b_line_left) / m_line_left |
| else: |
| x_new = np.median([kp[0] for _, kp in line_1_6_points]) |
| |
| |
| ref_ys = [kp[1] for _, kp in line_1_6_points] |
| ref_template_ys = [] |
| for ref_idx, _ in line_1_6_points: |
| if ref_idx in template_indices_1_6: |
| template_idx = template_indices_1_6.index(ref_idx) |
| ref_template_ys.append(template_ys_1_6[template_idx]) |
| |
| if len(ref_ys) >= 1 and len(ref_template_ys) >= 1: |
| ref_template_y = ref_template_ys[0] |
| ref_frame_y = ref_ys[0] |
| if ref_template_y > 0: |
| scale = ref_frame_y / ref_template_y |
| y_new = int(round(template_y * scale)) |
| else: |
| y_new = ref_frame_y |
| else: |
| y_new = int(round(np.median(ref_ys))) if ref_ys else template_y |
| |
| result[idx] = (int(round(x_new)), y_new) |
| total_left_side_count += 1 |
| if total_left_side_count >= 5: |
| break |
| |
| print(f"total_left_side_count: {total_left_side_count}, result: {result}") |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| right_side_all = [] |
| line_18_21_points = [] |
| line_23_24_points = [] |
| line_25_30_points = [] |
| |
| for idx in range(17, 30): |
| kp = get_kp(idx) |
| if kp: |
| right_side_all.append((idx, kp)) |
| if 17 <= idx <= 20: |
| line_18_21_points.append((idx, kp)) |
| elif 22 <= idx <= 23: |
| line_23_24_points.append((idx, kp)) |
| elif 24 <= idx <= 29: |
| line_25_30_points.append((idx, kp)) |
| |
| kp_22 = get_kp(21) |
| if kp_22: |
| right_side_all.append((21, kp_22)) |
| |
| total_right_side_count = len(right_side_all) |
| |
| |
| if total_right_side_count >= 6: |
| pass |
| elif total_right_side_count == 5: |
| |
| counts_per_line = [ |
| len(line_18_21_points), |
| len(line_23_24_points), |
| len(line_25_30_points) |
| ] |
| |
| if max(counts_per_line) == 4 and sum(counts_per_line) == 4: |
| |
| |
| if len(line_18_21_points) == 4: |
| |
| |
| if len(line_25_30_points) == 0: |
| |
| |
| points_18_21 = np.array([[kp[0], kp[1]] for _, kp in line_18_21_points]) |
| x_coords = points_18_21[:, 0] |
| y_coords = points_18_21[:, 1] |
| A = np.vstack([x_coords, np.ones(len(x_coords))]).T |
| m_18_21, b_18_21 = np.linalg.lstsq(A, y_coords, rcond=None)[0] |
| |
| |
| |
| template_ys_25_30 = [5, 140, 250, 430, 540, 675] |
| template_indices_25_30 = [24, 25, 26, 27, 28, 29] |
| |
| |
| median_y = np.median(y_coords) |
| |
| ref_template_y = min(template_ys_25_30, key=lambda ty: abs(ty - np.median([kp[1] for _, kp in line_18_21_points]))) |
| ref_idx = template_ys_25_30.index(ref_template_y) |
| |
| |
| y_new = int(round(median_y)) |
| |
| |
| |
| |
| if abs(m_18_21) > 1e-6: |
| x_on_line_18_21 = (y_new - b_18_21) / m_18_21 |
| x_new = int(round(x_on_line_18_21 * 1.177)) |
| else: |
| x_new = int(round(np.median(x_coords) * 1.177)) |
| |
| |
| for template_y, idx in zip(template_ys_25_30, template_indices_25_30): |
| if result[idx] is None: |
| result[idx] = (x_new, y_new) |
| break |
| elif len(line_25_30_points) == 4: |
| |
| |
| points_25_30 = np.array([[kp[0], kp[1]] for _, kp in line_25_30_points]) |
| x_coords = points_25_30[:, 0] |
| y_coords = points_25_30[:, 1] |
| A = np.vstack([x_coords, np.ones(len(x_coords))]).T |
| m_25_30, b_25_30 = np.linalg.lstsq(A, y_coords, rcond=None)[0] |
| |
| |
| template_ys_18_21 = [140, 270, 410, 540] |
| template_indices_18_21 = [17, 18, 19, 20] |
| |
| median_y = np.median(y_coords) |
| |
| |
| |
| if abs(m_25_30) > 1e-6: |
| x_on_line_25_30 = (median_y - b_25_30) / m_25_30 |
| x_new = int(round(x_on_line_25_30 * 0.850)) |
| else: |
| x_new = int(round(np.median(x_coords) * 0.850)) |
| |
| for template_y, idx in zip(template_ys_18_21, template_indices_18_21): |
| if result[idx] is None: |
| result[idx] = (x_new, int(round(median_y))) |
| break |
| elif total_right_side_count < 5: |
| |
| |
| |
| |
| |
| |
| template_coords = { |
| 17: (888, 140), |
| 18: (888, 270), |
| 19: (888, 410), |
| 20: (888, 540), |
| 21: (940, 340), |
| 22: (998, 250), |
| 23: (998, 430), |
| 24: (1045, 5), |
| 25: (1045, 140), |
| 26: (1045, 250), |
| 27: (1045, 430), |
| 28: (1045, 540), |
| 29: (1045, 675), |
| } |
| |
| |
| |
| |
| line_groups = { |
| '18-21': ([17, 18, 19, 20], 'vertical'), |
| '23-24': ([22, 23], 'vertical'), |
| '25-30': ([24, 25, 26, 27, 28, 29], 'vertical'), |
| '18-26': ([17, 25], 'horizontal'), |
| '23-27': ([22, 26], 'horizontal'), |
| '24-28': ([23, 27], 'horizontal'), |
| '21-29': ([20, 28], 'horizontal'), |
| } |
| |
| |
| all_available_points = {} |
| for idx, kp in line_18_21_points: |
| all_available_points[idx] = kp |
| for idx, kp in line_23_24_points: |
| all_available_points[idx] = kp |
| for idx, kp in line_25_30_points: |
| all_available_points[idx] = kp |
| |
| |
| best_vertical_line_name = None |
| best_vertical_line_points = [] |
| max_vertical_points = 1 |
| |
| best_horizontal_line_name = None |
| best_horizontal_line_points = [] |
| max_horizontal_points = 1 |
| |
| for line_name, (indices, line_type) in line_groups.items(): |
| line_points = [(idx, all_available_points[idx]) for idx in indices if idx in all_available_points] |
| if line_type == 'vertical' and len(line_points) > max_vertical_points: |
| max_vertical_points = len(line_points) |
| best_vertical_line_name = line_name |
| best_vertical_line_points = line_points |
| elif line_type == 'horizontal' and len(line_points) > max_horizontal_points: |
| max_horizontal_points = len(line_points) |
| best_horizontal_line_name = line_name |
| best_horizontal_line_points = line_points |
| |
| |
| |
| if best_vertical_line_name is not None: |
| expected_indices = line_groups[best_vertical_line_name][0] |
| detected_indices = {idx for idx, _ in best_vertical_line_points} |
| missing_indices = [idx for idx in expected_indices if idx not in detected_indices] |
| |
| if len(missing_indices) > 0: |
| |
| template_start = template_coords[best_vertical_line_points[0][0]] |
| template_end = template_coords[best_vertical_line_points[-1][0]] |
| frame_start = best_vertical_line_points[0][1] |
| frame_end = best_vertical_line_points[-1][1] |
| |
| for missing_idx in missing_indices: |
| template_missing = template_coords[missing_idx] |
| |
| |
| template_y_start = template_start[1] |
| template_y_end = template_end[1] |
| template_y_missing = template_missing[1] |
| |
| if abs(template_y_end - template_y_start) > 1e-6: |
| ratio = (template_y_missing - template_y_start) / (template_y_end - template_y_start) |
| else: |
| ratio = 0.5 |
| |
| |
| x_new = frame_start[0] + (frame_end[0] - frame_start[0]) * ratio |
| y_new = frame_start[1] + (frame_end[1] - frame_start[1]) * ratio |
| new_point = (int(round(x_new)), int(round(y_new))) |
| |
| |
| result[missing_idx] = new_point |
| best_vertical_line_points.append((missing_idx, new_point)) |
| all_available_points[missing_idx] = new_point |
| total_right_side_count += 1 |
| max_vertical_points = len(best_vertical_line_points) |
| |
| |
| best_vertical_line_points.sort(key=lambda x: x[0]) |
| |
| |
| for line_name, (indices, line_type) in line_groups.items(): |
| if line_type == 'horizontal': |
| line_points = [(idx, all_available_points[idx]) for idx in indices if idx in all_available_points] |
| if len(line_points) > max_horizontal_points: |
| max_horizontal_points = len(line_points) |
| best_horizontal_line_name = line_name |
| best_horizontal_line_points = line_points |
| |
| |
| if best_horizontal_line_name is not None: |
| expected_indices = line_groups[best_horizontal_line_name][0] |
| detected_indices = {idx for idx, _ in best_horizontal_line_points} |
| missing_indices = [idx for idx in expected_indices if idx not in detected_indices] |
| |
| if len(missing_indices) > 0: |
| |
| template_start = template_coords[best_horizontal_line_points[0][0]] |
| template_end = template_coords[best_horizontal_line_points[-1][0]] |
| frame_start = best_horizontal_line_points[0][1] |
| frame_end = best_horizontal_line_points[-1][1] |
| |
| for missing_idx in missing_indices: |
| template_missing = template_coords[missing_idx] |
| |
| |
| template_x_start = template_start[0] |
| template_x_end = template_end[0] |
| template_x_missing = template_missing[0] |
| |
| if abs(template_x_end - template_x_start) > 1e-6: |
| ratio = (template_x_missing - template_x_start) / (template_x_end - template_x_start) |
| else: |
| ratio = 0.5 |
| |
| |
| x_new = frame_start[0] + (frame_end[0] - frame_start[0]) * ratio |
| y_new = frame_start[1] + (frame_end[1] - frame_start[1]) * ratio |
| new_point = (int(round(x_new)), int(round(y_new))) |
| |
| |
| result[missing_idx] = new_point |
| best_horizontal_line_points.append((missing_idx, new_point)) |
| all_available_points[missing_idx] = new_point |
| total_right_side_count += 1 |
| max_horizontal_points = len(best_horizontal_line_points) |
| |
| |
| best_horizontal_line_points.sort(key=lambda x: x[0]) |
| |
| |
| for line_name, (indices, line_type) in line_groups.items(): |
| if line_type == 'vertical': |
| line_points = [(idx, all_available_points[idx]) for idx in indices if idx in all_available_points] |
| if len(line_points) > max_vertical_points: |
| max_vertical_points = len(line_points) |
| best_vertical_line_name = line_name |
| best_vertical_line_points = line_points |
| |
| |
| if best_vertical_line_name is not None and best_horizontal_line_name is None: |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| off_line_point = None |
| off_line_idx = None |
| vertical_line_indices = line_groups[best_vertical_line_name][0] |
| for idx, kp in all_available_points.items(): |
| if idx not in vertical_line_indices: |
| off_line_point = kp |
| off_line_idx = idx |
| break |
| |
| if off_line_point is not None: |
| |
| off_line_point = np.array(off_line_point) |
| |
| |
| template_off_line = template_coords[off_line_idx] |
|
|
| template_vertical_start_index = best_vertical_line_points[0][0] |
| template_vertical_end_index = best_vertical_line_points[-1][0] |
| |
| template_vertical_start = template_coords[template_vertical_start_index] |
| template_vertical_end = template_coords[template_vertical_end_index] |
| |
| |
| template_y_off = template_off_line[1] |
| template_y_vertical_start = template_vertical_start[1] |
| template_y_vertical_end = template_vertical_end[1] |
| |
| if abs(template_y_vertical_end - template_y_vertical_start) > 1e-6: |
| ratio_proj = (template_y_off - template_y_vertical_start) / (template_y_vertical_end - template_y_vertical_start) |
| else: |
| ratio_proj = 0.5 |
| |
| frame_vertical_start = best_vertical_line_points[0][1] |
| frame_vertical_end = best_vertical_line_points[-1][1] |
| proj_x = frame_vertical_start[0] + (frame_vertical_end[0] - frame_vertical_start[0]) * ratio_proj |
| proj_y = frame_vertical_start[1] + (frame_vertical_end[1] - frame_vertical_start[1]) * ratio_proj |
| proj_point = np.array([proj_x, proj_y]) |
|
|
| if best_vertical_line_name == '25-30' and len(best_vertical_line_points) == 6: |
| if off_line_idx == 18 or off_line_idx == 19: |
| kp_26 = np.array(best_vertical_line_points[1][1]) |
|
|
| kp_18 = off_line_point + (kp_26 - proj_point) |
| result[17] = tuple(kp_18.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[17] = tuple(kp_18.astype(int)) |
|
|
| if best_vertical_line_name == '18-21' and len(best_vertical_line_points) == 4: |
| if off_line_idx == 22 or off_line_idx == 23: |
| template_19 = template_coords[18] |
| template_23 = template_coords[22] |
| template_27 = template_coords[26] |
|
|
| ratio = (template_27[0] - template_19[0]) / (template_23[0] - template_19[0]) |
|
|
| expected_point = proj_point + (off_line_point - proj_point) * ratio |
|
|
| if off_line_idx == 22: |
| result[26] = tuple(expected_point.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[26] = tuple(expected_point.astype(int)) |
| else: |
| result[27] = tuple(expected_point.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[27] = tuple(expected_point.astype(int)) |
|
|
| if off_line_idx == 24 or off_line_idx == 26: |
| kp_18 = np.array(best_vertical_line_points[0][1]) |
| kp_26 = off_line_point + (kp_18 - proj_point) |
|
|
| result[25] = tuple(kp_26.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[25] = tuple(kp_26.astype(int)) |
|
|
| if off_line_idx == 27 or off_line_idx == 29: |
| kp_21 = np.array(best_vertical_line_points[-1][1]) |
| kp_29 = off_line_point + (kp_21 - proj_point) |
|
|
| result[28] = tuple(kp_29.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[28] = tuple(kp_29.astype(int)) |
|
|
| |
| if best_vertical_line_name == '23-24' and len(best_vertical_line_points) == 2: |
| if off_line_idx == 17 or off_line_idx == 18 or off_line_idx == 19 or off_line_idx == 20: |
| template_18 = template_coords[17] |
| template_26 = template_coords[25] |
| template_23 = template_coords[22] |
|
|
| ratio_26 = (template_26[0] - template_18[0]) / (template_23[0] - template_18[0]) |
| |
| kp_18 = None |
| if off_line_idx == 17: |
| kp_18 = off_line_point |
| elif off_line_idx == 18 or off_line_idx == 19 or off_line_idx == 20: |
| template_off_line = template_coords[off_line_idx] |
| ratio = (template_18[1] - template_off_line[1]) / (template_23[1] - template_off_line[1]) |
| kp_18 = off_line_point + (np.array(best_vertical_line_points[0][1]) - proj_point) * ratio |
|
|
| if kp_18 is not None: |
| kp_26 = kp_18 + (proj_point - off_line_point) * ratio_26 |
| result[25] = tuple(kp_26.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[25] = tuple(kp_26.astype(int)) |
|
|
| if off_line_idx == 24 or off_line_idx == 25: |
| kp_27 = off_line_point + (np.array(best_vertical_line_points[0][1]) - proj_point) |
|
|
| result[26] = tuple(kp_27.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[26] = tuple(kp_27.astype(int)) |
|
|
| if off_line_idx == 28 or off_line_idx == 29: |
| kp_29 = off_line_point + (np.array(best_vertical_line_points[-1][1]) - proj_point) |
|
|
| result[28] = tuple(kp_29.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[28] = tuple(kp_29.astype(int)) |
|
|
|
|
| |
| for line_name, (indices, line_type) in line_groups.items(): |
| if line_type == 'horizontal': |
| line_points = [(idx, all_available_points[idx]) for idx in indices if idx in all_available_points] |
| if len(line_points) > max_horizontal_points: |
| max_horizontal_points = len(line_points) |
| best_horizontal_line_name = line_name |
| best_horizontal_line_points = line_points |
| |
|
|
| elif best_horizontal_line_name is not None and best_vertical_line_name is None: |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| off_line_point = None |
| off_line_idx = None |
| horizontal_line_indices = line_groups[best_horizontal_line_name][0] |
| for idx, kp in all_available_points.items(): |
| if idx not in horizontal_line_indices: |
| off_line_point = kp |
| off_line_idx = idx |
| break |
| |
| if off_line_point is not None: |
| |
| template_off_line = template_coords[off_line_idx] |
| template_horizontal_start = template_coords[best_horizontal_line_points[0][0]] |
| template_horizontal_end = template_coords[best_horizontal_line_points[-1][0]] |
| |
| |
| template_x_off = template_off_line[0] |
| template_x_horizontal_start = template_horizontal_start[0] |
| template_x_horizontal_end = template_horizontal_end[0] |
| |
| if abs(template_x_horizontal_end - template_x_horizontal_start) > 1e-6: |
| ratio_proj = (template_x_off - template_x_horizontal_start) / (template_x_horizontal_end - template_x_horizontal_start) |
| else: |
| ratio_proj = 0.5 |
| |
| frame_horizontal_start = best_horizontal_line_points[0][1] |
| frame_horizontal_end = best_horizontal_line_points[-1][1] |
| proj_x = frame_horizontal_start[0] + (frame_horizontal_end[0] - frame_horizontal_start[0]) * ratio_proj |
| proj_y = frame_horizontal_start[1] + (frame_horizontal_end[1] - frame_horizontal_start[1]) * ratio_proj |
| proj_point = np.array([proj_x, proj_y]) |
|
|
| if best_horizontal_line_name == '18-26': |
| if off_line_idx == 22 or off_line_idx == 23: |
| template_18 = template_coords[best_horizontal_line_points[0][0]] |
| template_26 = template_coords[best_horizontal_line_points[-1][0]] |
| template_23 = template_coords[off_line_idx] |
|
|
| ratio_26 = (template_26[0] - template_23[0]) / (template_26[0] - template_18[0]) |
| |
| detected_point = off_line_point + (np.array(best_horizontal_line_points[-1][1]) - np.array(best_horizontal_line_points[0][1])) * ratio_26 |
| |
| if off_line_idx == 22: |
| result[26] = tuple(detected_point.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[26] = tuple(detected_point.astype(int)) |
| else: |
| result[27] = tuple(detected_point.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[27] = tuple(detected_point.astype(int)) |
|
|
| if best_horizontal_line_name == '23-27': |
| if off_line_idx == 17 or off_line_idx == 20: |
| template_18 = template_coords[17] |
| template_26 = template_coords[25] |
| template_23 = template_coords[best_horizontal_line_points[0][0]] |
|
|
| ratio_26 = (template_26[0] - template_18[0]) / (template_26[0] - template_23[0]) |
| |
| detected_point = off_line_point + (np.array(best_horizontal_line_points[-1][1]) - np.array(best_horizontal_line_points[0][1])) * ratio_26 |
|
|
| if off_line_idx == 17: |
| result[25] = tuple(detected_point.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[25] = tuple(detected_point.astype(int)) |
| else: |
| result[28] = tuple(detected_point.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[28] = tuple(detected_point.astype(int)) |
|
|
| if off_line_idx == 18 or off_line_idx == 19: |
| template_18 = template_coords[17] |
| template_off_line = template_coords[off_line_idx] |
| template_23 = template_coords[best_horizontal_line_points[0][0]] |
|
|
| ratio = (template_off_line[1] - template_18[1]) / (template_off_line[1] - template_23[1]) |
| kp_18 = off_line_point + (proj_point - off_line_point) * ratio |
|
|
| result[17] = tuple(kp_18.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[17] = tuple(kp_18.astype(int)) |
|
|
| if best_horizontal_line_name == '24-28': |
| if off_line_idx == 17 or off_line_idx == 20: |
| template_18 = template_coords[17] |
| template_26 = template_coords[25] |
| template_24 = template_coords[best_horizontal_line_points[0][0]] |
|
|
| ratio_26 = (template_26[0] - template_18[0]) / (template_26[0] - template_24[0]) |
| |
| detected_point = off_line_point + (np.array(best_horizontal_line_points[-1][1]) - np.array(best_horizontal_line_points[0][1])) * ratio_26 |
|
|
| if off_line_idx == 17: |
| result[25] = tuple(detected_point.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[25] = tuple(detected_point.astype(int)) |
| else: |
| result[28] = tuple(detected_point.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[28] = tuple(detected_point.astype(int)) |
|
|
| if off_line_idx == 18 or off_line_idx == 19: |
| template_21 = template_coords[20] |
| template_off_line = template_coords[off_line_idx] |
| template_24 = template_coords[best_horizontal_line_points[0][0]] |
|
|
| ratio = (template_21[1] - template_off_line[1]) / (template_24[1] - template_off_line[1]) |
| kp_21 = off_line_point + (proj_point - off_line_point) * ratio |
|
|
| result[20] = tuple(kp_18.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[20] = tuple(kp_18.astype(int)) |
|
|
| if best_horizontal_line_name == '21-29': |
| if off_line_idx == 22 or off_line_idx == 23: |
| template_21 = template_coords[best_horizontal_line_points[0][0]] |
| template_29 = template_coords[best_horizontal_line_points[-1][0]] |
| template_23 = template_coords[off_line_idx] |
|
|
| ratio_29 = (template_29[0] - template_23[0]) / (template_29[0] - template_21[0]) |
| |
| detected_point = off_line_point + (np.array(best_horizontal_line_points[-1][1]) - np.array(best_horizontal_line_points[0][1])) * ratio_29 |
| |
| if off_line_idx == 22: |
| result[26] = tuple(detected_point.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[26] = tuple(detected_point.astype(int)) |
| else: |
| result[27] = tuple(detected_point.astype(int)) |
| total_right_side_count += 1 |
| all_available_points[27] = tuple(detected_point.astype(int)) |
|
|
| |
| for line_name, (indices, line_type) in line_groups.items(): |
| if line_type == 'vertical': |
| line_points = [(idx, all_available_points[idx]) for idx in indices if idx in all_available_points] |
| if len(line_points) > max_vertical_points: |
| max_vertical_points = len(line_points) |
| best_vertical_line_name = line_name |
| best_vertical_line_points = line_points |
| |
| |
| if best_vertical_line_name is not None and best_horizontal_line_name is not None: |
| if kp_22 is None: |
| print(f"Calculating keypoint 22 using both vertical and horizontal lines: {best_vertical_line_name} and {best_horizontal_line_name}") |
|
|
| template_x_22 = 940 |
| template_y_22 = 340 |
| |
| |
|
|
| template_vertical_start = template_coords[best_vertical_line_points[0][0]] |
| template_vertical_end = template_coords[best_vertical_line_points[-1][0]] |
| |
| |
| template_y_vertical_start = template_vertical_start[1] |
| template_y_vertical_end = template_vertical_end[1] |
| |
| if abs(template_y_vertical_end - template_y_vertical_start) > 1e-6: |
| ratio_22_vertical = (template_y_22 - template_y_vertical_start) / (template_y_vertical_end - template_y_vertical_start) |
| else: |
| ratio_22_vertical = 0.5 |
| |
| frame_vertical_start = best_vertical_line_points[0][1] |
| frame_vertical_end = best_vertical_line_points[-1][1] |
| proj_22_on_vertical_x = frame_vertical_start[0] + (frame_vertical_end[0] - frame_vertical_start[0]) * ratio_22_vertical |
| proj_22_on_vertical_y = frame_vertical_start[1] + (frame_vertical_end[1] - frame_vertical_start[1]) * ratio_22_vertical |
| proj_22_on_vertical = (proj_22_on_vertical_x, proj_22_on_vertical_y) |
| |
| |
|
|
| template_horizontal_start = template_coords[best_horizontal_line_points[0][0]] |
| template_horizontal_end = template_coords[best_horizontal_line_points[-1][0]] |
| |
| |
| template_x_horizontal_start = template_horizontal_start[0] |
| template_x_horizontal_end = template_horizontal_end[0] |
| |
| if abs(template_x_horizontal_end - template_x_horizontal_start) > 1e-6: |
| ratio_22_horizontal = (template_x_22 - template_x_horizontal_start) / (template_x_horizontal_end - template_x_horizontal_start) |
| else: |
| ratio_22_horizontal = 0.5 |
| |
| frame_horizontal_start = best_horizontal_line_points[0][1] |
| frame_horizontal_end = best_horizontal_line_points[-1][1] |
| proj_22_on_horizontal_x = frame_horizontal_start[0] + (frame_horizontal_end[0] - frame_horizontal_start[0]) * ratio_22_horizontal |
| proj_22_on_horizontal_y = frame_horizontal_start[1] + (frame_horizontal_end[1] - frame_horizontal_start[1]) * ratio_22_horizontal |
| proj_22_on_horizontal = (proj_22_on_horizontal_x, proj_22_on_horizontal_y) |
| |
| |
| |
| |
| |
| |
| horizontal_dir_x = frame_horizontal_end[0] - frame_horizontal_start[0] |
| horizontal_dir_y = frame_horizontal_end[1] - frame_horizontal_start[1] |
| horizontal_dir_length = np.sqrt(horizontal_dir_x**2 + horizontal_dir_y**2) |
| |
| |
| vertical_dir_x = frame_vertical_end[0] - frame_vertical_start[0] |
| vertical_dir_y = frame_vertical_end[1] - frame_vertical_start[1] |
| vertical_dir_length = np.sqrt(vertical_dir_x**2 + vertical_dir_y**2) |
| |
| if horizontal_dir_length > 1e-6 and vertical_dir_length > 1e-6: |
| |
| horizontal_dir_x /= horizontal_dir_length |
| horizontal_dir_y /= horizontal_dir_length |
| vertical_dir_x /= vertical_dir_length |
| vertical_dir_y /= vertical_dir_length |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| A = np.array([ |
| [horizontal_dir_x, -vertical_dir_x], |
| [horizontal_dir_y, -vertical_dir_y] |
| ]) |
| b = np.array([ |
| proj_22_on_horizontal[0] - proj_22_on_vertical[0], |
| proj_22_on_horizontal[1] - proj_22_on_vertical[1] |
| ]) |
| |
| try: |
| t, s = np.linalg.solve(A, b) |
| |
| |
| x_22 = proj_22_on_vertical[0] + t * horizontal_dir_x |
| y_22 = proj_22_on_vertical[1] + t * horizontal_dir_y |
| |
| result[21] = (int(round(x_22)), int(round(y_22))) |
| total_right_side_count += 1 |
| except np.linalg.LinAlgError: |
| |
| |
| x_22 = proj_22_on_vertical[0] |
| y_22 = proj_22_on_horizontal[1] |
| result[21] = (int(round(x_22)), int(round(y_22))) |
| total_right_side_count += 1 |
| else: |
| |
| x_22 = proj_22_on_vertical[0] |
| y_22 = proj_22_on_horizontal[1] |
| result[21] = (int(round(x_22)), int(round(y_22))) |
| total_right_side_count += 1 |
| |
| print(f"total_right_side_count: {total_right_side_count}, result: {result}") |
| if total_right_side_count > 5: |
| return result |
|
|
| |
| m_line = None |
| b_line = None |
| best_line_for_calc = None |
| best_line_type_for_calc = None |
| |
| if best_vertical_line_name is not None and len(best_vertical_line_points) >= 2: |
| best_line_for_calc = best_vertical_line_points |
| best_line_type_for_calc = 'vertical' |
| points_array = np.array([[kp[0], kp[1]] for _, kp in best_vertical_line_points]) |
| x_coords = points_array[:, 0] |
| y_coords = points_array[:, 1] |
| A = np.vstack([x_coords, np.ones(len(x_coords))]).T |
| m_line, b_line = np.linalg.lstsq(A, y_coords, rcond=None)[0] |
| elif best_horizontal_line_name is not None and len(best_horizontal_line_points) >= 2: |
| best_line_for_calc = best_horizontal_line_points |
| best_line_type_for_calc = 'horizontal' |
| points_array = np.array([[kp[0], kp[1]] for _, kp in best_horizontal_line_points]) |
| x_coords = points_array[:, 0] |
| y_coords = points_array[:, 1] |
| A = np.vstack([x_coords, np.ones(len(x_coords))]).T |
| m_line, b_line = np.linalg.lstsq(A, y_coords, rcond=None)[0] |
| |
| |
| |
| if total_right_side_count < 5 and (m_line is not None or (best_line_for_calc is not None and best_line_type_for_calc == 'vertical')): |
| |
| counts_per_line = [ |
| len(line_18_21_points), |
| len(line_23_24_points), |
| len(line_25_30_points) |
| ] |
| |
| |
| template_ys_18_21 = [140, 270, 410, 540] |
| template_indices_18_21 = [17, 18, 19, 20] |
| |
| if best_vertical_line_name == '25-30': |
| |
| for template_y, idx in zip(template_ys_18_21, template_indices_18_21): |
| if result[idx] is None and total_right_side_count < 5: |
| |
| new_counts = counts_per_line.copy() |
| new_counts[0] += 1 |
| if max(new_counts) >= 4 and total_right_side_count == 4: |
| |
| continue |
| |
| |
| ref_ys = [kp[1] for _, kp in line_25_30_points] |
| ref_template_ys = [5, 140, 250, 430, 540, 675] |
| ref_indices = [24, 25, 26, 27, 28, 29] |
| |
| matched_template_ys = [] |
| for ref_idx, ref_kp in line_25_30_points: |
| if ref_idx in ref_indices: |
| template_idx = ref_indices.index(ref_idx) |
| matched_template_ys.append((ref_template_ys[template_idx], ref_kp[1])) |
| |
| if len(matched_template_ys) >= 1: |
| ref_template_y, ref_frame_y = matched_template_ys[0] |
| if ref_template_y > 0: |
| scale = ref_frame_y / ref_template_y |
| y_new = int(round(template_y * scale)) |
| else: |
| y_new = ref_frame_y |
| else: |
| y_new = int(round(np.median(ref_ys))) if ref_ys else template_y |
| |
| |
| if abs(m_line) > 1e-6: |
| x_on_line_25_30 = (y_new - b_line) / m_line |
| x_new = int(round(x_on_line_25_30 * 0.850)) |
| else: |
| x_new = int(round(np.median([kp[0] for _, kp in line_25_30_points]) * 0.850)) |
| |
| result[idx] = (x_new, y_new) |
| total_right_side_count += 1 |
| if total_right_side_count >= 5: |
| break |
| elif best_vertical_line_name == '18-21': |
| |
| for template_y, idx in zip(template_ys_18_21, template_indices_18_21): |
| if result[idx] is None and total_right_side_count < 5: |
| |
| new_counts = counts_per_line.copy() |
| new_counts[0] += 1 |
| if max(new_counts) >= 4 and total_right_side_count == 4: |
| |
| continue |
| |
| |
| if abs(m_line) > 1e-6: |
| x_new = (template_y - b_line) / m_line |
| else: |
| x_new = np.median([kp[0] for _, kp in line_18_21_points]) |
| |
| |
| ref_ys = [kp[1] for _, kp in line_18_21_points] |
| ref_template_ys = [] |
| for ref_idx, _ in line_18_21_points: |
| if ref_idx in template_indices_18_21: |
| template_idx = template_indices_18_21.index(ref_idx) |
| ref_template_ys.append(template_ys_18_21[template_idx]) |
| |
| if len(ref_ys) >= 1 and len(ref_template_ys) >= 1: |
| ref_template_y = ref_template_ys[0] |
| ref_frame_y = ref_ys[0] |
| if ref_template_y > 0: |
| scale = ref_frame_y / ref_template_y |
| y_new = int(round(template_y * scale)) |
| else: |
| y_new = ref_frame_y |
| else: |
| y_new = int(round(np.median(ref_ys))) if ref_ys else template_y |
| |
| result[idx] = (int(round(x_new)), y_new) |
| total_right_side_count += 1 |
| if total_right_side_count >= 5: |
| break |
| |
| |
| |
| |
| return result |
|
|
| def check_keypoints_would_cause_invalid_mask( |
| frame_keypoints: list[tuple[int, int]], |
| template_keypoints: list[tuple[int, int]] = None, |
| frame: np.ndarray = None, |
| floor_markings_template: np.ndarray = None, |
| return_warped_data: bool = False, |
| ) -> tuple[bool, str] | tuple[bool, str, tuple]: |
| """ |
| Check if keypoints would cause InvalidMask errors during evaluation. |
| |
| Args: |
| frame_keypoints: Frame keypoints to check |
| template_keypoints: Template keypoints (defaults to TEMPLATE_KEYPOINTS) |
| frame: Optional frame image for full validation |
| floor_markings_template: Optional template image for full validation |
| |
| Returns: |
| Tuple of (would_cause_error, error_message) |
| """ |
| try: |
| from keypoint_evaluation import ( |
| validate_projected_corners, |
| TEMPLATE_KEYPOINTS, |
| INDEX_KEYPOINT_CORNER_BOTTOM_LEFT, |
| INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT, |
| INDEX_KEYPOINT_CORNER_TOP_LEFT, |
| INDEX_KEYPOINT_CORNER_TOP_RIGHT, |
| findHomography, |
| InvalidMask, |
| ) |
| |
| if template_keypoints is None: |
| template_keypoints = TEMPLATE_KEYPOINTS |
| |
| |
| filtered_template = [] |
| filtered_frame = [] |
| |
| for i, (t_kp, f_kp) in enumerate(zip(template_keypoints, frame_keypoints)): |
| if f_kp[0] > 0 and f_kp[1] > 0: |
| filtered_template.append(t_kp) |
| filtered_frame.append(f_kp) |
| |
| if len(filtered_template) < 4: |
| if return_warped_data: |
| return (True, "Not enough keypoints for homography", None) |
| return (True, "Not enough keypoints for homography") |
| |
| |
| src_pts = np.array(filtered_template, dtype=np.float32) |
| dst_pts = np.array(filtered_frame, dtype=np.float32) |
| |
| result = findHomography(src_pts, dst_pts) |
| if result is None: |
| if return_warped_data: |
| return (True, "Failed to compute homography", None) |
| return (True, "Failed to compute homography") |
| H, _ = result |
| |
| |
| try: |
| validate_projected_corners( |
| source_keypoints=template_keypoints, |
| homography_matrix=H |
| ) |
| except Exception as e: |
| error_msg = "Projection twisted (bowtie)" if "twisted" in str(e).lower() or "Projection twisted" in str(e).lower() else str(e) |
| if return_warped_data: |
| return (True, error_msg, None) |
| return (True, error_msg) |
| |
| |
| if frame is not None and floor_markings_template is not None: |
| try: |
| from keypoint_evaluation import ( |
| project_image_using_keypoints, |
| extract_masks_for_ground_and_lines, |
| InvalidMask, |
| ) |
| |
| |
| try: |
| |
| warped_template = project_image_using_keypoints( |
| image=floor_markings_template, |
| source_keypoints=template_keypoints, |
| destination_keypoints=frame_keypoints, |
| destination_width=frame.shape[1], |
| destination_height=frame.shape[0], |
| ) |
| |
| |
| except InvalidMask as e: |
| if return_warped_data: |
| return (True, f"Projection validation failed: {e}", None) |
| return (True, f"Projection validation failed: {e}") |
| except Exception as e: |
| |
| if return_warped_data: |
| return (True, f"Projection failed: {e}", None) |
| return (True, f"Projection failed: {e}") |
| |
| |
| try: |
| mask_ground, mask_lines_expected = extract_masks_for_ground_and_lines( |
| image=warped_template |
| ) |
| except InvalidMask as e: |
| if return_warped_data: |
| return (True, f"Mask extraction validation failed: {e}", None) |
| return (True, f"Mask extraction validation failed: {e}") |
| except Exception as e: |
| if return_warped_data: |
| return (True, f"Mask extraction failed: {e}", None) |
| return (True, f"Mask extraction failed: {e}") |
| |
| |
| from keypoint_evaluation import validate_mask_lines, validate_mask_ground |
| try: |
| validate_mask_lines(mask_lines_expected) |
| except InvalidMask as e: |
| if return_warped_data: |
| return (True, f"Mask lines validation failed: {e}", None) |
| return (True, f"Mask lines validation failed: {e}") |
| except Exception as e: |
| if return_warped_data: |
| return (True, f"Mask lines validation error: {e}", None) |
| return (True, f"Mask lines validation error: {e}") |
| |
| try: |
| validate_mask_ground(mask_ground) |
| except InvalidMask as e: |
| if return_warped_data: |
| return (True, f"Mask ground validation failed: {e}", None) |
| return (True, f"Mask ground validation failed: {e}") |
| except Exception as e: |
| if return_warped_data: |
| return (True, f"Mask ground validation error: {e}", None) |
| return (True, f"Mask ground validation error: {e}") |
| |
| |
| if return_warped_data: |
| return (False, "", (warped_template, mask_ground, mask_lines_expected)) |
| |
| except ImportError: |
| |
| pass |
| except InvalidMask as e: |
| |
| if return_warped_data: |
| return (True, f"InvalidMask error: {e}", None) |
| return (True, f"InvalidMask error: {e}") |
| except Exception as e: |
| |
| |
| pass |
| |
| |
| if return_warped_data: |
| return (False, "", None) |
| return (False, "") |
| |
| except ImportError: |
| |
| if return_warped_data: |
| return (False, "", None) |
| return (False, "") |
| except Exception as e: |
| |
| if return_warped_data: |
| return (True, f"Validation error: {e}", None) |
| return (True, f"Validation error: {e}") |
|
|
|
|
def evaluate_keypoints_with_cached_data(
    frame: np.ndarray,
    mask_ground: np.ndarray,
    mask_lines_expected: np.ndarray,
) -> float:
    """
    Score keypoints from pre-computed warped-template masks.

    Skips re-warping the template when validation already produced the
    masks: extract the predicted line mask from the frame, measure its
    overlap with the expected line mask, and normalize.

    Args:
        frame: Frame image
        mask_ground: Pre-computed ground mask from warped template
        mask_lines_expected: Pre-computed expected lines mask from warped template

    Returns:
        Score clamped to [0.0, 1.0]; 0.0 on any failure.
    """
    try:
        from keypoint_evaluation import (
            extract_mask_of_ground_lines_in_image,
            bitwise_and,
        )

        # Lines actually visible in the frame, restricted to the ground area.
        predicted_lines = extract_mask_of_ground_lines_in_image(
            image=frame, ground_mask=mask_ground
        )

        overlap = bitwise_and(mask_lines_expected, predicted_lines).sum()
        expected_total = mask_lines_expected.sum()

        # Epsilon guards against an empty expected-lines mask.
        raw_score = overlap / (expected_total + 1e-8)
        return max(0.0, min(1.0, raw_score))

    except Exception as e:
        print(f'Error in cached keypoint evaluation: {e}')
        return 0.0
|
|
|
|
def check_and_evaluate_keypoints(
    frame_keypoints: list[tuple[int, int]],
    frame: np.ndarray,
) -> tuple[bool, float, str]:
    """
    Validate keypoints and, when valid, score them in a single pass.

    The validation step is asked for its warped template/masks so the
    scoring step can reuse them instead of recomputing the projection.

    Args:
        frame_keypoints: Frame keypoints to check and evaluate
        frame: Frame image

    Returns:
        (is_valid, score, error_msg):
        - valid keypoints -> (True, score, "")
        - invalid keypoints -> (False, 0.0, <error message>)
    """
    outcome = check_keypoints_would_cause_invalid_mask(
        frame_keypoints, _TEMPLATE_KEYPOINTS, frame, _TEMPLATE_IMAGE,
        return_warped_data=True
    )

    # The checker may answer with 2 or 3 elements depending on whether the
    # warped data could be produced.
    would_cause_error, error_msg = outcome[0], outcome[1]
    warped_data = outcome[2] if len(outcome) == 3 else None

    if would_cause_error:
        return (False, 0.0, error_msg)

    # Preferred path: reuse the masks the validator already computed.
    if warped_data is not None:
        _, mask_ground, mask_lines_expected = warped_data
        try:
            cached_score = evaluate_keypoints_with_cached_data(
                frame, mask_ground, mask_lines_expected
            )
            return (True, cached_score, "")
        except Exception as e:
            print(f'Error evaluating with cached data: {e}')
            return (True, 0.0, "")

    # Fallback: full evaluation from scratch.
    try:
        from keypoint_evaluation import evaluate_keypoints_for_frame
        full_score = evaluate_keypoints_for_frame(
            _TEMPLATE_KEYPOINTS, frame_keypoints, frame, _TEMPLATE_IMAGE
        )
        return (True, full_score, "")
    except Exception as e:
        print(f'Error in regular evaluation: {e}')
        return (True, 0.0, "")
|
|
|
|
| |
| |
| |
|
|
| def _evaluate_batch_of_candidates(args): |
| """ |
| Worker function to evaluate a batch of keypoint candidates. |
| Uses threading, so we can share the frame/template without pickling overhead. |
| OpenCV operations are thread-safe for read operations, so no locking needed. |
| """ |
| candidate_batch, frame = args |
| |
| results = [] |
| for test_kps, candidate_metadata in candidate_batch: |
| |
| |
| try: |
| if frame is not None and _TEMPLATE_IMAGE is not None: |
| is_valid, score, _ = check_and_evaluate_keypoints( |
| test_kps, frame |
| ) |
| |
| if is_valid: |
| results.append((is_valid, score, test_kps, candidate_metadata)) |
| except Exception: |
| |
| |
| pass |
| |
| return results |
|
|
|
|
def evaluate_keypoints_candidates_parallel(
    candidate_kps_list: List[List[Tuple[int, int]]],
    candidate_metadata: List[Any],
    frame: np.ndarray,
    num_workers: int = None,
) -> Tuple[bool, float, List[Tuple[int, int]], Any]:
    """
    Evaluate multiple keypoint candidates and return the best-scoring one.

    Fewer than 10 candidates are evaluated sequentially (executor overhead
    would dominate). Larger workloads are batched and evaluated concurrently:
    with processes on Linux (fork shares the module-level template globals
    with the workers) and with threads elsewhere (OpenCV releases the GIL,
    and threads avoid pickling the frame).

    Args:
        candidate_kps_list: Candidate keypoint sets to evaluate.
        candidate_metadata: One metadata item per candidate (same order).
        frame: Frame image the candidates are evaluated against.
        num_workers: Worker count; defaults to
            min(32, cpu_count(), len(candidate_kps_list)).

    Returns:
        (is_valid, score, keypoints, metadata) of the best valid candidate,
        or (False, -1.0, None, None) when no candidate is valid.
    """
    if len(candidate_kps_list) == 0:
        return (False, -1.0, None, None)

    if num_workers is None:
        # Cap workers at 32 and never exceed the number of candidates.
        max_cpu_workers = min(32, cpu_count())
        max_workers = min(max_cpu_workers, len(candidate_kps_list))
        num_workers = max(1, max_workers)

    # Small workloads: sequential loop avoids executor startup cost.
    if len(candidate_kps_list) < 10:
        best_result = None
        best_score = -1.0
        for test_kps, metadata in zip(candidate_kps_list, candidate_metadata):
            try:
                is_valid, score, _ = check_and_evaluate_keypoints(
                    test_kps, frame
                )
                if is_valid and score > best_score:
                    best_score = score
                    best_result = (is_valid, score, test_kps, metadata)
            except Exception:
                # A failing candidate is simply skipped.
                pass
    else:
        # Pick the executor type per platform (see docstring).
        import platform
        is_linux = platform.system().lower() == 'linux'

        if is_linux:
            from concurrent.futures import ProcessPoolExecutor, as_completed
        else:
            from concurrent.futures import ThreadPoolExecutor, as_completed

        # Split candidates into roughly num_workers equal batches (ceil div).
        batch_size = max(1, (len(candidate_kps_list) + num_workers - 1) // num_workers)
        batches = []
        total_candidates_in_batches = 0
        for i in range(0, len(candidate_kps_list), batch_size):
            batch = list(zip(
                candidate_kps_list[i:i+batch_size],
                candidate_metadata[i:i+batch_size]
            ))
            if len(batch) > 0:
                batches.append((batch, frame))
                total_candidates_in_batches += len(batch)

        # Sanity check: every candidate must land in exactly one batch.
        if total_candidates_in_batches != len(candidate_kps_list):
            print(f"Warning: Batch mismatch! Expected {len(candidate_kps_list)} candidates, got {total_candidates_in_batches}")

        best_result = None
        best_score = -1.0

        try:
            if is_linux:
                executor_class = ProcessPoolExecutor
            else:
                executor_class = ThreadPoolExecutor

            with executor_class(max_workers=num_workers) as executor:
                futures = [executor.submit(_evaluate_batch_of_candidates, args) for args in batches]

                all_results = []
                for future in as_completed(futures):
                    try:
                        batch_results = future.result()
                        if batch_results:
                            all_results.extend(batch_results)
                    except Exception as e:
                        # A failed batch is logged and dropped; other batches
                        # still contribute.
                        print(f"Error processing batch result: {e}")
                        import traceback
                        traceback.print_exc()
                        pass

                if len(all_results) == 0:
                    print(f"Warning: No valid results from parallel evaluation of {len(candidate_kps_list)} candidates")

                # Reduce: keep the highest-scoring valid candidate.
                for result in all_results:
                    if result is not None:
                        is_valid, score, test_kps, metadata = result

                        # Workers may return odd score types; coerce defensively.
                        try:
                            score = float(score) if score is not None else 0.0
                        except (ValueError, TypeError):
                            score = 0.0

                        if is_valid and score > best_score:
                            best_score = score
                            best_result = (is_valid, score, test_kps, metadata)
        except Exception as e:
            # Executor failed entirely (e.g. spawn issues): evaluate
            # everything sequentially instead.
            print(f"Threading evaluation failed: {e}, falling back to sequential")
            for test_kps, metadata in zip(candidate_kps_list, candidate_metadata):
                try:
                    is_valid, score, _ = check_and_evaluate_keypoints(
                        test_kps, frame
                    )
                    if is_valid and score > best_score:
                        best_score = score
                        best_result = (is_valid, score, test_kps, metadata)
                except Exception:
                    pass

    if best_result is not None:
        return best_result

    return (False, -1.0, None, None)
|
|
|
|
| def _process_single_frame_for_prediction(args): |
| """ |
| Worker function to process a single frame for failed index prediction. |
| Returns: (frame_index, score, adjusted_success) |
| - score: evaluation score of the calculated keypoints (0.0 if failed or invalid) |
| - adjusted_success: True if keypoints were successfully adjusted, False otherwise |
| """ |
| frame_index, frame_result, frame_width, frame_height, frame_image, offset = args |
| |
| try: |
| from keypoint_helper_v2_optimized import ( |
| remove_duplicate_detections, |
| calculate_missing_keypoints, |
| adjust_keypoints_to_avoid_invalid_mask, |
| ) |
| |
| frame_keypoints = getattr(frame_result, "keypoints", []) or [] |
| original_count = sum(1 for (x, y) in frame_keypoints if int(x) != 0 and int(y) != 0) |
| |
| cleaned_keypoints = remove_duplicate_detections( |
| frame_keypoints, frame_width, frame_height |
| ) |
| |
| valid_keypoint_indices = [idx for idx, kp in enumerate(cleaned_keypoints) if kp[0] != 0 and kp[1] != 0] |
|
|
| if len(valid_keypoint_indices) > 5: |
| calculated_keypoints = cleaned_keypoints |
| else: |
| left_side_indices_range = range(0, 13) |
| right_side_indices_range = range(17, 30) |
|
|
| side_check_set = set() |
| if len(valid_keypoint_indices) >= 4: |
| for idx in valid_keypoint_indices: |
| if idx in left_side_indices_range: |
| side_check_set.add("left") |
| elif idx in right_side_indices_range: |
| side_check_set.add("right") |
| else: |
| side_check_set.add("center") |
|
|
| if len(side_check_set) > 1: |
| calculated_keypoints = cleaned_keypoints |
| else: |
| calculated_keypoints = calculate_missing_keypoints( |
| cleaned_keypoints, frame_width, frame_height |
| ) |
| |
| original_frame_number = offset + frame_index |
| print(f"Frame {original_frame_number} (index {frame_index}): original_count: {original_count}, cleaned_keypoints: {len([kp for kp in cleaned_keypoints if kp[0] != 0 and kp[1] != 0])}, calculated_keypoints: {len([kp for kp in calculated_keypoints if kp[0] != 0 and kp[1] != 0])}") |
|
|
| start_time = time.time() |
| adjusted_success, calculated_keypoints, score = adjust_keypoints_to_avoid_invalid_mask( |
| calculated_keypoints, frame_image |
| ) |
| end_time = time.time() |
| print(f"adjust_keypoints_to_avoid_invalid_mask time: {end_time - start_time} seconds") |
| |
| if not adjusted_success: |
| return (frame_index, 0.0, False) |
| |
| print(f"after adjustment, calculated_keypoints: {calculated_keypoints}, score: {score:.4f}") |
| setattr(frame_result, "keypoints", list(calculated_keypoints)) |
| |
| return (frame_index, score, True) |
| except Exception as e: |
| print(f"Error processing frame {frame_index}: {e}") |
| return (frame_index, 0.0, False) |
|
|
|
|
| def _generate_sparse_keypoints_for_frame(args): |
| """ |
| Worker function to generate sparse keypoints for a single frame. |
| Returns: (frame_index, sparse_keypoints) |
| """ |
| frame_index, frame_width, frame_height, frame_image = args |
| |
| try: |
| from keypoint_helper_v2_optimized import ( |
| _generate_sparse_template_keypoints, |
| ) |
| |
| sparse_keypoints = _generate_sparse_template_keypoints( |
| frame_width, |
| frame_height, |
| frame_image=frame_image, |
| ) |
| |
| return (frame_index, sparse_keypoints) |
| except Exception as e: |
| print(f"Error generating sparse keypoints for frame {frame_index}: {e}") |
| |
| return (frame_index, [(0, 0)] * 32) |
|
|
|
|
| def _evaluate_keypoints_for_frame(args): |
| """ |
| Worker function to evaluate both sparse and calculated keypoints for a single frame. |
| Returns: (frame_index, sparse_score, calculated_score, sparse_keypoints, calculated_keypoints) |
| """ |
| frame_index, sparse_keypoints, calculated_keypoints, frame_image, pre_calculated_score = args |
| |
| sparse_score = 0.0 |
| calculated_score = 0.0 |
| |
| |
| if pre_calculated_score is not None and pre_calculated_score > 0.0: |
| calculated_score = pre_calculated_score |
| print(f"Frame {frame_index}: Using pre-calculated score: {calculated_score:.4f}") |
| else: |
| |
| calculated_score = 0.0 |
| |
| try: |
| from keypoint_evaluation import evaluate_keypoints_for_frame |
| |
| |
| if frame_image is not None and _TEMPLATE_IMAGE is not None and _TEMPLATE_KEYPOINTS is not None: |
| try: |
| sparse_score = evaluate_keypoints_for_frame( |
| template_keypoints=_TEMPLATE_KEYPOINTS, |
| frame_keypoints=sparse_keypoints, |
| frame=frame_image, |
| floor_markings_template=_TEMPLATE_IMAGE, |
| ) |
| except Exception: |
| sparse_score = 0.0 |
| |
| |
| if pre_calculated_score is None or pre_calculated_score <= 0.0: |
| calculated_keypoints_valid = len([kp for kp in calculated_keypoints if kp[0] != 0 or kp[1] != 0]) >= 4 |
| if calculated_keypoints_valid: |
| try: |
| calculated_score = evaluate_keypoints_for_frame( |
| template_keypoints=_TEMPLATE_KEYPOINTS, |
| frame_keypoints=calculated_keypoints, |
| frame=frame_image, |
| floor_markings_template=_TEMPLATE_IMAGE, |
| ) |
| except Exception: |
| calculated_score = 0.0 |
| else: |
| calculated_score = -1.0 |
| except Exception as e: |
| print(f"Error evaluating keypoints for frame {frame_index}: {e}") |
| |
| return (frame_index, sparse_score, calculated_score, sparse_keypoints, calculated_keypoints) |
|
|
| def _calculate_keypoints_score( |
| keypoints: list[tuple[int, int]], |
| frame: np.ndarray, |
| ) -> float: |
| """ |
| Helper function to calculate score for keypoints. |
| Returns 0.0 if evaluation fails or keypoints are invalid. |
| """ |
| score = 0.0 |
| try: |
| from keypoint_evaluation import evaluate_keypoints_for_frame |
| |
| |
| keypoints_valid = len([kp for kp in keypoints if kp[0] != 0 or kp[1] != 0]) >= 4 |
| if keypoints_valid and frame is not None and _TEMPLATE_IMAGE is not None and _TEMPLATE_KEYPOINTS is not None: |
| try: |
| score = evaluate_keypoints_for_frame( |
| template_keypoints=_TEMPLATE_KEYPOINTS, |
| frame_keypoints=keypoints, |
| frame=frame, |
| floor_markings_template=_TEMPLATE_IMAGE, |
| ) |
| except Exception: |
| score = 0.0 |
| except Exception: |
| score = 0.0 |
| |
| return score |
|
|
|
|
def adjust_keypoints_to_avoid_invalid_mask(
    frame_keypoints: list[tuple[int, int]],
    frame: np.ndarray = None,
    max_iterations: int = 5,
    num_workers: int = None,
) -> tuple[bool, list[tuple[int, int]], float]:
    """
    Adjust keypoints to avoid InvalidMask errors.

    This function tries to fix common issues:
    1. Twisted projection (bowtie) - adjusts corner keypoints
    2. Ground covers too much - shrinks projected area by moving corners inward
    3. Other mask validation issues - adjusts keypoints to improve projection

    Each repair strategy matches on the validation error message, generates a
    batch of candidate keypoint sets, and scores them with
    evaluate_keypoints_candidates_parallel; the first strategy that yields a
    valid candidate returns immediately.

    Args:
        frame_keypoints: Frame keypoints to adjust
        frame: Optional frame image for validation
        max_iterations: Maximum number of adjustment iterations
            NOTE(review): currently only tested as ``> 0`` to gate the final
            corner-perturbation pass; no actual iteration loop exists.
        num_workers: Number of workers for parallel evaluation

    Returns:
        Tuple of (success, adjusted_keypoints, score):
        - success: True if keypoints were successfully adjusted, False otherwise
        - adjusted_keypoints: Adjusted keypoints
        - score: Evaluation score of the adjusted keypoints (0.0 if failed or invalid)
    """
    adjusted = list(frame_keypoints)

    error_msg = ""
    would_cause_error = False

    # Fast path: if the keypoints already validate, return them unchanged.
    is_valid, score, error_msg = check_and_evaluate_keypoints(
        adjusted, frame
    )
    if is_valid:
        return (True, adjusted, score)

    would_cause_error = True

    print(f"Would cause error: {would_cause_error}, error_msg: {error_msg}")

    # Strategy 1: twisted projection (bowtie) -> rearrange keypoints so the
    # projected corners no longer cross.
    if "twisted" in error_msg.lower() or "bowtie" in error_msg.lower() or "Projection twisted" in error_msg.lower():
        adjusted = _adjust_keypoints_to_pass_validation(
            adjusted,
            frame.shape[1] if frame is not None else None,
            frame.shape[0] if frame is not None else None
        )

        # Re-check: full check+score when all data is available, otherwise a
        # validation-only check (score unknown -> 0.0).
        if frame is not None and _TEMPLATE_IMAGE is not None and _TEMPLATE_KEYPOINTS is not None:
            is_valid, score, error_msg = check_and_evaluate_keypoints(
                adjusted, frame
            )
            if is_valid:
                return (True, adjusted, score)

        else:
            would_cause_error, error_msg = check_keypoints_would_cause_invalid_mask(
                adjusted, _TEMPLATE_KEYPOINTS, frame, _TEMPLATE_IMAGE
            )
            if not would_cause_error:
                score = 0.0
                return (True, adjusted, score)

    # Strategy 2: "projected line too wide" -> try expanding, perturbing, or
    # shrinking the keypoints around their centroid.
    if "too wide" in error_msg.lower() or "wide line" in error_msg.lower():
        print(f"Adjusting keypoints to fix 'a projected line is too wide' error")
        try:
            # Collect detected (non-zero) keypoints with their indices.
            valid_keypoints = []
            for idx in range(len(adjusted)):
                x, y = adjusted[idx]
                if x == 0 and y == 0:
                    continue
                valid_keypoints.append((idx, x, y))

            if len(valid_keypoints) >= 4:
                # Centroid of the detected keypoints.
                center_x = sum(x for _, x, y in valid_keypoints) / len(valid_keypoints)
                center_y = sum(y for _, x, y in valid_keypoints) / len(valid_keypoints)

                # Distance of each keypoint from the centroid; outermost first.
                distances = []
                for idx, x, y in valid_keypoints:
                    dist = np.sqrt((x - center_x)**2 + (y - center_y)**2)
                    distances.append((idx, x, y, dist))

                distances.sort(key=lambda d: d[3], reverse=True)

                best_wide_kps = None
                best_wide_score = -1.0

                candidate_kps_list = []
                candidate_metadata = []

                # Candidates: expand all keypoints outward from the centroid.
                for expand_factor in [1.05, 1.10]:
                    test_kps = list(adjusted)
                    for idx, x, y, dist in distances:
                        new_x = int(round(center_x + (x - center_x) * expand_factor))
                        new_y = int(round(center_y + (y - center_y) * expand_factor))
                        test_kps[idx] = (new_x, new_y)

                    candidate_kps_list.append(test_kps)
                    candidate_metadata.append(('expand', expand_factor))

                # Candidates: small perturbations of the two outermost keypoints.
                for idx, x, y, dist in distances[:2]:
                    for adjust_x in [-2, 0, 2]:
                        for adjust_y in [-2, 0, 2]:
                            if adjust_x == 0 and adjust_y == 0:
                                continue
                            test_kps = list(adjusted)
                            test_kps[idx] = (x + adjust_x, y + adjust_y)

                            candidate_kps_list.append(test_kps)
                            candidate_metadata.append(('perturb', idx, adjust_x, adjust_y))

                # Candidates: shrink all keypoints toward the centroid.
                for shrink_factor in [0.96, 0.94]:
                    test_kps = list(adjusted)
                    for idx, x, y, dist in distances:
                        new_x = int(round(center_x + (x - center_x) * shrink_factor))
                        new_y = int(round(center_y + (y - center_y) * shrink_factor))
                        test_kps[idx] = (new_x, new_y)

                    candidate_kps_list.append(test_kps)
                    candidate_metadata.append(('shrink', shrink_factor))

                if len(candidate_kps_list) > 0:
                    print(f"Evaluating {len(candidate_kps_list)} wide-line candidates in parallel...")
                    eval_start = time.time()
                    is_valid, score, best_kps, best_meta = evaluate_keypoints_candidates_parallel(
                        candidate_kps_list, candidate_metadata,
                        frame, num_workers
                    )
                    eval_time = time.time() - eval_start
                    print(f"Parallel evaluation took {eval_time:.2f} seconds for {len(candidate_kps_list)} candidates")

                    if is_valid and score > best_wide_score:
                        best_wide_score = score
                        best_wide_kps = best_kps
                        print(f"Found best wide-line adjustment: {best_meta}, score: {score:.4f}")

                if best_wide_kps is not None:
                    return (True, best_wide_kps, best_wide_score)
        except Exception as e:
            print(f"Error in wide line adjustment: {e}")
            pass

    # Strategy 3: "projected ground should be a single object" -> shrink all
    # keypoints, or just the outer boundary ones, toward the centroid.
    if "should be a single" in error_msg.lower() or "single object" in error_msg.lower() or "distinct regions" in error_msg.lower():
        print(f"Adjusting keypoints to fix 'projected ground should be a single object' error (optimized)")
        try:
            valid_keypoints = []
            for idx in range(len(adjusted)):
                x, y = adjusted[idx]
                if x == 0 and y == 0:
                    continue
                valid_keypoints.append((idx, x, y))

            if len(valid_keypoints) >= 4:
                center_x = sum(x for _, x, y in valid_keypoints) / len(valid_keypoints)
                center_y = sum(y for _, x, y in valid_keypoints) / len(valid_keypoints)

                candidate_kps_list = []
                candidate_metadata = []

                # Candidates: shrink every detected keypoint toward the centroid.
                for shrink_factor in [0.96, 0.92, 0.90]:
                    test_kps = list(adjusted)
                    for idx, x, y in valid_keypoints:
                        new_x = int(round(center_x + (x - center_x) * shrink_factor))
                        new_y = int(round(center_y + (y - center_y) * shrink_factor))
                        test_kps[idx] = (new_x, new_y)

                    candidate_kps_list.append(test_kps)
                    candidate_metadata.append(('shrink', shrink_factor))

                # Rank keypoints by distance from the centroid, outermost first.
                distances = []
                for idx, x, y in valid_keypoints:
                    dist = np.sqrt((x - center_x)**2 + (y - center_y)**2)
                    distances.append((idx, x, y, dist))
                distances.sort(key=lambda d: d[3], reverse=True)

                # Candidates: shrink only the outer quarter of the keypoints.
                for shrink_factor in [0.90, 0.85]:
                    test_kps = list(adjusted)
                    boundary_count = max(1, len(distances) // 4)
                    for idx, x, y, dist in distances[:boundary_count]:
                        new_x = int(round(center_x + (x - center_x) * shrink_factor))
                        new_y = int(round(center_y + (y - center_y) * shrink_factor))
                        test_kps[idx] = (new_x, new_y)

                    candidate_kps_list.append(test_kps)
                    candidate_metadata.append(('boundary', shrink_factor))

                if len(candidate_kps_list) > 0:
                    print(f"Evaluating {len(candidate_kps_list)} single-object candidates in parallel...")
                    eval_start = time.time()
                    is_valid, score, best_kps, best_meta = evaluate_keypoints_candidates_parallel(
                        candidate_kps_list, candidate_metadata,
                        frame, num_workers
                    )
                    eval_time = time.time() - eval_start
                    print(f"Parallel evaluation took {eval_time:.2f} seconds for {len(candidate_kps_list)} candidates")

                    if is_valid:
                        print(f"Found best single-object adjustment: {best_meta}, score: {score:.4f}")

                        return (True, best_kps, score)
        except Exception as e:
            print(f"Error in optimized single object adjustment: {e}")
            pass

    # Strategy 4: "ground covers too much" -> pull the four court corners (or,
    # failing that, the outermost keypoints) inward.
    if "ground covers" in error_msg.lower() or "covers more than" in error_msg.lower():
        print(f"Adjusting keypoints to avoid 'ground covers too much' error")
        try:
            from keypoint_evaluation import (
                INDEX_KEYPOINT_CORNER_BOTTOM_LEFT,
                INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT,
                INDEX_KEYPOINT_CORNER_TOP_LEFT,
                INDEX_KEYPOINT_CORNER_TOP_RIGHT,
            )

            corner_indices = [
                INDEX_KEYPOINT_CORNER_TOP_LEFT,
                INDEX_KEYPOINT_CORNER_TOP_RIGHT,
                INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT,
                INDEX_KEYPOINT_CORNER_BOTTOM_LEFT,
            ]

            # Collect detected corners and accumulate their centroid.
            corners = []
            center_x, center_y = 0, 0
            valid_corners = 0

            for corner_idx in corner_indices:
                if corner_idx < len(adjusted):
                    x, y = adjusted[corner_idx]
                    if x == 0 and y == 0:
                        continue
                    corners.append((corner_idx, x, y))
                    center_x += x
                    center_y += y
                    valid_corners += 1

            if valid_corners >= 4:
                center_x /= valid_corners
                center_y /= valid_corners

                candidate_kps_list = []
                candidate_metadata = []

                # Candidates: move all four corners toward their centroid.
                for shrink_factor in [0.90, 0.85, 0.75, 0.65]:
                    test_kps = list(adjusted)
                    for corner_idx, x, y in corners:
                        new_x = int(round(center_x + (x - center_x) * shrink_factor))
                        new_y = int(round(center_y + (y - center_y) * shrink_factor))
                        test_kps[corner_idx] = (new_x, new_y)

                    candidate_kps_list.append(test_kps)
                    candidate_metadata.append(('corner', shrink_factor))

                if len(candidate_kps_list) > 0:
                    print(f"Evaluating {len(candidate_kps_list)} corner adjustment candidates in parallel...")
                    eval_start = time.time()
                    is_valid, score, best_kps, best_meta = evaluate_keypoints_candidates_parallel(
                        candidate_kps_list, candidate_metadata,
                        frame, num_workers
                    )
                    eval_time = time.time() - eval_start
                    print(f"Parallel evaluation took {eval_time:.2f} seconds for {len(candidate_kps_list)} candidates")

                    if is_valid:
                        print(f"Found best corner adjustment: {best_meta}, score: {score:.4f}")

                        return (True, best_kps, score)

            # Fallback for this strategy: fewer than 4 corners detected, so
            # shrink the outermost of ALL detected keypoints instead.
            valid_keypoints = []
            all_center_x, all_center_y = 0, 0
            valid_count = 0

            for idx in range(len(adjusted)):
                x, y = adjusted[idx]
                if x == 0 and y == 0:
                    continue
                valid_keypoints.append((idx, x, y))
                all_center_x += x
                all_center_y += y
                valid_count += 1

            if valid_count >= 4:
                all_center_x /= valid_count
                all_center_y /= valid_count

                # Rank by distance from the global centroid, outermost first.
                distances = []
                for idx, x, y in valid_keypoints:
                    dist = np.sqrt((x - all_center_x)**2 + (y - all_center_y)**2)
                    distances.append((idx, x, y, dist))

                distances.sort(key=lambda d: d[3], reverse=True)

                candidate_kps_list = []
                candidate_metadata = []

                # Candidates: pull each of the 3 outermost keypoints inward
                # individually.
                for idx, x, y, dist in distances[:3]:
                    for shrink_factor in [0.95, 0.90, 0.80, 0.70]:
                        test_kps = list(adjusted)
                        new_x = int(round(all_center_x + (x - all_center_x) * shrink_factor))
                        new_y = int(round(all_center_y + (y - all_center_y) * shrink_factor))
                        test_kps[idx] = (new_x, new_y)

                        candidate_kps_list.append(test_kps)
                        candidate_metadata.append(('individual', idx, shrink_factor))

                # Candidates: pull the 2 outermost keypoints inward together
                # (only when enough keypoints remain).
                if valid_count >= 6:
                    for shrink_factor in [0.90, 0.80, 0.70]:
                        test_kps = list(adjusted)
                        for idx, x, y, dist in distances[:2]:
                            new_x = int(round(all_center_x + (x - all_center_x) * shrink_factor))
                            new_y = int(round(all_center_y + (y - all_center_y) * shrink_factor))
                            test_kps[idx] = (new_x, new_y)

                        candidate_kps_list.append(test_kps)
                        candidate_metadata.append(('pair', shrink_factor))

                if len(candidate_kps_list) > 0:
                    print(f"Evaluating {len(candidate_kps_list)} ground-coverage candidates in parallel...")
                    eval_start = time.time()
                    is_valid, score, best_kps, best_meta = evaluate_keypoints_candidates_parallel(
                        candidate_kps_list, candidate_metadata,
                        frame, num_workers
                    )
                    eval_time = time.time() - eval_start
                    print(f"Parallel evaluation took {eval_time:.2f} seconds for {len(candidate_kps_list)} candidates")

                    if is_valid:
                        print(f"Found best ground-coverage adjustment: {best_meta}, score: {score:.4f}")

                        return (True, best_kps, score)
        except Exception as e:
            print(f"Error in ground coverage adjustment: {e}")
            pass

    # Last resort: small (+/-3 px) perturbations of each detected corner.
    if would_cause_error and max_iterations > 0:
        try:
            from keypoint_evaluation import (
                INDEX_KEYPOINT_CORNER_BOTTOM_LEFT,
                INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT,
                INDEX_KEYPOINT_CORNER_TOP_LEFT,
                INDEX_KEYPOINT_CORNER_TOP_RIGHT,
            )

            corner_indices = [
                INDEX_KEYPOINT_CORNER_TOP_LEFT,
                INDEX_KEYPOINT_CORNER_TOP_RIGHT,
                INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT,
                INDEX_KEYPOINT_CORNER_BOTTOM_LEFT,
            ]

            candidate_kps_list = []
            candidate_metadata = []

            for corner_idx in corner_indices:
                if corner_idx < len(adjusted):
                    x, y = adjusted[corner_idx]
                    if x == 0 and y == 0:
                        continue
                    for dx in [-3, 0, 3]:
                        for dy in [-3, 0, 3]:
                            if dx == 0 and dy == 0:
                                continue
                            test_kps = list(adjusted)
                            test_kps[corner_idx] = (x + dx, y + dy)

                            candidate_kps_list.append(test_kps)
                            candidate_metadata.append(('corner_perturb', corner_idx, dx, dy))

            if len(candidate_kps_list) > 0:
                print(f"Evaluating {len(candidate_kps_list)} corner perturbation candidates in parallel...")
                eval_start = time.time()
                is_valid, score, best_kps, best_meta = evaluate_keypoints_candidates_parallel(
                    candidate_kps_list, candidate_metadata,
                    frame, num_workers
                )
                eval_time = time.time() - eval_start
                print(f"Parallel evaluation took {eval_time:.2f} seconds for {len(candidate_kps_list)} candidates")

                if is_valid:
                    print(f"Found best corner perturbation: {best_meta}, score: {score:.4f}")

                    return (True, best_kps, score)
        except Exception:
            pass

    # Every strategy failed: report failure with a best-effort score.
    score = _calculate_keypoints_score(adjusted, frame)
    return (False, adjusted, score)
|
|
|
|
| def _validate_keypoints_corners( |
| frame_keypoints: list[tuple[int, int]], |
| template_keypoints: list[tuple[int, int]] = None, |
| ) -> bool: |
| """ |
| Validate that frame keypoints can form a valid homography with template keypoints |
| (corners don't create twisted projection). |
| |
| Returns True if validation passes, False otherwise. |
| """ |
| try: |
| from keypoint_evaluation import ( |
| validate_projected_corners, |
| TEMPLATE_KEYPOINTS, |
| INDEX_KEYPOINT_CORNER_BOTTOM_LEFT, |
| INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT, |
| INDEX_KEYPOINT_CORNER_TOP_LEFT, |
| INDEX_KEYPOINT_CORNER_TOP_RIGHT, |
| ) |
| |
| |
| if template_keypoints is None: |
| template_keypoints = TEMPLATE_KEYPOINTS |
| |
| |
| filtered_template = [] |
| filtered_frame = [] |
| |
| for i, (t_kp, f_kp) in enumerate(zip(template_keypoints, frame_keypoints)): |
| if f_kp[0] > 0 and f_kp[1] > 0: |
| filtered_template.append(t_kp) |
| filtered_frame.append(f_kp) |
| |
| if len(filtered_template) < 4: |
| return False |
| |
| |
| src_pts = np.array(filtered_template, dtype=np.float32) |
| dst_pts = np.array(filtered_frame, dtype=np.float32) |
| |
| H, mask = cv2.findHomography(src_pts, dst_pts) |
| |
| if H is None: |
| return False |
| |
| |
| try: |
| validate_projected_corners( |
| source_keypoints=template_keypoints, |
| homography_matrix=H |
| ) |
| return True |
| except Exception: |
| return False |
| |
| except ImportError: |
| |
| return True |
| except Exception: |
| |
| return False |
|
|
def calculate_and_adjust_keypoints(
    results_frames: Sequence[Any],
    frame_width: int = None,
    frame_height: int = None,
    frames: List[np.ndarray] = None,
    offset: int = 0,
    num_workers: int = None,
) -> list[tuple[int, float, bool]]:
    """
    Calculate missing keypoints, adjust them to avoid invalid masks, and
    evaluate scores. Frames are processed in parallel when there are enough
    of them (processes on Linux, threads elsewhere), otherwise sequentially.

    For each frame:
    1. Calculates missing keypoints if needed
    2. Adjusts keypoints to avoid InvalidMask errors
    3. Evaluates the adjusted keypoints and calculates a score

    Args:
        results_frames: Sequence of frame results with keypoints
        frame_width: Frame width
        frame_height: Frame height
        frames: Optional list of frame images for validation
        offset: Frame offset for tracking
        num_workers: Number of workers (defaults to a capped cpu_count())

    Returns:
        List of tuples (frame_index, score, adjusted_success) for all frames:
        - frame_index: Index of the frame
        - score: Evaluation score of the adjusted keypoints (0.0 if failed)
        - adjusted_success: True if keypoints were successfully adjusted
    """
    max_frames = len(results_frames)
    if max_frames == 0:
        return []

    if num_workers is None:
        # Cap at 32 workers to avoid oversubscription on large machines,
        # and never spawn more workers than there are frames.
        max_cpu_workers = min(32, cpu_count())
        num_workers = max(1, min(max_cpu_workers, max_frames))

    # One self-contained argument tuple per frame so workers need no
    # shared state (required for the process-based path).
    args_list = []
    for frame_index, frame_result in enumerate(results_frames):
        frame_image = None
        if frames is not None and frame_index < len(frames):
            frame_image = frames[frame_index]
        args_list.append((
            frame_index, frame_result, frame_width, frame_height,
            frame_image, offset
        ))

    results: list[tuple[int, float, bool]] = []

    def _process_sequentially() -> None:
        # Shared sequential path: also used as the parallel-failure fallback.
        for args in args_list:
            try:
                frame_index, score, adjusted_success = _process_single_frame_for_prediction(args)
                results.append((frame_index, score, adjusted_success))
            except Exception as e:
                print(f"Error processing frame: {e}")
                # Record a failure entry so the frame index stays represented.
                results.append((args[0], 0.0, False))

    import platform
    is_linux = platform.system().lower() == 'linux'

    if max_frames >= 4 and num_workers > 1:
        try:
            # On Linux, fork-based processes give true CPU parallelism;
            # elsewhere threads avoid process-spawn/pickling overhead.
            if is_linux:
                from concurrent.futures import ProcessPoolExecutor as executor_class, as_completed
                print(f"Linux detected: Processing {max_frames} frames in parallel using {num_workers} processes (ProcessPoolExecutor)...")
            else:
                from concurrent.futures import ThreadPoolExecutor as executor_class, as_completed
                print(f"Processing {max_frames} frames in parallel using {num_workers} workers (ThreadPoolExecutor)...")

            with executor_class(max_workers=num_workers) as executor:
                futures = {executor.submit(_process_single_frame_for_prediction, args): args for args in args_list}
                for future in as_completed(futures):
                    try:
                        frame_index, score, adjusted_success = future.result()
                        results.append((frame_index, score, adjusted_success))
                    except Exception as e:
                        print(f"Error getting result from worker: {e}")
                        # Fall back to a zero-score failure entry for this frame.
                        results.append((futures[future][0], 0.0, False))
        except Exception as e:
            print(f"Parallel processing failed: {e}, falling back to sequential")
            _process_sequentially()
    else:
        # Too few frames (or a single worker) to justify parallel overhead.
        _process_sequentially()

    # Futures complete out of order; restore frame order for the caller.
    results.sort(key=lambda x: x[0])
    return results
|
|
def _generate_sparse_template_keypoints(
    frame_width: int,
    frame_height: int,
    frame_image: np.ndarray = None,
) -> list[tuple[int, int]]:
    """
    Build a sparse, scaled-down keypoint layout (the four pitch corners
    only) positioned inside the destination frame.

    Pipeline:
      1. Derive a small uniform template->frame scale.
      2. Optionally analyze the frame for the region densest in detected
         ground lines (needs frame_image plus the loaded template globals).
      3. Scale the selected corner keypoints; every other slot stays (0, 0).
      4. Offset the corner patch toward the dense region (or center it),
         then probe a few vertical shifts and keep the one with the best
         expected-vs-predicted line overlap.

    Args:
        frame_width: Destination frame width in pixels.
        frame_height: Destination frame height in pixels.
        frame_image: Optional frame image used for line-density analysis.

    Returns:
        A list of (x, y) keypoints, one entry per template keypoint slot;
        only the corner slots are non-zero.
    """

    template_max_x = _TEMPLATE_MAX_X
    template_max_y = _TEMPLATE_MAX_Y

    # Per-axis template->frame scale; a zero-sized template falls back to 1
    # to avoid division by zero.
    sx = float(frame_width) / float(template_max_x if template_max_x != 0 else 1)
    sy = float(frame_height) / float(template_max_y if template_max_y != 0 else 1)

    # Preserve aspect ratio by taking the smaller per-axis scale.
    uniform_scale = min(sx, sy)

    # Shrink to 15% of the fit-to-frame size: the sparse layout is meant to
    # be a small patch, not a full-frame template.
    scale_factor = 0.15
    uniform_scale = uniform_scale * scale_factor

    # Absolute floor so the corners never collapse onto each other.
    min_scale_absolute = 0.3

    uniform_scale = max(uniform_scale, min_scale_absolute)

    # Only the four pitch-corner keypoints are emitted.
    try:
        from keypoint_evaluation import (
            INDEX_KEYPOINT_CORNER_TOP_LEFT,
            INDEX_KEYPOINT_CORNER_TOP_RIGHT,
            INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT,
            INDEX_KEYPOINT_CORNER_BOTTOM_LEFT,
        )
        selected_keypoint_indices = set([
            INDEX_KEYPOINT_CORNER_TOP_LEFT,
            INDEX_KEYPOINT_CORNER_TOP_RIGHT,
            INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT,
            INDEX_KEYPOINT_CORNER_BOTTOM_LEFT,
        ])
    except ImportError:
        # Hard-coded fallback; presumably mirrors the corner-index constants
        # above — TODO confirm against keypoint_evaluation.
        selected_keypoint_indices = set([0, 24, 29, 5])

    # Set to (total_line_count, best_region_center, max_density) when the
    # analysis below succeeds; stays None otherwise.
    line_distribution = None

    # --- Line-density analysis: locate where pitch lines actually appear ---
    if frame_image is not None and _TEMPLATE_IMAGE is not None and _TEMPLATE_KEYPOINTS is not None:
        try:
            from keypoint_evaluation import (
                project_image_using_keypoints,
                extract_masks_for_ground_and_lines_no_validation,
                extract_mask_of_ground_lines_in_image
            )

            # Fit-to-frame scale (without the shrink factor) for the warp.
            initial_sx = float(frame_width) / float(template_max_x if template_max_x != 0 else 1)
            initial_sy = float(frame_height) / float(template_max_y if template_max_y != 0 else 1)
            num_template_kps = len(_TEMPLATE_KEYPOINTS) if _TEMPLATE_KEYPOINTS is not None else 32
            num_kps = max(32, num_template_kps)

            # Template keypoints as a fixed-size float array (zero-padded
            # when the template has fewer than num_kps entries).
            if _TEMPLATE_KEYPOINTS is not None and len(_TEMPLATE_KEYPOINTS) >= num_kps:
                template_array = np.array(_TEMPLATE_KEYPOINTS[:num_kps], dtype=np.float32)
            else:
                template_array = np.zeros((num_kps, 2), dtype=np.float32)
                if _TEMPLATE_KEYPOINTS is not None:
                    template_array[:len(_TEMPLATE_KEYPOINTS)] = _TEMPLATE_KEYPOINTS

            # Scale every template keypoint into frame coordinates, clamped
            # to the frame bounds.
            scaled_array = template_array.copy()
            scaled_array[:, 0] = np.clip(np.round(template_array[:, 0] * initial_sx), 0, frame_width - 1)
            scaled_array[:, 1] = np.clip(np.round(template_array[:, 1] * initial_sy), 0, frame_height - 1)

            # Slots unset in the template (non-positive coords) stay (0, 0).
            mask = (template_array[:, 0] <= 0) | (template_array[:, 1] <= 0)
            scaled_array[mask] = 0

            initial_scaled = [(int(x), int(y)) for x, y in scaled_array]

            initial_centered = initial_scaled

            if len(initial_scaled) > 0:
                try:
                    # Warp the template image onto the frame to obtain a
                    # ground mask, then detect ground lines inside it.
                    warped_template = project_image_using_keypoints(
                        image=_TEMPLATE_IMAGE,
                        source_keypoints=_TEMPLATE_KEYPOINTS,
                        destination_keypoints=initial_centered,
                        destination_width=frame_width,
                        destination_height=frame_height,
                    )

                    mask_ground, mask_lines = extract_masks_for_ground_and_lines_no_validation(image=warped_template)
                    mask_lines_predicted = extract_mask_of_ground_lines_in_image(
                        image=frame_image, ground_mask=mask_ground
                    )

                    h, w = mask_lines_predicted.shape

                    # Slide a window (35% of the frame per side, min 50 px)
                    # over the line mask to find the densest region.
                    region_size_ratio = 0.35
                    region_w = max(50, int(w * region_size_ratio))
                    region_h = max(50, int(h * region_size_ratio))

                    # Step keeps the scan coarse enough to stay cheap.
                    step_size = max(20, min(region_w // 3, region_h // 3, w // 10, h // 10))

                    max_density = 0.0
                    best_region_center = None

                    y_starts = list(range(0, h - region_h + 1, step_size))
                    x_starts = list(range(0, w - region_w + 1, step_size))

                    for y_start in y_starts:
                        y_end = min(y_start + region_h, h)
                        for x_start in x_starts:
                            x_end = min(x_start + region_w, w)

                            region_mask = mask_lines_predicted[y_start:y_end, x_start:x_end]
                            region_area = (x_end - x_start) * (y_end - y_start)

                            if region_area == 0:
                                continue

                            # Density = fraction of window pixels on a line.
                            line_count = np.count_nonzero(region_mask)
                            density = float(line_count) / float(region_area)

                            if density > max_density:
                                max_density = density
                                best_region_center = ((x_start + x_end) // 2, (y_start + y_end) // 2)

                    # No window evaluated (e.g. window larger than the mask):
                    # default to the mask center with zero density.
                    if best_region_center is None:
                        best_region_center = (w // 2, h // 2)
                        max_density = 0.0

                    total_line_count = np.sum(mask_lines_predicted > 0)

                    line_distribution = (total_line_count, best_region_center, max_density)

                    print(f"Density-based region analysis: center={best_region_center}, density={max_density:.4f}, total_lines={total_line_count}")
                except Exception:
                    # Best-effort analysis; positioning falls back to centering.
                    pass
        except Exception:
            pass

    # --- Build the sparse (corners-only) keypoint list at uniform_scale ---
    source_keypoints = _TEMPLATE_KEYPOINTS if _TEMPLATE_KEYPOINTS is not None else FOOTBALL_KEYPOINTS
    num_keypoints = len(source_keypoints) if source_keypoints is not None else 32

    scaled: list[tuple[int, int]] = []
    for i in range(num_keypoints):
        if i in selected_keypoint_indices and i < len(source_keypoints):
            tx, ty = source_keypoints[i]
            if tx > 0 and ty > 0:
                x_scaled = int(round(tx * uniform_scale))
                y_scaled = int(round(ty * uniform_scale))
                scaled.append((x_scaled, y_scaled))
            else:
                scaled.append((0, 0))
        else:
            scaled.append((0, 0))

    # Minimum pixel distance allowed between any two emitted keypoints.
    min_spacing = 5
    min_spacing_sq = min_spacing * min_spacing

    # Pairwise squared distances between the non-zero keypoints.
    valid_kps = np.array([(x, y) for x, y in scaled if x != 0 or y != 0], dtype=np.float32)
    needs_adjustment = False

    if len(valid_kps) > 1:
        diff = valid_kps[:, None, :] - valid_kps[None, :, :]
        dist_sq = np.sum(diff ** 2, axis=2)

        # Ignore self-distances on the diagonal.
        np.fill_diagonal(dist_sq, min_spacing_sq + 1)

        if np.any(dist_sq < min_spacing_sq):
            needs_adjustment = True

    # Grow the scale (capped at 0.25) when corners landed too close.
    if needs_adjustment and uniform_scale < 0.25:
        uniform_scale = uniform_scale * 1.2
        uniform_scale = min(uniform_scale, 0.25)

    # Vectorized recompute of `scaled` at the (possibly bumped) scale; this
    # intentionally runs in all cases and supersedes the loop above.
    source_array = np.array(source_keypoints[:num_keypoints] if len(source_keypoints) >= num_keypoints
                            else source_keypoints + [(0, 0)] * (num_keypoints - len(source_keypoints)),
                            dtype=np.float32)

    # Only selected corner slots with positive template coords are emitted.
    selected_mask = np.array([i in selected_keypoint_indices for i in range(num_keypoints)], dtype=bool)
    valid_mask = (source_array[:, 0] > 0) & (source_array[:, 1] > 0)
    final_mask = selected_mask & valid_mask

    scaled_array = source_array.copy()
    scaled_array[final_mask, 0] = np.round(source_array[final_mask, 0] * uniform_scale)
    scaled_array[final_mask, 1] = np.round(source_array[final_mask, 1] * uniform_scale)
    scaled_array[~final_mask] = 0

    scaled = [(int(x), int(y)) for x, y in scaled_array]

    # --- Position the scaled corner patch inside the frame ---
    offset_x = 0
    offset_y = 0

    if line_distribution is not None:
        if len(line_distribution) >= 3:
            total_line_count, best_region_center, max_density = line_distribution
        else:
            # Defensive unpack for a short tuple (not produced above).
            total_line_count = line_distribution[0] if len(line_distribution) > 0 else 0
            best_region_center = None
            max_density = 0.0

        valid_points = [(x, y) for x, y in scaled if x > 0 and y > 0]
        if len(valid_points) > 0:
            # Bounding extent of the scaled patch (its origin is (0, 0)).
            scaled_width = max(x for x, y in valid_points)
            scaled_height = max(y for x, y in valid_points)

            margin = 5

            # Aim the patch center at the densest line region when the
            # analysis found enough line evidence; otherwise center it.
            if total_line_count > 100 and best_region_center is not None and max_density > 0.01:
                target_center_x, target_center_y = best_region_center

                scaled_center_x = scaled_width // 2
                scaled_center_y = scaled_height // 2

                offset_x = target_center_x - scaled_center_x
                offset_y = target_center_y - scaled_center_y

                # Clamp so the patch stays inside the frame with a margin.
                offset_x = max(margin, min(offset_x, frame_width - scaled_width - margin))
                offset_y = max(margin, min(offset_y, frame_height - scaled_height - margin))

                print(f"Positioning sparse template: target_center=({target_center_x}, {target_center_y}), offset=({offset_x}, {offset_y}), scaled_size=({scaled_width}, {scaled_height}), density={max_density:.4f}")
            else:
                # Weak line evidence: just center the patch in the frame.
                offset_x = max(margin, (frame_width - scaled_width) // 2)
                offset_y = max(margin, (frame_height - scaled_height) // 2)
                offset_x = min(offset_x, frame_width - scaled_width - margin)
                offset_y = min(offset_y, frame_height - scaled_height - margin)
                offset_x = max(0, offset_x)
                offset_y = max(0, offset_y)
    else:
        # No line analysis available: center the patch in the frame.
        valid_points = [(x, y) for x, y in scaled if x > 0 and y > 0]
        if len(valid_points) > 0:
            scaled_width = max(x for x, y in valid_points)
            scaled_height = max(y for x, y in valid_points)
            margin = 5
            offset_x = max(margin, (frame_width - scaled_width) // 2)
            offset_y = max(margin, (frame_height - scaled_height) // 2)
            offset_x = min(offset_x, frame_width - scaled_width - margin)
            offset_y = min(offset_y, frame_height - scaled_height - margin)
            offset_x = max(0, offset_x)
            offset_y = max(0, offset_y)

    # --- Probe small vertical shifts and keep the best line overlap ---
    if frame_image is not None and _TEMPLATE_IMAGE is not None and _TEMPLATE_KEYPOINTS is not None and line_distribution is not None:
        try:
            total_line_count, best_region_center, max_density = line_distribution
            if total_line_count > 100:
                from keypoint_evaluation import (
                    project_image_using_keypoints,
                    extract_masks_for_ground_and_lines_no_validation,
                    extract_mask_of_ground_lines_in_image
                )

                # NOTE(review): computed but never read below — candidates
                # are rebuilt per adjustment as test_centered.
                initial_centered = []
                for x, y in scaled:
                    if x == 0 and y == 0:
                        initial_centered.append((0, 0))
                    else:
                        new_x = x + offset_x
                        new_y = y + offset_y
                        new_x = max(0, min(new_x, frame_width - 1))
                        initial_centered.append((new_x, new_y))

                best_adjusted_offset_y = offset_y
                best_overlap = 0.0

                # Candidate vertical shifts (pixels) around the chosen offset.
                vertical_adjustments = [-15, -7, 0, 7, 15]
                for adj_y in vertical_adjustments:
                    test_offset_y = offset_y + adj_y

                    # `margin`/`scaled_height` come from the positioning step
                    # above; if that step was skipped (no valid points) this
                    # raises NameError and is swallowed by the outer except.
                    test_offset_y = max(margin, min(test_offset_y, frame_height - scaled_height - margin))

                    test_centered = []
                    for x, y in scaled:
                        if x == 0 and y == 0:
                            test_centered.append((0, 0))
                        else:
                            new_x = x + offset_x
                            new_y = y + test_offset_y
                            new_x = max(0, min(new_x, frame_width - 1))
                            new_y = max(0, min(new_y, frame_height - 1))
                            test_centered.append((new_x, new_y))

                    # Shifted corner positions (slots with non-zero x only).
                    test_corners = [test_centered[idx] for idx in sorted(selected_keypoint_indices)
                                    if idx < len(test_centered) and test_centered[idx][0] > 0]

                    if len(test_corners) == 4:
                        # Reject layouts whose corners got clamped too close
                        # together to form a usable quadrilateral.
                        min_dist = float('inf')
                        for i in range(len(test_corners)):
                            for j in range(i + 1, len(test_corners)):
                                x1, y1 = test_corners[i]
                                x2, y2 = test_corners[j]
                                dist = np.sqrt((x2 - x1)**2 + (y2 - y1)**2)
                                min_dist = min(min_dist, dist)

                        min_required_dist = max(30, min(frame_width, frame_height) * 0.1)
                        if min_dist < min_required_dist:
                            continue

                        try:
                            # Score this shift by how much of the expected
                            # line mask is covered by detected lines.
                            warped = project_image_using_keypoints(
                                image=_TEMPLATE_IMAGE,
                                source_keypoints=_TEMPLATE_KEYPOINTS,
                                destination_keypoints=test_centered,
                                destination_width=frame_width,
                                destination_height=frame_height,
                            )

                            mask_ground_test, mask_lines_expected = extract_masks_for_ground_and_lines_no_validation(image=warped)
                            mask_lines_predicted = extract_mask_of_ground_lines_in_image(
                                image=frame_image, ground_mask=mask_ground_test
                            )

                            overlap_mask = (mask_lines_expected > 0) & (mask_lines_predicted > 0)
                            expected_pixels = np.sum(mask_lines_expected > 0)

                            if expected_pixels > 0:
                                overlap = np.sum(overlap_mask) / float(expected_pixels)

                                if overlap > best_overlap:
                                    best_overlap = overlap
                                    best_adjusted_offset_y = test_offset_y
                        except Exception:
                            continue

                if best_overlap > 0.0:
                    offset_y = best_adjusted_offset_y
                    print(f"Vertical adjustment: best overlap={best_overlap:.4f}, adjusted offset_y={offset_y}")
        except Exception as e:
            print(f"Vertical adjustment error: {e}")
            pass

    # --- Apply the final offset; (0, 0) slots stay untouched ---
    centered = []
    for x, y in scaled:
        if x == 0 and y == 0:
            centered.append((0, 0))
        else:
            new_x = x + offset_x
            new_y = y + offset_y

            # Only x is clamped here; y keeps its offset value.
            new_x = max(0, min(new_x, frame_width - 1))

            centered.append((new_x, new_y))

    # Safety net: with fewer than 4 visible keypoints, if any drifted above
    # the frame (negative y), push the whole patch down until non-negative.
    visible_keypoints = [kp for kp in centered if kp[1] > 0]
    if len(visible_keypoints) < 4:
        min_y = min(y for x, y in centered if y != 0) if visible_keypoints else 0
        if min_y < 0:
            adjustment = abs(min_y) + 10
            centered = []
            for x, y in scaled:
                if x == 0 and y == 0:
                    centered.append((0, 0))
                else:
                    new_x = x + offset_x
                    new_y = y + offset_y + adjustment
                    new_x = max(0, min(new_x, frame_width - 1))
                    new_y = max(0, new_y)
                    centered.append((new_x, new_y))
    return centered
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
|
|
def _adjust_keypoints_to_pass_validation(
    keypoints: list[tuple[int, int]],
    frame_width: int = None,
    frame_height: int = None,
) -> list[tuple[int, int]]:
    """
    Adjust keypoints so they pass validate_projected_corners.

    Strategy:
      1. If the keypoints already validate, return them unchanged.
      2. Re-assign the four pitch-corner keypoints so they form a
         non-twisted quadrilateral (TL/TR/BR/BL ordered by position).
      3. Failing that, re-derive the corners from the template using the
         average scale implied by the non-corner keypoints.
      4. If nothing validates, return the input unchanged.

    Args:
        keypoints: Frame keypoints as (x, y); (0, 0) marks "missing".
        frame_width: Currently unused; kept for interface compatibility.
        frame_height: Currently unused; kept for interface compatibility.

    Returns:
        A keypoint list that passes validation when possible, otherwise
        the original keypoints.
    """
    if _validate_keypoints_corners(keypoints, _TEMPLATE_KEYPOINTS):
        return keypoints

    try:
        from keypoint_evaluation import (
            INDEX_KEYPOINT_CORNER_BOTTOM_LEFT,
            INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT,
            INDEX_KEYPOINT_CORNER_TOP_LEFT,
            INDEX_KEYPOINT_CORNER_TOP_RIGHT,
        )

        template_keypoints = _TEMPLATE_KEYPOINTS

        corner_indices = [
            INDEX_KEYPOINT_CORNER_TOP_LEFT,
            INDEX_KEYPOINT_CORNER_TOP_RIGHT,
            INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT,
            INDEX_KEYPOINT_CORNER_BOTTOM_LEFT,
        ]

        # Collect the detected (strictly positive) corner keypoints.
        corners = []
        for idx in corner_indices:
            if idx < len(keypoints):
                x, y = keypoints[idx]
                if x > 0 and y > 0:
                    corners.append((x, y, idx))

        if len(corners) < 4:
            # Not enough detected corners to rebuild a quadrilateral.
            return keypoints

        # Attempt 1: split corners into top/bottom halves by mean y, then
        # order each half left-to-right so TL/TR/BR/BL edges cannot cross.
        top_corners = sorted([c for c in corners if c[1] <= np.mean([c[1] for c in corners])],
                             key=lambda c: c[0])
        bottom_corners = sorted([c for c in corners if c[1] > np.mean([c[1] for c in corners])],
                                key=lambda c: c[0])

        if len(top_corners) == 2 and len(bottom_corners) == 2:
            # Defensive re-ordering (sorted above should already guarantee it).
            if top_corners[0][0] > top_corners[1][0]:
                top_corners = top_corners[::-1]
            if bottom_corners[0][0] > bottom_corners[1][0]:
                bottom_corners = bottom_corners[::-1]

            result = list(keypoints)

            # Re-assign each corner slot to its geometrically sorted position.
            corner_mapping = {
                INDEX_KEYPOINT_CORNER_TOP_LEFT: top_corners[0],
                INDEX_KEYPOINT_CORNER_TOP_RIGHT: top_corners[1],
                INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT: bottom_corners[1],
                INDEX_KEYPOINT_CORNER_BOTTOM_LEFT: bottom_corners[0],
            }

            for corner_idx, (x, y, _) in corner_mapping.items():
                if corner_idx < len(result):
                    result[corner_idx] = (x, y)

            if _validate_keypoints_corners(result, _TEMPLATE_KEYPOINTS):
                return result

        # Attempt 2: synthesize corners from the template, scaled by the
        # average x/y scale implied by the detected non-corner keypoints.
        if len(corners) >= 4:
            non_corner_kps = [(i, keypoints[i]) for i in range(len(keypoints))
                              if i not in corner_indices and keypoints[i][0] > 0 and keypoints[i][1] > 0]

            if len(non_corner_kps) >= 2:
                scales_x = []
                scales_y = []
                for idx, (x, y) in non_corner_kps:
                    if idx < len(template_keypoints):
                        tx, ty = template_keypoints[idx]
                        if tx > 0:
                            scales_x.append(x / tx)
                        if ty > 0:
                            scales_y.append(y / ty)

                if scales_x and scales_y:
                    avg_scale_x = sum(scales_x) / len(scales_x)
                    avg_scale_y = sum(scales_y) / len(scales_y)

                    result = list(keypoints)

                    # Project each template corner at the observed scale.
                    for corner_idx in corner_indices:
                        if corner_idx < len(template_keypoints):
                            tx, ty = template_keypoints[corner_idx]
                            new_x = int(round(tx * avg_scale_x))
                            new_y = int(round(ty * avg_scale_y))
                            if corner_idx < len(result):
                                result[corner_idx] = (new_x, new_y)

                    if _validate_keypoints_corners(result, _TEMPLATE_KEYPOINTS):
                        return result

    except Exception:
        # Any failure (missing template, bad indices, ...) falls through to
        # returning the original keypoints unchanged.
        pass

    return keypoints
|
|
def fix_keypoints(
    results_frames: Sequence[Any],
    frame_results: list[tuple[int, float, bool]],
    frame_width: int,
    frame_height: int,
    frames: List[np.ndarray] = None,
    offset: int = 0,
    num_workers: int = None,
) -> list[Any]:
    """
    Optimized version using batch-first approach:
    1. Generate sparse keypoints for ALL frames first
    2. Evaluate both sparse and calculated keypoints for ALL frames
    3. Choose the one with bigger score per frame

    Args:
        results_frames: Sequence of frame results with keypoints
        frame_results: List of tuples (frame_index, score, adjusted_success)
            from calculate_and_adjust_keypoints
        frame_width: Frame width
        frame_height: Frame height
        frames: Optional list of frame images for validation
        offset: Frame offset for tracking
        num_workers: Number of workers (defaults to a capped cpu_count())

    Returns:
        List of processed frame results (keypoints updated in place)
    """
    max_frames = len(results_frames)
    if max_frames == 0:
        return list(results_frames)

    # index -> (score, adjusted_success) for quick per-frame lookup.
    frame_results_dict = {frame_index: (score, adjusted_success)
                          for frame_index, score, adjusted_success in frame_results}

    if num_workers is None:
        # Cap at 32 workers and never exceed the number of frames.
        max_cpu_workers = min(32, cpu_count())
        num_workers = max(1, min(max_cpu_workers, max_frames))

    from keypoint_helper_v2_optimized import convert_keypoints_to_val_format

    # --- Stage 0: collect calculated keypoints, carrying successes forward ---
    calculated_keypoints_list = []
    pre_calculated_scores = {}
    last_success_kps = None

    for frame_index in range(max_frames):
        frame_result = results_frames[frame_index]
        current_kps_raw = getattr(frame_result, "keypoints", []) or []
        calculated_kps = convert_keypoints_to_val_format(current_kps_raw)

        entry = frame_results_dict.get(frame_index)
        if entry is not None and entry[1]:
            # Successful adjustment: trust this frame's own keypoints/score.
            pre_calculated_scores[frame_index] = entry[0]
            calculated_keypoints_list.append(calculated_kps)
            last_success_kps = calculated_kps
        elif last_success_kps is not None:
            # Failed or missing adjustment: reuse the last known-good keypoints.
            calculated_keypoints_list.append(last_success_kps)
        else:
            # No success seen yet: fall back to this frame's own keypoints.
            calculated_keypoints_list.append(calculated_kps)

    # Executor strategy shared by both parallel stages below: fork-based
    # processes on Linux (true CPU parallelism), threads elsewhere.
    import platform
    if platform.system().lower() == 'linux':
        from concurrent.futures import ProcessPoolExecutor as executor_class
    else:
        from concurrent.futures import ThreadPoolExecutor as executor_class
    from concurrent.futures import as_completed

    use_parallel = max_frames >= 4 and num_workers > 1

    def _run_stage(worker, worker_args, consume, error_msg, fallback_msg):
        # Run `worker` over `worker_args` (parallel when allowed), feeding each
        # result to `consume`. On executor failure, fall back to a silent
        # best-effort sequential pass (matches the original behavior).
        if use_parallel:
            try:
                with executor_class(max_workers=num_workers) as executor:
                    futures = [executor.submit(worker, args) for args in worker_args]
                    for future in as_completed(futures):
                        try:
                            consume(future.result())
                        except Exception as e:
                            print(f"{error_msg}: {e}")
                return
            except Exception as e:
                print(f"{fallback_msg}: {e}, falling back to sequential")
        for args in worker_args:
            try:
                consume(worker(args))
            except Exception:
                pass

    # --- Stage 1: generate sparse keypoints for all frames ---
    print(f"Generating sparse keypoints for {max_frames} frames...")
    sparse_args_list = []
    for frame_index in range(max_frames):
        frame_for_analysis = None
        if frames is not None and frame_index < len(frames):
            frame_for_analysis = frames[frame_index]
        sparse_args_list.append((
            frame_index, frame_width, frame_height,
            frame_for_analysis
        ))

    sparse_keypoints_dict = {}

    def _store_sparse(result):
        frame_idx, sparse_kps = result
        sparse_keypoints_dict[frame_idx] = sparse_kps

    _run_stage(
        _generate_sparse_keypoints_for_frame, sparse_args_list, _store_sparse,
        "Error generating sparse keypoints",
        "Parallel processing failed for sparse generation",
    )

    # Frames whose sparse generation failed get an all-zero placeholder.
    for frame_index in range(max_frames):
        if frame_index not in sparse_keypoints_dict:
            sparse_keypoints_dict[frame_index] = [(0, 0)] * 32

    # --- Stage 2: score sparse vs calculated keypoints per frame ---
    print(f"Evaluating sparse and calculated keypoints for {max_frames} frames...")
    eval_args_list = []
    for frame_index in range(max_frames):
        sparse_kps = sparse_keypoints_dict[frame_index]
        calculated_kps = calculated_keypoints_list[frame_index]

        frame_for_analysis = None
        if frames is not None and frame_index < len(frames):
            frame_for_analysis = frames[frame_index]

        # Reuse the score computed during adjustment when available.
        pre_calculated_score = pre_calculated_scores.get(frame_index, None)

        eval_args_list.append((
            frame_index, sparse_kps, calculated_kps,
            frame_for_analysis, pre_calculated_score
        ))

    evaluation_results = {}

    def _store_eval(result):
        frame_idx, sparse_score, calculated_score, sparse_kps, calculated_kps = result
        evaluation_results[frame_idx] = (sparse_score, calculated_score, sparse_kps, calculated_kps)

    _run_stage(
        _evaluate_keypoints_for_frame, eval_args_list, _store_eval,
        "Error evaluating keypoints",
        "Threading failed for evaluation",
    )

    # --- Stage 3: keep the higher-scoring candidate per frame ---
    print(f"Choosing best keypoints for {max_frames} frames...")
    for frame_index in range(max_frames):
        frame_result = results_frames[frame_index]

        if frame_index in evaluation_results:
            sparse_score, calculated_score, sparse_kps, calculated_kps = evaluation_results[frame_index]

            if calculated_score > sparse_score:
                final_keypoints = calculated_kps
                print(f"Frame {frame_index}: Using calculated keypoints (score: {calculated_score:.4f} > sparse: {sparse_score:.4f})")
            else:
                # Ties go to the sparse candidate.
                final_keypoints = sparse_kps
                print(f"Frame {frame_index}: Using sparse keypoints (score: {sparse_score:.4f} >= calculated: {calculated_score:.4f})")
        else:
            # Evaluation failed entirely for this frame; fall back to sparse.
            final_keypoints = sparse_keypoints_dict.get(frame_index, [(0, 0)] * 32)
            print(f"Frame {frame_index}: Using sparse keypoints (evaluation failed)")

        setattr(frame_result, "keypoints", list(convert_keypoints_to_val_format(final_keypoints)))

    return list(results_frames)
|
|
def run_keypoints_post_processing(
    results_frames: Sequence[Any],
    frame_width: int,
    frame_height: int,
    frames: List[np.ndarray] = None,
    template_keypoints: list[tuple[int, int]] = None,
    template_image: np.ndarray = None,
    offset: int = 0,
    num_workers: int = None,
) -> list[Any]:
    """
    Entry point for optimized keypoint post-processing.

    Initializes the module-level template state once, then runs the two-stage
    pipeline: per-frame calculation/adjustment followed by sparse-vs-calculated
    keypoint selection.

    Args:
        results_frames: Sequence of frame results with keypoints
        frame_width: Frame width
        frame_height: Frame height
        frames: Optional list of frame images for validation
        template_keypoints: Optional template keypoints (defaults to TEMPLATE_KEYPOINTS)
        template_image: Optional pre-loaded template image (from miner constructor)
        offset: Frame offset for tracking (defaults to 0)
        num_workers: Number of worker processes for multiprocessing (defaults to cpu_count())

    Returns:
        List of processed frame results
    """
    # One-time setup of module-level template globals.
    _initialize_template_variables(template_keypoints, template_image)

    # Stage 1: per-frame keypoint calculation, adjustment and scoring.
    adjustment_results = calculate_and_adjust_keypoints(
        results_frames,
        frame_width,
        frame_height,
        frames,
        offset,
        num_workers,
    )

    # Stage 2: pick the best of sparse vs calculated keypoints per frame.
    return fix_keypoints(
        results_frames,
        adjustment_results,
        frame_width,
        frame_height,
        frames,
        offset,
        num_workers,
    )