| | |
| | """ |
| | 白血病細胞画像の前処理用スクリプト |
| | ch1とch6の画像をマージし、正規化してRGB形式で保存します。 |
| | |
| | 必要なライブラリ: |
| | - numpy |
| | - tifffile (TIFFファイルの読み込み用) |
| | - PIL (画像処理用) |
| | |
| | 使用方法: |
| | python prepare_data.py input_dir output_dir [--workers N] [--recursive] |
| | """ |
| | import argparse |
| | from pathlib import Path |
| | import numpy as np |
| | from PIL import Image |
| | import tifffile |
| | from concurrent.futures import ProcessPoolExecutor, as_completed |
| | import sys |
| | from typing import Tuple, List |
| | import logging |
| |
|
def setup_logger():
    """Configure root logging and return the logger for this module.

    Calls ``logging.basicConfig`` with level INFO and a
    "timestamp - level - message" record layout, then returns the logger
    registered under this module's ``__name__``.
    """
    record_layout = '%(asctime)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=record_layout, level=logging.INFO)
    module_logger = logging.getLogger(__name__)
    return module_logger
| |
|
def load_and_normalize(path: Path) -> np.ndarray:
    """Read a TIFF image and min-max rescale it to the 8-bit range 0-255.

    The smallest pixel value maps to 0 and the largest to 255; the scaled
    values are truncated (not rounded) when cast to ``uint8``.

    Args:
        path: Location of the TIFF file to read with ``tifffile.imread``.

    Returns:
        A ``uint8`` array with the same shape as the loaded image. An image
        with no usable value range (all pixels equal) is returned as all
        zeros instead of dividing by zero.
    """
    # Compute in float64 so that signed-integer images cannot overflow when
    # (max - min) exceeds what the source dtype can represent.
    img = np.asarray(tifffile.imread(str(path)), dtype=np.float64)
    lowest = img.min()
    value_range = img.max() - lowest

    # A flat image has a zero range (and a NaN min/max gives a NaN range);
    # dividing by it would feed NaN/inf into the uint8 cast.
    if not value_range > 0:
        return np.zeros(img.shape, dtype=np.uint8)

    scaled = (img - lowest) / value_range * 255
    return scaled.astype(np.uint8)
| |
|
def process_image_pair(paths: Tuple[Path, Path, Path]) -> bool:
    """Merge one ch1/ch6 image pair into a 3-channel image and save it.

    Both inputs are loaded through ``load_and_normalize``. The ch1 data is
    placed in the first channel, the ch6 data in the second, and the third
    channel is filled with zeros. Missing parent directories of the output
    path are created before saving.

    Args:
        paths: A ``(ch1_path, ch6_path, save_path)`` tuple. A single tuple
            argument is used so the function can be submitted to an
            executor as one task.

    Returns:
        ``True`` if the merged image was written, ``False`` if any step
        raised an exception (the error is logged, not re-raised).
    """
    ch1_path, ch6_path, save_path = paths
    try:
        arr1 = load_and_normalize(ch1_path)
        arr6 = load_and_normalize(ch6_path)

        # The third channel carries no signal; it only pads to 3 channels.
        empty_channel = np.zeros_like(arr1)

        # Stack along a new last axis -> (..., 3) layout for PIL.
        merged_array = np.stack((arr1, arr6, empty_channel), axis=-1)
        merged_image = Image.fromarray(merged_array)

        save_path.parent.mkdir(parents=True, exist_ok=True)
        merged_image.save(save_path)
    except Exception as e:
        # Best-effort batch processing: report the failure to the caller via
        # the return value so one bad pair does not abort the whole run.
        logging.error("Error processing %s: %s", ch1_path, e)
        return False
    return True
