| import copy |
| import os |
| import json |
| import random |
| from PIL import Image, ImageDraw |
| import numpy as np |
|
|
|
|
| def find_nearest(anno_list, idx): |
| while anno_list[idx] is None: |
| idx -= 1 |
| return idx |
|
|
|
|
| def parse_anno(path): |
| with open(path, 'r') as f: |
| datas = f.readlines() |
|
|
| ret = [] |
| for data in datas: |
| bbox = data.replace("\n", "").split() |
| bbox = [float(_item.strip()) for _item in bbox] |
| ret.append(bbox) |
| return ret |
|
|
| |
| if not os.path.exists('./achieved'): |
| os.mkdir('./achieved') |
| if not os.path.exists('./achieved/images/'): |
| os.mkdir('./achieved/images') |
|
|
| save_image_path = './achieved/images' |
| save_json_path = './achieved/anno.json' |
|
|
| final_json_data = { |
| "task": "video object tracking, under water video", |
| "data_source": "UTB180", |
| "type": "comprehension", |
| "modality": { |
| "in": ["image", "text"], |
| "out": ["text"] |
| }, |
| "version": 1.0, |
| } |
|
|
| src_frames_folder = 'UTB180/UTB180/' |
| src_first_frame_folder = 'thumbnails/thumbnails/UTB180_1st_frames/' |
|
|
| _PER_NUMBER=50 |
| _SAMPLE_FRAMES=30 |
|
|
| with open("./under_water_attr.txt", 'r') as f: |
| datas = f.readlines() |
| split_data_list = {'blue_water': [], "green_water": [], "yellow_water": [], "white_water": []} |
| split_attr_list = {'blue_water': [], "green_water": [], "yellow_water": [], "white_water": []} |
| for idx in range(1, 181): |
| _str_idx = str(idx + 10000)[1:] |
| idx_attr = datas[idx-1].lower() |
| if 'blue' in idx_attr: |
| split_attr_list['blue_water'].append(_str_idx) |
| elif 'clear' in idx_attr or idx_attr == "": |
| split_attr_list["white_water"].append(_str_idx) |
| elif 'green' in idx_attr: |
| split_attr_list["green_water"].append(_str_idx) |
| elif 'brown' in idx_attr or 'brown' in idx_attr: |
| split_attr_list["yellow_water"].append(_str_idx) |
|
|
| _id = 10000 |
| for instance_name in os.listdir(src_frames_folder): |
| if '.json' in instance_name or '.xlsx' in instance_name: |
| continue |
| _split = None |
| _sub_nums = None |
| for _key_str in split_attr_list['blue_water']: |
| if _key_str in instance_name: |
| _split = 'blue_water' |
| _sub_nums = _PER_NUMBER // len(split_attr_list['blue_water']) + 1 |
| break |
| if _split is None: |
| for _key_str in split_attr_list['green_water']: |
| if _key_str in instance_name: |
| _split = 'green_water' |
| _sub_nums = _PER_NUMBER // len(split_attr_list['green_water']) + 1 |
| break |
| if _split is None: |
| for _key_str in split_attr_list['white_water']: |
| print(_key_str, ' ', instance_name) |
| if _key_str in instance_name: |
| _split = 'white_water' |
| _sub_nums = _PER_NUMBER // len(split_attr_list['white_water']) + 1 |
| break |
| if _split is None: |
| for _key_str in split_attr_list['yellow_water']: |
| if _key_str in instance_name: |
| _split = 'yellow_water' |
| _sub_nums = _PER_NUMBER // len(split_attr_list['yellow_water']) + 1 |
| break |
| if _split is None: |
| continue |
|
|
| if len(split_data_list[_split]) >= _PER_NUMBER: |
| continue |
|
|
| anno_file_path = os.path.join(src_frames_folder, instance_name, "groundtruth_rect.txt") |
| anno_bboxes = parse_anno(anno_file_path) |
| if anno_bboxes is None: |
| continue |
|
|
| cur_video_folder = os.path.join(src_frames_folder, instance_name, 'imgs') |
| frame_names = os.listdir(cur_video_folder) |
| len_frames = len(frame_names) |
|
|
| if len_frames > len(anno_bboxes): |
| print(f"Wrong anno and seq, {len_frames} frames, {len(anno_bboxes)} bboxes.") |
| continue |
|
|
| print(instance_name) |
|
|
| frame_steps = len_frames // _sub_nums |
| for _sub_idx in range(_sub_nums): |
| frame_start_idx = _sub_idx * frame_steps |
| frame_end_idx = min((_sub_idx + 1) * frame_steps, len_frames) |
|
|
| selected_frames_idxs = list(range(frame_start_idx, frame_end_idx)) |
| random.shuffle(selected_frames_idxs) |
| selected_frames_idxs = selected_frames_idxs[:_SAMPLE_FRAMES] |
| selected_frames_idxs.sort() |
|
|
| if anno_bboxes[selected_frames_idxs[0]] is None: |
| selected_frames_idxs.append(find_nearest(anno_bboxes, selected_frames_idxs[0])) |
| selected_frames_idxs.sort() |
|
|
| |
| str_id = str(_id)[1:] |
| _id += 1 |
| drt_folder = os.path.join('./achieved/images/', str_id) |
| if not os.path.exists(drt_folder): |
| os.mkdir(drt_folder) |
| for select_frame_idx in selected_frames_idxs: |
| frame_name = frame_names[select_frame_idx] |
| os.system(f"cp {os.path.join(cur_video_folder, frame_name)} {drt_folder}") |
|
|
| |
| selected_anns = [] |
| print(len(anno_bboxes), '--', selected_frames_idxs) |
| for select_frame_idx in selected_frames_idxs: |
| selected_anns.append(anno_bboxes[select_frame_idx]) |
|
|
| _data = {"id": "vt_vot{}".format(str_id)} |
| _data["input"] = {"video_folder": drt_folder.replace('/achieved', ''), "prompt": "Please tracking the object within red box in image 1."} |
| _data["output"] = {"bboxes": selected_anns} |
|
|
| |
| |
| |
| first_frame = Image.open(os.path.join(drt_folder, frame_names[selected_frames_idxs[0]])) |
| draw = ImageDraw.Draw(first_frame) |
| draw.rectangle([selected_anns[0][0], selected_anns[0][1], |
| selected_anns[0][2] + selected_anns[0][0], |
| selected_anns[0][3] + selected_anns[0][1]], outline='red', width=2) |
| first_frame.save(os.path.join(drt_folder, frame_names[selected_frames_idxs[0]].replace('.jpg', '_draw.jpg'))) |
| split_data_list[_split].append(_data) |
|
|
| for _split in split_data_list.keys(): |
| with open(f'./achieved/{_split}.json', 'w') as f: |
| print(len(split_data_list[_split])) |
| _data = split_data_list[_split] |
| _ret_data = copy.deepcopy(final_json_data) |
| _ret_data["task"] += f", {_split}" |
| _ret_data["data"] = _data |
| json.dump(_ret_data, f) |