|
|
""" |
|
|
Process parquet file to extract JSON data and images. |
|
|
|
|
|
Usage: |
|
|
python scripts/process_parquet_to_json.py \ |
|
|
--parquet_path data/math_vision_1/data/test-00000-of-00001-3532b8d3f1b4047a.parquet \ |
|
|
--output_dir data/math_vision_1 |
|
|
""" |
|
|
|
|
|
import argparse
import io
import json
import logging
import os
import re
from pathlib import Path

import pyarrow.parquet as pq
from PIL import Image
|
|
|
|
|
# Timestamped INFO-level logging used for all progress/error reporting below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
|
|
|
|
def clean_for_json(obj):
    """Recursively strip raw byte payloads so the object is JSON-serializable.

    Containers are rebuilt without any entry whose value is ``bytes``
    (dict keys with byte values are dropped; tuples come back as lists).
    A bare ``bytes`` object at the top level becomes ``None``.
    """
    if isinstance(obj, dict):
        return {
            key: clean_for_json(val)
            for key, val in obj.items()
            if not isinstance(val, bytes)
        }
    if isinstance(obj, (list, tuple)):
        return [
            clean_for_json(elem)
            for elem in obj
            if not isinstance(elem, bytes)
        ]
    if isinstance(obj, bytes):
        return None
    return obj
|
|
|
|
|
|
|
|
def process_parquet(parquet_path: str, output_dir: str):
    """
    Process a parquet file and dump its rows (minus binary image data) to JSON.

    NOTE(review): despite the images/ directory being created, this function
    never writes image files — the 'decoded_image' column is dropped and all
    nested bytes are stripped by clean_for_json. The final log simply reports
    however many .png files already exist in that directory.

    Args:
        parquet_path: Path to the parquet file
        output_dir: Directory to save JSON (and the images/ subdirectory)
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    images_dir = output_dir / "images"
    images_dir.mkdir(exist_ok=True)

    logging.info(f"Loading parquet file: {parquet_path}")
    parquet_file = pq.ParquetFile(parquet_path)
    table = parquet_file.read()

    logging.info(f"Loaded {len(table)} rows")
    logging.info(f"Columns: {table.column_names}")

    # Drop the raw image column once up front, then convert the whole table in
    # a single pass. This replaces the previous per-cell
    # table.column(col)[idx].as_py() access, which was a Python-level loop over
    # every (row, column) pair and far slower on wide/long tables.
    keep_cols = [c for c in table.column_names if c != 'decoded_image']
    rows = table.select(keep_cols).to_pylist()

    data_list = []
    num_rows = len(rows)
    for idx, row in enumerate(rows):
        # Strip any nested bytes payloads that survive inside struct columns.
        item = {col: clean_for_json(value) for col, value in row.items()}
        data_list.append(item)

        if (idx + 1) % 500 == 0:
            logging.info(f"Processed {idx + 1}/{num_rows} rows...")

    # Strip a Hugging Face-style shard suffix ("-00000-of-00001[-hexhash]")
    # from the stem. Generalizes the previous hard-coded literal
    # '-00000-of-00001-3532b8d3f1b4047a', which only matched one specific file.
    stem = re.sub(r'-\d+-of-\d+(-[0-9a-f]+)?$', '', Path(parquet_path).stem)
    json_path = output_dir / f"{stem}.json"

    logging.info(f"Saving JSON to: {json_path}")
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(data_list, f, ensure_ascii=False, indent=2)

    logging.info(f"Done! Processed {len(data_list)} items")
    logging.info(f" - JSON: {json_path}")
    logging.info(f" - Images: {images_dir} ({len(list(images_dir.glob('*.png')))} files)")
|
|
|
|
|
|
|
|
def main():
    """Parse command-line arguments, validate the input path, and convert."""
    arg_parser = argparse.ArgumentParser(
        description="Process parquet file to JSON and images"
    )
    arg_parser.add_argument(
        "--parquet_path", type=str, required=True, help="Path to parquet file"
    )
    arg_parser.add_argument(
        "--output_dir", type=str, required=True, help="Output directory"
    )
    args = arg_parser.parse_args()

    # Bail out early with a logged error rather than letting pyarrow raise.
    if not os.path.exists(args.parquet_path):
        logging.error(f"Parquet file not found: {args.parquet_path}")
        return

    process_parquet(args.parquet_path, args.output_dir)
|
|
|
|
|
|
|
|
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()
|
|
|
|
|
|