"""
Process parquet file to extract JSON data and images.
Usage:
python scripts/process_parquet_to_json.py \
--parquet_path data/math_vision_1/data/test-00000-of-00001-3532b8d3f1b4047a.parquet \
--output_dir data/math_vision_1
"""
import argparse
import io
import json
import logging
import os
import re
from pathlib import Path

import pyarrow.parquet as pq
from PIL import Image
# Root-logger setup: timestamped INFO-level messages for the whole script.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def clean_for_json(obj):
    """Recursively strip raw ``bytes`` so *obj* becomes JSON-serializable.

    Bytes values are dropped from dicts (key removed) and from
    lists/tuples (element removed); a bare ``bytes`` object maps to
    ``None``. Tuples come back as lists, since JSON has no tuple type.
    Everything else is returned unchanged.
    """
    if isinstance(obj, bytes):
        # Top-level bytes cannot be serialized; signal with None.
        return None
    if isinstance(obj, dict):
        return {
            key: clean_for_json(val)
            for key, val in obj.items()
            if not isinstance(val, bytes)
        }
    if isinstance(obj, (list, tuple)):
        return [clean_for_json(entry) for entry in obj if not isinstance(entry, bytes)]
    return obj
def process_parquet(parquet_path: str, output_dir: str):
    """
    Process a parquet file: dump row data to JSON and save embedded images.

    Fixes over the previous revision:
    - The 'decoded_image' column was skipped entirely, so ``images_dir``
      was created and its contents counted in the final log, but no image
      was ever written (and the PIL/io imports went unused). Images are
      now decoded and saved as ``images/<row_index>.png``.
    - The JSON filename stripped one hard-coded shard hash
      ('-00000-of-00001-3532b8d3f1b4047a'); any HF-style shard suffix is
      now removed with a regex, so other parquet files work too.

    Args:
        parquet_path: Path to the input parquet file.
        output_dir: Directory where the JSON file and an ``images/``
            subdirectory are written (created if missing).
    """
    # Create output directories
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    images_dir = output_dir / "images"
    images_dir.mkdir(exist_ok=True)

    logging.info(f"Loading parquet file: {parquet_path}")
    parquet_file = pq.ParquetFile(parquet_path)
    table = parquet_file.read()
    logging.info(f"Loaded {len(table)} rows")
    logging.info(f"Columns: {table.column_names}")

    data_list = []
    num_rows = len(table)
    for idx in range(num_rows):
        item = {}
        for col in table.column_names:
            value = table.column(col)[idx].as_py()
            if col == 'decoded_image':
                # Binary image payload: persist to disk instead of
                # embedding non-serializable bytes in the JSON.
                _save_image(value, images_dir / f"{idx}.png")
                continue
            item[col] = clean_for_json(value)
        # Log progress every 500 rows
        if (idx + 1) % 500 == 0:
            logging.info(f"Processed {idx+1}/{num_rows} rows...")
        data_list.append(item)

    # Strip any HF shard suffix (e.g. '-00000-of-00001-<hash>') instead of
    # a single hard-coded filename fragment.
    stem = Path(parquet_path).stem
    json_filename = re.sub(r'-\d{5}-of-\d{5}(-[0-9a-f]+)?$', '', stem) + '.json'
    json_path = output_dir / json_filename
    logging.info(f"Saving JSON to: {json_path}")
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(data_list, f, ensure_ascii=False, indent=2)

    logging.info(f"Done! Processed {len(data_list)} items")
    logging.info(f"  - JSON: {json_path}")
    logging.info(f"  - Images: {images_dir} ({len(list(images_dir.glob('*.png')))} files)")


def _save_image(value, path):
    """Best-effort: decode one image cell and save it as PNG at *path*.

    NOTE(review): assumes the cell is either raw image bytes or an
    HF-datasets-style dict with a 'bytes' key — confirm against the
    actual parquet schema. Failures are logged, not raised, so one bad
    cell does not abort the whole export.
    """
    try:
        raw = value.get('bytes') if isinstance(value, dict) else value
        if isinstance(raw, (bytes, bytearray)):
            Image.open(io.BytesIO(raw)).save(path)
    except Exception as e:
        logging.warning(f"Could not save image {path}: {e}")
def main():
    """CLI entry point: parse arguments, validate the input path, convert."""
    arg_parser = argparse.ArgumentParser(
        description="Process parquet file to JSON and images"
    )
    arg_parser.add_argument(
        "--parquet_path", type=str, required=True, help="Path to parquet file"
    )
    arg_parser.add_argument(
        "--output_dir", type=str, required=True, help="Output directory"
    )
    opts = arg_parser.parse_args()

    # Fail fast with a readable message instead of a pyarrow traceback.
    if not os.path.exists(opts.parquet_path):
        logging.error(f"Parquet file not found: {opts.parquet_path}")
        return

    process_parquet(opts.parquet_path, opts.output_dir)
# Allow use both as a standalone script and as an importable module.
if __name__ == "__main__":
    main()
|