Spaces:
Running
on
Zero
Running
on
Zero
File size: 2,201 Bytes
939bf35 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
import argparse
import json
import os
from multiprocessing import Manager, Pool
from pathlib import Path
import pandas as pd
from natsort import index_natsorted
from .get_meta_file import parallel_rglob
from .logger import logger
def process_file(file_path, shared_list):
    """Parse one jsonl file and append every record to ``shared_list``.

    Runs inside a worker process; ``shared_list`` is typically a
    ``multiprocessing.Manager().list()`` proxy shared across workers, but any
    object with ``append`` works.

    Args:
        file_path: Path to a ``.jsonl`` file (one JSON object per line).
        shared_list: Mutable collection that receives the parsed records.
    """
    with open(file_path, "r") as f:
        for line in f:
            # Skip blank / whitespace-only lines (e.g. a trailing newline)
            # instead of letting json.loads raise on an empty string.
            if not line.strip():
                continue
            shared_list.append(json.loads(line))
def parse_args():
    """Build the CLI parser for the gather script and return the parsed args."""
    parser = argparse.ArgumentParser(
        description=(
            "Gather all jsonl files in a folder (meta_folder) to a single "
            "jsonl file (meta_file_path)."
        )
    )
    parser.add_argument("--meta_folder", type=str, required=True)
    parser.add_argument("--video_path_column", type=str, default="video_path")
    parser.add_argument("--meta_file_path", type=str, required=True)
    parser.add_argument("--n_jobs", type=int, default=1)
    parser.add_argument(
        "--recursive",
        action="store_true",
        help="Whether to search sub-folders recursively.",
    )
    return parser.parse_args()
def main():
    """Gather every jsonl file under ``meta_folder`` into one jsonl file.

    Workflow:
      1. Collect the ``*.jsonl`` files (optionally recursing into sub-folders).
      2. Parse them in parallel, accumulating records in a manager-shared list.
      3. Write the combined records to ``meta_file_path``, then re-load and
         natural-sort them by ``video_path_column`` for a deterministic order.

    Raises:
        ValueError: If ``meta_folder`` does not exist.
        Exception: Any exception raised by a worker while parsing a file is
            re-raised here instead of being silently dropped.
    """
    args = parse_args()

    if not os.path.exists(args.meta_folder):
        raise ValueError(f"The meta_folder {args.meta_folder} does not exist.")

    meta_folder = Path(args.meta_folder)
    if args.recursive:
        jsonl_files = [str(file) for file in parallel_rglob(meta_folder, "*.jsonl", max_workers=args.n_jobs)]
    else:
        jsonl_files = [str(file) for file in meta_folder.glob("*.jsonl")]

    with Manager() as manager:
        shared_list = manager.list()
        with Pool(processes=args.n_jobs) as pool:
            # Keep the AsyncResult handles: without calling .get() below, an
            # exception raised in a worker (unreadable file, malformed JSON)
            # would be silently discarded and the gathered output would be
            # incomplete with no warning.
            async_results = [
                pool.apply_async(process_file, args=(file_path, shared_list))
                for file_path in jsonl_files
            ]
            pool.close()
            pool.join()
            for result in async_results:
                result.get()  # Re-raise any exception from the worker.

        # Write while the manager (and thus shared_list) is still alive.
        with open(args.meta_file_path, "w") as f:
            for item in shared_list:
                f.write(json.dumps(item) + '\n')

    # Re-load and natural-sort by the video path column so the final file has
    # a stable, human-friendly ordering regardless of worker completion order.
    df = pd.read_json(args.meta_file_path, lines=True)
    df = df.iloc[index_natsorted(df[args.video_path_column])].reset_index(drop=True)

    logger.info(f"Save the gathered single jsonl file to {args.meta_file_path}.")
    df.to_json(args.meta_file_path, orient="records", lines=True, force_ascii=False)
# Script entry point: gather per-file jsonl metadata into a single sorted file.
if __name__ == '__main__':
    main()
|