#!/usr/bin/env python3
"""
白血病細胞画像の前処理用スクリプト
ch1とch6の画像をマージし、正規化してRGB形式で保存します。
必要なライブラリ:
- numpy
- tifffile (TIFFファイルの読み込み用)
- PIL (画像処理用)
使用方法:
python prepare_data.py input_dir output_dir [--workers N] [--recursive]
"""
import argparse
from pathlib import Path
import numpy as np
from PIL import Image
import tifffile
from concurrent.futures import ProcessPoolExecutor, as_completed
import sys
from typing import Tuple, List
import logging
def setup_logger():
    """Configure logging at INFO level and return this module's logger.

    Records are formatted as ``<timestamp> - <level> - <message>``.
    """
    log_format = '%(asctime)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=log_format, level=logging.INFO)
    module_logger = logging.getLogger(__name__)
    return module_logger
def load_and_normalize(path: Path) -> np.ndarray:
    """Read a TIFF image and min-max rescale it to an 8-bit array.

    The darkest pixel maps to 0 and the brightest to 255; values in between
    are scaled linearly and truncated when cast to ``uint8``.

    Args:
        path: Location of the TIFF file to read.

    Returns:
        A ``uint8`` array with the same shape as the loaded image. An image
        whose pixels all share one value has no contrast to stretch and is
        returned as all zeros.
    """
    img = tifffile.imread(str(path))
    # Compute in float64: doing the subtraction in the image's own integer
    # dtype can wrap around for signed inputs whose range exceeds the dtype.
    pixels = np.asarray(img, dtype=np.float64)
    lowest = pixels.min()
    span = pixels.max() - lowest
    if span == 0:
        # Constant image: dividing by the zero span would produce NaN, and
        # casting NaN to uint8 is undefined. Return a black image instead.
        return np.zeros(pixels.shape, dtype=np.uint8)
    scaled = (pixels - lowest) / span * 255
    return scaled.astype(np.uint8)
def process_image_pair(paths: Tuple[Path, Path, Path]) -> bool:
    """Merge one ch1/ch6 image pair into an RGB image and save it.

    Both inputs are loaded and normalized, then stacked along a new last
    axis as red = ch1, green = ch6, blue = all zeros.

    Args:
        paths: ``(ch1_path, ch6_path, save_path)``. Missing parent
            directories of ``save_path`` are created.

    Returns:
        True when the merged image was written, False when any step failed.
        Failures are logged rather than raised so that one bad pair does not
        abort the rest of a batch.
    """
    ch1_path, ch6_path, save_path = paths
    try:
        # Load and normalize both channels.
        arr1 = load_and_normalize(ch1_path)
        arr6 = load_and_normalize(ch6_path)
        # Blue has no source channel, so it is filled with zeros.
        empty_channel = np.zeros_like(arr1)
        # Combine as RGB (R: ch1, G: ch6, B: empty).
        merged_array = np.stack((arr1, arr6, empty_channel), axis=-1)
        merged_image = Image.fromarray(merged_array)
        # Save, creating the output directory on demand.
        save_path.parent.mkdir(parents=True, exist_ok=True)
        merged_image.save(save_path)
    except Exception as e:
        # Deliberately broad: this is the per-task boundary of the batch.
        logging.error(f"Error processing {ch1_path}: {e}")
        return False
    return True
def find_image_pairs(input_dir: Path) -> List[Tuple[Path, Path]]:
    """Find ch1/ch6 image pairs located directly inside ``input_dir``.

    For every ``ch1_*.tif`` file, the text between the first and second
    underscore of its stem is used as the index, and the file is paired
    with ``ch6_<index>.tif`` in the same directory when that file exists.

    Args:
        input_dir: Directory to scan (not recursive).

    Returns:
        ``(ch1_path, ch6_path)`` tuples sorted by ch1 path. ch1 files
        without a matching ch6 file are omitted.
    """
    pairs = []
    # Path.glob yields entries in arbitrary order; sort so that processing
    # order and log output are reproducible between runs.
    for ch1_file in sorted(input_dir.glob("ch1_*.tif")):
        idx = ch1_file.stem.split('_')[1]
        ch6_file = ch1_file.parent / f"ch6_{idx}.tif"
        # is_file() rather than exists(): a directory that happens to be
        # named like a ch6 image must not be treated as one.
        if ch6_file.is_file():
            pairs.append((ch1_file, ch6_file))
    return pairs
def _collect_target_dirs(input_path: Path, recursive: bool) -> List[Path]:
    """Return the directories to scan: the root, plus nested ones if recursive."""
    if not input_path.is_dir():
        return []
    if not recursive:
        return [input_path]
    # The "**/*" pattern never yields the root itself, so add it explicitly;
    # otherwise images stored directly in the input directory are skipped.
    nested_dirs = sorted(d for d in input_path.rglob("*") if d.is_dir())
    return [input_path, *nested_dirs]


def main():
    """Command-line entry point: merge every ch1/ch6 pair under the input directory.

    Parses ``input_dir``, ``output_dir``, ``--workers`` and ``--recursive``,
    processes the image pairs of each target directory in a process pool,
    writes the results under ``output_dir`` using the same relative
    directory layout, and logs per-directory and overall success/failure
    counts. Exits with status 1 when the input path does not exist.
    """
    parser = argparse.ArgumentParser(description='細胞画像の前処理スクリプト')
    parser.add_argument('input_dir', type=str, help='入力ディレクトリのパス')
    parser.add_argument('output_dir', type=str, help='出力ディレクトリのパス')
    parser.add_argument('--workers', type=int, default=4, help='並列処理のワーカー数')
    parser.add_argument('--recursive', action='store_true', help='サブディレクトリも処理する')
    args = parser.parse_args()

    logger = setup_logger()
    input_path = Path(args.input_dir)
    output_path = Path(args.output_dir)
    if not input_path.exists():
        logger.error(f"入力ディレクトリが存在しません: {args.input_dir}")
        sys.exit(1)

    # Determine which directories to process.
    target_dirs = _collect_target_dirs(input_path, args.recursive)

    total_processed = 0
    total_failed = 0
    with ProcessPoolExecutor(max_workers=args.workers) as executor:
        for current_dir in target_dirs:
            # Look for image pairs in this directory only.
            pairs = find_image_pairs(current_dir)
            if not pairs:
                continue

            # Mirror the input's relative directory layout in the output.
            rel_path = current_dir.relative_to(input_path)
            current_output_dir = output_path / rel_path

            # One task per pair: (ch1, ch6, destination file).
            tasks = [
                (ch1_file, ch6_file, current_output_dir / f"merged_{ch1_file.stem.split('_')[1]}.tif")
                for ch1_file, ch6_file in pairs
            ]

            # Run the directory's tasks in parallel and wait for all of them.
            futures = [executor.submit(process_image_pair, task) for task in tasks]
            successful = sum(1 for future in futures if future.result())
            failed = len(futures) - successful
            total_processed += successful
            total_failed += failed
            logger.info(f"{current_dir.name}: {successful}/{len(pairs)} files processed successfully")

    logger.info("\n処理完了:")
    logger.info(f"成功: {total_processed}")
    logger.info(f"失敗: {total_failed}")
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()