# =============== Fibril Segmentation — UNet++ with a ResNeSt-101e encoder ===============
"""Train a binary fibril-segmentation model on grayscale images.

Pipeline: pick a GPU with free memory, seed all RNGs, pair every image with
its ``<name>-vectors.<ext>`` mask, create (or reload) a persistent 80/20
train/val split, train with a Dice + BCE loss and keep the checkpoint with
the best validation Dice.

Earlier experiments that used to live in this file as commented-out code
(not reproduced here):
  * smp.Unet with resnet34 / efficientnet-b3 / mobilenet_v2 encoders
  * smp.DeepLabV3Plus with an efficientnet-b3 encoder (with and without
    torch.cuda.amp autocast/GradScaler)
  * smp.UnetPlusPlus with resnet34 / efficientnet-b3 encoders
"""
import json
import os
import random
import subprocess
from glob import glob

# Set before torch is imported so the CUDA caching allocator picks it up.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

import numpy as np
from PIL import Image
from tqdm import tqdm
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import albumentations as A
from albumentations.pytorch import ToTensorV2
import segmentation_models_pytorch as smp
from sklearn.utils import shuffle  # noqa: F401  (currently unused; import kept)

torch.cuda.empty_cache()


# ─── GPU Selection Function ───────────────────────────────
def get_free_gpu(threshold_mb=1000):
    """Return the index of the first GPU with more than ``threshold_mb`` MB free.

    Queries ``nvidia-smi`` for used/total memory of every GPU.

    Args:
        threshold_mb: Minimum free memory (total - used, in MB as reported by
            ``nvidia-smi``) a GPU must have to be selected.

    Returns:
        The GPU index as a string (suitable for ``CUDA_VISIBLE_DEVICES``), or
        ``None`` when no GPU qualifies or the query fails. Failures are
        printed, never raised: this is a best-effort probe.
    """
    try:
        result = subprocess.run(
            [
                "nvidia-smi",
                "--query-gpu=memory.used,memory.total",
                "--format=csv,nounits,noheader",
            ],
            stdout=subprocess.PIPE,
            text=True,
        )
        for idx, line in enumerate(result.stdout.strip().split("\n")):
            used, total = map(int, line.split(","))
            if total - used > threshold_mb:
                return str(idx)
    except Exception as e:  # best-effort: missing binary, unparsable output, ...
        print("GPU check failed:", e)
    return None


# ─── Find Free GPU BEFORE Defining Config ────────────────
free_gpu_id = get_free_gpu()

# ─── Configurations ───────────────────────────────────────
config = {
    "seed": 42,
    "img_size": 512,
    "batch_size": 2,
    "num_workers": 4,
    "epochs": 100,
    "lr": 1e-4,
    "train_img_dir": "./alldataset/images",
    "train_mask_dir": "./alldataset/masks",
    "save_path": "./trained-models/encoder_resnest101e_decoder_UnetPlusPlus_fibril_seg_model.pth",
    "gpu_id": free_gpu_id,
    "split_path": "train_val_split.json",
}

# ─── GPU Setup ────────────────────────────────────────────
if config["gpu_id"] is not None:
    # nvidia-smi enumerates GPUs in PCI bus order, while CUDA's default
    # ordering is "fastest first". Pin the order so the index found above
    # refers to the same physical device (an explicit user setting wins).
    os.environ.setdefault("CUDA_DEVICE_ORDER", "PCI_BUS_ID")
    os.environ["CUDA_VISIBLE_DEVICES"] = config["gpu_id"]
    print(f"✅ Using GPU ID: {config['gpu_id']}")
else:
    print("⚠️ No free GPU detected — training may use default device or fail")


