""" Process parquet file to extract JSON data and images. Usage: python scripts/process_parquet_to_json.py \ --parquet_path data/math_vision_1/data/test-00000-of-00001-3532b8d3f1b4047a.parquet \ --output_dir data/math_vision_1 """ import os import json import argparse import logging from pathlib import Path import pyarrow.parquet as pq from PIL import Image import io logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def clean_for_json(obj): """Recursively clean object for JSON serialization.""" if isinstance(obj, bytes): return None # Skip bytes elif isinstance(obj, dict): return {k: clean_for_json(v) for k, v in obj.items() if not isinstance(v, bytes)} elif isinstance(obj, (list, tuple)): return [clean_for_json(item) for item in obj if not isinstance(item, bytes)] else: return obj def process_parquet(parquet_path: str, output_dir: str): """ Process parquet file and extract data to JSON and images. Args: parquet_path: Path to the parquet file output_dir: Directory to save JSON and images """ # Create output directories output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) images_dir = output_dir / "images" images_dir.mkdir(exist_ok=True) logging.info(f"Loading parquet file: {parquet_path}") parquet_file = pq.ParquetFile(parquet_path) # Read first row group to get schema table = parquet_file.read() logging.info(f"Loaded {len(table)} rows") logging.info(f"Columns: {table.column_names}") # Process each row data_list = [] num_rows = len(table) for idx in range(num_rows): item = {} # Extract all fields for col in table.column_names: # Skip decoded_image column (contains binary data) if col == 'decoded_image': continue value = table.column(col)[idx].as_py() # Clean value for JSON serialization cleaned_value = clean_for_json(value) item[col] = cleaned_value # Log progress if (idx + 1) % 500 == 0: logging.info(f"Processed {idx+1}/{num_rows} rows...") data_list.append(item) # Save JSON json_filename = Path(parquet_path).stem.replace('-00000-of-00001-3532b8d3f1b4047a', '') + '.json' json_path = output_dir / json_filename logging.info(f"Saving JSON to: {json_path}") with open(json_path, 'w', encoding='utf-8') as f: json.dump(data_list, f, ensure_ascii=False, indent=2) logging.info(f"Done! Processed {len(data_list)} items") logging.info(f" - JSON: {json_path}") logging.info(f" - Images: {images_dir} ({len(list(images_dir.glob('*.png')))} files)") def main(): parser = argparse.ArgumentParser(description="Process parquet file to JSON and images") parser.add_argument("--parquet_path", type=str, required=True, help="Path to parquet file") parser.add_argument("--output_dir", type=str, required=True, help="Output directory") args = parser.parse_args() if not os.path.exists(args.parquet_path): logging.error(f"Parquet file not found: {args.parquet_path}") return process_parquet(args.parquet_path, args.output_dir) if __name__ == "__main__": main()