Spaces:
Runtime error
Runtime error
| import cv2 | |
| import numpy as np | |
| from pathlib import Path | |
| from itertools import chain | |
| from multiprocessing import Queue | |
| import mmcv | |
def load_calib(calib: str) -> tuple[np.ndarray, np.ndarray]:
    """Load pinhole camera intrinsics from a plain-text calibration file.

    The file must contain at least four whitespace-separated numbers in the
    order ``fx fy cx cy``. Any values after the first four are not
    interpreted here; they are returned unchanged as part of ``params``.

    Args:
        calib: Path of the calibration file (anything ``np.loadtxt`` accepts).

    Returns:
        A ``(K, params)`` tuple where ``K`` is the 3x3 camera matrix built
        from ``fx, fy, cx, cy`` and ``params`` is the full 1-D array of
        values read from the file.

    Raises:
        ValueError: If the file does not hold a flat list of at least four
            numbers.
    """
    # Default delimiter splits on any run of whitespace, so tabs, repeated
    # spaces and trailing spaces are tolerated as well as single spaces.
    params = np.loadtxt(calib)
    if params.ndim != 1 or params.size < 4:
        raise ValueError(
            f"calibration {calib!r} must contain at least 4 values "
            f"(fx fy cx cy) on a single line, got shape {params.shape}"
        )

    fx, fy, cx, cy = params[:4]
    K = np.eye(3)
    K[0, 0], K[1, 1] = fx, fy
    K[0, 2], K[1, 2] = cx, cy
    return K, params
| def image_stream( | |
| queue: Queue, imagedir: str, calib: str | None, stride: int, skip: int = 0 | |
| ) -> None: | |
| """image generator""" | |
| if calib is not None: | |
| K, calib = load_calib(calib) | |
| fx, fy, cx, cy = K[0, 0], K[1, 1], K[0, 2], K[1, 2] | |
| img_exts = ["*.png", "*.jpeg", "*.jpg"] | |
| image_list = sorted(chain.from_iterable(Path(imagedir).glob(e) for e in img_exts))[ | |
| skip::stride | |
| ] | |
| for t, imfile in enumerate(image_list): | |
| image = cv2.imread(str(imfile)) | |
| if calib is not None: | |
| intrinsics = np.array([fx, fy, cx, cy]) | |
| else: | |
| intrinsics = None | |
| h, w, _ = image.shape | |
| image = image[: h - h % 16, : w - w % 16] | |
| queue.put((t, image, intrinsics)) | |
| queue.put((-1, image, intrinsics)) | |
def video_stream(
    queue: Queue, imagedir: str, calib: str | None, stride: int, skip: int = 0
) -> None:
    """Push half-resolution frames of a video onto ``queue``, then a sentinel.

    The first ``skip`` frames are discarded. After that, frames are read in
    groups of ``stride`` and the last frame of each group is kept, downscaled
    by 0.5 with ``cv2.INTER_AREA``, cropped (from the bottom/right) so height
    and width are multiples of 16, and queued as ``(t, image, intrinsics)``
    with ``t`` counting from 0. The intrinsics are scaled by the same 0.5
    factor. When the reader returns ``None`` the stream ends and a sentinel
    ``(-1, None, intrinsics)`` is queued.

    Args:
        queue: Destination queue for the frame tuples.
        imagedir: Path handed to ``mmcv.VideoReader`` (the video file).
        calib: Calibration file passed to ``load_calib``, or ``None`` to
            emit ``None`` as intrinsics.
        stride: Number of frames consumed per emitted frame; must be >= 1.
        skip: Number of leading frames to discard.

    Raises:
        ValueError: If ``stride`` is smaller than 1.
    """
    if stride < 1:
        # With no read per iteration the loop below could never reach the
        # end of the video.
        raise ValueError(f"stride must be >= 1, got {stride}")

    scale = 0.5  # spatial downscale applied to frames and intrinsics alike

    base_intrinsics = None
    if calib is not None:
        K, _ = load_calib(calib)
        base_intrinsics = np.array([K[0, 0], K[1, 1], K[0, 2], K[1, 2]]) * scale

    video_reader = mmcv.VideoReader(imagedir)
    for _ in range(skip):
        video_reader.read()

    # Bind both names up front so the sentinel below is well-defined even
    # when the video yields no frame at all.
    image = None
    intrinsics = None if base_intrinsics is None else base_intrinsics.copy()

    t = 0
    while True:
        # Advance `stride` frames, keeping only the last one read.
        for _ in range(stride):
            image = video_reader.read()
            if image is None:
                break
        if image is None:
            break

        # NOTE: frames are not undistorted; calibration values beyond
        # fx, fy, cx, cy are ignored here.
        image = cv2.resize(
            image, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA
        )
        # Cropping keeps the top-left corner, so cx/cy stay valid.
        h, w, _ = image.shape
        image = image[: h - h % 16, : w - w % 16]

        # Fresh array per frame, so consumers never share one buffer.
        intrinsics = None if base_intrinsics is None else base_intrinsics.copy()
        queue.put((t, image, intrinsics))
        t += 1

    # End-of-stream marker; `image` is None at this point.
    queue.put((-1, image, intrinsics))