# ─── Reproducibility ───────────────────────────────────────
def seed_everything(seed=42):
    """Seed Python, NumPy and PyTorch (CPU + all GPUs) and make cuDNN deterministic."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


seed_everything(config["seed"])


# ─── Dataset ───────────────────────────────────────────────
class FibrilSegmentationDataset(Dataset):
    """Grayscale image / binary mask pairs read from disk.

    Args:
        image_paths: Paths of the input images.
        mask_paths: Paths of the masks, aligned index-by-index with
            ``image_paths``.
        transform: Optional albumentations-style callable invoked as
            ``transform(image=..., mask=...)`` and returning a dict with
            ``"image"`` and ``"mask"`` entries.
    """

    def __init__(self, image_paths, mask_paths, transform=None):
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, mask)`` where the mask has a leading channel axis."""
        # Context managers close the file handles deterministically.
        with Image.open(self.image_paths[idx]) as img:
            image = np.array(img.convert("L"))
        with Image.open(self.mask_paths[idx]) as msk:
            # Binarize: pixels brighter than 127 are foreground (1.0).
            mask = (np.array(msk.convert("L")) > 127).astype(np.float32)

        if self.transform is not None:
            aug = self.transform(image=image, mask=mask)
            image, mask = aug["image"], aug["mask"]

        # Without a tensor-producing transform the arrays are still NumPy;
        # convert them so `.unsqueeze` below works instead of crashing.
        if not torch.is_tensor(image):
            image = torch.from_numpy(image).float()
            if image.ndim == 2:  # H x W -> 1 x H x W
                image = image.unsqueeze(0)
        if not torch.is_tensor(mask):
            mask = torch.from_numpy(mask)
        return image, mask.unsqueeze(0)


# ─── Image-Mask Matcher ────────────────────────────────────
def match_images_and_masks(img_dir, mask_dir, img_exts=("jpg", "jpeg", "png"), mask_exts=("jpg", "png")):
    """Pair every image in ``img_dir`` with its ``<basename>-vectors.<ext>`` mask.

    Images without a mask are skipped. For each image the first extension in
    ``mask_exts`` that exists on disk is used.

    Returns:
        ``(image_paths, mask_paths)``: two index-aligned lists.
    """
    image_paths, mask_paths = [], []
    for ext in img_exts:
        # glob order is filesystem dependent; sort so the seeded shuffle that
        # follows yields the same split on every machine.
        for img_path in sorted(glob(f"{img_dir}/*.{ext}")):
            base = os.path.splitext(os.path.basename(img_path))[0]
            for mext in mask_exts:
                mask_path = os.path.join(mask_dir, f"{base}-vectors.{mext}")
                if os.path.exists(mask_path):
                    image_paths.append(img_path)
                    mask_paths.append(mask_path)
                    break  # stop at the first matching mask extension
    return image_paths, mask_paths


# ─── Loss Function ─────────────────────────────────────────
class DiceBCELoss(nn.Module):
    """Sum of soft Dice loss and binary cross-entropy, both fed raw logits.

    Args:
        smooth: Constant added to the Dice numerator and denominator to avoid
            division by zero.
    """

    def __init__(self, smooth=1e-6):
        super().__init__()
        self.smooth = smooth
        self.bce = nn.BCEWithLogitsLoss()

    def forward(self, inputs, targets):
        # BCEWithLogitsLoss applies the sigmoid itself, so it gets raw logits.
        bce_loss = self.bce(inputs, targets)
        probs = torch.sigmoid(inputs)  # probabilities for the Dice term
        intersection = (probs * targets).sum()
        dice_loss = 1 - (2. * intersection + self.smooth) / (probs.sum() + targets.sum() + self.smooth)
        return dice_loss + bce_loss


# ─── Metrics ───────────────────────────────────────────────
@torch.no_grad()
def dice_coeff(pred, target, smooth=1e-6):
    """Dice coefficient of logits ``pred`` (thresholded at p > 0.5) vs ``target``.

    Computed over all elements of the tensors at once.
    """
    pred = (torch.sigmoid(pred) > 0.5).float()
    intersection = (pred * target).sum()
    return (2. * intersection + smooth) / (pred.sum() + target.sum() + smooth)


