| | import copy |
| | import os |
| | import json |
| | import random |
| | from PIL import Image, ImageDraw |
| | import numpy as np |
| |
|
| |
|
| | def find_nearest(anno_list, idx): |
| | while anno_list[idx] is None: |
| | idx -= 1 |
| | return idx |
| |
|
| |
|
| | def parse_anno(path): |
| | with open(path, 'r') as f: |
| | datas = f.readlines() |
| |
|
| | ret = [] |
| | for data in datas: |
| | polygon = data.replace('\n', "").replace("m", "").split(",") |
| | polygon = [int(_item) for _item in polygon] |
| | print(len(polygon)) |
| | print(polygon) |
| | if len(polygon) % 2 != 0: |
| | polygon = polygon[:-1] |
| | polygon = np.array(polygon).reshape(-1, 2) |
| | max_coord = np.max(polygon, axis=0) |
| | min_coord = np.min(polygon, axis=0) |
| | bbox = [int(min_coord[0]), int(min_coord[1]), int(max_coord[0] - min_coord[0]), int(max_coord[1] - min_coord[1])] |
| | ret.append(bbox) |
| | return ret |
| |
|
| | |
| | if not os.path.exists('./achieved'): |
| | os.mkdir('./achieved') |
| | if not os.path.exists('./achieved/images/'): |
| | os.mkdir('./achieved/images') |
| |
|
| | save_image_path = './achieved/images' |
| | save_json_path = './achieved/anno.json' |
| |
|
| | final_json_data = { |
| | "task": "video object tracking, vot video", |
| | "data_source": "VOT2022", |
| | "type": "comprehension", |
| | "modality": { |
| | "in": ["image", "text"], |
| | "out": ["text"] |
| | }, |
| | "version": 1.0, |
| | } |
| |
|
| | src_frames_folder = 'data/' |
| | src_anno_folder = 'annotations/' |
| |
|
| | _PER_NUMBER=50 |
| | _SAMPLE_FRAMES=30 |
| |
|
| | split_data_list = {'person': [], "animal": [], "others": []} |
| | split_key_str = { |
| | 'person': ['agility', 'gymnastics', "iceskater", "shaking", "girl", "graduate", "singer", "soldier", "bolt"], |
| | 'animal': ['tiger', 'ant', 'butterfly', 'nature', 'lamb', 'rabbit', 'zebrafish', 'crabs', 'fish', 'bird', 'dinosaur', 'flamingo', 'monkey'], |
| | } |
| |
|
| | for instance_name in os.listdir(src_frames_folder): |
| | if '.zip' in instance_name and not os.path.exists(os.path.join(src_frames_folder, instance_name.replace(".zip", ""))): |
| | folder_name = instance_name.replace(".zip", "") |
| | os.mkdir(os.path.join(src_frames_folder, instance_name.replace(".zip", ""))) |
| | os.system(f"unzip -d {os.path.join(src_frames_folder, folder_name)} {os.path.join(src_frames_folder, instance_name)}") |
| | for instance_name in os.listdir(src_anno_folder): |
| | if '.zip' in instance_name and not os.path.exists(os.path.join(src_anno_folder, instance_name.replace(".zip", ""))): |
| | |
| | folder_name = instance_name.replace(".zip", "") |
| | os.system(f"unzip -d {os.path.join(src_anno_folder, folder_name)} {os.path.join(src_anno_folder, instance_name)}") |
| |
|
| | _id = 10000 |
| | for instance_name in os.listdir(src_frames_folder): |
| | if '.zip' in instance_name: |
| | continue |
| | _split = None |
| | _sub_nums = None |
| | for _key_str in split_key_str['person']: |
| | if _key_str in instance_name: |
| | _split = 'person' |
| | _sub_nums = 6 |
| | break |
| | if _split is None: |
| | for _key_str in split_key_str['animal']: |
| | if _key_str in instance_name: |
| | _split = 'animal' |
| | _sub_nums = 5 |
| | break |
| | if _split is None: |
| | _split = "others" |
| | _sub_nums = 3 |
| |
|
| | if len(split_data_list[_split]) >= _PER_NUMBER: |
| | continue |
| |
|
| | anno_file_path = os.path.join(src_anno_folder, instance_name, "groundtruth.txt") |
| | anno_bboxes = parse_anno(anno_file_path) |
| | if anno_bboxes is None: |
| | continue |
| |
|
| | cur_video_folder = os.path.join(src_frames_folder, instance_name) |
| | frame_names = os.listdir(cur_video_folder) |
| | len_frames = len(frame_names) |
| |
|
| | if len_frames > len(anno_bboxes): |
| | print(f"Wrong anno and seq, {len_frames} frames, {len(anno_bboxes)} bboxes.") |
| | continue |
| |
|
| | print(instance_name) |
| |
|
| | frame_steps = len_frames // _sub_nums |
| | for _sub_idx in range(_sub_nums): |
| | frame_start_idx = _sub_idx * frame_steps |
| | frame_end_idx = min((_sub_idx + 1) * frame_steps, len_frames) |
| |
|
| | selected_frames_idxs = list(range(frame_start_idx, frame_end_idx)) |
| | random.shuffle(selected_frames_idxs) |
| | selected_frames_idxs = selected_frames_idxs[:_SAMPLE_FRAMES] |
| | selected_frames_idxs.sort() |
| |
|
| | if anno_bboxes[selected_frames_idxs[0]] is None: |
| | selected_frames_idxs.append(find_nearest(anno_bboxes, selected_frames_idxs[0])) |
| | selected_frames_idxs.sort() |
| |
|
| | |
| | str_id = str(_id)[1:] |
| | _id += 1 |
| | drt_folder = os.path.join('./achieved/images/', str_id) |
| | if not os.path.exists(drt_folder): |
| | os.mkdir(drt_folder) |
| | for select_frame_idx in selected_frames_idxs: |
| | frame_name = frame_names[select_frame_idx] |
| | os.system(f"cp {os.path.join(cur_video_folder, frame_name)} {drt_folder}") |
| |
|
| | |
| | selected_anns = [] |
| | print(len(anno_bboxes), '--', selected_frames_idxs) |
| | for select_frame_idx in selected_frames_idxs: |
| | selected_anns.append(anno_bboxes[select_frame_idx]) |
| |
|
| | _data = {"id": "vt_vot{}".format(str_id)} |
| | _data["input"] = {"video_folder": drt_folder.replace('/achieved', ''), "prompt": "Please tracking the object within red box in image 1."} |
| | _data["output"] = {"bboxes": selected_anns} |
| |
|
| | |
| | |
| | |
| | first_frame = Image.open(os.path.join(drt_folder, frame_names[selected_frames_idxs[0]])) |
| | draw = ImageDraw.Draw(first_frame) |
| | draw.rectangle([selected_anns[0][0], selected_anns[0][1], |
| | selected_anns[0][2] + selected_anns[0][0], |
| | selected_anns[0][3] + selected_anns[0][1]], outline='red', width=2) |
| | first_frame.save(os.path.join(drt_folder, frame_names[selected_frames_idxs[0]].replace('.jpg', '_draw.jpg'))) |
| | split_data_list[_split].append(_data) |
| |
|
| | for _split in split_data_list.keys(): |
| | with open(f'./achieved/{_split}.json', 'w') as f: |
| | print(len(split_data_list[_split])) |
| | _data = split_data_list[_split] |
| | _ret_data = copy.deepcopy(final_json_data) |
| | _ret_data["task"] += f", {_split}" |
| | _ret_data["data"] = _data |
| | json.dump(_ret_data, f) |