File size: 3,476 Bytes
e34b94f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
"""
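Export the rows of a parquet file to a JSON file.

The binary `decoded_image` column is skipped; an `images/` directory is created
under the output directory, but this script does not write image files into it.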

Usage:
    python scripts/process_parquet_to_json.py \
        --parquet_path data/math_vision_1/data/test-00000-of-00001-3532b8d3f1b4047a.parquet \
        --output_dir data/math_vision_1
"""

import os
import json
import argparse
import logging
from pathlib import Path
import pyarrow.parquet as pq
from PIL import Image
import io

# Configure the root logger at import time: INFO level, each record rendered as
# "<timestamp> - <LEVEL> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def clean_for_json(obj):
    """Return a copy of *obj* with raw ``bytes`` stripped out, recursively.

    - A ``bytes`` object itself becomes ``None``.
    - In a dict, entries whose value is ``bytes`` are omitted; other values
      are cleaned recursively.
    - In a list or tuple, ``bytes`` items are omitted; the result is always
      a list.
    - Anything else is returned unchanged.
    """
    if isinstance(obj, bytes):
        return None

    if isinstance(obj, dict):
        cleaned = {}
        for key, value in obj.items():
            if isinstance(value, bytes):
                continue  # drop binary entries entirely
            cleaned[key] = clean_for_json(value)
        return cleaned

    if isinstance(obj, (list, tuple)):
        kept = []
        for element in obj:
            if isinstance(element, bytes):
                continue  # drop binary items entirely
            kept.append(clean_for_json(element))
        return kept

    return obj


def _json_filename_for(parquet_path):
    """Return the JSON file name derived from a parquet file path.

    A trailing single-shard marker ``-00000-of-00001`` (optionally followed by
    ``-<hex hash>``) is removed, so
    ``test-00000-of-00001-3532b8d3f1b4047a.parquet`` becomes ``test.json``.
    Names of multi-shard files are kept intact so that different shards never
    map onto the same output file.
    """
    import re  # local import: only this helper needs it

    stem = Path(parquet_path).stem
    base = re.sub(r'-00000-of-00001(?:-[0-9a-fA-F]+)?$', '', stem)
    # Fall back to the full stem if stripping would leave an empty name.
    return (base or stem) + '.json'


def process_parquet(parquet_path: str, output_dir: str):
    """
    Export the rows of a parquet file to a JSON file.

    Every column except ``decoded_image`` is written; any remaining ``bytes``
    values are stripped by ``clean_for_json``. An ``images`` sub-directory is
    created and the number of ``*.png`` files already in it is logged, but
    this function does not write image files itself.

    Args:
        parquet_path: Path to the parquet file
        output_dir: Directory to save the JSON file (created if missing)

    Returns:
        None. The JSON file is written to ``output_dir``.
    """
    # Create output directories
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    images_dir = output_dir / "images"
    images_dir.mkdir(exist_ok=True)

    logging.info(f"Loading parquet file: {parquet_path}")
    parquet_file = pq.ParquetFile(parquet_path)

    # Column names and row count come from the file metadata, so they can be
    # reported without loading the binary image column.
    all_columns = parquet_file.schema_arrow.names
    num_rows = parquet_file.metadata.num_rows

    # decoded_image holds binary data that is never exported; excluding it at
    # read time avoids loading the image bytes into memory at all.
    kept_columns = [col for col in all_columns if col != 'decoded_image']

    logging.info(f"Loaded {num_rows} rows")
    logging.info(f"Columns: {all_columns}")

    # Convert each column to Python objects once instead of looking the
    # column up again for every single cell.
    column_values = {}
    if kept_columns:
        table = parquet_file.read(columns=kept_columns)
        column_values = {col: table.column(col).to_pylist() for col in kept_columns}

    # Process each row
    data_list = []
    for idx in range(num_rows):
        item = {
            col: clean_for_json(values[idx])
            for col, values in column_values.items()
        }

        # Log progress
        if (idx + 1) % 500 == 0:
            logging.info(f"Processed {idx+1}/{num_rows} rows...")

        data_list.append(item)

    # Save JSON
    json_path = output_dir / _json_filename_for(parquet_path)

    logging.info(f"Saving JSON to: {json_path}")
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(data_list, f, ensure_ascii=False, indent=2)

    logging.info(f"Done! Processed {len(data_list)} items")
    logging.info(f"  - JSON: {json_path}")
    logging.info(f"  - Images: {images_dir} ({len(list(images_dir.glob('*.png')))} files)")


def main(argv=None):
    """Command-line entry point: parse arguments and run the conversion.

    Args:
        argv: Optional list of argument strings. When None (the default),
            argparse reads the arguments from ``sys.argv[1:]``.

    Raises:
        SystemExit: With status 1 when ``--parquet_path`` is not an existing
            file, so shell callers can detect the failure. argparse itself
            also exits on invalid or missing arguments.
    """
    parser = argparse.ArgumentParser(description="Process parquet file to JSON and images")
    parser.add_argument("--parquet_path", type=str, required=True, help="Path to parquet file")
    parser.add_argument("--output_dir", type=str, required=True, help="Output directory")

    args = parser.parse_args(argv)

    # isfile (not exists) so that a directory path is rejected here rather
    # than failing later inside the parquet reader.
    if not os.path.isfile(args.parquet_path):
        logging.error(f"Parquet file not found: {args.parquet_path}")
        # Non-zero exit status: a silent return would report success.
        raise SystemExit(1)

    process_parquet(args.parquet_path, args.output_dir)


# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()