|
|
import argparse |
|
|
import concurrent.futures |
|
|
import json |
|
|
import os |
|
|
from pathlib import Path |
|
|
|
|
|
from tqdm import tqdm |
|
|
|
|
|
from detectron2.data.detection_utils import read_image |
|
|
|
|
|
|
|
|
def parse_args():
    """Build the command-line parser and return the parsed namespace.

    Options:
        --image_dir    str, help "path to image"
        --json_path    str, help "path to json"
        --output_path  str
        --max_workers  int, defaults to 100

    The three string options have no explicit default, so argparse leaves
    them as ``None`` when they are omitted.
    """
    cli = argparse.ArgumentParser(description="Get image information")

    # String-valued options share the same type; only the help text varies.
    string_options = (
        ("--image_dir", {"help": "path to image"}),
        ("--json_path", {"help": "path to json"}),
        ("--output_path", {}),
    )
    for flag, extra in string_options:
        cli.add_argument(flag, type=str, **extra)

    cli.add_argument("--max_workers", type=int, default=100)

    return cli.parse_args()
|
|
|
|
|
|
|
|
# Parsed once when the module is loaded. get_images_info() reads
# args.max_workers and main() reads args.json_path, args.output_path and
# args.image_dir from this module-level namespace.
args = parse_args()
|
|
|
|
|
|
|
|
def get_image_info(line, image_dir):
    """Probe a single image record and report its dimensions.

    Args:
        line: Mapping with a ``"file_name"`` entry; ``"id"`` is also read,
            but only once the image has been loaded successfully.
        image_dir: Directory that ``line["file_name"]`` is joined onto.

    Returns:
        A dict with keys ``status``, ``id``, ``file_name``, ``height``,
        ``width`` and ``channel``. ``status`` is ``"NOFOUND"`` when the
        joined path is not an existing file, ``"TRUNCATED"`` when
        ``read_image`` raises, and ``"SUCCESS"`` otherwise. Every field
        other than ``status`` stays ``None`` unless the status is
        ``"SUCCESS"``.
    """
    fields = ("id", "file_name", "height", "width", "channel")
    info = {"status": "", **dict.fromkeys(fields)}

    path = os.path.join(image_dir, line["file_name"])

    # Guard: nothing to read if the path is not an existing file.
    if not os.path.isfile(path):
        info["status"] = "NOFOUND"
        print(line)
        return info

    # Any failure raised while reading the image is reported as TRUNCATED.
    try:
        pixels = read_image(path, format="BGR")
    except Exception as err:
        info["status"] = "TRUNCATED"
        print(err, line)
        return info

    # The first three entries of the array shape are recorded as
    # height, width and channel respectively.
    info.update(
        status="SUCCESS",
        id=line["id"],
        file_name=line["file_name"],
        height=pixels.shape[0],
        width=pixels.shape[1],
        channel=pixels.shape[2],
    )
    return info
|
|
|
|
|
|
|
|
def get_images_info(data, image_dir, record_file):
    """Inspect every image record on a thread pool and write out the readable ones.

    Args:
        data: Sequence of image records; it is measured with ``len`` and
            sliced, and each element is passed to ``get_image_info``.
        image_dir: Directory forwarded to ``get_image_info``.
        record_file: Writable text file object. For every record whose
            status is ``"SUCCESS"`` one line of the form
            ``"<id> <file_name> <height> <width> <channel>"`` is written.

    Raises:
        AssertionError: If a worker reports a status other than
            ``"SUCCESS"``, ``"NOFOUND"`` or ``"TRUNCATED"``.

    The pool size is taken from the module-level ``args.max_workers``.
    Records with status ``"NOFOUND"`` or ``"TRUNCATED"`` are skipped (the
    worker has already printed them). Lines are written in completion
    order, not input order.
    """
    skipped_statuses = ("NOFOUND", "TRUNCATED")

    with tqdm(total=len(data)) as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor:
            # Submit in bounded chunks so that at most chunk_size futures
            # are outstanding at once instead of one per record.
            chunk_size = min(50000, args.max_workers * 500)
            for start in range(0, len(data), chunk_size):
                futures = [
                    executor.submit(get_image_info, line, image_dir)
                    for line in data[start : start + chunk_size]
                ]
                # Results are consumed, and the file written, in this thread.
                for future in concurrent.futures.as_completed(futures):
                    info = future.result()
                    status = info["status"]
                    if status == "SUCCESS":
                        record_file.write(
                            "{id} {file_name} {height} {width} {channel}\n".format(**info)
                        )
                    elif status not in skipped_statuses:
                        # Explicit raise instead of `assert False`, which is
                        # removed when Python runs with -O.
                        raise AssertionError(f"unexpected status {status!r} in {info!r}")
                    pbar.update(1)
|
|
|
|
|
|
|
|
def main():
    """Load the annotation JSON and write one size record per readable image.

    Reads the JSON file at ``args.json_path``, takes its ``"images"`` entry
    and passes it to ``get_images_info`` together with ``args.image_dir``
    and a UTF-8 text file opened for writing at ``args.output_path``.
    """
    print("loading", args.json_path)
    # Close the input handle as soon as the document has been parsed.
    with open(args.json_path, "r") as json_file:
        json_data = json.load(json_file)
    images = json_data["images"]

    # The context manager closes (and flushes) the output file even when
    # get_images_info raises part-way through.
    with open(args.output_path, "w", encoding="utf8") as record_file:
        get_images_info(images, args.image_dir, record_file)
|
|
|
|
|
|
|
|
# Call main() only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|
|