#!/usr/bin/env python3
"""Preprocessing script for leukemia cell images.

Merges the ch1 and ch6 channel images, normalizes them, and saves the
result as an RGB image.

Required libraries:
    - numpy
    - tifffile (for reading TIFF files)
    - PIL (for image handling)

Usage:
    python prepare_data.py input_dir output_dir [--workers N] [--recursive]
"""

import argparse
from pathlib import Path
import numpy as np
from PIL import Image
import tifffile
from concurrent.futures import ProcessPoolExecutor, as_completed
import sys
from typing import Tuple, List
import logging


def setup_logger():
    """Configure and return the module logger."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    return logging.getLogger(__name__)


def load_and_normalize(path: Path) -> np.ndarray:
    """Load a TIFF image and min-max normalize it to an 8-bit (0-255) array.

    A constant-valued image (max == min) would otherwise divide by zero;
    in that case a zero-filled image of the same shape is returned.
    """
    img = tifffile.imread(str(path))
    lo = np.min(img)
    hi = np.max(img)
    if hi == lo:
        # Flat image: no dynamic range to stretch — map everything to 0.
        return np.zeros(img.shape, dtype=np.uint8)
    img_norm = (img - lo) / (hi - lo) * 255
    return img_norm.astype(np.uint8)


def process_image_pair(paths: Tuple[Path, Path, Path]) -> bool:
    """Merge one ch1/ch6 image pair into an RGB image and save it.

    Args:
        paths: (ch1 path, ch6 path, output path) triple.

    Returns:
        True on success, False if any step failed (error is logged).
    """
    ch1_path, ch6_path, save_path = paths
    try:
        # Load and normalize both channels to uint8.
        arr1 = load_and_normalize(ch1_path)
        arr6 = load_and_normalize(ch6_path)

        # Blue channel is intentionally left empty.
        empty_channel = np.zeros_like(arr1)

        # Assemble RGB (R: ch1, G: ch6, B: empty).
        merged_array = np.stack((arr1, arr6, empty_channel), axis=-1)
        merged_image = Image.fromarray(merged_array)

        # Ensure the output directory exists, then save.
        save_path.parent.mkdir(parents=True, exist_ok=True)
        merged_image.save(save_path)
        return True
    except Exception as e:
        logging.error(f"Error processing {ch1_path}: {e}")
        return False


def find_image_pairs(input_dir: Path) -> List[Tuple[Path, Path]]:
    """Find matching ch1/ch6 file pairs in *input_dir* (non-recursive).

    A pair is ``ch1_<idx>.tif`` together with ``ch6_<idx>.tif`` in the
    same directory; ch1 files without a ch6 counterpart are skipped.
    """
    pairs = []
    for ch1_file in input_dir.glob("ch1_*.tif"):
        idx = ch1_file.stem.split('_')[1]
        ch6_file = ch1_file.parent / f"ch6_{idx}.tif"
        if ch6_file.exists():
            pairs.append((ch1_file, ch6_file))
    return pairs


def main():
    """CLI entry point: discover pairs and merge them in parallel."""
    parser = argparse.ArgumentParser(description='細胞画像の前処理スクリプト')
    parser.add_argument('input_dir', type=str, help='入力ディレクトリのパス')
    parser.add_argument('output_dir', type=str, help='出力ディレクトリのパス')
    parser.add_argument('--workers', type=int, default=4,
                        help='並列処理のワーカー数')
    parser.add_argument('--recursive', action='store_true',
                        help='サブディレクトリも処理する')
    args = parser.parse_args()

    logger = setup_logger()
    input_path = Path(args.input_dir)
    output_path = Path(args.output_dir)

    if not input_path.exists():
        logger.error(f"入力ディレクトリが存在しません: {args.input_dir}")
        sys.exit(1)

    # Determine the directories to process. In recursive mode the input
    # root itself must be included as well — glob("**/*") only yields
    # entries *below* the root, so pairs sitting directly in input_path
    # would otherwise be skipped.
    if args.recursive:
        target_dirs = [input_path] + [
            d for d in input_path.glob("**/*") if d.is_dir()
        ]
    else:
        target_dirs = [input_path]

    total_processed = 0
    total_failed = 0

    with ProcessPoolExecutor(max_workers=args.workers) as executor:
        for current_dir in target_dirs:
            # Find the ch1/ch6 pairs in this directory.
            pairs = find_image_pairs(current_dir)
            if not pairs:
                continue

            # Mirror the relative directory layout under the output root.
            rel_path = current_dir.relative_to(input_path)
            current_output_dir = output_path / rel_path

            # Build (ch1, ch6, destination) task triples.
            tasks = [
                (ch1_file, ch6_file,
                 current_output_dir / f"merged_{ch1_file.stem.split('_')[1]}.tif")
                for ch1_file, ch6_file in pairs
            ]

            # Run the merges in parallel and tally the results.
            futures = [executor.submit(process_image_pair, task) for task in tasks]
            successful = sum(1 for future in futures if future.result())
            failed = len(futures) - successful

            total_processed += successful
            total_failed += failed

            logger.info(f"{current_dir.name}: {successful}/{len(pairs)} files processed successfully")

    logger.info(f"\n処理完了:")
    logger.info(f"成功: {total_processed}")
    logger.info(f"失敗: {total_failed}")


if __name__ == "__main__":
    main()