model111 / scripts /process_parquet_to_json.py
LCZZZZ's picture
Upload MemGen code and data
e34b94f verified
"""
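Convert a parquet file into a JSON file; the binary `decoded_image` column is skipped.
An `images/` directory is created under the output directory, but this script does not write image files into it.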
Usage:
python scripts/process_parquet_to_json.py \
--parquet_path data/math_vision_1/data/test-00000-of-00001-3532b8d3f1b4047a.parquet \
--output_dir data/math_vision_1
"""
import argparse
import io
import json
import logging
import os
import re
from pathlib import Path

import pyarrow.parquet as pq
from PIL import Image
# Configure root logging at import time: INFO level, "<time> - <level> - <message>" lines.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
def clean_for_json(obj):
    """Return a copy of *obj* with ``bytes`` values stripped out.

    - A dict keeps every entry whose value is not ``bytes``; kept values are
      cleaned recursively.
    - A list or tuple becomes a list of its non-``bytes`` elements, each
      cleaned recursively.
    - A bare ``bytes`` object becomes ``None``.
    - Anything else is returned unchanged.
    """
    if isinstance(obj, dict):
        cleaned_mapping = {}
        for key, value in obj.items():
            if isinstance(value, bytes):
                continue  # drop binary entries entirely
            cleaned_mapping[key] = clean_for_json(value)
        return cleaned_mapping

    if isinstance(obj, (list, tuple)):
        cleaned_items = []
        for element in obj:
            if isinstance(element, bytes):
                continue  # drop binary elements entirely
            cleaned_items.append(clean_for_json(element))
        return cleaned_items

    if isinstance(obj, bytes):
        # A top-level bytes value has no container to be dropped from.
        return None

    return obj
# Single-shard marker in parquet file names, optionally followed by a
# 16-hex-digit hash, e.g. "-00000-of-00001-3532b8d3f1b4047a" as in the usage
# example at the top of this file. Only the single-shard form is stripped so
# that shards of a multi-file dataset never map to the same JSON name.
_SINGLE_SHARD_SUFFIX = re.compile(r'-00000-of-00001(?:-[0-9a-f]{16}\b)?')

# Column that holds binary image data; it is never exported to JSON.
_BINARY_IMAGE_COLUMN = 'decoded_image'


def _json_filename_for(parquet_path) -> str:
    """Return the JSON file name for *parquet_path* with the single-shard suffix removed."""
    return _SINGLE_SHARD_SUFFIX.sub('', Path(parquet_path).stem) + '.json'


def process_parquet(parquet_path: str, output_dir: str) -> None:
    """
    Export the rows of a parquet file to a JSON file.

    Every column except ``decoded_image`` is converted to plain Python values,
    passed through ``clean_for_json`` and written as a list of row dicts to
    ``<output_dir>/<name>.json``, where ``<name>`` is the parquet file stem
    with any single-shard suffix removed.

    NOTE: an ``images`` sub-directory is created and the number of ``*.png``
    files in it is logged, but this function does not write any image files.

    Args:
        parquet_path: Path to the parquet file
        output_dir: Directory to save the JSON file (created if missing)
    """
    # Create output directories
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    images_dir = out_dir / "images"
    images_dir.mkdir(exist_ok=True)

    logging.info(f"Loading parquet file: {parquet_path}")
    parquet_file = pq.ParquetFile(parquet_path)

    # Read only the columns that are exported, so the binary image column is
    # never loaded into memory just to be skipped afterwards.
    all_columns = parquet_file.schema_arrow.names
    kept_columns = [col for col in all_columns if col != _BINARY_IMAGE_COLUMN]
    table = parquet_file.read(columns=kept_columns)
    num_rows = table.num_rows
    logging.info(f"Loaded {num_rows} rows")
    logging.info(f"Columns: {all_columns}")

    # Convert each column to Python objects once instead of doing a per-cell
    # lookup for every (row, column) pair.
    column_values = table.to_pydict()

    # Process each row
    data_list = []
    for idx in range(num_rows):
        data_list.append(
            {col: clean_for_json(values[idx]) for col, values in column_values.items()}
        )
        # Log progress
        if (idx + 1) % 500 == 0:
            logging.info(f"Processed {idx+1}/{num_rows} rows...")

    # Save JSON
    json_path = out_dir / _json_filename_for(parquet_path)
    logging.info(f"Saving JSON to: {json_path}")
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(data_list, f, ensure_ascii=False, indent=2)

    png_count = sum(1 for _ in images_dir.glob('*.png'))
    logging.info(f"Done! Processed {len(data_list)} items")
    logging.info(f" - JSON: {json_path}")
    logging.info(f" - Images: {images_dir} ({png_count} files)")
def main() -> int:
    """Parse command-line arguments and run the parquet-to-JSON conversion.

    Returns:
        Exit status for the process: 0 after a successful conversion, 1 when
        the path given by ``--parquet_path`` does not exist.
    """
    parser = argparse.ArgumentParser(description="Process parquet file to JSON and images")
    parser.add_argument("--parquet_path", type=str, required=True, help="Path to parquet file")
    parser.add_argument("--output_dir", type=str, required=True, help="Output directory")
    args = parser.parse_args()

    if not os.path.exists(args.parquet_path):
        logging.error(f"Parquet file not found: {args.parquet_path}")
        # Report the failure to the caller instead of ending with status 0.
        return 1

    process_parquet(args.parquet_path, args.output_dir)
    return 0


if __name__ == "__main__":
    # Propagate main()'s status so shells and pipelines can detect a failure.
    raise SystemExit(main())