Spaces:
Running
Running
| import os | |
| import cv2 | |
| import numpy as np | |
| import torch | |
| import random | |
| from os import path as osp | |
| from torch.nn import functional as F | |
| from abc import ABCMeta, abstractmethod | |
def scandir(dir_path, suffix=None, recursive=False, full_path=False):
    """Scan a directory to find the interested files.

    Entries whose name starts with '.' are never yielded as files.

    Args:
        dir_path (str): Path of the directory.
        suffix (str | tuple(str), optional): File suffix that we are
            interested in. Default: None.
        recursive (bool, optional): If set to True, recursively scan the
            directory. Default: False.
        full_path (bool, optional): If set to True, include the dir_path.
            Default: False.

    Returns:
        A generator for all the interested files, with paths relative to
        ``dir_path`` (or prefixed with it when ``full_path`` is True).

    Raises:
        TypeError: If ``suffix`` is neither None, a string nor a tuple.
    """
    if (suffix is not None) and not isinstance(suffix, (str, tuple)):
        raise TypeError('"suffix" must be a string or tuple of strings')

    root = dir_path

    def _scandir(dir_path, suffix, recursive):
        for entry in os.scandir(dir_path):
            if not entry.name.startswith('.') and entry.is_file():
                return_path = entry.path if full_path else osp.relpath(entry.path, root)
                # str.endswith accepts a single suffix or a tuple of suffixes.
                if suffix is None or return_path.endswith(suffix):
                    yield return_path
            elif recursive and entry.is_dir():
                # Only descend into directories. Hidden files (and other
                # non-directory entries) also reach this branch, and calling
                # os.scandir() on them would raise NotADirectoryError.
                yield from _scandir(entry.path, suffix=suffix, recursive=recursive)

    return _scandir(dir_path, suffix=suffix, recursive=recursive)
def read_img_seq(path, require_mod_crop=False, scale=1, return_imgname=False):
    """Load a sequence of images and stack them into one tensor.

    Each image is read with ``cv2.imread``, converted to float32 and divided
    by 255 before being turned into a tensor.

    Args:
        path (list[str] | str): List of image paths or image folder path.
            A folder is scanned (non-recursively) and its files are sorted.
        require_mod_crop (bool): Require mod crop for each image.
            Default: False.
        scale (int): Scale factor for mod_crop. Default: 1.
        return_imgname(bool): Whether return image names. Default False.

    Returns:
        Tensor: size (t, c, h, w), RGB, [0, 1].
        list[str]: Returned image name list (file names without extension),
            only when ``return_imgname`` is True.
    """
    img_paths = path if isinstance(path, list) else sorted(list(scandir(path, full_path=True)))

    frames = []
    for img_path in img_paths:
        frames.append(cv2.imread(img_path).astype(np.float32) / 255.)

    if require_mod_crop:
        # NOTE(review): mod_crop is not defined in the visible part of this
        # file; presumably provided elsewhere - confirm.
        frames = [mod_crop(frame, scale) for frame in frames]

    stacked = torch.stack(img2tensor(frames, bgr2rgb=True, float32=True), dim=0)

    if not return_imgname:
        return stacked
    imgnames = [osp.splitext(osp.basename(img_path))[0] for img_path in img_paths]
    return stacked, imgnames
def img2tensor(imgs, bgr2rgb=True, float32=True):
    """Convert (H, W, C) numpy image(s) into (C, H, W) torch tensor(s).

    Args:
        imgs (list[ndarray] | ndarray): Input images.
        bgr2rgb (bool): Whether to change bgr to rgb (only applied to
            3-channel images).
        float32 (bool): Whether to change to float32.

    Returns:
        list[tensor] | tensor: Tensor images. A list input yields a list of
            tensors; a single ndarray yields a single tensor.
    """

    def _convert(image):
        if image.shape[2] == 3 and bgr2rgb:
            # float64 input is downcast to float32 before the colour
            # conversion.
            if image.dtype == 'float64':
                image = image.astype('float32')
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # HWC -> CHW
        converted = torch.from_numpy(image.transpose(2, 0, 1))
        return converted.float() if float32 else converted

    if not isinstance(imgs, list):
        return _convert(imgs)
    return [_convert(image) for image in imgs]