@torch.no_grad()
def iou_score(pred, target, smooth=1e-6):
    """Intersection-over-union of logits ``pred`` (thresholded at p > 0.5) vs ``target``."""
    pred = (torch.sigmoid(pred) > 0.5).float()
    intersection = (pred * target).sum()
    union = pred.sum() + target.sum() - intersection
    return (intersection + smooth) / (union + smooth)


# ─── Data Preparation with persistent train/val split ──────
def load_or_create_split(split_path, img_dir, mask_dir, seed, train_frac=0.8):
    """Load the train/val split stored at ``split_path`` or create and save one.

    A new split shuffles the matched pairs with a private RNG seeded with
    ``seed`` and assigns the first ``train_frac`` of them to training.

    Returns:
        ``(train_imgs, train_masks, val_imgs, val_masks)`` as lists of paths.

    Raises:
        RuntimeError: If fewer than two image/mask pairs are found, or a
            stored split has an empty train or validation part.
    """
    if os.path.exists(split_path):
        print(f"Loading saved train/val split from {split_path}")
        with open(split_path, "r", encoding="utf-8") as f:
            split_data = json.load(f)
    else:
        print("Creating new train/val split and saving it...")
        image_paths, mask_paths = match_images_and_masks(img_dir, mask_dir)
        if len(image_paths) < 2:
            raise RuntimeError(
                f"Need at least 2 image/mask pairs, found {len(image_paths)} "
                f"(images: {img_dir!r}, masks: {mask_dir!r})"
            )

        # Shuffle with a local RNG so the global random state is untouched.
        pairs = list(zip(image_paths, mask_paths))
        random.Random(seed).shuffle(pairs)

        # Clamp so both parts are non-empty whatever train_frac is.
        split = min(max(int(train_frac * len(pairs)), 1), len(pairs) - 1)
        train_pairs, val_pairs = pairs[:split], pairs[split:]
        split_data = {
            "train_images": [img for img, _ in train_pairs],
            "train_masks": [msk for _, msk in train_pairs],
            "val_images": [img for img, _ in val_pairs],
            "val_masks": [msk for _, msk in val_pairs],
        }
        with open(split_path, "w", encoding="utf-8") as f:
            json.dump(split_data, f, indent=2)

    if not split_data["train_images"] or not split_data["val_images"]:
        raise RuntimeError(f"Train/val split in {split_path!r} has an empty train or validation set")

    return (
        split_data["train_images"],
        split_data["train_masks"],
        split_data["val_images"],
        split_data["val_masks"],
    )


# ─── Transformations ──────────────────────────────────────
def build_transforms(img_size):
    """Return ``(train_tf, val_tf)`` albumentations pipelines for square inputs."""
    common_norm = A.Normalize(mean=(0.5,), std=(0.5,))
    # Probabilities are passed by keyword: a transform's first positional
    # parameter is not guaranteed to be `p` (for RandomBrightnessContrast it
    # is `brightness_limit`), so positional values silently change meaning.
    train_tf = A.Compose([
        A.Resize(img_size, img_size),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.RandomRotate90(p=0.5),
        A.Affine(scale=(0.9, 1.1), translate_percent=0.05, rotate=(-30, 30), shear=(-5, 5), p=0.5),
        A.RandomBrightnessContrast(p=0.3),
        A.ElasticTransform(alpha=1.0, sigma=50.0, approximate=True, p=0.2),
        A.Blur(blur_limit=3, p=0.2),
        common_norm,
        ToTensorV2(),
    ])
    val_tf = A.Compose([A.Resize(img_size, img_size), common_norm, ToTensorV2()])
    return train_tf, val_tf


