# vit16L-FT-cellclassification / prepare_data.py
# author: dakesan — initial commit (e363d75)
#!/usr/bin/env python3
"""
白血病細胞画像の前処理用スクリプト
ch1とch6の画像をマージし、正規化してRGB形式で保存します。
必要なライブラリ:
- numpy
- tifffile (TIFFファイルの読み込み用)
- PIL (画像処理用)
使用方法:
python prepare_data.py input_dir output_dir [--workers N] [--recursive]
"""
import argparse
from pathlib import Path
import numpy as np
from PIL import Image
import tifffile
from concurrent.futures import ProcessPoolExecutor, as_completed
import sys
from typing import Tuple, List
import logging
def setup_logger():
    """Configure root logging and return this module's logger.

    Sets up a timestamped INFO-level format via ``basicConfig``
    (a no-op if logging was already configured elsewhere).
    """
    log_format = '%(asctime)s - %(levelname)s - %(message)s'
    logging.basicConfig(level=logging.INFO, format=log_format)
    return logging.getLogger(__name__)
def load_and_normalize(path: Path) -> np.ndarray:
    """Load a TIFF image and min-max normalize it to an 8-bit array.

    Args:
        path: Path to the TIFF file to read.

    Returns:
        ``uint8`` array scaled so the input minimum maps to 0 and the
        maximum to 255. A constant image (max == min) is returned as
        all zeros instead of dividing by zero (which produced
        NaN/inf warnings in the original formula).
    """
    img = tifffile.imread(str(path))
    lo = np.min(img)
    span = np.max(img) - lo
    if span == 0:
        # Flat image: nothing to scale; avoid 0/0.
        return np.zeros_like(img, dtype=np.uint8)
    img_norm = (img - lo) / span * 255
    return img_norm.astype(np.uint8)
def process_image_pair(paths: Tuple[Path, Path, Path]) -> bool:
    """Merge a ch1/ch6 image pair into one RGB image and save it.

    Args:
        paths: ``(ch1_path, ch6_path, save_path)`` triple. ch1 becomes
            the red channel, ch6 the green channel; blue is left empty.

    Returns:
        True on success, False if any step failed (the error is logged).
        Note: the original signature was annotated ``-> None`` even
        though it returned a bool; the annotation is now correct.
    """
    ch1_path, ch6_path, save_path = paths
    try:
        # Load both channels, each normalized to uint8.
        arr1 = load_and_normalize(ch1_path)
        arr6 = load_and_normalize(ch6_path)
        # Blue channel is intentionally empty.
        empty_channel = np.zeros_like(arr1)
        # Merge into RGB (R: ch1, G: ch6, B: empty).
        merged_array = np.stack((arr1, arr6, empty_channel), axis=-1)
        merged_image = Image.fromarray(merged_array)
        # Ensure the destination directory exists before saving.
        save_path.parent.mkdir(parents=True, exist_ok=True)
        merged_image.save(save_path)
        return True
    except Exception as e:
        logging.error(f"Error processing {ch1_path}: {e}")
        return False
def find_image_pairs(input_dir: Path) -> List[Tuple[Path, Path]]:
    """Find matching ch1/ch6 TIFF pairs in a directory (non-recursive).

    Args:
        input_dir: Directory scanned for files named ``ch1_*.tif``.

    Returns:
        List of ``(ch1_path, ch6_path)`` tuples, sorted by filename for
        deterministic ordering. A ch1 file with no matching ch6 partner
        is skipped.
    """
    pairs = []
    for ch1_file in sorted(input_dir.glob("ch1_*.tif")):
        # Keep the FULL identifier after the channel prefix so names like
        # "ch1_12_A.tif" pair with "ch6_12_A.tif" (the original
        # split('_')[1] dropped everything after a second underscore).
        idx = ch1_file.stem.split('_', 1)[1]
        ch6_file = ch1_file.parent / f"ch6_{idx}.tif"
        if ch6_file.exists():
            pairs.append((ch1_file, ch6_file))
    return pairs
def main():
    """CLI entry point: merge ch1/ch6 pairs under input_dir into output_dir.

    Mirrors the input directory layout under the output directory and
    fans the per-pair work out over a process pool.
    """
    parser = argparse.ArgumentParser(description='細胞画像の前処理スクリプト')
    parser.add_argument('input_dir', type=str, help='入力ディレクトリのパス')
    parser.add_argument('output_dir', type=str, help='出力ディレクトリのパス')
    parser.add_argument('--workers', type=int, default=4, help='並列処理のワーカー数')
    parser.add_argument('--recursive', action='store_true', help='サブディレクトリも処理する')
    args = parser.parse_args()
    logger = setup_logger()
    input_path = Path(args.input_dir)
    output_path = Path(args.output_dir)
    if not input_path.exists():
        logger.error(f"入力ディレクトリが存在しません: {args.input_dir}")
        sys.exit(1)
    # Determine target directories. BUGFIX: glob("**/*") never yields the
    # root directory itself, so recursive runs used to silently skip pairs
    # placed directly in input_dir — include input_path explicitly.
    if args.recursive:
        target_dirs = [input_path] + [d for d in input_path.glob("**/*") if d.is_dir()]
    else:
        target_dirs = [input_path]
    total_processed = 0
    total_failed = 0
    with ProcessPoolExecutor(max_workers=args.workers) as executor:
        for current_dir in target_dirs:
            pairs = find_image_pairs(current_dir)
            if not pairs:
                continue
            # Mirror the layout of input_dir under output_dir.
            rel_path = current_dir.relative_to(input_path)
            current_output_dir = output_path / rel_path
            # One task per pair: (ch1, ch6, destination file). maxsplit=1
            # keeps the full index for multi-underscore stems.
            tasks = [
                (ch1_file, ch6_file,
                 current_output_dir / f"merged_{ch1_file.stem.split('_', 1)[1]}.tif")
                for ch1_file, ch6_file in pairs
            ]
            futures = [executor.submit(process_image_pair, task) for task in tasks]
            successful = sum(1 for future in futures if future.result())
            failed = len(futures) - successful
            total_processed += successful
            total_failed += failed
            logger.info(f"{current_dir.name}: {successful}/{len(pairs)} files processed successfully")
    logger.info(f"\n処理完了:")
    logger.info(f"成功: {total_processed}")
    logger.info(f"失敗: {total_failed}")
# Run the CLI only when executed as a script, not on import.
if __name__ == "__main__":
    main()