def tensor2img(tensor, rgb2bgr=True, out_type=np.uint8, min_max=(0, 1)):
    """Convert torch Tensors into image numpy arrays.

    After clamping to [min, max], values will be normalized to [0, 1].
    The input tensor(s) are left unmodified.

    Args:
        tensor (Tensor or list[Tensor]): Accept shapes:
            1) 4D mini-batch Tensor of shape (B x 3/1 x H x W);
            2) 3D Tensor of shape (3/1 x H x W);
            3) 2D Tensor of shape (H x W).
            Tensor channel should be in RGB order.
        rgb2bgr (bool): Whether to change rgb to bgr.
        out_type (numpy type): output types. If ``np.uint8``, transform outputs
            to uint8 type with range [0, 255]; otherwise, float type with
            range [0, 1]. Default: ``np.uint8``.
        min_max (tuple[int]): min and max values for clamp.

    Returns:
        (ndarray or list[ndarray]): 3D ndarray of shape (H x W x C) OR 2D
        ndarray of shape (H x W). The channel order is BGR when ``rgb2bgr``
        is True. A single array is returned when there is only one result.

    Raises:
        TypeError: If the input is not a tensor / list of tensors, or a
            tensor has an unsupported number of dimensions.
    """
    if not (torch.is_tensor(tensor) or (isinstance(tensor, list) and all(torch.is_tensor(t) for t in tensor))):
        raise TypeError(f'tensor or list of tensors expected, got {type(tensor)}')

    if torch.is_tensor(tensor):
        tensor = [tensor]
    result = []
    for _tensor in tensor:
        # Out-of-place clamp: squeeze/float/detach/cpu may all return views of
        # the caller's storage (e.g. a float32 CPU tensor), so an in-place
        # clamp_ would silently overwrite the caller's data.
        _tensor = _tensor.squeeze(0).float().detach().cpu().clamp(*min_max)
        _tensor = (_tensor - min_max[0]) / (min_max[1] - min_max[0])

        n_dim = _tensor.dim()
        if n_dim == 4:
            # math is not imported at the top of the visible file.
            import math
            # NOTE(review): make_grid is not imported in the visible part of
            # this file; presumably torchvision.utils.make_grid - confirm it
            # is in scope, otherwise this branch raises NameError.
            img_np = make_grid(_tensor, nrow=int(math.sqrt(_tensor.size(0))), normalize=False).numpy()
            img_np = img_np.transpose(1, 2, 0)
            if rgb2bgr:
                img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)
        elif n_dim == 3:
            img_np = _tensor.numpy()
            img_np = img_np.transpose(1, 2, 0)
            if img_np.shape[2] == 1:  # gray image
                img_np = np.squeeze(img_np, axis=2)
            elif rgb2bgr:
                img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)
        elif n_dim == 2:
            img_np = _tensor.numpy()
        else:
            raise TypeError(f'Only support 4D, 3D or 2D tensor. But received with dimension: {n_dim}')
        if out_type == np.uint8:
            # Unlike MATLAB, numpy.uint8() WILL NOT round by default.
            img_np = (img_np * 255.0).round()
        img_np = img_np.astype(out_type)
        result.append(img_np)
    if len(result) == 1:
        result = result[0]
    return result
