Spaces:
Build error
Build error
| import os | |
| import sys | |
| import shutil | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) # NOQA | |
| import numpy as np | |
| import argparse | |
| from PIL import Image | |
| from .mask_generators import get_video_masks_by_moving_random_stroke, get_masked_ratio | |
| from .util import make_dirs, make_dir_under_root, get_everything_under | |
| from .readers import MaskReader | |
| def parse_args(): | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument( | |
| '-od', '--output_dir', | |
| type=str, | |
| help="Output directory name" | |
| ) | |
| parser.add_argument( | |
| '-im', | |
| '--image_masks', action='store_true', | |
| help="Set this if you want to generate independent masks in one directory." | |
| ) | |
| parser.add_argument( | |
| '-vl', '--video_len', | |
| type=int, | |
| help="Maximum video length (i.e. #mask)" | |
| ) | |
| parser.add_argument( | |
| '-ns', '--num_stroke', | |
| type=int, | |
| help="Number of stroke in one mask" | |
| ) | |
| parser.add_argument( | |
| '-nsb', '--num_stroke_bound', | |
| type=int, | |
| nargs=2, | |
| help="Upper/lower bound of number of stroke in one mask" | |
| ) | |
| parser.add_argument( | |
| '-n', | |
| type=int, | |
| help="Number of mask to generate" | |
| ) | |
| parser.add_argument( | |
| '-sp', | |
| '--stroke_preset', | |
| type=str, | |
| default='rand_curve', | |
| help="Preset of the stroke parameters" | |
| ) | |
| parser.add_argument( | |
| '-iw', | |
| '--image_width', | |
| type=int, | |
| default=320 | |
| ) | |
| parser.add_argument( | |
| '-ih', | |
| '--image_height', | |
| type=int, | |
| default=180 | |
| ) | |
| parser.add_argument( | |
| '--cluster_by_area', | |
| action='store_true' | |
| ) | |
| parser.add_argument( | |
| '--leave_boarder_unmasked', | |
| type=int, | |
| help='Set this to a number, then a copy of the mask where the mask of boarder is erased.' | |
| ) | |
| parser.add_argument( | |
| '--redo_without_generation', | |
| action='store_true', | |
| help='Set this, and the script will skip the generation and redo the left tasks' | |
| '(uncluster -> erase boarder -> re-cluster)' | |
| ) | |
| args = parser.parse_args() | |
| return args | |
| def get_stroke_preset(stroke_preset): | |
| if stroke_preset == 'object_like': | |
| return { | |
| "nVertexBound": [5, 30], | |
| "maxHeadSpeed": 15, | |
| "maxHeadAcceleration": (10, 1.5), | |
| "brushWidthBound": (20, 50), | |
| "nMovePointRatio": 0.5, | |
| "maxPiontMove": 10, | |
| "maxLineAcceleration": (5, 0.5), | |
| "boarderGap": None, | |
| "maxInitSpeed": 10, | |
| } | |
| elif stroke_preset == 'object_like_middle': | |
| return { | |
| "nVertexBound": [5, 15], | |
| "maxHeadSpeed": 8, | |
| "maxHeadAcceleration": (4, 1.5), | |
| "brushWidthBound": (20, 50), | |
| "nMovePointRatio": 0.5, | |
| "maxPiontMove": 5, | |
| "maxLineAcceleration": (5, 0.5), | |
| "boarderGap": None, | |
| "maxInitSpeed": 10, | |
| } | |
| elif stroke_preset == 'object_like_small': | |
| return { | |
| "nVertexBound": [5, 20], | |
| "maxHeadSpeed": 7, | |
| "maxHeadAcceleration": (3.5, 1.5), | |
| "brushWidthBound": (10, 30), | |
| "nMovePointRatio": 0.5, | |
| "maxPiontMove": 5, | |
| "maxLineAcceleration": (3, 0.5), | |
| "boarderGap": None, | |
| "maxInitSpeed": 4, | |
| } | |
| elif stroke_preset == 'rand_curve': | |
| return { | |
| "nVertexBound": [10, 30], | |
| "maxHeadSpeed": 20, | |
| "maxHeadAcceleration": (15, 0.5), | |
| "brushWidthBound": (3, 10), | |
| "nMovePointRatio": 0.5, | |
| "maxPiontMove": 3, | |
| "maxLineAcceleration": (5, 0.5), | |
| "boarderGap": None, | |
| "maxInitSpeed": 6 | |
| } | |
| elif stroke_preset == 'rand_curve_small': | |
| return { | |
| "nVertexBound": [6, 22], | |
| "maxHeadSpeed": 12, | |
| "maxHeadAcceleration": (8, 0.5), | |
| "brushWidthBound": (2.5, 5), | |
| "nMovePointRatio": 0.5, | |
| "maxPiontMove": 1.5, | |
| "maxLineAcceleration": (3, 0.5), | |
| "boarderGap": None, | |
| "maxInitSpeed": 3 | |
| } | |
| else: | |
| raise NotImplementedError(f'The stroke presetting "{stroke_preset}" does not exist.') | |
| def copy_masks_without_boarder(root_dir, args): | |
| def erase_mask_boarder(mask, gap): | |
| pix = np.asarray(mask).astype('uint8') * 255 | |
| pix[:gap, :] = 255 | |
| pix[-gap:, :] = 255 | |
| pix[:, :gap] = 255 | |
| pix[:, -gap:] = 255 | |
| return Image.fromarray(pix).convert('1') | |
| wo_boarder_dir = root_dir + '_noBoarder' | |
| shutil.copytree(root_dir, wo_boarder_dir) | |
| for i, filename in enumerate(get_everything_under(wo_boarder_dir)): | |
| if args.image_masks: | |
| mask = Image.open(filename) | |
| mask_wo_boarder = erase_mask_boarder(mask, args.leave_boarder_unmasked) | |
| mask_wo_boarder.save(filename) | |
| else: | |
| # filename is a diretory containing multiple mask files | |
| for f in get_everything_under(filename, pattern='*.png'): | |
| mask = Image.open(f) | |
| mask_wo_boarder = erase_mask_boarder(mask, args.leave_boarder_unmasked) | |
| mask_wo_boarder.save(f) | |
| return wo_boarder_dir | |
| def cluster_by_masked_area(root_dir, args): | |
| clustered_dir = root_dir + '_clustered' | |
| make_dirs(clustered_dir) | |
| radius = 5 | |
| # all masks with ratio in x +- radius will be stored in sub-directory x | |
| clustered_centors = np.arange(radius, 100, radius * 2) | |
| clustered_subdirs = [] | |
| for c in clustered_centors: | |
| # make sub-directories for each ratio range | |
| clustered_subdirs.append(make_dir_under_root(clustered_dir, str(c))) | |
| for i, filename in enumerate(get_everything_under(root_dir)): | |
| if args.image_masks: | |
| ratio = get_masked_ratio(Image.open(filename)) | |
| else: | |
| # filename is a diretory containing multiple mask files | |
| ratio = np.mean([ | |
| get_masked_ratio(Image.open(f)) | |
| for f in get_everything_under(filename, pattern='*.png') | |
| ]) | |
| # find the nearest centor | |
| for i, c in enumerate(clustered_centors): | |
| if c - radius <= ratio * 100 <= c + radius: | |
| shutil.move(filename, clustered_subdirs[i]) | |
| break | |
| shutil.rmtree(root_dir) | |
| os.rename(clustered_dir, root_dir) | |
| def decide_nStroke(args): | |
| if args.num_stroke is not None: | |
| return args.num_stroke | |
| elif args.num_stroke_bound is not None: | |
| return np.random.randint(args.num_stroke_bound[0], args.num_stroke_bound[1]) | |
| else: | |
| raise ValueError('One of "-ns" or "-nsb" is needed') | |
| def main(args): | |
| preset = get_stroke_preset(args.stroke_preset) | |
| make_dirs(args.output_dir) | |
| if args.redo_without_generation: | |
| assert(len(get_everything_under(args.output_dir)) > 0) | |
| # put back clustered masks | |
| for clustered_subdir in get_everything_under(args.output_dir): | |
| if not os.path.isdir(clustered_subdir): | |
| continue | |
| for f in get_everything_under(clustered_subdir): | |
| shutil.move(f, args.output_dir) | |
| os.rmdir(clustered_subdir) | |
| else: | |
| if args.image_masks: | |
| for i in range(args.n): | |
| nStroke = decide_nStroke(args) | |
| mask = get_video_masks_by_moving_random_stroke( | |
| video_len=1, imageWidth=args.image_width, imageHeight=args.image_height, | |
| nStroke=nStroke, **preset | |
| )[0] | |
| mask.save(os.path.join(args.output_dir, f'{i:07d}.png')) | |
| else: | |
| for i in range(args.n): | |
| mask_dir = make_dir_under_root(args.output_dir, f'{i:05d}') | |
| mask_reader = MaskReader(mask_dir, read=False) | |
| nStroke = decide_nStroke(args) | |
| masks = get_video_masks_by_moving_random_stroke( | |
| imageWidth=args.image_width, imageHeight=args.image_height, | |
| video_len=args.video_len, nStroke=nStroke, **preset) | |
| mask_reader.set_files(masks) | |
| mask_reader.save_files(output_dir=mask_reader.dir_name) | |
| if args.leave_boarder_unmasked is not None: | |
| dir_leave_boarder = copy_masks_without_boarder(args.output_dir, args) | |
| if args.cluster_by_area: | |
| cluster_by_masked_area(dir_leave_boarder, args) | |
| if args.cluster_by_area: | |
| cluster_by_masked_area(args.output_dir, args) | |
| if __name__ == "__main__": | |
| args = parse_args() | |
| main(args) | |