Spaces:
Sleeping
Sleeping
| ```python | |
| from concurrent.futures import ThreadPoolExecutor | |
| def extract_patch(job): | |
| x, y, img_padded, patch_size, transform = job | |
| patch = img_padded.crop((x, y, x + patch_size, y + patch_size)) | |
| return x, y, transform(patch) | |
| jobs = [(x, y, img_padded, patch_size, transform) for x, y in coords] | |
| with ThreadPoolExecutor(max_workers=8) as executor: | |
| patch_tensors = list(executor.map(extract_patch, jobs)) | |
| ``` | |
| ```python | |
| def grid_merge_points(points, radius=8.0): | |
| if not points: | |
| return [] | |
| cell_size = radius | |
| cells = {} | |
| merged = [] | |
| for point in points: | |
| x, y = point | |
| key = (int(x // cell_size), int(y // cell_size)) | |
| duplicate = False | |
| for nx in range(key[0] - 1, key[0] + 2): | |
| for ny in range(key[1] - 1, key[1] + 2): | |
| for existing in cells.get((nx, ny), []): | |
| if ((x - existing[0]) ** 2 + (y - existing[1]) ** 2) ** 0.5 <= radius: | |
| duplicate = True | |
| break | |
| if duplicate: | |
| break | |
| if duplicate: | |
| break | |
| if not duplicate: | |
| merged.append(point) | |
| cells.setdefault(key, []).append(point) | |
| return merged | |
| ``` | |
| ```python | |
| import cv2 | |
| import numpy as np | |
| class ORBHomographyMotionEstimator: | |
| def __init__(self, max_features=500): | |
| self.prev_gray = None | |
| self.orb = cv2.ORB_create(max_features) | |
| self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True) | |
| def update(self, frame_bgr): | |
| gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY) | |
| if self.prev_gray is None: | |
| self.prev_gray = gray | |
| return np.eye(3, dtype=np.float32) | |
| kp1, des1 = self.orb.detectAndCompute(self.prev_gray, None) | |
| kp2, des2 = self.orb.detectAndCompute(gray, None) | |
| H = np.eye(3, dtype=np.float32) | |
| if des1 is not None and des2 is not None and len(kp1) >= 8 and len(kp2) >= 8: | |
| matches = sorted(self.matcher.match(des1, des2), key=lambda m: m.distance)[:100] | |
| if len(matches) >= 8: | |
| src = np.float32([kp1[m.queryIdx].pt for m in matches]).reshape(-1, 1, 2) | |
| dst = np.float32([kp2[m.trainIdx].pt for m in matches]).reshape(-1, 1, 2) | |
| H_found, _ = cv2.findHomography(src, dst, cv2.RANSAC, 5.0) | |
| if H_found is not None: | |
| H = H_found.astype(np.float32) | |
| self.prev_gray = gray | |
| return H | |
| ``` | |
| ```python | |
| from PIL import Image | |
| def batch_infer_frames(frames_rgb, process_frame_fn, model, device, transform, **kwargs): | |
| results = [] | |
| for frame_rgb in frames_rgb: | |
| image = Image.fromarray(frame_rgb) | |
| _, count, points = process_frame_fn(image, model, device, transform, **kwargs) | |
| results.append({"count": count, "points": points}) | |
| return results | |
| ``` | |