GRN / tools /split_jsonl.py
hanjian.thu123
Remove demo directory with binary files and update project files
9ef2cbc
import os
import os.path as osp
import time
import itertools
import shutil
import glob
import argparse
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
import tqdm
import numpy as np
def save_lines(lines, filename):
    """Write ``lines`` verbatim to ``filename``, creating parent directories.

    Args:
        lines: Iterable of strings; written as-is (no newlines are added).
        filename: Destination path; an existing file is overwritten.
    """
    parent = osp.dirname(filename)
    # os.makedirs('') raises FileNotFoundError, so only create a directory
    # when the path actually has one (a bare filename targets the CWD).
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(filename, 'w') as f:
        f.writelines(lines)
def get_part_jsonls(save_dir, total_line_number, ext='.jsonl', chunk_size=1000, bucket_size=1000):
    """Plan output paths for splitting ``total_line_number`` lines into chunks.

    ``save_dir`` is deleted first if it exists. The lines are divided into
    ``ceil(total_line_number / chunk_size)`` chunks of ``chunk_size`` lines,
    with the last chunk holding the remainder. Every ``bucket_size``
    consecutive chunks share one sub-directory. Each path has the form
    ``<save_dir>/<bucket:06d>/<chunk_id:04d>_<parts:04d>_<num_lines:09d><ext>``,
    so a chunk's line count can be recovered from its filename.

    Args:
        save_dir: Root output directory (removed if present).
        total_line_number: Total number of lines to distribute.
        ext: File extension appended to every chunk name.
        chunk_size: Lines per chunk (except possibly the last).
        bucket_size: Chunks per bucket sub-directory.

    Returns:
        ``(missing, chunk_id2save_files)``. ``missing`` is True if any planned
        file does not exist on disk. ``chunk_id2save_files`` maps each 1-based
        chunk id to its path.
    """
    if osp.exists(save_dir):
        shutil.rmtree(save_dir)
    chunk_id2save_files = {}
    missing = False
    # Exact integer ceiling division (no float rounding for huge counts).
    parts = (total_line_number + chunk_size - 1) // chunk_size
    for chunk_id in range(1, parts + 1):
        if chunk_id == parts:
            num_of_lines = total_line_number - chunk_size * (parts - 1)
        else:
            num_of_lines = chunk_size
        # Use the parameter, not a script-level global, so the function also
        # works when imported from another module.
        bucket = (chunk_id - 1) // bucket_size + 1
        path = osp.join(save_dir, f'{bucket:06d}', f'{chunk_id:04d}_{parts:04d}_{num_of_lines:09d}{ext}')
        chunk_id2save_files[chunk_id] = path
        if not osp.exists(path):
            missing = True
    return missing, chunk_id2save_files
def split_large_txt_files(all_lines, chunk_id2save_files, max_workers=128):
    """Write ``all_lines`` into the chunk files named in ``chunk_id2save_files``.

    Each target filename's stem ends in ``_<num_lines>``, as produced by
    ``get_part_jsonls``. That number is how many consecutive lines go into the
    file. Chunks are filled in ascending chunk-id order, and the writes are run
    on a thread pool via ``save_lines``.

    Args:
        all_lines: Sized iterable of lines (trailing newlines included).
        chunk_id2save_files: Mapping of chunk id -> output path.
        max_workers: Number of writer threads (128 by default, as before).

    Raises:
        ValueError: If the per-chunk line counts encoded in the filenames do
            not add up to ``len(all_lines)``. This is checked before anything
            is written, so no partial output is produced.
    """
    ordered = sorted(chunk_id2save_files.items())
    # Parse each chunk's expected size once, from the last '_' field of the stem.
    sizes = [
        int(osp.splitext(osp.basename(path))[0].split('_')[-1])
        for _, path in ordered
    ]
    expected, total = sum(sizes), len(all_lines)
    if expected != total:
        raise ValueError(
            f'misaligned data: chunk files expect {expected} lines but got {total}'
        )

    lines_iter = iter(all_lines)
    futures = []
    with tqdm.tqdm(total=len(ordered)) as pbar:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for (_, path), size in zip(ordered, sizes):
                chunk = list(itertools.islice(lines_iter, size))
                futures.append(executor.submit(save_lines, chunk, path))
            # Re-raise any write error and tick the bar as each file finishes.
            for future in as_completed(futures):
                future.result()
                pbar.update(1)