def augment(imgs, hflip=True, rotation=True, flows=None, return_status=False):
    """Randomly flip / rotate images (and optional flows) consistently.

    Rotation is implemented as a vertical flip and/or a transpose of the
    first two axes. The same randomly drawn augmentation is applied to every
    image and flow. Flips are performed in place via ``cv2.flip``.

    Args:
        imgs (list[ndarray] | ndarray): Images to be augmented. If the input
            is an ndarray, it will be transformed to a list.
        hflip (bool): Horizontal flip. Default: True.
        rotation (bool): Rotation. Default: True.
        flows (list[ndarray]): Flows to be augmented. If the input is an
            ndarray, it will be transformed to a list.
            Dimension is (h, w, 2). Default: None.
        return_status (bool): Return the status of flip and rotation.
            Only honoured when ``flows`` is None. Default: False.

    Returns:
        list[ndarray] | ndarray: Augmented images and flows. If returned
            results only have one element, just return ndarray.
    """
    # Draw order (hflip, vflip, rot90) matters for reproducibility with a
    # seeded random module.
    hflip = hflip and random.random() < 0.5
    vflip = rotation and random.random() < 0.5
    rot90 = rotation and random.random() < 0.5

    def _transform_img(image):
        if hflip:  # horizontal
            cv2.flip(image, 1, image)
        if vflip:  # vertical
            cv2.flip(image, 0, image)
        return image.transpose(1, 0, 2) if rot90 else image

    def _transform_flow(field):
        if hflip:  # horizontal: mirror and negate the first flow channel
            cv2.flip(field, 1, field)
            field[:, :, 0] *= -1
        if vflip:  # vertical: mirror and negate the second flow channel
            cv2.flip(field, 0, field)
            field[:, :, 1] *= -1
        if rot90:
            # Transposing the spatial axes also swaps the two flow channels.
            field = field.transpose(1, 0, 2)
            field = field[:, :, [1, 0]]
        return field

    def _as_list(value):
        return value if isinstance(value, list) else [value]

    def _unwrap(items):
        return items[0] if len(items) == 1 else items

    imgs = _unwrap([_transform_img(image) for image in _as_list(imgs)])

    if flows is None:
        if return_status:
            return imgs, (hflip, vflip, rot90)
        return imgs

    flows = _unwrap([_transform_flow(field) for field in _as_list(flows)])
    return imgs, flows
def paired_random_crop(img_gts, img_lqs, gt_patch_size, scale, gt_path=None):
    """Paired random crop. Support Numpy array and Tensor inputs.

    It crops lists of lq and gt images with corresponding locations.

    Args:
        img_gts (list[ndarray] | ndarray | list[Tensor] | Tensor): GT images. Note that all images
            should have the same shape. If the input is an ndarray, it will
            be transformed to a list containing itself.
        img_lqs (list[ndarray] | ndarray): LQ images. Note that all images
            should have the same shape. If the input is an ndarray, it will
            be transformed to a list containing itself.
        gt_patch_size (int): GT patch size.
        scale (int): Scale factor.
        gt_path (str): Path to ground-truth, only used in the error message.
            Default: None.

    Returns:
        list[ndarray] | ndarray: GT images and LQ images. If returned results
            only have one element, just return ndarray.

    Raises:
        ValueError: If the GT size is not ``scale`` times the LQ size, or the
            LQ image is smaller than the LQ patch size.
    """
    if not isinstance(img_gts, list):
        img_gts = [img_gts]
    if not isinstance(img_lqs, list):
        img_lqs = [img_lqs]

    # determine input type: Numpy array or Tensor
    input_type = 'Tensor' if torch.is_tensor(img_gts[0]) else 'Numpy'

    if input_type == 'Tensor':
        # Tensors carry the spatial dims last; arrays carry them first.
        h_lq, w_lq = img_lqs[0].size()[-2:]
        h_gt, w_gt = img_gts[0].size()[-2:]
    else:
        h_lq, w_lq = img_lqs[0].shape[0:2]
        h_gt, w_gt = img_gts[0].shape[0:2]
    lq_patch_size = gt_patch_size // scale

    if h_gt != h_lq * scale or w_gt != w_lq * scale:
        # The two f-strings are concatenated into ONE message; separating
        # them with a comma would pass two args to ValueError.
        raise ValueError(f'Scale mismatches. GT ({h_gt}, {w_gt}) is not {scale}x '
                         f'multiplication of LQ ({h_lq}, {w_lq}).')
    if h_lq < lq_patch_size or w_lq < lq_patch_size:
        raise ValueError(f'LQ ({h_lq}, {w_lq}) is smaller than patch size '
                         f'({lq_patch_size}, {lq_patch_size}). '
                         f'Please remove {gt_path}.')

    # randomly choose top and left coordinates for lq patch
    top = random.randint(0, h_lq - lq_patch_size)
    left = random.randint(0, w_lq - lq_patch_size)

    # crop lq patch
    if input_type == 'Tensor':
        img_lqs = [v[:, :, top:top + lq_patch_size, left:left + lq_patch_size] for v in img_lqs]
    else:
        img_lqs = [v[top:top + lq_patch_size, left:left + lq_patch_size, ...] for v in img_lqs]

    # crop corresponding gt patch
    top_gt, left_gt = int(top * scale), int(left * scale)
    if input_type == 'Tensor':
        img_gts = [v[:, :, top_gt:top_gt + gt_patch_size, left_gt:left_gt + gt_patch_size] for v in img_gts]
    else:
        img_gts = [v[top_gt:top_gt + gt_patch_size, left_gt:left_gt + gt_patch_size, ...] for v in img_gts]

    if len(img_gts) == 1:
        img_gts = img_gts[0]
    if len(img_lqs) == 1:
        img_lqs = img_lqs[0]
    return img_gts, img_lqs
