File size: 3,511 Bytes
6789f6f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import numpy as np
import os
from typing import Optional, Callable, Set
import torch
from torchvision.datasets.vision import VisionDataset
from torchvision.transforms import ToTensor
from fairseq.data import FairseqDataset
logger = logging.getLogger(__name__)
class ImageDataset(FairseqDataset, VisionDataset):
    """A dataset of images discovered by recursively walking ``root``.

    If ``load_classes`` is True, each immediate subdirectory of ``root`` is
    treated as a class (ImageNet-style layout) and every example is paired
    with an integer label; otherwise images are collected unlabeled.

    Args:
        root: directory to scan for image files.
        extensions: set of file extensions to accept; compared against the
            lowercased ``os.path.splitext`` suffix, so entries are expected
            to include the leading dot (e.g. ``".jpg"``) — TODO confirm
            against callers.
        load_classes: whether to derive integer labels from subdirectories.
        transform: optional callable applied to each PIL image; must return
            a tensor. Falls back to ``ToTensor()`` when None.
        shuffle: if True, ``ordered_indices`` returns a random permutation.
    """

    def __init__(
        self,
        root: str,
        extensions: Set[str],
        load_classes: bool,
        transform: Optional[Callable] = None,
        shuffle=True,
    ):
        FairseqDataset.__init__(self)
        VisionDataset.__init__(self, root=root, transform=transform)

        self.shuffle = shuffle
        # Default transform used when no user transform is supplied.
        self.tensor_transform = ToTensor()

        self.classes = None
        self.labels = None
        if load_classes:
            # Sorted subdirectory names -> stable class-name -> index mapping.
            classes = sorted(d.name for d in os.scandir(root) if d.is_dir())
            self.classes = {cls_name: i for i, cls_name in enumerate(classes)}
            logger.info(f"loaded {len(self.classes)} classes")

        def walk_path(root_path):
            # Yield paths of files under root_path whose extension is accepted.
            # sorted() at both levels keeps the ordering deterministic.
            for dirpath, _, fnames in sorted(os.walk(root_path, followlinks=True)):
                for fname in sorted(fnames):
                    ext = os.path.splitext(fname)[-1].lower()
                    if ext not in extensions:
                        continue
                    yield os.path.join(dirpath, fname)

        logger.info(f"finding images in {root}")
        if self.classes is not None:
            # Labeled mode: walk each class subdirectory separately so every
            # file is tagged with its class index.
            self.files = []
            self.labels = []
            for cls_name, cls_idx in self.classes.items():
                for path in walk_path(os.path.join(root, cls_name)):
                    self.files.append(path)
                    self.labels.append(cls_idx)
        else:
            self.files = list(walk_path(root))

        logger.info(f"loaded {len(self.files)} examples")

    def __getitem__(self, index):
        """Load image ``index`` as an RGB tensor, with its label if present."""
        # Imported lazily so the module can be used without PIL installed
        # until an item is actually fetched.
        from PIL import Image

        fpath = self.files[index]
        with open(fpath, "rb") as f:
            img = Image.open(f).convert("RGB")

        if self.transform is None:
            img = self.tensor_transform(img)
        else:
            img = self.transform(img)
            assert torch.is_tensor(img)

        res = {"id": index, "img": img}

        if self.labels is not None:
            res["label"] = self.labels[index]

        return res

    def __len__(self):
        """Number of collected image files."""
        return len(self.files)

    def collater(self, samples):
        """Stack a list of ``__getitem__`` results into a training batch.

        Returns a dict with ``id`` (LongTensor) and ``net_input.img`` (stacked
        image tensor); ``net_input.label`` is added when labels exist.
        """
        if len(samples) == 0:
            return {}

        collated_img = torch.stack([s["img"] for s in samples], dim=0)

        res = {
            "id": torch.LongTensor([s["id"] for s in samples]),
            "net_input": {
                "img": collated_img,
            },
        }

        if "label" in samples[0]:
            res["net_input"]["label"] = torch.LongTensor([s["label"] for s in samples])

        return res

    def num_tokens(self, index):
        # Every image counts as a single token for batching purposes.
        return 1

    def size(self, index):
        # Constant size: batching is not length-based for images.
        return 1

    def ordered_indices(self):
        """Return an ordered list of indices. Batches will be constructed based
        on this order."""
        if self.shuffle:
            return np.random.permutation(len(self))
        return np.arange(len(self))
|