# ─── Model Setup ──────────────────────────────────────────
def build_model(device):
    """Build the UNet++ model (1 input channel, 1 logit output channel) on ``device``."""
    # NOTE(review): smp's built-in ResNeSt encoders appear to be registered
    # with a "timm-" prefix (e.g. "timm-resnest101e") — confirm that
    # "resnest101e" resolves in the installed smp version.
    return smp.UnetPlusPlus(
        encoder_name='resnest101e',
        encoder_depth=5,
        encoder_weights='imagenet',
        decoder_use_norm='batchnorm',
        decoder_channels=(256, 128, 64, 32, 16),
        decoder_attention_type=None,
        decoder_interpolation='nearest',
        in_channels=1,   # grayscale input
        classes=1,       # binary segmentation (fibril vs background)
        activation=None,  # raw logits; the loss and metrics apply the sigmoid
        aux_params=None,
    ).to(device)


# ─── Training Loop ─────────────────────────────────────────
def main():
    """Run training and keep the checkpoint with the best validation Dice."""
    train_imgs, train_masks, val_imgs, val_masks = load_or_create_split(
        config["split_path"],
        config["train_img_dir"],
        config["train_mask_dir"],
        config["seed"],
    )

    train_tf, val_tf = build_transforms(config["img_size"])
    train_loader = DataLoader(
        FibrilSegmentationDataset(train_imgs, train_masks, train_tf),
        batch_size=config["batch_size"],
        shuffle=True,
        num_workers=config["num_workers"],
    )
    val_loader = DataLoader(
        FibrilSegmentationDataset(val_imgs, val_masks, val_tf),
        batch_size=1,
        shuffle=False,
        num_workers=config["num_workers"],
    )

    print(f"Train samples: {len(train_imgs)} | Batch size: {config['batch_size']}")
    print(f"Steps/epoch: {int(np.ceil(len(train_imgs) / config['batch_size']))}")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = build_model(device)

    loss_fn = DiceBCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"])
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)

    best_dice = 0.0
    save_dir = os.path.dirname(config["save_path"])
    if save_dir:  # os.makedirs("") raises when save_path has no directory part
        os.makedirs(save_dir, exist_ok=True)

    for epoch in range(1, config["epochs"] + 1):
        model.train()
        total_loss, total_dice = 0, 0
        for imgs, masks in tqdm(train_loader, desc=f"Epoch {epoch} - Train"):
            imgs, masks = imgs.to(device), masks.to(device)
            preds = model(imgs)
            loss = loss_fn(preds, masks)

            optimizer.zero_grad()
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            total_loss += loss.item()
            total_dice += dice_coeff(preds, masks).item()

        avg_loss = total_loss / len(train_loader)
        avg_dice = total_dice / len(train_loader)
        print(f"[Train] Epoch {epoch} | Loss: {avg_loss:.4f} | Dice: {avg_dice:.4f}")

        # ─── Validation ────────────────────────────────────────
        model.eval()
        val_loss, val_dice, val_iou = 0, 0, 0
        with torch.no_grad():
            for imgs, masks in val_loader:
                imgs, masks = imgs.to(device), masks.to(device)
                preds = model(imgs)
                val_loss += loss_fn(preds, masks).item()
                val_dice += dice_coeff(preds, masks).item()
                val_iou += iou_score(preds, masks).item()

        val_loss /= len(val_loader)
        val_dice /= len(val_loader)
        val_iou /= len(val_loader)
        # The scheduler halves the LR when validation loss stops improving.
        scheduler.step(val_loss)

        print(f"[Val] Epoch {epoch} | Loss: {val_loss:.4f} | Dice: {val_dice:.4f} | IoU: {val_iou:.4f}")

        if val_dice > best_dice:
            best_dice = val_dice
            torch.save(model.state_dict(), config["save_path"])
            print(f"✅ Saved Best Model (Epoch {epoch} - Dice: {val_dice:.4f})")


# The guard keeps DataLoader worker processes (and plain imports of this
# module) from re-running the training loop.
if __name__ == "__main__":
    main()