| | import cv2 |
| | import torch |
| | import numpy as np |
| | import imageio |
| |
|
| |
|
| | def aug_matrix(w1, h1, w2, h2): |
| | dx = (w2 - w1) / 2.0 |
| | dy = (h2 - h1) / 2.0 |
| |
|
| | matrix_trans = np.array([[1.0, 0, dx], [0, 1.0, dy], [0, 0, 1.0]]) |
| |
|
| | scale = np.min([float(w2) / w1, float(h2) / h1]) |
| |
|
| | M = get_affine_matrix(center=(w2 / 2.0, h2 / 2.0), |
| | translate=(0, 0), |
| | scale=scale) |
| |
|
| | M = np.array(M + [0., 0., 1.]).reshape(3, 3) |
| | M = M.dot(matrix_trans) |
| |
|
| | return M |
| |
|
| |
|
| | def get_affine_matrix(center, translate, scale): |
| | cx, cy = center |
| | tx, ty = translate |
| |
|
| | M = [1, 0, 0, 0, 1, 0] |
| | M = [x * scale for x in M] |
| |
|
| | |
| | M[2] += M[0] * (-cx) + M[1] * (-cy) |
| | M[5] += M[3] * (-cx) + M[4] * (-cy) |
| |
|
| | |
| | M[2] += cx + tx |
| | M[5] += cy + ty |
| | return M |
| |
|
| |
|
| | class BaseStreamer(): |
| | """This streamer will return images at 512x512 size. |
| | """ |
| |
|
| | def __init__(self, |
| | width=512, |
| | height=512, |
| | pad=True, |
| | mean=(0.5, 0.5, 0.5), |
| | std=(0.5, 0.5, 0.5), |
| | **kwargs): |
| | self.width = width |
| | self.height = height |
| | self.pad = pad |
| | self.mean = np.array(mean) |
| | self.std = np.array(std) |
| |
|
| | self.loader = self.create_loader() |
| |
|
| | def create_loader(self): |
| | raise NotImplementedError |
| | yield np.zeros((600, 400, 3)) |
| |
|
| | def __getitem__(self, index): |
| | image = next(self.loader) |
| | in_height, in_width, _ = image.shape |
| | M = aug_matrix(in_width, in_height, self.width, self.height, self.pad) |
| | image = cv2.warpAffine(image, |
| | M[0:2, :], (self.width, self.height), |
| | flags=cv2.INTER_CUBIC) |
| |
|
| | input = np.float32(image) |
| | input = (input / 255.0 - self.mean) / self.std |
| | input = input.transpose(2, 0, 1) |
| | return torch.from_numpy(input).float() |
| |
|
| | def __len__(self): |
| | raise NotImplementedError |
| |
|
| |
|
| | class CaptureStreamer(BaseStreamer): |
| | """This streamer takes webcam as input. |
| | """ |
| |
|
| | def __init__(self, id=0, width=512, height=512, pad=True, **kwargs): |
| | super().__init__(width, height, pad, **kwargs) |
| | self.capture = cv2.VideoCapture(id) |
| |
|
| | def create_loader(self): |
| | while True: |
| | _, image = self.capture.read() |
| | image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) |
| | yield image |
| |
|
| | def __len__(self): |
| | return 100_000_000 |
| |
|
| | def __del__(self): |
| | self.capture.release() |
| |
|
| |
|
| | class VideoListStreamer(BaseStreamer): |
| | """This streamer takes a list of video files as input. |
| | """ |
| |
|
| | def __init__(self, files, width=512, height=512, pad=True, **kwargs): |
| | super().__init__(width, height, pad, **kwargs) |
| | self.files = files |
| | self.captures = [imageio.get_reader(f) for f in files] |
| | self.nframes = sum([ |
| | int(cap._meta["fps"] * cap._meta["duration"]) |
| | for cap in self.captures |
| | ]) |
| |
|
| | def create_loader(self): |
| | for capture in self.captures: |
| | for image in capture: |
| | yield image |
| |
|
| | def __len__(self): |
| | return self.nframes |
| |
|
| | def __del__(self): |
| | for capture in self.captures: |
| | capture.close() |
| |
|
| |
|
| | class ImageListStreamer(BaseStreamer): |
| | """This streamer takes a list of image files as input. |
| | """ |
| |
|
| | def __init__(self, files, width=512, height=512, pad=True, **kwargs): |
| | super().__init__(width, height, pad, **kwargs) |
| | self.files = files |
| |
|
| | def create_loader(self): |
| | for f in self.files: |
| | image = cv2.imread(f, cv2.IMREAD_UNCHANGED)[:, :, 0:3] |
| | image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) |
| | yield image |
| |
|
| | def __len__(self): |
| | return len(self.files) |
| |
|