Spaces:
Running
Running
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| # Copyright (c) Github URL | |
| # Copied from | |
| # https://github.com/youtubevos/cocoapi/blob/master/PythonAPI/pycocotools/ytvos.py | |
| __author__ = 'ychfan' | |
| # Interface for accessing the YouTubeVIS dataset. | |
| # The following API functions are defined: | |
| # YTVIS - YTVIS api class that loads YouTubeVIS annotation file | |
| # and prepare data structures. | |
| # decodeMask - Decode binary mask M encoded via run-length encoding. | |
| # encodeMask - Encode binary mask M using run-length encoding. | |
| # getAnnIds - Get ann ids that satisfy given filter conditions. | |
| # getCatIds - Get cat ids that satisfy given filter conditions. | |
| # getImgIds - Get img ids that satisfy given filter conditions. | |
| # loadAnns - Load anns with the specified ids. | |
| # loadCats - Load cats with the specified ids. | |
| # loadImgs - Load imgs with the specified ids. | |
| # annToMask - Convert segmentation in an annotation to binary mask. | |
| # loadRes - Load algorithm results and create API for accessing them. | |
| # Microsoft COCO Toolbox. version 2.0 | |
| # Data, paper, and tutorials available at: http://mscoco.org/ | |
| # Code written by Piotr Dollar and Tsung-Yi Lin, 2014. | |
| # Licensed under the Simplified BSD License [see bsd.txt] | |
| import copy | |
| import itertools | |
| import json | |
| import sys | |
| import time | |
| from collections import defaultdict | |
| import numpy as np | |
| from pycocotools import mask as maskUtils | |
| PYTHON_VERSION = sys.version_info[0] | |
| def _isArrayLike(obj): | |
| return hasattr(obj, '__iter__') and hasattr(obj, '__len__') | |
| class YTVIS: | |
| def __init__(self, annotation_file=None): | |
| """Constructor of Microsoft COCO helper class for reading and | |
| visualizing annotations. | |
| :param annotation_file (str | dict): location of annotation file or | |
| dict results. | |
| :param image_folder (str): location to the folder that hosts images. | |
| :return: | |
| """ | |
| # load dataset | |
| self.dataset, self.anns, self.cats, self.vids = dict(), dict(), dict( | |
| ), dict() | |
| self.vidToAnns, self.catToVids = defaultdict(list), defaultdict(list) | |
| if annotation_file is not None: | |
| print('loading annotations into memory...') | |
| tic = time.time() | |
| if type(annotation_file) == str: | |
| dataset = json.load(open(annotation_file, 'r')) | |
| else: | |
| dataset = annotation_file | |
| assert type( | |
| dataset | |
| ) == dict, 'annotation file format {} not supported'.format( | |
| type(dataset)) | |
| print('Done (t={:0.2f}s)'.format(time.time() - tic)) | |
| self.dataset = dataset | |
| self.createIndex() | |
| def createIndex(self): | |
| # create index | |
| print('creating index...') | |
| anns, cats, vids = {}, {}, {} | |
| vidToAnns, catToVids = defaultdict(list), defaultdict(list) | |
| if 'annotations' in self.dataset: | |
| for ann in self.dataset['annotations']: | |
| vidToAnns[ann['video_id']].append(ann) | |
| anns[ann['id']] = ann | |
| if 'videos' in self.dataset: | |
| for vid in self.dataset['videos']: | |
| vids[vid['id']] = vid | |
| if 'categories' in self.dataset: | |
| for cat in self.dataset['categories']: | |
| cats[cat['id']] = cat | |
| if 'annotations' in self.dataset and 'categories' in self.dataset: | |
| for ann in self.dataset['annotations']: | |
| catToVids[ann['category_id']].append(ann['video_id']) | |
| print('index created!') | |
| # create class members | |
| self.anns = anns | |
| self.vidToAnns = vidToAnns | |
| self.catToVids = catToVids | |
| self.vids = vids | |
| self.cats = cats | |
| def getAnnIds(self, vidIds=[], catIds=[], areaRng=[], iscrowd=None): | |
| """Get ann ids that satisfy given filter conditions. default skips that | |
| filter. | |
| :param vidIds (int array) : get anns for given vids | |
| catIds (int array) : get anns for given cats | |
| areaRng (float array) : get anns for given area range | |
| iscrowd (boolean) : get anns for given crowd label | |
| :return: ids (int array) : integer array of ann ids | |
| """ | |
| vidIds = vidIds if _isArrayLike(vidIds) else [vidIds] | |
| catIds = catIds if _isArrayLike(catIds) else [catIds] | |
| if len(vidIds) == len(catIds) == len(areaRng) == 0: | |
| anns = self.dataset['annotations'] | |
| else: | |
| if not len(vidIds) == 0: | |
| lists = [ | |
| self.vidToAnns[vidId] for vidId in vidIds | |
| if vidId in self.vidToAnns | |
| ] | |
| anns = list(itertools.chain.from_iterable(lists)) | |
| else: | |
| anns = self.dataset['annotations'] | |
| anns = anns if len(catIds) == 0 else [ | |
| ann for ann in anns if ann['category_id'] in catIds | |
| ] | |
| anns = anns if len(areaRng) == 0 else [ | |
| ann for ann in anns if ann['avg_area'] > areaRng[0] | |
| and ann['avg_area'] < areaRng[1] | |
| ] | |
| if iscrowd is not None: | |
| ids = [ann['id'] for ann in anns if ann['iscrowd'] == iscrowd] | |
| else: | |
| ids = [ann['id'] for ann in anns] | |
| return ids | |
| def getCatIds(self, catNms=[], supNms=[], catIds=[]): | |
| """filtering parameters. default skips that filter. | |
| :param catNms (str array) : get cats for given cat names | |
| :param supNms (str array) : get cats for given supercategory names | |
| :param catIds (int array) : get cats for given cat ids | |
| :return: ids (int array) : integer array of cat ids | |
| """ | |
| catNms = catNms if _isArrayLike(catNms) else [catNms] | |
| supNms = supNms if _isArrayLike(supNms) else [supNms] | |
| catIds = catIds if _isArrayLike(catIds) else [catIds] | |
| if len(catNms) == len(supNms) == len(catIds) == 0: | |
| cats = self.dataset['categories'] | |
| else: | |
| cats = self.dataset['categories'] | |
| cats = cats if len(catNms) == 0 else [ | |
| cat for cat in cats if cat['name'] in catNms | |
| ] | |
| cats = cats if len(supNms) == 0 else [ | |
| cat for cat in cats if cat['supercategory'] in supNms | |
| ] | |
| cats = cats if len(catIds) == 0 else [ | |
| cat for cat in cats if cat['id'] in catIds | |
| ] | |
| ids = [cat['id'] for cat in cats] | |
| return ids | |
| def getVidIds(self, vidIds=[], catIds=[]): | |
| """Get vid ids that satisfy given filter conditions. | |
| :param vidIds (int array) : get vids for given ids | |
| :param catIds (int array) : get vids with all given cats | |
| :return: ids (int array) : integer array of vid ids | |
| """ | |
| vidIds = vidIds if _isArrayLike(vidIds) else [vidIds] | |
| catIds = catIds if _isArrayLike(catIds) else [catIds] | |
| if len(vidIds) == len(catIds) == 0: | |
| ids = self.vids.keys() | |
| else: | |
| ids = set(vidIds) | |
| for i, catId in enumerate(catIds): | |
| if i == 0 and len(ids) == 0: | |
| ids = set(self.catToVids[catId]) | |
| else: | |
| ids &= set(self.catToVids[catId]) | |
| return list(ids) | |
| def loadAnns(self, ids=[]): | |
| """Load anns with the specified ids. | |
| :param ids (int array) : integer ids specifying anns | |
| :return: anns (object array) : loaded ann objects | |
| """ | |
| if _isArrayLike(ids): | |
| return [self.anns[id] for id in ids] | |
| elif type(ids) == int: | |
| return [self.anns[ids]] | |
| def loadCats(self, ids=[]): | |
| """Load cats with the specified ids. | |
| :param ids (int array) : integer ids specifying cats | |
| :return: cats (object array) : loaded cat objects | |
| """ | |
| if _isArrayLike(ids): | |
| return [self.cats[id] for id in ids] | |
| elif type(ids) == int: | |
| return [self.cats[ids]] | |
| def loadVids(self, ids=[]): | |
| """Load anns with the specified ids. | |
| :param ids (int array) : integer ids specifying vid | |
| :return: vids (object array) : loaded vid objects | |
| """ | |
| if _isArrayLike(ids): | |
| return [self.vids[id] for id in ids] | |
| elif type(ids) == int: | |
| return [self.vids[ids]] | |
| def loadRes(self, resFile): | |
| """Load result file and return a result api object. | |
| :param resFile (str) : file name of result file | |
| :return: res (obj) : result api object | |
| """ | |
| res = YTVIS() | |
| res.dataset['videos'] = [img for img in self.dataset['videos']] | |
| print('Loading and preparing results...') | |
| tic = time.time() | |
| if type(resFile) == str or (PYTHON_VERSION == 2 | |
| and type(resFile) == str): | |
| anns = json.load(open(resFile)) | |
| elif type(resFile) == np.ndarray: | |
| anns = self.loadNumpyAnnotations(resFile) | |
| else: | |
| anns = resFile | |
| assert type(anns) == list, 'results in not an array of objects' | |
| annsVidIds = [ann['video_id'] for ann in anns] | |
| assert set(annsVidIds) == (set(annsVidIds) & set(self.getVidIds())), \ | |
| 'Results do not correspond to current coco set' | |
| if 'segmentations' in anns[0]: | |
| res.dataset['categories'] = copy.deepcopy( | |
| self.dataset['categories']) | |
| for id, ann in enumerate(anns): | |
| ann['areas'] = [] | |
| if 'bboxes' not in ann: | |
| ann['bboxes'] = [] | |
| for seg in ann['segmentations']: | |
| # now only support compressed RLE format | |
| # as segmentation results | |
| if seg: | |
| ann['areas'].append(maskUtils.area(seg)) | |
| if len(ann['bboxes']) < len(ann['areas']): | |
| ann['bboxes'].append(maskUtils.toBbox(seg)) | |
| else: | |
| ann['areas'].append(None) | |
| if len(ann['bboxes']) < len(ann['areas']): | |
| ann['bboxes'].append(None) | |
| ann['id'] = id + 1 | |
| l_ori = [a for a in ann['areas'] if a] | |
| if len(l_ori) == 0: | |
| ann['avg_area'] = 0 | |
| else: | |
| ann['avg_area'] = np.array(l_ori).mean() | |
| ann['iscrowd'] = 0 | |
| print('DONE (t={:0.2f}s)'.format(time.time() - tic)) | |
| res.dataset['annotations'] = anns | |
| res.createIndex() | |
| return res | |
| def annToRLE(self, ann, frameId): | |
| """Convert annotation which can be polygons, uncompressed RLE to RLE. | |
| :return: binary mask (numpy 2D array) | |
| """ | |
| t = self.vids[ann['video_id']] | |
| h, w = t['height'], t['width'] | |
| segm = ann['segmentations'][frameId] | |
| if type(segm) == list: | |
| # polygon -- a single object might consist of multiple parts | |
| # we merge all parts into one mask rle code | |
| rles = maskUtils.frPyObjects(segm, h, w) | |
| rle = maskUtils.merge(rles) | |
| elif type(segm['counts']) == list: | |
| # uncompressed RLE | |
| rle = maskUtils.frPyObjects(segm, h, w) | |
| else: | |
| # rle | |
| rle = segm | |
| return rle | |
| def annToMask(self, ann, frameId): | |
| """Convert annotation which can be polygons, uncompressed RLE, or RLE | |
| to binary mask. | |
| :return: binary mask (numpy 2D array) | |
| """ | |
| rle = self.annToRLE(ann, frameId) | |
| m = maskUtils.decode(rle) | |
| return m | |