from multiprocessing import Manager  # kept for backward compatibility; no longer used here
import threading

# Guards progress-bar updates made from ThreadPool worker threads (read_jsonl).
# All users are threads of one process, so a plain threading.Lock suffices.
# The previous Manager().Lock() started a manager server process at import
# time, which fails when the module is re-imported under the 'spawn' start method.
lock = threading.Lock()
def read_jsonl(jsonl_file):
    """Return the raw lines of ``jsonl_file`` and advance the shared progress bar by one.

    This runs inside ThreadPool workers, so the update of the module-level
    ``pbar`` is serialized with the module-level ``lock``.
    """
    with open(jsonl_file, 'r') as handle:
        file_lines = handle.readlines()
    with lock:  # concurrent workers must not interleave progress updates
        pbar.update(1)
    return file_lines
def read_jsonls(jsonl_files, worker):
    """Read every line of ``jsonl_files`` and return them in shuffled order.

    With exactly one file, lines are streamed and the progress bar counts
    lines. Its total is taken from a trailing ``_<N>`` in the file stem when
    present; otherwise the bar is open-ended. With any other number of files,
    whole files are read by ``read_jsonl`` in a ``ThreadPool`` of ``worker``
    threads, and the bar counts files.

    The module-global ``pbar`` is (re)bound here because ``read_jsonl``
    updates it from the pool threads. The bar is closed before returning.

    Args:
        jsonl_files: Sequence of file paths.
        worker: Thread count for the multi-file case.

    Returns:
        List of raw lines (newlines kept), shuffled in place with numpy.
    """
    global pbar
    from multiprocessing.pool import ThreadPool
    print(f'[Data Loading] Reading {len(jsonl_files)} meta files...')
    all_lines = []
    if len(jsonl_files) == 1:
        path = jsonl_files[0]
        # The count only sizes the progress bar, so a file name without a
        # numeric "_<N>" suffix must not be fatal.
        try:
            total = int(osp.splitext(osp.basename(path))[0].split('_')[-1])
        except ValueError:
            total = None
        with tqdm.tqdm(total=total) as pbar:
            with open(path, 'r') as f:
                for line in f:
                    pbar.update(1)
                    all_lines.append(line)
    else:
        with tqdm.tqdm(total=len(jsonl_files)) as pbar, ThreadPool(worker) as pool:
            for file_lines in pool.map(read_jsonl, jsonl_files):
                all_lines.extend(file_lines)
    np.random.shuffle(all_lines)
    return all_lines
if __name__ == '__main__':
    # Gather every ``*.jsonl`` file in the given folders, read and shuffle all
    # lines, then write them as fixed-size chunk files under --save_dir
    # (which is wiped first).
    parser = argparse.ArgumentParser()
    parser.add_argument('--jsonl_folder_list', type=str, default=[], nargs='+', help='patha pathb pathc')
    parser.add_argument('--save_dir', type=str, default='')
    parser.add_argument('--chunk_size', type=int, default=1000)
    parser.add_argument('--bucket_size', type=int, default=1000)
    parser.add_argument('--worker', type=int, default=128)
    # ``args`` is deliberately left module-global: other code in this file may read it.
    args = parser.parse_args()
    t1 = time.time()
    jsonl_files = []
    for item in args.jsonl_folder_list:
        # Only the top level of each folder is scanned; a pattern such as
        # '*/*.jsonl' would pick up files one directory deeper.
        jsonl_files += glob.glob(osp.join(item, '*.jsonl'))
    np.random.shuffle(jsonl_files)
    # read_jsonls creates (and rebinds) the shared progress bar itself.
    lines = read_jsonls(jsonl_files, args.worker)
    line_num = len(lines)
    print(f'total {line_num} lines')
    missing, chunk_id2save_files = get_part_jsonls(args.save_dir, line_num, chunk_size=args.chunk_size, bucket_size=args.bucket_size)
    split_large_txt_files(lines, chunk_id2save_files)
    t2 = time.time()
    print(f'split takes {t2-t1}s')