"""Shuffle the lines of many ``.jsonl`` files and re-split them into chunks.

Every ``*.jsonl`` file found directly inside the given folders is read into
memory, all lines are shuffled together, and the result is written back out
as fixed-size chunk files grouped into numbered bucket directories:

    <save_dir>/<bucket:06d>/<chunk_id:04d>_<parts:04d>_<num_lines:09d>.jsonl
"""
import os
import os.path as osp
import time
import itertools
import shutil
import glob
import argparse
import json
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import Manager  # noqa: F401  (kept for compatibility)
from multiprocessing.pool import ThreadPool

import tqdm
import numpy as np


# Guards updates of the shared progress bar from reader threads.  Only
# threads touch it, so a plain threading lock is enough and, unlike a
# multiprocessing Manager lock, creating it does not start a helper process
# at import time.
lock = threading.Lock()

# Progress bar shared between ``read_jsonls`` and its ``read_jsonl`` workers.
# ``None`` whenever no read is in progress.
pbar = None


def _line_count_from_name(path):
    """Return the line count encoded as the last ``_`` field of a file name.

    Raises:
        ValueError: if the last field of the base name is not an integer.
    """
    stem = osp.splitext(osp.basename(path))[0]
    return int(stem.split('_')[-1])


def save_lines(lines, filename):
    """Write ``lines`` to ``filename``, creating parent directories first.

    Args:
        lines: iterable of strings, written verbatim (no newline is added).
        filename: destination path; an existing file is overwritten.
    """
    parent = osp.dirname(filename)
    # A bare file name has no directory component and os.makedirs('') raises.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(filename, 'w') as f:
        f.writelines(lines)


def get_part_jsonls(save_dir, total_line_number, ext='.jsonl', chunk_size=1000, bucket_size=1000):
    """Plan the output chunk files for ``total_line_number`` lines.

    NOTE: an existing ``save_dir`` is deleted recursively before planning.

    Args:
        save_dir: root directory of the output.
        total_line_number: number of lines that will be written in total.
        ext: extension of the chunk files.
        chunk_size: number of lines per chunk (the last chunk may be smaller).
        bucket_size: number of chunks stored per bucket sub-directory.

    Returns:
        ``(missing, chunk_id2save_files)`` where ``missing`` is True if at
        least one planned file does not exist on disk, and
        ``chunk_id2save_files`` maps 1-based chunk ids to their paths.

    Raises:
        ValueError: if ``chunk_size`` or ``bucket_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError(f'chunk_size must be positive, got {chunk_size}')
    if bucket_size <= 0:
        raise ValueError(f'bucket_size must be positive, got {bucket_size}')

    if osp.exists(save_dir):
        shutil.rmtree(save_dir)

    chunk_id2save_files = {}
    missing = False
    # Exact integer ceil-division; avoids float rounding on huge counts.
    parts = (total_line_number + chunk_size - 1) // chunk_size
    for chunk_id in range(1, parts + 1):
        if chunk_id == parts:
            num_of_lines = total_line_number - chunk_size * (parts - 1)
        else:
            num_of_lines = chunk_size
        # Use the function argument, not the script-level ``args`` namespace.
        bucket = (chunk_id - 1) // bucket_size + 1
        save_file = osp.join(
            save_dir,
            f'{bucket:06d}',
            f'{chunk_id:04d}_{parts:04d}_{num_of_lines:09d}{ext}',
        )
        chunk_id2save_files[chunk_id] = save_file
        if not osp.exists(save_file):
            missing = True
    return missing, chunk_id2save_files


def split_large_txt_files(all_lines, chunk_id2save_files):
    """Write ``all_lines`` into the chunk files planned by ``get_part_jsonls``.

    The size of each chunk is taken from the line count encoded in its file
    name; the files themselves are written concurrently by a thread pool.

    Args:
        all_lines: iterable of lines, consumed in order.
        chunk_id2save_files: mapping of 1-based chunk id to output path.

    Raises:
        ValueError: if lines are left over that do not fill the last chunk.
        KeyError: if there are more lines than the planned chunks can hold.
    """
    max_workers = 128
    futures = []
    chunk = []
    chunk_id = 1
    cur_chunk_size = None  # parsed once per chunk rather than once per line

    with tqdm.tqdm(total=len(chunk_id2save_files)) as progress:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for line in all_lines:
                if cur_chunk_size is None:
                    cur_chunk_size = _line_count_from_name(chunk_id2save_files[chunk_id])
                chunk.append(line)
                if len(chunk) >= cur_chunk_size:
                    futures.append(
                        executor.submit(save_lines, chunk, chunk_id2save_files[chunk_id])
                    )
                    chunk = []
                    chunk_id += 1
                    cur_chunk_size = None
            if len(chunk):
                raise ValueError("last chunk not save, means misalign data!")
            for future in as_completed(futures):
                # Re-raise any error from a writer thread.
                future.result()
                progress.update(1)


def read_jsonl(jsonl_file):
    """Read all lines of one file and advance the shared progress bar."""
    with open(jsonl_file, 'r') as f:
        lines = f.readlines()
    with lock:
        if pbar is not None:
            pbar.update(1)
    return lines


def read_jsonls(jsonl_files, worker):
    """Read every file in ``jsonl_files`` and return all lines, shuffled.

    A single file is streamed line by line, with the progress total taken
    from the line count encoded in its name when one is present.  Several
    files are read concurrently by ``worker`` threads.

    Args:
        jsonl_files: list of paths to read.
        worker: number of reader threads used when there are several files.

    Returns:
        A list holding every line of every file, in random order.
    """
    global pbar
    print(f'[Data Loading] Reading {len(jsonl_files)} meta files...')
    all_lines = []
    if len(jsonl_files) == 1:
        try:
            lines_num = _line_count_from_name(jsonl_files[0])
        except ValueError:
            # File name carries no line count: show a bar without a total.
            lines_num = None
        pbar = tqdm.tqdm(total=lines_num)
        try:
            with open(jsonl_files[0], 'r') as f:
                for line in f:
                    pbar.update(1)
                    all_lines.append(line)
        finally:
            pbar.close()
            pbar = None
    elif jsonl_files:
        pbar = tqdm.tqdm(total=len(jsonl_files))
        try:
            with ThreadPool(worker) as pool:
                for file_lines in pool.map(read_jsonl, jsonl_files):
                    all_lines.extend(file_lines)
        finally:
            pbar.close()
            pbar = None
    np.random.shuffle(all_lines)
    return all_lines


def main():
    """Command-line entry point: read, shuffle and re-split jsonl files."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--jsonl_folder_list', type=str, default='', nargs='+', help='patha pathb pathc')
    parser.add_argument('--save_dir', type=str, default='')
    parser.add_argument('--chunk_size', type=int, default=1000)
    parser.add_argument('--bucket_size', type=int, default=1000)
    parser.add_argument('--worker', type=int, default=128)
    args = parser.parse_args()

    t1 = time.time()
    jsonl_files = []
    for item in args.jsonl_folder_list:
        jsonl_files += glob.glob(osp.join(item, '*.jsonl'))
    # Shuffle the file order as well, before the per-line shuffle.
    np.random.shuffle(jsonl_files)

    lines = read_jsonls(jsonl_files, args.worker)
    print(f'total {len(lines)} lines')
    line_num = len(lines)
    missing, chunk_id2save_files = get_part_jsonls(
        args.save_dir, line_num, chunk_size=args.chunk_size, bucket_size=args.bucket_size
    )
    split_large_txt_files(lines, chunk_id2save_files)
    t2 = time.time()
    print(f'split takes {t2-t1}s')


if __name__ == '__main__':
    main()