| # Modified from https://github.com/open-mmlab/mmcv/blob/master/mmcv/fileio/file_client.py # noqa: E501 | |
class BaseStorageBackend(metaclass=ABCMeta):
    """Abstract class of storage backends.

    All backends need to implement two apis: ``get()`` and ``get_text()``.
    ``get()`` reads the file as a byte stream and ``get_text()`` reads the file
    as texts.

    Both methods are declared abstract, so this class cannot be instantiated
    directly and a subclass must override both of them.
    """

    @abstractmethod
    def get(self, filepath):
        """Return the content stored at ``filepath`` as bytes."""
        pass

    @abstractmethod
    def get_text(self, filepath):
        """Return the content stored at ``filepath`` as text."""
        pass
class MemcachedBackend(BaseStorageBackend):
    """Storage backend that fetches values through a memcached client.

    Attributes:
        server_list_cfg (str): Config file for memcached server list.
        client_cfg (str): Config file for memcached client.
        sys_path (str | None): Additional path to be appended to `sys.path`
            before importing ``mc``. Default: None.
    """

    def __init__(self, server_list_cfg, client_cfg, sys_path=None):
        if sys_path is not None:
            import sys
            sys.path.append(sys_path)
        try:
            import mc
        except ImportError:
            raise ImportError('Please install memcached to enable MemcachedBackend.')

        self.server_list_cfg = server_list_cfg
        self.client_cfg = client_cfg
        self._client = mc.MemcachedClient.GetInstance(server_list_cfg, client_cfg)
        # mc.pyvector serves as a pointer to a memory cache; it is reused as
        # the receive buffer for every get() call.
        self._mc_buffer = mc.pyvector()

    def get(self, filepath):
        """Fetch the value stored under ``filepath`` and return the converted buffer."""
        key = str(filepath)
        import mc
        self._client.Get(key, self._mc_buffer)
        return mc.ConvertBuffer(self._mc_buffer)

    def get_text(self, filepath):
        """Text access is not supported by this backend."""
        raise NotImplementedError
class HardDiskBackend(BaseStorageBackend):
    """Storage backend that reads files directly from the local file system."""

    def get(self, filepath):
        """Read the whole file in binary mode and return its bytes."""
        with open(str(filepath), 'rb') as stream:
            return stream.read()

    def get_text(self, filepath):
        """Read the whole file in text mode and return its content as str."""
        with open(str(filepath), 'r') as stream:
            return stream.read()
