| import argparse |
| import json |
| import os |
| import logging |
| import sys |
| from pathlib import Path |
| from concurrent.futures import ThreadPoolExecutor |
| from ast import literal_eval |
| import time |
| from typing import Tuple |
| import pandas as pd |
|
|
| |
# Route INFO-and-above log records to stdout with a timestamped line format.
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
|
|
def process_row(args):
    """Build one VQA record from a single DataFrame row (thread-safe).

    Args:
        args: tuple of (row index, row mapping with 'question'/'answer'
            keys, parquet file stem used as the image sub-directory).

    Returns:
        dict: the assembled record, or None if anything raised.
    """
    index, row, file_stem = args
    try:
        # The corresponding image is expected at ./data/<file_stem>/<index>.png
        image_rel = Path("data") / file_stem / f"{index}.png"
        media_path = "./" + image_rel.as_posix()

        record = {
            "index": index,
            "media_type": "image",
            "media_paths": media_path,
            "description": "",
            "task_type": "Vision-Question-Answer",
            "question": [row.get('question', '')],
            "question_type": "free-form",
            "annotations": [],
            "options": [],
            "answer": [row.get('answer', '')],
            "source": "PathQA",
            "domain": "Biomedical",
        }
        return record
    except Exception as e:
        logging.error(f"处理行 {index} 时出错: {str(e)}")
        return None
|
|
def process_single_parquet(parquet_path: Path, output_root: Path) -> Tuple[int, int]:
    """Convert one Parquet file into a JSON list of VQA records.

    Rows are mapped through process_row in a thread pool; results are
    written to <output_root>/<file_stem>.json.

    Args:
        parquet_path: path to the input .parquet file.
        output_root: directory that will receive the JSON output
            (created if missing).

    Returns:
        Tuple[int, int]: (successfully converted rows, failed rows).
        On a file-level failure, returns (0, rows read so far or 0).
    """
    start_time = time.time()
    file_stem = parquet_path.stem
    output_dir = output_root
    output_json = output_dir / f"{file_stem}.json"

    success_count = 0
    error_count = 0
    # Bound before the try so the except handler below can reference it even
    # when pd.read_parquet fails (previously this raised NameError there).
    total_rows = 0
    results = []

    try:
        df = pd.read_parquet(parquet_path)
        total_rows = len(df)

        logging.info(f"\n{'='*40}\nProcessing: {parquet_path.name}")
        logging.info(f"Output directory: {output_dir}")

        # os.cpu_count() may return None; fall back to 1 in that case.
        with ThreadPoolExecutor(max_workers=(os.cpu_count() or 1) * 2) as executor:
            task_args = [(idx, row, file_stem) for idx, row in df.iterrows()]
            futures = [executor.submit(process_row, args) for args in task_args]

            for future in futures:
                result = future.result()
                if result:
                    results.append(result)
                    success_count += 1
                else:
                    error_count += 1

        # Make sure the output directory exists before writing.
        output_dir.mkdir(parents=True, exist_ok=True)
        with open(output_json, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)

        process_time = time.time() - start_time
        logging.info(
            f"Processed: {success_count}/{total_rows} | "
            f"Errors: {error_count} | "
            f"Time: {process_time:.2f}s"
        )

        return success_count, error_count

    except Exception as e:
        logging.error(f"处理文件失败: {str(e)}")
        return 0, total_rows
|
|
def batch_process_parquets(input_dir: Path, output_root: Path):
    """Convert every *.parquet file in a directory to JSON.

    Args:
        input_dir: directory scanned (non-recursively) for .parquet files.
        output_root: directory that receives one JSON file per input.

    Raises:
        FileNotFoundError: if input_dir does not exist.
    """
    input_path = Path(input_dir)
    output_root = Path(output_root)

    if not input_path.exists():
        raise FileNotFoundError(f"输入目录不存在: {input_path}")

    parquet_files = list(input_path.glob("*.parquet"))
    if not parquet_files:
        logging.warning("未找到Parquet文件")
        return

    total_success = 0
    total_errors = 0
    for parquet_file in parquet_files:
        ok, bad = process_single_parquet(parquet_file, output_root)
        total_success += ok
        total_errors += bad

    logging.info(f"\n{'='*40}\n批量处理完成")
    logging.info(f"处理文件总数: {len(parquet_files)}")
    logging.info(f"总成功条目: {total_success}")
    logging.info(f"总失败条目: {total_errors}")
|
|
if __name__ == "__main__":
    # CLI entry point: convert every Parquet file under --input to JSON
    # files under --output, then report the total elapsed time.
    parser = argparse.ArgumentParser(description='批量处理Parquet文件转JSON')
    parser.add_argument('-i', '--input', required=True, help='输入目录路径')
    parser.add_argument('-o', '--output', required=True, help='输出根目录路径')
    cli_args = parser.parse_args()

    overall_start = time.time()
    try:
        batch_process_parquets(
            input_dir=cli_args.input,
            output_root=cli_args.output,
        )
    except Exception as exc:
        # Any unhandled failure terminates with a non-zero exit code.
        logging.error(f"程序异常终止: {str(exc)}")
        sys.exit(1)
    else:
        logging.info(f"\n总耗时: {time.time()-overall_start:.2f}s")