| |
|
def find_image_pairs(input_dir: Path) -> List[Tuple[Path, Path]]:
    """Find the ch1/ch6 TIFF pairs located directly inside ``input_dir``.

    A ch1 file named ``ch1_<idx>.tif`` is paired with ``ch6_<idx>.tif`` in
    the same directory, where ``<idx>`` is the part of the file stem between
    the first and second underscore. Subdirectories are not searched.

    Args:
        input_dir: Directory to scan.

    Returns:
        A list of ``(ch1_file, ch6_file)`` tuples, sorted by ch1 path so the
        result is reproducible. ch1 files without a matching ch6 file are
        omitted.
    """
    pairs = []
    # glob() yields entries in arbitrary filesystem order; sort for
    # deterministic processing and logging.
    for ch1_file in sorted(input_dir.glob("ch1_*.tif")):
        if not ch1_file.is_file():
            # A directory that happens to match the pattern is not an image.
            continue
        idx = ch1_file.stem.split('_')[1]
        ch6_file = ch1_file.with_name(f"ch6_{idx}.tif")
        # is_file() rather than exists(): the partner must be a regular file.
        if ch6_file.is_file():
            pairs.append((ch1_file, ch6_file))
    return pairs
| |
|
def main():
    """Command-line entry point: merge every ch1/ch6 pair under input_dir.

    Parses the command line, collects the directories to scan (only
    ``input_dir``, or ``input_dir`` plus all of its subdirectories with
    ``--recursive``), submits one task per image pair to a process pool and
    logs per-directory and overall success/failure counts. Output files are
    written to ``output_dir`` under the same relative directory layout, as
    ``merged_<idx>.tif``.

    Exits with status 1 if ``input_dir`` is not an existing directory, and
    with an argparse usage error if ``--workers`` is smaller than 1.
    """
    parser = argparse.ArgumentParser(description='細胞画像の前処理スクリプト')
    parser.add_argument('input_dir', type=str, help='入力ディレクトリのパス')
    parser.add_argument('output_dir', type=str, help='出力ディレクトリのパス')
    parser.add_argument('--workers', type=int, default=4, help='並列処理のワーカー数')
    parser.add_argument('--recursive', action='store_true', help='サブディレクトリも処理する')
    args = parser.parse_args()

    # ProcessPoolExecutor rejects max_workers <= 0 with a raw ValueError;
    # report it as a normal usage error instead.
    if args.workers < 1:
        parser.error('--workers には1以上の整数を指定してください')

    logger = setup_logger()
    input_path = Path(args.input_dir)
    output_path = Path(args.output_dir)

    # is_dir() also rejects a path that exists but is a regular file.
    if not input_path.is_dir():
        logger.error(f"入力ディレクトリが存在しません: {args.input_dir}")
        sys.exit(1)

    # The input directory itself is always scanned; a recursive glob for
    # "**/*" only yields its descendants, so add it explicitly.
    target_dirs = [input_path]
    if args.recursive:
        target_dirs.extend(
            sorted(d for d in input_path.rglob("*") if d.is_dir())
        )

    total_processed = 0
    total_failed = 0

    with ProcessPoolExecutor(max_workers=args.workers) as executor:
        # Submit every directory's tasks before waiting on any result so the
        # pool stays busy across directory boundaries.
        submitted = []
        for current_dir in target_dirs:
            pairs = find_image_pairs(current_dir)
            if not pairs:
                continue

            # Mirror the input directory layout below the output directory.
            current_output_dir = output_path / current_dir.relative_to(input_path)

            futures = [
                executor.submit(
                    process_image_pair,
                    (
                        ch1_file,
                        ch6_file,
                        current_output_dir / f"merged_{ch1_file.stem.split('_')[1]}.tif",
                    ),
                )
                for ch1_file, ch6_file in pairs
            ]
            submitted.append((current_dir, futures))

        for current_dir, futures in submitted:
            successful = sum(1 for future in futures if future.result())
            failed = len(futures) - successful

            total_processed += successful
            total_failed += failed

            logger.info(f"{current_dir.name}: {successful}/{len(futures)} files processed successfully")

    logger.info("\n処理完了:")
    logger.info(f"成功: {total_processed}")
    logger.info(f"失敗: {total_failed}")


# Run the CLI only when executed as a script, not when the module is imported.
if __name__ == "__main__":
    main()