class LmdbBackend(BaseStorageBackend):
    """Lmdb storage backend.

    Args:
        db_paths (str | list[str]): Lmdb database paths. A list or tuple is
            treated as several paths; any other single value (e.g. a
            path-like object) is converted with ``str()``.
        client_keys (str | list[str]): Lmdb client keys. Default: 'default'.
        readonly (bool, optional): Lmdb environment parameter. If True,
            disallow any write operations. Default: True.
        lock (bool, optional): Lmdb environment parameter. If False, when
            concurrent access occurs, do not lock the database. Default: False.
        readahead (bool, optional): Lmdb environment parameter. If False,
            disable the OS filesystem readahead mechanism, which may improve
            random read performance when a database is larger than RAM.
            Default: False.
        **kwargs: Forwarded unchanged to ``lmdb.open``.

    Attributes:
        db_paths (list): Lmdb database path.
        _client (dict): Maps each client key to its opened lmdb env.
    """

    def __init__(self, db_paths, client_keys='default', readonly=True, lock=False, readahead=False, **kwargs):
        try:
            import lmdb
        except ImportError:
            raise ImportError('Please install lmdb to enable LmdbBackend.')

        if isinstance(client_keys, str):
            client_keys = [client_keys]

        if isinstance(db_paths, (list, tuple)):
            self.db_paths = [str(v) for v in db_paths]
        else:
            # A single path. Previously only `str` was handled, so any other
            # single value left self.db_paths unset and the length check
            # below failed with AttributeError.
            self.db_paths = [str(db_paths)]
        assert len(client_keys) == len(self.db_paths), ('client_keys and db_paths should have the same length, '
                                                        f'but received {len(client_keys)} and {len(self.db_paths)}.')

        self._client = {}
        for client, path in zip(client_keys, self.db_paths):
            self._client[client] = lmdb.open(path, readonly=readonly, lock=lock, readahead=readahead, **kwargs)

    def get(self, filepath, client_key):
        """Get values according to the filepath from one lmdb named client_key.

        Args:
            filepath (str | obj:`Path`): Here, filepath is the lmdb key. It is
                encoded as ASCII before the lookup.
            client_key (str): Used for distinguishing different lmdb envs.

        Returns:
            The value returned by the read transaction's ``get`` for that key.
        """
        filepath = str(filepath)
        assert client_key in self._client, (f'client_key {client_key} is not ' 'in lmdb clients.')
        client = self._client[client_key]
        with client.begin(write=False) as txn:
            value_buf = txn.get(filepath.encode('ascii'))
        return value_buf

    def get_text(self, filepath):
        """Text access is not supported by this backend."""
        raise NotImplementedError
class FileClient(object):
    """Front-end that delegates file access to a selected storage backend.

    The client loads a file or text in a specified backend from its path
    and returns it as a binary file (``get``) or as text (``get_text``).

    Attributes:
        backend (str): The storage backend type. Options are "disk",
            "memcached" and "lmdb".
        client (:obj:`BaseStorageBackend`): The backend object.
    """

    # Maps the backend name accepted by __init__ to its implementation class.
    _backends = {
        'disk': HardDiskBackend,
        'memcached': MemcachedBackend,
        'lmdb': LmdbBackend,
    }

    def __init__(self, backend='disk', **kwargs):
        if backend not in self._backends:
            raise ValueError(f'Backend {backend} is not supported. Currently supported ones'
                             f' are {list(self._backends.keys())}')
        backend_cls = self._backends[backend]
        self.backend = backend
        # Remaining keyword arguments configure the backend itself.
        self.client = backend_cls(**kwargs)

    def get(self, filepath, client_key='default'):
        """Read ``filepath`` through the backend.

        ``client_key`` is used only for lmdb, where different file clients
        have different lmdb environments; other backends ignore it.
        """
        if self.backend != 'lmdb':
            return self.client.get(filepath)
        return self.client.get(filepath, client_key)

    def get_text(self, filepath):
        """Read ``filepath`` as text through the backend."""
        return self.client.get_text(filepath)
def imfrombytes(content, flag='color', float32=False):
    """Decode an encoded image held in memory into an array.

    Args:
        content (bytes): Image bytes got from files or other streams.
        flag (str): Flags specifying the color type of a loaded image,
            candidates are `color`, `grayscale` and `unchanged`.
        float32 (bool): Whether to change to float32. If True, the result is
            also divided by 255. Default: False.

    Returns:
        ndarray: Loaded image array.
    """
    raw = np.frombuffer(content, np.uint8)
    # Translate the textual flag into the matching OpenCV decode mode; an
    # unknown flag raises KeyError.
    decode_modes = {
        'color': cv2.IMREAD_COLOR,
        'grayscale': cv2.IMREAD_GRAYSCALE,
        'unchanged': cv2.IMREAD_UNCHANGED,
    }
    decoded = cv2.imdecode(raw, decode_modes[flag])
    if not float32:
        return decoded
    return decoded.astype(np.float32) / 255.